]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/dashboard/tests/test_task.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_task.py
CommitLineData
11fdf7f2 1# -*- coding: utf-8 -*-
11fdf7f2
TL
2
3import json
11fdf7f2
TL
4import threading
5import time
f67539c2 6import unittest
11fdf7f2
TL
7from collections import defaultdict
8from functools import partial
9
10from ..services.exception import serialize_dashboard_exception
f67539c2 11from ..tools import NotificationQueue, TaskExecutor, TaskManager
11fdf7f2
TL
12
13
class MyTask(object):
    """Configurable dummy task used to drive ``TaskManager`` in the tests.

    The task sleeps for ``op_seconds``, reports ``progress``, optionally
    blocks on an internal event until :meth:`resume` is called, and can be
    told to fail. It can be run through the default executor (synchronous
    operation) or through :class:`MyTask.CallbackExecutor` (asynchronous,
    callback-driven operation).
    """

    class CallbackExecutor(TaskExecutor):
        """Executor that completes the task from a callback.

        The executor's own :meth:`callback` is injected as the first
        positional argument of the task function, which is expected to call
        it with the result once the background work is done.
        """

        def __init__(self, fail, progress):
            super(MyTask.CallbackExecutor, self).__init__()
            self.fail = fail
            self.progress = progress

        def init(self, task):
            """Bind to *task* and prepend the callback to its arguments."""
            super(MyTask.CallbackExecutor, self).init(task)
            self.task.fn_args = [self.callback] + list(self.task.fn_args)

        def callback(self, result):
            """Record the progress and finish the task (or fail it)."""
            self.task.set_progress(self.progress)
            if not self.fail:
                self.finish(result, None)
                return
            self.finish(None, Exception("Task Unexpected Exception"))

    # pylint: disable=too-many-arguments
    def __init__(self, op_seconds, wait=False, fail=False, progress=50,
                 is_async=False, handle_ex=False):
        """Store the task configuration.

        :param op_seconds: seconds the operation sleeps before doing anything
        :param wait: if true, the operation blocks until :meth:`resume`
        :param fail: truthy to make the operation fail; the string
            ``"premature"`` makes the async operation raise before its
            background thread is started
        :param progress: progress value reported by the operation
        :param is_async: run through ``CallbackExecutor`` instead of the
            default executor
        :param handle_ex: pass an exception handler to ``TaskManager.run``
        """
        self._event = threading.Event()
        self.op_seconds = op_seconds
        self.wait = wait
        self.fail = fail
        self.progress = progress
        self.is_async = is_async
        self.handle_ex = handle_ex

    def run(self, ns, timeout=None):
        """Submit the task under the name *ns* and wait up to *timeout*.

        :returns: whatever ``task.wait(timeout)`` returns (the tests unpack
            it as a ``(state, result)`` pair)
        """
        call_args = ['dummy arg']
        call_kwargs = {'dummy': 'arg'}

        if self.handle_ex:
            handler = partial(serialize_dashboard_exception,
                              include_http_status=True)
        else:
            handler = None

        if self.is_async:
            task = TaskManager.run(
                ns, self.metadata(), self.task_async_op, call_args,
                call_kwargs,
                executor=MyTask.CallbackExecutor(self.fail, self.progress),
                exception_handler=handler)
        else:
            task = TaskManager.run(
                ns, self.metadata(), self.task_op, call_args, call_kwargs,
                exception_handler=handler)
        return task.wait(timeout)

    def task_op(self, *args, **kwargs):
        """Synchronous operation: sleep, report progress, then fail/wait/return."""
        time.sleep(self.op_seconds)
        TaskManager.current_task().set_progress(self.progress)
        if self.fail:
            raise Exception("Task Unexpected Exception")
        if self.wait:
            self._event.wait()
        return {'args': list(args), 'kwargs': kwargs}

    def task_async_op(self, callback, *args, **kwargs):
        """Asynchronous operation: do the work in a thread, then call *callback*."""
        if self.fail == "premature":
            # Fail before any background work has been scheduled.
            raise Exception("Task Unexpected Exception")

        def _background():
            time.sleep(self.op_seconds)
            if self.wait:
                self._event.wait()
            callback({'args': list(args), 'kwargs': kwargs})

        threading.Thread(target=_background).start()

    def resume(self):
        """Release an operation that is blocked because ``wait`` was set."""
        self._event.set()

    def metadata(self):
        """Return the task configuration as a plain dict."""
        fields = ('op_seconds', 'wait', 'fail', 'progress', 'is_async',
                  'handle_ex')
        return {field: getattr(self, field) for field in fields}
95
96
class TaskTest(unittest.TestCase):
    """Tests for ``TaskManager`` driven through ``MyTask`` instances.

    Task completion is observed through the ``cd_task_finished``
    notification: the handler registered in :meth:`setUpClass` sets a
    per-task-name event that :meth:`wait_for_task` blocks on.
    """

    # One event per task name, created on first access and set by
    # _handle_task when the 'cd_task_finished' notification is delivered.
    TASK_FINISHED_MAP = defaultdict(threading.Event)

    @classmethod
    def _handle_task(cls, task):
        """Notification handler: mark *task* (by name) as finished."""
        cls.TASK_FINISHED_MAP[task.name].set()

    @classmethod
    def wait_for_task(cls, name):
        """Block until the finished notification for task *name* was seen."""
        cls.TASK_FINISHED_MAP[name].wait()

    @classmethod
    def setUpClass(cls):
        """Start the notification queue, init the manager, hook the handler."""
        NotificationQueue.start_queue()
        TaskManager.init()
        NotificationQueue.register(cls._handle_task, 'cd_task_finished',
                                   priority=100)

    @classmethod
    def tearDownClass(cls):
        """Unhook the handler and stop the notification queue."""
        NotificationQueue.deregister(cls._handle_task, 'cd_task_finished')
        NotificationQueue.stop()

    def setUp(self):
        """Reset the finished-task limits (test_finished_cleanup changes them)."""
        TaskManager.FINISHED_TASK_SIZE = 10
        TaskManager.FINISHED_TASK_TTL = 60.0

    def assertTaskResult(self, result):  # noqa: N802
        """Assert that *result* is the value produced by a successful MyTask."""
        self.assertEqual(result,
                         {'args': ['dummy arg'], 'kwargs': {'dummy': 'arg'}})

    def _assert_json_serializable(self, tasks, kind):
        """Fail the test if *tasks* cannot be dumped to JSON.

        ``json.dumps`` raises ``TypeError`` for objects it cannot serialize
        and ``ValueError`` for e.g. circular references, so both are turned
        into a regular test failure.

        :param tasks: value returned by ``TaskManager.list_serializable``
        :param kind: ``'executing'`` or ``'finished'``; only used in the
            failure message
        """
        try:
            json.dumps(tasks)
        except (TypeError, ValueError) as ex:
            self.fail("Failed to serialize {} tasks: {}".format(kind, str(ex)))

    def test_fast_task(self):
        """A task waited on without timeout is returned as done, with result."""
        task1 = MyTask(1)
        state, result = task1.run('test1/task1')
        self.assertEqual(state, TaskManager.VALUE_DONE)
        self.assertTaskResult(result)
        self.wait_for_task('test1/task1')
        _, fn_t = TaskManager.list('test1/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].exception)
        self.assertTaskResult(fn_t[0].ret_value)
        self.assertEqual(fn_t[0].progress, 100)

    def test_slow_task(self):
        """A task outliving the wait timeout is reported as still executing."""
        task1 = MyTask(1)
        state, result = task1.run('test2/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        self.wait_for_task('test2/task1')
        _, fn_t = TaskManager.list('test2/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].exception)
        self.assertTaskResult(fn_t[0].ret_value)
        self.assertEqual(fn_t[0].progress, 100)

    def test_fast_task_with_failure(self):
        """A failing task waited on without timeout re-raises its exception."""
        task1 = MyTask(1, fail=True, progress=40)

        with self.assertRaises(Exception) as ctx:
            task1.run('test3/task1')

        self.assertEqual(str(ctx.exception), "Task Unexpected Exception")
        self.wait_for_task('test3/task1')
        _, fn_t = TaskManager.list('test3/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].ret_value)
        self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception")
        # On failure the progress stays at the last value the task reported.
        self.assertEqual(fn_t[0].progress, 40)

    def test_slow_task_with_failure(self):
        """A task failing after the wait timeout stores its exception."""
        task1 = MyTask(1, fail=True, progress=70)
        state, result = task1.run('test4/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        self.wait_for_task('test4/task1')
        _, fn_t = TaskManager.list('test4/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].ret_value)
        self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception")
        self.assertEqual(fn_t[0].progress, 70)

    def test_executing_tasks_list(self):
        """Blocked tasks show up in the executing list until they are resumed."""
        task1 = MyTask(0, wait=True, progress=30)
        task2 = MyTask(0, wait=True, progress=60)
        state, result = task1.run('test5/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        ex_t, _ = TaskManager.list('test5/*')
        self.assertEqual(len(ex_t), 1)
        self.assertEqual(ex_t[0].name, 'test5/task1')
        self.assertEqual(ex_t[0].progress, 30)
        state, result = task2.run('test5/task2', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        ex_t, _ = TaskManager.list('test5/*')
        self.assertEqual(len(ex_t), 2)
        for task in ex_t:
            if task.name == 'test5/task1':
                self.assertEqual(task.progress, 30)
            elif task.name == 'test5/task2':
                self.assertEqual(task.progress, 60)
        # Finish the tasks one by one and watch the executing list shrink.
        task2.resume()
        self.wait_for_task('test5/task2')
        ex_t, _ = TaskManager.list('test5/*')
        self.assertEqual(len(ex_t), 1)
        self.assertEqual(ex_t[0].name, 'test5/task1')
        task1.resume()
        self.wait_for_task('test5/task1')
        ex_t, _ = TaskManager.list('test5/*')
        self.assertEqual(len(ex_t), 0)

    def test_task_idempotent(self):
        """Running a second task under an executing task's name adds no entry."""
        task1 = MyTask(0, wait=True)
        task1_clone = MyTask(0, wait=True)
        state, result = task1.run('test6/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        ex_t, _ = TaskManager.list('test6/*')
        self.assertEqual(len(ex_t), 1)
        self.assertEqual(ex_t[0].name, 'test6/task1')
        state, result = task1_clone.run('test6/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        ex_t, _ = TaskManager.list('test6/*')
        self.assertEqual(len(ex_t), 1)
        self.assertEqual(ex_t[0].name, 'test6/task1')
        # Only task1 is resumed; the clone is never released.
        # NOTE(review): presumably the clone's operation is never started
        # because the running task is reused - confirm in TaskManager.run.
        task1.resume()
        self.wait_for_task('test6/task1')
        ex_t, fn_t = TaskManager.list('test6/*')
        self.assertEqual(len(ex_t), 0)
        self.assertEqual(len(fn_t), 1)

    def test_finished_cleanup(self):
        """Finished tasks beyond FINISHED_TASK_SIZE are dropped once expired."""
        TaskManager.FINISHED_TASK_SIZE = 2
        TaskManager.FINISHED_TASK_TTL = 0.5
        task1 = MyTask(0)
        task2 = MyTask(0)
        state, result = task1.run('test7/task1')
        self.assertEqual(state, TaskManager.VALUE_DONE)
        self.assertTaskResult(result)
        self.wait_for_task('test7/task1')
        state, result = task2.run('test7/task2')
        self.assertEqual(state, TaskManager.VALUE_DONE)
        self.assertTaskResult(result)
        self.wait_for_task('test7/task2')
        time.sleep(1)
        _, fn_t = TaskManager.list('test7/*')
        self.assertEqual(len(fn_t), 2)
        # Finished tasks are expected newest first: task2, task1.
        for idx, task in enumerate(fn_t):
            self.assertEqual(task.name,
                             "test7/task{}".format(len(fn_t)-idx))
        task3 = MyTask(0)
        state, result = task3.run('test7/task3')
        self.assertEqual(state, TaskManager.VALUE_DONE)
        self.assertTaskResult(result)
        self.wait_for_task('test7/task3')
        time.sleep(1)
        # The first listing after the TTL elapsed still returns all three
        # tasks; only the following one is trimmed to the size limit.
        # NOTE(review): this suggests list() prunes expired entries after
        # building its result - confirm against TaskManager.list.
        _, fn_t = TaskManager.list('test7/*')
        self.assertEqual(len(fn_t), 3)
        for idx, task in enumerate(fn_t):
            self.assertEqual(task.name,
                             "test7/task{}".format(len(fn_t)-idx))
        _, fn_t = TaskManager.list('test7/*')
        self.assertEqual(len(fn_t), 2)
        # The oldest task is gone: task3, task2 remain.
        for idx, task in enumerate(fn_t):
            self.assertEqual(task.name,
                             "test7/task{}".format(len(fn_t)-idx+1))

    def test_task_serialization_format(self):
        """Serializable listings are JSON-dumpable and have the expected keys."""
        task1 = MyTask(0, wait=True, progress=20)
        task2 = MyTask(1)
        task1.run('test8/task1', 0.5)
        task2.run('test8/task2', 0.5)
        self.wait_for_task('test8/task2')
        ex_t, fn_t = TaskManager.list_serializable('test8/*')
        self.assertEqual(len(ex_t), 1)
        self.assertEqual(len(fn_t), 1)

        self._assert_json_serializable(ex_t, 'executing')
        self._assert_json_serializable(fn_t, 'finished')

        # validate executing tasks attributes
        self.assertEqual(len(ex_t[0].keys()), 4)
        self.assertEqual(ex_t[0]['name'], 'test8/task1')
        self.assertEqual(ex_t[0]['metadata'], task1.metadata())
        self.assertIsNotNone(ex_t[0]['begin_time'])
        self.assertEqual(ex_t[0]['progress'], 20)
        # validate finished tasks attributes
        self.assertEqual(len(fn_t[0].keys()), 9)
        self.assertEqual(fn_t[0]['name'], 'test8/task2')
        self.assertEqual(fn_t[0]['metadata'], task2.metadata())
        self.assertIsNotNone(fn_t[0]['begin_time'])
        self.assertIsNotNone(fn_t[0]['end_time'])
        self.assertGreaterEqual(fn_t[0]['duration'], 1.0)
        self.assertEqual(fn_t[0]['progress'], 100)
        self.assertTrue(fn_t[0]['success'])
        self.assertTaskResult(fn_t[0]['ret_value'])
        self.assertIsNone(fn_t[0]['exception'])
        # Release the blocked task so it does not outlive the test.
        task1.resume()
        self.wait_for_task('test8/task1')

    def test_fast_async_task(self):
        """Async variant of test_fast_task (callback-driven executor)."""
        task1 = MyTask(1, is_async=True)
        state, result = task1.run('test9/task1')
        self.assertEqual(state, TaskManager.VALUE_DONE)
        self.assertTaskResult(result)
        self.wait_for_task('test9/task1')
        _, fn_t = TaskManager.list('test9/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].exception)
        self.assertTaskResult(fn_t[0].ret_value)
        self.assertEqual(fn_t[0].progress, 100)

    def test_slow_async_task(self):
        """Async variant of test_slow_task (callback-driven executor)."""
        task1 = MyTask(1, is_async=True)
        state, result = task1.run('test10/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        self.wait_for_task('test10/task1')
        _, fn_t = TaskManager.list('test10/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].exception)
        self.assertTaskResult(fn_t[0].ret_value)
        self.assertEqual(fn_t[0].progress, 100)

    def test_fast_async_task_with_failure(self):
        """Async variant of test_fast_task_with_failure."""
        task1 = MyTask(1, fail=True, progress=40, is_async=True)

        with self.assertRaises(Exception) as ctx:
            task1.run('test11/task1')

        self.assertEqual(str(ctx.exception), "Task Unexpected Exception")
        self.wait_for_task('test11/task1')
        _, fn_t = TaskManager.list('test11/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].ret_value)
        self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception")
        self.assertEqual(fn_t[0].progress, 40)

    def test_slow_async_task_with_failure(self):
        """Async variant of test_slow_task_with_failure."""
        task1 = MyTask(1, fail=True, progress=70, is_async=True)
        state, result = task1.run('test12/task1', 0.5)
        self.assertEqual(state, TaskManager.VALUE_EXECUTING)
        self.assertIsNone(result)
        self.wait_for_task('test12/task1')
        _, fn_t = TaskManager.list('test12/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].ret_value)
        self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception")
        self.assertEqual(fn_t[0].progress, 70)

    def test_fast_async_task_with_premature_failure(self):
        """An async op raising before it starts its thread fails the task."""
        task1 = MyTask(1, fail="premature", progress=40, is_async=True)

        with self.assertRaises(Exception) as ctx:
            task1.run('test13/task1')

        self.assertEqual(str(ctx.exception), "Task Unexpected Exception")
        self.wait_for_task('test13/task1')
        _, fn_t = TaskManager.list('test13/*')
        self.assertEqual(len(fn_t), 1)
        self.assertIsNone(fn_t[0].ret_value)
        self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception")

    def test_task_serialization_format_on_failure(self):
        """Without a handler a failed task serializes only the error detail."""
        task1 = MyTask(1, fail=True)
        task1.run('test14/task1', 0.5)
        self.wait_for_task('test14/task1')
        ex_t, fn_t = TaskManager.list_serializable('test14/*')
        self.assertEqual(len(ex_t), 0)
        self.assertEqual(len(fn_t), 1)
        # validate finished tasks attributes

        self._assert_json_serializable(fn_t, 'finished')

        self.assertEqual(len(fn_t[0].keys()), 9)
        self.assertEqual(fn_t[0]['name'], 'test14/task1')
        self.assertEqual(fn_t[0]['metadata'], task1.metadata())
        self.assertIsNotNone(fn_t[0]['begin_time'])
        self.assertIsNotNone(fn_t[0]['end_time'])
        self.assertGreaterEqual(fn_t[0]['duration'], 1.0)
        self.assertEqual(fn_t[0]['progress'], 50)
        self.assertFalse(fn_t[0]['success'])
        self.assertIsNotNone(fn_t[0]['exception'])
        self.assertEqual(fn_t[0]['exception'],
                         {"detail": "Task Unexpected Exception"})

    def test_task_serialization_format_on_failure_with_handler(self):
        """With the handler the exception carries status and task details."""
        task1 = MyTask(1, fail=True, handle_ex=True)
        task1.run('test15/task1', 0.5)
        self.wait_for_task('test15/task1')
        ex_t, fn_t = TaskManager.list_serializable('test15/*')
        self.assertEqual(len(ex_t), 0)
        self.assertEqual(len(fn_t), 1)
        # validate finished tasks attributes

        self._assert_json_serializable(fn_t, 'finished')

        self.assertEqual(len(fn_t[0].keys()), 9)
        self.assertEqual(fn_t[0]['name'], 'test15/task1')
        self.assertEqual(fn_t[0]['metadata'], task1.metadata())
        self.assertIsNotNone(fn_t[0]['begin_time'])
        self.assertIsNotNone(fn_t[0]['end_time'])
        self.assertGreaterEqual(fn_t[0]['duration'], 1.0)
        self.assertEqual(fn_t[0]['progress'], 50)
        self.assertFalse(fn_t[0]['success'])
        self.assertIsNotNone(fn_t[0]['exception'])
        self.assertEqual(fn_t[0]['exception'], {
            'component': None,
            'detail': 'Task Unexpected Exception',
            'status': 500,
            'task': {
                'metadata': {
                    'fail': True,
                    'handle_ex': True,
                    'is_async': False,
                    'op_seconds': 1,
                    'progress': 50,
                    'wait': False},
                'name': 'test15/task1'
            }
        })