import struct
import ssl
from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator
import re
import uuid
from configparser import ConfigParser
-from contextlib import redirect_stdout
+from contextlib import redirect_stdout, contextmanager
from functools import wraps
from glob import glob
from io import StringIO
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}'
LOG_DIR_MODE = 0o770
DATA_DIR_MODE = 0o700
+DEFAULT_MODE = 0o600
CONTAINER_INIT = True
MIN_PODMAN_VERSION = (2, 0, 2)
CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0)
DEFAULT_RETRY = 15
DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
QUIET_LOG_LEVEL = 9 # DEBUG is 10, so using 9 to be lower level than DEBUG
+NO_DEPRECATED = False
logger: logging.Logger = None # type: ignore
and self.version == other.version)
+class DeploymentType(Enum):
+    """Kind of deployment operation applied to a daemon.
+
+    DEFAULT:  fresh deployment of a daemon.
+    REDEPLOY: redeploying a daemon; works the same as a fresh
+              deployment minus port checking.
+    RECONFIG: reconfiguring a daemon; rewrites config files and
+              potentially restarts the daemon.
+    """
+
+    DEFAULT = 'Deploy'
+    REDEPLOY = 'Redeploy'
+    RECONFIG = 'Reconfig'
+
+
class BaseConfig:
def __init__(self) -> None:
pass
+class ClusterAlreadyExists(Exception):
+    """Error type for a cluster that already exists.
+
+    NOTE(review): the raise sites are outside this hunk - confirm usage.
+    """
+
+
class TimeoutExpired(Error):
pass
class Ceph(object):
daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
'crash', 'cephfs-mirror', 'ceph-exporter')
- gateways = ('iscsi', 'nfs')
+ gateways = ('iscsi', 'nfs', 'nvmeof')
##################################
@classmethod
def init(cls, ctx: CephadmContext, fsid: str,
daemon_id: Union[int, str]) -> 'SNMPGateway':
- assert ctx.config_json
- return cls(ctx, fsid, daemon_id,
- get_parm(ctx.config_json), ctx.image)
+ cfgs = fetch_configs(ctx)
+ assert cfgs # assert some config data was found
+ return cls(ctx, fsid, daemon_id, cfgs, ctx.image)
@staticmethod
def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]:
@property
def port(self) -> int:
- if not self.ctx.tcp_ports:
+ endpoints = fetch_tcp_ports(self.ctx)
+ if not endpoints:
return self.DEFAULT_PORT
- else:
- if len(self.ctx.tcp_ports) > 0:
- return int(self.ctx.tcp_ports.split()[0])
- else:
- return self.DEFAULT_PORT
+ return endpoints[0].port
def get_daemon_args(self) -> List[str]:
v3_args = []
def create_daemon_conf(self) -> None:
"""Creates the environment file holding 'secrets' passed to the snmp-notifier daemon"""
- with open(os.open(self.conf_file_path, os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(self.conf_file_path) as f:
if self.snmp_version == 'V2c':
f.write(f'SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n')
else:
##################################
+@contextmanager
+def write_new(
+    destination: Union[str, Path],
+    *,
+    owner: Optional[Tuple[int, int]] = None,
+    perms: Optional[int] = DEFAULT_MODE,
+    encoding: Optional[str] = None,
+) -> Generator[IO, None, None]:
+    """Write a new file in a robust manner, optionally specifying the owner,
+    permissions, or encoding. This function takes care to never leave a file in
+    a partially-written state due to a crash or power outage by writing to
+    temporary file and then renaming that temp file over to the final
+    destination once all data is written. Note that the temporary files can be
+    leaked but only for a "crash" or power outage - regular exceptions will
+    clean up the temporary file.
+
+    The owner and permissions are applied to the temporary file *before* the
+    caller writes to it, so sensitive content (keyrings, config secrets) is
+    never readable under the process's umask-default mode.
+
+    :param destination: final path of the file to create or replace
+    :param owner: optional ``(uid, gid)`` pair passed to ``os.fchown``
+    :param perms: optional mode passed to ``os.fchmod``; ``None`` leaves the
+        mode chosen by ``open`` untouched
+    :param encoding: optional text encoding passed to ``open``
+    :raises Exception: any error from opening, writing or syncing the file is
+        re-raised after the temporary file has been removed
+    """
+    destination = os.path.abspath(destination)
+    tempname = f'{destination}.new'
+    open_kwargs: Dict[str, Any] = {}
+    if encoding:
+        open_kwargs['encoding'] = encoding
+    try:
+        with open(tempname, 'w', **open_kwargs) as fh:
+            # lock down ownership and mode while the file is still empty
+            if owner is not None:
+                os.fchown(fh.fileno(), *owner)
+            if perms is not None:
+                os.fchmod(fh.fileno(), perms)
+            yield fh
+            fh.flush()
+            os.fsync(fh.fileno())
+    except Exception:
+        try:
+            os.unlink(tempname)
+        except FileNotFoundError:
+            # open() itself failed, so there is nothing to clean up; do not
+            # let the cleanup mask the original error
+            pass
+        raise
+    os.rename(tempname, destination)
+
+
def populate_files(config_dir, config_files, uid, gid):
# type: (str, Dict, int, int) -> None
"""create config files for different services"""
config_file = os.path.join(config_dir, fname)
config_content = dict_get_join(config_files, fname)
logger.info('Write file: %s' % (config_file))
- with open(config_file, 'w', encoding='utf-8') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(config_file, owner=(uid, gid), encoding='utf-8') as f:
f.write(config_content)
@classmethod
def init(cls, ctx, fsid, daemon_id):
# type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
- return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json), ctx.image)
+ return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
def get_container_mounts(self, data_dir):
# type: (str) -> Dict[str, str]
# write the RGW keyring
if self.rgw:
keyring_path = os.path.join(data_dir, 'keyring.rgw')
- with open(keyring_path, 'w') as f:
- os.fchmod(f.fileno(), 0o600)
- os.fchown(f.fileno(), uid, gid)
+ with write_new(keyring_path, owner=(uid, gid)) as f:
f.write(self.rgw.get('keyring', ''))
##################################
def init(cls, ctx, fsid, daemon_id):
# type: (CephadmContext, str, Union[int, str]) -> CephIscsi
return cls(ctx, fsid, daemon_id,
- get_parm(ctx.config_json), ctx.image)
+ fetch_configs(ctx), ctx.image)
@staticmethod
def get_container_mounts(data_dir, log_dir):
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+ mounts[os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')] = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
mounts[log_dir] = '/var/log:z'
mounts['/dev'] = '/dev'
return mounts
version = None
out, err, code = call(ctx,
[ctx.container_engine.path, 'exec', container_id,
- '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
+ '/usr/bin/python3', '-c',
+ "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
verbosity=CallVerbosity.QUIET)
if code == 0:
version = out.strip()
configfs_dir = os.path.join(data_dir, 'configfs')
makedirs(configfs_dir, uid, gid, 0o755)
+ # set up the tcmu-runner entrypoint script
+ # to be mounted into the container. For more info
+ # on why we need this script, see the
+ # tcmu_runner_entrypoint_script function
+ self.files['tcmu-runner-entrypoint.sh'] = self.tcmu_runner_entrypoint_script()
+
# populate files from the config-json
populate_files(data_dir, self.files, uid, gid)
+ # we want the tcmu runner entrypoint script to be executable
+ # populate_files will give it 0o600 by default
+ os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700)
+
@staticmethod
def configfs_mount_umount(data_dir, mount=True):
# type: (str, bool) -> List[str]
'umount {0}; fi'.format(mount_path)
return cmd.split()
+ @staticmethod
+ def tcmu_runner_entrypoint_script() -> str:
+ """Return the text of the bash script used as the tcmu-runner container entrypoint."""
+ # since we are having tcmu-runner be a background
+ # process in its systemd unit (rbd-target-api being
+ # the main process) systemd will not restart it when
+ # it fails. in order to try and get around that for now
+ # we can have a script mounted in the container
+ # that attempts to do the restarting for us. This script
+ # can then become the entrypoint for the tcmu-runner
+ # container
+
+ # This is intended to be dropped for a better solution
+ # for at least the squid release onward
+ return """#!/bin/bash
+RUN_DIR=/var/run/tcmu-runner
+
+if [ ! -d "${RUN_DIR}" ] ; then
+ mkdir -p "${RUN_DIR}"
+fi
+
+rm -rf "${RUN_DIR}"/*
+
+while true
+do
+ touch "${RUN_DIR}"/start-up-$(date -Ins)
+ /usr/bin/tcmu-runner
+
+ # If we got around 3 kills/segfaults in the last minute,
+ # don't start anymore
+ if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then
+ exit 0
+ fi
+
+ sleep 1
+done
+"""
+
def get_tcmu_runner_container(self):
# type: () -> CephContainer
# daemon_id, is used to generated the cid and pid files used by podman but as both tcmu-runner
# and rbd-target-api have the same daemon_id, it conflits and prevent the second container from
# starting. .tcmu runner is appended to the daemon_id to fix that.
- tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu')
- tcmu_container.entrypoint = '/usr/bin/tcmu-runner'
+ tcmu_container = get_deployment_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu')
+ # TODO: Eventually we don't want to run tcmu-runner through this script.
+ # This is intended to be a workaround backported to older releases
+ # and should eventually be removed in at least squid onward
+ tcmu_container.entrypoint = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
tcmu_container.cname = self.get_container_name(desc='tcmu')
return tcmu_container
+
+##################################
+
+
+class CephNvmeof(object):
+    """Defines a Ceph-Nvmeof container"""
+
+    daemon_type = 'nvmeof'
+    required_files = ['ceph-nvmeof.conf']
+    default_image = DEFAULT_NVMEOF_IMAGE
+
+    def __init__(
+        self,
+        ctx: CephadmContext,
+        fsid: str,
+        daemon_id: Union[int, str],
+        config_json: Dict,
+        image: str = DEFAULT_NVMEOF_IMAGE,
+    ) -> None:
+        """Store the deployment parameters and validate them."""
+        self.ctx = ctx
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # files to be written into the daemon's data dir (from config-json)
+        self.files = dict_get(config_json, 'files', {})
+
+        # reject bad arguments as early as possible
+        self.validate()
+
+    @classmethod
+    def init(cls, ctx: CephadmContext, fsid: str,
+             daemon_id: Union[int, str]) -> 'CephNvmeof':
+        """Build an instance from the configuration and image held by ctx."""
+        return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
+
+    @staticmethod
+    def get_container_mounts(data_dir: str) -> Dict[str, str]:
+        """Map host paths (mostly under data_dir) to container mount points."""
+        return {
+            os.path.join(data_dir, 'config'): '/etc/ceph/ceph.conf:z',
+            os.path.join(data_dir, 'keyring'): '/etc/ceph/keyring:z',
+            os.path.join(data_dir, 'ceph-nvmeof.conf'): '/src/ceph-nvmeof.conf:z',
+            os.path.join(data_dir, 'configfs'): '/sys/kernel/config',
+            '/dev/hugepages': '/dev/hugepages',
+            '/dev/vfio/vfio': '/dev/vfio/vfio',
+        }
+
+    @staticmethod
+    def get_container_binds() -> List[List[str]]:
+        """Return the bind mount specs: a read-only bind of /lib/modules."""
+        return [
+            [
+                'type=bind',
+                'source=/lib/modules',
+                'destination=/lib/modules',
+                'ro=true',
+            ],
+        ]
+
+    @staticmethod
+    def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
+        """Read the "io.ceph.version" label of ctx.image.
+
+        Note that container_id is not used; the container engine is asked
+        to inspect ctx.image. Returns None if the inspect call fails.
+        """
+        inspect_cmd = [
+            ctx.container_engine.path, 'inspect',
+            '--format', '{{index .Config.Labels "io.ceph.version"}}',
+            ctx.image,
+        ]
+        out, err, ret = call(ctx, inspect_cmd)
+        return out.strip() if ret == 0 else None
+
+    def validate(self) -> None:
+        """Raise Error if the fsid, daemon_id, image or required files are bad."""
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # every required file must have been supplied via the config-json
+        for fname in self.required_files or []:
+            if fname not in self.files:
+                raise Error('required file missing from config-json: %s' % fname)
+
+    def get_daemon_name(self) -> str:
+        """Return '<daemon_type>.<daemon_id>'."""
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc: Optional[str] = None) -> str:
+        """Return '<fsid>-<daemon name>', with '-<desc>' appended if given."""
+        base = '%s-%s' % (self.fsid, self.get_daemon_name())
+        if not desc:
+            return base
+        return '%s-%s' % (base, desc)
+
+    def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Creating ceph-nvmeof config...')
+        makedirs(os.path.join(data_dir, 'configfs'), uid, gid, 0o755)
+
+        # write out the files supplied in the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+    @staticmethod
+    def configfs_mount_umount(data_dir: str, mount: bool = True) -> List[str]:
+        """Return the shell words that mount (or unmount) the configfs dir.
+
+        The command only acts when /proc/mounts shows the path is not yet
+        mounted (mount=True) or is currently mounted (mount=False).
+        """
+        mount_path = os.path.join(data_dir, 'configfs')
+        if mount:
+            template = ('if ! grep -qs {0} /proc/mounts; then '
+                        'mount -t configfs none {0}; fi')
+        else:
+            template = ('if grep -qs {0} /proc/mounts; then '
+                        'umount {0}; fi')
+        return template.format(mount_path).split()
+
+    @staticmethod
+    def get_sysctl_settings() -> List[str]:
+        """Return the sysctl setting lines associated with this daemon type."""
+        return [
+            'vm.nr_hugepages = 4096',
+        ]
+
+
##################################
def init(cls, ctx: CephadmContext, fsid: str,
daemon_id: Union[int, str]) -> 'CephExporter':
return cls(ctx, fsid, daemon_id,
- get_parm(ctx.config_json), ctx.image)
+ fetch_configs(ctx), ctx.image)
@staticmethod
def get_container_mounts() -> Dict[str, str]:
@classmethod
def init(cls, ctx: CephadmContext,
fsid: str, daemon_id: Union[int, str]) -> 'HAproxy':
- return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json),
+ return cls(ctx, fsid, daemon_id, fetch_configs(ctx),
ctx.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
def init(cls, ctx: CephadmContext, fsid: str,
daemon_id: Union[int, str]) -> 'Keepalived':
return cls(ctx, fsid, daemon_id,
- get_parm(ctx.config_json), ctx.image)
+ fetch_configs(ctx), ctx.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
def init(cls, ctx: CephadmContext,
fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
return cls(fsid, daemon_id,
- get_parm(ctx.config_json), ctx.image)
+ fetch_configs(ctx), ctx.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""
logger.info('Creating file: {}'.format(file_path))
content = dict_get_join(self.files, file_path)
file_path = os.path.join(data_dir, file_path.strip('/'))
- with open(file_path, 'w', encoding='utf-8') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f:
f.write(content)
def get_daemon_args(self) -> List[str]:
##################################
-def dict_get_join(d: Dict, key: str) -> Any:
+def dict_get_join(d: Dict[str, Any], key: str) -> Any:
"""
Helper function to get the value of a given key from a dictionary.
`List` values will be converted to a string by joining them with a
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
+ supported_daemons.append(CephNvmeof.daemon_type)
supported_daemons.append(CustomContainer.daemon_type)
supported_daemons.append(HAproxy.daemon_type)
supported_daemons.append(Keepalived.daemon_type)
logger.warning(msg)
raise PortOccupiedError(msg)
else:
- raise Error(e)
+ raise e
except Exception as e:
raise Error(e)
finally:
s.close()
-def port_in_use(ctx, port_num):
- # type: (CephadmContext, int) -> bool
+def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool:
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
- logger.info('Verifying port %d ...' % port_num)
+ logger.info('Verifying port %s ...' % str(endpoint))
def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
try:
s = socket.socket(af, socket.SOCK_STREAM)
- attempt_bind(ctx, s, address, port_num)
+ attempt_bind(ctx, s, address, endpoint.port)
except PortOccupiedError:
return True
except OSError as e:
else:
raise e
return False
+
+ if endpoint.ip != '0.0.0.0' and endpoint.ip != '::':
+ if is_ipv6(endpoint.ip):
+ return _port_in_use(socket.AF_INET6, endpoint.ip)
+ else:
+ return _port_in_use(socket.AF_INET, endpoint.ip)
+
return any(_port_in_use(af, address) for af, address in (
(socket.AF_INET, '0.0.0.0'),
(socket.AF_INET6, '::')
def default_image(func: FuncT) -> FuncT:
@wraps(func)
def _default_image(ctx: CephadmContext) -> Any:
- if not ctx.image:
- if 'name' in ctx and ctx.name:
- type_ = ctx.name.split('.', 1)[0]
- if type_ in Monitoring.components:
- ctx.image = Monitoring.components[type_]['image']
- if type_ == 'haproxy':
- ctx.image = HAproxy.default_image
- if type_ == 'keepalived':
- ctx.image = Keepalived.default_image
- if type_ == SNMPGateway.daemon_type:
- ctx.image = SNMPGateway.default_image
- if type_ in Tracing.components:
- ctx.image = Tracing.components[type_]['image']
- if not ctx.image:
- ctx.image = os.environ.get('CEPHADM_IMAGE')
- if not ctx.image:
- ctx.image = _get_default_image(ctx)
-
+ update_default_image(ctx)
return func(ctx)
return cast(FuncT, _default_image)
+def update_default_image(ctx: CephadmContext) -> None:
+    """Fill in ctx.image when it is unset.
+
+    The image is picked, in order, from: the default for the daemon type
+    named by ctx.name (if any), the CEPHADM_IMAGE environment variable,
+    and finally _get_default_image(). Does nothing if ctx.image is
+    already set to a truthy value.
+    """
+    if getattr(ctx, 'image', None):
+        return
+    ctx.image = None  # ensure ctx.image exists to avoid repeated `getattr`s
+    daemon_name = getattr(ctx, 'name', None)
+    if daemon_name:
+        kind = daemon_name.split('.', 1)[0]
+        if kind in Monitoring.components:
+            ctx.image = Monitoring.components[kind]['image']
+        # daemon types whose class carries a single default image
+        for known_type, daemon_cls in (
+            ('haproxy', HAproxy),
+            ('keepalived', Keepalived),
+            (SNMPGateway.daemon_type, SNMPGateway),
+            (CephNvmeof.daemon_type, CephNvmeof),
+        ):
+            if kind == known_type:
+                ctx.image = daemon_cls.default_image
+        if kind in Tracing.components:
+            ctx.image = Tracing.components[kind]['image']
+    if not ctx.image:
+        ctx.image = os.environ.get('CEPHADM_IMAGE')
+    if not ctx.image:
+        ctx.image = _get_default_image(ctx)
+
+
+def executes_early(func: FuncT) -> FuncT:
+    """Mark a command function as safe to run "early".
+
+    A marked command is meant to have no dependencies and no environmental
+    requirements, so it can be executed as non-root and with no logging,
+    etc. Commands carrying this decorator must be simple and
+    self-contained. The function is returned unwrapped, with the
+    `_execute_early` attribute set to True.
+    """
+    setattr(func, '_execute_early', True)
+    return func
+
+
+def deprecated_command(func: FuncT) -> FuncT:
+    """Decorator for deprecated command functions.
+
+    Each call logs a warning naming the function. When the module-level
+    NO_DEPRECATED flag is set, the call is refused with an Error instead
+    of running the command.
+    """
+    @wraps(func)
+    def _deprecated_command(ctx: CephadmContext) -> Any:
+        logger.warning('Deprecated command used: %s', func)
+        if not NO_DEPRECATED:
+            return func(ctx)
+        raise Error('running deprecated commands disabled')
+
+    return cast(FuncT, _deprecated_command)
+
+
def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]:
"""
:param ctx: Cephadm context
if daemon_type not in ['grafana', 'loki', 'promtail']:
ip = ''
port = Monitoring.port_map[daemon_type][0]
- if 'meta_json' in ctx and ctx.meta_json:
- meta = json.loads(ctx.meta_json) or {}
+ meta = fetch_meta(ctx)
+ if meta:
if 'ip' in meta and meta['ip']:
ip = meta['ip']
if 'ports' in meta and meta['ports']:
port = meta['ports'][0]
r += [f'--web.listen-address={ip}:{port}']
if daemon_type == 'prometheus':
- config = get_parm(ctx.config_json)
+ config = fetch_configs(ctx)
retention_time = config.get('retention_time', '15d')
retention_size = config.get('retention_size', '0') # default to disabled
r += [f'--storage.tsdb.retention.time={retention_time}']
host = wrap_ipv6(addr) if addr else host
r += [f'--web.external-url={scheme}://{host}:{port}']
if daemon_type == 'alertmanager':
- config = get_parm(ctx.config_json)
+ config = fetch_configs(ctx)
peers = config.get('peers', list()) # type: ignore
for peer in peers:
r += ['--cluster.peer={}'.format(peer)]
if daemon_type == 'promtail':
r += ['--config.expand-env']
if daemon_type == 'prometheus':
- config = get_parm(ctx.config_json)
+ config = fetch_configs(ctx)
try:
r += [f'--web.config.file={config["web_config"]}']
except KeyError:
pass
if daemon_type == 'node-exporter':
- config = get_parm(ctx.config_json)
+ config = fetch_configs(ctx)
try:
- r += [f'--web.config={config["web_config"]}']
+ r += [f'--web.config.file={config["web_config"]}']
except KeyError:
pass
r += ['--path.procfs=/host/proc',
if config:
config_path = os.path.join(data_dir, 'config')
- with open(config_path, 'w') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(config_path, owner=(uid, gid)) as f:
f.write(config)
if keyring:
keyring_path = os.path.join(data_dir, 'keyring')
- with open(keyring_path, 'w') as f:
- os.fchmod(f.fileno(), 0o600)
- os.fchown(f.fileno(), uid, gid)
+ with write_new(keyring_path, owner=(uid, gid)) as f:
f.write(keyring)
if daemon_type in Monitoring.components.keys():
- config_json: Dict[str, Any] = dict()
- if 'config_json' in ctx:
- config_json = get_parm(ctx.config_json)
+ config_json = fetch_configs(ctx)
# Set up directories specific to the monitoring component
config_dir = ''
# populate the config directory for the component from the config-json
if 'files' in config_json:
for fname in config_json['files']:
- content = dict_get_join(config_json['files'], fname)
+ # work around mypy weirdness where it thinks `str`s aren't Anys
+ # when used for dictionary values! feels like possibly a mypy bug?!
+ cfg = cast(Dict[str, Any], config_json['files'])
+ content = dict_get_join(cfg, fname)
if os.path.isabs(fname):
fpath = os.path.join(data_dir_root, fname.lstrip(os.path.sep))
else:
fpath = os.path.join(data_dir_root, config_dir, fname)
- with open(fpath, 'w', encoding='utf-8') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(fpath, owner=(uid, gid), encoding='utf-8') as f:
f.write(content)
elif daemon_type == NFSGanesha.daemon_type:
ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+ elif daemon_type == CephNvmeof.daemon_type:
+ ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id)
+ ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid)
+
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, fsid, daemon_id)
haproxy.create_daemon_dirs(data_dir, uid, gid)
def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: str, fsid: str, uid: int, gid: int) -> None:
# mostly making this its own function to make unit testing easier
- if 'config_json' not in ctx or not ctx.config_json:
+ ccfiles = fetch_custom_config_files(ctx)
+ if not ccfiles:
return
- config_json = get_custom_config_files(ctx.config_json)
custom_config_dir = os.path.join(ctx.data_dir, fsid, 'custom_config_files', f'{daemon_type}.{daemon_id}')
if not os.path.exists(custom_config_dir):
makedirs(custom_config_dir, uid, gid, 0o755)
mandatory_keys = ['mount_path', 'content']
- for ccf in config_json['custom_config_files']:
+ for ccf in ccfiles:
if all(k in ccf for k in mandatory_keys):
file_path = os.path.join(custom_config_dir, os.path.basename(ccf['mount_path']))
- with open(file_path, 'w+', encoding='utf-8') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f:
f.write(ccf['content'])
+ # temporary workaround to make custom config files work for tcmu-runner
+ # container we deploy with iscsi until iscsi is refactored
+ if daemon_type == 'iscsi':
+ tcmu_config_dir = custom_config_dir + '.tcmu'
+ if not os.path.exists(tcmu_config_dir):
+ makedirs(tcmu_config_dir, uid, gid, 0o755)
+ tcmu_file_path = os.path.join(tcmu_config_dir, os.path.basename(ccf['mount_path']))
+ with write_new(tcmu_file_path, owner=(uid, gid), encoding='utf-8') as f:
+ f.write(ccf['content'])
def get_parm(option: str) -> Dict[str, str]:
js = _get_config_json(option)
# custom_config_files is a special field that may be in the config
# dict. It is used for mounting custom config files into daemon's containers
- # and should be accessed through the "get_custom_config_files" function.
+ # and should be accessed through the "fetch_custom_config_files" function.
# For get_parm we need to discard it.
js.pop('custom_config_files', None)
return js
-def get_custom_config_files(option: str) -> Dict[str, List[Dict[str, str]]]:
- js = _get_config_json(option)
- res: Dict[str, List[Dict[str, str]]] = {'custom_config_files': []}
- if 'custom_config_files' in js:
- res['custom_config_files'] = js['custom_config_files']
- return res
-
-
def _get_config_json(option: str) -> Dict[str, Any]:
if not option:
return dict()
return js
+def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
+    """Return a dict containing metadata about a deployment.
+
+    If ctx.meta_properties is set it is returned as-is. Otherwise the JSON
+    string in ctx.meta_json is parsed, cached on ctx.meta_properties and
+    returned. An unset or empty meta_json yields an empty dict rather
+    than a JSON decoding error.
+
+    :raises json.JSONDecodeError: if meta_json is non-empty but not valid JSON
+    """
+    meta = getattr(ctx, 'meta_properties', None)
+    if meta is not None:
+        return meta
+    mjson = getattr(ctx, 'meta_json', None)
+    # truthiness (not `is not None`): an empty string is "no metadata",
+    # and json.loads('') would raise
+    if mjson:
+        meta = json.loads(mjson) or {}
+        ctx.meta_properties = meta
+        return meta
+    return {}
+
+
+def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
+    """Return the deployment's configuration parameters as a dict.
+
+    The 'custom_config_files' key is never part of the result; use
+    `fetch_custom_config_files` to get at that data instead.
+    """
+    # preferred source: ctx.config_blobs, which is *always* a dict. it is
+    # created once when a command is parsed/processed and stored "forever",
+    # so filter a copy rather than the stored dict
+    blobs = getattr(ctx, 'config_blobs', None)
+    if blobs:
+        return {
+            key: value
+            for key, value in dict(blobs).items()
+            if key != 'custom_config_files'
+        }
+    # legacy source: ctx.config_json, a string that either contains json
+    # or refers to a file name where the file contains json
+    legacy = getattr(ctx, 'config_json', None)
+    if not legacy:
+        return {}
+    parsed = _get_config_json(legacy) or {}
+    parsed.pop('custom_config_files', None)
+    return parsed
+
+
+def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
+    """Return the list of dicts describing custom configuration files
+    for containers (empty if none were supplied).
+    """
+    # NOTE: this is the mirror image of fetch_configs: rather than
+    # dropping the custom_config_files key, only that key is returned.
+    blobs = getattr(ctx, 'config_blobs', None)
+    if blobs:
+        source = blobs
+    else:
+        legacy = getattr(ctx, 'config_json', None)
+        if not legacy:
+            return []
+        source = _get_config_json(legacy)
+    return source.get('custom_config_files', [])
+
+
+def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
+    """Return a list of Endpoints, which have a port and ip attribute
+
+    Ports come from ctx.tcp_ports (a whitespace separated string or an
+    iterable of ports). ctx.port_ips (a JSON string or a dict keyed by
+    the port as a string) supplies the IP for a port; ports without an
+    entry are paired with '0.0.0.0'.
+    """
+    raw_ports = getattr(ctx, 'tcp_ports', None)
+    if raw_ports is None:
+        raw_ports = []
+    elif isinstance(raw_ports, str):
+        raw_ports = [int(word) for word in raw_ports.split()]
+
+    ip_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+    ip_by_port: Dict[str, str] = {}
+    if isinstance(ip_attr, str):
+        ip_by_port = json.loads(ip_attr)
+    elif ip_attr is not None:
+        # neither None nor a str: assume it's already the dict we want
+        ip_by_port = ip_attr
+
+    endpoints: List[EndPoint] = []
+    for port in raw_ports:
+        key = str(port)
+        ip = ip_by_port[key] if key in ip_by_port else '0.0.0.0'
+        endpoints.append(EndPoint(ip, port))
+    return endpoints
+
+
def get_config_and_keyring(ctx):
# type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
config = None
keyring = None
- if 'config_json' in ctx and ctx.config_json:
- d = get_parm(ctx.config_json)
+ d = fetch_configs(ctx)
+ if d:
config = d.get('config')
keyring = d.get('keyring')
if config and keyring:
if daemon_type == CephIscsi.daemon_type:
binds.extend(CephIscsi.get_container_binds())
+ if daemon_type == CephNvmeof.daemon_type:
+ binds.extend(CephNvmeof.get_container_binds())
elif daemon_type == CustomContainer.daemon_type:
assert daemon_id
cc = CustomContainer.init(ctx, fsid, daemon_id)
if daemon_type == 'osd':
# selinux-policy in the container may not match the host.
if HostFacts(ctx).selinux_enabled:
- selinux_folder = '/var/lib/ceph/%s/selinux' % fsid
- if not os.path.exists(selinux_folder):
- os.makedirs(selinux_folder, mode=0o755)
- mounts[selinux_folder] = '/sys/fs/selinux:ro'
+ cluster_dir = f'{ctx.data_dir}/{fsid}'
+ selinux_folder = f'{cluster_dir}/selinux'
+ if os.path.exists(cluster_dir):
+ if not os.path.exists(selinux_folder):
+ os.makedirs(selinux_folder, mode=0o755)
+ mounts[selinux_folder] = '/sys/fs/selinux:ro'
+ else:
+ logger.error(f'Cluster direcotry {cluster_dir} does not exist.')
mounts['/'] = '/rootfs'
try:
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
mounts.update(HAproxy.get_container_mounts(data_dir))
+ if daemon_type == CephNvmeof.daemon_type:
+ assert daemon_id
+ data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ mounts.update(CephNvmeof.get_container_mounts(data_dir))
+
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
elif daemon_type in Tracing.components:
entrypoint = ''
name = '%s.%s' % (daemon_type, daemon_id)
- config = get_parm(ctx.config_json)
+ config = fetch_configs(ctx)
Tracing.set_configuration(config, daemon_type)
envs.extend(Tracing.components[daemon_type].get('envs', []))
elif daemon_type == NFSGanesha.daemon_type:
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(Keepalived.get_container_envs())
container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+ elif daemon_type == CephNvmeof.daemon_type:
+ name = '%s.%s' % (daemon_type, daemon_id)
+ container_args.extend(['--ulimit', 'memlock=-1:-1'])
+ container_args.extend(['--ulimit', 'nofile=10240'])
+ container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
elif daemon_type == CephIscsi.daemon_type:
entrypoint = CephIscsi.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
raise RuntimeError('uid/gid not found')
-def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- config=None, keyring=None,
- osd_fsid=None,
- reconfig=False,
- ports=None):
- # type: (CephadmContext, str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
-
- ports = ports or []
- if any([port_in_use(ctx, port) for port in ports]):
- if daemon_type == 'mgr':
- # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
- # tell whether that is the case here.
- logger.warning(
- f"ceph-mgr TCP port(s) {','.join(map(str, ports))} already in use"
- )
- else:
- raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
+def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str,
+ daemon_id: Union[int, str], c: Optional['CephContainer'],
+ uid: int, gid: int, config: Optional[str] = None,
+ keyring: Optional[str] = None, osd_fsid: Optional[str] = None,
+ deployment_type: DeploymentType = DeploymentType.DEFAULT,
+ endpoints: Optional[List[EndPoint]] = None) -> None:
+
+ endpoints = endpoints or []
+ # only check port in use if fresh deployment since service
+ # we are redeploying/reconfiguring will already be using the port
+ if deployment_type == DeploymentType.DEFAULT:
+ if any([port_in_use(ctx, e) for e in endpoints]):
+ if daemon_type == 'mgr':
+ # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
+ # tell whether that is the case here.
+ logger.warning(
+ f"ceph-mgr TCP port(s) {','.join(map(str, endpoints))} already in use"
+ )
+ else:
+ raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type))
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- if reconfig and not os.path.exists(data_dir):
+ if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir):
raise Error('cannot reconfig, data path %s does not exist' % data_dir)
if daemon_type == 'mon' and not os.path.exists(data_dir):
assert config
).run()
# write conf
- with open(mon_dir + '/config', 'w') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
f.write(config)
else:
# dirs, conf, keyring
uid, gid,
config, keyring)
- if not reconfig:
+ # only write out unit files and start daemon
+ # with systemd if this is not a reconfig
+ if deployment_type != DeploymentType.RECONFIG:
if daemon_type == CephadmAgent.daemon_type:
- if ctx.config_json == '-':
- config_js = get_parm('-')
- else:
- config_js = get_parm(ctx.config_json)
+ config_js = fetch_configs(ctx)
assert isinstance(config_js, dict)
cephadm_agent = CephadmAgent(ctx, fsid, daemon_id)
else:
if c:
deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
- c, osd_fsid=osd_fsid, ports=ports)
+ c, osd_fsid=osd_fsid, endpoints=endpoints)
else:
raise RuntimeError('attempting to deploy a daemon without a container image')
if not os.path.exists(data_dir + '/unit.created'):
- with open(data_dir + '/unit.created', 'w') as f:
- os.fchmod(f.fileno(), 0o600)
- os.fchown(f.fileno(), uid, gid)
+ with write_new(data_dir + '/unit.created', owner=(uid, gid)) as f:
f.write('mtime is time the daemon deployment was created\n')
- with open(data_dir + '/unit.configured', 'w') as f:
+ with write_new(data_dir + '/unit.configured', owner=(uid, gid)) as f:
f.write('mtime is time we were last configured\n')
- os.fchmod(f.fileno(), 0o600)
- os.fchown(f.fileno(), uid, gid)
update_firewalld(ctx, daemon_type)
# Open ports explicitly required for the daemon
- if ports:
+ if endpoints:
fw = Firewalld(ctx)
- fw.open_ports(ports + fw.external_ports.get(daemon_type, []))
+ fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
fw.apply_rules()
- if reconfig and daemon_type not in Ceph.daemons:
+ # If this was a reconfig and the daemon is not a Ceph daemon, restart it
+ # so it can pick up potential changes to its configuration files
+ if deployment_type == DeploymentType.RECONFIG and daemon_type not in Ceph.daemons:
# ceph daemons do not need a restart; others (presumably) do to pick
# up the new config
call_throws(ctx, ['systemctl', 'reset-failed',
enable: bool = True,
start: bool = True,
osd_fsid: Optional[str] = None,
- ports: Optional[List[int]] = None,
+ endpoints: Optional[List[EndPoint]] = None,
) -> None:
# cmd
f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd(timeout=timeout))} \n')
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- with open(data_dir + '/unit.run.new', 'w') as f, \
- open(data_dir + '/unit.meta.new', 'w') as metaf:
+ run_file_path = data_dir + '/unit.run'
+ meta_file_path = data_dir + '/unit.meta'
+ with write_new(run_file_path) as f, write_new(meta_file_path) as metaf:
+
f.write('set -e\n')
if daemon_type in Ceph.daemons:
_write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
# some metadata about the deploy
- meta: Dict[str, Any] = {}
- if 'meta_json' in ctx and ctx.meta_json:
- meta = json.loads(ctx.meta_json) or {}
+ meta: Dict[str, Any] = fetch_meta(ctx)
meta.update({
'memory_request': int(ctx.memory_request) if ctx.memory_request else None,
'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
})
if not meta.get('ports'):
- meta['ports'] = ports
+ if endpoints:
+ meta['ports'] = [e.port for e in endpoints]
+ else:
+ meta['ports'] = []
metaf.write(json.dumps(meta, indent=4) + '\n')
- os.fchmod(f.fileno(), 0o600)
- os.fchmod(metaf.fileno(), 0o600)
- os.rename(data_dir + '/unit.run.new',
- data_dir + '/unit.run')
- os.rename(data_dir + '/unit.meta.new',
- data_dir + '/unit.meta')
-
timeout = 30 if daemon_type == 'osd' else None
# post-stop command(s)
- with open(data_dir + '/unit.poststop.new', 'w') as f:
+ with write_new(data_dir + '/unit.poststop') as f:
# this is a fallback to eventually stop any underlying container that was not stopped properly by unit.stop,
# this could happen in very slow setups as described in the issue https://tracker.ceph.com/issues/58242.
- add_stop_actions(f, timeout)
+ add_stop_actions(cast(TextIO, f), timeout)
if daemon_type == 'osd':
assert osd_fsid
poststop = get_ceph_volume_container(
f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
- os.fchmod(f.fileno(), 0o600)
- os.rename(data_dir + '/unit.poststop.new',
- data_dir + '/unit.poststop')
 # stop command(s)
- with open(data_dir + '/unit.stop.new', 'w') as f:
- add_stop_actions(f, timeout)
- os.fchmod(f.fileno(), 0o600)
- os.rename(data_dir + '/unit.stop.new',
- data_dir + '/unit.stop')
+ with write_new(data_dir + '/unit.stop') as f:
+ add_stop_actions(cast(TextIO, f), timeout)
if c:
- with open(data_dir + '/unit.image.new', 'w') as f:
+ with write_new(data_dir + '/unit.image') as f:
f.write(c.image + '\n')
- os.fchmod(f.fileno(), 0o600)
- os.rename(data_dir + '/unit.image.new',
- data_dir + '/unit.image')
# sysctl
install_sysctl(ctx, fsid, daemon_type)
install_base_units(ctx, fsid)
unit = get_unit_file(ctx, fsid)
unit_file = 'ceph-%s@.service' % (fsid)
- with open(ctx.unit_dir + '/' + unit_file + '.new', 'w') as f:
+ with write_new(ctx.unit_dir + '/' + unit_file, perms=None) as f:
f.write(unit)
- os.rename(ctx.unit_dir + '/' + unit_file + '.new',
- ctx.unit_dir + '/' + unit_file)
call_throws(ctx, ['systemctl', 'daemon-reload'])
unit_name = get_unit_name(fsid, daemon_type, daemon_id)
*lines,
'',
]
- with open(conf, 'w') as f:
+ with write_new(conf, owner=None, perms=None) as f:
f.write('\n'.join(lines))
conf = Path(ctx.sysctl_dir).joinpath(f'90-ceph-{fsid}-{daemon_type}.conf')
lines = HAproxy.get_sysctl_settings()
elif daemon_type == 'keepalived':
lines = Keepalived.get_sysctl_settings()
+ elif daemon_type == CephNvmeof.daemon_type:
+ lines = CephNvmeof.get_sysctl_settings()
lines = filter_sysctl_settings(ctx, lines)
# apply the sysctl settings
"""
# global unit
existed = os.path.exists(ctx.unit_dir + '/ceph.target')
- with open(ctx.unit_dir + '/ceph.target.new', 'w') as f:
+ with write_new(ctx.unit_dir + '/ceph.target', perms=None) as f:
f.write('[Unit]\n'
'Description=All Ceph clusters and services\n'
'\n'
'[Install]\n'
'WantedBy=multi-user.target\n')
- os.rename(ctx.unit_dir + '/ceph.target.new',
- ctx.unit_dir + '/ceph.target')
if not existed:
# we disable before enable in case a different ceph.target
# (from the traditional package) is present; while newer
# cluster unit
existed = os.path.exists(ctx.unit_dir + '/ceph-%s.target' % fsid)
- with open(ctx.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
+ with write_new(ctx.unit_dir + f'/ceph-{fsid}.target', perms=None) as f:
f.write(
'[Unit]\n'
'Description=Ceph cluster {fsid}\n'
'WantedBy=multi-user.target ceph.target\n'.format(
fsid=fsid)
)
- os.rename(ctx.unit_dir + '/ceph-%s.target.new' % fsid,
- ctx.unit_dir + '/ceph-%s.target' % fsid)
if not existed:
call_throws(ctx, ['systemctl', 'enable', 'ceph-%s.target' % fsid])
call_throws(ctx, ['systemctl', 'start', 'ceph-%s.target' % fsid])
return
# logrotate for the cluster
- with open(ctx.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
+ with write_new(ctx.logrotate_dir + f'/ceph-{fsid}', perms=None) as f:
"""
This is a bit sloppy in that the killall/pkill will touch all ceph daemons
in all containers, but I don't see an elegant way to send SIGHUP *just* to
first child (bash), but that isn't the ceph daemon. This is simpler and
should be harmless.
"""
+ targets: List[str] = [
+ 'ceph-mon',
+ 'ceph-mgr',
+ 'ceph-mds',
+ 'ceph-osd',
+ 'ceph-fuse',
+ 'radosgw',
+ 'rbd-mirror',
+ 'cephfs-mirror',
+ 'tcmu-runner'
+ ]
+
f.write("""# created by cephadm
/var/log/ceph/%s/*.log {
rotate 7
compress
sharedscripts
postrotate
- killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror cephfs-mirror || pkill -1 -x 'ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror|cephfs-mirror' || true
+ killall -q -1 %s || pkill -1 -x '%s' || true
endscript
missingok
notifempty
su root root
}
-""" % fsid)
+""" % (fsid, ' '.join(targets), '|'.join(targets)))
def get_unit_file(ctx, fsid):
for filename in config:
if filename in self.agent.required_files:
file_path = os.path.join(self.agent.daemon_dir, filename)
- with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(file_path) as f:
f.write(config[filename])
- os.rename(file_path + '.new', file_path)
self.agent.pull_conf_settings()
self.agent.wakeup()
for filename in config:
if filename in self.required_files:
file_path = os.path.join(self.daemon_dir, filename)
- with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(file_path) as f:
f.write(config[filename])
- os.rename(file_path + '.new', file_path)
unit_run_path = os.path.join(self.daemon_dir, 'unit.run')
- with open(os.open(unit_run_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(unit_run_path) as f:
f.write(self.unit_run())
- os.rename(unit_run_path + '.new', unit_run_path)
- meta: Dict[str, Any] = {}
+ meta: Dict[str, Any] = fetch_meta(self.ctx)
meta_file_path = os.path.join(self.daemon_dir, 'unit.meta')
- if 'meta_json' in self.ctx and self.ctx.meta_json:
- meta = json.loads(self.ctx.meta_json) or {}
- with open(os.open(meta_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(meta_file_path) as f:
f.write(json.dumps(meta, indent=4) + '\n')
- os.rename(meta_file_path + '.new', meta_file_path)
unit_file_path = os.path.join(self.ctx.unit_dir, self.unit_name())
- with open(os.open(unit_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+ with write_new(unit_file_path) as f:
f.write(self.unit_file())
- os.rename(unit_file_path + '.new', unit_file_path)
call_throws(self.ctx, ['systemctl', 'daemon-reload'])
call(self.ctx, ['systemctl', 'stop', self.unit_name()],
try:
for _ in range(1001):
- if not port_in_use(self.ctx, self.starting_port):
+ if not port_in_use(self.ctx, EndPoint('0.0.0.0', self.starting_port)):
self.listener_port = str(self.starting_port)
break
self.starting_port += 1
##################################
-
-@infer_image
+@executes_early
def command_version(ctx):
# type: (CephadmContext) -> int
- c = CephContainer(ctx, ctx.image, 'ceph', ['--version'])
- out, err, ret = call(ctx, c.run_cmd(), desc=c.entrypoint)
- if not ret:
- print(out.strip())
- return ret
+ import importlib
+
+ try:
+ vmod = importlib.import_module('_version')
+ except ImportError:
+ print('cephadm version UNKNOWN')
+ return 1
+ _unset = '<UNSET>'
+ print('cephadm version {0} ({1}) {2} ({3})'.format(
+ getattr(vmod, 'CEPH_GIT_NICE_VER', _unset),
+ getattr(vmod, 'CEPH_GIT_VER', _unset),
+ getattr(vmod, 'CEPH_RELEASE_NAME', _unset),
+ getattr(vmod, 'CEPH_RELEASE_TYPE', _unset),
+ ))
+ return 0
##################################
fsid: str, mon_id: str
) -> None:
mon_c = get_container(ctx, fsid, 'mon', mon_id)
- ctx.meta_json = json.dumps({'service_name': 'mon'})
+ ctx.meta_properties = {'service_name': 'mon'}
deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
config=None, keyring=None)
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
# Note:the default port used by the Prometheus node exporter is opened in fw
- ctx.meta_json = json.dumps({'service_name': 'mgr'})
- ports = [9283, 8765]
+ ctx.meta_properties = {'service_name': 'mgr'}
+ endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)]
if not ctx.skip_monitoring_stack:
- ports.append(8443)
+ endpoints.append(EndPoint('0.0.0.0', 8443))
deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring, ports=ports)
+ config=config, keyring=mgr_keyring, endpoints=endpoints)
# wait for the service to become available
logger.info('Waiting for mgr to start...')
except Exception as e:
logger.debug('status failed: %s' % e)
return False
+
is_available(ctx, 'mgr', is_mgr_available)
cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
ssh_pub = cli(['cephadm', 'get-pub-key'])
+ authorize_ssh_key(ssh_pub, ctx.ssh_user)
+ elif ctx.ssh_private_key and ctx.ssh_signed_cert:
+ logger.info('Using provided ssh private key and signed cert ...')
+ mounts = {
+ pathify(ctx.ssh_private_key.name): '/tmp/cephadm-ssh-key:z',
+ pathify(ctx.ssh_signed_cert.name): '/tmp/cephadm-ssh-key-cert.pub:z'
+ }
+ cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
+ cli(['cephadm', 'set-signed-cert', '-i', '/tmp/cephadm-ssh-key-cert.pub'], extra_mounts=mounts)
else:
logger.info('Generating ssh key...')
cli(['cephadm', 'generate-key'])
with open(ctx.output_pub_ssh_key, 'w') as f:
f.write(ssh_pub)
logger.info('Wrote public SSH key to %s' % ctx.output_pub_ssh_key)
-
- authorize_ssh_key(ssh_pub, ctx.ssh_user)
+ authorize_ssh_key(ssh_pub, ctx.ssh_user)
host = get_hostname()
logger.info('Adding host %s...' % host)
cli(['config', 'set', 'global', 'container_image', f'{ctx.image}'])
if mon_network:
- logger.info(f'Setting mon public_network to {mon_network}')
- cli(['config', 'set', 'mon', 'public_network', mon_network])
+ cp = read_config(ctx.config)
+ cfg_section = 'global' if cp.has_option('global', 'public_network') else 'mon'
+ logger.info(f'Setting public_network to {mon_network} in {cfg_section} config section')
+ cli(['config', 'set', cfg_section, 'public_network', mon_network])
if cluster_network:
logger.info(f'Setting cluster_network to {cluster_network}')
logger.warning(f'Cannot create cluster configuration directory {conf_dir}')
def rollback(func: FuncT) -> FuncT:
    """Decorate a command so a failure is reported and optionally cleaned up.

    The wrapped function is called with ``ctx``. If it raises
    ClusterAlreadyExists, the exception propagates untouched (nothing is
    logged or removed). For any other Exception, or a KeyboardInterrupt,
    the error is logged, an explanatory message is logged, and, when
    ``ctx.cleanup_on_failure`` is set, ``_rm_cluster`` is invoked before the
    original exception is re-raised.
    """
    @wraps(func)
    def _rollback(ctx: CephadmContext) -> Any:
        try:
            return func(ctx)
        except ClusterAlreadyExists:
            # another cluster with the provided fsid already exists: don't remove.
            raise
        except (KeyboardInterrupt, Exception) as e:
            logger.error(f'{type(e).__name__}: {e}')

            if not ctx.cleanup_on_failure:
                # nothing is removed; tell the user how to clean up by hand
                manual_msg = (
                    '\n\n'
                    '\t***************\n'
                    '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically to change\n'
                    '\tthis behaviour you can pass the --cleanup-on-failure. To remove this broken cluster manually please run:\n\n'
                    f'\t > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n'
                    '\tin case of any previous broken installation user must use the rm-cluster command to delete the broken cluster:\n\n'
                    '\t > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
                    '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
                    '\t***************\n\n'
                )
                logger.info(manual_msg)
                raise

            auto_msg = (
                '\n\n'
                '\t***************\n'
                '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically,\n'
                '\tto disable this behaviour do not pass the --cleanup-on-failure flag. In case of any previous\n'
                '\tbroken installation user must use the following command to completely delete the broken cluster:\n\n'
                '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
                '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
                '\t***************\n\n'
            )
            logger.info(auto_msg)
            _rm_cluster(ctx, keep_logs=False, zap_osds=False)
            raise
    return cast(FuncT, _rollback)
+
+
+@rollback
@default_image
def command_bootstrap(ctx):
# type: (CephadmContext) -> int
if not ctx.output_pub_ssh_key:
ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, CEPH_PUBKEY)
- if bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key):
- raise Error('--ssh-private-key and --ssh-public-key must be provided together or not at all.')
+ if (
+ (bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key))
+ and (bool(ctx.ssh_private_key) is not bool(ctx.ssh_signed_cert))
+ ):
+ raise Error('--ssh-private-key must be passed with either --ssh-public-key in the case of standard pubkey '
+ 'authentication or with --ssh-signed-cert in the case of CA signed signed keys or not provided at all.')
+
+ if (bool(ctx.ssh_public_key) and bool(ctx.ssh_signed_cert)):
+ raise Error('--ssh-public-key and --ssh-signed-cert are mututally exclusive. --ssh-public-key is intended '
+ 'for standard pubkey encryption where the public key is set as an authorized key on cluster hosts. '
+ '--ssh-signed-cert is intended for the CA signed keys use case where cluster hosts are configured to trust '
+ 'a CA pub key and authentication during SSH is done by authenticating the signed cert, requiring no '
+ 'public key to be installed on the cluster hosts.')
if ctx.fsid:
data_dir_base = os.path.join(ctx.data_dir, ctx.fsid)
if os.path.exists(data_dir_base):
- raise Error(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
+ raise ClusterAlreadyExists(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
else:
logger.warning('Specifying an fsid for your cluster offers no advantages and may increase the likelihood of fsid conflicts.')
+ # initial vars
+ ctx.fsid = ctx.fsid or make_fsid()
+ fsid = ctx.fsid
+ if not is_fsid(fsid):
+ raise Error('not an fsid: %s' % fsid)
+
# verify output files
- for f in [ctx.output_config, ctx.output_keyring,
- ctx.output_pub_ssh_key]:
+ for f in [ctx.output_config, ctx.output_keyring, ctx.output_pub_ssh_key]:
if not ctx.allow_overwrite:
if os.path.exists(f):
- raise Error('%s already exists; delete or pass '
- '--allow-overwrite to overwrite' % f)
+ raise ClusterAlreadyExists('%s already exists; delete or pass --allow-overwrite to overwrite' % f)
dirname = os.path.dirname(f)
if dirname and not os.path.exists(dirname):
fname = os.path.basename(f)
else:
logger.info('Skip prepare_host')
- # initial vars
- fsid = ctx.fsid or make_fsid()
- if not is_fsid(fsid):
- raise Error('not an fsid: %s' % fsid)
logger.info('Cluster fsid: %s' % fsid)
-
hostname = get_hostname()
if '.' in hostname and not ctx.allow_fqdn_hostname:
raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
(mon_dir, log_dir) = prepare_create_mon(ctx, uid, gid, fsid, mon_id,
bootstrap_keyring.name, monmap.name)
- with open(mon_dir + '/config', 'w') as f:
- os.fchown(f.fileno(), uid, gid)
- os.fchmod(f.fileno(), 0o600)
+ with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
f.write(config)
make_var_run(ctx, fsid, uid, gid)
cluster_network, ipv6_cluster_network)
# output files
- with open(ctx.output_keyring, 'w') as f:
- os.fchmod(f.fileno(), 0o600)
+ with write_new(ctx.output_keyring) as f:
f.write('[client.admin]\n'
'\tkey = ' + admin_key + '\n')
logger.info('Wrote keyring to %s' % ctx.output_keyring)
with open(ctx.apply_spec) as f:
host_dicts = _extract_host_info_from_applied_spec(f)
for h in host_dicts:
- _distribute_ssh_keys(ctx, h, hostname)
+ if ctx.ssh_signed_cert:
+ logger.info('Key distribution is not supported for signed CA key setups. Skipping ...')
+ else:
+ _distribute_ssh_keys(ctx, h, hostname)
mounts = {}
mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:ro'
cmd.append('--authfile=/etc/ceph/podman-auth.json')
out, _, _ = call_throws(ctx, cmd)
if isinstance(engine, Podman):
- os.chmod('/etc/ceph/podman-auth.json', 0o600)
+ os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE)
except Exception:
raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
c.container_args.extend(ctx.extra_container_args)
if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args:
c.args.extend(ctx.extra_entrypoint_args)
- if 'config_json' in ctx and ctx.config_json:
- conf_files = get_custom_config_files(ctx.config_json)
+ ccfiles = fetch_custom_config_files(ctx)
+ if ccfiles:
mandatory_keys = ['mount_path', 'content']
- for conf in conf_files['custom_config_files']:
+ for conf in ccfiles:
if all(k in conf for k in mandatory_keys):
mount_path = conf['mount_path']
file_path = os.path.join(
return c
def get_deployment_type(ctx: CephadmContext, daemon_type: str, daemon_id: str) -> DeploymentType:
    """Decide how the daemon named by ``daemon_type``/``daemon_id`` is deployed.

    Returns DeploymentType.RECONFIG when ``ctx.reconfig`` is set. Otherwise
    returns DeploymentType.REDEPLOY when the daemon's unit state is
    'running' or its container is reported as running, and
    DeploymentType.DEFAULT when neither is the case. The selected type is
    logged before returning.
    """
    if ctx.reconfig:
        selected = DeploymentType.RECONFIG
    else:
        selected = DeploymentType.DEFAULT

    unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id)
    (_, state, _) = check_unit(ctx, unit_name)
    already_running = state == 'running' or is_container_running(
        ctx,
        CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash'),
    )
    # reconfig takes priority over redeploy; only a deployment that still
    # looks fresh at this point is turned into a redeploy, which avoids
    # port checking for a daemon that is already up
    if already_running and selected is DeploymentType.DEFAULT:
        selected = DeploymentType.REDEPLOY

    logger.info(f'{selected.value} daemon {ctx.name} ...')

    return selected
+
+
@default_image
+@deprecated_command
def command_deploy(ctx):
# type: (CephadmContext) -> None
- daemon_type, daemon_id = ctx.name.split('.', 1)
+ _common_deploy(ctx)
- lock = FileLock(ctx, ctx.fsid)
- lock.acquire()
+def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]:
+    """Load the JSON deploy configuration selected by ``ctx.source``.
+
+    A missing or empty ``ctx.source``, or the value '-', means the JSON
+    document is read from standard input; any other value is opened as a
+    file path. The parsed data is logged at debug level and returned.
+    """
+    source = ctx.source if ('source' in ctx and ctx.source) else '-'
+    if source != '-':
+        with open(source, 'rb') as fh:
+            config_data = json.load(fh)
+    else:
+        config_data = json.load(sys.stdin)
+    logger.debug('Loaded deploy configuration: %r', config_data)
+    return config_data
+
+
+def apply_deploy_config_to_ctx(
+    config_data: Dict[str, Any],
+    ctx: CephadmContext,
+) -> None:
+    """Copy deploy settings from ``config_data`` onto ``ctx``.
+
+    This mirrors how the cli options of `deploy` are bound to the context.
+    'name' is required; 'image', 'fsid', 'meta' and 'config_blobs' are
+    applied only when present (an empty 'image' is ignored). The context
+    is then seeded with the defaults of the deploy cli options, and every
+    entry of the optional 'params' dict is set on top of them, with a
+    warning for names that are not known deploy options. Finally the
+    default image is updated and the resulting image is logged.
+    """
+    ctx.name = config_data['name']
+    if config_data.get('image', ''):
+        ctx.image = config_data['image']
+    # (key in config_data, attribute on ctx)
+    optional_fields = (
+        ('fsid', 'fsid'),
+        ('meta', 'meta_properties'),
+        ('config_blobs', 'config_blobs'),
+    )
+    for source_key, ctx_attr in optional_fields:
+        if source_key in config_data:
+            setattr(ctx, ctx_attr, config_data[source_key])
+
+    # Many functions read attributes from the ctx without first checking
+    # that they are set (with getattr or the '__contains__' func on ctx).
+    # Reusing the defaults of the CLI options avoids repeating them here
+    # and lets the two stay in sync.
+    cli_defaults = ArgumentFacade()
+    _add_deploy_parser_args(cli_defaults)
+    cli_defaults.apply(ctx)
+    for param_name, param_value in config_data.get('params', {}).items():
+        if param_name not in cli_defaults.defaults:
+            logger.warning('unexpected parameter: %r=%r', param_name, param_value)
+        # unexpected parameters are still applied; the warning is advisory
+        setattr(ctx, param_name, param_value)
+    update_default_image(ctx)
+    logger.debug('Determined image: %r', ctx.image)
+
+
+def command_deploy_from(ctx: CephadmContext) -> None:
+    """Deploy a daemon whose settings come from a JSON configuration.
+
+    Works like deploy, except that nearly all configuration parameters are
+    sourced from an input JSON configuration file instead of cli options.
+    """
+    apply_deploy_config_to_ctx(read_configuration_source(ctx), ctx)
+    _common_deploy(ctx)
+
+
+def _common_deploy(ctx: CephadmContext) -> None:
+ daemon_type, daemon_id = ctx.name.split('.', 1)
if daemon_type not in get_supported_daemons():
raise Error('daemon type %s not recognized' % daemon_type)
- redeploy = False
- unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id)
- (_, state, _) = check_unit(ctx, unit_name)
- if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash')):
- redeploy = True
+ lock = FileLock(ctx, ctx.fsid)
+ lock.acquire()
- if ctx.reconfig:
- logger.info('%s daemon %s ...' % ('Reconfig', ctx.name))
- elif redeploy:
- logger.info('%s daemon %s ...' % ('Redeploy', ctx.name))
- else:
- logger.info('%s daemon %s ...' % ('Deploy', ctx.name))
+ deployment_type = get_deployment_type(ctx, daemon_type, daemon_id)
# Migrate sysctl conf files from /usr/lib to /etc
migrate_sysctl_dir(ctx, ctx.fsid)
# Get and check ports explicitly required to be opened
- daemon_ports = [] # type: List[int]
+ endpoints = fetch_tcp_ports(ctx)
+ _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type)
- # only check port in use if not reconfig or redeploy since service
- # we are redeploying/reconfiguring will already be using the port
- if not ctx.reconfig and not redeploy:
- if ctx.tcp_ports:
- daemon_ports = list(map(int, ctx.tcp_ports.split()))
+def _dispatch_deploy(
+ ctx: CephadmContext,
+ daemon_type: str,
+ daemon_id: str,
+ daemon_endpoints: List[EndPoint],
+ deployment_type: DeploymentType,
+) -> None:
if daemon_type in Ceph.daemons:
config, keyring = get_config_and_keyring(ctx)
uid, gid = extract_uid_gid(ctx)
make_var_run(ctx, ctx.fsid, uid, gid)
- config_json: Optional[Dict[str, str]] = None
- if 'config_json' in ctx and ctx.config_json:
- config_json = get_parm(ctx.config_json)
+ config_json = fetch_configs(ctx)
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
ptrace=ctx.allow_ptrace)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
osd_fsid=ctx.osd_fsid,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type in Monitoring.components:
# monitoring daemon - prometheus, grafana, alertmanager, node-exporter
# Default Checks
# make sure provided config-json is sufficient
- config = get_parm(ctx.config_json) # type: ignore
+ config = fetch_configs(ctx) # type: ignore
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
required_args = Monitoring.components[daemon_type].get('config-json-args', list())
if required_files:
uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == NFSGanesha.daemon_type:
- if not ctx.reconfig and not redeploy and not daemon_ports:
- daemon_ports = list(NFSGanesha.port_map.values())
+ # only check ports if this is a fresh deployment
+ if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints:
+ nfs_ports = list(NFSGanesha.port_map.values())
+ daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports]
config, keyring = get_config_and_keyring(ctx)
# TODO: extract ganesha uid/gid (997, 994) ?
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == CephIscsi.daemon_type:
config, keyring = get_config_and_keyring(ctx)
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
+ elif daemon_type == CephNvmeof.daemon_type:
+ config, keyring = get_config_and_keyring(ctx)
+ uid, gid = 167, 167 # TODO: need to get properly the uid/gid
+ c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
+ config=config, keyring=keyring,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type in Tracing.components:
uid, gid = 65534, 65534
c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id)
uid, gid = haproxy.extract_uid_gid_haproxy()
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == Keepalived.daemon_type:
keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id)
uid, gid = keepalived.extract_uid_gid_keepalived()
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=ctx.reconfig,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(ctx, ctx.fsid, daemon_id)
- if not ctx.reconfig and not redeploy:
- daemon_ports.extend(cc.ports)
+ # only check ports if this is a fresh deployment
+ if deployment_type == DeploymentType.DEFAULT:
+ daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports])
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
privileged=cc.privileged,
ptrace=ctx.allow_ptrace)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
uid=cc.uid, gid=cc.gid, config=None,
- keyring=None, reconfig=ctx.reconfig,
- ports=daemon_ports)
+ keyring=None,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == CephadmAgent.daemon_type:
# get current user gid and uid
uid = os.getuid()
gid = os.getgid()
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
- uid, gid, ports=daemon_ports)
+ uid, gid,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == SNMPGateway.daemon_type:
sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id)
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
sc.uid, sc.gid,
- ports=daemon_ports)
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
else:
raise Error('daemon type {} not implemented in command_deploy function'
privileged=True)
command = c.shell_cmd(command)
+ if ctx.dry_run:
+ print(' '.join(shlex.quote(arg) for arg in command))
+ return 0
+
return call_timeout(ctx, command, ctx.timeout)
##################################
version = NFSGanesha.get_version(ctx, container_id)
if daemon_type == CephIscsi.daemon_type:
version = CephIscsi.get_version(ctx, container_id)
+ if daemon_type == CephNvmeof.daemon_type:
+ version = CephNvmeof.get_version(ctx, container_id)
elif not version:
if daemon_type in Ceph.daemons:
out, err, code = call(ctx,
'haproxy', '-v'],
verbosity=CallVerbosity.QUIET)
if not code and \
- out.startswith('HA-Proxy version '):
+                                out.startswith(('HA-Proxy version ', 'HAProxy version ')):
version = out.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'keepalived':
# type: (CephadmContext, str, str) -> None
daemon_type = 'prometheus'
(uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ # should try to set the ports we know cephadm defaults
+ # to for these services in the firewall.
+ ports = Monitoring.port_map['prometheus']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'prometheus')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+ deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
daemon_type = 'grafana'
(uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ # should try to set the ports we know cephadm defaults
+ # to for these services in the firewall.
+ ports = Monitoring.port_map['grafana']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'grafana-server')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+ deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
daemon_type = 'alertmanager'
(uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ # should try to set the ports we know cephadm defaults
+ # to for these services in the firewall.
+ ports = Monitoring.port_map['alertmanager']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'prometheus-alertmanager')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+ deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
try:
with open(filename, 'r') as grafana_ini:
lines = grafana_ini.readlines()
- with open('{}.new'.format(filename), 'w') as grafana_ini:
+ with write_new(filename, perms=None) as grafana_ini:
server_section = False
for line in lines:
if line.startswith('['):
line = re.sub(r'^cert_key.*',
'cert_key = /etc/grafana/certs/cert_key', line)
grafana_ini.write(line)
- os.rename('{}.new'.format(filename), filename)
except OSError as err:
raise Error('Cannot update {}: {}'.format(filename, err))
else:
call_throws(ctx, ['rm', '-rf', data_dir])
- if 'tcp_ports' in ctx and ctx.tcp_ports is not None:
- ports: List[int] = [int(p) for p in ctx.tcp_ports.split()]
+ endpoints = fetch_tcp_ports(ctx)
+ ports: List[int] = [e.port for e in endpoints]
+ if ports:
try:
fw = Firewalld(ctx)
fw.close_ports(ports)
return len([c for c in os.listdir(ctx.data_dir) if is_fsid(c)])
-def command_rm_cluster(ctx):
- # type: (CephadmContext) -> None
+def command_rm_cluster(ctx: CephadmContext) -> None:
if not ctx.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
lock = FileLock(ctx, ctx.fsid)
lock.acquire()
+ _rm_cluster(ctx, ctx.keep_logs, ctx.zap_osds)
+
+
+def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None:
+
+ if not ctx.fsid:
+ raise Error('must select the cluster to delete by passing --fsid to proceed')
def disable_systemd_service(unit_name: str) -> None:
call(ctx, ['systemctl', 'stop', unit_name],
call(ctx, ['systemctl', 'disable', unit_name],
verbosity=CallVerbosity.DEBUG)
+ logger.info(f'Deleting cluster with fsid: {ctx.fsid}')
+
# stop + disable individual daemon units
for d in list_daemons(ctx, detail=False):
if d['fsid'] != ctx.fsid:
verbosity=CallVerbosity.DEBUG)
# osds?
- if ctx.zap_osds:
+ if zap_osds:
_zap_osds(ctx)
# rm units
# rm data
call_throws(ctx, ['rm', '-rf', ctx.data_dir + '/' + ctx.fsid])
- if not ctx.keep_logs:
+ if not keep_logs:
# rm logs
call_throws(ctx, ['rm', '-rf', ctx.log_dir + '/' + ctx.fsid])
call_throws(ctx, ['rm', '-rf', ctx.log_dir
# rm cephadm logrotate config
call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/cephadm'])
- if not ctx.keep_logs:
+ if not keep_logs:
# remove all cephadm logs
for fname in glob(f'{ctx.log_dir}/cephadm.log*'):
os.remove(fname)
p.unlink()
# cleanup remaining ceph directories
- ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/var/lib/ceph/{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
+ ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/cephadm-{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
for dd in ceph_dirs:
shutil.rmtree(dd, ignore_errors=True)
with open(auth_keys_file, 'a') as f:
os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
- os.fchmod(f.fileno(), 0o600) # just in case we created it
+ os.fchmod(f.fileno(), DEFAULT_MODE) # just in case we created it
if add_newline:
f.write('\n')
f.write(ssh_pub_key + '\n')
_, filename = tempfile.mkstemp()
with open(filename, 'w') as f:
os.fchown(f.fileno(), ssh_uid, ssh_gid)
- os.fchmod(f.fileno(), 0o600) # secure access to the keys file
+ os.fchmod(f.fileno(), DEFAULT_MODE) # secure access to the keys file
for line in lines:
if line.strip() == key.strip():
deleted = True
logger.warning('Cannot check ssh connectivity. Skipping...')
return
- logger.info('Verifying ssh connectivity ...')
+ ssh_priv_key_path = ''
+ ssh_pub_key_path = ''
+ ssh_signed_cert_path = ''
if ctx.ssh_private_key and ctx.ssh_public_key:
# let's use the keys provided by the user
ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
ssh_pub_key_path = pathify(ctx.ssh_public_key.name)
+ elif ctx.ssh_private_key and ctx.ssh_signed_cert:
+ # CA signed keys use case
+ ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
+ ssh_signed_cert_path = pathify(ctx.ssh_signed_cert.name)
else:
# no custom keys, let's generate some random keys just for this check
ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}'
logger.warning('Cannot generate keys to check ssh connectivity.')
return
- with open(ssh_pub_key_path, 'r') as f:
- key = f.read().strip()
- new_key = authorize_ssh_key(key, ctx.ssh_user)
- ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
- _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
- *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
- '-o PasswordAuthentication=no',
- f'{ctx.ssh_user}@{get_hostname()}',
- 'sudo echo'])
-
- # we only remove the key if it's a new one. In case the user has provided
- # some already existing key then we don't alter authorized_keys file
- if new_key:
- revoke_ssh_key(key, ctx.ssh_user)
-
- pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
- prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
- ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
- err_msg = f"""
+ if ssh_signed_cert_path:
+ logger.info('Verification for CA signed keys authentication not implemented. Skipping ...')
+ elif ssh_pub_key_path:
+ logger.info('Verifying ssh connectivity using standard pubkey authentication ...')
+ with open(ssh_pub_key_path, 'r') as f:
+ key = f.read().strip()
+ new_key = authorize_ssh_key(key, ctx.ssh_user)
+ ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
+ _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
+ *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
+ '-o PasswordAuthentication=no',
+ f'{ctx.ssh_user}@{get_hostname()}',
+ 'sudo echo'])
+
+ # we only remove the key if it's a new one. In case the user has provided
+ # some already existing key then we don't alter authorized_keys file
+ if new_key:
+ revoke_ssh_key(key, ctx.ssh_user)
+
+ pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
+ prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
+ ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
+ err_msg = f"""
** Please verify your user's ssh configuration and make sure:
- User {ctx.ssh_user} must have passwordless sudo access
{pub_key_msg}{prv_key_msg}{ssh_cfg_msg}
"""
- if code != 0:
- raise Error(err_msg)
+ if code != 0:
+ raise Error(err_msg)
def command_prepare_host(ctx: CephadmContext) -> None:
##################################
+class ArgumentFacade:
+    """Stand-in for an argparse parser that only records option defaults.
+
+    It offers the ``add_argument`` call used to declare cli options, but
+    instead of building a parser it remembers each option's attribute name
+    and default value so they can later be copied onto a context object.
+    """
+
+    # defaults argparse applies implicitly to flag-style actions when the
+    # caller does not pass ``default=``
+    _IMPLICIT_DEFAULTS = {'store_true': False, 'store_false': True}
+
+    def __init__(self) -> None:
+        # attribute name (e.g. 'tcp_ports') -> default value
+        self.defaults: Dict[str, Any] = {}
+
+    def add_argument(self, *args: Any, **kwargs: Any) -> None:
+        """Record the default value of the option declared by this call.
+
+        The attribute name is derived from the first long ('--') option
+        string: the leading dashes are dropped and remaining dashes become
+        underscores. An explicit ``default`` keyword is used as given;
+        without one, 'store_true'/'store_false' actions get False/True as
+        they do in argparse, and everything else gets None.
+
+        Raises ValueError when no option strings are passed or none of
+        them is a long option.
+        """
+        if not args:
+            raise ValueError('expected at least one argument')
+        long_opts = [
+            opt for opt in args
+            if isinstance(opt, str) and opt.startswith('--')
+        ]
+        if not long_opts:
+            raise ValueError(f'expected long option, got: {args[0]!r}')
+        name = long_opts[0][2:].replace('-', '_')
+        if 'default' in kwargs:
+            value = kwargs['default']
+        else:
+            value = self._IMPLICIT_DEFAULTS.get(kwargs.get('action'))
+        self.defaults[name] = value
+
+    def apply(self, ctx: 'CephadmContext') -> None:
+        """Set every recorded default as an attribute on ``ctx``."""
+        for key, value in self.defaults.items():
+            setattr(ctx, key, value)
+
+
+def _add_deploy_parser_args(
+    parser_deploy: Union[argparse.ArgumentParser, ArgumentFacade],
+) -> None:
+    """Declare the options of the deploy command on ``parser_deploy``.
+
+    The target is either a real argparse parser or an ArgumentFacade, so
+    the same option definitions serve both the command line and code that
+    only needs the option defaults.
+    """
+    parser_deploy.add_argument(
+        '--config', '-c',
+        help='config file for new daemon')
+    parser_deploy.add_argument(
+        '--config-json',
+        help='Additional configuration information in JSON format')
+    parser_deploy.add_argument(
+        '--keyring',
+        help='keyring for new daemon')
+    parser_deploy.add_argument(
+        '--key',
+        help='key for new daemon')
+    parser_deploy.add_argument(
+        '--osd-fsid',
+        help='OSD uuid, if creating an OSD container')
+    parser_deploy.add_argument(
+        '--skip-firewalld',
+        action='store_true',
+        help='Do not configure firewalld')
+    parser_deploy.add_argument(
+        '--tcp-ports',
+        help='List of tcp ports to open in the host firewall')
+    parser_deploy.add_argument(
+        '--port-ips',
+        help='JSON dict mapping ports to IPs they need to be bound on'
+    )
+    parser_deploy.add_argument(
+        '--reconfig',
+        action='store_true',
+        help='Reconfigure a previously deployed daemon')
+    parser_deploy.add_argument(
+        '--allow-ptrace',
+        action='store_true',
+        help='Allow SYS_PTRACE on daemon container')
+    # hidden option: its help text is suppressed
+    parser_deploy.add_argument(
+        '--container-init',
+        action='store_true',
+        default=CONTAINER_INIT,
+        help=argparse.SUPPRESS)
+    parser_deploy.add_argument(
+        '--memory-request',
+        help='Container memory request/target'
+    )
+    parser_deploy.add_argument(
+        '--memory-limit',
+        help='Container memory hard limit'
+    )
+    parser_deploy.add_argument(
+        '--meta-json',
+        help='JSON dict of additional metadata'
+    )
+    parser_deploy.add_argument(
+        '--extra-container-args',
+        action='append',
+        default=[],
+        help='Additional container arguments to apply to daemon'
+    )
+    parser_deploy.add_argument(
+        '--extra-entrypoint-args',
+        action='append',
+        default=[],
+        help='Additional entrypoint arguments to apply to daemon'
+    )
+
+
def _get_parser():
# type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(
subparsers = parser.add_subparsers(help='sub-command')
parser_version = subparsers.add_parser(
- 'version', help='get ceph version from container')
+ 'version', help='get cephadm version')
parser_version.set_defaults(func=command_version)
parser_pull = subparsers.add_parser(
'--no-hosts',
action='store_true',
help='dont pass /etc/hosts through to the container')
+ parser_shell.add_argument(
+ '--dry-run',
+ action='store_true',
+ help='print, but do not execute, the container command to start the shell')
parser_enter = subparsers.add_parser(
'enter', help='run an interactive shell inside a running daemon container')
'--ssh-public-key',
type=argparse.FileType('r'),
help='SSH public key')
+ parser_bootstrap.add_argument(
+ '--ssh-signed-cert',
+ type=argparse.FileType('r'),
+ help='Signed cert for setups using CA signed SSH keys')
parser_bootstrap.add_argument(
'--ssh-user',
default='root',
'--allow-overwrite',
action='store_true',
help='allow overwrite of existing --output-* config/keyring/ssh files')
+ parser_bootstrap.add_argument(
+ '--cleanup-on-failure',
+ action='store_true',
+ default=False,
+ help='Delete cluster files in case of a failed installation')
parser_bootstrap.add_argument(
'--allow-fqdn-hostname',
action='store_true',
'--fsid',
required=True,
help='cluster FSID')
- parser_deploy.add_argument(
- '--config', '-c',
- help='config file for new daemon')
- parser_deploy.add_argument(
- '--config-json',
- help='Additional configuration information in JSON format')
- parser_deploy.add_argument(
- '--keyring',
- help='keyring for new daemon')
- parser_deploy.add_argument(
- '--key',
- help='key for new daemon')
- parser_deploy.add_argument(
- '--osd-fsid',
- help='OSD uuid, if creating an OSD container')
- parser_deploy.add_argument(
- '--skip-firewalld',
- action='store_true',
- help='Do not configure firewalld')
- parser_deploy.add_argument(
- '--tcp-ports',
- help='List of tcp ports to open in the host firewall')
- parser_deploy.add_argument(
- '--reconfig',
- action='store_true',
- help='Reconfigure a previously deployed daemon')
- parser_deploy.add_argument(
- '--allow-ptrace',
- action='store_true',
- help='Allow SYS_PTRACE on daemon container')
- parser_deploy.add_argument(
- '--container-init',
- action='store_true',
- default=CONTAINER_INIT,
- help=argparse.SUPPRESS)
- parser_deploy.add_argument(
- '--memory-request',
- help='Container memory request/target'
- )
- parser_deploy.add_argument(
- '--memory-limit',
- help='Container memory hard limit'
- )
- parser_deploy.add_argument(
- '--meta-json',
- help='JSON dict of additional metadata'
+ _add_deploy_parser_args(parser_deploy)
+
+ parser_orch = subparsers.add_parser(
+ '_orch',
)
- parser_deploy.add_argument(
- '--extra-container-args',
- action='append',
- default=[],
- help='Additional container arguments to apply to daemon'
+ subparsers_orch = parser_orch.add_subparsers(
+ title='Orchestrator Driven Commands',
+ description='Commands that are typically only run by cephadm mgr module',
)
- parser_deploy.add_argument(
- '--extra-entrypoint-args',
- action='append',
- default=[],
- help='Additional entrypoint arguments to apply to deamon'
+
+ parser_deploy_from = subparsers_orch.add_parser(
+ 'deploy', help='deploy a daemon')
+ parser_deploy_from.set_defaults(func=command_deploy_from)
+ # currently cephadm mgr module passes an fsid option on the CLI too
+ # TODO: remove this and always source fsid from the JSON?
+ parser_deploy_from.add_argument(
+ '--fsid',
+ help='cluster FSID')
+ parser_deploy_from.add_argument(
+ 'source',
+ default='-',
+ nargs='?',
+ help='Configuration input source file',
)
parser_check_host = subparsers.add_parser(
sys.stderr.write('No command specified; pass -h or --help for usage\n')
sys.exit(1)
+ if ctx.has_function() and getattr(ctx.func, '_execute_early', False):
+ try:
+ sys.exit(ctx.func(ctx))
+ except Error as e:
+ if ctx.verbose:
+ raise
+ logger.error('ERROR: %s' % e)
+ sys.exit(1)
+
cephadm_require_root()
cephadm_init_logging(ctx, av)
try:
check_container_engine(ctx)
# command handler
r = ctx.func(ctx)
- except Error as e:
+ except (Error, ClusterAlreadyExists) as e:
if ctx.verbose:
raise
logger.error('ERROR: %s' % e)