"wait",
]
+NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
+NFS_POOL_NAME = '.nfs'
+
class CommandResult(object):
"""
@staticmethod
def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
- desc = inspect.getdoc(f) or ''
+ desc = (inspect.getdoc(f) or '').replace('\n', ' ')
full_argspec = inspect.getfullargspec(f)
arg_spec = full_argspec.annotations
first_default = len(arg_spec)
"""
self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
+ def tool_exec(
+ self,
+ args: List[str],
+ timeout: int = 10,
+ stdin: Optional[bytes] = None
+ ) -> Tuple[int, str, str]:
+ try:
+ tool = args.pop(0)
+ cmd = [
+ tool,
+ '-k', str(self.get_ceph_option('keyring')),
+ '-n', f'mgr.{self.get_mgr_id()}',
+ ] + args
+ self.log.debug('exec: ' + ' '.join(cmd))
+ p = subprocess.run(
+ cmd,
+ input=stdin,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ timeout=timeout,
+ )
+ except subprocess.TimeoutExpired as ex:
+ self.log.error(ex)
+ return -errno.ETIMEDOUT, '', str(ex)
+ if p.returncode:
+ self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
+ return p.returncode, p.stdout.decode(), p.stderr.decode()
+
def set_health_checks(self, checks: HealthChecksT) -> None:
"""
Set the module's current map of health checks. Argument is a