14 from mgr_module
import MgrModule
16 from contextlib
import contextmanager
17 from datetime
import datetime
, timedelta
18 from functools
import partial
, wraps
19 from threading
import Condition
, Lock
, Thread
22 GLOBAL_POOL_KEY
= (None, None)
24 QUERY_POOL_ID
= "pool_id"
25 QUERY_POOL_ID_MAP
= "pool_id_map"
26 QUERY_IDS
= "query_ids"
27 QUERY_SUM_POOL_COUNTERS
= "pool_counters"
28 QUERY_RAW_POOL_COUNTERS
= "raw_pool_counters"
29 QUERY_LAST_REQUEST
= "last_request"
31 OSD_PERF_QUERY_REGEX_MATCH_ALL
= '^(.*)$'
32 OSD_PERF_QUERY_COUNTERS
= ['write_ops',
38 OSD_PERF_QUERY_COUNTERS_INDICES
= {
39 OSD_PERF_QUERY_COUNTERS
[i
]: i
for i
in range(len(OSD_PERF_QUERY_COUNTERS
))}
41 OSD_PERF_QUERY_LATENCY_COUNTER_INDICES
= [4, 5]
42 OSD_PERF_QUERY_MAX_RESULTS
= 256
44 POOL_REFRESH_INTERVAL
= timedelta(minutes
=5)
45 QUERY_EXPIRE_INTERVAL
= timedelta(minutes
=1)
46 STATS_RATE_INTERVAL
= timedelta(minutes
=1)
48 REPORT_MAX_RESULTS
= 64
50 RBD_TASK_OID
= "rbd_task"
52 TASK_SEQUENCE
= "sequence"
55 TASK_MESSAGE
= "message"
56 TASK_RETRY_TIME
= "retry_time"
57 TASK_IN_PROGRESS
= "in_progress"
58 TASK_PROGRESS
= "progress"
59 TASK_CANCELED
= "canceled"
61 TASK_REF_POOL_NAME
= "pool_name"
62 TASK_REF_POOL_NAMESPACE
= "pool_namespace"
63 TASK_REF_IMAGE_NAME
= "image_name"
64 TASK_REF_IMAGE_ID
= "image_id"
65 TASK_REF_ACTION
= "action"
67 TASK_REF_ACTION_FLATTEN
= "flatten"
68 TASK_REF_ACTION_REMOVE
= "remove"
69 TASK_REF_ACTION_TRASH_REMOVE
= "trash remove"
70 TASK_REF_ACTION_MIGRATION_EXECUTE
= "migrate execute"
71 TASK_REF_ACTION_MIGRATION_COMMIT
= "migrate commit"
72 TASK_REF_ACTION_MIGRATION_ABORT
= "migrate abort"
74 VALID_TASK_ACTIONS
= [TASK_REF_ACTION_FLATTEN
,
75 TASK_REF_ACTION_REMOVE
,
76 TASK_REF_ACTION_TRASH_REMOVE
,
77 TASK_REF_ACTION_MIGRATION_EXECUTE
,
78 TASK_REF_ACTION_MIGRATION_COMMIT
,
79 TASK_REF_ACTION_MIGRATION_ABORT
]
81 TASK_RETRY_INTERVAL
= timedelta(seconds
=30)
82 MAX_COMPLETED_TASKS
= 50
85 class NotAuthorizedError(Exception):
def is_authorized(module, pool, namespace):
    """Check whether the current session may act on the given pool/namespace.

    Falsy pool/namespace values are normalized to empty strings so the
    mgr-level authorization check always receives string arguments.
    """
    auth_spec = {"pool": pool or '', "namespace": namespace or ''}
    return module.is_authorized(auth_spec)
94 def authorize_request(module
, pool
, namespace
):
95 if not is_authorized(module
, pool
, namespace
):
96 raise NotAuthorizedError("not authorized on pool={}, namespace={}".format(
100 def extract_pool_key(pool_spec
):
102 return GLOBAL_POOL_KEY
104 match
= re
.match(r
'^([^/]+)(?:/([^/]+))?$', pool_spec
)
106 raise ValueError("Invalid pool spec: {}".format(pool_spec
))
107 return (match
.group(1), match
.group(2) or '')
def get_rbd_pools(module):
    """Map pool id -> pool name for every pool tagged with the 'rbd' application."""
    rbd_pools = {}
    for entry in module.get('osd_map')['pools']:
        # only pools whose application metadata mentions 'rbd' qualify
        if 'rbd' in entry.get('application_metadata', {}):
            rbd_pools[entry['pool']] = entry['pool_name']
    return rbd_pools
121 query_condition
= Condition(lock
)
122 refresh_condition
= Condition(lock
)
125 image_name_cache
= {}
126 image_name_refresh_time
= datetime
.fromtimestamp(0)
def prepare_regex(cls, value):
    """Wrap *value* in a fully-anchored regex with a single capture group."""
    return '^({0})$'.format(value)
133 def prepare_osd_perf_query(cls
, pool_id
, namespace
, counter_type
):
134 pool_id_regex
= OSD_PERF_QUERY_REGEX_MATCH_ALL
135 namespace_regex
= OSD_PERF_QUERY_REGEX_MATCH_ALL
137 pool_id_regex
= cls
.prepare_regex(pool_id
)
139 namespace_regex
= cls
.prepare_regex(namespace
)
143 {'type': 'pool_id', 'regex': pool_id_regex
},
144 {'type': 'namespace', 'regex': namespace_regex
},
145 {'type': 'object_name',
146 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
148 'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS
,
149 'limit': {'order_by': counter_type
,
150 'max_count': OSD_PERF_QUERY_MAX_RESULTS
},
def pool_spec_search_keys(cls, pool_key):
    """Return every leading prefix of *pool_key*, longest first.

    For a (pool, namespace) key this yields the exact key, then the
    pool-only prefix, then the empty (global) key — letting callers match
    progressively less specific configurations.
    """
    return [pool_key[:length] for length in range(len(pool_key), -1, -1)]
def submatch_pool_key(cls, pool_key, search_key):
    """Return True when *search_key* matches *pool_key*.

    A falsy component in the search key acts as a wildcard for that
    position.
    """
    pool_ok = not search_key[0] or pool_key[0] == search_key[0]
    namespace_ok = not search_key[1] or pool_key[1] == search_key[1]
    return namespace_ok and pool_ok
163 def __init__(self
, module
):
165 self
.log
= module
.log
167 self
.thread
= Thread(target
=self
.run
)
172 self
.log
.info("PerfHandler: starting")
175 self
.scrub_expired_queries()
176 self
.process_raw_osd_perf_counters()
177 self
.refresh_condition
.notify()
179 stats_period
= int(self
.module
.get_ceph_option("mgr_stats_period"))
180 self
.query_condition
.wait(stats_period
)
182 self
.log
.debug("PerfHandler: tick")
184 except Exception as ex
:
185 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
186 ex
, traceback
.format_exc()))
188 def merge_raw_osd_perf_counters(self
, pool_key
, query
, now_ts
,
189 resolve_image_names
):
190 pool_id_map
= query
[QUERY_POOL_ID_MAP
]
192 # collect and combine the raw counters from all sort orders
193 raw_pool_counters
= query
.setdefault(QUERY_RAW_POOL_COUNTERS
, {})
194 for query_id
in query
[QUERY_IDS
]:
195 res
= self
.module
.get_osd_perf_counters(query_id
)
196 for counter
in res
['counters']:
197 # replace pool id from object name if it exists
199 pool_id
= int(k
[2][0]) if k
[2][0] else int(k
[0][0])
203 # ignore metrics from non-matching pools/namespaces
204 if pool_id
not in pool_id_map
:
206 if pool_key
[1] is not None and pool_key
[1] != namespace
:
209 # flag the pool (and namespace) for refresh if we cannot find
210 # image name in the cache
211 resolve_image_key
= (pool_id
, namespace
)
212 if image_id
not in self
.image_name_cache
.get(resolve_image_key
, {}):
213 resolve_image_names
.add(resolve_image_key
)
215 # copy the 'sum' counter values for each image (ignore count)
216 # if we haven't already processed it for this round
217 raw_namespaces
= raw_pool_counters
.setdefault(pool_id
, {})
218 raw_images
= raw_namespaces
.setdefault(namespace
, {})
219 raw_image
= raw_images
.setdefault(image_id
, [None, None])
221 # save the last two perf counters for each image
222 if raw_image
[0] and raw_image
[0][0] < now_ts
:
223 raw_image
[1] = raw_image
[0]
226 raw_image
[0] = [now_ts
, [int(x
[0]) for x
in counter
['c']]]
228 self
.log
.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters
))
229 return raw_pool_counters
231 def sum_osd_perf_counters(self
, query
, raw_pool_counters
, now_ts
):
232 # update the cumulative counters for each image
233 sum_pool_counters
= query
.setdefault(QUERY_SUM_POOL_COUNTERS
, {})
234 for pool_id
, raw_namespaces
in raw_pool_counters
.items():
235 sum_namespaces
= sum_pool_counters
.setdefault(pool_id
, {})
236 for namespace
, raw_images
in raw_namespaces
.items():
237 sum_namespace
= sum_namespaces
.setdefault(namespace
, {})
238 for image_id
, raw_image
in raw_images
.items():
239 # zero-out non-updated raw counters
242 elif raw_image
[0][0] < now_ts
:
243 raw_image
[1] = raw_image
[0]
244 raw_image
[0] = [now_ts
, [0 for x
in raw_image
[1][1]]]
247 counters
= raw_image
[0][1]
249 # copy raw counters if this is a newly discovered image or
250 # increment existing counters
251 sum_image
= sum_namespace
.setdefault(image_id
, None)
253 for i
in range(len(counters
)):
254 sum_image
[i
] += counters
[i
]
256 sum_namespace
[image_id
] = [x
for x
in counters
]
258 self
.log
.debug("sum_osd_perf_counters: {}".format(sum_pool_counters
))
259 return sum_pool_counters
def refresh_image_names(self, resolve_image_names):
    """Refresh the image id -> name cache for each flagged (pool, namespace).

    Opens an ioctx per pool, lists images in the namespace via librbd, and
    records their id/name pairs in self.image_name_cache.
    """
    for pool_id, namespace in resolve_image_names:
        image_key = (pool_id, namespace)
        names = self.image_name_cache.setdefault(image_key, {})
        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            ioctx.set_namespace(namespace)
            for meta in rbd.RBD().list2(ioctx):
                names[meta['id']] = meta['name']
        self.log.debug("resolve_image_names: {}={}".format(image_key, names))
271 def scrub_missing_images(self
):
272 for pool_key
, query
in self
.user_queries
.items():
273 raw_pool_counters
= query
.get(QUERY_RAW_POOL_COUNTERS
, {})
274 sum_pool_counters
= query
.get(QUERY_SUM_POOL_COUNTERS
, {})
275 for pool_id
, sum_namespaces
in sum_pool_counters
.items():
276 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
277 for namespace
, sum_images
in sum_namespaces
.items():
278 raw_images
= raw_namespaces
.get(namespace
, {})
280 image_key
= (pool_id
, namespace
)
281 image_names
= self
.image_name_cache
.get(image_key
, {})
282 for image_id
in list(sum_images
.keys()):
283 # scrub image counters if we failed to resolve image name
284 if image_id
not in image_names
:
285 self
.log
.debug("scrub_missing_images: dropping {}/{}".format(
286 image_key
, image_id
))
287 del sum_images
[image_id
]
288 if image_id
in raw_images
:
289 del raw_images
[image_id
]
291 def process_raw_osd_perf_counters(self
):
293 now_ts
= int(now
.strftime("%s"))
295 # clear the image name cache if we need to refresh all active pools
296 if self
.image_name_cache
and \
297 self
.image_name_refresh_time
+ POOL_REFRESH_INTERVAL
< now
:
298 self
.log
.debug("process_raw_osd_perf_counters: expiring image name cache")
299 self
.image_name_cache
= {}
301 resolve_image_names
= set()
302 for pool_key
, query
in self
.user_queries
.items():
303 if not query
[QUERY_IDS
]:
306 raw_pool_counters
= self
.merge_raw_osd_perf_counters(
307 pool_key
, query
, now_ts
, resolve_image_names
)
308 self
.sum_osd_perf_counters(query
, raw_pool_counters
, now_ts
)
310 if resolve_image_names
:
311 self
.image_name_refresh_time
= now
312 self
.refresh_image_names(resolve_image_names
)
313 self
.scrub_missing_images()
314 elif not self
.image_name_cache
:
315 self
.scrub_missing_images()
317 def resolve_pool_id(self
, pool_name
):
318 pool_id
= self
.module
.rados
.pool_lookup(pool_name
)
320 raise rados
.ObjectNotFound("Pool '{}' not found".format(pool_name
),
324 def scrub_expired_queries(self
):
325 # perf counters need to be periodically refreshed to continue
327 expire_time
= datetime
.now() - QUERY_EXPIRE_INTERVAL
328 for pool_key
in list(self
.user_queries
.keys()):
329 user_query
= self
.user_queries
[pool_key
]
330 if user_query
[QUERY_LAST_REQUEST
] < expire_time
:
331 self
.unregister_osd_perf_queries(pool_key
, user_query
[QUERY_IDS
])
332 del self
.user_queries
[pool_key
]
334 def register_osd_perf_queries(self
, pool_id
, namespace
):
337 for counter
in OSD_PERF_QUERY_COUNTERS
:
338 query
= self
.prepare_osd_perf_query(pool_id
, namespace
, counter
)
339 self
.log
.debug("register_osd_perf_queries: {}".format(query
))
341 query_id
= self
.module
.add_osd_perf_query(query
)
343 raise RuntimeError('Failed to add OSD perf query: {}'.format(query
))
344 query_ids
.append(query_id
)
347 for query_id
in query_ids
:
348 self
.module
.remove_osd_perf_query(query_id
)
353 def unregister_osd_perf_queries(self
, pool_key
, query_ids
):
354 self
.log
.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
355 pool_key
, query_ids
))
356 for query_id
in query_ids
:
357 self
.module
.remove_osd_perf_query(query_id
)
360 def register_query(self
, pool_key
):
361 if pool_key
not in self
.user_queries
:
364 pool_id
= self
.resolve_pool_id(pool_key
[0])
367 QUERY_POOL_ID
: pool_id
,
368 QUERY_POOL_ID_MAP
: {pool_id
: pool_key
[0]},
369 QUERY_IDS
: self
.register_osd_perf_queries(pool_id
, pool_key
[1]),
370 QUERY_LAST_REQUEST
: datetime
.now()
373 self
.user_queries
[pool_key
] = user_query
375 # force an immediate stat pull if this is a new query
376 self
.query_condition
.notify()
377 self
.refresh_condition
.wait(5)
380 user_query
= self
.user_queries
[pool_key
]
382 # ensure query doesn't expire
383 user_query
[QUERY_LAST_REQUEST
] = datetime
.now()
385 if pool_key
== GLOBAL_POOL_KEY
:
386 # refresh the global pool id -> name map upon each
388 user_query
[QUERY_POOL_ID_MAP
] = {
389 pool_id
: pool_name
for pool_id
, pool_name
390 in get_rbd_pools(self
.module
).items()}
392 self
.log
.debug("register_query: pool_key={}, query_ids={}".format(
393 pool_key
, user_query
[QUERY_IDS
]))
397 def extract_stat(self
, index
, raw_image
, sum_image
):
398 # require two raw counters between a fixed time window
399 if not raw_image
or not raw_image
[0] or not raw_image
[1]:
402 current_time
= raw_image
[0][0]
403 previous_time
= raw_image
[1][0]
404 if current_time
<= previous_time
or \
405 current_time
- previous_time
> STATS_RATE_INTERVAL
.total_seconds():
408 current_value
= raw_image
[0][1][index
]
409 instant_rate
= float(current_value
) / (current_time
- previous_time
)
411 # convert latencies from sum to average per op
413 if OSD_PERF_QUERY_COUNTERS
[index
] == 'write_latency':
414 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['write_ops']
415 elif OSD_PERF_QUERY_COUNTERS
[index
] == 'read_latency':
416 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['read_ops']
418 if ops_index
is not None:
419 ops
= max(1, self
.extract_stat(ops_index
, raw_image
, sum_image
))
424 def extract_counter(self
, index
, raw_image
, sum_image
):
426 return sum_image
[index
]
429 def generate_report(self
, query
, sort_by
, extract_data
):
430 pool_id_map
= query
[QUERY_POOL_ID_MAP
]
431 sum_pool_counters
= query
.setdefault(QUERY_SUM_POOL_COUNTERS
, {})
432 raw_pool_counters
= query
.setdefault(QUERY_RAW_POOL_COUNTERS
, {})
434 sort_by_index
= OSD_PERF_QUERY_COUNTERS
.index(sort_by
)
436 # pre-sort and limit the response
438 for pool_id
, sum_namespaces
in sum_pool_counters
.items():
439 if pool_id
not in pool_id_map
:
441 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
442 for namespace
, sum_images
in sum_namespaces
.items():
443 raw_images
= raw_namespaces
.get(namespace
, {})
444 for image_id
, sum_image
in sum_images
.items():
445 raw_image
= raw_images
.get(image_id
, [])
447 # always sort by recent IO activity
448 results
.append([(pool_id
, namespace
, image_id
),
449 self
.extract_stat(sort_by_index
, raw_image
,
451 results
= sorted(results
, key
=lambda x
: x
[1], reverse
=True)[:REPORT_MAX_RESULTS
]
453 # build the report in sorted order
454 pool_descriptors
= {}
456 for key
, _
in results
:
458 pool_name
= pool_id_map
[pool_id
]
462 image_names
= self
.image_name_cache
.get((pool_id
, namespace
), {})
463 image_name
= image_names
[image_id
]
465 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
466 raw_images
= raw_namespaces
.get(namespace
, {})
467 raw_image
= raw_images
.get(image_id
, [])
469 sum_namespaces
= sum_pool_counters
[pool_id
]
470 sum_images
= sum_namespaces
[namespace
]
471 sum_image
= sum_images
.get(image_id
, [])
473 pool_descriptor
= pool_name
475 pool_descriptor
+= "/{}".format(namespace
)
476 pool_index
= pool_descriptors
.setdefault(pool_descriptor
,
477 len(pool_descriptors
))
478 image_descriptor
= "{}/{}".format(pool_index
, image_name
)
479 data
= [extract_data(i
, raw_image
, sum_image
)
480 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]
482 # skip if no data to report
483 if data
== [0 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]:
486 counters
.append({image_descriptor
: data
})
488 return {idx
: descriptor
for descriptor
, idx
489 in pool_descriptors
.items()}, \
492 def get_perf_data(self
, report
, pool_spec
, sort_by
, extract_data
):
493 self
.log
.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
494 report
, pool_spec
, sort_by
))
495 self
.scrub_expired_queries()
497 pool_key
= extract_pool_key(pool_spec
)
498 authorize_request(self
.module
, pool_key
[0], pool_key
[1])
500 user_query
= self
.register_query(pool_key
)
503 pool_descriptors
, counters
= self
.generate_report(
504 user_query
, sort_by
, extract_data
)
507 'timestamp': time
.mktime(now
.timetuple()),
508 '{}_descriptors'.format(report
): OSD_PERF_QUERY_COUNTERS
,
509 'pool_descriptors': pool_descriptors
,
510 '{}s'.format(report
): counters
513 return 0, json
.dumps(report
), ""
def get_perf_stats(self, pool_spec, sort_by):
    """Return rate-style (per-second) image performance stats for *pool_spec*."""
    return self.get_perf_data("stat", pool_spec, sort_by, self.extract_stat)
def get_perf_counters(self, pool_spec, sort_by):
    """Return cumulative image performance counters for *pool_spec*."""
    return self.get_perf_data("counter", pool_spec, sort_by, self.extract_counter)
523 def handle_command(self
, inbuf
, prefix
, cmd
):
525 if prefix
== 'image stats':
526 return self
.get_perf_stats(cmd
.get('pool_spec', None),
527 cmd
.get('sort_by', OSD_PERF_QUERY_COUNTERS
[0]))
528 elif prefix
== 'image counters':
529 return self
.get_perf_counters(cmd
.get('pool_spec', None),
530 cmd
.get('sort_by', OSD_PERF_QUERY_COUNTERS
[0]))
532 raise NotImplementedError(cmd
['prefix'])
def __init__(self, throttle_period):
    """Record the minimum interval between forwarded calls.

    time_of_last_call starts at datetime.min so the very first invocation
    is never throttled.
    """
    self.time_of_last_call = datetime.min
    self.throttle_period = throttle_period
540 def __call__(self
, fn
):
542 def wrapper(*args
, **kwargs
):
544 if self
.time_of_last_call
+ self
.throttle_period
<= now
:
545 self
.time_of_last_call
= now
546 return fn(*args
, **kwargs
)
551 def __init__(self
, sequence
, task_id
, message
, refs
):
552 self
.sequence
= sequence
553 self
.task_id
= task_id
554 self
.message
= message
556 self
.retry_time
= None
557 self
.in_progress
= False
559 self
.canceled
= False
563 return self
.to_json()
def sequence_key(self):
    """Zero-padded 16-digit uppercase hex key used to order tasks in the omap."""
    return "%016X" % self.sequence
571 self
.fail("Operation canceled")
573 def fail(self
, message
):
575 self
.failure_message
= message
578 d
= {TASK_SEQUENCE
: self
.sequence
,
579 TASK_ID
: self
.task_id
,
580 TASK_MESSAGE
: self
.message
,
584 d
[TASK_RETRY_TIME
] = self
.retry_time
.isoformat()
586 d
[TASK_IN_PROGRESS
] = True
587 d
[TASK_PROGRESS
] = self
.progress
589 d
[TASK_CANCELED
] = True
593 return str(json
.dumps(self
.to_dict()))
596 def from_json(cls
, val
):
599 action
= d
.get(TASK_REFS
, {}).get(TASK_REF_ACTION
)
600 if action
not in VALID_TASK_ACTIONS
:
601 raise ValueError("Invalid task action: {}".format(action
))
603 return Task(d
[TASK_SEQUENCE
], d
[TASK_ID
], d
[TASK_MESSAGE
], d
[TASK_REFS
])
604 except json
.JSONDecodeError
as e
:
605 raise ValueError("Invalid JSON ({})".format(str(e
)))
606 except KeyError as e
:
607 raise ValueError("Invalid task format (missing key {})".format(str(e
)))
612 condition
= Condition(lock
)
615 in_progress_task
= None
616 tasks_by_sequence
= dict()
623 def __init__(self
, module
):
625 self
.log
= module
.log
628 self
.init_task_queue()
630 self
.thread
= Thread(target
=self
.run
)
def default_pool_name(self):
    """The cluster-configured default RBD pool name (rbd_default_pool option)."""
    return self.module.get_ceph_option("rbd_default_pool")
637 def extract_pool_spec(self
, pool_spec
):
638 pool_spec
= extract_pool_key(pool_spec
)
639 if pool_spec
== GLOBAL_POOL_KEY
:
640 pool_spec
= (self
.default_pool_name
, '')
643 def extract_image_spec(self
, image_spec
):
644 match
= re
.match(r
'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
647 raise ValueError("Invalid image spec: {}".format(image_spec
))
648 return (match
.group(1) or self
.default_pool_name
, match
.group(2) or '',
653 self
.log
.info("TaskHandler: starting")
657 for sequence
in sorted([sequence
for sequence
, task
658 in self
.tasks_by_sequence
.items()
659 if not task
.retry_time
or task
.retry_time
<= now
]):
660 self
.execute_task(sequence
)
662 self
.condition
.wait(5)
663 self
.log
.debug("TaskHandler: tick")
665 except Exception as ex
:
666 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
667 ex
, traceback
.format_exc()))
670 def open_ioctx(self
, spec
):
672 with self
.module
.rados
.open_ioctx(spec
[0]) as ioctx
:
673 ioctx
.set_namespace(spec
[1])
675 except rados
.ObjectNotFound
:
676 self
.log
.error("Failed to locate pool {}".format(spec
[0]))
680 def format_image_spec(cls
, image_spec
):
681 image
= image_spec
[2]
683 image
= "{}/{}".format(image_spec
[1], image
)
685 image
= "{}/{}".format(image_spec
[0], image
)
688 def init_task_queue(self
):
689 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
691 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
692 self
.load_task_queue(ioctx
, pool_name
)
695 namespaces
= rbd
.RBD().namespace_list(ioctx
)
696 except rbd
.OperationNotSupported
:
697 self
.log
.debug("Namespaces not supported")
700 for namespace
in namespaces
:
701 ioctx
.set_namespace(namespace
)
702 self
.load_task_queue(ioctx
, pool_name
)
704 except rados
.ObjectNotFound
:
708 if self
.tasks_by_sequence
:
709 self
.sequence
= list(sorted(self
.tasks_by_sequence
.keys()))[-1]
711 self
.log
.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
712 self
.sequence
, str(self
.tasks_by_sequence
), str(self
.tasks_by_id
)))
714 def load_task_queue(self
, ioctx
, pool_name
):
715 pool_spec
= pool_name
717 pool_spec
+= "/{}".format(ioctx
.nspace
)
722 with rados
.ReadOpCtx() as read_op
:
723 self
.log
.info("load_task_task: {}, start_after={}".format(
724 pool_spec
, start_after
))
725 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
726 ioctx
.operate_read_op(read_op
, RBD_TASK_OID
)
732 self
.log
.info("load_task_task: task={}".format(v
))
735 task
= Task
.from_json(v
)
736 self
.append_task(task
)
738 self
.log
.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec
, v
))
743 except StopIteration:
745 except rados
.ObjectNotFound
:
def append_task(self, task):
    """Index *task* by both its unique id and its sequence number."""
    self.tasks_by_id[task.task_id] = task
    self.tasks_by_sequence[task.sequence] = task
def task_refs_match(self, task_refs, refs):
    """Compare two task-reference dicts.

    An image id present only on the stored task is ignored, since callers
    are allowed to omit it when searching for a duplicate task.
    """
    if TASK_REF_IMAGE_ID in task_refs and TASK_REF_IMAGE_ID not in refs:
        trimmed = task_refs.copy()
        del trimmed[TASK_REF_IMAGE_ID]
        task_refs = trimmed

    self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
    return task_refs == refs
761 def find_task(self
, refs
):
762 self
.log
.debug("find_task: refs={}".format(refs
))
764 # search for dups and return the original
765 for task_id
in reversed(sorted(self
.tasks_by_id
.keys())):
766 task
= self
.tasks_by_id
[task_id
]
767 if self
.task_refs_match(task
.refs
, refs
):
770 # search for a completed task (message replay)
771 for task
in reversed(self
.completed_tasks
):
772 if self
.task_refs_match(task
.refs
, refs
):
775 def add_task(self
, ioctx
, message
, refs
):
776 self
.log
.debug("add_task: message={}, refs={}".format(message
, refs
))
778 # ensure unique uuid across all pools
780 task_id
= str(uuid
.uuid4())
781 if task_id
not in self
.tasks_by_id
:
785 task
= Task(self
.sequence
, task_id
, message
, refs
)
787 # add the task to the rbd_task omap
788 task_json
= task
.to_json()
789 omap_keys
= (task
.sequence_key
, )
790 omap_vals
= (str.encode(task_json
), )
791 self
.log
.info("adding task: {} {}".format(omap_keys
[0], omap_vals
[0]))
793 with rados
.WriteOpCtx() as write_op
:
794 ioctx
.set_omap(write_op
, omap_keys
, omap_vals
)
795 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
796 self
.append_task(task
)
798 self
.condition
.notify()
801 def remove_task(self
, ioctx
, task
, remove_in_memory
=True):
802 self
.log
.info("remove_task: task={}".format(str(task
)))
803 omap_keys
= (task
.sequence_key
, )
805 with rados
.WriteOpCtx() as write_op
:
806 ioctx
.remove_omap_keys(write_op
, omap_keys
)
807 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
808 except rados
.ObjectNotFound
:
813 del self
.tasks_by_id
[task
.task_id
]
814 del self
.tasks_by_sequence
[task
.sequence
]
816 # keep a record of the last N tasks to help avoid command replay
818 if not task
.failed
and not task
.canceled
:
819 self
.log
.debug("remove_task: moving to completed tasks")
820 self
.completed_tasks
.append(task
)
821 self
.completed_tasks
= self
.completed_tasks
[-MAX_COMPLETED_TASKS
:]
826 def execute_task(self
, sequence
):
827 task
= self
.tasks_by_sequence
[sequence
]
828 self
.log
.info("execute_task: task={}".format(str(task
)))
832 with self
.open_ioctx((task
.refs
[TASK_REF_POOL_NAME
],
833 task
.refs
[TASK_REF_POOL_NAMESPACE
])) as ioctx
:
836 action
= task
.refs
[TASK_REF_ACTION
]
837 execute_fn
= {TASK_REF_ACTION_FLATTEN
: self
.execute_flatten
,
838 TASK_REF_ACTION_REMOVE
: self
.execute_remove
,
839 TASK_REF_ACTION_TRASH_REMOVE
: self
.execute_trash_remove
,
840 TASK_REF_ACTION_MIGRATION_EXECUTE
: self
.execute_migration_execute
,
841 TASK_REF_ACTION_MIGRATION_COMMIT
: self
.execute_migration_commit
,
842 TASK_REF_ACTION_MIGRATION_ABORT
: self
.execute_migration_abort
845 self
.log
.error("Invalid task action: {}".format(action
))
847 task
.in_progress
= True
848 self
.in_progress_task
= task
849 self
.update_progress(task
, 0)
853 execute_fn(ioctx
, task
)
855 except rbd
.OperationCanceled
:
856 self
.log
.info("Operation canceled: task={}".format(
862 task
.in_progress
= False
863 self
.in_progress_task
= None
865 self
.complete_progress(task
)
866 self
.remove_task(ioctx
, task
)
868 except rados
.ObjectNotFound
as e
:
869 self
.log
.error("execute_task: {}".format(e
))
871 self
.update_progress(task
, 0)
873 # pool DNE -- remove the task
874 self
.complete_progress(task
)
875 self
.remove_task(ioctx
, task
)
877 except (rados
.Error
, rbd
.Error
) as e
:
878 self
.log
.error("execute_task: {}".format(e
))
879 self
.update_progress(task
, 0)
882 task
.in_progress
= False
883 task
.retry_time
= datetime
.now() + TASK_RETRY_INTERVAL
885 def progress_callback(self
, task
, current
, total
):
886 progress
= float(current
) / float(total
)
887 self
.log
.debug("progress_callback: task={}, progress={}".format(
888 str(task
), progress
))
890 # avoid deadlocking when a new command comes in during a progress callback
891 if not self
.lock
.acquire(False):
895 if not self
.in_progress_task
or self
.in_progress_task
.canceled
:
896 return -rbd
.ECANCELED
897 self
.in_progress_task
.progress
= progress
901 self
.throttled_update_progress(task
, progress
)
904 def execute_flatten(self
, ioctx
, task
):
905 self
.log
.info("execute_flatten: task={}".format(str(task
)))
908 with rbd
.Image(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
]) as image
:
909 image
.flatten(on_progress
=partial(self
.progress_callback
, task
))
910 except rbd
.InvalidArgument
:
911 task
.fail("Image does not have parent")
912 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
913 except rbd
.ImageNotFound
:
914 task
.fail("Image does not exist")
915 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
917 def execute_remove(self
, ioctx
, task
):
918 self
.log
.info("execute_remove: task={}".format(str(task
)))
921 rbd
.RBD().remove(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
922 on_progress
=partial(self
.progress_callback
, task
))
923 except rbd
.ImageNotFound
:
924 task
.fail("Image does not exist")
925 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
927 def execute_trash_remove(self
, ioctx
, task
):
928 self
.log
.info("execute_trash_remove: task={}".format(str(task
)))
931 rbd
.RBD().trash_remove(ioctx
, task
.refs
[TASK_REF_IMAGE_ID
],
932 on_progress
=partial(self
.progress_callback
, task
))
933 except rbd
.ImageNotFound
:
934 task
.fail("Image does not exist")
935 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
937 def execute_migration_execute(self
, ioctx
, task
):
938 self
.log
.info("execute_migration_execute: task={}".format(str(task
)))
941 rbd
.RBD().migration_execute(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
942 on_progress
=partial(self
.progress_callback
, task
))
943 except rbd
.ImageNotFound
:
944 task
.fail("Image does not exist")
945 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
946 except rbd
.InvalidArgument
:
947 task
.fail("Image is not migrating")
948 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
950 def execute_migration_commit(self
, ioctx
, task
):
951 self
.log
.info("execute_migration_commit: task={}".format(str(task
)))
954 rbd
.RBD().migration_commit(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
955 on_progress
=partial(self
.progress_callback
, task
))
956 except rbd
.ImageNotFound
:
957 task
.fail("Image does not exist")
958 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
959 except rbd
.InvalidArgument
:
960 task
.fail("Image is not migrating or migration not executed")
961 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
963 def execute_migration_abort(self
, ioctx
, task
):
964 self
.log
.info("execute_migration_abort: task={}".format(str(task
)))
967 rbd
.RBD().migration_abort(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
968 on_progress
=partial(self
.progress_callback
, task
))
969 except rbd
.ImageNotFound
:
970 task
.fail("Image does not exist")
971 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
972 except rbd
.InvalidArgument
:
973 task
.fail("Image is not migrating")
974 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
976 def complete_progress(self
, task
):
977 self
.log
.debug("complete_progress: task={}".format(str(task
)))
980 self
.module
.remote("progress", "fail", task
.task_id
,
981 task
.failure_message
)
983 self
.module
.remote("progress", "complete", task
.task_id
)
985 # progress module is disabled
988 def update_progress(self
, task
, progress
):
989 self
.log
.debug("update_progress: task={}, progress={}".format(str(task
), progress
))
991 refs
= {"origin": "rbd_support"}
992 refs
.update(task
.refs
)
994 self
.module
.remote("progress", "update", task
.task_id
,
995 task
.message
, progress
, refs
)
997 # progress module is disabled
@Throttle(timedelta(seconds=1))
def throttled_update_progress(self, task, progress):
    """Forward to update_progress, rate-limited to at most once per second."""
    self.update_progress(task, progress)
1004 def queue_flatten(self
, image_spec
):
1005 image_spec
= self
.extract_image_spec(image_spec
)
1007 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
1008 self
.log
.info("queue_flatten: {}".format(image_spec
))
1010 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_FLATTEN
,
1011 TASK_REF_POOL_NAME
: image_spec
[0],
1012 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
1013 TASK_REF_IMAGE_NAME
: image_spec
[2]}
1015 with self
.open_ioctx(image_spec
) as ioctx
:
1017 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
1018 refs
[TASK_REF_IMAGE_ID
] = image
.id()
1021 parent_image_id
= image
.parent_id()
1022 except rbd
.ImageNotFound
:
1023 parent_image_id
= None
1025 except rbd
.ImageNotFound
:
1028 task
= self
.find_task(refs
)
1030 return 0, task
.to_json(), ''
1032 if TASK_REF_IMAGE_ID
not in refs
:
1033 raise rbd
.ImageNotFound("Image {} does not exist".format(
1034 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1035 if not parent_image_id
:
1036 raise rbd
.ImageNotFound("Image {} does not have a parent".format(
1037 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1039 return 0, self
.add_task(ioctx
,
1040 "Flattening image {}".format(
1041 self
.format_image_spec(image_spec
)),
1044 def queue_remove(self
, image_spec
):
1045 image_spec
= self
.extract_image_spec(image_spec
)
1047 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
1048 self
.log
.info("queue_remove: {}".format(image_spec
))
1050 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_REMOVE
,
1051 TASK_REF_POOL_NAME
: image_spec
[0],
1052 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
1053 TASK_REF_IMAGE_NAME
: image_spec
[2]}
1055 with self
.open_ioctx(image_spec
) as ioctx
:
1057 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
1058 refs
[TASK_REF_IMAGE_ID
] = image
.id()
1059 snaps
= list(image
.list_snaps())
1061 except rbd
.ImageNotFound
:
1064 task
= self
.find_task(refs
)
1066 return 0, task
.to_json(), ''
1068 if TASK_REF_IMAGE_ID
not in refs
:
1069 raise rbd
.ImageNotFound("Image {} does not exist".format(
1070 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1072 raise rbd
.ImageBusy("Image {} has snapshots".format(
1073 self
.format_image_spec(image_spec
)), errno
=errno
.EBUSY
)
1075 return 0, self
.add_task(ioctx
,
1076 "Removing image {}".format(
1077 self
.format_image_spec(image_spec
)),
def queue_trash_remove(self, image_id_spec):
    """Queue an asynchronous removal of an image from the RBD trash.

    :param image_id_spec: "[pool/[namespace/]]image_id" spec string -- the
        image *id*, since trash entries are addressed by id (see trash_get)
    :return: (rc, out, err) command-style tuple; out is the task JSON
    """
    image_id_spec = self.extract_image_spec(image_id_spec)

    authorize_request(self.module, image_id_spec[0], image_id_spec[1])
    self.log.info("queue_trash_remove: {}".format(image_id_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
            TASK_REF_POOL_NAME: image_id_spec[0],
            TASK_REF_POOL_NAMESPACE: image_id_spec[1],
            TASK_REF_IMAGE_ID: image_id_spec[2]}
    task = self.find_task(refs)
    if task:
        # an equivalent task is already queued -- return it instead
        return 0, task.to_json(), ''

    # verify that image exists in trash
    with self.open_ioctx(image_id_spec) as ioctx:
        rbd.RBD().trash_get(ioctx, image_id_spec[2])

        return 0, self.add_task(ioctx,
                                "Removing image {} from trash".format(
                                    self.format_image_spec(image_id_spec)),
                                refs), ''
def get_migration_status(self, ioctx, image_spec):
    """Return the live-migration status dict for an image.

    :return: the librbd migration status mapping, or None when the image
        is not part of a migration (InvalidArgument) or is absent
        (ImageNotFound) -- callers gate on truthiness of the result.
    """
    try:
        return rbd.RBD().migration_status(ioctx, image_spec[2])
    except (rbd.InvalidArgument, rbd.ImageNotFound):
        return None
def validate_image_migrating(self, image_spec, migration_status):
    """Ensure a migration status was retrieved for the image.

    :param image_spec: (pool, namespace, image) tuple used in the error text
    :param migration_status: result of get_migration_status; falsy means
        the image is not migrating
    :raises rbd.InvalidArgument: when the image is not migrating
    """
    if not migration_status:
        raise rbd.InvalidArgument("Image {} is not migrating".format(
            self.format_image_spec(image_spec)), errno=errno.EINVAL)
def resolve_pool_name(self, pool_id):
    """Map an OSD pool id to its pool name via the mgr's OSD map.

    :param pool_id: numeric pool id (e.g. from a migration status dict)
    :return: the pool name, or '<unknown>' if the id is not in the OSD map
    """
    osd_map = self.module.get('osd_map')
    for pool in osd_map['pools']:
        if pool['pool'] == pool_id:
            return pool['pool_name']
    # NOTE(review): fall-through line was lost in the corrupted source;
    # restored so an unmapped id yields a printable placeholder, not None
    return '<unknown>'
def queue_migration_execute(self, image_spec):
    """Queue asynchronous execution of a prepared image live-migration.

    :param image_spec: "[pool/[namespace/]]image" spec string
    :return: (rc, out, err) command-style tuple; out is the task JSON
    :raises rbd.InvalidArgument: if the image is not migrating, or the
        migration is not in the prepared/executing state
    """
    image_spec = self.extract_image_spec(image_spec)

    authorize_request(self.module, image_spec[0], image_spec[1])
    self.log.info("queue_migration_execute: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # an equivalent task is already queued -- return it instead
            return 0, task.to_json(), ''

        self.validate_image_migrating(image_spec, status)
        if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
                                   rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
            raise rbd.InvalidArgument("Image {} is not in ready state".format(
                self.format_image_spec(image_spec)), errno=errno.EINVAL)

        # describe both endpoints of the migration in the task message
        source_pool = self.resolve_pool_name(status['source_pool_id'])
        dest_pool = self.resolve_pool_name(status['dest_pool_id'])
        return 0, self.add_task(ioctx,
                                "Migrating image {} to {}".format(
                                    self.format_image_spec(
                                        (source_pool,
                                         status['source_pool_namespace'],
                                         status['source_image_name'])),
                                    self.format_image_spec(
                                        (dest_pool,
                                         status['dest_pool_namespace'],
                                         status['dest_image_name']))),
                                refs), ''
def queue_migration_commit(self, image_spec):
    """Queue an asynchronous commit of an executed image live-migration.

    :param image_spec: "[pool/[namespace/]]image" spec string
    :return: (rc, out, err) command-style tuple; out is the task JSON
    :raises rbd.InvalidArgument: if the image is not migrating, or the
        migration has not reached the executed state
    """
    image_spec = self.extract_image_spec(image_spec)

    authorize_request(self.module, image_spec[0], image_spec[1])
    self.log.info("queue_migration_commit: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # an equivalent task is already queued -- return it instead
            return 0, task.to_json(), ''

        self.validate_image_migrating(image_spec, status)
        if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
            raise rbd.InvalidArgument(
                "Image {} has not completed migration".format(
                    self.format_image_spec(image_spec)), errno=errno.EINVAL)

        return 0, self.add_task(ioctx,
                                "Committing image migration for {}".format(
                                    self.format_image_spec(image_spec)),
                                refs), ''
def queue_migration_abort(self, image_spec):
    """Queue an asynchronous abort of an image live-migration.

    :param image_spec: "[pool/[namespace/]]image" spec string
    :return: (rc, out, err) command-style tuple; out is the task JSON
    :raises rbd.InvalidArgument: if the image is not migrating
    """
    image_spec = self.extract_image_spec(image_spec)

    authorize_request(self.module, image_spec[0], image_spec[1])
    self.log.info("queue_migration_abort: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # an equivalent task is already queued -- return it instead
            return 0, task.to_json(), ''

        # abort is valid in any migration state, so only check "is migrating"
        self.validate_image_migrating(image_spec, status)
        return 0, self.add_task(ioctx,
                                "Aborting image migration for {}".format(
                                    self.format_image_spec(image_spec)),
                                refs), ''
def task_cancel(self, task_id):
    """Cancel a queued (or currently running) asynchronous task.

    :param task_id: identifier returned when the task was queued
    :return: (rc, out, err) tuple; -ENOENT when the task is unknown or the
        caller is not authorized for its pool/namespace
    """
    self.log.info("task_cancel: {}".format(task_id))

    task = self.tasks_by_id.get(task_id)
    # unauthorized access is reported identically to a missing task so
    # task existence is not leaked across tenants
    if not task or not is_authorized(self.module,
                                     task.refs[TASK_REF_POOL_NAME],
                                     task.refs[TASK_REF_POOL_NAMESPACE]):
        return -errno.ENOENT, '', "No such task {}".format(task_id)

    # NOTE(review): restored from upstream -- marks the task as canceled
    # (cf. the TASK_CANCELED key defined at module scope)
    task.cancel()

    remove_in_memory = True
    if self.in_progress_task and self.in_progress_task.task_id == task_id:
        self.log.info("Attempting to cancel in-progress task: {}".format(
            str(self.in_progress_task)))
        # the executor still owns the in-memory record while the task runs
        remove_in_memory = False

    # complete any associated event in the progress module
    self.complete_progress(task)

    # remove from rbd_task omap
    with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                          task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
        self.remove_task(ioctx, task, remove_in_memory)

    return 0, "", ""
def task_list(self, task_id):
    """List pending/running tasks, or describe a single task.

    :param task_id: optional task id; None lists all authorized tasks
    :return: (rc, out, err); out is JSON -- a dict for a single task,
        a list when enumerating
    """
    self.log.info("task_list: {}".format(task_id))

    if task_id:
        task = self.tasks_by_id.get(task_id)
        # same "not found" answer for missing and unauthorized tasks
        if not task or not is_authorized(self.module,
                                         task.refs[TASK_REF_POOL_NAME],
                                         task.refs[TASK_REF_POOL_NAMESPACE]):
            return -errno.ENOENT, '', "No such task {}".format(task_id)

        result = task.to_dict()
    else:
        result = []
        # sequence numbers preserve submission order
        for sequence in sorted(self.tasks_by_sequence.keys()):
            task = self.tasks_by_sequence[sequence]
            if is_authorized(self.module,
                             task.refs[TASK_REF_POOL_NAME],
                             task.refs[TASK_REF_POOL_NAMESPACE]):
                result.append(task.to_dict())

    return 0, json.dumps(result), ""
def handle_command(self, inbuf, prefix, cmd):
    """Dispatch an 'rbd task ...' mgr command (leading 'rbd task ' stripped).

    :param inbuf: command input buffer (unused by task commands)
    :param prefix: command suffix, e.g. 'add flatten', 'cancel', 'list'
    :param cmd: parsed command argument mapping
    :return: (rc, out, err) tuple from the selected queue/list handler
    :raises NotImplementedError: for unrecognized prefixes
    """
    if prefix == 'add flatten':
        return self.queue_flatten(cmd['image_spec'])
    elif prefix == 'add remove':
        return self.queue_remove(cmd['image_spec'])
    elif prefix == 'add trash remove':
        return self.queue_trash_remove(cmd['image_id_spec'])
    elif prefix == 'add migration execute':
        return self.queue_migration_execute(cmd['image_spec'])
    elif prefix == 'add migration commit':
        return self.queue_migration_commit(cmd['image_spec'])
    elif prefix == 'add migration abort':
        return self.queue_migration_abort(cmd['image_spec'])
    elif prefix == 'cancel':
        return self.task_cancel(cmd['task_id'])
    elif prefix == 'list':
        # task_id is optional for 'list'
        return self.task_list(cmd.get('task_id'))

    raise NotImplementedError(cmd['prefix'])
# NOTE(review): the text below is the corrupted remains of the Module class
# header and its COMMANDS command-registration table ('rbd perf image
# stats/counters', 'rbd task add ...', 'rbd task cancel', 'rbd task list').
# Structural lines of the dict literals ({, }, "perm": ..., closing ]) were
# lost in extraction -- restore this block from upstream rather than editing
# the fragments in place.
1285 class Module(MgrModule
):
1288 "cmd": "rbd perf image stats "
1289 "name=pool_spec,type=CephString,req=false "
1290 "name=sort_by,type=CephChoices,strings="
1291 "write_ops|write_bytes|write_latency|"
1292 "read_ops|read_bytes|read_latency,"
1294 "desc": "Retrieve current RBD IO performance stats",
1298 "cmd": "rbd perf image counters "
1299 "name=pool_spec,type=CephString,req=false "
1300 "name=sort_by,type=CephChoices,strings="
1301 "write_ops|write_bytes|write_latency|"
1302 "read_ops|read_bytes|read_latency,"
1304 "desc": "Retrieve current RBD IO performance counters",
1308 "cmd": "rbd task add flatten "
1309 "name=image_spec,type=CephString",
1310 "desc": "Flatten a cloned image asynchronously in the background",
1314 "cmd": "rbd task add remove "
1315 "name=image_spec,type=CephString",
1316 "desc": "Remove an image asynchronously in the background",
1320 "cmd": "rbd task add trash remove "
1321 "name=image_id_spec,type=CephString",
1322 "desc": "Remove an image from the trash asynchronously in the background",
1326 "cmd": "rbd task add migration execute "
1327 "name=image_spec,type=CephString",
1328 "desc": "Execute an image migration asynchronously in the background",
1332 "cmd": "rbd task add migration commit "
1333 "name=image_spec,type=CephString",
1334 "desc": "Commit an executed migration asynchronously in the background",
1338 "cmd": "rbd task add migration abort "
1339 "name=image_spec,type=CephString",
1340 "desc": "Abort a prepared migration asynchronously in the background",
1344 "cmd": "rbd task cancel "
1345 "name=task_id,type=CephString ",
1346 "desc": "Cancel a pending or running asynchronous task",
1350 "cmd": "rbd task list "
1351 "name=task_id,type=CephString,req=false ",
1352 "desc": "List pending or running asynchronous tasks",
def __init__(self, *args, **kwargs):
    """Initialize the mgr module and its two command sub-handlers."""
    super(Module, self).__init__(*args, **kwargs)
    # consulted by handle_command() for 'rbd perf ' / 'rbd task ' prefixes
    self.perf = PerfHandler(self)
    self.task = TaskHandler(self)
def handle_command(self, inbuf, cmd):
    """Entry point for the 'rbd ...' mgr commands registered by this module.

    Routes 'rbd perf ' commands to the perf handler and 'rbd task '
    commands to the task handler, translating the expected exception
    types into (negative-errno, "", message) command results.

    :param inbuf: command input buffer, forwarded to the sub-handler
    :param cmd: parsed command mapping; 'prefix' selects the handler
    :raises NotImplementedError: when no handler claims the prefix
    """
    prefix = cmd['prefix']
    try:
        try:
            if prefix.startswith('rbd perf '):
                return self.perf.handle_command(inbuf, prefix[9:], cmd)
            elif prefix.startswith('rbd task '):
                return self.task.handle_command(inbuf, prefix[9:], cmd)
        except NotAuthorizedError:
            # NOTE(review): re-raise restored -- mapped to EACCES below
            raise
        except Exception as ex:
            # log the full traceback but don't send it to the CLI user
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
            raise
    except rados.Error as ex:
        return -ex.errno, "", str(ex)
    except rbd.OSError as ex:
        return -ex.errno, "", str(ex)
    except rbd.Error as ex:
        return -errno.EINVAL, "", str(ex)
    except KeyError as ex:
        return -errno.ENOENT, "", str(ex)
    except ValueError as ex:
        return -errno.EINVAL, "", str(ex)
    except NotAuthorizedError as ex:
        return -errno.EACCES, "", str(ex)

    raise NotImplementedError(cmd['prefix'])