import struct
import ssl
from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO
import re
import uuid
CONTAINER_INIT = True
MIN_PODMAN_VERSION = (2, 0, 2)
CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0)
+PIDS_LIMIT_UNLIMITED_PODMAN_VERSION = (3, 4, 1)
CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
DEFAULT_TIMEOUT = None # in seconds
DEFAULT_RETRY = 15
class Ceph(object):
    # Daemon types that cephadm deploys as part of a Ceph cluster proper.
    # These share common handling (e.g. keyring lookup, --setuser/--setgroup
    # in get_daemon_args, pids-limit handling in get_container).
    daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
               'crash', 'cephfs-mirror', 'ceph-exporter')
    # Gateway daemon types; grouped here so they can share the same
    # pids-limit handling as the core daemons (see get_container).
    gateways = ('iscsi', 'nfs')
##################################
cmd = daemon_type.replace('-', '_')
code = -1
err = ''
+ out = ''
version = ''
if daemon_type == 'alertmanager':
for cmd in ['alertmanager', 'prometheus-alertmanager']:
- _, err, code = call(ctx, [
+ out, err, code = call(ctx, [
ctx.container_engine.path, 'exec', container_id, cmd,
'--version'
], verbosity=CallVerbosity.QUIET)
break
cmd = 'alertmanager' # reset cmd for version extraction
else:
- _, err, code = call(ctx, [
+ out, err, code = call(ctx, [
ctx.container_engine.path, 'exec', container_id, cmd, '--version'
], verbosity=CallVerbosity.QUIET)
- if code == 0 and \
- err.startswith('%s, version ' % cmd):
- version = err.split(' ')[2]
+ if code == 0:
+ if err.startswith('%s, version ' % cmd):
+ version = err.split(' ')[2]
+ elif out.startswith('%s, version ' % cmd):
+ version = out.split(' ')[2]
return version
##################################
# remove extra container args for tcmu container.
# extra args could cause issue with forking service type
tcmu_container.container_args = []
+ set_pids_limit_unlimited(self.ctx, tcmu_container.container_args)
return tcmu_container
##################################
class CephExporter(object):
    """Defines a Ceph exporter container"""

    daemon_type = 'ceph-exporter'
    entrypoint = '/usr/bin/ceph-exporter'
    DEFAULT_PORT = 9926
    port_map = {
        'ceph-exporter': DEFAULT_PORT,
    }

    def __init__(self,
                 ctx: CephadmContext,
                 fsid: str, daemon_id: Union[int, str],
                 config_json: Dict[str, Any],
                 image: str = DEFAULT_IMAGE) -> None:
        """Initialize exporter settings from config_json, falling back to
        defaults for any missing keys.

        :raises Error: (via validate) if the socket directory is missing
        """
        self.ctx = ctx
        self.fsid = fsid
        self.daemon_id = daemon_id
        self.image = image

        # config-json values override the defaults below
        self.sock_dir = config_json.get('sock-dir', '/var/run/ceph/')
        # default to this host's resolved address when no addrs are given
        self.addrs = config_json.get('addrs', socket.gethostbyname(socket.gethostname()))
        self.port = config_json.get('port', self.DEFAULT_PORT)
        self.prio_limit = config_json.get('prio-limit', 5)
        self.stats_period = config_json.get('stats-period', 5)

        self.validate()

    @classmethod
    def init(cls, ctx: CephadmContext, fsid: str,
             daemon_id: Union[int, str]) -> 'CephExporter':
        """Alternate constructor: build from the config-json referenced by ctx."""
        return cls(ctx, fsid, daemon_id,
                   get_parm(ctx.config_json), ctx.image)

    @staticmethod
    def get_container_mounts() -> Dict[str, str]:
        """Bind-mounts required by the exporter container.

        /var/run/ceph is mounted so the exporter can reach the daemons'
        admin sockets (see sock_dir default above).
        """
        return {'/var/run/ceph': '/var/run/ceph:z'}

    def get_daemon_args(self) -> List[str]:
        """Command-line arguments passed to the ceph-exporter binary."""
        return [
            f'--sock-dir={self.sock_dir}',
            f'--addrs={self.addrs}',
            f'--port={self.port}',
            f'--prio-limit={self.prio_limit}',
            f'--stats-period={self.stats_period}',
        ]

    def validate(self) -> None:
        """Fail fast if the configured socket directory does not exist."""
        if not os.path.isdir(self.sock_dir):
            raise Error(f'Directory does not exist. Got: {self.sock_dir}')
+
+
+##################################
+
+
class HAproxy(object):
"""Defines an HAproxy container"""
daemon_type = 'haproxy'
@staticmethod
def get_sysctl_settings() -> List[str]:
return [
- '# IP forwarding',
+ '# IP forwarding and non-local bind',
'net.ipv4.ip_forward = 1',
+ 'net.ipv4.ip_nonlocal_bind = 1',
]
##################################
return socket.gethostname()
def get_short_hostname():
    # type: () -> str
    """Return the host's short name: everything before the first dot."""
    return get_hostname().partition('.')[0]
+
def get_fqdn():
    # type: () -> str
    """Return the fully-qualified domain name, falling back to the plain
    hostname when no FQDN can be resolved."""
    fqdn = socket.getfqdn()
    if fqdn:
        return fqdn
    return socket.gethostname()
def generate_service_id():
    # type: () -> str
    """Return '<short-hostname>.<6 random lowercase letters>' for use as a
    service/daemon id."""
    suffix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
    return '%s.%s' % (get_short_hostname(), suffix)
def generate_password():
# type: (CephadmContext, str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
- if daemon_type in Ceph.daemons and daemon_type != 'crash':
+ if daemon_type in Ceph.daemons and daemon_type not in ['crash', 'ceph-exporter']:
r += [
'--setuser', 'ceph',
'--setgroup', 'ceph',
port = meta['ports'][0]
r += [f'--web.listen-address={ip}:{port}']
if daemon_type == 'prometheus':
+ config = get_parm(ctx.config_json)
+ retention_time = config.get('retention_time', '15d')
+ retention_size = config.get('retention_size', '0') # default to disabled
+ r += [f'--storage.tsdb.retention.time={retention_time}']
+ r += [f'--storage.tsdb.retention.size={retention_size}']
scheme = 'http'
host = get_fqdn()
r += [f'--web.external-url={scheme}://{host}:{port}']
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
+ elif daemon_type == CephExporter.daemon_type:
+ ceph_exporter = CephExporter.init(ctx, fsid, daemon_id)
+ r.extend(ceph_exporter.get_daemon_args())
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, fsid, daemon_id)
r += haproxy.get_daemon_args()
mounts[data_dir] = cdata_dir + ':z'
if not no_config:
mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
- if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash']:
+ if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash', 'ceph-exporter']:
# these do not search for their keyrings in a data directory
mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type, daemon_id)
)
def set_pids_limit_unlimited(ctx: CephadmContext, container_args: List[str]) -> None:
    """Append a pids-limit flag lifting the engine's default pid cap
    (Docker 4096 / Podman 2048).

    Useful for daemons like iscsi, where the default pids-limit limits the
    number of luns per iscsi target, or rgw, where raising
    rgw_thread_pool_size near the default pids-limit may crash the container.
    """
    engine = ctx.container_engine
    # podman accepts -1 ("unlimited") only from 3.4.1 onward; otherwise
    # fall back to 0
    if isinstance(engine, Podman) and engine.version >= PIDS_LIMIT_UNLIMITED_PODMAN_VERSION:
        container_args.append('--pids-limit=-1')
    else:
        container_args.append('--pids-limit=0')
+
+
def get_container(ctx: CephadmContext,
fsid: str, daemon_type: str, daemon_id: Union[int, str],
privileged: bool = False,
envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728')
if container_args is None:
container_args = []
+ if daemon_type in Ceph.daemons or daemon_type in Ceph.gateways:
+ set_pids_limit_unlimited(ctx, container_args)
if daemon_type in ['mon', 'osd']:
# mon and osd need privileged in order for libudev to query devices
privileged = True
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
envs.extend(NFSGanesha.get_container_envs())
+ elif daemon_type == CephExporter.daemon_type:
+ entrypoint = CephExporter.entrypoint
+ name = 'client.ceph-exporter.%s' % daemon_id
elif daemon_type == HAproxy.daemon_type:
name = '%s.%s' % (daemon_type, daemon_id)
container_args.extend(['--user=root']) # haproxy 2.4 defaults to a different user
'--cidfile',
runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id),
])
- if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION:
+ if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION and not ctx.no_cgroups_split:
container_args.append('--cgroups=split')
return CephContainer.for_daemon(
# Open ports explicitly required for the daemon
if ports:
fw = Firewalld(ctx)
- fw.open_ports(ports)
+ fw.open_ports(ports + fw.external_ports.get(daemon_type, []))
fw.apply_rules()
if reconfig and daemon_type not in Ceph.daemons:
ports: Optional[List[int]] = None,
) -> None:
# cmd
+
+ def add_stop_actions(f: TextIO) -> None:
+ # following generated script basically checks if the container exists
+ # before stopping it. Exit code will be success either if it doesn't
+ # exist or if it exists and is stopped successfully.
+ container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
+ f.write(f'! {container_exists % c.old_cname} || {" ".join(c.stop_cmd(old_cname=True))} \n')
+ f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd())} \n')
+
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
with open(data_dir + '/unit.run.new', 'w') as f, \
open(data_dir + '/unit.meta.new', 'w') as metaf:
# post-stop command(s)
with open(data_dir + '/unit.poststop.new', 'w') as f:
+ # this is a fallback to eventually stop any underlying container that was not stopped properly by unit.stop,
+ # this could happen in very slow setups as described in the issue https://tracker.ceph.com/issues/58242.
+ add_stop_actions(f)
if daemon_type == 'osd':
assert osd_fsid
poststop = get_ceph_volume_container(
# post-stop command(s)
with open(data_dir + '/unit.stop.new', 'w') as f:
- # following generated script basically checks if the container exists
- # before stopping it. Exit code will be success either if it doesn't
- # exist or if it exists and is stopped successfully.
- container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
- f.write(f'! {container_exists % c.old_cname} || {" ".join(c.stop_cmd(old_cname=True))} \n')
- f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd())} \n')
-
+ add_stop_actions(f)
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.stop.new',
data_dir + '/unit.stop')
class Firewalld(object):
+
+ # for specifying ports we should always open when opening
+ # ports for a daemon of that type. Main use case is for ports
+ # that we should open when deploying the daemon type but that
+ # the daemon itself may not necessarily need to bind to the port.
+    # This needs to be handled differently as we don't want to fail
+ # deployment if the port cannot be bound to but we still want to
+ # open the port in the firewall.
+ external_ports: Dict[str, List[int]] = {
+ 'iscsi': [3260] # 3260 is the well known iSCSI port
+ }
+
    def __init__(self, ctx):
        # type: (CephadmContext) -> None
        # Stash the cephadm context for later calls made through this object.
        self.ctx = ctx
call_throws(ctx, ['systemctl', 'enable', 'ceph-%s.target' % fsid])
call_throws(ctx, ['systemctl', 'start', 'ceph-%s.target' % fsid])
+ # don't overwrite file in order to allow users to manipulate it
+ if os.path.exists(ctx.logrotate_dir + f'/ceph-{fsid}'):
+ return
+
# logrotate for the cluster
with open(ctx.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
"""
KillMode=none
Restart=on-failure
RestartSec=10s
-TimeoutStartSec=120
+TimeoutStartSec=200
TimeoutStopSec=120
StartLimitInterval=30min
StartLimitBurst=5
return None
# Ensure all public CIDR networks are valid
- public_network = cp.get('global', 'public_network')
+ public_network = cp.get('global', 'public_network').strip('"').strip("'")
rc, _, err_msg = check_subnet(public_network)
if rc:
raise Error(f'Invalid public_network {public_network} parameter: {err_msg}')
cp = read_config(ctx.config)
cluster_network = ctx.cluster_network
if cluster_network is None and cp.has_option('global', 'cluster_network'):
- cluster_network = cp.get('global', 'cluster_network')
+ cluster_network = cp.get('global', 'cluster_network').strip('"').strip("'")
if cluster_network:
cluser_nets = set([x.strip() for x in cluster_network.split(',')])
cli(['orch', 'apply', 'crash'])
if not ctx.skip_monitoring_stack:
- for t in ['prometheus', 'grafana', 'node-exporter', 'alertmanager']:
+ for t in ['ceph-exporter', 'prometheus', 'grafana', 'node-exporter', 'alertmanager']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
pass
-# funcs to process spec file for apply spec
-def _parse_yaml_docs(f: Iterable[str]) -> List[List[str]]:
- docs = []
- current_doc = [] # type: List[str]
+def _extract_host_info_from_applied_spec(f: Iterable[str]) -> List[Dict[str, str]]:
+ # overall goal of this function is to go through an applied spec and find
+ # the hostname (and addr is provided) for each host spec in the applied spec.
+ # Generally, we should be able to just pass the spec to the mgr module where
+ # proper yaml parsing can happen, but for host specs in particular we want to
+ # be able to distribute ssh keys, which requires finding the hostname (and addr
+ # if possible) for each potential host spec in the applied spec.
+
+ specs: List[List[str]] = []
+ current_spec: List[str] = []
for line in f:
if re.search(r'^---\s+', line):
- if current_doc:
- docs.append(current_doc)
- current_doc = []
- else:
- current_doc.append(line.rstrip())
- if current_doc:
- docs.append(current_doc)
- return docs
-
-
-def _parse_yaml_obj(doc: List[str]) -> Dict[str, str]:
- # note: this only parses the first layer of yaml
- obj = {} # type: Dict[str, str]
- current_key = ''
- for line in doc:
- if line.startswith(' '):
- obj[current_key] += line.strip()
- elif line.endswith(':'):
- current_key = line.strip(':')
- obj[current_key] = ''
+ if current_spec:
+ specs.append(current_spec)
+ current_spec = []
else:
- current_key, val = line.split(':')
- obj[current_key] = val.strip()
- return obj
-
+ line = line.strip()
+ if line:
+ current_spec.append(line)
+ if current_spec:
+ specs.append(current_spec)
+
+ host_specs: List[List[str]] = []
+ for spec in specs:
+ for line in spec:
+ if 'service_type' in line:
+ try:
+ _, type = line.split(':')
+ type = type.strip()
+ if type == 'host':
+ host_specs.append(spec)
+ except ValueError as e:
+ spec_str = '\n'.join(spec)
+ logger.error(f'Failed to pull service_type from spec:\n{spec_str}. Got error: {e}')
+ break
+ spec_str = '\n'.join(spec)
+ logger.error(f'Failed to find service_type within spec:\n{spec_str}')
+
+ host_dicts = []
+ for s in host_specs:
+ host_dict = _extract_host_info_from_spec(s)
+ # if host_dict is empty here, we failed to pull the hostname
+ # for the host from the spec. This should have already been logged
+ # so at this point we just don't want to include it in our output
+ if host_dict:
+ host_dicts.append(host_dict)
+
+ return host_dicts
+
+
+def _extract_host_info_from_spec(host_spec: List[str]) -> Dict[str, str]:
+ # note:for our purposes here, we only really want the hostname
+ # and address of the host from each of these specs in order to
+ # be able to distribute ssh keys. We will later apply the spec
+ # through the mgr module where proper yaml parsing can be done
+ # The returned dicts from this function should only contain
+ # one or two entries, one (required) for hostname, one (optional) for addr
+ # {
+ # hostname: <hostname>
+ # addr: <ip-addr>
+ # }
+ # if we fail to find the hostname, an empty dict is returned
+
+ host_dict = {} # type: Dict[str, str]
+ for line in host_spec:
+ for field in ['hostname', 'addr']:
+ if field in line:
+ try:
+ _, field_value = line.split(':')
+ field_value = field_value.strip()
+ host_dict[field] = field_value
+ except ValueError as e:
+ spec_str = '\n'.join(host_spec)
+ logger.error(f'Error trying to pull {field} from host spec:\n{spec_str}. Got error: {e}')
-def parse_yaml_objs(f: Iterable[str]) -> List[Dict[str, str]]:
- objs = []
- for d in _parse_yaml_docs(f):
- objs.append(_parse_yaml_obj(d))
- return objs
+ if 'hostname' not in host_dict:
+ spec_str = '\n'.join(host_spec)
+ logger.error(f'Could not find hostname in host spec:\n{spec_str}')
+ return {}
+ return host_dict
-def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstrap_hostname: str) -> int:
def _distribute_ssh_keys(ctx: CephadmContext, host_info: Dict[str, str], bootstrap_hostname: str) -> int:
    """Copy the cluster ssh key to the host described by host_info (used
    for apply spec). Returns 0 on success (or when the target is the
    bootstrap host itself), 1 when the copy fails."""
    ssh_key = ctx.ssh_public_key.name if ctx.ssh_public_key else CEPH_DEFAULT_PUBKEY
    # the bootstrap host already has the key; nothing to do
    if bootstrap_hostname == host_info['hostname']:
        return 0
    # prefer the explicit address when the spec provided one
    addr = host_info.get('addr', host_info['hostname'])
    out, err, code = call(ctx, ['sudo', '-u', ctx.ssh_user, 'ssh-copy-id', '-f', '-i', ssh_key, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx.ssh_user, addr)])
    if code:
        logger.error('\nCopying ssh key to host %s at address %s failed!\n' % (host_info['hostname'], addr))
        return 1
    logger.info('Added ssh key to host %s at address %s' % (host_info['hostname'], addr))
    return 0
hostname = get_hostname()
if '.' in hostname and not ctx.allow_fqdn_hostname:
raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
- mon_id = ctx.mon_id or hostname
+ mon_id = ctx.mon_id or get_short_hostname()
mgr_id = ctx.mgr_id or generate_service_id()
lock = FileLock(ctx, fsid)
logger.info('Applying %s to cluster' % ctx.apply_spec)
# copy ssh key to hosts in spec file
with open(ctx.apply_spec) as f:
- try:
- for spec in parse_yaml_objs(f):
- if spec.get('service_type') == 'host':
- _distribute_ssh_keys(ctx, spec, hostname)
- except ValueError:
- logger.info('Unable to parse %s succesfully' % ctx.apply_spec)
+ host_dicts = _extract_host_info_from_applied_spec(f)
+ for h in host_dicts:
+ _distribute_ssh_keys(ctx, h, hostname)
mounts = {}
mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:ro'
c = get_container(ctx, fsid, daemon_type, daemon_id, privileged, ptrace, container_args)
if 'extra_container_args' in ctx and ctx.extra_container_args:
c.container_args.extend(ctx.extra_container_args)
+ if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args:
+ c.args.extend(ctx.extra_entrypoint_args)
if 'config_json' in ctx and ctx.config_json:
conf_files = get_custom_config_files(ctx.config_json)
mandatory_keys = ['mount_path', 'content']
def __init__(self, ctx: CephadmContext):
self.ctx: CephadmContext = ctx
self.cpu_model: str = 'Unknown'
+ self.sysctl_options: Dict[str, str] = self._populate_sysctl_options()
self.cpu_count: int = 0
self.cpu_cores: int = 0
self.cpu_threads: int = 0
self.arch: str = platform.processor()
self.kernel: str = platform.release()
+ def _populate_sysctl_options(self) -> Dict[str, str]:
+ sysctl_options = {}
+ out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
+ if out:
+ for line in out.splitlines():
+ option, value = line.split('=')
+ sysctl_options[option.strip()] = value.strip()
+ return sysctl_options
+
def _get_cpuinfo(self):
# type: () -> None
"""Determine cpu information via /proc/cpuinfo"""
def _get_capacity(self, dev):
# type: (str) -> int
- """Determine the size of a given device"""
+ """Determine the size of a given device
+
+ The kernel always bases device size calculations based on a 512 byte
+ sector. For more information see
+ https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/linux/types.h?h=v5.15.63#n120
+ """
size_path = os.path.join('/sys/block', dev, 'size')
size_blocks = int(read_file([size_path]))
- blk_path = os.path.join('/sys/block', dev, 'queue', 'logical_block_size')
- blk_count = int(read_file([blk_path]))
- return size_blocks * blk_count
+ return size_blocks * 512
def _get_capacity_by_type(self, rota='0'):
# type: (str) -> int
action='store_true',
default=not CONTAINER_INIT,
help='Do not run podman/docker with `--init`')
+ parser.add_argument(
+ '--no-cgroups-split',
+ action='store_true',
+ default=False,
+ help='Do not run containers with --cgroups=split (currently only relevant when using podman)')
subparsers = parser.add_subparsers(help='sub-command')
default=[],
help='Additional container arguments to apply to deamon'
)
+ parser_deploy.add_argument(
+ '--extra-entrypoint-args',
+ action='append',
+ default=[],
+ help='Additional entrypoint arguments to apply to deamon'
+ )
parser_check_host = subparsers.add_parser(
'check-host', help='check host configuration')