]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | # -*- coding: utf-8 -*- |
11fdf7f2 | 2 | |
f67539c2 TL |
3 | import collections |
4 | import fnmatch | |
11fdf7f2 TL |
5 | import inspect |
6 | import json | |
9f95a23c | 7 | import logging |
f67539c2 TL |
8 | import threading |
9 | import time | |
10 | import urllib | |
11fdf7f2 TL |
11 | from datetime import datetime, timedelta |
12 | from distutils.util import strtobool | |
11fdf7f2 | 13 | |
f67539c2 | 14 | import cherrypy |
522d829b | 15 | from mgr_util import build_url |
f91f0fd5 | 16 | |
9f95a23c | 17 | from . import mgr |
11fdf7f2 | 18 | from .exceptions import ViewCacheNoDataException |
11fdf7f2 | 19 | from .services.auth import JwtManager |
f67539c2 | 20 | from .settings import Settings |
11fdf7f2 | 21 | |
9f95a23c | 22 | try: |
f67539c2 TL |
23 | from typing import Any, AnyStr, Callable, DefaultDict, Deque, Dict, List, \ |
24 | Optional, Set, Tuple, Union | |
9f95a23c TL |
25 | except ImportError: |
26 | pass # For typing only | |
27 | ||
11fdf7f2 TL |
28 | |
29 | class RequestLoggingTool(cherrypy.Tool): | |
30 | def __init__(self): | |
31 | cherrypy.Tool.__init__(self, 'before_handler', self.request_begin, | |
32 | priority=10) | |
9f95a23c | 33 | self.logger = logging.getLogger('request') |
11fdf7f2 TL |
34 | |
35 | def _setup(self): | |
36 | cherrypy.Tool._setup(self) | |
37 | cherrypy.request.hooks.attach('on_end_request', self.request_end, | |
38 | priority=5) | |
39 | cherrypy.request.hooks.attach('after_error_response', self.request_error, | |
40 | priority=5) | |
41 | ||
42 | def request_begin(self): | |
43 | req = cherrypy.request | |
44 | user = JwtManager.get_username() | |
45 | # Log the request. | |
9f95a23c TL |
46 | self.logger.debug('[%s:%s] [%s] [%s] %s', req.remote.ip, req.remote.port, |
47 | req.method, user, req.path_info) | |
11fdf7f2 TL |
48 | # Audit the request. |
49 | if Settings.AUDIT_API_ENABLED and req.method not in ['GET']: | |
50 | url = build_url(req.remote.ip, scheme=req.scheme, | |
51 | port=req.remote.port) | |
52 | msg = '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \ | |
53 | 'user=\'{}\''.format(url, req.path_info, req.method, user) | |
54 | if Settings.AUDIT_API_LOG_PAYLOAD: | |
55 | params = dict(req.params or {}, **get_request_body_params(req)) | |
56 | # Hide sensitive data like passwords, secret keys, ... | |
57 | # Extend the list of patterns to search for if necessary. | |
58 | # Currently parameters like this are processed: | |
59 | # - secret_key | |
60 | # - user_password | |
61 | # - new_passwd_to_login | |
62 | keys = [] | |
63 | for key in ['password', 'passwd', 'secret']: | |
64 | keys.extend([x for x in params.keys() if key in x]) | |
65 | for key in keys: | |
66 | params[key] = '***' | |
67 | msg = '{} params=\'{}\''.format(msg, json.dumps(params)) | |
f67539c2 | 68 | mgr.cluster_log('audit', mgr.ClusterLogPrio.INFO, msg) |
11fdf7f2 TL |
69 | |
70 | def request_error(self): | |
9f95a23c TL |
71 | self._request_log(self.logger.error) |
72 | self.logger.error(cherrypy.response.body) | |
11fdf7f2 TL |
73 | |
74 | def request_end(self): | |
75 | status = cherrypy.response.status[:3] | |
9f95a23c | 76 | if status in ["401", "403"]: |
11fdf7f2 | 77 | # log unauthorized accesses |
9f95a23c | 78 | self._request_log(self.logger.warning) |
11fdf7f2 | 79 | else: |
9f95a23c | 80 | self._request_log(self.logger.info) |
11fdf7f2 TL |
81 | |
82 | def _format_bytes(self, num): | |
83 | units = ['B', 'K', 'M', 'G'] | |
84 | ||
85 | if isinstance(num, str): | |
86 | try: | |
87 | num = int(num) | |
88 | except ValueError: | |
89 | return "n/a" | |
90 | ||
91 | format_str = "{:.0f}{}" | |
92 | for i, unit in enumerate(units): | |
93 | div = 2**(10*i) | |
94 | if num < 2**(10*(i+1)): | |
95 | if num % div == 0: | |
96 | format_str = "{}{}" | |
97 | else: | |
98 | div = float(div) | |
99 | format_str = "{:.1f}{}" | |
100 | return format_str.format(num/div, unit[0]) | |
101 | ||
102 | # content-length bigger than 1T!! return value in bytes | |
103 | return "{}B".format(num) | |
104 | ||
105 | def _request_log(self, logger_fn): | |
106 | req = cherrypy.request | |
107 | res = cherrypy.response | |
108 | lat = time.time() - res.time | |
109 | user = JwtManager.get_username() | |
110 | status = res.status[:3] if isinstance(res.status, str) else res.status | |
111 | if 'Content-Length' in res.headers: | |
112 | length = self._format_bytes(res.headers['Content-Length']) | |
113 | else: | |
114 | length = self._format_bytes(0) | |
115 | if user: | |
116 | logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, | |
117 | req.remote.port, req.method, status, | |
118 | "{0:.3f}s".format(lat), user, length, req.path_info) | |
119 | else: | |
9f95a23c | 120 | logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req.remote.ip, |
11fdf7f2 | 121 | req.remote.port, req.method, status, |
9f95a23c | 122 | "{0:.3f}s".format(lat), length, getattr(req, 'unique_id', '-'), req.path_info) |
11fdf7f2 TL |
123 | |
124 | ||
125 | # pylint: disable=too-many-instance-attributes | |
126 | class ViewCache(object): | |
127 | VALUE_OK = 0 | |
128 | VALUE_STALE = 1 | |
129 | VALUE_NONE = 2 | |
130 | ||
131 | class GetterThread(threading.Thread): | |
132 | def __init__(self, view, fn, args, kwargs): | |
133 | super(ViewCache.GetterThread, self).__init__() | |
134 | self._view = view | |
135 | self.event = threading.Event() | |
136 | self.fn = fn | |
137 | self.args = args | |
138 | self.kwargs = kwargs | |
139 | ||
140 | # pylint: disable=broad-except | |
141 | def run(self): | |
142 | t0 = 0.0 | |
143 | t1 = 0.0 | |
144 | try: | |
145 | t0 = time.time() | |
9f95a23c | 146 | self._view.logger.debug("starting execution of %s", self.fn) |
11fdf7f2 TL |
147 | val = self.fn(*self.args, **self.kwargs) |
148 | t1 = time.time() | |
149 | except Exception as ex: | |
150 | with self._view.lock: | |
9f95a23c TL |
151 | self._view.logger.exception("Error while calling fn=%s ex=%s", self.fn, |
152 | str(ex)) | |
11fdf7f2 TL |
153 | self._view.value = None |
154 | self._view.value_when = None | |
155 | self._view.getter_thread = None | |
156 | self._view.exception = ex | |
157 | else: | |
158 | with self._view.lock: | |
159 | self._view.latency = t1 - t0 | |
160 | self._view.value = val | |
161 | self._view.value_when = datetime.now() | |
162 | self._view.getter_thread = None | |
163 | self._view.exception = None | |
164 | ||
9f95a23c TL |
165 | self._view.logger.debug("execution of %s finished in: %s", self.fn, |
166 | t1 - t0) | |
11fdf7f2 TL |
167 | self.event.set() |
168 | ||
169 | class RemoteViewCache(object): | |
170 | # Return stale data if | |
171 | STALE_PERIOD = 1.0 | |
172 | ||
173 | def __init__(self, timeout): | |
174 | self.getter_thread = None | |
175 | # Consider data within 1s old to be sufficiently fresh | |
176 | self.timeout = timeout | |
177 | self.event = threading.Event() | |
178 | self.value_when = None | |
179 | self.value = None | |
180 | self.latency = 0 | |
181 | self.exception = None | |
182 | self.lock = threading.Lock() | |
9f95a23c | 183 | self.logger = logging.getLogger('viewcache') |
11fdf7f2 TL |
184 | |
185 | def reset(self): | |
186 | with self.lock: | |
187 | self.value_when = None | |
188 | self.value = None | |
189 | ||
190 | def run(self, fn, args, kwargs): | |
191 | """ | |
192 | If data less than `stale_period` old is available, return it | |
193 | immediately. | |
194 | If an attempt to fetch data does not complete within `timeout`, then | |
195 | return the most recent data available, with a status to indicate that | |
196 | it is stale. | |
197 | ||
198 | Initialization does not count towards the timeout, so the first call | |
199 | on one of these objects during the process lifetime may be slower | |
200 | than subsequent calls. | |
201 | ||
202 | :return: 2-tuple of value status code, value | |
203 | """ | |
204 | with self.lock: | |
205 | now = datetime.now() | |
206 | if self.value_when and now - self.value_when < timedelta( | |
207 | seconds=self.STALE_PERIOD): | |
208 | return ViewCache.VALUE_OK, self.value | |
209 | ||
210 | if self.getter_thread is None: | |
211 | self.getter_thread = ViewCache.GetterThread(self, fn, args, | |
212 | kwargs) | |
213 | self.getter_thread.start() | |
214 | else: | |
9f95a23c | 215 | self.logger.debug("getter_thread still alive for: %s", fn) |
11fdf7f2 TL |
216 | |
217 | ev = self.getter_thread.event | |
218 | ||
219 | success = ev.wait(timeout=self.timeout) | |
220 | ||
221 | with self.lock: | |
222 | if success: | |
223 | # We fetched the data within the timeout | |
224 | if self.exception: | |
225 | # execution raised an exception | |
226 | # pylint: disable=raising-bad-type | |
227 | raise self.exception | |
228 | return ViewCache.VALUE_OK, self.value | |
229 | if self.value_when is not None: | |
230 | # We have some data, but it doesn't meet freshness requirements | |
231 | return ViewCache.VALUE_STALE, self.value | |
232 | # We have no data, not even stale data | |
233 | raise ViewCacheNoDataException() | |
234 | ||
235 | def __init__(self, timeout=5): | |
236 | self.timeout = timeout | |
237 | self.cache_by_args = {} | |
238 | ||
239 | def __call__(self, fn): | |
240 | def wrapper(*args, **kwargs): | |
241 | rvc = self.cache_by_args.get(args, None) | |
242 | if not rvc: | |
243 | rvc = ViewCache.RemoteViewCache(self.timeout) | |
244 | self.cache_by_args[args] = rvc | |
245 | return rvc.run(fn, args, kwargs) | |
9f95a23c | 246 | wrapper.reset = self.reset # type: ignore |
11fdf7f2 TL |
247 | return wrapper |
248 | ||
249 | def reset(self): | |
250 | for _, rvc in self.cache_by_args.items(): | |
251 | rvc.reset() | |
252 | ||
253 | ||
254 | class NotificationQueue(threading.Thread): | |
255 | _ALL_TYPES_ = '__ALL__' | |
9f95a23c | 256 | _listeners = collections.defaultdict(set) # type: DefaultDict[str, Set[Tuple[int, Callable]]] |
11fdf7f2 TL |
257 | _lock = threading.Lock() |
258 | _cond = threading.Condition() | |
9f95a23c | 259 | _queue = collections.deque() # type: Deque[Tuple[str, Any]] |
11fdf7f2 TL |
260 | _running = False |
261 | _instance = None | |
262 | ||
263 | def __init__(self): | |
264 | super(NotificationQueue, self).__init__() | |
265 | ||
266 | @classmethod | |
267 | def start_queue(cls): | |
268 | with cls._lock: | |
269 | if cls._instance: | |
270 | # the queue thread is already running | |
271 | return | |
272 | cls._running = True | |
273 | cls._instance = NotificationQueue() | |
9f95a23c TL |
274 | cls.logger = logging.getLogger('notification_queue') # type: ignore |
275 | cls.logger.debug("starting notification queue") # type: ignore | |
11fdf7f2 TL |
276 | cls._instance.start() |
277 | ||
278 | @classmethod | |
279 | def stop(cls): | |
280 | with cls._lock: | |
281 | if not cls._instance: | |
282 | # the queue thread was not started | |
283 | return | |
284 | instance = cls._instance | |
285 | cls._instance = None | |
286 | cls._running = False | |
287 | with cls._cond: | |
288 | cls._cond.notify() | |
9f95a23c | 289 | cls.logger.debug("waiting for notification queue to finish") # type: ignore |
11fdf7f2 | 290 | instance.join() |
9f95a23c | 291 | cls.logger.debug("notification queue stopped") # type: ignore |
11fdf7f2 TL |
292 | |
293 | @classmethod | |
294 | def _registered_handler(cls, func, n_types): | |
295 | for _, reg_func in cls._listeners[n_types]: | |
296 | if reg_func == func: | |
297 | return True | |
298 | return False | |
299 | ||
300 | @classmethod | |
301 | def register(cls, func, n_types=None, priority=1): | |
302 | """Registers function to listen for notifications | |
303 | ||
304 | If the second parameter `n_types` is omitted, the function in `func` | |
305 | parameter will be called for any type of notifications. | |
306 | ||
307 | Args: | |
308 | func (function): python function ex: def foo(val) | |
309 | n_types (str|list): the single type to listen, or a list of types | |
310 | priority (int): the priority level (1=max, +inf=min) | |
311 | """ | |
312 | with cls._lock: | |
313 | if not n_types: | |
314 | n_types = [cls._ALL_TYPES_] | |
315 | elif isinstance(n_types, str): | |
316 | n_types = [n_types] | |
317 | elif not isinstance(n_types, list): | |
318 | raise Exception("n_types param is neither a string nor a list") | |
319 | for ev_type in n_types: | |
320 | if not cls._registered_handler(func, ev_type): | |
321 | cls._listeners[ev_type].add((priority, func)) | |
9f95a23c TL |
322 | cls.logger.debug( # type: ignore |
323 | "function %s was registered for events of type %s", | |
324 | func, ev_type | |
325 | ) | |
11fdf7f2 TL |
326 | |
327 | @classmethod | |
328 | def deregister(cls, func, n_types=None): | |
9f95a23c | 329 | # type: (Callable, Union[str, list, None]) -> None |
11fdf7f2 TL |
330 | """Removes the listener function from this notification queue |
331 | ||
332 | If the second parameter `n_types` is omitted, the function is removed | |
333 | from all event types, otherwise the function is removed only for the | |
334 | specified event types. | |
335 | ||
336 | Args: | |
337 | func (function): python function | |
338 | n_types (str|list): the single event type, or a list of event types | |
339 | """ | |
340 | with cls._lock: | |
341 | if not n_types: | |
342 | n_types = list(cls._listeners.keys()) | |
343 | elif isinstance(n_types, str): | |
344 | n_types = [n_types] | |
345 | elif not isinstance(n_types, list): | |
346 | raise Exception("n_types param is neither a string nor a list") | |
347 | for ev_type in n_types: | |
348 | listeners = cls._listeners[ev_type] | |
9f95a23c | 349 | to_remove = None |
11fdf7f2 TL |
350 | for pr, fn in listeners: |
351 | if fn == func: | |
9f95a23c | 352 | to_remove = (pr, fn) |
11fdf7f2 | 353 | break |
9f95a23c TL |
354 | if to_remove: |
355 | listeners.discard(to_remove) | |
356 | cls.logger.debug( # type: ignore | |
357 | "function %s was deregistered for events of type %s", | |
358 | func, ev_type | |
359 | ) | |
11fdf7f2 TL |
360 | |
361 | @classmethod | |
362 | def new_notification(cls, notify_type, notify_value): | |
9f95a23c | 363 | # type: (str, Any) -> None |
11fdf7f2 TL |
364 | with cls._cond: |
365 | cls._queue.append((notify_type, notify_value)) | |
366 | cls._cond.notify() | |
367 | ||
368 | @classmethod | |
369 | def _notify_listeners(cls, events): | |
370 | for ev in events: | |
371 | notify_type, notify_value = ev | |
372 | with cls._lock: | |
373 | listeners = list(cls._listeners[notify_type]) | |
374 | listeners.extend(cls._listeners[cls._ALL_TYPES_]) | |
375 | listeners.sort(key=lambda lis: lis[0]) | |
376 | for listener in listeners: | |
377 | listener[1](notify_value) | |
378 | ||
379 | def run(self): | |
9f95a23c | 380 | self.logger.debug("notification queue started") # type: ignore |
11fdf7f2 TL |
381 | while self._running: |
382 | private_buffer = [] | |
9f95a23c | 383 | self.logger.debug("processing queue: %s", len(self._queue)) # type: ignore |
11fdf7f2 TL |
384 | try: |
385 | while True: | |
386 | private_buffer.append(self._queue.popleft()) | |
387 | except IndexError: | |
388 | pass | |
389 | self._notify_listeners(private_buffer) | |
390 | with self._cond: | |
391 | while self._running and not self._queue: | |
392 | self._cond.wait() | |
393 | # flush remaining events | |
9f95a23c | 394 | self.logger.debug("flush remaining events: %s", len(self._queue)) # type: ignore |
11fdf7f2 TL |
395 | self._notify_listeners(self._queue) |
396 | self._queue.clear() | |
9f95a23c | 397 | self.logger.debug("notification queue finished") # type: ignore |
11fdf7f2 TL |
398 | |
399 | ||
400 | # pylint: disable=too-many-arguments, protected-access | |
401 | class TaskManager(object): | |
402 | FINISHED_TASK_SIZE = 10 | |
403 | FINISHED_TASK_TTL = 60.0 | |
404 | ||
405 | VALUE_DONE = "done" | |
406 | VALUE_EXECUTING = "executing" | |
407 | ||
9f95a23c TL |
408 | _executing_tasks = set() # type: Set[Task] |
409 | _finished_tasks = [] # type: List[Task] | |
11fdf7f2 TL |
410 | _lock = threading.Lock() |
411 | ||
412 | _task_local_data = threading.local() | |
413 | ||
414 | @classmethod | |
415 | def init(cls): | |
9f95a23c | 416 | cls.logger = logging.getLogger('taskmgr') # type: ignore |
11fdf7f2 TL |
417 | NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished') |
418 | ||
419 | @classmethod | |
420 | def _handle_finished_task(cls, task): | |
9f95a23c | 421 | cls.logger.info("finished %s", task) # type: ignore |
11fdf7f2 TL |
422 | with cls._lock: |
423 | cls._executing_tasks.remove(task) | |
424 | cls._finished_tasks.append(task) | |
425 | ||
426 | @classmethod | |
427 | def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None, | |
428 | exception_handler=None): | |
429 | if not args: | |
430 | args = [] | |
431 | if not kwargs: | |
432 | kwargs = {} | |
433 | if not executor: | |
434 | executor = ThreadedExecutor() | |
435 | task = Task(name, metadata, fn, args, kwargs, executor, | |
436 | exception_handler) | |
437 | with cls._lock: | |
438 | if task in cls._executing_tasks: | |
9f95a23c | 439 | cls.logger.debug("task already executing: %s", task) # type: ignore |
11fdf7f2 TL |
440 | for t in cls._executing_tasks: |
441 | if t == task: | |
442 | return t | |
9f95a23c | 443 | cls.logger.debug("created %s", task) # type: ignore |
11fdf7f2 | 444 | cls._executing_tasks.add(task) |
9f95a23c | 445 | cls.logger.info("running %s", task) # type: ignore |
11fdf7f2 TL |
446 | task._run() |
447 | return task | |
448 | ||
449 | @classmethod | |
450 | def current_task(cls): | |
451 | """ | |
452 | Returns the current task object. | |
453 | This method should only be called from a threaded task operation code. | |
454 | """ | |
455 | return cls._task_local_data.task | |
456 | ||
457 | @classmethod | |
458 | def _cleanup_old_tasks(cls, task_list): | |
459 | """ | |
460 | The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent | |
461 | finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL | |
462 | value. | |
463 | """ | |
464 | now = datetime.now() | |
465 | for idx, t in enumerate(task_list): | |
466 | if idx < cls.FINISHED_TASK_SIZE: | |
467 | continue | |
468 | if now - datetime.fromtimestamp(t[1].end_time) > \ | |
469 | timedelta(seconds=cls.FINISHED_TASK_TTL): | |
470 | del cls._finished_tasks[t[0]] | |
471 | ||
472 | @classmethod | |
473 | def list(cls, name_glob=None): | |
474 | executing_tasks = [] | |
475 | finished_tasks = [] | |
476 | with cls._lock: | |
477 | for task in cls._executing_tasks: | |
478 | if not name_glob or fnmatch.fnmatch(task.name, name_glob): | |
479 | executing_tasks.append(task) | |
480 | for idx, task in enumerate(cls._finished_tasks): | |
481 | if not name_glob or fnmatch.fnmatch(task.name, name_glob): | |
482 | finished_tasks.append((idx, task)) | |
483 | finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True) | |
484 | cls._cleanup_old_tasks(finished_tasks) | |
485 | executing_tasks.sort(key=lambda t: t.begin_time, reverse=True) | |
486 | return executing_tasks, [t[1] for t in finished_tasks] | |
487 | ||
488 | @classmethod | |
489 | def list_serializable(cls, ns_glob=None): | |
490 | ex_t, fn_t = cls.list(ns_glob) | |
491 | return [{ | |
492 | 'name': t.name, | |
493 | 'metadata': t.metadata, | |
494 | 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), | |
495 | 'progress': t.progress | |
496 | } for t in ex_t if t.begin_time], [{ | |
497 | 'name': t.name, | |
498 | 'metadata': t.metadata, | |
499 | 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), | |
500 | 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()), | |
501 | 'duration': t.duration, | |
502 | 'progress': t.progress, | |
503 | 'success': not t.exception, | |
504 | 'ret_value': t.ret_value if not t.exception else None, | |
505 | 'exception': t.ret_value if t.exception and t.ret_value else ( | |
506 | {'detail': str(t.exception)} if t.exception else None) | |
507 | } for t in fn_t] | |
508 | ||
509 | ||
510 | # pylint: disable=protected-access | |
511 | class TaskExecutor(object): | |
512 | def __init__(self): | |
9f95a23c | 513 | self.logger = logging.getLogger('taskexec') |
11fdf7f2 TL |
514 | self.task = None |
515 | ||
516 | def init(self, task): | |
517 | self.task = task | |
518 | ||
519 | # pylint: disable=broad-except | |
520 | def start(self): | |
9f95a23c | 521 | self.logger.debug("executing task %s", self.task) |
11fdf7f2 | 522 | try: |
9f95a23c | 523 | self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore |
11fdf7f2 | 524 | except Exception as ex: |
9f95a23c | 525 | self.logger.exception("Error while calling %s", self.task) |
11fdf7f2 TL |
526 | self.finish(None, ex) |
527 | ||
528 | def finish(self, ret_value, exception): | |
529 | if not exception: | |
9f95a23c | 530 | self.logger.debug("successfully finished task: %s", self.task) |
11fdf7f2 | 531 | else: |
9f95a23c TL |
532 | self.logger.debug("task finished with exception: %s", self.task) |
533 | self.task._complete(ret_value, exception) # type: ignore | |
11fdf7f2 TL |
534 | |
535 | ||
536 | # pylint: disable=protected-access | |
537 | class ThreadedExecutor(TaskExecutor): | |
538 | def __init__(self): | |
539 | super(ThreadedExecutor, self).__init__() | |
540 | self._thread = threading.Thread(target=self._run) | |
541 | ||
542 | def start(self): | |
543 | self._thread.start() | |
544 | ||
545 | # pylint: disable=broad-except | |
546 | def _run(self): | |
547 | TaskManager._task_local_data.task = self.task | |
548 | try: | |
9f95a23c TL |
549 | self.logger.debug("executing task %s", self.task) |
550 | val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) # type: ignore | |
11fdf7f2 | 551 | except Exception as ex: |
9f95a23c | 552 | self.logger.exception("Error while calling %s", self.task) |
11fdf7f2 TL |
553 | self.finish(None, ex) |
554 | else: | |
555 | self.finish(val, None) | |
556 | ||
557 | ||
558 | class Task(object): | |
559 | def __init__(self, name, metadata, fn, args, kwargs, executor, | |
560 | exception_handler=None): | |
561 | self.name = name | |
562 | self.metadata = metadata | |
563 | self.fn = fn | |
564 | self.fn_args = args | |
565 | self.fn_kwargs = kwargs | |
566 | self.executor = executor | |
567 | self.ex_handler = exception_handler | |
568 | self.running = False | |
569 | self.event = threading.Event() | |
570 | self.progress = None | |
571 | self.ret_value = None | |
f67539c2 TL |
572 | self._begin_time: Optional[float] = None |
573 | self._end_time: Optional[float] = None | |
574 | self.duration = 0.0 | |
11fdf7f2 | 575 | self.exception = None |
9f95a23c | 576 | self.logger = logging.getLogger('task') |
11fdf7f2 TL |
577 | self.lock = threading.Lock() |
578 | ||
579 | def __hash__(self): | |
580 | return hash((self.name, tuple(sorted(self.metadata.items())))) | |
581 | ||
582 | def __eq__(self, other): | |
583 | return self.name == other.name and self.metadata == other.metadata | |
584 | ||
585 | def __str__(self): | |
586 | return "Task(ns={}, md={})" \ | |
587 | .format(self.name, self.metadata) | |
588 | ||
589 | def __repr__(self): | |
590 | return str(self) | |
591 | ||
592 | def _run(self): | |
eafe8130 | 593 | NotificationQueue.register(self._handle_task_finished, 'cd_task_finished', 100) |
11fdf7f2 TL |
594 | with self.lock: |
595 | assert not self.running | |
596 | self.executor.init(self) | |
597 | self.set_progress(0, in_lock=True) | |
f67539c2 | 598 | self._begin_time = time.time() |
11fdf7f2 TL |
599 | self.running = True |
600 | self.executor.start() | |
601 | ||
602 | def _complete(self, ret_value, exception=None): | |
603 | now = time.time() | |
604 | if exception and self.ex_handler: | |
605 | # pylint: disable=broad-except | |
606 | try: | |
607 | ret_value = self.ex_handler(exception, task=self) | |
608 | except Exception as ex: | |
609 | exception = ex | |
610 | with self.lock: | |
611 | assert self.running, "_complete cannot be called before _run" | |
f67539c2 | 612 | self._end_time = now |
11fdf7f2 TL |
613 | self.ret_value = ret_value |
614 | self.exception = exception | |
f67539c2 | 615 | self.duration = now - self.begin_time |
11fdf7f2 TL |
616 | if not self.exception: |
617 | self.set_progress(100, True) | |
618 | NotificationQueue.new_notification('cd_task_finished', self) | |
9f95a23c TL |
619 | self.logger.debug("execution of %s finished in: %s s", self, |
620 | self.duration) | |
11fdf7f2 | 621 | |
eafe8130 TL |
622 | def _handle_task_finished(self, task): |
623 | if self == task: | |
624 | NotificationQueue.deregister(self._handle_task_finished) | |
625 | self.event.set() | |
626 | ||
11fdf7f2 TL |
627 | def wait(self, timeout=None): |
628 | with self.lock: | |
629 | assert self.running, "wait cannot be called before _run" | |
630 | ev = self.event | |
631 | ||
632 | success = ev.wait(timeout=timeout) | |
633 | with self.lock: | |
634 | if success: | |
635 | # the action executed within the timeout | |
636 | if self.exception: | |
637 | # pylint: disable=raising-bad-type | |
638 | # execution raised an exception | |
639 | raise self.exception | |
640 | return TaskManager.VALUE_DONE, self.ret_value | |
641 | # the action is still executing | |
642 | return TaskManager.VALUE_EXECUTING, None | |
643 | ||
644 | def inc_progress(self, delta, in_lock=False): | |
645 | if not isinstance(delta, int) or delta < 0: | |
646 | raise Exception("Progress delta value must be a positive integer") | |
647 | if not in_lock: | |
648 | self.lock.acquire() | |
9f95a23c | 649 | prog = self.progress + delta # type: ignore |
11fdf7f2 TL |
650 | self.progress = prog if prog <= 100 else 100 |
651 | if not in_lock: | |
652 | self.lock.release() | |
653 | ||
654 | def set_progress(self, percentage, in_lock=False): | |
655 | if not isinstance(percentage, int) or percentage < 0 or percentage > 100: | |
656 | raise Exception("Progress value must be in percentage " | |
657 | "(0 <= percentage <= 100)") | |
658 | if not in_lock: | |
659 | self.lock.acquire() | |
660 | self.progress = percentage | |
661 | if not in_lock: | |
662 | self.lock.release() | |
663 | ||
f67539c2 TL |
664 | @property |
665 | def end_time(self) -> float: | |
666 | assert self._end_time is not None | |
667 | return self._end_time | |
668 | ||
669 | @property | |
670 | def begin_time(self) -> float: | |
671 | assert self._begin_time is not None | |
672 | return self._begin_time | |
673 | ||
11fdf7f2 | 674 | |
11fdf7f2 TL |
675 | def prepare_url_prefix(url_prefix): |
676 | """ | |
677 | return '' if no prefix, or '/prefix' without slash in the end. | |
678 | """ | |
f67539c2 | 679 | url_prefix = urllib.parse.urljoin('/', url_prefix) |
11fdf7f2 TL |
680 | return url_prefix.rstrip('/') |
681 | ||
682 | ||
683 | def dict_contains_path(dct, keys): | |
684 | """ | |
685 | Tests whether the keys exist recursively in `dictionary`. | |
686 | ||
687 | :type dct: dict | |
688 | :type keys: list | |
689 | :rtype: bool | |
690 | """ | |
691 | if keys: | |
692 | if not isinstance(dct, dict): | |
693 | return False | |
694 | key = keys.pop(0) | |
695 | if key in dct: | |
696 | dct = dct[key] | |
697 | return dict_contains_path(dct, keys) | |
698 | return False | |
699 | return True | |
700 | ||
701 | ||
9f95a23c TL |
702 | def dict_get(obj, path, default=None): |
703 | """ | |
704 | Get the value at any depth of a nested object based on the path | |
705 | described by `path`. If path doesn't exist, `default` is returned. | |
706 | """ | |
707 | current = obj | |
708 | for part in path.split('.'): | |
709 | if not isinstance(current, dict): | |
710 | return default | |
711 | if part not in current.keys(): | |
712 | return default | |
713 | current = current.get(part, {}) | |
714 | return current | |
715 | ||
716 | ||
11fdf7f2 TL |
717 | def getargspec(func): |
718 | try: | |
719 | while True: | |
720 | func = func.__wrapped__ | |
721 | except AttributeError: | |
722 | pass | |
723 | # pylint: disable=deprecated-method | |
f67539c2 | 724 | return inspect.getfullargspec(func) |
11fdf7f2 TL |
725 | |
726 | ||
727 | def str_to_bool(val): | |
728 | """ | |
729 | Convert a string representation of truth to True or False. | |
730 | ||
731 | >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True) | |
732 | True | |
733 | ||
734 | >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False) | |
735 | False | |
736 | ||
737 | >>> str_to_bool('xyz') | |
738 | Traceback (most recent call last): | |
739 | ... | |
740 | ValueError: invalid truth value 'xyz' | |
741 | ||
742 | :param val: The value to convert. | |
743 | :type val: str|bool | |
744 | :rtype: bool | |
745 | """ | |
746 | if isinstance(val, bool): | |
747 | return val | |
748 | return bool(strtobool(val)) | |
749 | ||
750 | ||
9f95a23c TL |
751 | def json_str_to_object(value): # type: (AnyStr) -> Any |
752 | """ | |
753 | It converts a JSON valid string representation to object. | |
754 | ||
755 | >>> result = json_str_to_object('{"a": 1}') | |
756 | >>> result == {'a': 1} | |
757 | True | |
758 | """ | |
759 | if value == '': | |
760 | return value | |
761 | ||
762 | try: | |
763 | # json.loads accepts binary input from version >=3.6 | |
764 | value = value.decode('utf-8') # type: ignore | |
765 | except AttributeError: | |
766 | pass | |
767 | ||
768 | return json.loads(value) | |
769 | ||
770 | ||
771 | def partial_dict(orig, keys): # type: (Dict, List[str]) -> Dict | |
772 | """ | |
773 | It returns Dict containing only the selected keys of original Dict. | |
774 | ||
775 | >>> partial_dict({'a': 1, 'b': 2}, ['b']) | |
776 | {'b': 2} | |
777 | """ | |
778 | return {k: orig[k] for k in keys} | |
779 | ||
780 | ||
11fdf7f2 TL |
781 | def get_request_body_params(request): |
782 | """ | |
783 | Helper function to get parameters from the request body. | |
784 | :param request The CherryPy request object. | |
785 | :type request: cherrypy.Request | |
786 | :return: A dictionary containing the parameters. | |
787 | :rtype: dict | |
788 | """ | |
9f95a23c | 789 | params = {} # type: dict |
11fdf7f2 TL |
790 | if request.method not in request.methods_with_bodies: |
791 | return params | |
792 | ||
793 | content_type = request.headers.get('Content-Type', '') | |
794 | if content_type in ['application/json', 'text/javascript']: | |
795 | if not hasattr(request, 'json'): | |
796 | raise cherrypy.HTTPError(400, 'Expected JSON body') | |
797 | if isinstance(request.json, str): | |
798 | params.update(json.loads(request.json)) | |
799 | else: | |
800 | params.update(request.json) | |
801 | ||
802 | return params | |
803 | ||
804 | ||
805 | def find_object_in_list(key, value, iterable): | |
806 | """ | |
807 | Get the first occurrence of an object within a list with | |
808 | the specified key/value. | |
809 | ||
810 | >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}]) | |
811 | {'name': 'bar'} | |
812 | ||
813 | >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None | |
814 | True | |
815 | ||
816 | >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None | |
817 | True | |
818 | ||
819 | >>> find_object_in_list('foo', 'bar', []) is None | |
820 | True | |
821 | ||
822 | :param key: The name of the key. | |
823 | :param value: The value to search for. | |
824 | :param iterable: The list to process. | |
825 | :return: Returns the found object or None. | |
826 | """ | |
827 | for obj in iterable: | |
828 | if key in obj and obj[key] == value: | |
829 | return obj | |
830 | return None | |
a4b75251 TL |
831 | |
832 | ||
833 | def merge_list_of_dicts_by_key(target_list: list, source_list: list, key: str): | |
834 | target_list = {d[key]: d for d in target_list} | |
835 | for sdict in source_list: | |
836 | if bool(sdict): | |
837 | if sdict[key] in target_list: | |
838 | target_list[sdict[key]].update(sdict) | |
839 | target_list = [value for value in target_list.values()] | |
840 | return target_list |