]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | """ |
2 | RBD support module | |
3 | """ | |
4 | ||
494da23a | 5 | import errno |
11fdf7f2 | 6 | import json |
494da23a TL |
7 | import rados |
8 | import rbd | |
9 | import re | |
11fdf7f2 TL |
10 | import time |
11 | import traceback | |
494da23a | 12 | import uuid |
11fdf7f2 TL |
13 | |
14 | from mgr_module import MgrModule | |
15 | ||
494da23a | 16 | from contextlib import contextmanager |
11fdf7f2 | 17 | from datetime import datetime, timedelta |
494da23a | 18 | from functools import partial, wraps |
11fdf7f2 TL |
19 | from threading import Condition, Lock, Thread |
20 | ||
494da23a | 21 | |
11fdf7f2 TL |
# key used for queries that are not limited to a single pool/namespace
GLOBAL_POOL_KEY = (None, None)

# keys of the per-user-query state dict maintained by PerfHandler
QUERY_POOL_ID = "pool_id"
QUERY_POOL_ID_MAP = "pool_id_map"
QUERY_IDS = "query_ids"
QUERY_SUM_POOL_COUNTERS = "pool_counters"
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
QUERY_LAST_REQUEST = "last_request"

OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
OSD_PERF_QUERY_COUNTERS = ['write_ops',
                           'read_ops',
                           'write_bytes',
                           'read_bytes',
                           'write_latency',
                           'read_latency']
# counter name -> position within OSD_PERF_QUERY_COUNTERS
OSD_PERF_QUERY_COUNTERS_INDICES = {
    counter: i for i, counter in enumerate(OSD_PERF_QUERY_COUNTERS)}

# indices of 'write_latency' and 'read_latency' in OSD_PERF_QUERY_COUNTERS
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
OSD_PERF_QUERY_MAX_RESULTS = 256

POOL_REFRESH_INTERVAL = timedelta(minutes=5)
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
STATS_RATE_INTERVAL = timedelta(minutes=1)

REPORT_MAX_RESULTS = 64

# name of the per-pool omap object that persists queued tasks
RBD_TASK_OID = "rbd_task"

# keys of the serialized task dict (see Task.to_dict)
TASK_SEQUENCE = "sequence"
TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
TASK_RETRY_TIME = "retry_time"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"

TASK_REF_POOL_NAME = "pool_name"
TASK_REF_POOL_NAMESPACE = "pool_namespace"
TASK_REF_IMAGE_NAME = "image_name"
TASK_REF_IMAGE_ID = "image_id"
TASK_REF_ACTION = "action"

TASK_REF_ACTION_FLATTEN = "flatten"
TASK_REF_ACTION_REMOVE = "remove"
TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"

VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                      TASK_REF_ACTION_REMOVE,
                      TASK_REF_ACTION_TRASH_REMOVE,
                      TASK_REF_ACTION_MIGRATION_EXECUTE,
                      TASK_REF_ACTION_MIGRATION_COMMIT,
                      TASK_REF_ACTION_MIGRATION_ABORT]

TASK_RETRY_INTERVAL = timedelta(seconds=30)
MAX_COMPLETED_TASKS = 50
83 | ||
84 | ||
def extract_pool_key(pool_spec):
    """Split a "pool[/namespace]" spec into a (pool, namespace) key.

    A falsy spec maps to GLOBAL_POOL_KEY; a spec without a namespace
    yields an empty-string namespace.  Raises ValueError on malformed
    input.
    """
    if not pool_spec:
        return GLOBAL_POOL_KEY

    parsed = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    if parsed is None:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))

    pool_name = parsed.group(1)
    namespace = parsed.group(2)
    return (pool_name, namespace if namespace else '')
11fdf7f2 | 93 | |
494da23a TL |
94 | |
def get_rbd_pools(module):
    """Return {pool_id: pool_name} for pools tagged with the 'rbd' application."""
    osd_map = module.get('osd_map')
    rbd_pools = {}
    for pool in osd_map['pools']:
        if 'rbd' in pool.get('application_metadata', {}):
            rbd_pools[pool['pool']] = pool['pool_name']
    return rbd_pools
99 | ||
100 | ||
class PerfHandler:
    """Collects and serves per-RBD-image performance statistics.

    A background thread registers OSD perf-counter queries on behalf of
    user requests, merges the raw per-OSD results into per-image
    counters, and expires queries that have not been requested within
    QUERY_EXPIRE_INTERVAL.  All shared state is guarded by ``lock``.
    """

    # NOTE(review): these are class-level (shared) mutable attributes; this
    # looks intentional for a single handler instance, but confirm before
    # creating more than one PerfHandler.
    user_queries = {}      # pool key -> per-user-query state dict
    image_cache = {}

    lock = Lock()
    query_condition = Condition(lock)    # signaled when a new query arrives
    refresh_condition = Condition(lock)  # signaled after each processing pass
    thread = None

    # (pool_id, namespace) -> {image_id: image_name}
    image_name_cache = {}
    image_name_refresh_time = datetime.fromtimestamp(0)

    @classmethod
    def prepare_regex(cls, value):
        # anchored exact-match regex for a single key value
        return '^({})$'.format(value)

    @classmethod
    def prepare_osd_perf_query(cls, pool_id, namespace, counter_type):
        """Build an OSD perf query descriptor limited to RBD data objects.

        ``counter_type`` selects the counter the OSD uses to order (and
        cap) the returned results.
        """
        pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        if pool_id:
            pool_id_regex = cls.prepare_regex(pool_id)
        if namespace:
            namespace_regex = cls.prepare_regex(namespace)

        return {
            'key_descriptor': [
                {'type': 'pool_id', 'regex': pool_id_regex},
                {'type': 'namespace', 'regex': namespace_regex},
                # captures the (optional) data-pool id and the image id
                # out of the rbd_data/journal_data object name
                {'type': 'object_name',
                 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
            ],
            'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
            'limit': {'order_by': counter_type,
                      'max_count': OSD_PERF_QUERY_MAX_RESULTS},
        }

    @classmethod
    def pool_spec_search_keys(cls, pool_key):
        # prefixes of pool_key from most to least specific, ending with ()
        return [pool_key[0:len(pool_key) - x]
                for x in range(0, len(pool_key) + 1)]

    @classmethod
    def submatch_pool_key(cls, pool_key, search_key):
        # a falsy search-key component acts as a wildcard
        return ((pool_key[1] == search_key[1] or not search_key[1])
                and (pool_key[0] == search_key[0] or not search_key[0]))

    def __init__(self, module):
        self.module = module
        self.log = module.log

        # start the background processing loop
        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self):
        """Background loop: scrub expired queries and process raw counters."""
        try:
            self.log.info("PerfHandler: starting")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_raw_osd_perf_counters()
                    # wake any register_query() waiting for fresh data
                    self.refresh_condition.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    # sleep until the next stats period or a new query arrives
                    self.query_condition.wait(stats_period)

                self.log.debug("PerfHandler: tick")

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def merge_raw_osd_perf_counters(self, pool_key, query, now_ts,
                                    resolve_image_names):
        """Merge raw OSD counters from all sort-order queries into *query*.

        Any (pool_id, namespace) whose image names are missing from the
        cache is added to *resolve_image_names*.  Returns the updated
        raw pool counters dict.
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]

        # collect and combine the raw counters from all sort orders
        raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
        for query_id in query[QUERY_IDS]:
            res = self.module.get_osd_perf_counters(query_id)
            for counter in res['counters']:
                # replace pool id from object name if it exists
                k = counter['k']
                pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
                namespace = k[1][0]
                image_id = k[2][1]

                # ignore metrics from non-matching pools/namespaces
                if pool_id not in pool_id_map:
                    continue
                if pool_key[1] is not None and pool_key[1] != namespace:
                    continue

                # flag the pool (and namespace) for refresh if we cannot find
                # image name in the cache
                resolve_image_key = (pool_id, namespace)
                if image_id not in self.image_name_cache.get(resolve_image_key, {}):
                    resolve_image_names.add(resolve_image_key)

                # copy the 'sum' counter values for each image (ignore count)
                # if we haven't already processed it for this round
                raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
                raw_images = raw_namespaces.setdefault(namespace, {})
                raw_image = raw_images.setdefault(image_id, [None, None])

                # save the last two perf counters for each image
                if raw_image[0] and raw_image[0][0] < now_ts:
                    raw_image[1] = raw_image[0]
                    raw_image[0] = None
                if not raw_image[0]:
                    raw_image[0] = [now_ts, [int(x[0]) for x in counter['c']]]

        self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
        return raw_pool_counters

    def sum_osd_perf_counters(self, query, raw_pool_counters, now_ts):
        """Fold the latest raw counters into cumulative per-image sums."""
        # update the cumulative counters for each image
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        for pool_id, raw_namespaces in raw_pool_counters.items():
            sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
            for namespace, raw_images in raw_namespaces.items():
                sum_namespace = sum_namespaces.setdefault(namespace, {})
                for image_id, raw_image in raw_images.items():
                    # zero-out non-updated raw counters
                    if not raw_image[0]:
                        continue
                    elif raw_image[0][0] < now_ts:
                        raw_image[1] = raw_image[0]
                        raw_image[0] = [now_ts, [0 for x in raw_image[1][1]]]
                        continue

                    counters = raw_image[0][1]

                    # copy raw counters if this is a newly discovered image or
                    # increment existing counters
                    sum_image = sum_namespace.setdefault(image_id, None)
                    if sum_image:
                        for i in range(len(counters)):
                            sum_image[i] += counters[i]
                    else:
                        sum_namespace[image_id] = [x for x in counters]

        self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
        return sum_pool_counters

    def refresh_image_names(self, resolve_image_names):
        """Re-list each flagged (pool_id, namespace) to map image id -> name."""
        for pool_id, namespace in resolve_image_names:
            image_key = (pool_id, namespace)
            images = self.image_name_cache.setdefault(image_key, {})
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                for image_meta in rbd.RBD().list2(ioctx):
                    images[image_meta['id']] = image_meta['name']
            self.log.debug("resolve_image_names: {}={}".format(image_key, images))

    def scrub_missing_images(self):
        """Drop counters for images whose names could not be resolved."""
        for pool_key, query in self.user_queries.items():
            raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
            sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
            for pool_id, sum_namespaces in sum_pool_counters.items():
                raw_namespaces = raw_pool_counters.get(pool_id, {})
                for namespace, sum_images in sum_namespaces.items():
                    raw_images = raw_namespaces.get(namespace, {})

                    image_key = (pool_id, namespace)
                    image_names = self.image_name_cache.get(image_key, {})
                    for image_id in list(sum_images.keys()):
                        # scrub image counters if we failed to resolve image name
                        if image_id not in image_names:
                            self.log.debug("scrub_missing_images: dropping {}/{}".format(
                                image_key, image_id))
                            del sum_images[image_id]
                            if image_id in raw_images:
                                del raw_images[image_id]

    def process_raw_osd_perf_counters(self):
        """One processing pass: merge raw counters, update sums, resolve names."""
        now = datetime.now()
        now_ts = int(now.strftime("%s"))

        # clear the image name cache if we need to refresh all active pools
        if self.image_name_cache and \
                self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
            self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
            self.image_name_cache = {}

        resolve_image_names = set()
        for pool_key, query in self.user_queries.items():
            if not query[QUERY_IDS]:
                continue

            raw_pool_counters = self.merge_raw_osd_perf_counters(
                pool_key, query, now_ts, resolve_image_names)
            self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

        if resolve_image_names:
            self.image_name_refresh_time = now
            self.refresh_image_names(resolve_image_names)
            self.scrub_missing_images()
        elif not self.image_name_cache:
            # nothing cached at all -- drop any counters with unknown names
            self.scrub_missing_images()

    def resolve_pool_id(self, pool_name):
        """Map a pool name to its id; raise ObjectNotFound when missing."""
        pool_id = self.module.rados.pool_lookup(pool_name)
        if not pool_id:
            raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name))
        return pool_id

    def scrub_expired_queries(self):
        """Unregister OSD queries that have not been requested recently."""
        # perf counters need to be periodically refreshed to continue
        # to be registered
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for pool_key in list(self.user_queries.keys()):
            user_query = self.user_queries[pool_key]
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                del self.user_queries[pool_key]

    def register_osd_perf_queries(self, pool_id, namespace):
        """Register one OSD perf query per counter sort order.

        On failure, unwinds any queries already registered and re-raises.
        """
        query_ids = []
        try:
            for counter in OSD_PERF_QUERY_COUNTERS:
                query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                self.log.debug("register_osd_perf_queries: {}".format(query))

                query_id = self.module.add_osd_perf_query(query)
                if query_id is None:
                    raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                query_ids.append(query_id)

        except Exception:
            for query_id in query_ids:
                self.module.remove_osd_perf_query(query_id)
            raise

        return query_ids

    def unregister_osd_perf_queries(self, pool_key, query_ids):
        """Remove the OSD queries for *pool_key* and clear *query_ids* in place."""
        self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
            pool_key, query_ids))
        for query_id in query_ids:
            self.module.remove_osd_perf_query(query_id)
        query_ids[:] = []

    def register_query(self, pool_key):
        """Return (creating if needed) the user query state for *pool_key*."""
        if pool_key not in self.user_queries:
            pool_id = None
            if pool_key[0]:
                pool_id = self.resolve_pool_id(pool_key[0])

            user_query = {
                QUERY_POOL_ID: pool_id,
                QUERY_POOL_ID_MAP: {pool_id: pool_key[0]},
                QUERY_IDS: self.register_osd_perf_queries(pool_id, pool_key[1]),
                QUERY_LAST_REQUEST: datetime.now()
            }

            self.user_queries[pool_key] = user_query

            # force an immediate stat pull if this is a new query
            self.query_condition.notify()
            self.refresh_condition.wait(5)

        else:
            user_query = self.user_queries[pool_key]

            # ensure query doesn't expire
            user_query[QUERY_LAST_REQUEST] = datetime.now()

            if pool_key == GLOBAL_POOL_KEY:
                # refresh the global pool id -> name map upon each
                # processing period
                user_query[QUERY_POOL_ID_MAP] = {
                    pool_id: pool_name for pool_id, pool_name
                    in get_rbd_pools(self.module).items()}

        self.log.debug("register_query: pool_key={}, query_ids={}".format(
            pool_key, user_query[QUERY_IDS]))

        return user_query

    def extract_stat(self, index, raw_image, sum_image):
        """Return the per-second rate for counter *index* from the raw samples.

        Latency counters are additionally normalized to an average per
        op; 0 is returned when two samples within STATS_RATE_INTERVAL
        are not available.
        """
        # require two raw counters between a fixed time window
        if not raw_image or not raw_image[0] or not raw_image[1]:
            return 0

        current_time = raw_image[0][0]
        previous_time = raw_image[1][0]
        if current_time <= previous_time or \
                current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
            return 0

        current_value = raw_image[0][1][index]
        instant_rate = float(current_value) / (current_time - previous_time)

        # convert latencies from sum to average per op
        ops_index = None
        if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
        elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']

        if ops_index is not None:
            ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
            instant_rate /= ops

        return instant_rate

    def extract_counter(self, index, raw_image, sum_image):
        """Return the cumulative value for counter *index* (0 when unknown)."""
        if sum_image:
            return sum_image[index]
        return 0

    def generate_report(self, query, sort_by, extract_data):
        """Build a (pool_descriptors, counters) report for *query*.

        Results are sorted by recent activity on the *sort_by* counter
        and truncated to REPORT_MAX_RESULTS entries.
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})

        sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)

        # pre-sort and limit the response
        results = []
        for pool_id, sum_namespaces in sum_pool_counters.items():
            if pool_id not in pool_id_map:
                continue
            raw_namespaces = raw_pool_counters.get(pool_id, {})
            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                for image_id, sum_image in sum_images.items():
                    raw_image = raw_images.get(image_id, [])

                    # always sort by recent IO activity
                    results.append([(pool_id, namespace, image_id),
                                    self.extract_stat(sort_by_index, raw_image,
                                                      sum_image)])
        results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]

        # build the report in sorted order
        pool_descriptors = {}
        counters = []
        for key, _ in results:
            pool_id = key[0]
            pool_name = pool_id_map[pool_id]

            namespace = key[1]
            image_id = key[2]
            image_names = self.image_name_cache.get((pool_id, namespace), {})
            image_name = image_names[image_id]

            raw_namespaces = raw_pool_counters.get(pool_id, {})
            raw_images = raw_namespaces.get(namespace, {})
            raw_image = raw_images.get(image_id, [])

            sum_namespaces = sum_pool_counters[pool_id]
            sum_images = sum_namespaces[namespace]
            sum_image = sum_images.get(image_id, [])

            # images are keyed as "<pool descriptor index>/<image name>"
            pool_descriptor = pool_name
            if namespace:
                pool_descriptor += "/{}".format(namespace)
            pool_index = pool_descriptors.setdefault(pool_descriptor,
                                                     len(pool_descriptors))
            image_descriptor = "{}/{}".format(pool_index, image_name)
            data = [extract_data(i, raw_image, sum_image)
                    for i in range(len(OSD_PERF_QUERY_COUNTERS))]

            # skip if no data to report
            if data == [0 for i in range(len(OSD_PERF_QUERY_COUNTERS))]:
                continue

            counters.append({image_descriptor: data})

        return {idx: descriptor for descriptor, idx
                in pool_descriptors.items()}, \
            counters

    def get_perf_data(self, report, pool_spec, sort_by, extract_data):
        """Common implementation of the stats/counters commands.

        Returns an (rc, out, err) mgr command result whose ``out`` is a
        JSON-encoded report.
        """
        self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
            report, pool_spec, sort_by))
        self.scrub_expired_queries()

        pool_key = extract_pool_key(pool_spec)
        user_query = self.register_query(pool_key)

        now = datetime.now()
        pool_descriptors, counters = self.generate_report(
            user_query, sort_by, extract_data)

        # note: the dict literal is evaluated before 'report' is rebound,
        # so the format() calls still see the report-name string
        report = {
            'timestamp': time.mktime(now.timetuple()),
            '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
            'pool_descriptors': pool_descriptors,
            '{}s'.format(report): counters
        }

        return 0, json.dumps(report), ""

    def get_perf_stats(self, pool_spec, sort_by):
        """Report per-image instantaneous rates."""
        return self.get_perf_data(
            "stat", pool_spec, sort_by, self.extract_stat)

    def get_perf_counters(self, pool_spec, sort_by):
        """Report per-image cumulative counters."""
        return self.get_perf_data(
            "counter", pool_spec, sort_by, self.extract_counter)

    def handle_command(self, inbuf, prefix, cmd):
        """Dispatch an mgr 'rbd perf ...' command under the handler lock."""
        with self.lock:
            if prefix == 'image stats':
                return self.get_perf_stats(cmd.get('pool_spec', None),
                                           cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
            elif prefix == 'image counters':
                return self.get_perf_counters(cmd.get('pool_spec', None),
                                              cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))

        raise NotImplementedError(cmd['prefix'])
515 | ||
516 | ||
class Throttle:
    """Decorator that rate-limits calls to the wrapped function.

    A call made before ``throttle_period`` has elapsed since the last
    forwarded call is silently dropped (the wrapper returns None).
    """

    def __init__(self, throttle_period):
        self.throttle_period = throttle_period
        self.time_of_last_call = datetime.min

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            call_time = datetime.now()
            next_allowed = self.time_of_last_call + self.throttle_period
            if next_allowed <= call_time:
                self.time_of_last_call = call_time
                return fn(*args, **kwargs)
        return wrapper
530 | ||
531 | ||
class Task:
    """In-memory representation of a single queued RBD maintenance task."""

    def __init__(self, sequence, task_id, message, refs):
        self.sequence = sequence   # monotonic ordering key
        self.task_id = task_id     # unique uuid string
        self.message = message     # human-readable description
        self.refs = refs           # action + pool/image references
        self.retry_time = None
        self.in_progress = False
        self.progress = 0.0
        self.canceled = False
        self.failed = False

    def __str__(self):
        return self.to_json()

    @property
    def sequence_key(self):
        # fixed-width hex so omap keys sort in sequence order
        return "{0:016X}".format(self.sequence)

    def cancel(self):
        self.canceled = True
        self.fail("Operation canceled")

    def fail(self, message):
        self.failed = True
        self.failure_message = message

    def to_dict(self):
        """Serialize the task to a plain dict (omitting default fields)."""
        result = {TASK_SEQUENCE: self.sequence,
                  TASK_ID: self.task_id,
                  TASK_MESSAGE: self.message,
                  TASK_REFS: self.refs
                  }
        if self.retry_time:
            result[TASK_RETRY_TIME] = self.retry_time.isoformat()
        if self.in_progress:
            result[TASK_IN_PROGRESS] = True
            result[TASK_PROGRESS] = self.progress
        if self.canceled:
            result[TASK_CANCELED] = True
        return result

    def to_json(self):
        return str(json.dumps(self.to_dict()))

    @classmethod
    def from_json(cls, val):
        """Deserialize a task, raising ValueError on any malformed input."""
        try:
            decoded = json.loads(val)
            action = decoded.get(TASK_REFS, {}).get(TASK_REF_ACTION)
            if action not in VALID_TASK_ACTIONS:
                raise ValueError("Invalid task action: {}".format(action))

            return Task(decoded[TASK_SEQUENCE], decoded[TASK_ID],
                        decoded[TASK_MESSAGE], decoded[TASK_REFS])
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError("Invalid task format (missing key {})".format(str(e)))
590 | ||
591 | ||
class TaskHandler:
    """Schedules and executes long-running RBD maintenance tasks.

    Tasks are persisted in per-pool/namespace "rbd_task" omap objects so
    they survive mgr restarts; a background thread executes them in
    sequence order.  Shared state is guarded by ``lock``.
    """

    # NOTE(review): these are class-level (shared) mutable attributes; this
    # looks intentional for a single handler instance, but confirm before
    # creating more than one TaskHandler.
    lock = Lock()
    condition = Condition(lock)
    thread = None

    in_progress_task = None     # task currently executing, if any
    tasks_by_sequence = dict()  # sequence number -> Task
    tasks_by_id = dict()        # task uuid -> Task

    completed_tasks = []        # bounded history used for replay detection

    sequence = 0                # last allocated sequence number

    def __init__(self, module):
        self.module = module
        self.log = module.log

        # load persisted tasks before the worker thread starts
        with self.lock:
            self.init_task_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()
614 | ||
615 | @property | |
616 | def default_pool_name(self): | |
617 | return self.module.get_ceph_option("rbd_default_pool") | |
618 | ||
619 | def extract_pool_spec(self, pool_spec): | |
620 | pool_spec = extract_pool_key(pool_spec) | |
621 | if pool_spec == GLOBAL_POOL_KEY: | |
622 | pool_spec = (self.default_pool_name, '') | |
623 | return pool_spec | |
624 | ||
625 | def extract_image_spec(self, image_spec): | |
626 | match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$', | |
627 | image_spec or '') | |
628 | if not match: | |
629 | raise ValueError("Invalid image spec: {}".format(image_spec)) | |
630 | return (match.group(1) or self.default_pool_name, match.group(2) or '', | |
631 | match.group(3)) | |
632 | ||
    def run(self):
        """Background loop: execute queued tasks whose retry time has passed."""
        try:
            self.log.info("TaskHandler: starting")
            while True:
                with self.lock:
                    now = datetime.now()
                    # execute in sequence order, skipping tasks that are
                    # still waiting out their retry interval
                    for sequence in sorted([sequence for sequence, task
                                            in self.tasks_by_sequence.items()
                                            if not task.retry_time or task.retry_time <= now]):
                        self.execute_task(sequence)

                    self.condition.wait(5)
                    self.log.debug("TaskHandler: tick")

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
650 | ||
    @contextmanager
    def open_ioctx(self, spec):
        """Yield an ioctx for a (pool_name, namespace) *spec*.

        Logs and re-raises rados.ObjectNotFound when the pool is missing.
        """
        try:
            with self.module.rados.open_ioctx(spec[0]) as ioctx:
                ioctx.set_namespace(spec[1])
                yield ioctx
        except rados.ObjectNotFound:
            self.log.error("Failed to locate pool {}".format(spec[0]))
            raise
660 | ||
661 | @classmethod | |
662 | def format_image_spec(cls, image_spec): | |
663 | image = image_spec[2] | |
664 | if image_spec[1]: | |
665 | image = "{}/{}".format(image_spec[1], image) | |
666 | if image_spec[0]: | |
667 | image = "{}/{}".format(image_spec[0], image) | |
668 | return image | |
669 | ||
    def init_task_queue(self):
        """Load persisted tasks from every RBD pool (and namespace) at startup."""
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            try:
                with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_task_queue(ioctx, pool_name)

                    try:
                        namespaces = rbd.RBD().namespace_list(ioctx)
                    except rbd.OperationNotSupported:
                        self.log.debug("Namespaces not supported")
                        continue

                    for namespace in namespaces:
                        ioctx.set_namespace(namespace)
                        self.load_task_queue(ioctx, pool_name)

            except rados.ObjectNotFound:
                # pool DNE
                pass

        if self.tasks_by_sequence:
            # resume numbering after the highest loaded sequence
            self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]

        self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
            self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
695 | ||
    def load_task_queue(self, ioctx, pool_name):
        """Load tasks stored in the rbd_task omap of the current pool/namespace."""
        pool_spec = pool_name
        if ioctx.nspace:
            pool_spec += "/{}".format(ioctx.nspace)

        start_after = ''
        try:
            while True:
                # page through the omap 128 entries at a time
                with rados.ReadOpCtx() as read_op:
                    self.log.info("load_task_task: {}, start_after={}".format(
                        pool_spec, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, RBD_TASK_OID)

                    it = list(it)
                    for k, v in it:
                        start_after = k
                        v = v.decode()
                        self.log.info("load_task_task: task={}".format(v))

                        # skip (but log) any entry that fails validation
                        try:
                            task = Task.from_json(v)
                            self.append_task(task)
                        except ValueError:
                            self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))

                    if not it:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            # rbd_task DNE
            pass
730 | ||
731 | def append_task(self, task): | |
732 | self.tasks_by_sequence[task.sequence] = task | |
733 | self.tasks_by_id[task.task_id] = task | |
734 | ||
735 | def task_refs_match(self, task_refs, refs): | |
736 | if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs: | |
737 | task_refs = task_refs.copy() | |
738 | del task_refs[TASK_REF_IMAGE_ID] | |
739 | ||
740 | self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs)) | |
741 | return task_refs == refs | |
742 | ||
743 | def find_task(self, refs): | |
744 | self.log.debug("find_task: refs={}".format(refs)) | |
745 | ||
746 | # search for dups and return the original | |
747 | for task_id in reversed(sorted(self.tasks_by_id.keys())): | |
748 | task = self.tasks_by_id[task_id] | |
749 | if self.task_refs_match(task.refs, refs): | |
750 | return task | |
751 | ||
752 | # search for a completed task (message replay) | |
753 | for task in reversed(self.completed_tasks): | |
754 | if self.task_refs_match(task.refs, refs): | |
755 | return task | |
756 | ||
    def add_task(self, ioctx, message, refs):
        """Create a task, persist it to the rbd_task omap, and queue it.

        Returns the task serialized as JSON.  Called with self.lock held.
        """
        self.log.debug("add_task: message={}, refs={}".format(message, refs))

        # ensure unique uuid across all pools
        while True:
            task_id = str(uuid.uuid4())
            if task_id not in self.tasks_by_id:
                break

        self.sequence += 1
        task = Task(self.sequence, task_id, message, refs)

        # add the task to the rbd_task omap
        task_json = task.to_json()
        omap_keys = (task.sequence_key, )
        omap_vals = (str.encode(task_json), )
        self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))

        with rados.WriteOpCtx() as write_op:
            ioctx.set_omap(write_op, omap_keys, omap_vals)
            ioctx.operate_write_op(write_op, RBD_TASK_OID)
        self.append_task(task)

        # wake the worker thread so the task starts promptly
        self.condition.notify()
        return task_json
782 | ||
783 | def remove_task(self, ioctx, task, remove_in_memory=True): | |
784 | self.log.info("remove_task: task={}".format(str(task))) | |
785 | omap_keys = (task.sequence_key, ) | |
786 | try: | |
787 | with rados.WriteOpCtx() as write_op: | |
788 | ioctx.remove_omap_keys(write_op, omap_keys) | |
789 | ioctx.operate_write_op(write_op, RBD_TASK_OID) | |
790 | except rados.ObjectNotFound: | |
791 | pass | |
792 | ||
793 | if remove_in_memory: | |
794 | try: | |
795 | del self.tasks_by_id[task.task_id] | |
796 | del self.tasks_by_sequence[task.sequence] | |
797 | ||
798 | # keep a record of the last N tasks to help avoid command replay | |
799 | # races | |
800 | if not task.failed and not task.canceled: | |
801 | self.log.debug("remove_task: moving to completed tasks") | |
802 | self.completed_tasks.append(task) | |
803 | self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:] | |
804 | ||
805 | except KeyError: | |
806 | pass | |
807 | ||
    def execute_task(self, sequence):
        """Run the queued task registered under the given sequence number.

        Expects ``self.lock`` to be held on entry; the lock is released while
        the (potentially long-running) librbd action executes and re-acquired
        before bookkeeping is updated.
        """
        task = self.tasks_by_sequence[sequence]
        self.log.info("execute_task: task={}".format(str(task)))

        pool_valid = False
        try:
            with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                                  task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
                pool_valid = True

                # map the recorded action to its executor method
                action = task.refs[TASK_REF_ACTION]
                execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
                              TASK_REF_ACTION_REMOVE: self.execute_remove,
                              TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
                              TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
                              TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
                              TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
                              }.get(action)
                if not execute_fn:
                    self.log.error("Invalid task action: {}".format(action))
                else:
                    task.in_progress = True
                    self.in_progress_task = task
                    self.update_progress(task, 0)

                    # drop the lock so new commands and progress callbacks can
                    # make progress while the action runs
                    self.lock.release()
                    try:
                        execute_fn(ioctx, task)

                    except rbd.OperationCanceled:
                        self.log.info("Operation canceled: task={}".format(
                            str(task)))

                    finally:
                        self.lock.acquire()

                        task.in_progress = False
                        self.in_progress_task = None

                    self.complete_progress(task)
                    self.remove_task(ioctx, task)

        except rados.ObjectNotFound as e:
            self.log.error("execute_task: {}".format(e))
            if pool_valid:
                # pool still exists -- keep the task for a later retry
                self.update_progress(task, 0)
            else:
                # pool DNE -- remove the task
                # NOTE(review): if open_ioctx itself raised, 'ioctx' was never
                # bound and this call raises NameError; likely should pass None
                # here (with remove_task tolerating a None ioctx) -- confirm
                self.complete_progress(task)
                self.remove_task(ioctx, task)

        except (rados.Error, rbd.Error) as e:
            self.log.error("execute_task: {}".format(e))
            self.update_progress(task, 0)

        finally:
            # schedule the next retry window regardless of outcome
            task.in_progress = False
            task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
867 | def progress_callback(self, task, current, total): | |
868 | progress = float(current) / float(total) | |
869 | self.log.debug("progress_callback: task={}, progress={}".format( | |
870 | str(task), progress)) | |
871 | ||
872 | # avoid deadlocking when a new command comes in during a progress callback | |
873 | if not self.lock.acquire(False): | |
874 | return 0 | |
875 | ||
876 | try: | |
877 | if not self.in_progress_task or self.in_progress_task.canceled: | |
878 | return -rbd.ECANCELED | |
879 | self.in_progress_task.progress = progress | |
880 | finally: | |
881 | self.lock.release() | |
882 | ||
883 | self.throttled_update_progress(task, progress) | |
884 | return 0 | |
885 | ||
886 | def execute_flatten(self, ioctx, task): | |
887 | self.log.info("execute_flatten: task={}".format(str(task))) | |
888 | ||
889 | try: | |
890 | with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image: | |
891 | image.flatten(on_progress=partial(self.progress_callback, task)) | |
892 | except rbd.InvalidArgument: | |
893 | task.fail("Image does not have parent") | |
894 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
895 | except rbd.ImageNotFound: | |
896 | task.fail("Image does not exist") | |
897 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
898 | ||
899 | def execute_remove(self, ioctx, task): | |
900 | self.log.info("execute_remove: task={}".format(str(task))) | |
901 | ||
902 | try: | |
903 | rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME], | |
904 | on_progress=partial(self.progress_callback, task)) | |
905 | except rbd.ImageNotFound: | |
906 | task.fail("Image does not exist") | |
907 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
908 | ||
909 | def execute_trash_remove(self, ioctx, task): | |
910 | self.log.info("execute_trash_remove: task={}".format(str(task))) | |
911 | ||
912 | try: | |
913 | rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID], | |
914 | on_progress=partial(self.progress_callback, task)) | |
915 | except rbd.ImageNotFound: | |
916 | task.fail("Image does not exist") | |
917 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
918 | ||
919 | def execute_migration_execute(self, ioctx, task): | |
920 | self.log.info("execute_migration_execute: task={}".format(str(task))) | |
921 | ||
922 | try: | |
923 | rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME], | |
924 | on_progress=partial(self.progress_callback, task)) | |
925 | except rbd.ImageNotFound: | |
926 | task.fail("Image does not exist") | |
927 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
928 | except rbd.InvalidArgument: | |
929 | task.fail("Image is not migrating") | |
930 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
931 | ||
932 | def execute_migration_commit(self, ioctx, task): | |
933 | self.log.info("execute_migration_commit: task={}".format(str(task))) | |
934 | ||
935 | try: | |
936 | rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME], | |
937 | on_progress=partial(self.progress_callback, task)) | |
938 | except rbd.ImageNotFound: | |
939 | task.fail("Image does not exist") | |
940 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
941 | except rbd.InvalidArgument: | |
942 | task.fail("Image is not migrating or migration not executed") | |
943 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
944 | ||
945 | def execute_migration_abort(self, ioctx, task): | |
946 | self.log.info("execute_migration_abort: task={}".format(str(task))) | |
947 | ||
948 | try: | |
949 | rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME], | |
950 | on_progress=partial(self.progress_callback, task)) | |
951 | except rbd.ImageNotFound: | |
952 | task.fail("Image does not exist") | |
953 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
954 | except rbd.InvalidArgument: | |
955 | task.fail("Image is not migrating") | |
956 | self.log.info("{}: task={}".format(task.failure_message, str(task))) | |
957 | ||
958 | def complete_progress(self, task): | |
959 | self.log.debug("complete_progress: task={}".format(str(task))) | |
960 | try: | |
961 | if task.failed: | |
962 | self.module.remote("progress", "fail", task.task_id, | |
963 | task.failure_message) | |
964 | else: | |
965 | self.module.remote("progress", "complete", task.task_id) | |
966 | except ImportError: | |
967 | # progress module is disabled | |
968 | pass | |
969 | ||
970 | def update_progress(self, task, progress): | |
971 | self.log.debug("update_progress: task={}, progress={}".format(str(task), progress)) | |
972 | try: | |
973 | refs = {"origin": "rbd_support"} | |
974 | refs.update(task.refs) | |
975 | ||
976 | self.module.remote("progress", "update", task.task_id, | |
977 | task.message, progress, refs) | |
978 | except ImportError: | |
979 | # progress module is disabled | |
980 | pass | |
981 | ||
    @Throttle(timedelta(seconds=1))
    def throttled_update_progress(self, task, progress):
        """Rate-limited update_progress (at most one update per second)."""
        self.update_progress(task, progress)
986 | def queue_flatten(self, image_spec): | |
987 | image_spec = self.extract_image_spec(image_spec) | |
988 | self.log.info("queue_flatten: {}".format(image_spec)) | |
989 | ||
990 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN, | |
991 | TASK_REF_POOL_NAME: image_spec[0], | |
992 | TASK_REF_POOL_NAMESPACE: image_spec[1], | |
993 | TASK_REF_IMAGE_NAME: image_spec[2]} | |
994 | ||
995 | with self.open_ioctx(image_spec) as ioctx: | |
996 | try: | |
997 | with rbd.Image(ioctx, image_spec[2]) as image: | |
998 | refs[TASK_REF_IMAGE_ID] = image.id() | |
999 | ||
1000 | try: | |
1001 | parent_image_id = image.parent_id() | |
1002 | except rbd.ImageNotFound: | |
1003 | parent_image_id = None | |
1004 | ||
1005 | except rbd.ImageNotFound: | |
1006 | pass | |
1007 | ||
1008 | task = self.find_task(refs) | |
1009 | if task: | |
1010 | return 0, task.to_json(), '' | |
1011 | ||
1012 | if TASK_REF_IMAGE_ID not in refs: | |
1013 | raise rbd.ImageNotFound("Image {} does not exist".format( | |
1014 | self.format_image_spec(image_spec)), errno=errno.ENOENT) | |
1015 | if not parent_image_id: | |
1016 | raise rbd.ImageNotFound("Image {} does not have a parent".format( | |
1017 | self.format_image_spec(image_spec)), errno=errno.ENOENT) | |
1018 | ||
1019 | return 0, self.add_task(ioctx, | |
1020 | "Flattening image {}".format( | |
1021 | self.format_image_spec(image_spec)), | |
1022 | refs), "" | |
1023 | ||
1024 | def queue_remove(self, image_spec): | |
1025 | image_spec = self.extract_image_spec(image_spec) | |
1026 | self.log.info("queue_remove: {}".format(image_spec)) | |
1027 | ||
1028 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE, | |
1029 | TASK_REF_POOL_NAME: image_spec[0], | |
1030 | TASK_REF_POOL_NAMESPACE: image_spec[1], | |
1031 | TASK_REF_IMAGE_NAME: image_spec[2]} | |
1032 | ||
1033 | with self.open_ioctx(image_spec) as ioctx: | |
1034 | try: | |
1035 | with rbd.Image(ioctx, image_spec[2]) as image: | |
1036 | refs[TASK_REF_IMAGE_ID] = image.id() | |
1037 | snaps = list(image.list_snaps()) | |
1038 | ||
1039 | except rbd.ImageNotFound: | |
1040 | pass | |
1041 | ||
1042 | task = self.find_task(refs) | |
1043 | if task: | |
1044 | return 0, task.to_json(), '' | |
1045 | ||
1046 | if TASK_REF_IMAGE_ID not in refs: | |
1047 | raise rbd.ImageNotFound("Image {} does not exist".format( | |
1048 | self.format_image_spec(image_spec)), errno=errno.ENOENT) | |
1049 | if snaps: | |
1050 | raise rbd.ImageBusy("Image {} has snapshots".format( | |
1051 | self.format_image_spec(image_spec)), errno=errno.EBUSY) | |
1052 | ||
1053 | return 0, self.add_task(ioctx, | |
1054 | "Removing image {}".format( | |
1055 | self.format_image_spec(image_spec)), | |
1056 | refs), '' | |
1057 | ||
1058 | def queue_trash_remove(self, image_id_spec): | |
1059 | image_id_spec = self.extract_image_spec(image_id_spec) | |
1060 | self.log.info("queue_trash_remove: {}".format(image_id_spec)) | |
1061 | ||
1062 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE, | |
1063 | TASK_REF_POOL_NAME: image_id_spec[0], | |
1064 | TASK_REF_POOL_NAMESPACE: image_id_spec[1], | |
1065 | TASK_REF_IMAGE_ID: image_id_spec[2]} | |
1066 | task = self.find_task(refs) | |
1067 | if task: | |
1068 | return 0, task.to_json(), '' | |
1069 | ||
1070 | # verify that image exists in trash | |
1071 | with self.open_ioctx(image_id_spec) as ioctx: | |
1072 | rbd.RBD().trash_get(ioctx, image_id_spec[2]) | |
1073 | ||
1074 | return 0, self.add_task(ioctx, | |
1075 | "Removing image {} from trash".format( | |
1076 | self.format_image_spec(image_id_spec)), | |
1077 | refs), '' | |
1078 | ||
1079 | def get_migration_status(self, ioctx, image_spec): | |
1080 | try: | |
1081 | return rbd.RBD().migration_status(ioctx, image_spec[2]) | |
1082 | except (rbd.InvalidArgument, rbd.ImageNotFound): | |
1083 | return None | |
1084 | ||
eafe8130 | 1085 | def validate_image_migrating(self, image_spec, migration_status): |
494da23a TL |
1086 | if not migration_status: |
1087 | raise rbd.InvalidArgument("Image {} is not migrating".format( | |
1088 | self.format_image_spec(image_spec)), errno=errno.EINVAL) | |
1089 | ||
1090 | def resolve_pool_name(self, pool_id): | |
1091 | osd_map = self.module.get('osd_map') | |
1092 | for pool in osd_map['pools']: | |
1093 | if pool['pool'] == pool_id: | |
1094 | return pool['pool_name'] | |
1095 | return '<unknown>' | |
1096 | ||
1097 | def queue_migration_execute(self, image_spec): | |
1098 | image_spec = self.extract_image_spec(image_spec) | |
1099 | self.log.info("queue_migration_execute: {}".format(image_spec)) | |
1100 | ||
1101 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE, | |
1102 | TASK_REF_POOL_NAME: image_spec[0], | |
1103 | TASK_REF_POOL_NAMESPACE: image_spec[1], | |
1104 | TASK_REF_IMAGE_NAME: image_spec[2]} | |
1105 | ||
1106 | with self.open_ioctx(image_spec) as ioctx: | |
1107 | status = self.get_migration_status(ioctx, image_spec) | |
1108 | if status: | |
1109 | refs[TASK_REF_IMAGE_ID] = status['dest_image_id'] | |
1110 | ||
1111 | task = self.find_task(refs) | |
1112 | if task: | |
1113 | return 0, task.to_json(), '' | |
1114 | ||
eafe8130 | 1115 | self.validate_image_migrating(image_spec, status) |
494da23a TL |
1116 | if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED, |
1117 | rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]: | |
1118 | raise rbd.InvalidArgument("Image {} is not in ready state".format( | |
1119 | self.format_image_spec(image_spec)), errno=errno.EINVAL) | |
1120 | ||
1121 | source_pool = self.resolve_pool_name(status['source_pool_id']) | |
1122 | dest_pool = self.resolve_pool_name(status['dest_pool_id']) | |
1123 | return 0, self.add_task(ioctx, | |
1124 | "Migrating image {} to {}".format( | |
1125 | self.format_image_spec((source_pool, | |
1126 | status['source_pool_namespace'], | |
1127 | status['source_image_name'])), | |
1128 | self.format_image_spec((dest_pool, | |
1129 | status['dest_pool_namespace'], | |
1130 | status['dest_image_name']))), | |
1131 | refs), '' | |
1132 | ||
1133 | def queue_migration_commit(self, image_spec): | |
1134 | image_spec = self.extract_image_spec(image_spec) | |
1135 | self.log.info("queue_migration_commit: {}".format(image_spec)) | |
1136 | ||
1137 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT, | |
1138 | TASK_REF_POOL_NAME: image_spec[0], | |
1139 | TASK_REF_POOL_NAMESPACE: image_spec[1], | |
1140 | TASK_REF_IMAGE_NAME: image_spec[2]} | |
1141 | ||
1142 | with self.open_ioctx(image_spec) as ioctx: | |
1143 | status = self.get_migration_status(ioctx, image_spec) | |
1144 | if status: | |
1145 | refs[TASK_REF_IMAGE_ID] = status['dest_image_id'] | |
1146 | ||
1147 | task = self.find_task(refs) | |
1148 | if task: | |
1149 | return 0, task.to_json(), '' | |
1150 | ||
eafe8130 | 1151 | self.validate_image_migrating(image_spec, status) |
494da23a TL |
1152 | if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED: |
1153 | raise rbd.InvalidArgument("Image {} has not completed migration".format( | |
1154 | self.format_image_spec(image_spec)), errno=errno.EINVAL) | |
1155 | ||
1156 | return 0, self.add_task(ioctx, | |
1157 | "Committing image migration for {}".format( | |
1158 | self.format_image_spec(image_spec)), | |
1159 | refs), '' | |
1160 | ||
1161 | def queue_migration_abort(self, image_spec): | |
1162 | image_spec = self.extract_image_spec(image_spec) | |
1163 | self.log.info("queue_migration_abort: {}".format(image_spec)) | |
1164 | ||
1165 | refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT, | |
1166 | TASK_REF_POOL_NAME: image_spec[0], | |
1167 | TASK_REF_POOL_NAMESPACE: image_spec[1], | |
1168 | TASK_REF_IMAGE_NAME: image_spec[2]} | |
1169 | ||
1170 | with self.open_ioctx(image_spec) as ioctx: | |
1171 | status = self.get_migration_status(ioctx, image_spec) | |
1172 | if status: | |
1173 | refs[TASK_REF_IMAGE_ID] = status['dest_image_id'] | |
1174 | ||
1175 | task = self.find_task(refs) | |
1176 | if task: | |
1177 | return 0, task.to_json(), '' | |
1178 | ||
eafe8130 | 1179 | self.validate_image_migrating(image_spec, status) |
494da23a TL |
1180 | return 0, self.add_task(ioctx, |
1181 | "Aborting image migration for {}".format( | |
1182 | self.format_image_spec(image_spec)), | |
1183 | refs), '' | |
1184 | ||
1185 | def task_cancel(self, task_id): | |
1186 | self.log.info("task_cancel: {}".format(task_id)) | |
1187 | ||
1188 | if task_id not in self.tasks_by_id: | |
1189 | return -errno.ENOENT, '', "No such task {}".format(task_id) | |
1190 | ||
1191 | task = self.tasks_by_id[task_id] | |
1192 | task.cancel() | |
1193 | ||
1194 | remove_in_memory = True | |
1195 | if self.in_progress_task and self.in_progress_task.task_id == task_id: | |
1196 | self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task))) | |
1197 | remove_in_memory = False | |
1198 | ||
1199 | # complete any associated event in the progress module | |
1200 | self.complete_progress(task) | |
1201 | ||
1202 | # remove from rbd_task omap | |
1203 | with self.open_ioctx((task.refs[TASK_REF_POOL_NAME], | |
1204 | task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx: | |
1205 | self.remove_task(ioctx, task, remove_in_memory) | |
1206 | ||
1207 | return 0, "", "" | |
1208 | ||
1209 | def task_list(self, task_id): | |
1210 | self.log.info("task_list: {}".format(task_id)) | |
1211 | ||
1212 | if task_id: | |
1213 | if task_id not in self.tasks_by_id: | |
1214 | return -errno.ENOENT, '', "No such task {}".format(task_id) | |
1215 | ||
1216 | result = self.tasks_by_id[task_id].to_dict() | |
1217 | else: | |
1218 | result = [] | |
1219 | for sequence in sorted(self.tasks_by_sequence.keys()): | |
1220 | task = self.tasks_by_sequence[sequence] | |
1221 | result.append(task.to_dict()) | |
1222 | ||
1223 | return 0, json.dumps(result), "" | |
1224 | ||
1225 | def handle_command(self, inbuf, prefix, cmd): | |
1226 | with self.lock: | |
1227 | if prefix == 'add flatten': | |
1228 | return self.queue_flatten(cmd['image_spec']) | |
1229 | elif prefix == 'add remove': | |
1230 | return self.queue_remove(cmd['image_spec']) | |
1231 | elif prefix == 'add trash remove': | |
1232 | return self.queue_trash_remove(cmd['image_id_spec']) | |
1233 | elif prefix == 'add migration execute': | |
1234 | return self.queue_migration_execute(cmd['image_spec']) | |
1235 | elif prefix == 'add migration commit': | |
1236 | return self.queue_migration_commit(cmd['image_spec']) | |
1237 | elif prefix == 'add migration abort': | |
1238 | return self.queue_migration_abort(cmd['image_spec']) | |
1239 | elif prefix == 'cancel': | |
1240 | return self.task_cancel(cmd['task_id']) | |
1241 | elif prefix == 'list': | |
1242 | return self.task_list(cmd.get('task_id')) | |
1243 | ||
1244 | raise NotImplementedError(cmd['prefix']) | |
1245 | ||
1246 | ||
class Module(MgrModule):
    """mgr module entry point: registers the 'rbd perf ...' and 'rbd task ...'
    commands and routes them to the perf and task handlers."""

    # command table consumed by the mgr framework; prefixes map 1:1 onto the
    # handlers dispatched in handle_command below
    COMMANDS = [
        {
            "cmd": "rbd perf image stats "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance stats",
            "perm": "r"
        },
        {
            "cmd": "rbd perf image counters "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance counters",
            "perm": "r"
        },
        {
            "cmd": "rbd task add flatten "
                   "name=image_spec,type=CephString",
            "desc": "Flatten a cloned image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add remove "
                   "name=image_spec,type=CephString",
            "desc": "Remove an image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add trash remove "
                   "name=image_id_spec,type=CephString",
            "desc": "Remove an image from the trash asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration execute "
                   "name=image_spec,type=CephString",
            "desc": "Execute an image migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration commit "
                   "name=image_spec,type=CephString",
            "desc": "Commit an executed migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration abort "
                   "name=image_spec,type=CephString",
            "desc": "Abort a prepared migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task cancel "
                   "name=task_id,type=CephString ",
            "desc": "Cancel a pending or running asynchronous task",
            "perm": "r"
        },
        {
            "cmd": "rbd task list "
                   "name=task_id,type=CephString,req=false ",
            "desc": "List pending or running asynchronous tasks",
            "perm": "r"
        }
    ]
    MODULE_OPTIONS = []

    # sub-handlers, created in __init__
    perf = None
    task = None

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.perf = PerfHandler(self)
        self.task = TaskHandler(self)

    def handle_command(self, inbuf, cmd):
        """Route a command to the perf or task handler and translate
        rados/rbd/lookup errors into (rc, out, err) tuples."""
        prefix = cmd['prefix']
        try:
            try:
                # strip the 'rbd perf ' / 'rbd task ' prefix (9 chars) before
                # delegating to the sub-handler
                if prefix.startswith('rbd perf '):
                    return self.perf.handle_command(inbuf, prefix[9:], cmd)
                elif prefix.startswith('rbd task '):
                    return self.task.handle_command(inbuf, prefix[9:], cmd)

            except Exception as ex:
                # log the full traceback but don't send it to the CLI user
                self.log.fatal("Fatal runtime error: {}\n{}".format(
                    ex, traceback.format_exc()))
                raise

        except rados.Error as ex:
            return -ex.errno, "", str(ex)
        except rbd.OSError as ex:
            return -ex.errno, "", str(ex)
        except rbd.Error as ex:
            return -errno.EINVAL, "", str(ex)
        except KeyError as ex:
            return -errno.ENOENT, "", str(ex)
        except ValueError as ex:
            return -errno.EINVAL, "", str(ex)

        raise NotImplementedError(cmd['prefix'])