import ssl
from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable
import re
import uuid
def EXE(cls) -> str:
raise NotImplementedError()
def __str__(self) -> str:
    """Human-readable description: executable name and resolved path."""
    return f'{self.EXE} ({self.path})'
class Podman(ContainerEngine):
EXE = 'podman'
out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'])
self._version = _parse_podman_version(out)
def __str__(self) -> str:
    """Describe the engine including the detected podman version."""
    dotted = '.'.join(str(part) for part in self.version)
    return f'{self.EXE} ({self.path}) version {dotted}'
+
class Docker(ContainerEngine):
EXE = 'docker'
'disable_existing_loggers': True,
'formatters': {
'cephadm': {
- 'format': '%(asctime)s %(levelname)s %(message)s'
+ 'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
},
},
'handlers': {
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
- mounts[log_dir] = '/var/log/rbd-target-api:z'
+ mounts[log_dir] = '/var/log:z'
mounts['/dev'] = '/dev'
return mounts
def __init__(self) -> None:
self._pid_counter = itertools.count(0)
- self._threads = {}
+ self._threads: Dict[Any, Any] = {}
def is_active(self) -> bool:
    """This child watcher is always usable."""
    return True
def close(self) -> None:
    """Shut down the watcher, waiting for outstanding worker threads."""
    self._join_threads()
def _join_threads(self) -> None:
    """Internal: block until every live, non-daemon tracked thread finishes."""
    # snapshot the values so the dict may change while we join
    for worker in list(self._threads.values()):
        if worker.is_alive() and not worker.daemon:
            worker.join()
def __enter__(self) -> Any:
    """Support use as a context manager; yields the watcher itself."""
    return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Nothing to clean up on context exit; exceptions propagate."""
    pass
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn: Any = warnings.warn) -> None:
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
if threads:
ResourceWarning,
source=self)
- def add_child_handler(self, pid, callback, *args):
+ def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None:
loop = events.get_event_loop()
thread = threading.Thread(target=self._do_waitpid,
name=f'waitpid-{next(self._pid_counter)}',
self._threads[pid] = thread
thread.start()
def remove_child_handler(self, pid: Any) -> bool:
    # asyncio never actually invokes remove_child_handler(); this no-op
    # exists only because the abstract base class requires it.
    return True
def attach_loop(self, loop: Any) -> None:
    """No-op: this watcher does not need to track an event loop."""
    pass
- def _do_waitpid(self, loop, expected_pid, callback, args):
+ def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None:
assert expected_pid > 0
try:
prefix += ': '
timeout = timeout or ctx.timeout
- logger.debug('Running command: %s' % ' '.join(command))
-
async def tee(reader: asyncio.StreamReader) -> str:
collected = StringIO()
async for line in reader:
for i in CONTAINER_PREFERENCE:
try:
return i()
- except Exception as e:
- logger.debug('Could not locate %s: %s' % (i.EXE, e))
+ except Exception:
+ pass
return None
-def check_container_engine(ctx):
- # type: (CephadmContext) -> None
+def check_container_engine(ctx: CephadmContext) -> ContainerEngine:
engine = ctx.container_engine
if not isinstance(engine, CONTAINER_PREFERENCE):
# See https://github.com/python/mypy/issues/8993
engine.get_version(ctx)
if engine.version < MIN_PODMAN_VERSION:
raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION)
+ return engine
def get_unit_name(fsid, daemon_type, daemon_id=None):
d = get_parm(ctx.config_json)
config = d.get('config')
keyring = d.get('keyring')
+ if config and keyring:
+ return config, keyring
if 'config' in ctx and ctx.config:
try:
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
- _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
+ _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True)
_write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
def command_pull(ctx):
    # type: (CephadmContext) -> int
    """Pull the configured container image, then inspect and report on it."""
    _pull_image(ctx, ctx.image, ctx.insecure)
    return command_inspect_image(ctx)
-def _pull_image(ctx, image):
- # type: (CephadmContext, str) -> None
+def _pull_image(ctx, image, insecure=False):
+ # type: (CephadmContext, str, bool) -> None
logger.info('Pulling container image %s...' % image)
ignorelist = [
]
cmd = [ctx.container_engine.path, 'pull', image]
- if isinstance(ctx.container_engine, Podman) and os.path.exists('/etc/ceph/podman-auth.json'):
- cmd.append('--authfile=/etc/ceph/podman-auth.json')
+ if isinstance(ctx.container_engine, Podman):
+ if insecure:
+ cmd.append('--tls-verify=false')
+
+ if os.path.exists('/etc/ceph/podman-auth.json'):
+ cmd.append('--authfile=/etc/ceph/podman-auth.json')
cmd_str = ' '.join(cmd)
for sleep_secs in [1, 4, 25]:
return
if not any(pattern in err for pattern in ignorelist):
- raise RuntimeError('Failed command: %s' % cmd_str)
+ raise Error('Failed command: %s' % cmd_str)
logger.info('`%s` failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
time.sleep(sleep_secs)
- raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
+ raise Error('Failed command: %s: maximum retries reached' % cmd_str)
##################################
logger.info('Adjusting default settings to suit single-host cluster...')
# replicate across osds, not hosts
if (
- not cp.has_option('global', 'osd_crush_choose_leaf_type')
- and not cp.has_option('global', 'osd crush choose leaf type')
+ not cp.has_option('global', 'osd_crush_chooseleaf_type')
+ and not cp.has_option('global', 'osd crush chooseleaf type')
):
- cp.set('global', 'osd_crush_choose_leaf_type', '0')
+ cp.set('global', 'osd_crush_chooseleaf_type', '0')
# replica 2x
if (
not cp.has_option('global', 'osd_pool_default_size')
pass
+# funcs to process spec file for apply spec
def _parse_yaml_docs(f: Iterable[str]) -> List[List[str]]:
    """Split a stream of text lines into yaml documents.

    Any line containing a '---' separator ends the current document;
    each returned document is the list of its (rstripped) lines.
    Empty documents are dropped.
    """
    docs: List[List[str]] = []
    doc: List[str] = []
    for raw in f:
        if '---' not in raw:
            doc.append(raw.rstrip())
            continue
        if doc:
            docs.append(doc)
        doc = []
    if doc:
        docs.append(doc)
    return docs
+
+
def _parse_yaml_obj(doc: List[str]) -> Dict[str, str]:
    """Parse the top layer of one yaml document into a flat str->str dict.

    Only the first level of keys is handled: an indented line is treated
    as a continuation of the most recent key's value, a line ending in
    ':' opens a (possibly multi-line) value, and 'key: value' lines are
    stored directly.
    """
    # note: this only parses the first layer of yaml
    obj = {}  # type: Dict[str, str]
    current_key = ''
    for line in doc:
        if line.startswith(' '):
            # continuation of the previous key's value
            obj[current_key] += line.strip()
        elif line.endswith(':'):
            current_key = line.strip(':')
            obj[current_key] = ''
        else:
            # split on the first ':' only, so values that themselves
            # contain ':' (e.g. addrs like 'v2:10.1.2.3:3300') do not
            # raise ValueError
            current_key, val = line.split(':', 1)
            obj[current_key] = val.strip()
    return obj
+
+
def parse_yaml_objs(f: Iterable[str]) -> List[Dict[str, str]]:
    """Parse every yaml document in *f* into a flat top-level dict."""
    return [_parse_yaml_obj(doc) for doc in _parse_yaml_docs(f)]
+
+
def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstrap_hostname: str) -> int:
    """Copy the cluster's public ssh key to the host named in *host_spec*.

    Used while applying a spec file during bootstrap.  The bootstrap host
    itself is skipped.  Returns 0 on success (or when skipped), 1 when
    ssh-copy-id fails.
    """
    # copy ssh key to hosts in host spec (used for apply spec)
    ssh_key = '/etc/ceph/ceph.pub'
    if ctx.ssh_public_key:
        ssh_key = ctx.ssh_public_key.name

    if bootstrap_hostname != host_spec['hostname']:
        if 'addr' in host_spec:
            addr = host_spec['addr']
        else:
            addr = host_spec['hostname']
        out, err, code = call(ctx, ['sudo', '-u', ctx.ssh_user, 'ssh-copy-id', '-f', '-i', ssh_key, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx.ssh_user, addr)])
        if code:
            logger.info('\nCopying ssh key to host %s at address %s failed!\n' % (host_spec['hostname'], addr))
            return 1
        else:
            logger.info('Added ssh key to host %s at address %s\n' % (host_spec['hostname'], addr))
    # fix: previously the bootstrap-host branch fell off the end and
    # returned None despite the declared int return type
    return 0
+
+
@default_image
def command_bootstrap(ctx):
# type: (CephadmContext) -> int
if ctx.apply_spec:
logger.info('Applying %s to cluster' % ctx.apply_spec)
-
+ # copy ssh key to hosts in spec file
with open(ctx.apply_spec) as f:
- for line in f:
- if 'hostname:' in line:
- line = line.replace('\n', '')
- split = line.split(': ')
- if split[1] != hostname:
- logger.info('Adding ssh key to %s' % split[1])
-
- ssh_key = '/etc/ceph/ceph.pub'
- if ctx.ssh_public_key:
- ssh_key = ctx.ssh_public_key.name
- out, err, code = call_throws(ctx, ['sudo', '-u', ctx.ssh_user, 'ssh-copy-id', '-f', '-i', ssh_key, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx.ssh_user, split[1])])
+ try:
+ for spec in parse_yaml_objs(f):
+ if spec.get('service_type') == 'host':
+ _distribute_ssh_keys(ctx, spec, hostname)
+ except ValueError:
+ logger.info('Unable to parse %s succesfully' % ctx.apply_spec)
mounts = {}
- mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:z'
-
- out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
- logger.info(out)
+ mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:ro'
+ try:
+ out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
+ logger.info(out)
+ except Exception:
+ logger.info('\nApplying %s to cluster failed!\n' % ctx.apply_spec)
logger.info('You can access the Ceph CLI with:\n\n'
'\tsudo %s shell --fsid %s -c %s -k %s\n' % (
mount = pathify(split_src_dst[0])
filename = os.path.basename(split_src_dst[0])
if len(split_src_dst) > 1:
- dst = split_src_dst[1] + ':z' if len(split_src_dst) == 3 else split_src_dst[1]
+ dst = split_src_dst[1]
+ if len(split_src_dst) == 3:
+ dst = '{}:{}'.format(dst, split_src_dst[2])
mounts[mount] = dst
else:
- mounts[mount] = '/mnt/{}:z'.format(filename)
+ mounts[mount] = '/mnt/{}'.format(filename)
if ctx.command:
command = ctx.command
else:
def command_check_host(ctx: CephadmContext) -> None:
- container_path = ctx.container_engine.path
-
errors = []
commands = ['systemctl', 'lvcreate']
try:
- check_container_engine(ctx)
- logger.info('podman|docker (%s) is present' % container_path)
+ engine = check_container_engine(ctx)
+ logger.info(f'{engine} is present')
except Error as e:
errors.append(str(e))
'rocky': ('centos', 'el'),
'almalinux': ('centos', 'el'),
'fedora': ('fedora', 'fc'),
+ 'mariner': ('mariner', 'cm'),
}
def __init__(self, ctx: CephadmContext,
if (self.distro_code == 'fc' and self.major >= 30) or \
(self.distro_code == 'el' and self.major >= 8):
self.tool = 'dnf'
+ elif (self.distro_code == 'cm'):
+ self.tool = 'tdnf'
else:
self.tool = 'yum'
_disk_vendor_workarounds = {
'0x1af4': 'Virtio Block Device'
}
+ _excluded_block_devices = ('sr', 'zram', 'dm-')
def __init__(self, ctx: CephadmContext):
self.ctx: CephadmContext = ctx
# type: () -> List[str]
"""Determine the list of block devices by looking at /sys/block"""
return [dev for dev in os.listdir('/sys/block')
- if not dev.startswith('dm')]
+ if not dev.startswith(HostFacts._excluded_block_devices)]
def _get_devs_by_type(self, rota='0'):
# type: (str) -> List[str]
##################################
def systemd_target_state(ctx: CephadmContext, target_name: str, subsystem: str = 'ceph') -> bool:
    """Return True if *target_name* is wanted by the subsystem's systemd target.

    Checks for the unit file under <unit_dir>/<subsystem>.target.wants/.
    """
    # TODO: UNITTEST
    wants_dir = os.path.join(ctx.unit_dir, f'{subsystem}.target.wants')
    return os.path.exists(os.path.join(wants_dir, target_name))
def target_exists(ctx: CephadmContext) -> bool:
    """Return True if the global ceph.target systemd unit file exists."""
    # os.path.join instead of string concatenation, matching the rest of
    # the file's path handling (e.g. systemd_target_state)
    return os.path.exists(os.path.join(ctx.unit_dir, 'ceph.target'))
+
+
@infer_fsid
def command_maintenance(ctx: CephadmContext) -> str:
if not ctx.fsid:
- raise Error('must pass --fsid to specify cluster')
+ raise Error('failed - must pass --fsid to specify cluster')
target = f'ceph-{ctx.fsid}.target'
if ctx.maintenance_action.lower() == 'enter':
logger.info('Requested to place host into maintenance')
- if systemd_target_state(target):
+ if systemd_target_state(ctx, target):
_out, _err, code = call(ctx,
['systemctl', 'disable', target],
verbosity=CallVerbosity.DEBUG)
else:
logger.info('Requested to exit maintenance state')
+ # if we've never deployed a daemon on this host there will be no systemd
+ # target to disable so attempting a disable will fail. We still need to
+ # return success here or host will be permanently stuck in maintenance mode
+ # as no daemons can be deployed so no systemd target will ever exist to disable.
+ if not target_exists(ctx):
+ return 'skipped - systemd target not present on this host. Host removed from maintenance mode.'
# exit maintenance request
- if not systemd_target_state(target):
+ if not systemd_target_state(ctx, target):
_out, _err, code = call(ctx,
['systemctl', 'enable', target],
verbosity=CallVerbosity.DEBUG)
parser_pull = subparsers.add_parser(
'pull', help='pull latest image version')
parser_pull.set_defaults(func=command_pull)
+ parser_pull.add_argument(
+ '--insecure',
+ action='store_true',
+ help=argparse.SUPPRESS,
+ )
parser_inspect_image = subparsers.add_parser(
'inspect-image', help='inspect local container image')
for handler in logger.handlers:
if handler.name == 'console':
handler.setLevel(logging.DEBUG)
-
+ logger.debug('%s\ncephadm %s' % ('-' * 80, args))
return ctx
# podman or docker?
ctx.container_engine = find_container_engine(ctx)
if ctx.func not in \
- [command_check_host, command_prepare_host, command_add_repo, command_install]:
+ [
+ command_check_host,
+ command_prepare_host,
+ command_add_repo,
+ command_rm_repo,
+ command_install
+ ]:
check_container_engine(ctx)
# command handler
r = ctx.func(ctx)