]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | # -*- coding: utf-8 -*- |
11fdf7f2 TL |
2 | |
3 | import json | |
11fdf7f2 TL |
4 | import threading |
5 | import time | |
f67539c2 | 6 | import unittest |
11fdf7f2 TL |
7 | from collections import defaultdict |
8 | from functools import partial | |
9 | ||
10 | from ..services.exception import serialize_dashboard_exception | |
f67539c2 | 11 | from ..tools import NotificationQueue, TaskExecutor, TaskManager |
11fdf7f2 TL |
12 | |
13 | ||
14 | class MyTask(object): | |
15 | class CallbackExecutor(TaskExecutor): | |
16 | def __init__(self, fail, progress): | |
17 | super(MyTask.CallbackExecutor, self).__init__() | |
18 | self.fail = fail | |
19 | self.progress = progress | |
20 | ||
21 | def init(self, task): | |
22 | super(MyTask.CallbackExecutor, self).init(task) | |
23 | args = [self.callback] | |
24 | args.extend(self.task.fn_args) | |
25 | self.task.fn_args = args | |
26 | ||
27 | def callback(self, result): | |
28 | self.task.set_progress(self.progress) | |
29 | if self.fail: | |
30 | self.finish(None, Exception("Task Unexpected Exception")) | |
31 | else: | |
32 | self.finish(result, None) | |
33 | ||
34 | # pylint: disable=too-many-arguments | |
35 | def __init__(self, op_seconds, wait=False, fail=False, progress=50, | |
36 | is_async=False, handle_ex=False): | |
37 | self.op_seconds = op_seconds | |
38 | self.wait = wait | |
39 | self.fail = fail | |
40 | self.progress = progress | |
41 | self.is_async = is_async | |
42 | self.handle_ex = handle_ex | |
43 | self._event = threading.Event() | |
44 | ||
45 | def run(self, ns, timeout=None): | |
46 | args = ['dummy arg'] | |
47 | kwargs = {'dummy': 'arg'} | |
48 | h_ex = partial(serialize_dashboard_exception, | |
49 | include_http_status=True) if self.handle_ex else None | |
50 | if not self.is_async: | |
51 | task = TaskManager.run( | |
52 | ns, self.metadata(), self.task_op, args, kwargs, | |
53 | exception_handler=h_ex) | |
54 | else: | |
55 | task = TaskManager.run( | |
56 | ns, self.metadata(), self.task_async_op, args, kwargs, | |
57 | executor=MyTask.CallbackExecutor(self.fail, self.progress), | |
58 | exception_handler=h_ex) | |
59 | return task.wait(timeout) | |
60 | ||
61 | def task_op(self, *args, **kwargs): | |
62 | time.sleep(self.op_seconds) | |
63 | TaskManager.current_task().set_progress(self.progress) | |
64 | if self.fail: | |
65 | raise Exception("Task Unexpected Exception") | |
66 | if self.wait: | |
67 | self._event.wait() | |
68 | return {'args': list(args), 'kwargs': kwargs} | |
69 | ||
70 | def task_async_op(self, callback, *args, **kwargs): | |
71 | if self.fail == "premature": | |
72 | raise Exception("Task Unexpected Exception") | |
73 | ||
74 | def _run_bg(): | |
75 | time.sleep(self.op_seconds) | |
76 | if self.wait: | |
77 | self._event.wait() | |
78 | callback({'args': list(args), 'kwargs': kwargs}) | |
79 | ||
80 | worker = threading.Thread(target=_run_bg) | |
81 | worker.start() | |
82 | ||
83 | def resume(self): | |
84 | self._event.set() | |
85 | ||
86 | def metadata(self): | |
87 | return { | |
88 | 'op_seconds': self.op_seconds, | |
89 | 'wait': self.wait, | |
90 | 'fail': self.fail, | |
91 | 'progress': self.progress, | |
92 | 'is_async': self.is_async, | |
93 | 'handle_ex': self.handle_ex | |
94 | } | |
95 | ||
96 | ||
97 | class TaskTest(unittest.TestCase): | |
98 | ||
99 | TASK_FINISHED_MAP = defaultdict(threading.Event) | |
100 | ||
101 | @classmethod | |
102 | def _handle_task(cls, task): | |
103 | cls.TASK_FINISHED_MAP[task.name].set() | |
104 | ||
105 | @classmethod | |
106 | def wait_for_task(cls, name): | |
107 | cls.TASK_FINISHED_MAP[name].wait() | |
108 | ||
109 | @classmethod | |
110 | def setUpClass(cls): | |
111 | NotificationQueue.start_queue() | |
112 | TaskManager.init() | |
113 | NotificationQueue.register(cls._handle_task, 'cd_task_finished', | |
114 | priority=100) | |
115 | ||
116 | @classmethod | |
117 | def tearDownClass(cls): | |
118 | NotificationQueue.deregister(cls._handle_task, 'cd_task_finished') | |
119 | NotificationQueue.stop() | |
120 | ||
121 | def setUp(self): | |
122 | TaskManager.FINISHED_TASK_SIZE = 10 | |
123 | TaskManager.FINISHED_TASK_TTL = 60.0 | |
124 | ||
9f95a23c | 125 | def assertTaskResult(self, result): # noqa: N802 |
11fdf7f2 TL |
126 | self.assertEqual(result, |
127 | {'args': ['dummy arg'], 'kwargs': {'dummy': 'arg'}}) | |
128 | ||
129 | def test_fast_task(self): | |
130 | task1 = MyTask(1) | |
131 | state, result = task1.run('test1/task1') | |
132 | self.assertEqual(state, TaskManager.VALUE_DONE) | |
133 | self.assertTaskResult(result) | |
134 | self.wait_for_task('test1/task1') | |
135 | _, fn_t = TaskManager.list('test1/*') | |
136 | self.assertEqual(len(fn_t), 1) | |
137 | self.assertIsNone(fn_t[0].exception) | |
138 | self.assertTaskResult(fn_t[0].ret_value) | |
139 | self.assertEqual(fn_t[0].progress, 100) | |
140 | ||
141 | def test_slow_task(self): | |
142 | task1 = MyTask(1) | |
143 | state, result = task1.run('test2/task1', 0.5) | |
144 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
145 | self.assertIsNone(result) | |
146 | self.wait_for_task('test2/task1') | |
147 | _, fn_t = TaskManager.list('test2/*') | |
148 | self.assertEqual(len(fn_t), 1) | |
149 | self.assertIsNone(fn_t[0].exception) | |
150 | self.assertTaskResult(fn_t[0].ret_value) | |
151 | self.assertEqual(fn_t[0].progress, 100) | |
152 | ||
153 | def test_fast_task_with_failure(self): | |
154 | task1 = MyTask(1, fail=True, progress=40) | |
155 | ||
156 | with self.assertRaises(Exception) as ctx: | |
157 | task1.run('test3/task1') | |
158 | ||
159 | self.assertEqual(str(ctx.exception), "Task Unexpected Exception") | |
160 | self.wait_for_task('test3/task1') | |
161 | _, fn_t = TaskManager.list('test3/*') | |
162 | self.assertEqual(len(fn_t), 1) | |
163 | self.assertIsNone(fn_t[0].ret_value) | |
164 | self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception") | |
165 | self.assertEqual(fn_t[0].progress, 40) | |
166 | ||
167 | def test_slow_task_with_failure(self): | |
168 | task1 = MyTask(1, fail=True, progress=70) | |
169 | state, result = task1.run('test4/task1', 0.5) | |
170 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
171 | self.assertIsNone(result) | |
172 | self.wait_for_task('test4/task1') | |
173 | _, fn_t = TaskManager.list('test4/*') | |
174 | self.assertEqual(len(fn_t), 1) | |
175 | self.assertIsNone(fn_t[0].ret_value) | |
176 | self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception") | |
177 | self.assertEqual(fn_t[0].progress, 70) | |
178 | ||
179 | def test_executing_tasks_list(self): | |
180 | task1 = MyTask(0, wait=True, progress=30) | |
181 | task2 = MyTask(0, wait=True, progress=60) | |
182 | state, result = task1.run('test5/task1', 0.5) | |
183 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
184 | self.assertIsNone(result) | |
185 | ex_t, _ = TaskManager.list('test5/*') | |
186 | self.assertEqual(len(ex_t), 1) | |
187 | self.assertEqual(ex_t[0].name, 'test5/task1') | |
188 | self.assertEqual(ex_t[0].progress, 30) | |
189 | state, result = task2.run('test5/task2', 0.5) | |
190 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
191 | self.assertIsNone(result) | |
192 | ex_t, _ = TaskManager.list('test5/*') | |
193 | self.assertEqual(len(ex_t), 2) | |
194 | for task in ex_t: | |
195 | if task.name == 'test5/task1': | |
196 | self.assertEqual(task.progress, 30) | |
197 | elif task.name == 'test5/task2': | |
198 | self.assertEqual(task.progress, 60) | |
199 | task2.resume() | |
200 | self.wait_for_task('test5/task2') | |
201 | ex_t, _ = TaskManager.list('test5/*') | |
202 | self.assertEqual(len(ex_t), 1) | |
203 | self.assertEqual(ex_t[0].name, 'test5/task1') | |
204 | task1.resume() | |
205 | self.wait_for_task('test5/task1') | |
206 | ex_t, _ = TaskManager.list('test5/*') | |
207 | self.assertEqual(len(ex_t), 0) | |
208 | ||
209 | def test_task_idempotent(self): | |
210 | task1 = MyTask(0, wait=True) | |
211 | task1_clone = MyTask(0, wait=True) | |
212 | state, result = task1.run('test6/task1', 0.5) | |
213 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
214 | self.assertIsNone(result) | |
215 | ex_t, _ = TaskManager.list('test6/*') | |
216 | self.assertEqual(len(ex_t), 1) | |
217 | self.assertEqual(ex_t[0].name, 'test6/task1') | |
218 | state, result = task1_clone.run('test6/task1', 0.5) | |
219 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
220 | self.assertIsNone(result) | |
221 | ex_t, _ = TaskManager.list('test6/*') | |
222 | self.assertEqual(len(ex_t), 1) | |
223 | self.assertEqual(ex_t[0].name, 'test6/task1') | |
224 | task1.resume() | |
225 | self.wait_for_task('test6/task1') | |
226 | ex_t, fn_t = TaskManager.list('test6/*') | |
227 | self.assertEqual(len(ex_t), 0) | |
228 | self.assertEqual(len(fn_t), 1) | |
229 | ||
230 | def test_finished_cleanup(self): | |
231 | TaskManager.FINISHED_TASK_SIZE = 2 | |
232 | TaskManager.FINISHED_TASK_TTL = 0.5 | |
233 | task1 = MyTask(0) | |
234 | task2 = MyTask(0) | |
235 | state, result = task1.run('test7/task1') | |
236 | self.assertEqual(state, TaskManager.VALUE_DONE) | |
237 | self.assertTaskResult(result) | |
238 | self.wait_for_task('test7/task1') | |
239 | state, result = task2.run('test7/task2') | |
240 | self.assertEqual(state, TaskManager.VALUE_DONE) | |
241 | self.assertTaskResult(result) | |
242 | self.wait_for_task('test7/task2') | |
243 | time.sleep(1) | |
244 | _, fn_t = TaskManager.list('test7/*') | |
245 | self.assertEqual(len(fn_t), 2) | |
246 | for idx, task in enumerate(fn_t): | |
247 | self.assertEqual(task.name, | |
248 | "test7/task{}".format(len(fn_t)-idx)) | |
249 | task3 = MyTask(0) | |
250 | state, result = task3.run('test7/task3') | |
251 | self.assertEqual(state, TaskManager.VALUE_DONE) | |
252 | self.assertTaskResult(result) | |
253 | self.wait_for_task('test7/task3') | |
254 | time.sleep(1) | |
255 | _, fn_t = TaskManager.list('test7/*') | |
256 | self.assertEqual(len(fn_t), 3) | |
257 | for idx, task in enumerate(fn_t): | |
258 | self.assertEqual(task.name, | |
259 | "test7/task{}".format(len(fn_t)-idx)) | |
260 | _, fn_t = TaskManager.list('test7/*') | |
261 | self.assertEqual(len(fn_t), 2) | |
262 | for idx, task in enumerate(fn_t): | |
263 | self.assertEqual(task.name, | |
264 | "test7/task{}".format(len(fn_t)-idx+1)) | |
265 | ||
266 | def test_task_serialization_format(self): | |
267 | task1 = MyTask(0, wait=True, progress=20) | |
268 | task2 = MyTask(1) | |
269 | task1.run('test8/task1', 0.5) | |
270 | task2.run('test8/task2', 0.5) | |
271 | self.wait_for_task('test8/task2') | |
272 | ex_t, fn_t = TaskManager.list_serializable('test8/*') | |
273 | self.assertEqual(len(ex_t), 1) | |
274 | self.assertEqual(len(fn_t), 1) | |
275 | ||
276 | try: | |
277 | json.dumps(ex_t) | |
278 | except ValueError as ex: | |
279 | self.fail("Failed to serialize executing tasks: {}".format(str(ex))) | |
280 | ||
281 | try: | |
282 | json.dumps(fn_t) | |
283 | except ValueError as ex: | |
284 | self.fail("Failed to serialize finished tasks: {}".format(str(ex))) | |
285 | ||
286 | # validate executing tasks attributes | |
287 | self.assertEqual(len(ex_t[0].keys()), 4) | |
288 | self.assertEqual(ex_t[0]['name'], 'test8/task1') | |
289 | self.assertEqual(ex_t[0]['metadata'], task1.metadata()) | |
290 | self.assertIsNotNone(ex_t[0]['begin_time']) | |
291 | self.assertEqual(ex_t[0]['progress'], 20) | |
292 | # validate finished tasks attributes | |
293 | self.assertEqual(len(fn_t[0].keys()), 9) | |
294 | self.assertEqual(fn_t[0]['name'], 'test8/task2') | |
295 | self.assertEqual(fn_t[0]['metadata'], task2.metadata()) | |
296 | self.assertIsNotNone(fn_t[0]['begin_time']) | |
297 | self.assertIsNotNone(fn_t[0]['end_time']) | |
298 | self.assertGreaterEqual(fn_t[0]['duration'], 1.0) | |
299 | self.assertEqual(fn_t[0]['progress'], 100) | |
300 | self.assertTrue(fn_t[0]['success']) | |
301 | self.assertTaskResult(fn_t[0]['ret_value']) | |
302 | self.assertIsNone(fn_t[0]['exception']) | |
303 | task1.resume() | |
304 | self.wait_for_task('test8/task1') | |
305 | ||
306 | def test_fast_async_task(self): | |
307 | task1 = MyTask(1, is_async=True) | |
308 | state, result = task1.run('test9/task1') | |
309 | self.assertEqual(state, TaskManager.VALUE_DONE) | |
310 | self.assertTaskResult(result) | |
311 | self.wait_for_task('test9/task1') | |
312 | _, fn_t = TaskManager.list('test9/*') | |
313 | self.assertEqual(len(fn_t), 1) | |
314 | self.assertIsNone(fn_t[0].exception) | |
315 | self.assertTaskResult(fn_t[0].ret_value) | |
316 | self.assertEqual(fn_t[0].progress, 100) | |
317 | ||
318 | def test_slow_async_task(self): | |
319 | task1 = MyTask(1, is_async=True) | |
320 | state, result = task1.run('test10/task1', 0.5) | |
321 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
322 | self.assertIsNone(result) | |
323 | self.wait_for_task('test10/task1') | |
324 | _, fn_t = TaskManager.list('test10/*') | |
325 | self.assertEqual(len(fn_t), 1) | |
326 | self.assertIsNone(fn_t[0].exception) | |
327 | self.assertTaskResult(fn_t[0].ret_value) | |
328 | self.assertEqual(fn_t[0].progress, 100) | |
329 | ||
330 | def test_fast_async_task_with_failure(self): | |
331 | task1 = MyTask(1, fail=True, progress=40, is_async=True) | |
332 | ||
333 | with self.assertRaises(Exception) as ctx: | |
334 | task1.run('test11/task1') | |
335 | ||
336 | self.assertEqual(str(ctx.exception), "Task Unexpected Exception") | |
337 | self.wait_for_task('test11/task1') | |
338 | _, fn_t = TaskManager.list('test11/*') | |
339 | self.assertEqual(len(fn_t), 1) | |
340 | self.assertIsNone(fn_t[0].ret_value) | |
341 | self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception") | |
342 | self.assertEqual(fn_t[0].progress, 40) | |
343 | ||
344 | def test_slow_async_task_with_failure(self): | |
345 | task1 = MyTask(1, fail=True, progress=70, is_async=True) | |
346 | state, result = task1.run('test12/task1', 0.5) | |
347 | self.assertEqual(state, TaskManager.VALUE_EXECUTING) | |
348 | self.assertIsNone(result) | |
349 | self.wait_for_task('test12/task1') | |
350 | _, fn_t = TaskManager.list('test12/*') | |
351 | self.assertEqual(len(fn_t), 1) | |
352 | self.assertIsNone(fn_t[0].ret_value) | |
353 | self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception") | |
354 | self.assertEqual(fn_t[0].progress, 70) | |
355 | ||
356 | def test_fast_async_task_with_premature_failure(self): | |
357 | task1 = MyTask(1, fail="premature", progress=40, is_async=True) | |
358 | ||
359 | with self.assertRaises(Exception) as ctx: | |
360 | task1.run('test13/task1') | |
361 | ||
362 | self.assertEqual(str(ctx.exception), "Task Unexpected Exception") | |
363 | self.wait_for_task('test13/task1') | |
364 | _, fn_t = TaskManager.list('test13/*') | |
365 | self.assertEqual(len(fn_t), 1) | |
366 | self.assertIsNone(fn_t[0].ret_value) | |
367 | self.assertEqual(str(fn_t[0].exception), "Task Unexpected Exception") | |
368 | ||
369 | def test_task_serialization_format_on_failure(self): | |
370 | task1 = MyTask(1, fail=True) | |
371 | task1.run('test14/task1', 0.5) | |
372 | self.wait_for_task('test14/task1') | |
373 | ex_t, fn_t = TaskManager.list_serializable('test14/*') | |
374 | self.assertEqual(len(ex_t), 0) | |
375 | self.assertEqual(len(fn_t), 1) | |
376 | # validate finished tasks attributes | |
377 | ||
378 | try: | |
379 | json.dumps(fn_t) | |
380 | except TypeError as ex: | |
381 | self.fail("Failed to serialize finished tasks: {}".format(str(ex))) | |
382 | ||
383 | self.assertEqual(len(fn_t[0].keys()), 9) | |
384 | self.assertEqual(fn_t[0]['name'], 'test14/task1') | |
385 | self.assertEqual(fn_t[0]['metadata'], task1.metadata()) | |
386 | self.assertIsNotNone(fn_t[0]['begin_time']) | |
387 | self.assertIsNotNone(fn_t[0]['end_time']) | |
388 | self.assertGreaterEqual(fn_t[0]['duration'], 1.0) | |
389 | self.assertEqual(fn_t[0]['progress'], 50) | |
390 | self.assertFalse(fn_t[0]['success']) | |
391 | self.assertIsNotNone(fn_t[0]['exception']) | |
392 | self.assertEqual(fn_t[0]['exception'], | |
393 | {"detail": "Task Unexpected Exception"}) | |
394 | ||
395 | def test_task_serialization_format_on_failure_with_handler(self): | |
396 | task1 = MyTask(1, fail=True, handle_ex=True) | |
397 | task1.run('test15/task1', 0.5) | |
398 | self.wait_for_task('test15/task1') | |
399 | ex_t, fn_t = TaskManager.list_serializable('test15/*') | |
400 | self.assertEqual(len(ex_t), 0) | |
401 | self.assertEqual(len(fn_t), 1) | |
402 | # validate finished tasks attributes | |
403 | ||
404 | try: | |
405 | json.dumps(fn_t) | |
406 | except TypeError as ex: | |
407 | self.fail("Failed to serialize finished tasks: {}".format(str(ex))) | |
408 | ||
409 | self.assertEqual(len(fn_t[0].keys()), 9) | |
410 | self.assertEqual(fn_t[0]['name'], 'test15/task1') | |
411 | self.assertEqual(fn_t[0]['metadata'], task1.metadata()) | |
412 | self.assertIsNotNone(fn_t[0]['begin_time']) | |
413 | self.assertIsNotNone(fn_t[0]['end_time']) | |
414 | self.assertGreaterEqual(fn_t[0]['duration'], 1.0) | |
415 | self.assertEqual(fn_t[0]['progress'], 50) | |
416 | self.assertFalse(fn_t[0]['success']) | |
417 | self.assertIsNotNone(fn_t[0]['exception']) | |
418 | self.assertEqual(fn_t[0]['exception'], { | |
419 | 'component': None, | |
420 | 'detail': 'Task Unexpected Exception', | |
421 | 'status': 500, | |
422 | 'task': { | |
423 | 'metadata': { | |
424 | 'fail': True, | |
425 | 'handle_ex': True, | |
426 | 'is_async': False, | |
427 | 'op_seconds': 1, | |
428 | 'progress': 50, | |
429 | 'wait': False}, | |
430 | 'name': 'test15/task1' | |
431 | } | |
432 | }) |