]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tools.py
2b6d92ca55f7d1ab90e4dadf2c380d4f7f2e79d7
1 # -*- coding: utf-8 -*-
2 from __future__
import absolute_import
12 from datetime
import datetime
, timedelta
13 from distutils
.util
import strtobool
18 from six
.moves
import urllib
22 from urlparse
import urljoin
24 from urllib
.parse
import urljoin
27 from .exceptions
import ViewCacheNoDataException
28 from .settings
import Settings
29 from .services
.auth
import JwtManager
32 from typing
import Any
, AnyStr
, Callable
, DefaultDict
, Deque
,\
33 Dict
, List
, Set
, Tuple
, Union
# noqa pylint: disable=unused-import
35 pass # For typing only
38 class RequestLoggingTool(cherrypy
.Tool
):
40 cherrypy
.Tool
.__init
__(self
, 'before_handler', self
.request_begin
,
42 self
.logger
= logging
.getLogger('request')
45 cherrypy
.Tool
._setup
(self
)
46 cherrypy
.request
.hooks
.attach('on_end_request', self
.request_end
,
48 cherrypy
.request
.hooks
.attach('after_error_response', self
.request_error
,
51 def request_begin(self
):
52 req
= cherrypy
.request
53 user
= JwtManager
.get_username()
55 self
.logger
.debug('[%s:%s] [%s] [%s] %s', req
.remote
.ip
, req
.remote
.port
,
56 req
.method
, user
, req
.path_info
)
58 if Settings
.AUDIT_API_ENABLED
and req
.method
not in ['GET']:
59 url
= build_url(req
.remote
.ip
, scheme
=req
.scheme
,
61 msg
= '[DASHBOARD] from=\'{}\' path=\'{}\' method=\'{}\' ' \
62 'user=\'{}\''.format(url
, req
.path_info
, req
.method
, user
)
63 if Settings
.AUDIT_API_LOG_PAYLOAD
:
64 params
= dict(req
.params
or {}, **get_request_body_params(req
))
65 # Hide sensitive data like passwords, secret keys, ...
66 # Extend the list of patterns to search for if necessary.
67 # Currently parameters like this are processed:
70 # - new_passwd_to_login
72 for key
in ['password', 'passwd', 'secret']:
73 keys
.extend([x
for x
in params
.keys() if key
in x
])
76 msg
= '{} params=\'{}\''.format(msg
, json
.dumps(params
))
77 mgr
.cluster_log('audit', mgr
.CLUSTER_LOG_PRIO_INFO
, msg
)
79 def request_error(self
):
80 self
._request
_log
(self
.logger
.error
)
81 self
.logger
.error(cherrypy
.response
.body
)
83 def request_end(self
):
84 status
= cherrypy
.response
.status
[:3]
85 if status
in ["401", "403"]:
86 # log unauthorized accesses
87 self
._request
_log
(self
.logger
.warning
)
89 self
._request
_log
(self
.logger
.info
)
91 def _format_bytes(self
, num
):
92 units
= ['B', 'K', 'M', 'G']
94 if isinstance(num
, str):
100 format_str
= "{:.0f}{}"
101 for i
, unit
in enumerate(units
):
103 if num
< 2**(10*(i
+1)):
108 format_str
= "{:.1f}{}"
109 return format_str
.format(num
/div
, unit
[0])
111 # content-length bigger than 1T!! return value in bytes
112 return "{}B".format(num
)
114 def _request_log(self
, logger_fn
):
115 req
= cherrypy
.request
116 res
= cherrypy
.response
117 lat
= time
.time() - res
.time
118 user
= JwtManager
.get_username()
119 status
= res
.status
[:3] if isinstance(res
.status
, str) else res
.status
120 if 'Content-Length' in res
.headers
:
121 length
= self
._format
_bytes
(res
.headers
['Content-Length'])
123 length
= self
._format
_bytes
(0)
125 logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req
.remote
.ip
,
126 req
.remote
.port
, req
.method
, status
,
127 "{0:.3f}s".format(lat
), user
, length
, req
.path_info
)
129 logger_fn("[%s:%s] [%s] [%s] [%s] [%s] [%s] %s", req
.remote
.ip
,
130 req
.remote
.port
, req
.method
, status
,
131 "{0:.3f}s".format(lat
), length
, getattr(req
, 'unique_id', '-'), req
.path_info
)
134 # pylint: disable=too-many-instance-attributes
135 class ViewCache(object):
140 class GetterThread(threading
.Thread
):
141 def __init__(self
, view
, fn
, args
, kwargs
):
142 super(ViewCache
.GetterThread
, self
).__init
__()
144 self
.event
= threading
.Event()
149 # pylint: disable=broad-except
155 self
._view
.logger
.debug("starting execution of %s", self
.fn
)
156 val
= self
.fn(*self
.args
, **self
.kwargs
)
158 except Exception as ex
:
159 with self
._view
.lock
:
160 self
._view
.logger
.exception("Error while calling fn=%s ex=%s", self
.fn
,
162 self
._view
.value
= None
163 self
._view
.value_when
= None
164 self
._view
.getter_thread
= None
165 self
._view
.exception
= ex
167 with self
._view
.lock
:
168 self
._view
.latency
= t1
- t0
169 self
._view
.value
= val
170 self
._view
.value_when
= datetime
.now()
171 self
._view
.getter_thread
= None
172 self
._view
.exception
= None
174 self
._view
.logger
.debug("execution of %s finished in: %s", self
.fn
,
178 class RemoteViewCache(object):
179 # Return stale data if
182 def __init__(self
, timeout
):
183 self
.getter_thread
= None
184 # Consider data within 1s old to be sufficiently fresh
185 self
.timeout
= timeout
186 self
.event
= threading
.Event()
187 self
.value_when
= None
190 self
.exception
= None
191 self
.lock
= threading
.Lock()
192 self
.logger
= logging
.getLogger('viewcache')
196 self
.value_when
= None
199 def run(self
, fn
, args
, kwargs
):
201 If data less than `stale_period` old is available, return it
203 If an attempt to fetch data does not complete within `timeout`, then
204 return the most recent data available, with a status to indicate that
207 Initialization does not count towards the timeout, so the first call
208 on one of these objects during the process lifetime may be slower
209 than subsequent calls.
211 :return: 2-tuple of value status code, value
215 if self
.value_when
and now
- self
.value_when
< timedelta(
216 seconds
=self
.STALE_PERIOD
):
217 return ViewCache
.VALUE_OK
, self
.value
219 if self
.getter_thread
is None:
220 self
.getter_thread
= ViewCache
.GetterThread(self
, fn
, args
,
222 self
.getter_thread
.start()
224 self
.logger
.debug("getter_thread still alive for: %s", fn
)
226 ev
= self
.getter_thread
.event
228 success
= ev
.wait(timeout
=self
.timeout
)
232 # We fetched the data within the timeout
234 # execution raised an exception
235 # pylint: disable=raising-bad-type
237 return ViewCache
.VALUE_OK
, self
.value
238 if self
.value_when
is not None:
239 # We have some data, but it doesn't meet freshness requirements
240 return ViewCache
.VALUE_STALE
, self
.value
241 # We have no data, not even stale data
242 raise ViewCacheNoDataException()
244 def __init__(self
, timeout
=5):
245 self
.timeout
= timeout
246 self
.cache_by_args
= {}
248 def __call__(self
, fn
):
249 def wrapper(*args
, **kwargs
):
250 rvc
= self
.cache_by_args
.get(args
, None)
252 rvc
= ViewCache
.RemoteViewCache(self
.timeout
)
253 self
.cache_by_args
[args
] = rvc
254 return rvc
.run(fn
, args
, kwargs
)
255 wrapper
.reset
= self
.reset
# type: ignore
259 for _
, rvc
in self
.cache_by_args
.items():
263 class NotificationQueue(threading
.Thread
):
264 _ALL_TYPES_
= '__ALL__'
265 _listeners
= collections
.defaultdict(set) # type: DefaultDict[str, Set[Tuple[int, Callable]]]
266 _lock
= threading
.Lock()
267 _cond
= threading
.Condition()
268 _queue
= collections
.deque() # type: Deque[Tuple[str, Any]]
273 super(NotificationQueue
, self
).__init
__()
276 def start_queue(cls
):
279 # the queue thread is already running
282 cls
._instance
= NotificationQueue()
283 cls
.logger
= logging
.getLogger('notification_queue') # type: ignore
284 cls
.logger
.debug("starting notification queue") # type: ignore
285 cls
._instance
.start()
290 if not cls
._instance
:
291 # the queue thread was not started
293 instance
= cls
._instance
298 cls
.logger
.debug("waiting for notification queue to finish") # type: ignore
300 cls
.logger
.debug("notification queue stopped") # type: ignore
303 def _registered_handler(cls
, func
, n_types
):
304 for _
, reg_func
in cls
._listeners
[n_types
]:
310 def register(cls
, func
, n_types
=None, priority
=1):
311 """Registers function to listen for notifications
313 If the second parameter `n_types` is omitted, the function in `func`
314 parameter will be called for any type of notifications.
317 func (function): python function ex: def foo(val)
318 n_types (str|list): the single type to listen, or a list of types
319 priority (int): the priority level (1=max, +inf=min)
323 n_types
= [cls
._ALL
_TYPES
_]
324 elif isinstance(n_types
, str):
326 elif not isinstance(n_types
, list):
327 raise Exception("n_types param is neither a string nor a list")
328 for ev_type
in n_types
:
329 if not cls
._registered
_handler
(func
, ev_type
):
330 cls
._listeners
[ev_type
].add((priority
, func
))
331 cls
.logger
.debug( # type: ignore
332 "function %s was registered for events of type %s",
337 def deregister(cls
, func
, n_types
=None):
338 # type: (Callable, Union[str, list, None]) -> None
339 """Removes the listener function from this notification queue
341 If the second parameter `n_types` is omitted, the function is removed
342 from all event types, otherwise the function is removed only for the
343 specified event types.
346 func (function): python function
347 n_types (str|list): the single event type, or a list of event types
351 n_types
= list(cls
._listeners
.keys())
352 elif isinstance(n_types
, str):
354 elif not isinstance(n_types
, list):
355 raise Exception("n_types param is neither a string nor a list")
356 for ev_type
in n_types
:
357 listeners
= cls
._listeners
[ev_type
]
359 for pr
, fn
in listeners
:
364 listeners
.discard(to_remove
)
365 cls
.logger
.debug( # type: ignore
366 "function %s was deregistered for events of type %s",
371 def new_notification(cls
, notify_type
, notify_value
):
372 # type: (str, Any) -> None
374 cls
._queue
.append((notify_type
, notify_value
))
378 def _notify_listeners(cls
, events
):
380 notify_type
, notify_value
= ev
382 listeners
= list(cls
._listeners
[notify_type
])
383 listeners
.extend(cls
._listeners
[cls
._ALL
_TYPES
_])
384 listeners
.sort(key
=lambda lis
: lis
[0])
385 for listener
in listeners
:
386 listener
[1](notify_value
)
389 self
.logger
.debug("notification queue started") # type: ignore
392 self
.logger
.debug("processing queue: %s", len(self
._queue
)) # type: ignore
395 private_buffer
.append(self
._queue
.popleft())
398 self
._notify
_listeners
(private_buffer
)
400 while self
._running
and not self
._queue
:
402 # flush remaining events
403 self
.logger
.debug("flush remaining events: %s", len(self
._queue
)) # type: ignore
404 self
._notify
_listeners
(self
._queue
)
406 self
.logger
.debug("notification queue finished") # type: ignore
409 # pylint: disable=too-many-arguments, protected-access
410 class TaskManager(object):
411 FINISHED_TASK_SIZE
= 10
412 FINISHED_TASK_TTL
= 60.0
415 VALUE_EXECUTING
= "executing"
417 _executing_tasks
= set() # type: Set[Task]
418 _finished_tasks
= [] # type: List[Task]
419 _lock
= threading
.Lock()
421 _task_local_data
= threading
.local()
425 cls
.logger
= logging
.getLogger('taskmgr') # type: ignore
426 NotificationQueue
.register(cls
._handle
_finished
_task
, 'cd_task_finished')
429 def _handle_finished_task(cls
, task
):
430 cls
.logger
.info("finished %s", task
) # type: ignore
432 cls
._executing
_tasks
.remove(task
)
433 cls
._finished
_tasks
.append(task
)
436 def run(cls
, name
, metadata
, fn
, args
=None, kwargs
=None, executor
=None,
437 exception_handler
=None):
443 executor
= ThreadedExecutor()
444 task
= Task(name
, metadata
, fn
, args
, kwargs
, executor
,
447 if task
in cls
._executing
_tasks
:
448 cls
.logger
.debug("task already executing: %s", task
) # type: ignore
449 for t
in cls
._executing
_tasks
:
452 cls
.logger
.debug("created %s", task
) # type: ignore
453 cls
._executing
_tasks
.add(task
)
454 cls
.logger
.info("running %s", task
) # type: ignore
459 def current_task(cls
):
461 Returns the current task object.
462 This method should only be called from a threaded task operation code.
464 return cls
._task
_local
_data
.task
467 def _cleanup_old_tasks(cls
, task_list
):
469 The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent
470 finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL
474 for idx
, t
in enumerate(task_list
):
475 if idx
< cls
.FINISHED_TASK_SIZE
:
477 if now
- datetime
.fromtimestamp(t
[1].end_time
) > \
478 timedelta(seconds
=cls
.FINISHED_TASK_TTL
):
479 del cls
._finished
_tasks
[t
[0]]
482 def list(cls
, name_glob
=None):
486 for task
in cls
._executing
_tasks
:
487 if not name_glob
or fnmatch
.fnmatch(task
.name
, name_glob
):
488 executing_tasks
.append(task
)
489 for idx
, task
in enumerate(cls
._finished
_tasks
):
490 if not name_glob
or fnmatch
.fnmatch(task
.name
, name_glob
):
491 finished_tasks
.append((idx
, task
))
492 finished_tasks
.sort(key
=lambda t
: t
[1].end_time
, reverse
=True)
493 cls
._cleanup
_old
_tasks
(finished_tasks
)
494 executing_tasks
.sort(key
=lambda t
: t
.begin_time
, reverse
=True)
495 return executing_tasks
, [t
[1] for t
in finished_tasks
]
498 def list_serializable(cls
, ns_glob
=None):
499 ex_t
, fn_t
= cls
.list(ns_glob
)
502 'metadata': t
.metadata
,
503 'begin_time': "{}Z".format(datetime
.fromtimestamp(t
.begin_time
).isoformat()),
504 'progress': t
.progress
505 } for t
in ex_t
if t
.begin_time
], [{
507 'metadata': t
.metadata
,
508 'begin_time': "{}Z".format(datetime
.fromtimestamp(t
.begin_time
).isoformat()),
509 'end_time': "{}Z".format(datetime
.fromtimestamp(t
.end_time
).isoformat()),
510 'duration': t
.duration
,
511 'progress': t
.progress
,
512 'success': not t
.exception
,
513 'ret_value': t
.ret_value
if not t
.exception
else None,
514 'exception': t
.ret_value
if t
.exception
and t
.ret_value
else (
515 {'detail': str(t
.exception
)} if t
.exception
else None)
519 # pylint: disable=protected-access
520 class TaskExecutor(object):
522 self
.logger
= logging
.getLogger('taskexec')
525 def init(self
, task
):
528 # pylint: disable=broad-except
530 self
.logger
.debug("executing task %s", self
.task
)
532 self
.task
.fn(*self
.task
.fn_args
, **self
.task
.fn_kwargs
) # type: ignore
533 except Exception as ex
:
534 self
.logger
.exception("Error while calling %s", self
.task
)
535 self
.finish(None, ex
)
537 def finish(self
, ret_value
, exception
):
539 self
.logger
.debug("successfully finished task: %s", self
.task
)
541 self
.logger
.debug("task finished with exception: %s", self
.task
)
542 self
.task
._complete
(ret_value
, exception
) # type: ignore
545 # pylint: disable=protected-access
546 class ThreadedExecutor(TaskExecutor
):
548 super(ThreadedExecutor
, self
).__init
__()
549 self
._thread
= threading
.Thread(target
=self
._run
)
554 # pylint: disable=broad-except
556 TaskManager
._task
_local
_data
.task
= self
.task
558 self
.logger
.debug("executing task %s", self
.task
)
559 val
= self
.task
.fn(*self
.task
.fn_args
, **self
.task
.fn_kwargs
) # type: ignore
560 except Exception as ex
:
561 self
.logger
.exception("Error while calling %s", self
.task
)
562 self
.finish(None, ex
)
564 self
.finish(val
, None)
568 def __init__(self
, name
, metadata
, fn
, args
, kwargs
, executor
,
569 exception_handler
=None):
571 self
.metadata
= metadata
574 self
.fn_kwargs
= kwargs
575 self
.executor
= executor
576 self
.ex_handler
= exception_handler
578 self
.event
= threading
.Event()
580 self
.ret_value
= None
581 self
.begin_time
= None
584 self
.exception
= None
585 self
.logger
= logging
.getLogger('task')
586 self
.lock
= threading
.Lock()
589 return hash((self
.name
, tuple(sorted(self
.metadata
.items()))))
591 def __eq__(self
, other
):
592 return self
.name
== other
.name
and self
.metadata
== other
.metadata
595 return "Task(ns={}, md={})" \
596 .format(self
.name
, self
.metadata
)
602 NotificationQueue
.register(self
._handle
_task
_finished
, 'cd_task_finished', 100)
604 assert not self
.running
605 self
.executor
.init(self
)
606 self
.set_progress(0, in_lock
=True)
607 self
.begin_time
= time
.time()
609 self
.executor
.start()
611 def _complete(self
, ret_value
, exception
=None):
613 if exception
and self
.ex_handler
:
614 # pylint: disable=broad-except
616 ret_value
= self
.ex_handler(exception
, task
=self
)
617 except Exception as ex
:
620 assert self
.running
, "_complete cannot be called before _run"
622 self
.ret_value
= ret_value
623 self
.exception
= exception
624 self
.duration
= now
- self
.begin_time
# type: ignore
625 if not self
.exception
:
626 self
.set_progress(100, True)
627 NotificationQueue
.new_notification('cd_task_finished', self
)
628 self
.logger
.debug("execution of %s finished in: %s s", self
,
631 def _handle_task_finished(self
, task
):
633 NotificationQueue
.deregister(self
._handle
_task
_finished
)
636 def wait(self
, timeout
=None):
638 assert self
.running
, "wait cannot be called before _run"
641 success
= ev
.wait(timeout
=timeout
)
644 # the action executed within the timeout
646 # pylint: disable=raising-bad-type
647 # execution raised an exception
649 return TaskManager
.VALUE_DONE
, self
.ret_value
650 # the action is still executing
651 return TaskManager
.VALUE_EXECUTING
, None
653 def inc_progress(self
, delta
, in_lock
=False):
654 if not isinstance(delta
, int) or delta
< 0:
655 raise Exception("Progress delta value must be a positive integer")
658 prog
= self
.progress
+ delta
# type: ignore
659 self
.progress
= prog
if prog
<= 100 else 100
663 def set_progress(self
, percentage
, in_lock
=False):
664 if not isinstance(percentage
, int) or percentage
< 0 or percentage
> 100:
665 raise Exception("Progress value must be in percentage "
666 "(0 <= percentage <= 100)")
669 self
.progress
= percentage
674 def build_url(host
, scheme
=None, port
=None):
676 Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
679 >>> build_url('example.com', 'https', 443)
680 'https://example.com:443'
682 >>> build_url(host='example.com', port=443)
685 >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
686 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
688 :param scheme: The scheme, e.g. http, https or ftp.
690 :param host: Consisting of either a registered name (including but not limited to
691 a hostname) or an IP address.
702 ipaddress
.IPv6Address(u_host
)
703 netloc
= '[{}]'.format(host
)
707 netloc
+= ':{}'.format(port
)
708 pr
= urllib
.parse
.ParseResult(
709 scheme
=scheme
if scheme
else '',
718 def prepare_url_prefix(url_prefix
):
720 return '' if no prefix, or '/prefix' without slash in the end.
722 url_prefix
= urljoin('/', url_prefix
)
723 return url_prefix
.rstrip('/')
726 def dict_contains_path(dct
, keys
):
728 Tests whether the keys exist recursively in `dictionary`.
735 if not isinstance(dct
, dict):
740 return dict_contains_path(dct
, keys
)
745 def dict_get(obj
, path
, default
=None):
747 Get the value at any depth of a nested object based on the path
748 described by `path`. If path doesn't exist, `default` is returned.
751 for part
in path
.split('.'):
752 if not isinstance(current
, dict):
754 if part
not in current
.keys():
756 current
= current
.get(part
, {})
760 if sys
.version_info
> (3, 0):
761 wraps
= functools
.wraps
762 _getargspec
= inspect
.getfullargspec
765 def decorator(wrapper
):
766 new_wrapper
= functools
.wraps(func
)(wrapper
)
767 new_wrapper
.__wrapped
__ = func
# set __wrapped__ even for Python 2
771 _getargspec
= inspect
.getargspec
774 def getargspec(func
):
777 func
= func
.__wrapped
__
778 except AttributeError:
780 # pylint: disable=deprecated-method
781 return _getargspec(func
)
784 def str_to_bool(val
):
786 Convert a string representation of truth to True or False.
788 >>> str_to_bool('true') and str_to_bool('yes') and str_to_bool('1') and str_to_bool(True)
791 >>> str_to_bool('false') and str_to_bool('no') and str_to_bool('0') and str_to_bool(False)
794 >>> str_to_bool('xyz')
795 Traceback (most recent call last):
797 ValueError: invalid truth value 'xyz'
799 :param val: The value to convert.
803 if isinstance(val
, bool):
805 return bool(strtobool(val
))
808 def json_str_to_object(value
): # type: (AnyStr) -> Any
810 It converts a JSON valid string representation to object.
812 >>> result = json_str_to_object('{"a": 1}')
813 >>> result == {'a': 1}
820 # json.loads accepts binary input from version >=3.6
821 value
= value
.decode('utf-8') # type: ignore
822 except AttributeError:
825 return json
.loads(value
)
828 def partial_dict(orig
, keys
): # type: (Dict, List[str]) -> Dict
830 It returns Dict containing only the selected keys of original Dict.
832 >>> partial_dict({'a': 1, 'b': 2}, ['b'])
835 return {k
: orig
[k
] for k
in keys
}
838 def get_request_body_params(request
):
840 Helper function to get parameters from the request body.
841 :param request The CherryPy request object.
842 :type request: cherrypy.Request
843 :return: A dictionary containing the parameters.
846 params
= {} # type: dict
847 if request
.method
not in request
.methods_with_bodies
:
850 content_type
= request
.headers
.get('Content-Type', '')
851 if content_type
in ['application/json', 'text/javascript']:
852 if not hasattr(request
, 'json'):
853 raise cherrypy
.HTTPError(400, 'Expected JSON body')
854 if isinstance(request
.json
, str):
855 params
.update(json
.loads(request
.json
))
857 params
.update(request
.json
)
862 def find_object_in_list(key
, value
, iterable
):
864 Get the first occurrence of an object within a list with
865 the specified key/value.
867 >>> find_object_in_list('name', 'bar', [{'name': 'foo'}, {'name': 'bar'}])
870 >>> find_object_in_list('name', 'xyz', [{'name': 'foo'}, {'name': 'bar'}]) is None
873 >>> find_object_in_list('foo', 'bar', [{'xyz': 4815162342}]) is None
876 >>> find_object_in_list('foo', 'bar', []) is None
879 :param key: The name of the key.
880 :param value: The value to search for.
881 :param iterable: The list to process.
882 :return: Returns the found object or None.
885 if key
in obj
and obj
[key
] == value
: