import struct
import ssl
from enum import Enum
-
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable
import re
DEFAULT_IMAGE_IS_MASTER = False
DEFAULT_IMAGE_RELEASE = 'quincy'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4'
+DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
+DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.3.1'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
LOG_DIR = '/var/log/ceph'
LOCK_DIR = '/run/cephadm'
LOGROTATE_DIR = '/etc/logrotate.d'
-SYSCTL_DIR = '/usr/lib/sysctl.d'
+SYSCTL_DIR = '/etc/sysctl.d'
UNIT_DIR = '/etc/systemd/system'
+CEPH_CONF_DIR = 'config'
+CEPH_CONF = 'ceph.conf'
+CEPH_PUBKEY = 'ceph.pub'
+CEPH_KEYRING = 'ceph.client.admin.keyring'
+CEPH_DEFAULT_CONF = f'/etc/ceph/{CEPH_CONF}'
+CEPH_DEFAULT_KEYRING = f'/etc/ceph/{CEPH_KEYRING}'
+CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}'
LOG_DIR_MODE = 0o770
DATA_DIR_MODE = 0o700
CONTAINER_INIT = True
CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
DEFAULT_TIMEOUT = None # in seconds
DEFAULT_RETRY = 15
-SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
logger: logging.Logger = None # type: ignore
##################################
class EndPoint:
    """EndPoint representing an ip:port format"""

    def __init__(self, ip: str, port: int) -> None:
        self.ip = ip
        self.port = port

    def __str__(self) -> str:
        return '{}:{}'.format(self.ip, self.port)

    # repr mirrors str so lists of endpoints log as plain ip:port
    def __repr__(self) -> str:
        return self.__str__()
+
+
class ContainerInfo:
    """Identity and runtime metadata of a single daemon container."""

    # attributes that participate in equality comparison
    _EQ_FIELDS = ('container_id', 'image_name', 'image_id', 'start', 'version')

    def __init__(self, container_id: str,
                 image_name: str,
                 image_id: str,
                 start: str,
                 version: str) -> None:
        self.container_id = container_id
        self.image_name = image_name
        self.image_id = image_id
        self.start = start
        self.version = version

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, ContainerInfo):
            return NotImplemented
        return all(
            getattr(self, field) == getattr(other, field)
            for field in self._EQ_FIELDS
        )
+
+
class BaseConfig:
def __init__(self) -> None:
CONTAINER_PREFERENCE = (Podman, Docker) # prefer podman to docker
# During normal cephadm operations (cephadm ls, gather-facts, etc ) we use:
# stdout: for JSON output only
# stderr: for error, debug, info, etc
# Minimal dictConfig: handlers are attached elsewhere depending on the mode.
logging_config = {
    'version': 1,
    'disable_existing_loggers': True,
}
class ExcludeErrorsFilter(logging.Filter):
    """Logging filter that admits only records strictly below WARNING.

    Attached to the stdout handler so that warnings and errors are routed
    exclusively to the stderr handler.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Only lets through log messages with log level below WARNING ."""
        is_below_warning = record.levelno < logging.WARNING
        return is_below_warning
+
+
# When cephadm is used as standard binary (bootstrap, rm-cluster, etc) we use:
# stdout: for debug and info
# stderr: for errors and warnings
interactive_logging_config = {
    'version': 1,
    'filters': {
        # used by the stdout handler so WARNING+ is not printed twice
        'exclude_errors': {
            '()': ExcludeErrorsFilter
        }
    },
    'disable_existing_loggers': True,
    'formatters': {
        'cephadm': {
            'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
        },
    },
    'handlers': {
        # INFO/DEBUG -> stdout only (records >= WARNING filtered out)
        'console_stdout': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'filters': ['exclude_errors'],
            'stream': sys.stdout
        },
        # WARNING and above -> stderr
        'console_stderr': {
            'level': 'WARNING',
            'class': 'logging.StreamHandler',
            'stream': sys.stderr
        },
        # everything (DEBUG+) also goes to the persistent cephadm log file;
        # WatchedFileHandler reopens the file if logrotate moves it
        'log_file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.WatchedFileHandler',
            'formatter': 'cephadm',
            'filename': '%s/cephadm.log' % LOG_DIR,
        }
    },
    'loggers': {
        # root logger fans out to all three handlers
        '': {
            'level': 'DEBUG',
            'handlers': ['console_stdout', 'console_stderr', 'log_file'],
        }
    }
}
+
+
class termcolor:
yellow = '\033[93m'
red = '\033[31m'
class TimeoutExpired(Error):
    """Raised when a timed operation exceeds its allotted deadline."""
    pass
+
class UnauthorizedRegistryError(Error):
    """Raised when a container registry rejects a pull as unauthorized."""
    pass
+
##################################
class Ceph(object):
    # Daemon types that are part of ceph proper (as opposed to the
    # monitoring stack or gateways deployed alongside the cluster).
    daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
               'crash', 'cephfs-mirror')
##################################
'node-exporter': [9100],
'grafana': [3000],
'alertmanager': [9093, 9094],
+ 'loki': [3100],
+ 'promtail': [9080]
}
components = {
'prometheus.yml',
],
},
+ 'loki': {
+ 'image': DEFAULT_LOKI_IMAGE,
+ 'cpus': '1',
+ 'memory': '1GB',
+ 'args': [
+ '--config.file=/etc/loki/loki.yml',
+ ],
+ 'config-json-files': [
+ 'loki.yml'
+ ],
+ },
+ 'promtail': {
+ 'image': DEFAULT_PROMTAIL_IMAGE,
+ 'cpus': '1',
+ 'memory': '1GB',
+ 'args': [
+ '--config.file=/etc/promtail/promtail.yml',
+ ],
+ 'config-json-files': [
+ 'promtail.yml',
+ ],
+ },
'node-exporter': {
'image': DEFAULT_NODE_EXPORTER_IMAGE,
'cpus': '1',
def get_version(ctx, container_id, daemon_type):
# type: (CephadmContext, str, str) -> str
"""
- :param: daemon_type Either "prometheus", "alertmanager" or "node-exporter"
+ :param: daemon_type Either "prometheus", "alertmanager", "loki", "promtail" or "node-exporter"
"""
- assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter')
+ assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter', 'loki', 'promtail')
cmd = daemon_type.replace('-', '_')
code = -1
err = ''
def get_container_envs():
# type: () -> List[str]
envs = [
- 'CEPH_CONF=%s' % ('/etc/ceph/ceph.conf')
+ 'CEPH_CONF=%s' % (CEPH_DEFAULT_CONF)
]
return envs
))
def check_ip_port(ctx, ep):
    # type: (CephadmContext, EndPoint) -> None
    """Verify we can locally bind ep.ip:ep.port, unless --skip-ping-check.

    attempt_bind raises on failure; success means the port is free here.
    """
    if not ctx.skip_ping_check:
        logger.info(f'Verifying IP {ep.ip} port {ep.port} ...')
        if is_ipv6(ep.ip):
            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            # strip surrounding brackets before handing the address to bind()
            ip = unwrap_ipv6(ep.ip)
        else:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ip = ep.ip
        attempt_bind(ctx, s, ip, ep.port)
##################################
logger.info('Inferring fsid %s' % fsids[0])
ctx.fsid = fsids[0]
else:
- raise Error('Cannot infer an fsid, one must be specified: %s' % fsids)
+ raise Error('Cannot infer an fsid, one must be specified (using --fsid): %s' % fsids)
return func(ctx)
return cast(FuncT, _infer_fsid)
def infer_config(func: FuncT) -> FuncT:
    """
    Infer the cluster configuration using the following priority order:
    1- if the user has provided custom conf file (-c option) use it
    2- otherwise if daemon --name has been provided use daemon conf
    3- otherwise find the mon daemon conf file and use it (if v1)
    4- otherwise if {ctx.data_dir}/{fsid}/{CEPH_CONF_DIR} dir exists use it
    5- finally: fallback to the default file /etc/ceph/ceph.conf
    """
    @wraps(func)
    def _infer_config(ctx: CephadmContext) -> Any:

        def config_path(daemon_type: str, daemon_name: str) -> str:
            # path of the 'config' file inside the daemon's data directory
            data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_name)
            return os.path.join(data_dir, 'config')

        def get_mon_daemon_name(fsid: str) -> Optional[str]:
            # first cephadm:v1-style mon of this cluster that has a config on disk
            daemon_list = list_daemons(ctx, detail=False)
            for daemon in daemon_list:
                if (
                    daemon.get('name', '').startswith('mon.')
                    and daemon.get('fsid', '') == fsid
                    and daemon.get('style', '') == 'cephadm:v1'
                    and os.path.exists(config_path('mon', daemon['name'].split('.', 1)[1]))
                ):
                    return daemon['name']
            return None

        ctx.config = ctx.config if 'config' in ctx else None
        # check if user has provided conf by using -c option
        if ctx.config and (ctx.config != CEPH_DEFAULT_CONF):
            logger.debug(f'Using specified config: {ctx.config}')
            return func(ctx)

        if 'fsid' in ctx and ctx.fsid:
            name = ctx.name if ('name' in ctx and ctx.name) else get_mon_daemon_name(ctx.fsid)
            if name is not None:
                # daemon name has been specified (or inferred from mon), let's use its conf
                ctx.config = config_path(name.split('.', 1)[0], name.split('.', 1)[1])
            else:
                # no daemon, in case the cluster has a config dir then use it
                ceph_conf = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_CONF}'
                if os.path.exists(ceph_conf):
                    ctx.config = ceph_conf

        if ctx.config:
            logger.info(f'Inferring config {ctx.config}')
        elif os.path.exists(CEPH_DEFAULT_CONF):
            logger.debug(f'Using default config {CEPH_DEFAULT_CONF}')
            ctx.config = CEPH_DEFAULT_CONF
        return func(ctx)

    return cast(FuncT, _infer_config)
if not ctx.image:
ctx.image = os.environ.get('CEPHADM_IMAGE')
if not ctx.image:
- ctx.image = get_last_local_ceph_image(ctx, ctx.container_engine.path)
+ ctx.image = infer_local_ceph_image(ctx, ctx.container_engine.path)
if not ctx.image:
ctx.image = _get_default_image(ctx)
return func(ctx)
return cast(FuncT, _default_image)
-def get_last_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]:
    """
    :param ctx: Cephadm context
    :param daemon_filter: daemon name or type
    :param by_name: must be set to True if daemon name is provided
    :return: Container information or None
    """
    if by_name and '.' not in daemon_filter:
        logger.warning(f'Trying to get container info using invalid daemon name {daemon_filter}')
        return None

    def matches(daemon: Dict[str, str]) -> bool:
        # compare either the full daemon name or just the daemon type
        label = daemon['name'] if by_name else daemon['name'].split('.', 1)[0]
        return label == daemon_filter and daemon['fsid'] == ctx.fsid

    candidates = [d for d in list_daemons(ctx, detail=False) if matches(d)]
    if not candidates:
        return None

    d_type, d_id = candidates[0]['name'].split('.', 1)
    out, _, code = get_container_stats(ctx, ctx.container_engine.path, ctx.fsid, d_type, d_id)
    if code:
        return None
    (container_id, image_name, image_id, start, version) = out.strip().split(',')
    return ContainerInfo(container_id, image_name, image_id, start, version)
+
+
def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
    """
    Infer the local ceph image based on the following priority criteria:
    1- the image specified by --image arg (if provided).
    2- the same image as the daemon container specified by --name arg (if provided).
    3- image used by any ceph container running on the host. In this case we use daemon types.
    4- if no container is found then we use the most recent ceph image on the host.

    Note: any selected container must have the same fsid inferred previously.

    :return: The most recent local ceph image (already pulled)
    """
    # '|' special character is used to separate the output fields into:
    # - Repository@digest
    # - Image Id
    # - Image Tag
    # - Image creation date
    out, _, _ = call_throws(ctx,
                            [container_path, 'images',
                             '--filter', 'label=ceph=True',
                             '--filter', 'dangling=false',
                             '--format', '{{.Repository}}@{{.Digest}}|{{.ID}}|{{.Tag}}|{{.CreatedAt}}'])

    # try to find a running ceph container to pin the image choice to
    container_info = None
    daemon_name = ctx.name if ('name' in ctx and ctx.name and '.' in ctx.name) else None
    daemons_ls = [daemon_name] if daemon_name is not None else Ceph.daemons  # daemon types: 'mon', 'mgr', etc
    for daemon in daemons_ls:
        container_info = get_container_info(ctx, daemon, daemon_name is not None)
        if container_info is not None:
            logger.debug(f"Using container info for daemon '{daemon}'")
            break

    for image in out.splitlines():
        if image and not image.isspace():
            (digest, image_id, tag, created_date) = image.lstrip().split('|')
            # when a running container was found, only accept its own image
            if container_info is not None and image_id not in container_info.image_id:
                continue
            # skip entries without a digest (local-only / untagged builds)
            if digest and not digest.endswith('@'):
                logger.info(f"Using ceph image with id '{image_id}' and tag '{tag}' created on {created_date}\n{digest}")
                return digest
    return None
os.chown(dst_file, uid, gid)
def recursive_chown(path: str, uid: int, gid: int) -> None:
    """Apply uid/gid ownership to *path* and every directory/file below it."""
    for dirpath, _subdirs, files in os.walk(path):
        targets = [dirpath] + [os.path.join(dirpath, name) for name in files]
        for target in targets:
            os.chown(target, uid, gid)
+
+
# copied from distutils
def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]:
"""Tries to find 'executable' in the directories listed in 'path'.
metadata = Monitoring.components[daemon_type]
r += metadata.get('args', list())
# set ip and port to bind to for nodeexporter,alertmanager,prometheus
- if daemon_type != 'grafana':
+ if daemon_type not in ['grafana', 'loki', 'promtail']:
ip = ''
port = Monitoring.port_map[daemon_type][0]
if 'meta_json' in ctx and ctx.meta_json:
if 'ports' in meta and meta['ports']:
port = meta['ports'][0]
r += [f'--web.listen-address={ip}:{port}']
+ if daemon_type == 'prometheus':
+ scheme = 'http'
+ host = get_fqdn()
+ r += [f'--web.external-url={scheme}://{host}:{port}']
if daemon_type == 'alertmanager':
config = get_parm(ctx.config_json)
peers = config.get('peers', list()) # type: ignore
r += ['--cluster.peer={}'.format(peer)]
# some alertmanager, by default, look elsewhere for a config
r += ['--config.file=/etc/alertmanager/alertmanager.yml']
+ if daemon_type == 'loki':
+ r += ['--config.file=/etc/loki/loki.yml']
+ if daemon_type == 'promtail':
+ r += ['--config.file=/etc/promtail/promtail.yml']
+ if daemon_type == 'node-exporter':
+ r += ['--path.procfs=/host/proc',
+ '--path.sysfs=/host/sys',
+ '--path.rootfs=/rootfs']
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
+ recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid)
+ recursive_chown(os.path.join(data_dir_root, 'data'), uid, gid)
elif daemon_type == 'grafana':
data_dir_root = get_data_dir(fsid, ctx.data_dir,
daemon_type, daemon_id)
config_dir = 'etc/alertmanager'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
+ elif daemon_type == 'promtail':
+ data_dir_root = get_data_dir(fsid, ctx.data_dir,
+ daemon_type, daemon_id)
+ config_dir = 'etc/promtail'
+ makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
+ makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
+ elif daemon_type == 'loki':
+ data_dir_root = get_data_dir(fsid, ctx.data_dir,
+ daemon_type, daemon_id)
+ config_dir = 'etc/loki'
+ makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
+ makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
# populate the config directory for the component from the config-json
if 'files' in config_json:
if daemon_type in Monitoring.components and daemon_id:
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ log_dir = get_log_dir(fsid, ctx.log_dir)
if daemon_type == 'prometheus':
mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z'
mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z'
+ elif daemon_type == 'loki':
+ mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z'
+ mounts[os.path.join(data_dir, 'data')] = '/loki:Z'
+ elif daemon_type == 'promtail':
+ mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z'
+ mounts[log_dir] = '/var/log/ceph:z'
+ mounts[os.path.join(data_dir, 'data')] = '/promtail:Z'
elif daemon_type == 'node-exporter':
mounts['/proc'] = '/host/proc:ro'
mounts['/sys'] = '/host/sys:ro'
# by ubuntu 18.04 kernel!)
]
container_args.extend(monitoring_args)
+ if daemon_type == 'node-exporter':
+ # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys',
+ # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation
+ # between the node-exporter container and the host to avoid selinux denials
+ container_args.extend(['--security-opt', 'label=disable'])
elif daemon_type == 'crash':
ceph_args = ['-n', name]
elif daemon_type in Ceph.daemons:
bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
)
+ if 'cluster' in ctx and ctx.cluster:
+ # ctx.cluster is only set during adoption of a daemon from a cluster
+ # with a custom name (not "ceph"). The initial activate command the first
+ # time we start the new cephadm based systemd unit for this osd must account
+ # for this by mounting to the correct data dir in the container. Otherwise
+ # necessary files from the old data dir of the daemon won't be copied over
+ # to the new data dir on the host. After the first start (e.g. on any redeploys)
+ # this is no longer necessary as we will have these files in the data dir on the host
+ if data_dir in prestart.volume_mounts:
+ prestart.volume_mounts[data_dir] = f'/var/lib/ceph/osd/{ctx.cluster}-{daemon_id}'
_write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
elif daemon_type == CephIscsi.daemon_type:
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
# post-stop command(s)
with open(data_dir + '/unit.stop.new', 'w') as f:
- f.write('! ' + ' '.join(c.stop_cmd()) + '\n')
- f.write('! ' + ' '.join(c.stop_cmd(old_cname=True)) + '\n')
+ # following generated script basically checks if the container exists
+ # before stopping it. Exit code will be success either if it doesn't
+ # exist or if it exists and is stopped successfully.
+ container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
+ f.write(f'! {container_exists % c.old_cname} || {" ".join(c.stop_cmd(old_cname=True))} \n')
+ f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd())} \n')
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.stop.new',
def update_firewalld(ctx, daemon_type):
    # type: (CephadmContext, str) -> None
    """Open firewalld services/ports for daemon_type, honoring --skip-firewalld."""
    if 'skip_firewalld' in ctx and ctx.skip_firewalld:
        return
    firewall = Firewalld(ctx)
    firewall.enable_service_for(daemon_type)
    firewall.apply_rules()
def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
call_throws(ctx, ['sysctl', '--system'])
def migrate_sysctl_dir(ctx: CephadmContext, fsid: str) -> None:
    """
    Cephadm once used '/usr/lib/sysctl.d' for storing sysctl configuration.
    This moves it to '/etc/sysctl.d'.
    """
    deprecated_location: str = '/usr/lib/sysctl.d'
    # only this cluster's conf files (matched on fsid) are migrated
    deprecated_confs: List[str] = glob(f'{deprecated_location}/90-ceph-{fsid}-*.conf')
    if not deprecated_confs:
        return

    # file_count counts files NOT yet migrated; decremented on each success
    file_count: int = len(deprecated_confs)
    logger.info(f'Found sysctl {file_count} files in deprecated location {deprecated_location}. Starting Migration.')
    for conf in deprecated_confs:
        try:
            shutil.move(conf, ctx.sysctl_dir)
            file_count -= 1
        except shutil.Error as err:
            if str(err).endswith('already exists'):
                # destination already has this conf; drop the stale source copy
                logger.warning(f'Destination file already exists. Deleting {conf}.')
                try:
                    os.unlink(conf)
                    file_count -= 1
                except OSError as del_err:
                    logger.warning(f'Could not remove {conf}: {del_err}.')
            else:
                logger.warning(f'Could not move {conf} from {deprecated_location} to {ctx.sysctl_dir}: {err}')

    # Log successful migration
    if file_count == 0:
        logger.info(f'Successfully migrated sysctl config to {ctx.sysctl_dir}.')
        return

    # Log partially successful / unsuccessful migration
    files_processed: int = len(deprecated_confs)
    if file_count < files_processed:
        status: str = f'partially successful (failed {file_count}/{files_processed})'
    elif file_count == files_processed:
        status = 'unsuccessful'
    logger.warning(f'Migration of sysctl configuration {status}. You may want to perform a migration manually.')
+
+
def install_base_units(ctx, fsid):
# type: (CephadmContext, str) -> None
"""
LimitNPROC=1048576
EnvironmentFile=-/etc/environment
ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run
-ExecStop=-/bin/bash -c '{container_path} stop ceph-{fsid}-%i ; bash {data_dir}/{fsid}/%i/unit.stop'
+ExecStop=-/bin/bash -c 'bash {data_dir}/{fsid}/%i/unit.stop'
ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop
KillMode=none
Restart=on-failure
{extra_args}
[Install]
WantedBy=ceph-{fsid}.target
-""".format(container_path=ctx.container_engine.path,
- fsid=fsid,
+""".format(fsid=fsid,
data_dir=ctx.data_dir,
extra_args=extra_args,
# if docker, we depend on docker.service
##################################
@default_image
def command_pull(ctx):
    # type: (CephadmContext) -> int
    """Pull ctx.image onto this host, then print the inspected image info.

    Raises Error with a login hint when the registry reports 'unauthorized'.
    """
    try:
        _pull_image(ctx, ctx.image, ctx.insecure)
    except UnauthorizedRegistryError:
        err_str = 'Failed to pull container image. Check that host(s) are logged into the registry'
        logger.debug(f'Pulling image for `command_pull` failed: {err_str}')
        raise Error(err_str)
    return command_inspect_image(ctx)
if not ret:
return
+ if 'unauthorized' in err:
+ raise UnauthorizedRegistryError()
+
if not any(pattern in err for pattern in ignorelist):
raise Error('Failed command: %s' % cmd_str)
subnet_list = subnets.split(',')
for subnet in subnet_list:
# ensure the format of the string is as expected address/netmask
+ subnet = subnet.strip()
if not re.search(r'\/\d+$', subnet):
rc = 1
errors.append(f'{subnet} is not in CIDR format (address/netmask)')
return False
-def prepare_mon_addresses(
- ctx: CephadmContext
-) -> Tuple[str, bool, Optional[str]]:
def ip_in_subnets(ip_addr: str, subnets: str) -> bool:
    """Determine if the ip_addr belongs to any of the subnets list.

    :param ip_addr: IP address, possibly bracket-wrapped IPv6
    :param subnets: comma-separated list of CIDR networks
    :return: True if ip_addr is contained in at least one subnet
    """
    # unwrapping the IP is loop-invariant: compute it once instead of
    # once per subnet as before
    ip_address = unwrap_ipv6(ip_addr) if is_ipv6(ip_addr) else ip_addr
    ip_obj = ipaddress.ip_address(ip_address)
    return any(
        ip_obj in ipaddress.ip_network(subnet.strip())
        for subnet in subnets.split(',')
    )
+
+
def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]:
    """Parse mon-addrv param into a list of mon end points.

    :param addrv_arg: bracketed list such as '[v2:1.2.3.4:3300,v1:1.2.3.4:6789]'
    :raises Error: if the value is empty/unbracketed or an entry lacks a port
    """
    port_re = re.compile(r':(\d+)$')
    addrv_args = []
    addr_arg = addrv_arg
    # guard against an empty string before indexing first/last characters
    # (previously an empty value raised IndexError instead of Error);
    # also fix the 'backets' typo in the user-facing message
    if not addr_arg or addr_arg[0] != '[' or addr_arg[-1] != ']':
        raise Error(f'--mon-addrv value {addr_arg} must use square brackets')

    for addr in addr_arg[1: -1].split(','):
        hasport = port_re.findall(addr)
        if not hasport:
            raise Error(f'--mon-addrv value {addr_arg} must include port number')
        port_str = hasport[0]
        addr = re.sub(r'^v\d+:', '', addr)  # strip off v1: or v2: prefix
        base_ip = addr[0:-(len(port_str)) - 1]
        addrv_args.append(EndPoint(base_ip, int(port_str)))

    return addrv_args
+
+
def parse_mon_ip(mon_ip: str) -> List[EndPoint]:
    """Parse mon-ip param into a list of mon end points."""
    port_match = re.search(r':(\d+)$', mon_ip)
    if port_match:
        port_str = port_match.group(1)
        host = mon_ip[0:-(len(port_str)) - 1]
        return [EndPoint(host, int(port_str))]
    # No port provided: use fixed ports for ceph monitor
    return [EndPoint(mon_ip, 3300), EndPoint(mon_ip, 6789)]
+
+
def build_addrv_params(addrv: List[EndPoint]) -> str:
    """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]"""
    if len(addrv) > 2:
        raise Error('Detected a local mon-addrv list with more than 2 entries.')
    # the two well-known mon ports map to fixed protocol versions
    known_versions: Dict[int, str] = {6789: 'v1', 3300: 'v2'}
    parts: List[str] = []
    for ep in addrv:
        try:
            ver = known_versions[ep.port]
        except KeyError:
            ver = 'v2'  # default mon protocol version if port is not provided
            logger.warning(f'Using msgr2 protocol for unrecognized port {ep}')
        parts.append(f'{ver}:{ep.ip}:{ep.port}')

    return '[{0}]'.format(','.join(parts))
+
+
def get_public_net_from_cfg(ctx: CephadmContext) -> Optional[str]:
    """Get mon public network from configuration file.

    :return: the validated public_network value, or None if not configured
    :raises Error: if the network is malformed, not configured on any local
                   interface, or incompatible with --mon-ip / --mon-addrv
    """
    cp = read_config(ctx.config)
    if not cp.has_option('global', 'public_network'):
        return None

    # Ensure all public CIDR networks are valid
    public_network = cp.get('global', 'public_network')
    rc, _, err_msg = check_subnet(public_network)
    if rc:
        raise Error(f'Invalid public_network {public_network} parameter: {err_msg}')

    # Ensure all public CIDR networks are configured locally
    # (at least one must be; the others only produce warnings)
    configured_subnets = set([x.strip() for x in public_network.split(',')])
    local_subnets = set([x[0] for x in list_networks(ctx).items()])
    valid_public_net = False
    for net in configured_subnets:
        if net in local_subnets:
            valid_public_net = True
        else:
            logger.warning(f'The public CIDR network {net} (from -c conf file) is not configured locally.')
    if not valid_public_net:
        raise Error(f'None of the public CIDR network(s) {configured_subnets} (from -c conf file) is configured locally.')

    # Ensure public_network is compatible with the provided mon-ip (or mon-addrv)
    if ctx.mon_ip:
        if not ip_in_subnets(ctx.mon_ip, public_network):
            raise Error(f'The provided --mon-ip {ctx.mon_ip} does not belong to any public_network(s) {public_network}')
    elif ctx.mon_addrv:
        addrv_args = parse_mon_addrv(ctx.mon_addrv)
        for addrv in addrv_args:
            if not ip_in_subnets(addrv.ip, public_network):
                raise Error(f'The provided --mon-addrv {addrv.ip} ip does not belong to any public_network(s) {public_network}')

    logger.debug(f'Using mon public network from configuration file {public_network}')
    return public_network
+
+
def infer_mon_network(ctx: CephadmContext, mon_eps: List[EndPoint]) -> Optional[str]:
    """Infer mon public network from local network.

    :param mon_eps: candidate mon endpoints (ip:port)
    :return: comma-separated CIDR network(s) containing the mon IPs
    :raises Error: if no locally-configured network contains any mon IP
    """
    # Make sure IP is configured locally, and then figure out the CIDR network
    mon_networks = []
    for net, ifaces in list_networks(ctx).items():
        # build local_ips list for the specified network
        # (entries are ipaddress objects, not plain strings)
        local_ips: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]] = []
        for _, ls in ifaces.items():
            local_ips.extend([ipaddress.ip_address(ip) for ip in ls])

        # check if any of mon ips belong to this net
        for mon_ep in mon_eps:
            try:
                if ipaddress.ip_address(unwrap_ipv6(mon_ep.ip)) in local_ips:
                    mon_networks.append(net)
                    logger.info(f'Mon IP `{mon_ep.ip}` is in CIDR network `{net}`')
            except ValueError as e:
                logger.warning(f'Cannot infer CIDR network for mon IP `{mon_ep.ip}` : {e}')

    if not mon_networks:
        raise Error('Cannot infer CIDR network. Pass --skip-mon-network to configure it later')
    else:
        logger.debug(f'Inferred mon public CIDR from local network configuration {mon_networks}')

    mon_networks = list(set(mon_networks))  # remove duplicates
    return ','.join(mon_networks)
+
+
def prepare_mon_addresses(ctx: CephadmContext) -> Tuple[str, bool, Optional[str]]:
    """Get mon public network configuration.

    :return: tuple of (mon addrv string, ipv6 flag, mon public network or None)
    :raises Error: if neither --mon-ip nor --mon-addrv was provided
    """
    ipv6 = False
    addrv_args: List[EndPoint] = []
    mon_addrv: str = ''  # i.e: [v2:192.168.100.1:3300,v1:192.168.100.1:6789]
    if ctx.mon_ip:
        ipv6 = is_ipv6(ctx.mon_ip)
        if ipv6:
            ctx.mon_ip = wrap_ipv6(ctx.mon_ip)
        addrv_args = parse_mon_ip(ctx.mon_ip)
        mon_addrv = build_addrv_params(addrv_args)
    elif ctx.mon_addrv:
        # more than one '[' means bracketed (ipv6) addresses are present
        ipv6 = ctx.mon_addrv.count('[') > 1
        addrv_args = parse_mon_addrv(ctx.mon_addrv)
        mon_addrv = ctx.mon_addrv
    else:
        raise Error('must specify --mon-ip or --mon-addrv')

    if addrv_args:
        # verify every candidate endpoint can be bound locally
        for end_point in addrv_args:
            check_ip_port(ctx, end_point)

    logger.debug(f'Base mon IP(s) is {addrv_args}, mon addrv is {mon_addrv}')
    mon_network = None
    if not ctx.skip_mon_network:
        # prefer an explicit public_network from the conf file; otherwise
        # infer it from the locally configured networks
        mon_network = get_public_net_from_cfg(ctx) or infer_mon_network(ctx, addrv_args)

    return (mon_addrv, ipv6, mon_network)
def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]:
- cluster_network = ''
- ipv6_cluster_network = False
# the cluster network may not exist on this node, so all we can do is
# validate that the address given is valid ipv4 or ipv6 subnet
- if ctx.cluster_network:
- rc, versions, err_msg = check_subnet(ctx.cluster_network)
+ ipv6_cluster_network = False
+ cp = read_config(ctx.config)
+ cluster_network = ctx.cluster_network
+ if cluster_network is None and cp.has_option('global', 'cluster_network'):
+ cluster_network = cp.get('global', 'cluster_network')
+
+ if cluster_network:
+ cluser_nets = set([x.strip() for x in cluster_network.split(',')])
+ local_subnets = set([x[0] for x in list_networks(ctx).items()])
+ for net in cluser_nets:
+ if net not in local_subnets:
+ logger.warning(f'The cluster CIDR network {net} is not configured locally.')
+
+ rc, versions, err_msg = check_subnet(cluster_network)
if rc:
raise Error(f'Invalid --cluster-network parameter: {err_msg}')
- cluster_network = ctx.cluster_network
ipv6_cluster_network = True if 6 in versions else False
else:
- logger.info('- internal network (--cluster-network) has not '
+ logger.info('Internal network (--cluster-network) has not '
'been provided, OSD replication will default to '
'the public_network')
}
cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
+ ssh_pub = cli(['cephadm', 'get-pub-key'])
else:
logger.info('Generating ssh key...')
cli(['cephadm', 'generate-key'])
ssh_pub = cli(['cephadm', 'get-pub-key'])
-
with open(ctx.output_pub_ssh_key, 'w') as f:
f.write(ssh_pub)
logger.info('Wrote public SSH key to %s' % ctx.output_pub_ssh_key)
- logger.info('Adding key to %s@localhost authorized_keys...' % ctx.ssh_user)
- try:
- s_pwd = pwd.getpwnam(ctx.ssh_user)
- except KeyError:
- raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx.ssh_user))
- ssh_uid = s_pwd.pw_uid
- ssh_gid = s_pwd.pw_gid
- ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
-
- if not os.path.exists(ssh_dir):
- makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
-
- auth_keys_file = '%s/authorized_keys' % ssh_dir
- add_newline = False
-
- if os.path.exists(auth_keys_file):
- with open(auth_keys_file, 'r') as f:
- f.seek(0, os.SEEK_END)
- if f.tell() > 0:
- f.seek(f.tell() - 1, os.SEEK_SET) # go to last char
- if f.read() != '\n':
- add_newline = True
-
- with open(auth_keys_file, 'a') as f:
- os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
- os.fchmod(f.fileno(), 0o600) # just in case we created it
- if add_newline:
- f.write('\n')
- f.write(ssh_pub.strip() + '\n')
+ authorize_ssh_key(ssh_pub, ctx.ssh_user)
host = get_hostname()
logger.info('Adding host %s...' % host)
args = ['orch', 'host', 'add', host]
if ctx.mon_ip:
args.append(unwrap_ipv6(ctx.mon_ip))
+ elif ctx.mon_addrv:
+ addrv_args = parse_mon_addrv(ctx.mon_addrv)
+ args.append(unwrap_ipv6(addrv_args[0].ip))
cli(args)
except RuntimeError as e:
raise Error('Failed to add host <%s>: %s' % (host, e))
port = int(out)
# Open dashboard port
- fw = Firewalld(ctx)
- fw.open_ports([port])
- fw.apply_rules()
+ if not ('skip_firewalld' in ctx and ctx.skip_firewalld):
+ fw = Firewalld(ctx)
+ fw.open_ports([port])
+ fw.apply_rules()
logger.info('Ceph Dashboard is now available at:\n\n'
'\t URL: https://%s:%s/\n'
'restart',
get_unit_name(fsid, 'mon', mon_id)
])
+ elif 'image' in ctx and ctx.image:
+ # we still want to assimilate the given container image if provided
+ cli(['config', 'set', 'global', 'container_image', f'{ctx.image}'])
if mon_network:
logger.info(f'Setting mon public_network to {mon_network}')
docs = []
current_doc = [] # type: List[str]
for line in f:
- if '---' in line:
+ if re.search(r'^---\s+', line):
if current_doc:
docs.append(current_doc)
current_doc = []
def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstrap_hostname: str) -> int:
# copy ssh key to hosts in host spec (used for apply spec)
- ssh_key = '/etc/ceph/ceph.pub'
+ ssh_key = CEPH_DEFAULT_PUBKEY
if ctx.ssh_public_key:
ssh_key = ctx.ssh_public_key.name
return 0
def save_cluster_config(ctx: CephadmContext, uid: int, gid: int, fsid: str) -> None:
    """Save cluster configuration to the per fsid directory """
    def copy_file(src: str, dst: str) -> None:
        # src may be empty/None (output file not configured); skip silently
        if src:
            shutil.copyfile(src, dst)

    conf_dir = f'{ctx.data_dir}/{fsid}/{CEPH_CONF_DIR}'
    makedirs(conf_dir, uid, gid, DATA_DIR_MODE)
    # makedirs above normally creates the dir; this guards the (unlikely)
    # case it is still missing so the copies below don't raise
    if os.path.exists(conf_dir):
        logger.info(f'Saving cluster configuration to {conf_dir} directory')
        copy_file(ctx.output_config, os.path.join(conf_dir, CEPH_CONF))
        copy_file(ctx.output_keyring, os.path.join(conf_dir, CEPH_KEYRING))
        # ctx.output_pub_ssh_key may not exist if user has provided custom ssh keys
        if (os.path.exists(ctx.output_pub_ssh_key)):
            copy_file(ctx.output_pub_ssh_key, os.path.join(conf_dir, CEPH_PUBKEY))
    else:
        logger.warning(f'Cannot create cluster configuration directory {conf_dir}')
+
+
@default_image
def command_bootstrap(ctx):
# type: (CephadmContext) -> int
if not ctx.output_config:
- ctx.output_config = os.path.join(ctx.output_dir, 'ceph.conf')
+ ctx.output_config = os.path.join(ctx.output_dir, CEPH_CONF)
if not ctx.output_keyring:
- ctx.output_keyring = os.path.join(ctx.output_dir,
- 'ceph.client.admin.keyring')
+ ctx.output_keyring = os.path.join(ctx.output_dir, CEPH_KEYRING)
if not ctx.output_pub_ssh_key:
- ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, 'ceph.pub')
+ ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, CEPH_PUBKEY)
+
+ if bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key):
+ raise Error('--ssh-private-key and --ssh-public-key must be provided together or not at all.')
+
+ if ctx.fsid:
+ data_dir_base = os.path.join(ctx.data_dir, ctx.fsid)
+ if os.path.exists(data_dir_base):
+ raise Error(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
+ else:
+ logger.warning('Specifying an fsid for your cluster offers no advantages and may increase the likelihood of fsid conflicts.')
# verify output files
for f in [ctx.output_config, ctx.output_keyring,
(user_conf, _) = get_config_and_keyring(ctx)
+ if ctx.ssh_user != 'root':
+ check_ssh_connectivity(ctx)
+
if not ctx.skip_prepare_host:
command_prepare_host(ctx)
else:
config = prepare_bootstrap_config(ctx, fsid, addr_arg, ctx.image)
if not ctx.skip_pull:
- _pull_image(ctx, ctx.image)
+ try:
+ _pull_image(ctx, ctx.image)
+ except UnauthorizedRegistryError:
+ err_str = 'Failed to pull container image. Check that correct registry credentials are provided in bootstrap by --registry-url, --registry-username, --registry-password, or supply --registry-json with credentials'
+ logger.debug(f'Pulling image for bootstrap on {hostname} failed: {err_str}')
+ raise Error(err_str)
image_ver = CephContainer(ctx, ctx.image, 'ceph', ['--version']).run().strip()
logger.info(f'Ceph version: {image_ver}')
if not ctx.skip_dashboard:
prepare_dashboard(ctx, uid, gid, cli, wait_for_mgr_restart)
- if ctx.output_config == '/etc/ceph/ceph.conf' and not ctx.skip_admin_label:
+ if ctx.output_config == CEPH_DEFAULT_CONF and not ctx.skip_admin_label and not ctx.no_minimize_config:
logger.info('Enabling client.admin keyring and conf on hosts with "admin" label')
try:
cli(['orch', 'client-keyring', 'set', 'client.admin', 'label:_admin'])
except Exception:
logger.info('\nApplying %s to cluster failed!\n' % ctx.apply_spec)
+ save_cluster_config(ctx, uid, gid, fsid)
+
# enable autotune for osd_memory_target
logger.info('Enabling autotune for osd_memory_target')
cli(['config', 'set', 'osd', 'osd_memory_target_autotune', 'true'])
# Notify the Dashboard to show the 'Expand cluster' page on first log in.
cli(['config-key', 'set', 'mgr/dashboard/cluster/status', 'INSTALLED'])
- logger.info('You can access the Ceph CLI with:\n\n'
+ logger.info('You can access the Ceph CLI as following in case of multi-cluster or non-default config:\n\n'
'\tsudo %s shell --fsid %s -c %s -k %s\n' % (
sys.argv[0],
fsid,
ctx.output_config,
ctx.output_keyring))
+
+ logger.info('Or, if you are only running a single cluster on this host:\n\n\tsudo %s shell \n' % (sys.argv[0]))
+
logger.info('Please consider enabling telemetry to help improve Ceph:\n\n'
'\tceph telemetry on\n\n'
'For more information see:\n\n'
uid, gid = 65534, 65534
elif daemon_type == 'grafana':
uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana')
+ elif daemon_type == 'loki':
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/loki')
+ elif daemon_type == 'promtail':
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail')
elif daemon_type == 'alertmanager':
uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
else:
logger.info('%s daemon %s ...' % ('Deploy', ctx.name))
+ # Migrate sysctl conf files from /usr/lib to /etc
+ migrate_sysctl_dir(ctx, ctx.fsid)
+
# Get and check ports explicitly required to be opened
daemon_ports = [] # type: List[int]
if daemon_id and not ctx.fsid:
raise Error('must pass --fsid to specify cluster')
- # use /etc/ceph files by default, if present. we do this instead of
+ # in case a dedicated keyring for the specified fsid is found we us it.
+ # Otherwise, use /etc/ceph files by default, if present. We do this instead of
# making these defaults in the arg parser because we don't want an error
# if they don't exist.
- if not ctx.keyring and os.path.exists(SHELL_DEFAULT_KEYRING):
- ctx.keyring = SHELL_DEFAULT_KEYRING
+ if not ctx.keyring:
+ keyring_file = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_KEYRING}'
+ if os.path.exists(keyring_file):
+ ctx.keyring = keyring_file
+ elif os.path.exists(CEPH_DEFAULT_KEYRING):
+ ctx.keyring = CEPH_DEFAULT_KEYRING
container_args: List[str] = ['-i']
mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id,
@infer_fsid
def command_unit(ctx):
    # type: (CephadmContext) -> int
    """Run a systemctl subcommand (start/stop/...) on a daemon's unit.

    Returns systemctl's exit code instead of raising so the caller can
    propagate it as the process exit status.
    """
    if not ctx.fsid:
        raise Error('must pass --fsid to specify cluster')
    unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name)
    # use call() (not call_throws) so a failing systemctl is reported via
    # the returned code rather than an exception
    _, _, code = call(
        ctx,
        ['systemctl', ctx.command, unit_name],
        verbosity=CallVerbosity.VERBOSE,
        desc=''
    )
    return code
##################################
def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
r = {} # type: Dict[str, Dict[str, Set[str]]]
- p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
+ p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)')
for line in out.splitlines():
m = p.findall(line)
if not m:
continue
net = m[0][0]
+ if '/' not in net: # aggregate /32 mask for single host sub-networks
+ net += '/32'
iface = m[0][1]
ip = m[0][4]
if net not in r:
if not m or m[0][0].lower() == 'default':
continue
net = m[0][0]
- if '/' not in net: # only consider networks with a mask
- continue
+ if '/' not in net: # aggregate /128 mask for single host sub-networks
+ net += '/128'
iface = m[0][1]
+ if iface == 'lo': # skip loopback devices
+ continue
if net not in r:
r[net] = {}
if iface not in r[net]:
# keep track of image digests
seen_digests = {} # type: Dict[str, List[str]]
- # keep track of memory usage we've seen
+ # keep track of memory and cpu usage we've seen
seen_memusage = {} # type: Dict[str, int]
+ seen_cpuperc = {} # type: Dict[str, str]
out, err, code = call(
ctx,
[container_path, 'stats', '--format', '{{.ID}},{{.MemUsage}}', '--no-stream'],
)
seen_memusage_cid_len, seen_memusage = _parse_mem_usage(code, out)
+ out, err, code = call(
+ ctx,
+ [container_path, 'stats', '--format', '{{.ID}},{{.CPUPerc}}', '--no-stream'],
+ verbosity=CallVerbosity.DEBUG
+ )
+ seen_cpuperc_cid_len, seen_cpuperc = _parse_cpu_perc(code, out)
+
# /var/lib/ceph
if os.path.exists(data_dir):
for i in os.listdir(data_dir):
seen_versions[image_id] = version
elif daemon_type in ['prometheus',
'alertmanager',
- 'node-exporter']:
+ 'node-exporter',
+ 'loki',
+ 'promtail']:
version = Monitoring.get_version(ctx, container_id, daemon_type)
seen_versions[image_id] = version
elif daemon_type == 'haproxy':
val['container_image_digests'] = image_digests
if container_id:
val['memory_usage'] = seen_memusage.get(container_id[0:seen_memusage_cid_len])
+ val['cpu_percentage'] = seen_cpuperc.get(container_id[0:seen_cpuperc_cid_len])
val['version'] = version
val['started'] = start_stamp
val['created'] = get_file_timestamp(
os.path.join(data_dir, fsid, j, 'unit.image'))
val['configured'] = get_file_timestamp(
os.path.join(data_dir, fsid, j, 'unit.configured'))
-
ls.append(val)
return ls
return seen_memusage_cid_len, seen_memusage
+def _parse_cpu_perc(code: int, out: str) -> Tuple[int, Dict[str, str]]:
+ seen_cpuperc = {}
+ seen_cpuperc_cid_len = 0
+ if not code:
+ for line in out.splitlines():
+ (cid, cpuperc) = line.split(',')
+ try:
+ seen_cpuperc[cid] = cpuperc
+ if not seen_cpuperc_cid_len:
+ seen_cpuperc_cid_len = len(cid)
+ except ValueError:
+ logger.info('unable to parse cpu percentage line\n>{}'.format(line))
+ pass
+ return seen_cpuperc_cid_len, seen_cpuperc
+
+
def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
# type: (CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str]
# type: (CephadmContext) -> None
if not ctx.skip_pull:
- _pull_image(ctx, ctx.image)
+ try:
+ _pull_image(ctx, ctx.image)
+ except UnauthorizedRegistryError:
+ err_str = 'Failed to pull container image. Host may not be logged into container registry. Try `cephadm registry-login --registry-url <url> --registry-username <username> --registry-password <password>` or supply login info via a json file with `cephadm registry-login --registry-json <file>`'
+ logger.debug(f'Pulling image for `command_adopt` failed: {err_str}')
+ raise Error(err_str)
(daemon_type, daemon_id) = ctx.name.split('.', 1)
else:
call_throws(ctx, ['rm', '-rf', data_dir])
+ if 'tcp_ports' in ctx and ctx.tcp_ports is not None:
+ ports: List[int] = [int(p) for p in ctx.tcp_ports.split()]
+ try:
+ fw = Firewalld(ctx)
+ fw.close_ports(ports)
+ fw.apply_rules()
+ except RuntimeError as e:
+ # in case we cannot close the ports we will remove
+ # the daemon but keep them open.
+ logger.warning(f' Error when trying to close ports: {e}')
+
+
##################################
##################################
def get_ceph_cluster_count(ctx: CephadmContext) -> int:
    """Count cluster data directories (entries named by fsid) under ctx.data_dir."""
    return sum(1 for entry in os.listdir(ctx.data_dir) if is_fsid(entry))
+
+
def command_rm_cluster(ctx):
# type: (CephadmContext) -> None
if not ctx.force:
lock = FileLock(ctx, ctx.fsid)
lock.acquire()
- # stop + disable individual daemon units
- for d in list_daemons(ctx, detail=False):
- if d['fsid'] != ctx.fsid:
- continue
- if d['style'] != 'cephadm:v1':
- continue
- unit_name = get_unit_name(ctx.fsid, d['name'])
+ def disable_systemd_service(unit_name: str) -> None:
call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
call(ctx, ['systemctl', 'reset-failed', unit_name],
call(ctx, ['systemctl', 'disable', unit_name],
verbosity=CallVerbosity.DEBUG)
+ # stop + disable individual daemon units
+ for d in list_daemons(ctx, detail=False):
+ if d['fsid'] != ctx.fsid:
+ continue
+ if d['style'] != 'cephadm:v1':
+ continue
+ disable_systemd_service(get_unit_name(ctx.fsid, d['name']))
+
# cluster units
for unit_name in ['ceph-%s.target' % ctx.fsid]:
- call(ctx, ['systemctl', 'stop', unit_name],
- verbosity=CallVerbosity.DEBUG)
- call(ctx, ['systemctl', 'reset-failed', unit_name],
- verbosity=CallVerbosity.DEBUG)
- call(ctx, ['systemctl', 'disable', unit_name],
- verbosity=CallVerbosity.DEBUG)
+ disable_systemd_service(unit_name)
slice_name = 'system-ceph\\x2d{}.slice'.format(ctx.fsid.replace('-', '\\x2d'))
call(ctx, ['systemctl', 'stop', slice_name],
# rm logrotate config
call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/ceph-%s' % ctx.fsid])
- # rm cephadm logrotate config if last cluster on host
- if not os.listdir(ctx.data_dir):
+ # if last cluster on host remove shared files
+ if get_ceph_cluster_count(ctx) == 0:
+ disable_systemd_service('ceph.target')
+
+ # rm shared ceph target files
+ call_throws(ctx, ['rm', '-f', ctx.unit_dir + '/multi-user.target.wants/ceph.target'])
+ call_throws(ctx, ['rm', '-f', ctx.unit_dir + '/ceph.target'])
+
+ # rm cephadm logrotate config
call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/cephadm'])
+ if not ctx.keep_logs:
+ # remove all cephadm logs
+ for fname in glob(f'{ctx.log_dir}/cephadm.log*'):
+ os.remove(fname)
+
# rm sysctl settings
- sysctl_dir = Path(ctx.sysctl_dir)
- for p in sysctl_dir.glob(f'90-ceph-{ctx.fsid}-*.conf'):
- p.unlink()
+ sysctl_dirs: List[Path] = [Path(ctx.sysctl_dir), Path('/usr/lib/sysctl.d')]
- # clean up config, keyring, and pub key files
- files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring']
+ for sysctl_dir in sysctl_dirs:
+ for p in sysctl_dir.glob(f'90-ceph-{ctx.fsid}-*.conf'):
+ p.unlink()
+ # cleanup remaining ceph directories
+ ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/var/lib/ceph/{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
+ for dd in ceph_dirs:
+ shutil.rmtree(dd, ignore_errors=True)
+
+ # clean up config, keyring, and pub key files
+ files = [CEPH_DEFAULT_CONF, CEPH_DEFAULT_PUBKEY, CEPH_DEFAULT_KEYRING]
if os.path.exists(files[0]):
valid_fsid = False
with open(files[0]) as f:
if ctx.fsid in f.read():
valid_fsid = True
if valid_fsid:
+ # rm configuration files on /etc/ceph
for n in range(0, len(files)):
if os.path.exists(files[n]):
os.remove(files[n])
-
##################################
##################################
def get_ssh_vars(ssh_user: str) -> Tuple[int, int, str]:
    """Resolve uid, gid and the ~/.ssh directory path for *ssh_user*.

    :raises Error: when the user is not present in the passwd database.
    """
    try:
        pw_entry = pwd.getpwnam(ssh_user)
    except KeyError:
        raise Error('Cannot find uid/gid for ssh-user: %s' % (ssh_user))
    return pw_entry.pw_uid, pw_entry.pw_gid, os.path.join(pw_entry.pw_dir, '.ssh')
+
+
def authorize_ssh_key(ssh_pub_key: str, ssh_user: str) -> bool:
    """Authorize the public key for the provided ssh user.

    Appends the key to ~ssh_user/.ssh/authorized_keys, creating the
    directory/file with safe ownership and permissions when needed.

    :return: True if the key was added, False if it was already present.
    :raises Error: if the key is empty/blank or the user cannot be resolved.
    """

    def key_in_file(path: str, key: str) -> bool:
        # exact (stripped) line match against the existing file, if any
        if not os.path.exists(path):
            return False
        with open(path) as f:
            for line in f:
                if line.strip() == key.strip():
                    return True
        return False

    logger.info(f'Adding key to {ssh_user}@localhost authorized_keys...')
    # fix: ''.isspace() is False, so the previous check let an empty string
    # through; `not strip()` rejects empty and whitespace-only keys alike
    if ssh_pub_key is None or not ssh_pub_key.strip():
        raise Error('Trying to authorize an empty ssh key')

    ssh_pub_key = ssh_pub_key.strip()
    ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
    if not os.path.exists(ssh_dir):
        makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)

    auth_keys_file = '%s/authorized_keys' % ssh_dir
    if key_in_file(auth_keys_file, ssh_pub_key):
        logger.info(f'key already in {ssh_user}@localhost authorized_keys...')
        return False

    # make sure the previous last line is newline-terminated before appending
    add_newline = False
    if os.path.exists(auth_keys_file):
        with open(auth_keys_file, 'r') as f:
            f.seek(0, os.SEEK_END)
            if f.tell() > 0:
                f.seek(f.tell() - 1, os.SEEK_SET)  # go to last char
                if f.read() != '\n':
                    add_newline = True

    with open(auth_keys_file, 'a') as f:
        os.fchown(f.fileno(), ssh_uid, ssh_gid)  # just in case we created it
        os.fchmod(f.fileno(), 0o600)  # just in case we created it
        if add_newline:
            f.write('\n')
        f.write(ssh_pub_key + '\n')

    return True
+
+
def revoke_ssh_key(key: str, ssh_user: str) -> None:
    """Revoke the public key authorization for the ssh user.

    Rewrites authorized_keys through a temp file and moves it into place
    only when the key was actually found; otherwise logs a warning.
    """
    ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
    auth_keys_file = '%s/authorized_keys' % ssh_dir
    deleted = False
    if os.path.exists(auth_keys_file):
        with open(auth_keys_file, 'r') as f:
            lines = f.readlines()
        tmp_fd, tmp_path = tempfile.mkstemp()
        # fix: wrap mkstemp's raw fd so it is closed (it was being leaked)
        with os.fdopen(tmp_fd, 'w') as f:
            os.fchown(f.fileno(), ssh_uid, ssh_gid)
            os.fchmod(f.fileno(), 0o600)  # secure access to the keys file
            for line in lines:
                if line.strip() == key.strip():
                    deleted = True
                else:
                    f.write(line)
        if deleted:
            shutil.move(tmp_path, auth_keys_file)
        else:
            # fix: don't leave the unused temp copy behind
            os.unlink(tmp_path)
    if not deleted:
        logger.warning('Cannot find the ssh key to be deleted')
+
+
def check_ssh_connectivity(ctx: CephadmContext) -> None:
    """Verify passwordless ssh+sudo to localhost for ctx.ssh_user.

    Uses the user-provided key pair when given; otherwise generates a
    throwaway pair just for this probe (and removes it afterwards).

    :raises Error: with remediation guidance when the ssh probe fails.
    """

    def cmd_is_available(cmd: str) -> bool:
        if shutil.which(cmd) is None:
            logger.warning(f'Command not found: {cmd}')
            return False
        return True

    if not cmd_is_available('ssh') or not cmd_is_available('ssh-keygen'):
        logger.warning('Cannot check ssh connectivity. Skipping...')
        return

    logger.info('Verifying ssh connectivity ...')
    temp_keys = False
    if ctx.ssh_private_key and ctx.ssh_public_key:
        # let's use the keys provided by the user
        ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
        ssh_pub_key_path = pathify(ctx.ssh_public_key.name)
    else:
        # no custom keys, let's generate some random keys just for this check
        temp_keys = True
        ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}'
        ssh_pub_key_path = f'{ssh_priv_key_path}.pub'
        ssh_key_gen_cmd = ['ssh-keygen', '-q', '-t', 'rsa', '-N', '', '-C', '', '-f', ssh_priv_key_path]
        _, _, code = call(ctx, ssh_key_gen_cmd)
        if code != 0:
            logger.warning('Cannot generate keys to check ssh connectivity.')
            return

    try:
        with open(ssh_pub_key_path, 'r') as f:
            key = f.read().strip()
        new_key = authorize_ssh_key(key, ctx.ssh_user)
        ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
        _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
                                *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
                                '-o PasswordAuthentication=no',
                                f'{ctx.ssh_user}@{get_hostname()}',
                                'sudo echo'])

        # we only remove the key if it's a new one. In case the user has provided
        # some already existing key then we don't alter authorized_keys file
        if new_key:
            revoke_ssh_key(key, ctx.ssh_user)
    finally:
        if temp_keys:
            # fix: remove the generated throwaway key pair from /tmp
            # (previously it was left behind on every bootstrap)
            for path in (ssh_priv_key_path, ssh_pub_key_path):
                try:
                    os.remove(path)
                except OSError:
                    pass

    pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
    prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
    ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
    err_msg = f"""
** Please verify your user's ssh configuration and make sure:
- User {ctx.ssh_user} must have passwordless sudo access
{pub_key_msg}{prv_key_msg}{ssh_cfg_msg}
"""
    if code != 0:
        raise Error(err_msg)
+
+
def command_prepare_host(ctx: CephadmContext) -> None:
logger.info('Verifying podman|docker is present...')
pkg = None
continue
for iface in os.listdir(nic_path):
+ if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
+ nic_type = 'bridge'
+ elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
+ nic_type = 'bonding'
+ else:
+ nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
+
+ if nic_type == 'loopback': # skip loopback devices
+ continue
+
lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
# Either way, we show a -1 when speed isn't available
speed = -1
- if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
- nic_type = 'bridge'
- elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
- nic_type = 'bonding'
- else:
- nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
-
dev_link = os.path.join(nic_path, iface, 'device')
if os.path.exists(dev_link):
iftype = 'physical'
parser_version.set_defaults(func=command_version)
parser_pull = subparsers.add_parser(
- 'pull', help='pull latest image version')
+ 'pull', help='pull the default container image')
parser_pull.set_defaults(func=command_pull)
parser_pull.add_argument(
'--insecure',
parser_adopt.add_argument(
'--skip-pull',
action='store_true',
- help='do not pull the latest image before adopting')
+ help='do not pull the default image before adopting')
parser_adopt.add_argument(
'--force-start',
action='store_true',
required=True,
action=CustomValidation,
help='daemon name (type.id)')
+ parser_rm_daemon.add_argument(
+ '--tcp-ports',
+ help='List of tcp ports to close in the host firewall')
parser_rm_daemon.add_argument(
'--fsid',
required=True,
'--mon-id',
required=False,
help='mon id (default: local hostname)')
- parser_bootstrap.add_argument(
+ group = parser_bootstrap.add_mutually_exclusive_group()
+ group.add_argument(
'--mon-addrv',
help='mon IPs (e.g., [v2:localipaddr:3300,v1:localipaddr:6789])')
- parser_bootstrap.add_argument(
+ group.add_argument(
'--mon-ip',
help='mon IP')
parser_bootstrap.add_argument(
parser_bootstrap.add_argument(
'--skip-pull',
action='store_true',
- help='do not pull the latest image before bootstrapping')
+ help='do not pull the default image before bootstrapping')
parser_bootstrap.add_argument(
'--skip-firewalld',
action='store_true',
global logger
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
- dictConfig(logging_config)
+ operations = ['bootstrap', 'rm-cluster']
+ if any(op in args for op in operations):
+ dictConfig(interactive_logging_config)
+ else:
+ dictConfig(logging_config)
+
logger = logging.getLogger()
if not os.path.exists(ctx.logrotate_dir + '/cephadm'):