import logging
import re
import json
+import datetime
from enum import Enum
from functools import wraps
from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING
logger = logging.getLogger(__name__)
ConfEntity = NewType('ConfEntity', str)
-AuthEntity = NewType('AuthEntity', str)
+
+DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class CephadmNoImage(Enum):
return ConfEntity('mon')
-def name_to_auth_entity(daemon_type: str,
- daemon_id: str,
- host: str = "",
- ) -> AuthEntity:
- """
- Map from daemon names/host to ceph entity names (as seen in config)
- """
- if daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
- return AuthEntity('client.' + daemon_type + "." + daemon_id)
- elif daemon_type == 'crash':
- if host == "":
- raise OrchestratorError("Host not provided to generate <crash> auth entity name")
- return AuthEntity('client.' + daemon_type + "." + host)
- elif daemon_type == 'mon':
- return AuthEntity('mon.')
- elif daemon_type == 'mgr':
- return AuthEntity(daemon_type + "." + daemon_id)
- elif daemon_type in ['osd', 'mds', 'client']:
- return AuthEntity(daemon_type + "." + daemon_id)
- else:
- raise OrchestratorError("unknown auth entity name")
-
-
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
@wraps(f)
def forall_hosts_wrapper(*args) -> List[T]:
raise OrchestratorError('failed to parse health status')
return j['status']
+
+
+def is_repo_digest(image_name: str) -> bool:
+ """
+ repo digest are something like "ceph/ceph@sha256:blablabla"
+ """
+ return '@' in image_name
+
+
+def str_to_datetime(input: str) -> datetime.datetime:
+ return datetime.datetime.strptime(input, DATEFMT)
+
+
+def datetime_to_str(dt: datetime.datetime) -> str:
+ return dt.strftime(DATEFMT)