14 from mgr_module
import MgrModule
16 from contextlib
import contextmanager
17 from datetime
import datetime
, timedelta
18 from functools
import partial
, wraps
19 from threading
import Condition
, Lock
, Thread
# Sentinel pool key meaning "all pools / all namespaces".
GLOBAL_POOL_KEY = (None, None)

# Keys used inside a per-pool user query dict.
QUERY_POOL_ID = "pool_id"
QUERY_POOL_ID_MAP = "pool_id_map"
QUERY_IDS = "query_ids"
QUERY_SUM_POOL_COUNTERS = "pool_counters"
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
QUERY_LAST_REQUEST = "last_request"

OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
# NOTE(review): the list contents were reconstructed from the surrounding
# code (the latency indices [4, 5] and the 'write_ops'/'read_ops'/
# '*_latency' lookups below) — confirm against the upstream module.
OSD_PERF_QUERY_COUNTERS = ['write_ops',
                           'read_ops',
                           'write_bytes',
                           'read_bytes',
                           'write_latency',
                           'read_latency']
# Reverse lookup: counter name -> position in OSD_PERF_QUERY_COUNTERS.
OSD_PERF_QUERY_COUNTERS_INDICES = {
    OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))}
# Indices of the latency counters (converted from sums to per-op averages).
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
OSD_PERF_QUERY_MAX_RESULTS = 256

POOL_REFRESH_INTERVAL = timedelta(minutes=5)
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
STATS_RATE_INTERVAL = timedelta(minutes=1)

REPORT_MAX_RESULTS = 64

# RADOS object that stores the persistent task queue (omap).
RBD_TASK_OID = "rbd_task"

# JSON keys of a serialized Task.
TASK_SEQUENCE = "sequence"
# NOTE(review): TASK_ID / TASK_REFS were missing from this extraction but
# are referenced by Task serialization code below — values reconstructed.
TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
TASK_RETRY_TIME = "retry_time"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"

# Keys of a task's "refs" dict (identifies the target image and action).
TASK_REF_POOL_NAME = "pool_name"
TASK_REF_POOL_NAMESPACE = "pool_namespace"
TASK_REF_IMAGE_NAME = "image_name"
TASK_REF_IMAGE_ID = "image_id"
TASK_REF_ACTION = "action"

TASK_REF_ACTION_FLATTEN = "flatten"
TASK_REF_ACTION_REMOVE = "remove"
TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"

VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                      TASK_REF_ACTION_REMOVE,
                      TASK_REF_ACTION_TRASH_REMOVE,
                      TASK_REF_ACTION_MIGRATION_EXECUTE,
                      TASK_REF_ACTION_MIGRATION_COMMIT,
                      TASK_REF_ACTION_MIGRATION_ABORT]

TASK_RETRY_INTERVAL = timedelta(seconds=30)
MAX_COMPLETED_TASKS = 50
def extract_pool_key(pool_spec):
    """Parse a "pool[/namespace]" spec into a (pool_name, namespace) tuple.

    A falsy spec selects all pools (GLOBAL_POOL_KEY); a spec without a
    namespace component maps the namespace to ''.  Raises ValueError for
    a malformed spec.

    NOTE(review): the two guard conditions (`if not pool_spec` /
    `if not match`) were missing from this extraction and have been
    reconstructed from the adjacent return/raise statements.
    """
    if not pool_spec:
        return GLOBAL_POOL_KEY

    match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    if not match:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))
    return (match.group(1), match.group(2) or '')
def get_rbd_pools(module):
    """Return {pool_id: pool_name} for pools tagged with the 'rbd' application.

    *module* is the mgr module; only its OSD map is consulted.
    """
    pools = module.get('osd_map')['pools']
    rbd_pools = {}
    for entry in pools:
        # a pool participates only if 'rbd' appears in its application tags
        if 'rbd' in entry.get('application_metadata', {}):
            rbd_pools[entry['pool']] = entry['pool_name']
    return rbd_pools
106 query_condition
= Condition(lock
)
107 refresh_condition
= Condition(lock
)
110 image_name_cache
= {}
111 image_name_refresh_time
= datetime
.fromtimestamp(0)
def prepare_regex(cls, value):
    """Anchor *value* as a whole-string regex capture group."""
    return '^(%s)$' % (value,)
def prepare_osd_perf_query(cls, pool_id, namespace, counter_type):
    """Build an OSD dynamic perf-query descriptor for RBD image objects.

    Matches all pools/namespaces unless *pool_id* (and optionally
    *namespace*) are given; orders results by *counter_type* and caps
    them at OSD_PERF_QUERY_MAX_RESULTS.

    NOTE(review): the `if pool_id:` / `if namespace:` guards and the
    'key_descriptor' wrapper were missing from this extraction and have
    been reconstructed — confirm against the upstream module.
    """
    pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
    namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
    if pool_id:
        pool_id_regex = cls.prepare_regex(pool_id)
        if namespace:
            namespace_regex = cls.prepare_regex(namespace)

    return {
        'key_descriptor': [
            {'type': 'pool_id', 'regex': pool_id_regex},
            {'type': 'namespace', 'regex': namespace_regex},
            {'type': 'object_name',
             'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
        ],
        'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
        'limit': {'order_by': counter_type,
                  'max_count': OSD_PERF_QUERY_MAX_RESULTS},
    }
def pool_spec_search_keys(cls, pool_key):
    """Return every prefix of *pool_key*, longest first, ending with ()."""
    return [pool_key[:length] for length in range(len(pool_key), -1, -1)]
def submatch_pool_key(cls, pool_key, search_key):
    """True when *pool_key* matches *search_key*, where an empty component
    in the search key acts as a wildcard."""
    namespace_ok = not search_key[1] or pool_key[1] == search_key[1]
    pool_ok = not search_key[0] or pool_key[0] == search_key[0]
    return namespace_ok and pool_ok
148 def __init__(self
, module
):
150 self
.log
= module
.log
152 self
.thread
= Thread(target
=self
.run
)
157 self
.log
.info("PerfHandler: starting")
160 self
.scrub_expired_queries()
161 self
.process_raw_osd_perf_counters()
162 self
.refresh_condition
.notify()
164 stats_period
= int(self
.module
.get_ceph_option("mgr_stats_period"))
165 self
.query_condition
.wait(stats_period
)
167 self
.log
.debug("PerfHandler: tick")
169 except Exception as ex
:
170 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
171 ex
, traceback
.format_exc()))
173 def merge_raw_osd_perf_counters(self
, pool_key
, query
, now_ts
,
174 resolve_image_names
):
175 pool_id_map
= query
[QUERY_POOL_ID_MAP
]
177 # collect and combine the raw counters from all sort orders
178 raw_pool_counters
= query
.setdefault(QUERY_RAW_POOL_COUNTERS
, {})
179 for query_id
in query
[QUERY_IDS
]:
180 res
= self
.module
.get_osd_perf_counters(query_id
)
181 for counter
in res
['counters']:
182 # replace pool id from object name if it exists
184 pool_id
= int(k
[2][0]) if k
[2][0] else int(k
[0][0])
188 # ignore metrics from non-matching pools/namespaces
189 if pool_id
not in pool_id_map
:
191 if pool_key
[1] is not None and pool_key
[1] != namespace
:
194 # flag the pool (and namespace) for refresh if we cannot find
195 # image name in the cache
196 resolve_image_key
= (pool_id
, namespace
)
197 if image_id
not in self
.image_name_cache
.get(resolve_image_key
, {}):
198 resolve_image_names
.add(resolve_image_key
)
200 # copy the 'sum' counter values for each image (ignore count)
201 # if we haven't already processed it for this round
202 raw_namespaces
= raw_pool_counters
.setdefault(pool_id
, {})
203 raw_images
= raw_namespaces
.setdefault(namespace
, {})
204 raw_image
= raw_images
.setdefault(image_id
, [None, None])
206 # save the last two perf counters for each image
207 if raw_image
[0] and raw_image
[0][0] < now_ts
:
208 raw_image
[1] = raw_image
[0]
211 raw_image
[0] = [now_ts
, [int(x
[0]) for x
in counter
['c']]]
213 self
.log
.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters
))
214 return raw_pool_counters
216 def sum_osd_perf_counters(self
, query
, raw_pool_counters
, now_ts
):
217 # update the cumulative counters for each image
218 sum_pool_counters
= query
.setdefault(QUERY_SUM_POOL_COUNTERS
, {})
219 for pool_id
, raw_namespaces
in raw_pool_counters
.items():
220 sum_namespaces
= sum_pool_counters
.setdefault(pool_id
, {})
221 for namespace
, raw_images
in raw_namespaces
.items():
222 sum_namespace
= sum_namespaces
.setdefault(namespace
, {})
223 for image_id
, raw_image
in raw_images
.items():
224 # zero-out non-updated raw counters
227 elif raw_image
[0][0] < now_ts
:
228 raw_image
[1] = raw_image
[0]
229 raw_image
[0] = [now_ts
, [0 for x
in raw_image
[1][1]]]
232 counters
= raw_image
[0][1]
234 # copy raw counters if this is a newly discovered image or
235 # increment existing counters
236 sum_image
= sum_namespace
.setdefault(image_id
, None)
238 for i
in range(len(counters
)):
239 sum_image
[i
] += counters
[i
]
241 sum_namespace
[image_id
] = [x
for x
in counters
]
243 self
.log
.debug("sum_osd_perf_counters: {}".format(sum_pool_counters
))
244 return sum_pool_counters
246 def refresh_image_names(self
, resolve_image_names
):
247 for pool_id
, namespace
in resolve_image_names
:
248 image_key
= (pool_id
, namespace
)
249 images
= self
.image_name_cache
.setdefault(image_key
, {})
250 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
251 ioctx
.set_namespace(namespace
)
252 for image_meta
in rbd
.RBD().list2(ioctx
):
253 images
[image_meta
['id']] = image_meta
['name']
254 self
.log
.debug("resolve_image_names: {}={}".format(image_key
, images
))
256 def scrub_missing_images(self
):
257 for pool_key
, query
in self
.user_queries
.items():
258 raw_pool_counters
= query
.get(QUERY_RAW_POOL_COUNTERS
, {})
259 sum_pool_counters
= query
.get(QUERY_SUM_POOL_COUNTERS
, {})
260 for pool_id
, sum_namespaces
in sum_pool_counters
.items():
261 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
262 for namespace
, sum_images
in sum_namespaces
.items():
263 raw_images
= raw_namespaces
.get(namespace
, {})
265 image_key
= (pool_id
, namespace
)
266 image_names
= self
.image_name_cache
.get(image_key
, {})
267 for image_id
in list(sum_images
.keys()):
268 # scrub image counters if we failed to resolve image name
269 if image_id
not in image_names
:
270 self
.log
.debug("scrub_missing_images: dropping {}/{}".format(
271 image_key
, image_id
))
272 del sum_images
[image_id
]
273 if image_id
in raw_images
:
274 del raw_images
[image_id
]
276 def process_raw_osd_perf_counters(self
):
278 now_ts
= int(now
.strftime("%s"))
280 # clear the image name cache if we need to refresh all active pools
281 if self
.image_name_cache
and \
282 self
.image_name_refresh_time
+ POOL_REFRESH_INTERVAL
< now
:
283 self
.log
.debug("process_raw_osd_perf_counters: expiring image name cache")
284 self
.image_name_cache
= {}
286 resolve_image_names
= set()
287 for pool_key
, query
in self
.user_queries
.items():
288 if not query
[QUERY_IDS
]:
291 raw_pool_counters
= self
.merge_raw_osd_perf_counters(
292 pool_key
, query
, now_ts
, resolve_image_names
)
293 self
.sum_osd_perf_counters(query
, raw_pool_counters
, now_ts
)
295 if resolve_image_names
:
296 self
.image_name_refresh_time
= now
297 self
.refresh_image_names(resolve_image_names
)
298 self
.scrub_missing_images()
299 elif not self
.image_name_cache
:
300 self
.scrub_missing_images()
302 def resolve_pool_id(self
, pool_name
):
303 pool_id
= self
.module
.rados
.pool_lookup(pool_name
)
305 raise rados
.ObjectNotFound("Pool '{}' not found".format(pool_name
))
308 def scrub_expired_queries(self
):
309 # perf counters need to be periodically refreshed to continue
311 expire_time
= datetime
.now() - QUERY_EXPIRE_INTERVAL
312 for pool_key
in list(self
.user_queries
.keys()):
313 user_query
= self
.user_queries
[pool_key
]
314 if user_query
[QUERY_LAST_REQUEST
] < expire_time
:
315 self
.unregister_osd_perf_queries(pool_key
, user_query
[QUERY_IDS
])
316 del self
.user_queries
[pool_key
]
318 def register_osd_perf_queries(self
, pool_id
, namespace
):
321 for counter
in OSD_PERF_QUERY_COUNTERS
:
322 query
= self
.prepare_osd_perf_query(pool_id
, namespace
, counter
)
323 self
.log
.debug("register_osd_perf_queries: {}".format(query
))
325 query_id
= self
.module
.add_osd_perf_query(query
)
327 raise RuntimeError('Failed to add OSD perf query: {}'.format(query
))
328 query_ids
.append(query_id
)
331 for query_id
in query_ids
:
332 self
.module
.remove_osd_perf_query(query_id
)
337 def unregister_osd_perf_queries(self
, pool_key
, query_ids
):
338 self
.log
.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
339 pool_key
, query_ids
))
340 for query_id
in query_ids
:
341 self
.module
.remove_osd_perf_query(query_id
)
344 def register_query(self
, pool_key
):
345 if pool_key
not in self
.user_queries
:
348 pool_id
= self
.resolve_pool_id(pool_key
[0])
351 QUERY_POOL_ID
: pool_id
,
352 QUERY_POOL_ID_MAP
: {pool_id
: pool_key
[0]},
353 QUERY_IDS
: self
.register_osd_perf_queries(pool_id
, pool_key
[1]),
354 QUERY_LAST_REQUEST
: datetime
.now()
357 self
.user_queries
[pool_key
] = user_query
359 # force an immediate stat pull if this is a new query
360 self
.query_condition
.notify()
361 self
.refresh_condition
.wait(5)
364 user_query
= self
.user_queries
[pool_key
]
366 # ensure query doesn't expire
367 user_query
[QUERY_LAST_REQUEST
] = datetime
.now()
369 if pool_key
== GLOBAL_POOL_KEY
:
370 # refresh the global pool id -> name map upon each
372 user_query
[QUERY_POOL_ID_MAP
] = {
373 pool_id
: pool_name
for pool_id
, pool_name
374 in get_rbd_pools(self
.module
).items()}
376 self
.log
.debug("register_query: pool_key={}, query_ids={}".format(
377 pool_key
, user_query
[QUERY_IDS
]))
381 def extract_stat(self
, index
, raw_image
, sum_image
):
382 # require two raw counters between a fixed time window
383 if not raw_image
or not raw_image
[0] or not raw_image
[1]:
386 current_time
= raw_image
[0][0]
387 previous_time
= raw_image
[1][0]
388 if current_time
<= previous_time
or \
389 current_time
- previous_time
> STATS_RATE_INTERVAL
.total_seconds():
392 current_value
= raw_image
[0][1][index
]
393 instant_rate
= float(current_value
) / (current_time
- previous_time
)
395 # convert latencies from sum to average per op
397 if OSD_PERF_QUERY_COUNTERS
[index
] == 'write_latency':
398 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['write_ops']
399 elif OSD_PERF_QUERY_COUNTERS
[index
] == 'read_latency':
400 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['read_ops']
402 if ops_index
is not None:
403 ops
= max(1, self
.extract_stat(ops_index
, raw_image
, sum_image
))
def extract_counter(self, index, raw_image, sum_image):
    """Return the cumulative counter at *index* for an image, or 0 when no
    summed counters exist yet.

    NOTE(review): the `if sum_image:` guard and the `return 0` fallback
    were missing from this extraction and have been reconstructed from the
    report code that zero-compares the extracted values.
    """
    if sum_image:
        return sum_image[index]
    return 0
413 def generate_report(self
, query
, sort_by
, extract_data
):
414 pool_id_map
= query
[QUERY_POOL_ID_MAP
]
415 sum_pool_counters
= query
.setdefault(QUERY_SUM_POOL_COUNTERS
, {})
416 raw_pool_counters
= query
.setdefault(QUERY_RAW_POOL_COUNTERS
, {})
418 sort_by_index
= OSD_PERF_QUERY_COUNTERS
.index(sort_by
)
420 # pre-sort and limit the response
422 for pool_id
, sum_namespaces
in sum_pool_counters
.items():
423 if pool_id
not in pool_id_map
:
425 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
426 for namespace
, sum_images
in sum_namespaces
.items():
427 raw_images
= raw_namespaces
.get(namespace
, {})
428 for image_id
, sum_image
in sum_images
.items():
429 raw_image
= raw_images
.get(image_id
, [])
431 # always sort by recent IO activity
432 results
.append([(pool_id
, namespace
, image_id
),
433 self
.extract_stat(sort_by_index
, raw_image
,
435 results
= sorted(results
, key
=lambda x
: x
[1], reverse
=True)[:REPORT_MAX_RESULTS
]
437 # build the report in sorted order
438 pool_descriptors
= {}
440 for key
, _
in results
:
442 pool_name
= pool_id_map
[pool_id
]
446 image_names
= self
.image_name_cache
.get((pool_id
, namespace
), {})
447 image_name
= image_names
[image_id
]
449 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
450 raw_images
= raw_namespaces
.get(namespace
, {})
451 raw_image
= raw_images
.get(image_id
, [])
453 sum_namespaces
= sum_pool_counters
[pool_id
]
454 sum_images
= sum_namespaces
[namespace
]
455 sum_image
= sum_images
.get(image_id
, [])
457 pool_descriptor
= pool_name
459 pool_descriptor
+= "/{}".format(namespace
)
460 pool_index
= pool_descriptors
.setdefault(pool_descriptor
,
461 len(pool_descriptors
))
462 image_descriptor
= "{}/{}".format(pool_index
, image_name
)
463 data
= [extract_data(i
, raw_image
, sum_image
)
464 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]
466 # skip if no data to report
467 if data
== [0 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]:
470 counters
.append({image_descriptor
: data
})
472 return {idx
: descriptor
for descriptor
, idx
473 in pool_descriptors
.items()}, \
476 def get_perf_data(self
, report
, pool_spec
, sort_by
, extract_data
):
477 self
.log
.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
478 report
, pool_spec
, sort_by
))
479 self
.scrub_expired_queries()
481 pool_key
= extract_pool_key(pool_spec
)
482 user_query
= self
.register_query(pool_key
)
485 pool_descriptors
, counters
= self
.generate_report(
486 user_query
, sort_by
, extract_data
)
489 'timestamp': time
.mktime(now
.timetuple()),
490 '{}_descriptors'.format(report
): OSD_PERF_QUERY_COUNTERS
,
491 'pool_descriptors': pool_descriptors
,
492 '{}s'.format(report
): counters
495 return 0, json
.dumps(report
), ""
def get_perf_stats(self, pool_spec, sort_by):
    """Generate a rate-based ("stat") perf report for *pool_spec*."""
    extractor = self.extract_stat
    return self.get_perf_data("stat", pool_spec, sort_by, extractor)
def get_perf_counters(self, pool_spec, sort_by):
    """Generate a cumulative ("counter") perf report for *pool_spec*."""
    extractor = self.extract_counter
    return self.get_perf_data("counter", pool_spec, sort_by, extractor)
505 def handle_command(self
, inbuf
, prefix
, cmd
):
507 if prefix
== 'image stats':
508 return self
.get_perf_stats(cmd
.get('pool_spec', None),
509 cmd
.get('sort_by', OSD_PERF_QUERY_COUNTERS
[0]))
510 elif prefix
== 'image counters':
511 return self
.get_perf_counters(cmd
.get('pool_spec', None),
512 cmd
.get('sort_by', OSD_PERF_QUERY_COUNTERS
[0]))
514 raise NotImplementedError(cmd
['prefix'])
def __init__(self, throttle_period):
    """Remember the minimum interval between forwarded calls.

    Starting from datetime.min guarantees the very first wrapped call is
    never suppressed.
    """
    self.time_of_last_call = datetime.min
    self.throttle_period = throttle_period
def __call__(self, fn):
    """Decorate *fn* so that calls arriving within throttle_period of the
    last forwarded call are silently dropped (returning None).

    NOTE(review): the @wraps decorator, the `now` timestamp capture and the
    final `return wrapper` were missing from this extraction and have been
    reconstructed from the visible comparison against `now`.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        now = datetime.now()
        if self.time_of_last_call + self.throttle_period <= now:
            self.time_of_last_call = now
            return fn(*args, **kwargs)
    return wrapper
533 def __init__(self
, sequence
, task_id
, message
, refs
):
534 self
.sequence
= sequence
535 self
.task_id
= task_id
536 self
.message
= message
538 self
.retry_time
= None
539 self
.in_progress
= False
541 self
.canceled
= False
545 return self
.to_json()
def sequence_key(self):
    """Zero-padded, 16-digit upper-case hex key used for omap ordering."""
    return format(self.sequence, '016X')
553 self
.fail("Operation canceled")
555 def fail(self
, message
):
557 self
.failure_message
= message
560 d
= {TASK_SEQUENCE
: self
.sequence
,
561 TASK_ID
: self
.task_id
,
562 TASK_MESSAGE
: self
.message
,
566 d
[TASK_RETRY_TIME
] = self
.retry_time
.isoformat()
568 d
[TASK_IN_PROGRESS
] = True
569 d
[TASK_PROGRESS
] = self
.progress
571 d
[TASK_CANCELED
] = True
575 return str(json
.dumps(self
.to_dict()))
578 def from_json(cls
, val
):
581 action
= d
.get(TASK_REFS
, {}).get(TASK_REF_ACTION
)
582 if action
not in VALID_TASK_ACTIONS
:
583 raise ValueError("Invalid task action: {}".format(action
))
585 return Task(d
[TASK_SEQUENCE
], d
[TASK_ID
], d
[TASK_MESSAGE
], d
[TASK_REFS
])
586 except json
.JSONDecodeError
as e
:
587 raise ValueError("Invalid JSON ({})".format(str(e
)))
588 except KeyError as e
:
589 raise ValueError("Invalid task format (missing key {})".format(str(e
)))
594 condition
= Condition(lock
)
597 in_progress_task
= None
598 tasks_by_sequence
= dict()
605 def __init__(self
, module
):
607 self
.log
= module
.log
610 self
.init_task_queue()
612 self
.thread
= Thread(target
=self
.run
)
def default_pool_name(self):
    """The cluster's configured default RBD pool name."""
    mgr = self.module
    return mgr.get_ceph_option("rbd_default_pool")
619 def extract_pool_spec(self
, pool_spec
):
620 pool_spec
= extract_pool_key(pool_spec
)
621 if pool_spec
== GLOBAL_POOL_KEY
:
622 pool_spec
= (self
.default_pool_name
, '')
625 def extract_image_spec(self
, image_spec
):
626 match
= re
.match(r
'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
629 raise ValueError("Invalid image spec: {}".format(image_spec
))
630 return (match
.group(1) or self
.default_pool_name
, match
.group(2) or '',
635 self
.log
.info("TaskHandler: starting")
639 for sequence
in sorted([sequence
for sequence
, task
640 in self
.tasks_by_sequence
.items()
641 if not task
.retry_time
or task
.retry_time
<= now
]):
642 self
.execute_task(sequence
)
644 self
.condition
.wait(5)
645 self
.log
.debug("TaskHandler: tick")
647 except Exception as ex
:
648 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
649 ex
, traceback
.format_exc()))
652 def open_ioctx(self
, spec
):
654 with self
.module
.rados
.open_ioctx(spec
[0]) as ioctx
:
655 ioctx
.set_namespace(spec
[1])
657 except rados
.ObjectNotFound
:
658 self
.log
.error("Failed to locate pool {}".format(spec
[0]))
def format_image_spec(cls, image_spec):
    """Render (pool, namespace, image) as "pool/namespace/image", omitting
    empty leading components.

    NOTE(review): the two `if` guards and the final `return` were missing
    from this extraction and have been reconstructed — pool and namespace
    are prepended only when non-empty.
    """
    image = image_spec[2]
    if image_spec[1]:
        image = "{}/{}".format(image_spec[1], image)
    if image_spec[0]:
        image = "{}/{}".format(image_spec[0], image)
    return image
670 def init_task_queue(self
):
671 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
673 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
674 self
.load_task_queue(ioctx
, pool_name
)
677 namespaces
= rbd
.RBD().namespace_list(ioctx
)
678 except rbd
.OperationNotSupported
:
679 self
.log
.debug("Namespaces not supported")
682 for namespace
in namespaces
:
683 ioctx
.set_namespace(namespace
)
684 self
.load_task_queue(ioctx
, pool_name
)
686 except rados
.ObjectNotFound
:
690 if self
.tasks_by_sequence
:
691 self
.sequence
= list(sorted(self
.tasks_by_sequence
.keys()))[-1]
693 self
.log
.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
694 self
.sequence
, str(self
.tasks_by_sequence
), str(self
.tasks_by_id
)))
696 def load_task_queue(self
, ioctx
, pool_name
):
697 pool_spec
= pool_name
699 pool_spec
+= "/{}".format(ioctx
.nspace
)
704 with rados
.ReadOpCtx() as read_op
:
705 self
.log
.info("load_task_task: {}, start_after={}".format(
706 pool_spec
, start_after
))
707 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
708 ioctx
.operate_read_op(read_op
, RBD_TASK_OID
)
714 self
.log
.info("load_task_task: task={}".format(v
))
717 task
= Task
.from_json(v
)
718 self
.append_task(task
)
720 self
.log
.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec
, v
))
725 except StopIteration:
727 except rados
.ObjectNotFound
:
def append_task(self, task):
    """Index *task* under both its sequence number and its unique id."""
    sequence, task_id = task.sequence, task.task_id
    self.tasks_by_id[task_id] = task
    self.tasks_by_sequence[sequence] = task
def task_refs_match(self, task_refs, refs):
    """Compare two task ref dicts for equality, ignoring an image id that
    the incoming *refs* omit (id is only known after the image is opened)."""
    if TASK_REF_IMAGE_ID in task_refs and TASK_REF_IMAGE_ID not in refs:
        trimmed = task_refs.copy()
        del trimmed[TASK_REF_IMAGE_ID]
        task_refs = trimmed

    self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
    return task_refs == refs
743 def find_task(self
, refs
):
744 self
.log
.debug("find_task: refs={}".format(refs
))
746 # search for dups and return the original
747 for task_id
in reversed(sorted(self
.tasks_by_id
.keys())):
748 task
= self
.tasks_by_id
[task_id
]
749 if self
.task_refs_match(task
.refs
, refs
):
752 # search for a completed task (message replay)
753 for task
in reversed(self
.completed_tasks
):
754 if self
.task_refs_match(task
.refs
, refs
):
757 def add_task(self
, ioctx
, message
, refs
):
758 self
.log
.debug("add_task: message={}, refs={}".format(message
, refs
))
760 # ensure unique uuid across all pools
762 task_id
= str(uuid
.uuid4())
763 if task_id
not in self
.tasks_by_id
:
767 task
= Task(self
.sequence
, task_id
, message
, refs
)
769 # add the task to the rbd_task omap
770 task_json
= task
.to_json()
771 omap_keys
= (task
.sequence_key
, )
772 omap_vals
= (str.encode(task_json
), )
773 self
.log
.info("adding task: {} {}".format(omap_keys
[0], omap_vals
[0]))
775 with rados
.WriteOpCtx() as write_op
:
776 ioctx
.set_omap(write_op
, omap_keys
, omap_vals
)
777 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
778 self
.append_task(task
)
780 self
.condition
.notify()
783 def remove_task(self
, ioctx
, task
, remove_in_memory
=True):
784 self
.log
.info("remove_task: task={}".format(str(task
)))
785 omap_keys
= (task
.sequence_key
, )
787 with rados
.WriteOpCtx() as write_op
:
788 ioctx
.remove_omap_keys(write_op
, omap_keys
)
789 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
790 except rados
.ObjectNotFound
:
795 del self
.tasks_by_id
[task
.task_id
]
796 del self
.tasks_by_sequence
[task
.sequence
]
798 # keep a record of the last N tasks to help avoid command replay
800 if not task
.failed
and not task
.canceled
:
801 self
.log
.debug("remove_task: moving to completed tasks")
802 self
.completed_tasks
.append(task
)
803 self
.completed_tasks
= self
.completed_tasks
[-MAX_COMPLETED_TASKS
:]
808 def execute_task(self
, sequence
):
809 task
= self
.tasks_by_sequence
[sequence
]
810 self
.log
.info("execute_task: task={}".format(str(task
)))
814 with self
.open_ioctx((task
.refs
[TASK_REF_POOL_NAME
],
815 task
.refs
[TASK_REF_POOL_NAMESPACE
])) as ioctx
:
818 action
= task
.refs
[TASK_REF_ACTION
]
819 execute_fn
= {TASK_REF_ACTION_FLATTEN
: self
.execute_flatten
,
820 TASK_REF_ACTION_REMOVE
: self
.execute_remove
,
821 TASK_REF_ACTION_TRASH_REMOVE
: self
.execute_trash_remove
,
822 TASK_REF_ACTION_MIGRATION_EXECUTE
: self
.execute_migration_execute
,
823 TASK_REF_ACTION_MIGRATION_COMMIT
: self
.execute_migration_commit
,
824 TASK_REF_ACTION_MIGRATION_ABORT
: self
.execute_migration_abort
827 self
.log
.error("Invalid task action: {}".format(action
))
829 task
.in_progress
= True
830 self
.in_progress_task
= task
831 self
.update_progress(task
, 0)
835 execute_fn(ioctx
, task
)
837 except rbd
.OperationCanceled
:
838 self
.log
.info("Operation canceled: task={}".format(
844 task
.in_progress
= False
845 self
.in_progress_task
= None
847 self
.complete_progress(task
)
848 self
.remove_task(ioctx
, task
)
850 except rados
.ObjectNotFound
as e
:
851 self
.log
.error("execute_task: {}".format(e
))
853 self
.update_progress(task
, 0)
855 # pool DNE -- remove the task
856 self
.complete_progress(task
)
857 self
.remove_task(ioctx
, task
)
859 except (rados
.Error
, rbd
.Error
) as e
:
860 self
.log
.error("execute_task: {}".format(e
))
861 self
.update_progress(task
, 0)
864 task
.in_progress
= False
865 task
.retry_time
= datetime
.now() + TASK_RETRY_INTERVAL
867 def progress_callback(self
, task
, current
, total
):
868 progress
= float(current
) / float(total
)
869 self
.log
.debug("progress_callback: task={}, progress={}".format(
870 str(task
), progress
))
872 # avoid deadlocking when a new command comes in during a progress callback
873 if not self
.lock
.acquire(False):
877 if not self
.in_progress_task
or self
.in_progress_task
.canceled
:
878 return -rbd
.ECANCELED
879 self
.in_progress_task
.progress
= progress
883 self
.throttled_update_progress(task
, progress
)
886 def execute_flatten(self
, ioctx
, task
):
887 self
.log
.info("execute_flatten: task={}".format(str(task
)))
890 with rbd
.Image(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
]) as image
:
891 image
.flatten(on_progress
=partial(self
.progress_callback
, task
))
892 except rbd
.InvalidArgument
:
893 task
.fail("Image does not have parent")
894 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
895 except rbd
.ImageNotFound
:
896 task
.fail("Image does not exist")
897 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
899 def execute_remove(self
, ioctx
, task
):
900 self
.log
.info("execute_remove: task={}".format(str(task
)))
903 rbd
.RBD().remove(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
904 on_progress
=partial(self
.progress_callback
, task
))
905 except rbd
.ImageNotFound
:
906 task
.fail("Image does not exist")
907 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
909 def execute_trash_remove(self
, ioctx
, task
):
910 self
.log
.info("execute_trash_remove: task={}".format(str(task
)))
913 rbd
.RBD().trash_remove(ioctx
, task
.refs
[TASK_REF_IMAGE_ID
],
914 on_progress
=partial(self
.progress_callback
, task
))
915 except rbd
.ImageNotFound
:
916 task
.fail("Image does not exist")
917 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
919 def execute_migration_execute(self
, ioctx
, task
):
920 self
.log
.info("execute_migration_execute: task={}".format(str(task
)))
923 rbd
.RBD().migration_execute(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
924 on_progress
=partial(self
.progress_callback
, task
))
925 except rbd
.ImageNotFound
:
926 task
.fail("Image does not exist")
927 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
928 except rbd
.InvalidArgument
:
929 task
.fail("Image is not migrating")
930 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
932 def execute_migration_commit(self
, ioctx
, task
):
933 self
.log
.info("execute_migration_commit: task={}".format(str(task
)))
936 rbd
.RBD().migration_commit(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
937 on_progress
=partial(self
.progress_callback
, task
))
938 except rbd
.ImageNotFound
:
939 task
.fail("Image does not exist")
940 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
941 except rbd
.InvalidArgument
:
942 task
.fail("Image is not migrating or migration not executed")
943 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
945 def execute_migration_abort(self
, ioctx
, task
):
946 self
.log
.info("execute_migration_abort: task={}".format(str(task
)))
949 rbd
.RBD().migration_abort(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
950 on_progress
=partial(self
.progress_callback
, task
))
951 except rbd
.ImageNotFound
:
952 task
.fail("Image does not exist")
953 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
954 except rbd
.InvalidArgument
:
955 task
.fail("Image is not migrating")
956 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
958 def complete_progress(self
, task
):
959 self
.log
.debug("complete_progress: task={}".format(str(task
)))
962 self
.module
.remote("progress", "fail", task
.task_id
,
963 task
.failure_message
)
965 self
.module
.remote("progress", "complete", task
.task_id
)
967 # progress module is disabled
970 def update_progress(self
, task
, progress
):
971 self
.log
.debug("update_progress: task={}, progress={}".format(str(task
), progress
))
973 refs
= {"origin": "rbd_support"}
974 refs
.update(task
.refs
)
976 self
.module
.remote("progress", "update", task
.task_id
,
977 task
.message
, progress
, refs
)
979 # progress module is disabled
982 @Throttle(timedelta(seconds
=1))
983 def throttled_update_progress(self
, task
, progress
):
984 self
.update_progress(task
, progress
)
986 def queue_flatten(self
, image_spec
):
987 image_spec
= self
.extract_image_spec(image_spec
)
988 self
.log
.info("queue_flatten: {}".format(image_spec
))
990 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_FLATTEN
,
991 TASK_REF_POOL_NAME
: image_spec
[0],
992 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
993 TASK_REF_IMAGE_NAME
: image_spec
[2]}
995 with self
.open_ioctx(image_spec
) as ioctx
:
997 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
998 refs
[TASK_REF_IMAGE_ID
] = image
.id()
1001 parent_image_id
= image
.parent_id()
1002 except rbd
.ImageNotFound
:
1003 parent_image_id
= None
1005 except rbd
.ImageNotFound
:
1008 task
= self
.find_task(refs
)
1010 return 0, task
.to_json(), ''
1012 if TASK_REF_IMAGE_ID
not in refs
:
1013 raise rbd
.ImageNotFound("Image {} does not exist".format(
1014 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1015 if not parent_image_id
:
1016 raise rbd
.ImageNotFound("Image {} does not have a parent".format(
1017 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1019 return 0, self
.add_task(ioctx
,
1020 "Flattening image {}".format(
1021 self
.format_image_spec(image_spec
)),
1024 def queue_remove(self
, image_spec
):
1025 image_spec
= self
.extract_image_spec(image_spec
)
1026 self
.log
.info("queue_remove: {}".format(image_spec
))
1028 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_REMOVE
,
1029 TASK_REF_POOL_NAME
: image_spec
[0],
1030 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
1031 TASK_REF_IMAGE_NAME
: image_spec
[2]}
1033 with self
.open_ioctx(image_spec
) as ioctx
:
1035 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
1036 refs
[TASK_REF_IMAGE_ID
] = image
.id()
1037 snaps
= list(image
.list_snaps())
1039 except rbd
.ImageNotFound
:
1042 task
= self
.find_task(refs
)
1044 return 0, task
.to_json(), ''
1046 if TASK_REF_IMAGE_ID
not in refs
:
1047 raise rbd
.ImageNotFound("Image {} does not exist".format(
1048 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
1050 raise rbd
.ImageBusy("Image {} has snapshots".format(
1051 self
.format_image_spec(image_spec
)), errno
=errno
.EBUSY
)
1053 return 0, self
.add_task(ioctx
,
1054 "Removing image {}".format(
1055 self
.format_image_spec(image_spec
)),
1058 def queue_trash_remove(self
, image_id_spec
):
1059 image_id_spec
= self
.extract_image_spec(image_id_spec
)
1060 self
.log
.info("queue_trash_remove: {}".format(image_id_spec
))
1062 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_TRASH_REMOVE
,
1063 TASK_REF_POOL_NAME
: image_id_spec
[0],
1064 TASK_REF_POOL_NAMESPACE
: image_id_spec
[1],
1065 TASK_REF_IMAGE_ID
: image_id_spec
[2]}
1066 task
= self
.find_task(refs
)
1068 return 0, task
.to_json(), ''
1070 # verify that image exists in trash
1071 with self
.open_ioctx(image_id_spec
) as ioctx
:
1072 rbd
.RBD().trash_get(ioctx
, image_id_spec
[2])
1074 return 0, self
.add_task(ioctx
,
1075 "Removing image {} from trash".format(
1076 self
.format_image_spec(image_id_spec
)),
def get_migration_status(self, ioctx, image_spec):
    """Fetch the migration status for an image, or None when the image is
    missing or has no migration in progress.

    NOTE(review): the `try:` framing and the `return None` fallback were
    missing from this extraction and have been reconstructed from the
    visible except clause.
    """
    try:
        return rbd.RBD().migration_status(ioctx, image_spec[2])
    except (rbd.InvalidArgument, rbd.ImageNotFound):
        return None
def validate_image_migrating(self, image_spec, migration_status):
    """Raise rbd.InvalidArgument unless a migration status was found."""
    if migration_status:
        return
    raise rbd.InvalidArgument(
        "Image {} is not migrating".format(
            self.format_image_spec(image_spec)), errno=errno.EINVAL)
def resolve_pool_name(self, pool_id):
    """Translate an OSD-map pool id into its pool name.

    Falls back to '<unknown>' when the id is not present in the map.
    """
    pools = self.module.get('osd_map')['pools']
    return next((pool['pool_name'] for pool in pools
                 if pool['pool'] == pool_id), '<unknown>')
def queue_migration_execute(self, image_spec):
    """Queue an asynchronous "migration execute" task for an image.

    The image must have a migration in the PREPARED or EXECUTING state;
    otherwise InvalidArgument (EINVAL) is raised.  Returns the standard
    handle_command triple ``(rc, outbuf, outs)``.
    """
    image_spec = self.extract_image_spec(image_spec)
    self.log.info("queue_migration_execute: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            # key the task on the destination image id so a retried
            # request matches the already-queued task
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # duplicate request -- return the already-queued task
            return 0, task.to_json(), ''

        self.validate_image_migrating(image_spec, status)
        if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
                                   rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
            raise rbd.InvalidArgument("Image {} is not in ready state".format(
                self.format_image_spec(image_spec)), errno=errno.EINVAL)

        # describe both endpoints of the migration in the task message
        source_pool = self.resolve_pool_name(status['source_pool_id'])
        dest_pool = self.resolve_pool_name(status['dest_pool_id'])
        return 0, self.add_task(ioctx,
                                "Migrating image {} to {}".format(
                                    self.format_image_spec((source_pool,
                                                            status['source_pool_namespace'],
                                                            status['source_image_name'])),
                                    self.format_image_spec((dest_pool,
                                                            status['dest_pool_namespace'],
                                                            status['dest_image_name']))),
                                refs), ''
def queue_migration_commit(self, image_spec):
    """Queue an asynchronous "migration commit" task for an image.

    The migration must have reached the EXECUTED state; otherwise
    InvalidArgument (EINVAL) is raised.  Returns the standard
    handle_command triple ``(rc, outbuf, outs)``.
    """
    image_spec = self.extract_image_spec(image_spec)
    self.log.info("queue_migration_commit: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            # key the task on the destination image id so a retried
            # request matches the already-queued task
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # duplicate request -- return the already-queued task
            return 0, task.to_json(), ''

        self.validate_image_migrating(image_spec, status)
        if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
            raise rbd.InvalidArgument("Image {} has not completed migration".format(
                self.format_image_spec(image_spec)), errno=errno.EINVAL)

        return 0, self.add_task(ioctx,
                                "Committing image migration for {}".format(
                                    self.format_image_spec(image_spec)),
                                refs), ''
def queue_migration_abort(self, image_spec):
    """Queue an asynchronous "migration abort" task for an image.

    Only requires that a migration is in flight (any state); raises
    InvalidArgument (EINVAL) via validate_image_migrating otherwise.
    Returns the standard handle_command triple ``(rc, outbuf, outs)``.
    """
    image_spec = self.extract_image_spec(image_spec)
    self.log.info("queue_migration_abort: {}".format(image_spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
            TASK_REF_POOL_NAME: image_spec[0],
            TASK_REF_POOL_NAMESPACE: image_spec[1],
            TASK_REF_IMAGE_NAME: image_spec[2]}

    with self.open_ioctx(image_spec) as ioctx:
        status = self.get_migration_status(ioctx, image_spec)
        if status:
            # key the task on the destination image id so a retried
            # request matches the already-queued task
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        task = self.find_task(refs)
        if task:
            # duplicate request -- return the already-queued task
            return 0, task.to_json(), ''

        self.validate_image_migrating(image_spec, status)
        return 0, self.add_task(ioctx,
                                "Aborting image migration for {}".format(
                                    self.format_image_spec(image_spec)),
                                refs), ''
def task_cancel(self, task_id):
    """Cancel a queued or in-progress task by id.

    Returns ``(-errno.ENOENT, '', msg)`` for an unknown id, otherwise
    marks the task canceled, completes its progress-module event, and
    removes it from the rbd_task omap.
    """
    self.log.info("task_cancel: {}".format(task_id))

    if task_id not in self.tasks_by_id:
        return -errno.ENOENT, '', "No such task {}".format(task_id)

    task = self.tasks_by_id[task_id]
    task.cancel()

    # an in-progress task cannot be dropped from memory yet -- the worker
    # thread still owns it and will clean up when it notices the cancel
    remove_in_memory = True
    if self.in_progress_task and self.in_progress_task.task_id == task_id:
        self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
        remove_in_memory = False

    # complete any associated event in the progress module
    self.complete_progress(task)

    # remove from rbd_task omap
    with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                          task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
        self.remove_task(ioctx, task, remove_in_memory)

    return 0, "", ""
def task_list(self, task_id):
    """Describe one task or all queued tasks as JSON.

    With a ``task_id``, returns that task's dict (or ENOENT when
    unknown); with a falsy id, returns every task in sequence order.
    Result is the standard handle_command triple ``(rc, outbuf, outs)``.
    """
    self.log.info("task_list: {}".format(task_id))

    if task_id:
        if task_id not in self.tasks_by_id:
            return -errno.ENOENT, '', "No such task {}".format(task_id)
        result = self.tasks_by_id[task_id].to_dict()
    else:
        result = [self.tasks_by_sequence[sequence].to_dict()
                  for sequence in sorted(self.tasks_by_sequence.keys())]

    return 0, json.dumps(result), ""
def handle_command(self, inbuf, prefix, cmd):
    """Dispatch an "rbd task ..." CLI command to the matching queue_*
    or task_* method.

    ``prefix`` is the command prefix with the leading "rbd task "
    already stripped by the caller.  Raises NotImplementedError for an
    unrecognized prefix.  All handlers run under the handler lock.
    """
    with self.lock:
        if prefix == 'add flatten':
            return self.queue_flatten(cmd['image_spec'])
        elif prefix == 'add remove':
            return self.queue_remove(cmd['image_spec'])
        elif prefix == 'add trash remove':
            return self.queue_trash_remove(cmd['image_id_spec'])
        elif prefix == 'add migration execute':
            return self.queue_migration_execute(cmd['image_spec'])
        elif prefix == 'add migration commit':
            return self.queue_migration_commit(cmd['image_spec'])
        elif prefix == 'add migration abort':
            return self.queue_migration_abort(cmd['image_spec'])
        elif prefix == 'cancel':
            return self.task_cancel(cmd['task_id'])
        elif prefix == 'list':
            # task_id is optional for list
            return self.task_list(cmd.get('task_id'))

    raise NotImplementedError(cmd['prefix'])
1247 class Module(MgrModule
):
1250 "cmd": "rbd perf image stats "
1251 "name=pool_spec,type=CephString,req=false "
1252 "name=sort_by,type=CephChoices,strings="
1253 "write_ops|write_bytes|write_latency|"
1254 "read_ops|read_bytes|read_latency,"
1256 "desc": "Retrieve current RBD IO performance stats",
1260 "cmd": "rbd perf image counters "
1261 "name=pool_spec,type=CephString,req=false "
1262 "name=sort_by,type=CephChoices,strings="
1263 "write_ops|write_bytes|write_latency|"
1264 "read_ops|read_bytes|read_latency,"
1266 "desc": "Retrieve current RBD IO performance counters",
1270 "cmd": "rbd task add flatten "
1271 "name=image_spec,type=CephString",
1272 "desc": "Flatten a cloned image asynchronously in the background",
1276 "cmd": "rbd task add remove "
1277 "name=image_spec,type=CephString",
1278 "desc": "Remove an image asynchronously in the background",
1282 "cmd": "rbd task add trash remove "
1283 "name=image_id_spec,type=CephString",
1284 "desc": "Remove an image from the trash asynchronously in the background",
1288 "cmd": "rbd task add migration execute "
1289 "name=image_spec,type=CephString",
1290 "desc": "Execute an image migration asynchronously in the background",
1294 "cmd": "rbd task add migration commit "
1295 "name=image_spec,type=CephString",
1296 "desc": "Commit an executed migration asynchronously in the background",
1300 "cmd": "rbd task add migration abort "
1301 "name=image_spec,type=CephString",
1302 "desc": "Abort a prepared migration asynchronously in the background",
1306 "cmd": "rbd task cancel "
1307 "name=task_id,type=CephString ",
1308 "desc": "Cancel a pending or running asynchronous task",
1312 "cmd": "rbd task list "
1313 "name=task_id,type=CephString,req=false ",
1314 "desc": "List pending or running asynchronous tasks",
def __init__(self, *args, **kwargs):
    """Initialize the mgr module and its two command handlers."""
    super(Module, self).__init__(*args, **kwargs)
    # handler for "rbd perf ..." commands (image IO stats/counters)
    self.perf = PerfHandler(self)
    # handler for "rbd task ..." commands (async flatten/remove/migration)
    self.task = TaskHandler(self)
def handle_command(self, inbuf, cmd):
    """Route a ceph CLI command to the perf or task sub-handler.

    Known exception types are translated into ``(rc, outbuf, outs)``
    triples with an appropriate negative errno so the CLI user sees a
    clean error instead of a traceback.  Raises NotImplementedError for
    an unrecognized command prefix.
    """
    prefix = cmd['prefix']
    try:
        try:
            if prefix.startswith('rbd perf '):
                # strip the leading "rbd perf " (9 characters)
                return self.perf.handle_command(inbuf, prefix[9:], cmd)
            elif prefix.startswith('rbd task '):
                # strip the leading "rbd task " (9 characters)
                return self.task.handle_command(inbuf, prefix[9:], cmd)

        except Exception as ex:
            # log the full traceback but don't send it to the CLI user
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
            raise

    except rados.Error as ex:
        return -ex.errno, "", str(ex)
    except rbd.OSError as ex:
        return -ex.errno, "", str(ex)
    except rbd.Error as ex:
        return -errno.EINVAL, "", str(ex)
    except KeyError as ex:
        return -errno.ENOENT, "", str(ex)
    except ValueError as ex:
        return -errno.EINVAL, "", str(ex)

    raise NotImplementedError(cmd['prefix'])