4 import asyncio
.subprocess
11 from logging
.config
import dictConfig
26 from socketserver
import ThreadingMixIn
27 from http
.server
import BaseHTTPRequestHandler
, HTTPServer
30 from contextlib
import redirect_stdout
34 from typing
import Dict
, List
, Tuple
, Optional
, Union
, Any
, NoReturn
, Callable
, IO
39 from configparser
import ConfigParser
40 from functools
import wraps
42 from io
import StringIO
43 from threading
import Thread
, RLock
44 from urllib
.error
import HTTPError
45 from urllib
.request
import urlopen
46 from pathlib
import Path
48 # Default container images -----------------------------------------------------
49 DEFAULT_IMAGE
= 'docker.io/ceph/ceph:v16'
50 DEFAULT_IMAGE_IS_MASTER
= False
51 DEFAULT_IMAGE_RELEASE
= 'pacific'
52 DEFAULT_PROMETHEUS_IMAGE
= 'docker.io/prom/prometheus:v2.18.1'
53 DEFAULT_NODE_EXPORTER_IMAGE
= 'docker.io/prom/node-exporter:v0.18.1'
54 DEFAULT_GRAFANA_IMAGE
= 'docker.io/ceph/ceph-grafana:6.7.4'
55 DEFAULT_ALERT_MANAGER_IMAGE
= 'docker.io/prom/alertmanager:v0.20.0'
56 DEFAULT_REGISTRY
= 'docker.io' # normalize unqualified digests to this
57 # ------------------------------------------------------------------------------
59 LATEST_STABLE_RELEASE
= 'pacific'
60 DATA_DIR
= '/var/lib/ceph'
61 LOG_DIR
= '/var/log/ceph'
62 LOCK_DIR
= '/run/cephadm'
63 LOGROTATE_DIR
= '/etc/logrotate.d'
64 UNIT_DIR
= '/etc/systemd/system'
68 MIN_PODMAN_VERSION
= (2, 0, 2)
69 CGROUPS_SPLIT_PODMAN_VERSION
= (2, 1, 0)
70 CUSTOM_PS1
= r
'[ceph: \u@\h \W]\$ '
71 DEFAULT_TIMEOUT
= None # in seconds
73 SHELL_DEFAULT_CONF
= '/etc/ceph/ceph.conf'
74 SHELL_DEFAULT_KEYRING
= '/etc/ceph/ceph.client.admin.keyring'
75 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%fZ'
77 logger
: logging
.Logger
= None # type: ignore
80 You can invoke cephadm in two ways:
82 1. The normal way, at the command line.
84 2. By piping the script to the python3 binary. In this latter case, you should
85 prepend one or more lines to the beginning of the script.
93 injected_argv = ['ls']
95 For reading stdin from the '--config-json -' argument,
97 injected_stdin = '...'
101 ##################################
108 self
.docker
: bool = False
109 self
.data_dir
: str = DATA_DIR
110 self
.log_dir
: str = LOG_DIR
111 self
.logrotate_dir
: str = LOGROTATE_DIR
112 self
.unit_dir
: str = UNIT_DIR
113 self
.verbose
: bool = False
114 self
.timeout
: Optional
[int] = DEFAULT_TIMEOUT
115 self
.retry
: int = DEFAULT_RETRY
116 self
.env
: List
[str] = []
117 self
.memory_request
: Optional
[int] = None
118 self
.memory_limit
: Optional
[int] = None
120 self
.container_init
: bool = CONTAINER_INIT
121 self
.container_engine
: Optional
[ContainerEngine
] = None
123 def set_from_args(self
, args
: argparse
.Namespace
):
124 argdict
: Dict
[str, Any
] = vars(args
)
125 for k
, v
in argdict
.items():
130 class CephadmContext
:
133 self
.__dict
__['_args'] = None
134 self
.__dict
__['_conf'] = BaseConfig()
136 def set_args(self
, args
: argparse
.Namespace
) -> None:
137 self
._conf
.set_from_args(args
)
140 def has_function(self
) -> bool:
141 return 'func' in self
._args
143 def __contains__(self
, name
: str) -> bool:
144 return hasattr(self
, name
)
146 def __getattr__(self
, name
: str) -> Any
:
147 if '_conf' in self
.__dict
__ and hasattr(self
._conf
, name
):
148 return getattr(self
._conf
, name
)
149 elif '_args' in self
.__dict
__ and hasattr(self
._args
, name
):
150 return getattr(self
._args
, name
)
152 return super().__getattribute
__(name
)
154 def __setattr__(self
, name
: str, value
: Any
) -> None:
155 if hasattr(self
._conf
, name
):
156 setattr(self
._conf
, name
, value
)
157 elif hasattr(self
._args
, name
):
158 setattr(self
._args
, name
, value
)
160 super().__setattr
__(name
, value
)
163 class ContainerEngine
:
165 self
.path
= find_program(self
.EXE
)
168 def EXE(self
) -> str:
169 raise NotImplementedError()
172 class Podman(ContainerEngine
):
181 if self
._version
is None:
182 raise RuntimeError('Please call `get_version` first')
185 def get_version(self
, ctx
: CephadmContext
):
186 out
, _
, _
= call_throws(ctx
, [self
.path
, 'version', '--format', '{{.Client.Version}}'])
187 self
._version
= _parse_podman_version(out
)
190 class Docker(ContainerEngine
):
194 CONTAINER_PREFERENCE
= (Podman
, Docker
) # prefer podman to docker
197 # Log and console output config
200 'disable_existing_loggers': True,
203 'format': '%(asctime)s %(levelname)s %(message)s'
209 'class': 'logging.StreamHandler',
213 'class': 'logging.handlers.RotatingFileHandler',
214 'formatter': 'cephadm',
215 'filename': '%s/cephadm.log' % LOG_DIR
,
223 'handlers': ['console', 'log_file'],
235 class Error(Exception):
239 class TimeoutExpired(Error
):
242 ##################################
246 daemons
= ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
247 'crash', 'cephfs-mirror')
249 ##################################
252 class Monitoring(object):
253 """Define the configs for the monitoring containers"""
256 'prometheus': [9095], # Avoid default 9090, due to conflict with cockpit UI
257 'node-exporter': [9100],
259 'alertmanager': [9093, 9094],
264 'image': DEFAULT_PROMETHEUS_IMAGE
,
268 '--config.file=/etc/prometheus/prometheus.yml',
269 '--storage.tsdb.path=/prometheus',
270 '--web.listen-address=:{}'.format(port_map
['prometheus'][0]),
272 'config-json-files': [
277 'image': DEFAULT_NODE_EXPORTER_IMAGE
,
281 '--no-collector.timex',
285 'image': DEFAULT_GRAFANA_IMAGE
,
289 'config-json-files': [
291 'provisioning/datasources/ceph-dashboard.yml',
297 'image': DEFAULT_ALERT_MANAGER_IMAGE
,
301 '--web.listen-address=:{}'.format(port_map
['alertmanager'][0]),
302 '--cluster.listen-address=:{}'.format(port_map
['alertmanager'][1]),
304 'config-json-files': [
307 'config-json-args': [
314 def get_version(ctx
, container_id
, daemon_type
):
315 # type: (CephadmContext, str, str) -> str
317 :param: daemon_type Either "prometheus", "alertmanager" or "node-exporter"
319 assert daemon_type
in ('prometheus', 'alertmanager', 'node-exporter')
320 cmd
= daemon_type
.replace('-', '_')
324 if daemon_type
== 'alertmanager':
325 for cmd
in ['alertmanager', 'prometheus-alertmanager']:
326 _
, err
, code
= call(ctx
, [
327 ctx
.container_engine
.path
, 'exec', container_id
, cmd
,
329 ], verbosity
=CallVerbosity
.DEBUG
)
332 cmd
= 'alertmanager' # reset cmd for version extraction
334 _
, err
, code
= call(ctx
, [
335 ctx
.container_engine
.path
, 'exec', container_id
, cmd
, '--version'
336 ], verbosity
=CallVerbosity
.DEBUG
)
338 err
.startswith('%s, version ' % cmd
):
339 version
= err
.split(' ')[2]
342 ##################################
def populate_files(config_dir, config_files, uid, gid):
    # type: (str, Dict, int, int) -> None
    """Create config files for different services.

    :param config_dir: Directory the files are written into.
    :param config_files: Mapping of file name -> content (list values are
        joined via dict_get_join before writing).
    :param uid: Owner uid applied to every file written.
    :param gid: Owner gid applied to every file written.
    """
    for fname in config_files:
        config_file = os.path.join(config_dir, fname)
        config_content = dict_get_join(config_files, fname)
        # lazy %-style args: the message is only rendered if INFO is enabled
        logger.info('Write file: %s', config_file)
        with open(config_file, 'w') as f:
            # lock down ownership/permissions before any content is written
            os.fchown(f.fileno(), uid, gid)
            os.fchmod(f.fileno(), 0o600)
            f.write(config_content)
358 class NFSGanesha(object):
359 """Defines a NFS-Ganesha container"""
362 entrypoint
= '/usr/bin/ganesha.nfsd'
363 daemon_args
= ['-F', '-L', 'STDERR']
365 required_files
= ['ganesha.conf']
376 image
=DEFAULT_IMAGE
):
377 # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
380 self
.daemon_id
= daemon_id
383 # config-json options
384 self
.pool
= dict_get(config_json
, 'pool', require
=True)
385 self
.namespace
= dict_get(config_json
, 'namespace')
386 self
.userid
= dict_get(config_json
, 'userid')
387 self
.extra_args
= dict_get(config_json
, 'extra_args', [])
388 self
.files
= dict_get(config_json
, 'files', {})
389 self
.rgw
= dict_get(config_json
, 'rgw', {})
391 # validate the supplied args
395 def init(cls
, ctx
, fsid
, daemon_id
):
396 # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
397 return cls(ctx
, fsid
, daemon_id
, get_parm(ctx
.config_json
), ctx
.image
)
399 def get_container_mounts(self
, data_dir
):
400 # type: (str) -> Dict[str, str]
402 mounts
[os
.path
.join(data_dir
, 'config')] = '/etc/ceph/ceph.conf:z'
403 mounts
[os
.path
.join(data_dir
, 'keyring')] = '/etc/ceph/keyring:z'
404 mounts
[os
.path
.join(data_dir
, 'etc/ganesha')] = '/etc/ganesha:z'
406 cluster
= self
.rgw
.get('cluster', 'ceph')
407 rgw_user
= self
.rgw
.get('user', 'admin')
408 mounts
[os
.path
.join(data_dir
, 'keyring.rgw')] = \
409 '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster
, rgw_user
)
413 def get_container_envs():
414 # type: () -> List[str]
416 'CEPH_CONF=%s' % ('/etc/ceph/ceph.conf')
421 def get_version(ctx
, container_id
):
422 # type: (CephadmContext, str) -> Optional[str]
424 out
, err
, code
= call(ctx
,
425 [ctx
.container_engine
.path
, 'exec', container_id
,
426 NFSGanesha
.entrypoint
, '-v'],
427 verbosity
=CallVerbosity
.DEBUG
)
429 match
= re
.search(r
'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out
)
431 version
= match
.group(1)
436 if not is_fsid(self
.fsid
):
437 raise Error('not an fsid: %s' % self
.fsid
)
438 if not self
.daemon_id
:
439 raise Error('invalid daemon_id: %s' % self
.daemon_id
)
441 raise Error('invalid image: %s' % self
.image
)
443 # check for the required files
444 if self
.required_files
:
445 for fname
in self
.required_files
:
446 if fname
not in self
.files
:
447 raise Error('required file missing from config-json: %s' % fname
)
449 # check for an RGW config
451 if not self
.rgw
.get('keyring'):
452 raise Error('RGW keyring is missing')
453 if not self
.rgw
.get('user'):
454 raise Error('RGW user is missing')
    def get_daemon_name(self):
        # type: () -> str
        # Canonical daemon name, "<daemon_type>.<daemon_id>" (e.g. "nfs.foo").
        return '%s.%s' % (self.daemon_type, self.daemon_id)
460 def get_container_name(self
, desc
=None):
461 # type: (Optional[str]) -> str
462 cname
= 'ceph-%s-%s' % (self
.fsid
, self
.get_daemon_name())
464 cname
= '%s-%s' % (cname
, desc
)
467 def get_daemon_args(self
):
468 # type: () -> List[str]
469 return self
.daemon_args
+ self
.extra_args
471 def create_daemon_dirs(self
, data_dir
, uid
, gid
):
472 # type: (str, int, int) -> None
473 """Create files under the container data dir"""
474 if not os
.path
.isdir(data_dir
):
475 raise OSError('data_dir is not a directory: %s' % (data_dir
))
477 logger
.info('Creating ganesha config...')
479 # create the ganesha conf dir
480 config_dir
= os
.path
.join(data_dir
, 'etc/ganesha')
481 makedirs(config_dir
, uid
, gid
, 0o755)
483 # populate files from the config-json
484 populate_files(config_dir
, self
.files
, uid
, gid
)
486 # write the RGW keyring
488 keyring_path
= os
.path
.join(data_dir
, 'keyring.rgw')
489 with
open(keyring_path
, 'w') as f
:
490 os
.fchmod(f
.fileno(), 0o600)
491 os
.fchown(f
.fileno(), uid
, gid
)
492 f
.write(self
.rgw
.get('keyring', ''))
494 def get_rados_grace_container(self
, action
):
495 # type: (str) -> CephContainer
496 """Container for a ganesha action on the grace db"""
497 entrypoint
= '/usr/bin/ganesha-rados-grace'
500 args
= ['--pool', self
.pool
]
502 args
+= ['--ns', self
.namespace
]
504 args
+= ['--userid', self
.userid
]
505 args
+= [action
, self
.get_daemon_name()]
507 data_dir
= get_data_dir(self
.fsid
, self
.ctx
.data_dir
,
508 self
.daemon_type
, self
.daemon_id
)
509 volume_mounts
= self
.get_container_mounts(data_dir
)
510 envs
= self
.get_container_envs()
512 logger
.info('Creating RADOS grace for action: %s' % action
)
516 entrypoint
=entrypoint
,
518 volume_mounts
=volume_mounts
,
519 cname
=self
.get_container_name(desc
='grace-%s' % action
),
524 ##################################
527 class CephIscsi(object):
528 """Defines a Ceph-Iscsi container"""
530 daemon_type
= 'iscsi'
531 entrypoint
= '/usr/bin/rbd-target-api'
533 required_files
= ['iscsi-gateway.cfg']
540 image
=DEFAULT_IMAGE
):
541 # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
544 self
.daemon_id
= daemon_id
547 # config-json options
548 self
.files
= dict_get(config_json
, 'files', {})
550 # validate the supplied args
554 def init(cls
, ctx
, fsid
, daemon_id
):
555 # type: (CephadmContext, str, Union[int, str]) -> CephIscsi
556 return cls(ctx
, fsid
, daemon_id
,
557 get_parm(ctx
.config_json
), ctx
.image
)
560 def get_container_mounts(data_dir
, log_dir
):
561 # type: (str, str) -> Dict[str, str]
563 mounts
[os
.path
.join(data_dir
, 'config')] = '/etc/ceph/ceph.conf:z'
564 mounts
[os
.path
.join(data_dir
, 'keyring')] = '/etc/ceph/keyring:z'
565 mounts
[os
.path
.join(data_dir
, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
566 mounts
[os
.path
.join(data_dir
, 'configfs')] = '/sys/kernel/config'
567 mounts
[log_dir
] = '/var/log/rbd-target-api:z'
568 mounts
['/dev'] = '/dev'
572 def get_container_binds():
573 # type: () -> List[List[str]]
575 lib_modules
= ['type=bind',
576 'source=/lib/modules',
577 'destination=/lib/modules',
579 binds
.append(lib_modules
)
583 def get_version(ctx
, container_id
):
584 # type: (CephadmContext, str) -> Optional[str]
586 out
, err
, code
= call(ctx
,
587 [ctx
.container_engine
.path
, 'exec', container_id
,
588 '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
589 verbosity
=CallVerbosity
.DEBUG
)
591 version
= out
.strip()
596 if not is_fsid(self
.fsid
):
597 raise Error('not an fsid: %s' % self
.fsid
)
598 if not self
.daemon_id
:
599 raise Error('invalid daemon_id: %s' % self
.daemon_id
)
601 raise Error('invalid image: %s' % self
.image
)
603 # check for the required files
604 if self
.required_files
:
605 for fname
in self
.required_files
:
606 if fname
not in self
.files
:
607 raise Error('required file missing from config-json: %s' % fname
)
    def get_daemon_name(self):
        # type: () -> str
        # Canonical daemon name, "<daemon_type>.<daemon_id>" (e.g. "iscsi.foo").
        return '%s.%s' % (self.daemon_type, self.daemon_id)
613 def get_container_name(self
, desc
=None):
614 # type: (Optional[str]) -> str
615 cname
= 'ceph-%s-%s' % (self
.fsid
, self
.get_daemon_name())
617 cname
= '%s-%s' % (cname
, desc
)
620 def create_daemon_dirs(self
, data_dir
, uid
, gid
):
621 # type: (str, int, int) -> None
622 """Create files under the container data dir"""
623 if not os
.path
.isdir(data_dir
):
624 raise OSError('data_dir is not a directory: %s' % (data_dir
))
626 logger
.info('Creating ceph-iscsi config...')
627 configfs_dir
= os
.path
.join(data_dir
, 'configfs')
628 makedirs(configfs_dir
, uid
, gid
, 0o755)
630 # populate files from the config-json
631 populate_files(data_dir
, self
.files
, uid
, gid
)
634 def configfs_mount_umount(data_dir
, mount
=True):
635 # type: (str, bool) -> List[str]
636 mount_path
= os
.path
.join(data_dir
, 'configfs')
638 cmd
= 'if ! grep -qs {0} /proc/mounts; then ' \
639 'mount -t configfs none {0}; fi'.format(mount_path
)
641 cmd
= 'if grep -qs {0} /proc/mounts; then ' \
642 'umount {0}; fi'.format(mount_path
)
645 def get_tcmu_runner_container(self
):
646 # type: () -> CephContainer
647 tcmu_container
= get_container(self
.ctx
, self
.fsid
, self
.daemon_type
, self
.daemon_id
)
648 tcmu_container
.entrypoint
= '/usr/bin/tcmu-runner'
649 tcmu_container
.cname
= self
.get_container_name(desc
='tcmu')
650 # remove extra container args for tcmu container.
651 # extra args could cause issue with forking service type
652 tcmu_container
.container_args
= []
653 return tcmu_container
655 ##################################
658 class HAproxy(object):
659 """Defines an HAproxy container"""
660 daemon_type
= 'haproxy'
661 required_files
= ['haproxy.cfg']
662 default_image
= 'haproxy'
666 fsid
: str, daemon_id
: Union
[int, str],
667 config_json
: Dict
, image
: str) -> None:
670 self
.daemon_id
= daemon_id
673 # config-json options
674 self
.files
= dict_get(config_json
, 'files', {})
679 def init(cls
, ctx
: CephadmContext
,
680 fsid
: str, daemon_id
: Union
[int, str]) -> 'HAproxy':
681 return cls(ctx
, fsid
, daemon_id
, get_parm(ctx
.config_json
),
684 def create_daemon_dirs(self
, data_dir
: str, uid
: int, gid
: int) -> None:
685 """Create files under the container data dir"""
686 if not os
.path
.isdir(data_dir
):
687 raise OSError('data_dir is not a directory: %s' % (data_dir
))
689 # create additional directories in data dir for HAproxy to use
690 if not os
.path
.isdir(os
.path
.join(data_dir
, 'haproxy')):
691 makedirs(os
.path
.join(data_dir
, 'haproxy'), uid
, gid
, DATA_DIR_MODE
)
693 data_dir
= os
.path
.join(data_dir
, 'haproxy')
694 populate_files(data_dir
, self
.files
, uid
, gid
)
696 def get_daemon_args(self
) -> List
[str]:
697 return ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg']
701 if not is_fsid(self
.fsid
):
702 raise Error('not an fsid: %s' % self
.fsid
)
703 if not self
.daemon_id
:
704 raise Error('invalid daemon_id: %s' % self
.daemon_id
)
706 raise Error('invalid image: %s' % self
.image
)
708 # check for the required files
709 if self
.required_files
:
710 for fname
in self
.required_files
:
711 if fname
not in self
.files
:
712 raise Error('required file missing from config-json: %s' % fname
)
    def get_daemon_name(self):
        # type: () -> str
        # Canonical daemon name, "<daemon_type>.<daemon_id>" (e.g. "haproxy.foo").
        return '%s.%s' % (self.daemon_type, self.daemon_id)
718 def get_container_name(self
, desc
=None):
719 # type: (Optional[str]) -> str
720 cname
= 'ceph-%s-%s' % (self
.fsid
, self
.get_daemon_name())
722 cname
= '%s-%s' % (cname
, desc
)
725 def extract_uid_gid_haproxy(self
):
726 # better directory for this?
727 return extract_uid_gid(self
.ctx
, file_path
='/var/lib')
730 def get_container_mounts(data_dir
: str) -> Dict
[str, str]:
732 mounts
[os
.path
.join(data_dir
, 'haproxy')] = '/var/lib/haproxy'
735 ##################################
738 class Keepalived(object):
739 """Defines an Keepalived container"""
740 daemon_type
= 'keepalived'
741 required_files
= ['keepalived.conf']
742 default_image
= 'arcts/keepalived'
746 fsid
: str, daemon_id
: Union
[int, str],
747 config_json
: Dict
, image
: str) -> None:
750 self
.daemon_id
= daemon_id
753 # config-json options
754 self
.files
= dict_get(config_json
, 'files', {})
759 def init(cls
, ctx
: CephadmContext
, fsid
: str,
760 daemon_id
: Union
[int, str]) -> 'Keepalived':
761 return cls(ctx
, fsid
, daemon_id
,
762 get_parm(ctx
.config_json
), ctx
.image
)
764 def create_daemon_dirs(self
, data_dir
: str, uid
: int, gid
: int) -> None:
765 """Create files under the container data dir"""
766 if not os
.path
.isdir(data_dir
):
767 raise OSError('data_dir is not a directory: %s' % (data_dir
))
769 # create additional directories in data dir for keepalived to use
770 if not os
.path
.isdir(os
.path
.join(data_dir
, 'keepalived')):
771 makedirs(os
.path
.join(data_dir
, 'keepalived'), uid
, gid
, DATA_DIR_MODE
)
773 # populate files from the config-json
774 populate_files(data_dir
, self
.files
, uid
, gid
)
778 if not is_fsid(self
.fsid
):
779 raise Error('not an fsid: %s' % self
.fsid
)
780 if not self
.daemon_id
:
781 raise Error('invalid daemon_id: %s' % self
.daemon_id
)
783 raise Error('invalid image: %s' % self
.image
)
785 # check for the required files
786 if self
.required_files
:
787 for fname
in self
.required_files
:
788 if fname
not in self
.files
:
789 raise Error('required file missing from config-json: %s' % fname
)
    def get_daemon_name(self):
        # type: () -> str
        # Canonical daemon name, "<daemon_type>.<daemon_id>" (e.g. "keepalived.foo").
        return '%s.%s' % (self.daemon_type, self.daemon_id)
795 def get_container_name(self
, desc
=None):
796 # type: (Optional[str]) -> str
797 cname
= 'ceph-%s-%s' % (self
.fsid
, self
.get_daemon_name())
799 cname
= '%s-%s' % (cname
, desc
)
803 def get_container_envs():
804 # type: () -> List[str]
806 'KEEPALIVED_AUTOCONF=false',
807 'KEEPALIVED_CONF=/etc/keepalived/keepalived.conf',
808 'KEEPALIVED_CMD=/usr/sbin/keepalived -n -l -f /etc/keepalived/keepalived.conf',
809 'KEEPALIVED_DEBUG=false'
816 '# keepalived needs IP forwarding and non-local bind\n'
817 'sysctl net.ipv4.ip_forward=1\n'
818 'sysctl net.ipv4.ip_nonlocal_bind=1\n'
821 def extract_uid_gid_keepalived(self
):
822 # better directory for this?
823 return extract_uid_gid(self
.ctx
, file_path
='/var/lib')
826 def get_container_mounts(data_dir
: str) -> Dict
[str, str]:
828 mounts
[os
.path
.join(data_dir
, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
831 ##################################
834 class CustomContainer(object):
835 """Defines a custom container"""
836 daemon_type
= 'container'
839 fsid
: str, daemon_id
: Union
[int, str],
840 config_json
: Dict
, image
: str) -> None:
842 self
.daemon_id
= daemon_id
845 # config-json options
846 self
.entrypoint
= dict_get(config_json
, 'entrypoint')
847 self
.uid
= dict_get(config_json
, 'uid', 65534) # nobody
848 self
.gid
= dict_get(config_json
, 'gid', 65534) # nobody
849 self
.volume_mounts
= dict_get(config_json
, 'volume_mounts', {})
850 self
.args
= dict_get(config_json
, 'args', [])
851 self
.envs
= dict_get(config_json
, 'envs', [])
852 self
.privileged
= dict_get(config_json
, 'privileged', False)
853 self
.bind_mounts
= dict_get(config_json
, 'bind_mounts', [])
854 self
.ports
= dict_get(config_json
, 'ports', [])
855 self
.dirs
= dict_get(config_json
, 'dirs', [])
856 self
.files
= dict_get(config_json
, 'files', {})
859 def init(cls
, ctx
: CephadmContext
,
860 fsid
: str, daemon_id
: Union
[int, str]) -> 'CustomContainer':
861 return cls(fsid
, daemon_id
,
862 get_parm(ctx
.config_json
), ctx
.image
)
864 def create_daemon_dirs(self
, data_dir
: str, uid
: int, gid
: int) -> None:
866 Create dirs/files below the container data directory.
868 logger
.info('Creating custom container configuration '
869 'dirs/files in {} ...'.format(data_dir
))
871 if not os
.path
.isdir(data_dir
):
872 raise OSError('data_dir is not a directory: %s' % data_dir
)
874 for dir_path
in self
.dirs
:
875 logger
.info('Creating directory: {}'.format(dir_path
))
876 dir_path
= os
.path
.join(data_dir
, dir_path
.strip('/'))
877 makedirs(dir_path
, uid
, gid
, 0o755)
879 for file_path
in self
.files
:
880 logger
.info('Creating file: {}'.format(file_path
))
881 content
= dict_get_join(self
.files
, file_path
)
882 file_path
= os
.path
.join(data_dir
, file_path
.strip('/'))
883 with
open(file_path
, 'w', encoding
='utf-8') as f
:
884 os
.fchown(f
.fileno(), uid
, gid
)
885 os
.fchmod(f
.fileno(), 0o600)
888 def get_daemon_args(self
) -> List
[str]:
891 def get_container_args(self
) -> List
[str]:
894 def get_container_envs(self
) -> List
[str]:
897 def get_container_mounts(self
, data_dir
: str) -> Dict
[str, str]:
899 Get the volume mounts. Relative source paths will be located below
900 `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
910 /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf
914 for source
, destination
in self
.volume_mounts
.items():
915 source
= os
.path
.join(data_dir
, source
)
916 mounts
[source
] = destination
919 def get_container_binds(self
, data_dir
: str) -> List
[List
[str]]:
921 Get the bind mounts. Relative `source=...` paths will be located below
922 `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
927 'source=lib/modules',
928 'destination=/lib/modules',
934 'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules',
938 binds
= self
.bind_mounts
.copy()
940 for index
, value
in enumerate(bind
):
941 match
= re
.match(r
'^source=(.+)$', value
)
943 bind
[index
] = 'source={}'.format(os
.path
.join(
944 data_dir
, match
.group(1)))
947 ##################################
def touch(file_path: str, uid: Optional[int] = None, gid: Optional[int] = None) -> None:
    """Create *file_path* if missing (like ``touch``) and optionally chown it.

    :param file_path: Path of the file to create/update.
    :param uid: Owner uid; ownership is changed only when both uid and gid
        are given.
    :param gid: Owner gid.
    """
    Path(file_path).touch()
    # explicit None checks: a truthiness test would wrongly skip chown for
    # uid/gid 0 (root)
    if uid is not None and gid is not None:
        os.chown(file_path, uid, gid)
956 ##################################
def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any:
    """
    Helper function to get a key from a dictionary.
    :param d: The dictionary to process.
    :param key: The name of the key to get.
    :param default: The default value in case the key does not
        exist. Default is `None`.
    :param require: Set to `True` if the key is required. An
        exception will be raised if the key does not exist in
        the given dictionary.
    :return: Returns the value of the given key.
    :raises: :exc:`self.Error` if the given key does not exist
        and `require` is set to `True`.
    """
    # membership on the dict itself -- `key not in d.keys()` is redundant
    if require and key not in d:
        raise Error('{} missing from dict'.format(key))
    return d.get(key, default)  # type: ignore
977 ##################################
980 def dict_get_join(d
: Dict
, key
: str) -> Any
:
982 Helper function to get the value of a given key from a dictionary.
983 `List` values will be converted to a string by joining them with a
985 :param d: The dictionary to process.
986 :param key: The name of the key to get.
987 :return: Returns the value of the given key. If it was a `list`, it
988 will be joining with a line break.
991 if isinstance(value
, list):
992 value
= '\n'.join(map(str, value
))
995 ##################################
def get_supported_daemons():
    # type: () -> List[str]
    """Return every daemon type cephadm knows how to deploy."""
    supported_daemons = list(Ceph.daemons)
    supported_daemons.extend(Monitoring.components)
    # append the single-type daemon classes in a fixed order
    for daemon_cls in (NFSGanesha, CephIscsi, CustomContainer,
                       CephadmDaemon, HAproxy, Keepalived):
        supported_daemons.append(daemon_cls.daemon_type)
    # sanity check: no daemon type may be registered twice
    assert len(supported_daemons) == len(set(supported_daemons))
    return supported_daemons
1011 ##################################
1014 class PortOccupiedError(Error
):
1018 def attempt_bind(ctx
, s
, address
, port
):
1019 # type: (CephadmContext, socket.socket, str, int) -> None
1021 s
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1022 s
.bind((address
, port
))
1023 except (socket
.error
, OSError) as e
: # py2 and py3
1024 if e
.errno
== errno
.EADDRINUSE
:
1025 msg
= 'Cannot bind to IP %s port %d: %s' % (address
, port
, e
)
1027 raise PortOccupiedError(msg
)
1034 def port_in_use(ctx
, port_num
):
1035 # type: (CephadmContext, int) -> bool
1036 """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
1037 logger
.info('Verifying port %d ...' % port_num
)
1039 def _port_in_use(af
: socket
.AddressFamily
, address
: str) -> bool:
1041 s
= socket
.socket(af
, socket
.SOCK_STREAM
)
1042 attempt_bind(ctx
, s
, address
, port_num
)
1043 except PortOccupiedError
:
1045 except OSError as e
:
1046 if e
.errno
in (errno
.EAFNOSUPPORT
, errno
.EADDRNOTAVAIL
):
1047 # Ignore EAFNOSUPPORT and EADDRNOTAVAIL as two interfaces are
1048 # being tested here and one might be intentionally be disabled.
1049 # In that case no error should be raised.
1054 return any(_port_in_use(af
, address
) for af
, address
in (
1055 (socket
.AF_INET
, '0.0.0.0'),
1056 (socket
.AF_INET6
, '::')
1060 def check_ip_port(ctx
, ip
, port
):
1061 # type: (CephadmContext, str, int) -> None
1062 if not ctx
.skip_ping_check
:
1063 logger
.info('Verifying IP %s port %d ...' % (ip
, port
))
1065 s
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
1066 ip
= unwrap_ipv6(ip
)
1068 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1069 attempt_bind(ctx
, s
, ip
, port
)
1071 ##################################
1074 # this is an abbreviated version of
1075 # https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py
1076 # that drops all of the compatibility (this is Unix/Linux only).
1078 class Timeout(TimeoutError
):
1080 Raised when the lock could not be acquired in *timeout*
1084 def __init__(self
, lock_file
):
1087 #: The path of the file lock.
1088 self
.lock_file
= lock_file
1092 temp
= "The file lock '{}' could not be acquired."\
1093 .format(self
.lock_file
)
1097 class _Acquire_ReturnProxy(object):
1098 def __init__(self
, lock
):
1102 def __enter__(self
):
1105 def __exit__(self
, exc_type
, exc_value
, traceback
):
1110 class FileLock(object):
1111 def __init__(self
, ctx
: CephadmContext
, name
, timeout
=-1):
1112 if not os
.path
.exists(LOCK_DIR
):
1113 os
.mkdir(LOCK_DIR
, 0o700)
1114 self
._lock
_file
= os
.path
.join(LOCK_DIR
, name
+ '.lock')
1117 # The file descriptor for the *_lock_file* as it is returned by the
1118 # os.open() function.
1119 # This file lock is only NOT None, if the object currently holds the
1121 self
._lock
_file
_fd
: Optional
[int] = None
1122 self
.timeout
= timeout
1123 # The lock counter is used for implementing the nested locking
1124 # mechanism. Whenever the lock is acquired, the counter is increased and
1125 # the lock is only released, when this value is 0 again.
1126 self
._lock
_counter
= 0
1130 def is_locked(self
):
1131 return self
._lock
_file
_fd
is not None
1133 def acquire(self
, timeout
=None, poll_intervall
=0.05):
1135 Acquires the file lock or fails with a :exc:`Timeout` error.
1136 .. code-block:: python
1137 # You can use this method in the context manager (recommended)
1138 with lock.acquire():
1140 # Or use an equivalent try-finally construct:
1147 The maximum time waited for the file lock.
1148 If ``timeout < 0``, there is no timeout and this method will
1149 block until the lock could be acquired.
1150 If ``timeout`` is None, the default :attr:`~timeout` is used.
1151 :arg float poll_intervall:
1152 We check once in *poll_intervall* seconds if we can acquire the
1155 if the lock could not be acquired in *timeout* seconds.
1156 .. versionchanged:: 2.0.0
1157 This method returns now a *proxy* object instead of *self*,
1158 so that it can be used in a with statement without side effects.
1161 # Use the default timeout, if no timeout is provided.
1163 timeout
= self
.timeout
1165 # Increment the number right at the beginning.
1166 # We can still undo it, if something fails.
1167 self
._lock
_counter
+= 1
1170 lock_filename
= self
._lock
_file
1171 start_time
= time
.time()
1174 if not self
.is_locked
:
1175 logger
.debug('Acquiring lock %s on %s', lock_id
,
1180 logger
.debug('Lock %s acquired on %s', lock_id
,
1183 elif timeout
>= 0 and time
.time() - start_time
> timeout
:
1184 logger
.warning('Timeout acquiring lock %s on %s', lock_id
,
1186 raise Timeout(self
._lock
_file
)
1189 'Lock %s not acquired on %s, waiting %s seconds ...',
1190 lock_id
, lock_filename
, poll_intervall
1192 time
.sleep(poll_intervall
)
1194 # Something did go wrong, so decrement the counter.
1195 self
._lock
_counter
= max(0, self
._lock
_counter
- 1)
1198 return _Acquire_ReturnProxy(lock
=self
)
1200 def release(self
, force
=False):
1202 Releases the file lock.
1203 Please note, that the lock is only completly released, if the lock
1205 Also note, that the lock file itself is not automatically deleted.
1207 If true, the lock counter is ignored and the lock is released in
1211 self
._lock
_counter
-= 1
1213 if self
._lock
_counter
== 0 or force
:
1215 lock_filename
= self
._lock
_file
1217 logger
.debug('Releasing lock %s on %s', lock_id
, lock_filename
)
1219 self
._lock
_counter
= 0
1220 logger
.debug('Lock %s released on %s', lock_id
, lock_filename
)
1224 def __enter__(self
):
1228 def __exit__(self
, exc_type
, exc_value
, traceback
):
1233 self
.release(force
=True)
1237 open_mode
= os
.O_RDWR | os
.O_CREAT | os
.O_TRUNC
1238 fd
= os
.open(self
._lock
_file
, open_mode
)
1241 fcntl
.flock(fd
, fcntl
.LOCK_EX | fcntl
.LOCK_NB
)
1242 except (IOError, OSError):
1245 self
._lock
_file
_fd
= fd
1249 # Do not remove the lockfile:
1251 # https://github.com/benediktschmitt/py-filelock/issues/31
1252 # https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
1253 fd
= self
._lock
_file
_fd
1254 self
._lock
_file
_fd
= None
1255 fcntl
.flock(fd
, fcntl
.LOCK_UN
) # type: ignore
1256 os
.close(fd
) # type: ignore
1260 ##################################
1261 # Popen wrappers, lifted from ceph-volume
1263 class CallVerbosity(Enum
):
1265 # log stdout/stderr to logger.debug
1267 # On a non-zero exit status, it will forcefully set
1268 # logging ON for the terminal
1269 VERBOSE_ON_FAILURE
= 2
1270 # log at info (instead of debug) level.
1274 if sys
.version_info
< (3, 8):
1278 from asyncio
import events
1280 class ThreadedChildWatcher(asyncio
.AbstractChildWatcher
):
1281 """Threaded child watcher implementation.
1282 The watcher uses a thread per process
1283 for waiting for the process finish.
1284 It doesn't require subscription on POSIX signal
1285 but a thread creation is not free.
1286 The watcher has O(1) complexity, its performance doesn't depend
1287 on amount of spawn processes.
1291 self
._pid
_counter
= itertools
.count(0)
1294 def is_active(self
):
1298 self
._join
_threads
()
1300 def _join_threads(self
):
1301 """Internal: Join all non-daemon threads"""
1302 threads
= [thread
for thread
in list(self
._threads
.values())
1303 if thread
.is_alive() and not thread
.daemon
]
1304 for thread
in threads
:
1307 def __enter__(self
):
1310 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
1313 def __del__(self
, _warn
=warnings
.warn
):
1314 threads
= [thread
for thread
in list(self
._threads
.values())
1315 if thread
.is_alive()]
1317 _warn(f
'{self.__class__} has registered but not finished child processes',
1321 def add_child_handler(self
, pid
, callback
, *args
):
1322 loop
= events
.get_event_loop()
1323 thread
= threading
.Thread(target
=self
._do
_waitpid
,
1324 name
=f
'waitpid-{next(self._pid_counter)}',
1325 args
=(loop
, pid
, callback
, args
),
1327 self
._threads
[pid
] = thread
1330 def remove_child_handler(self
, pid
):
1331 # asyncio never calls remove_child_handler() !!!
1332 # The method is no-op but is implemented because
1333 # abstract base classe requires it
1336 def attach_loop(self
, loop
):
1339 def _do_waitpid(self
, loop
, expected_pid
, callback
, args
):
1340 assert expected_pid
> 0
1343 pid
, status
= os
.waitpid(expected_pid
, 0)
1344 except ChildProcessError
:
1345 # The child process is already reaped
1346 # (may happen if waitpid() is called elsewhere).
1350 'Unknown child process pid %d, will report returncode 255',
1353 if os
.WIFEXITED(status
):
1354 returncode
= os
.WEXITSTATUS(status
)
1355 elif os
.WIFSIGNALED(status
):
1356 returncode
= -os
.WTERMSIG(status
)
1358 raise ValueError(f
'unknown wait status {status}')
1359 if loop
.get_debug():
1360 logger
.debug('process %s exited with returncode %s',
1361 expected_pid
, returncode
)
1363 if loop
.is_closed():
1364 logger
.warning('Loop %r that handles pid %r is closed', loop
, pid
)
1366 loop
.call_soon_threadsafe(callback
, pid
, returncode
, *args
)
1368 self
._threads
.pop(expected_pid
)
1370 # unlike SafeChildWatcher which handles SIGCHLD in the main thread,
1371 # ThreadedChildWatcher runs in a separated thread, hence allows us to
1372 # run create_subprocess_exec() in non-main thread, see
1373 # https://bugs.python.org/issue35621
1374 asyncio
.set_child_watcher(ThreadedChildWatcher())
1378 from asyncio
import run
as async_run
# type: ignore[attr-defined]
1380 def async_run(coro
): # type: ignore
1381 loop
= asyncio
.new_event_loop()
1383 asyncio
.set_event_loop(loop
)
1384 return loop
.run_until_complete(coro
)
1387 loop
.run_until_complete(loop
.shutdown_asyncgens())
1389 asyncio
.set_event_loop(None)
1393 def call(ctx
: CephadmContext
,
1395 desc
: Optional
[str] = None,
1396 verbosity
: CallVerbosity
= CallVerbosity
.VERBOSE_ON_FAILURE
,
1397 timeout
: Optional
[int] = DEFAULT_TIMEOUT
,
1398 **kwargs
) -> Tuple
[str, str, int]:
1400 Wrap subprocess.Popen to
1402 - log stdout/stderr to a logger,
1404 - cleanly return out, err, returncode
1406 :param timeout: timeout in seconds
1409 prefix
= command
[0] if desc
is None else desc
1412 timeout
= timeout
or ctx
.timeout
1414 logger
.debug('Running command: %s' % ' '.join(command
))
1416 async def tee(reader
: asyncio
.StreamReader
) -> str:
1417 collected
= StringIO()
1418 async for line
in reader
:
1419 message
= line
.decode('utf-8')
1420 collected
.write(message
)
1421 if verbosity
== CallVerbosity
.VERBOSE
:
1422 logger
.info(prefix
+ message
.rstrip())
1423 elif verbosity
!= CallVerbosity
.SILENT
:
1424 logger
.debug(prefix
+ message
.rstrip())
1425 return collected
.getvalue()
1427 async def run_with_timeout() -> Tuple
[str, str, int]:
1428 process
= await asyncio
.create_subprocess_exec(
1430 stdout
=asyncio
.subprocess
.PIPE
,
1431 stderr
=asyncio
.subprocess
.PIPE
)
1432 assert process
.stdout
1433 assert process
.stderr
1435 stdout
, stderr
= await asyncio
.gather(tee(process
.stdout
),
1436 tee(process
.stderr
))
1437 returncode
= await asyncio
.wait_for(process
.wait(), timeout
)
1438 except asyncio
.TimeoutError
:
1439 logger
.info(prefix
+ f
'timeout after {timeout} seconds')
1442 return stdout
, stderr
, returncode
1444 stdout
, stderr
, returncode
= async_run(run_with_timeout())
1445 if returncode
!= 0 and verbosity
== CallVerbosity
.VERBOSE_ON_FAILURE
:
1446 logger
.info('Non-zero exit code %d from %s',
1447 returncode
, ' '.join(command
))
1448 for line
in stdout
.splitlines():
1449 logger
.info(prefix
+ 'stdout ' + line
)
1450 for line
in stderr
.splitlines():
1451 logger
.info(prefix
+ 'stderr ' + line
)
1452 return stdout
, stderr
, returncode
1456 ctx
: CephadmContext
,
1458 desc
: Optional
[str] = None,
1459 verbosity
: CallVerbosity
= CallVerbosity
.VERBOSE_ON_FAILURE
,
1460 timeout
: Optional
[int] = DEFAULT_TIMEOUT
,
1461 **kwargs
) -> Tuple
[str, str, int]:
1462 out
, err
, ret
= call(ctx
, command
, desc
, verbosity
, timeout
, **kwargs
)
1464 raise RuntimeError('Failed command: %s' % ' '.join(command
))
1465 return out
, err
, ret
1468 def call_timeout(ctx
, command
, timeout
):
1469 # type: (CephadmContext, List[str], int) -> int
1470 logger
.debug('Running command (timeout=%s): %s'
1471 % (timeout
, ' '.join(command
)))
1473 def raise_timeout(command
, timeout
):
1474 # type: (List[str], int) -> NoReturn
1475 msg
= 'Command `%s` timed out after %s seconds' % (command
, timeout
)
1477 raise TimeoutExpired(msg
)
1480 return subprocess
.call(command
, timeout
=timeout
)
1481 except subprocess
.TimeoutExpired
:
1482 raise_timeout(command
, timeout
)
1484 ##################################
1487 def is_available(ctx
, what
, func
):
1488 # type: (CephadmContext, str, Callable[[], bool]) -> None
1490 Wait for a service to become available
1492 :param what: the name of the service
1493 :param func: the callable object that determines availability
1496 logger
.info('Waiting for %s...' % what
)
1500 logger
.info('%s is available'
1504 raise Error('%s not available after %s tries'
1507 logger
.info('%s not available, waiting (%s/%s)...'
1508 % (what
, num
, retry
))
1514 def read_config(fn
):
1515 # type: (Optional[str]) -> ConfigParser
1523 # type: (str) -> str
1524 p
= os
.path
.expanduser(p
)
1525 return os
.path
.abspath(p
)
def get_file_timestamp(fn):
    # type: (str) -> Optional[str]
    """Return the mtime of *fn* as a UTC timestamp string (DATEFMT).

    Returns None when the file cannot be stat'ed (missing, unreadable).
    """
    try:
        mtime = os.path.getmtime(fn)
    except Exception:
        # best-effort: any failure to stat the path yields no timestamp
        return None
    stamp = datetime.datetime.fromtimestamp(
        mtime, tz=datetime.timezone.utc
    )
    return stamp.strftime(DATEFMT)
):
1540 # type: (str) -> Optional[str]
1541 # This is super irritating because
1542 # 1) podman and docker use different formats
1543 # 2) python's strptime can't parse either one
1546 # docker 18.09.7: 2020-03-03T09:21:43.636153304Z
1547 # podman 1.7.0: 2020-03-03T15:52:30.136257504-06:00
1548 # 2020-03-03 15:52:30.136257504 -0600 CST
1549 # (In the podman case, there is a different string format for
1550 # 'inspect' and 'inspect --format {{.Created}}'!!)
1552 # In *all* cases, the 9 digit second precision is too much for
1553 # python's strptime. Shorten it to 6 digits.
1554 p
= re
.compile(r
'(\.[\d]{6})[\d]*')
1557 # replace trailing Z with -0000, since (on python 3.6.8) it won't parse
1558 if s
and s
[-1] == 'Z':
1559 s
= s
[:-1] + '-0000'
1561 # cut off the redundant 'CST' part that strptime can't parse, if
1564 s
= ' '.join(v
[0:3])
1566 # try parsing with several format strings
1568 '%Y-%m-%dT%H:%M:%S.%f%z',
1569 '%Y-%m-%d %H:%M:%S.%f %z',
1573 # return timestamp normalized to UTC, rendered as DATEFMT.
1574 return datetime
.datetime
.strptime(s
, f
).astimezone(tz
=datetime
.timezone
.utc
).strftime(DATEFMT
)
1580 def _parse_podman_version(version_str
):
1581 # type: (str) -> Tuple[int, ...]
1582 def to_int(val
, org_e
=None):
1583 if not val
and org_e
:
1587 except ValueError as e
:
1588 return to_int(val
[0:-1], org_e
or e
)
1590 return tuple(map(to_int
, version_str
.split('.')))
1595 return socket
.gethostname()
1600 return socket
.getfqdn() or socket
.gethostname()
1605 return platform
.uname().machine
1608 def generate_service_id():
1610 return get_hostname() + '.' + ''.join(random
.choice(string
.ascii_lowercase
)
1614 def generate_password():
1616 return ''.join(random
.choice(string
.ascii_lowercase
+ string
.digits
)
def normalize_container_id(i):
    # type: (str) -> str
    """Return a container id without any leading 'sha256:' prefix.

    docker adds the sha256: prefix, but AFAICS both docker (18.09.7 in
    bionic at least) and podman always use sha256, so the prefix carries
    no information and is dropped for a uniform id format.
    """
    prefix = 'sha256:'
    return i[len(prefix):] if i.startswith(prefix) else i
.uuid1())
1638 # type: (str) -> bool
1646 def infer_fsid(func
):
1648 If we only find a single fsid in /var/lib/ceph/*, use that
1651 def _infer_fsid(ctx
: CephadmContext
):
1653 logger
.debug('Using specified fsid: %s' % ctx
.fsid
)
1657 daemon_list
= list_daemons(ctx
, detail
=False)
1658 for daemon
in daemon_list
:
1659 if not is_fsid(daemon
['fsid']):
1662 elif 'name' not in ctx
or not ctx
.name
:
1663 # ctx.name not specified
1664 fsids_set
.add(daemon
['fsid'])
1665 elif daemon
['name'] == ctx
.name
:
1666 # ctx.name is a match
1667 fsids_set
.add(daemon
['fsid'])
1668 fsids
= sorted(fsids_set
)
1671 # some commands do not always require an fsid
1673 elif len(fsids
) == 1:
1674 logger
.info('Inferring fsid %s' % fsids
[0])
1677 raise Error('Cannot infer an fsid, one must be specified: %s' % fsids
)
1683 def infer_config(func
):
1685 If we find a MON daemon, use the config from that container
1688 def _infer_config(ctx
: CephadmContext
):
1690 logger
.debug('Using specified config: %s' % ctx
.config
)
1696 daemon_list
= list_daemons(ctx
, detail
=False)
1697 for daemon
in daemon_list
:
1698 if daemon
['name'].startswith('mon.'):
1699 name
= daemon
['name']
1702 config
= '/var/lib/ceph/{}/{}/config'.format(ctx
.fsid
,
1705 logger
.info('Inferring config %s' % config
)
1707 elif os
.path
.exists(SHELL_DEFAULT_CONF
):
1708 logger
.debug('Using default config: %s' % SHELL_DEFAULT_CONF
)
1709 ctx
.config
= SHELL_DEFAULT_CONF
1712 return _infer_config
1715 def _get_default_image(ctx
: CephadmContext
):
1716 if DEFAULT_IMAGE_IS_MASTER
:
1717 warn
= """This is a development version of cephadm.
1718 For information regarding the latest stable release:
1719 https://docs.ceph.com/docs/{}/cephadm/install
1720 """.format(LATEST_STABLE_RELEASE
)
1721 for line
in warn
.splitlines():
1722 logger
.warning('{}{}{}'.format(termcolor
.yellow
, line
, termcolor
.end
))
1723 return DEFAULT_IMAGE
1726 def infer_image(func
):
1728 Use the most recent ceph image
1731 def _infer_image(ctx
: CephadmContext
):
1733 ctx
.image
= os
.environ
.get('CEPHADM_IMAGE')
1735 ctx
.image
= get_last_local_ceph_image(ctx
, ctx
.container_engine
.path
)
1737 ctx
.image
= _get_default_image(ctx
)
1743 def default_image(func
):
1745 def _default_image(ctx
: CephadmContext
):
1747 if 'name' in ctx
and ctx
.name
:
1748 type_
= ctx
.name
.split('.', 1)[0]
1749 if type_
in Monitoring
.components
:
1750 ctx
.image
= Monitoring
.components
[type_
]['image']
1751 if type_
== 'haproxy':
1752 ctx
.image
= HAproxy
.default_image
1753 if type_
== 'keepalived':
1754 ctx
.image
= Keepalived
.default_image
1756 ctx
.image
= os
.environ
.get('CEPHADM_IMAGE')
1758 ctx
.image
= _get_default_image(ctx
)
1762 return _default_image
1765 def get_last_local_ceph_image(ctx
: CephadmContext
, container_path
: str):
1767 :return: The most recent local ceph image (already pulled)
1769 out
, _
, _
= call_throws(ctx
,
1770 [container_path
, 'images',
1771 '--filter', 'label=ceph=True',
1772 '--filter', 'dangling=false',
1773 '--format', '{{.Repository}}@{{.Digest}}'])
1774 return _filter_last_local_ceph_image(out
)
1777 def _filter_last_local_ceph_image(out
):
1778 # type: (str) -> Optional[str]
1779 for image
in out
.splitlines():
1780 if image
and not image
.endswith('@'):
1781 logger
.info('Using recent ceph image %s' % image
)
def write_tmp(s, uid, gid):
    # type: (str, int, int) -> IO[str]
    """Write *s* to a new named temporary file owned by uid:gid.

    The open (flushed) file object is returned; the file is removed when
    the object is closed or garbage-collected, so callers must keep a
    reference for as long as they need the path.
    """
    handle = tempfile.NamedTemporaryFile(mode='w',
                                         prefix='ceph-tmp')
    os.fchown(handle.fileno(), uid, gid)
    handle.write(s)
    handle.flush()
    return handle
def makedirs(dir, uid, gid, mode):
    # type: (str, int, int, int) -> None
    """Ensure *dir* exists with exactly the given owner, group and mode."""
    if os.path.exists(dir):
        os.chmod(dir, mode)
    else:
        os.makedirs(dir, mode=mode)
    os.chown(dir, uid, gid)
    os.chmod(dir, mode)  # the above is masked by umask...
def get_data_dir(fsid, data_dir, t, n):
    # type: (str, str, str, Union[int, str]) -> str
    """Return a daemon's data directory: <data_dir>/<fsid>/<type>.<id>."""
    leaf = '%s.%s' % (t, n)
    return os.path.join(data_dir, fsid, leaf)
def get_log_dir(fsid, log_dir):
    # type: (str, str) -> str
    """Return the per-cluster log directory: <log_dir>/<fsid>."""
    return os.path.join(log_dir, fsid)
def make_data_dir_base(fsid, data_dir, uid, gid):
    # type: (str, str, int, int) -> str
    """Create <data_dir>/<fsid> plus its crash/ and crash/posted/ subdirs.

    All directories are created with DATA_DIR_MODE and chowned to uid:gid;
    the base path is returned.
    """
    data_dir_base = os.path.join(data_dir, fsid)
    # base dir first, then the crash hierarchy beneath it
    for parts in ((), ('crash',), ('crash', 'posted')):
        makedirs(os.path.join(data_dir_base, *parts), uid, gid, DATA_DIR_MODE)
    return data_dir_base
def make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=None, gid=None):
    # type: (CephadmContext, str, str, Union[int, str], Optional[int], Optional[int]) -> str
    """Create the data directory for one daemon and return its path.

    When uid/gid are not supplied they are taken from the ceph container
    image via extract_uid_gid().
    """
    if uid is None or gid is None:
        uid, gid = extract_uid_gid(ctx)
    # ensure the fsid-level base (and crash dirs) exist first
    make_data_dir_base(fsid, ctx.data_dir, uid, gid)
    path = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
    makedirs(path, uid, gid, DATA_DIR_MODE)
    return path
def make_log_dir(ctx, fsid, uid=None, gid=None):
    # type: (CephadmContext, str, Optional[int], Optional[int]) -> str
    """Create the per-cluster log directory and return its path.

    uid/gid default to the values extracted from the ceph container image.
    """
    if uid is None or gid is None:
        uid, gid = extract_uid_gid(ctx)
    path = get_log_dir(fsid, ctx.log_dir)
    makedirs(path, uid, gid, LOG_DIR_MODE)
    return path
def make_var_run(ctx, fsid, uid, gid):
    # type: (CephadmContext, str, int, int) -> None
    """Create /var/run/ceph/<fsid> (mode 0770, owned uid:gid) via install(1)."""
    cmd = ['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
           '/var/run/ceph/%s' % fsid]
    call_throws(ctx, cmd)
def copy_tree(ctx, src, dst, uid=None, gid=None):
    # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
    """Copy each directory tree in *src* into *dst*, chowning the result.

    uid/gid default to the values extracted from the ceph container image.
    An existing destination tree is removed first (copytree cannot merge
    before python 3.8).
    """
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid(ctx)

    for src_dir in src:
        # copy into dst when it is a directory, otherwise copy *as* dst
        dst_dir = os.path.join(dst, os.path.basename(src_dir)) if os.path.isdir(dst) else dst

        logger.debug('copy directory `%s` -> `%s`' % (src_dir, dst_dir))
        shutil.rmtree(dst_dir, ignore_errors=True)
        shutil.copytree(src_dir, dst_dir)  # dirs_exist_ok needs python 3.8

        # fix ownership of everything we just copied
        for dirpath, dirnames, filenames in os.walk(dst_dir):
            logger.debug('chown %s:%s `%s`' % (uid, gid, dirpath))
            os.chown(dirpath, uid, gid)
            for filename in filenames:
                logger.debug('chown %s:%s `%s`' % (uid, gid, filename))
                os.chown(os.path.join(dirpath, filename), uid, gid)
def copy_files(ctx, src, dst, uid=None, gid=None):
    # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
    """Copy each file in *src* to *dst* and chown the copies to uid:gid.

    uid/gid default to the values extracted from the ceph container image.
    """
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid(ctx)

    for src_file in src:
        # copy into dst when it is a directory, otherwise copy *as* dst
        dst_file = os.path.join(dst, os.path.basename(src_file)) if os.path.isdir(dst) else dst

        logger.debug('copy file `%s` -> `%s`' % (src_file, dst_file))
        shutil.copyfile(src_file, dst_file)

        logger.debug('chown %s:%s `%s`' % (uid, gid, dst_file))
        os.chown(dst_file, uid, gid)
def move_files(ctx, src, dst, uid=None, gid=None):
    # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
    """Move each file in *src* to *dst*, recreating symlinks as symlinks.

    uid/gid default to the values extracted from the ceph container image.
    Regular files are chowned after the move; symlinks are re-pointed at
    their original target rather than moved.
    """
    if uid is None or gid is None:
        (uid, gid) = extract_uid_gid(ctx)

    for src_file in src:
        # move into dst when it is a directory, otherwise move *as* dst
        dst_file = os.path.join(dst, os.path.basename(src_file)) if os.path.isdir(dst) else dst

        if os.path.islink(src_file):
            # shutil.move() in py2 does not handle symlinks correctly
            src_rl = os.readlink(src_file)
            logger.debug("symlink '%s' -> '%s'" % (dst_file, src_rl))
            os.symlink(src_rl, dst_file)
            os.unlink(src_file)
        else:
            logger.debug("move file '%s' -> '%s'" % (src_file, dst_file))
            shutil.move(src_file, dst_file)
            logger.debug('chown %s:%s `%s`' % (uid, gid, dst_file))
            os.chown(dst_file, uid, gid)
1923 # copied from distutils
def find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.
    A string listing directories separated by 'os.pathsep'; defaults to
    os.environ['PATH'].  Returns the complete filename or None if not found.
    """
    _, ext = os.path.splitext(executable)
    if (sys.platform == 'win32') and (ext != '.exe'):
        executable = executable + '.exe'

    # an explicit (absolute or relative) path short-circuits the search
    if os.path.isfile(executable):
        return executable

    if path is None:
        path = os.environ.get('PATH', None)
        # only fall back to confstr/defpath when PATH is truly unset
        if path is None:
            try:
                path = os.confstr('CS_PATH')
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath
        # bpo-35755: Don't use os.defpath if the PATH environment variable is
        # set to an empty string

    # PATH='' doesn't match, whereas PATH=':' looks in the current directory
    if not path:
        return None

    for directory in path.split(os.pathsep):
        candidate = os.path.join(directory, executable)
        if os.path.isfile(candidate):
            # the file exists, we have a shot at spawn working
            return candidate
    return None
def find_program(filename):
    # type: (str) -> str
    """Like find_executable(), but raise ValueError when not found."""
    located = find_executable(filename)
    if located is None:
        raise ValueError('%s not found' % filename)
    return located
1968 def find_container_engine(ctx
: CephadmContext
):
1972 for i
in CONTAINER_PREFERENCE
:
1975 except Exception as e
:
1976 logger
.debug('Could not locate %s: %s' % (i
.EXE
, e
))
def check_container_engine(ctx):
    # type: (CephadmContext) -> None
    """Validate the detected container engine.

    Raises Error when no supported engine was found, or when podman is
    older than MIN_PODMAN_VERSION.
    """
    engine = ctx.container_engine
    if not isinstance(engine, CONTAINER_PREFERENCE):
        raise Error('Unable to locate any of %s' % [i.EXE for i in CONTAINER_PREFERENCE])
    if isinstance(engine, Podman):
        # docker has no minimum version check; podman does
        engine.get_version(ctx)
        if engine.version < MIN_PODMAN_VERSION:
            raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION)
def get_unit_name(fsid, daemon_type, daemon_id=None):
    # type: (str, str, Optional[Union[int, str]]) -> str
    """Return the systemd unit name for a daemon.

    Accepts either a bare type (template unit) or type + id; the cephadm
    mgr-agent daemon type gets a dash-form unit name instead of the
    templated '@' form.
    """
    # accept either name or type + id
    if daemon_type == CephadmDaemon.daemon_type and daemon_id is not None:
        return 'ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id)
    if daemon_id is not None:
        return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)
    return 'ceph-%s@%s' % (fsid, daemon_type)
def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name):
    """Look up the systemd unit of a deployed daemon by its name.

    Raises Error when the daemon description carries no unit information.
    """
    daemon = get_daemon_description(ctx, fsid, name)
    try:
        return daemon['systemd_unit']
    except KeyError:
        raise Error('Failed to get unit name for {}'.format(daemon))
2010 def check_unit(ctx
, unit_name
):
2011 # type: (CephadmContext, str) -> Tuple[bool, str, bool]
2012 # NOTE: we ignore the exit code here because systemctl outputs
2013 # various exit codes based on the state of the service, but the
2014 # string result is more explicit (and sufficient).
2018 out
, err
, code
= call(ctx
, ['systemctl', 'is-enabled', unit_name
],
2019 verbosity
=CallVerbosity
.DEBUG
)
2023 elif 'disabled' in out
:
2025 except Exception as e
:
2026 logger
.warning('unable to run systemctl: %s' % e
)
2032 out
, err
, code
= call(ctx
, ['systemctl', 'is-active', unit_name
],
2033 verbosity
=CallVerbosity
.DEBUG
)
2035 if out
in ['active']:
2037 elif out
in ['inactive']:
2039 elif out
in ['failed', 'auto-restart']:
2043 except Exception as e
:
2044 logger
.warning('unable to run systemctl: %s' % e
)
2046 return (enabled
, state
, installed
)
2049 def check_units(ctx
, units
, enabler
=None):
2050 # type: (CephadmContext, List[str], Optional[Packager]) -> bool
2052 (enabled
, state
, installed
) = check_unit(ctx
, u
)
2053 if enabled
and state
== 'running':
2054 logger
.info('Unit %s is enabled and running' % u
)
2056 if enabler
is not None:
2058 logger
.info('Enabling unit %s' % u
)
2059 enabler
.enable_service(u
)
2063 def is_container_running(ctx
: CephadmContext
, name
: str) -> bool:
2064 out
, err
, ret
= call_throws(ctx
, [
2065 ctx
.container_engine
.path
, 'ps',
2066 '--format', '{{.Names}}'])
def get_legacy_config_fsid(cluster, legacy_dir=None):
    # type: (str, Optional[str]) -> Optional[str]
    """Read the fsid of a legacy (pre-cephadm) cluster from its conf file.

    Looks at /etc/ceph/<cluster>.conf (optionally re-rooted under
    legacy_dir) and returns [global] fsid, or None when the file or
    option is absent.
    """
    config_file = '/etc/ceph/%s.conf' % cluster
    if legacy_dir is not None:
        config_file = os.path.abspath(legacy_dir + config_file)

    if not os.path.exists(config_file):
        return None
    config = read_config(config_file)
    if config.has_section('global') and config.has_option('global', 'fsid'):
        return config.get('global', 'fsid')
    return None
2083 def get_legacy_daemon_fsid(ctx
, cluster
,
2084 daemon_type
, daemon_id
, legacy_dir
=None):
2085 # type: (CephadmContext, str, str, Union[int, str], Optional[str]) -> Optional[str]
2087 if daemon_type
== 'osd':
2089 fsid_file
= os
.path
.join(ctx
.data_dir
,
2091 'ceph-%s' % daemon_id
,
2093 if legacy_dir
is not None:
2094 fsid_file
= os
.path
.abspath(legacy_dir
+ fsid_file
)
2095 with
open(fsid_file
, 'r') as f
:
2096 fsid
= f
.read().strip()
2100 fsid
= get_legacy_config_fsid(cluster
, legacy_dir
=legacy_dir
)
2104 def get_daemon_args(ctx
, fsid
, daemon_type
, daemon_id
):
2105 # type: (CephadmContext, str, str, Union[int, str]) -> List[str]
2106 r
= list() # type: List[str]
2108 if daemon_type
in Ceph
.daemons
and daemon_type
!= 'crash':
2110 '--setuser', 'ceph',
2111 '--setgroup', 'ceph',
2112 '--default-log-to-file=false',
2113 '--default-log-to-stderr=true',
2114 '--default-log-stderr-prefix=debug ',
2116 if daemon_type
== 'mon':
2118 '--default-mon-cluster-log-to-file=false',
2119 '--default-mon-cluster-log-to-stderr=true',
2121 elif daemon_type
in Monitoring
.components
:
2122 metadata
= Monitoring
.components
[daemon_type
]
2123 r
+= metadata
.get('args', list())
2124 if daemon_type
== 'alertmanager':
2125 config
= get_parm(ctx
.config_json
)
2126 peers
= config
.get('peers', list()) # type: ignore
2128 r
+= ['--cluster.peer={}'.format(peer
)]
2129 # some alertmanager, by default, look elsewhere for a config
2130 r
+= ['--config.file=/etc/alertmanager/alertmanager.yml']
2131 elif daemon_type
== NFSGanesha
.daemon_type
:
2132 nfs_ganesha
= NFSGanesha
.init(ctx
, fsid
, daemon_id
)
2133 r
+= nfs_ganesha
.get_daemon_args()
2134 elif daemon_type
== HAproxy
.daemon_type
:
2135 haproxy
= HAproxy
.init(ctx
, fsid
, daemon_id
)
2136 r
+= haproxy
.get_daemon_args()
2137 elif daemon_type
== CustomContainer
.daemon_type
:
2138 cc
= CustomContainer
.init(ctx
, fsid
, daemon_id
)
2139 r
.extend(cc
.get_daemon_args())
2144 def create_daemon_dirs(ctx
, fsid
, daemon_type
, daemon_id
, uid
, gid
,
2145 config
=None, keyring
=None):
2146 # type: (CephadmContext, str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
2147 data_dir
= make_data_dir(ctx
, fsid
, daemon_type
, daemon_id
, uid
=uid
, gid
=gid
)
2148 make_log_dir(ctx
, fsid
, uid
=uid
, gid
=gid
)
2151 config_path
= os
.path
.join(data_dir
, 'config')
2152 with
open(config_path
, 'w') as f
:
2153 os
.fchown(f
.fileno(), uid
, gid
)
2154 os
.fchmod(f
.fileno(), 0o600)
2158 keyring_path
= os
.path
.join(data_dir
, 'keyring')
2159 with
open(keyring_path
, 'w') as f
:
2160 os
.fchmod(f
.fileno(), 0o600)
2161 os
.fchown(f
.fileno(), uid
, gid
)
2164 if daemon_type
in Monitoring
.components
.keys():
2165 config_json
: Dict
[str, Any
] = get_parm(ctx
.config_json
)
2166 required_files
= Monitoring
.components
[daemon_type
].get('config-json-files', list())
2168 # Set up directories specific to the monitoring component
2171 if daemon_type
== 'prometheus':
2172 data_dir_root
= get_data_dir(fsid
, ctx
.data_dir
,
2173 daemon_type
, daemon_id
)
2174 config_dir
= 'etc/prometheus'
2175 makedirs(os
.path
.join(data_dir_root
, config_dir
), uid
, gid
, 0o755)
2176 makedirs(os
.path
.join(data_dir_root
, config_dir
, 'alerting'), uid
, gid
, 0o755)
2177 makedirs(os
.path
.join(data_dir_root
, 'data'), uid
, gid
, 0o755)
2178 elif daemon_type
== 'grafana':
2179 data_dir_root
= get_data_dir(fsid
, ctx
.data_dir
,
2180 daemon_type
, daemon_id
)
2181 config_dir
= 'etc/grafana'
2182 makedirs(os
.path
.join(data_dir_root
, config_dir
), uid
, gid
, 0o755)
2183 makedirs(os
.path
.join(data_dir_root
, config_dir
, 'certs'), uid
, gid
, 0o755)
2184 makedirs(os
.path
.join(data_dir_root
, config_dir
, 'provisioning/datasources'), uid
, gid
, 0o755)
2185 makedirs(os
.path
.join(data_dir_root
, 'data'), uid
, gid
, 0o755)
2186 touch(os
.path
.join(data_dir_root
, 'data', 'grafana.db'), uid
, gid
)
2187 elif daemon_type
== 'alertmanager':
2188 data_dir_root
= get_data_dir(fsid
, ctx
.data_dir
,
2189 daemon_type
, daemon_id
)
2190 config_dir
= 'etc/alertmanager'
2191 makedirs(os
.path
.join(data_dir_root
, config_dir
), uid
, gid
, 0o755)
2192 makedirs(os
.path
.join(data_dir_root
, config_dir
, 'data'), uid
, gid
, 0o755)
2194 # populate the config directory for the component from the config-json
2195 for fname
in required_files
:
2196 if 'files' in config_json
: # type: ignore
2197 content
= dict_get_join(config_json
['files'], fname
)
2198 with
open(os
.path
.join(data_dir_root
, config_dir
, fname
), 'w') as f
:
2199 os
.fchown(f
.fileno(), uid
, gid
)
2200 os
.fchmod(f
.fileno(), 0o600)
2203 elif daemon_type
== NFSGanesha
.daemon_type
:
2204 nfs_ganesha
= NFSGanesha
.init(ctx
, fsid
, daemon_id
)
2205 nfs_ganesha
.create_daemon_dirs(data_dir
, uid
, gid
)
2207 elif daemon_type
== CephIscsi
.daemon_type
:
2208 ceph_iscsi
= CephIscsi
.init(ctx
, fsid
, daemon_id
)
2209 ceph_iscsi
.create_daemon_dirs(data_dir
, uid
, gid
)
2211 elif daemon_type
== HAproxy
.daemon_type
:
2212 haproxy
= HAproxy
.init(ctx
, fsid
, daemon_id
)
2213 haproxy
.create_daemon_dirs(data_dir
, uid
, gid
)
2215 elif daemon_type
== Keepalived
.daemon_type
:
2216 keepalived
= Keepalived
.init(ctx
, fsid
, daemon_id
)
2217 keepalived
.create_daemon_dirs(data_dir
, uid
, gid
)
2219 elif daemon_type
== CustomContainer
.daemon_type
:
2220 cc
= CustomContainer
.init(ctx
, fsid
, daemon_id
)
2221 cc
.create_daemon_dirs(data_dir
, uid
, gid
)
2224 def get_parm(option
):
2225 # type: (str) -> Dict[str, str]
2232 if cached_stdin
is not None:
2235 j
= sys
.stdin
.read()
2238 # inline json string
2239 if option
[0] == '{' and option
[-1] == '}':
2242 elif os
.path
.exists(option
):
2243 with
open(option
, 'r') as f
:
2246 raise Error('Config file {} not found'.format(option
))
2250 except ValueError as e
:
2251 raise Error('Invalid JSON in {}: {}'.format(option
, e
))
2256 def get_config_and_keyring(ctx
):
2257 # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
2261 if 'config_json' in ctx
and ctx
.config_json
:
2262 d
= get_parm(ctx
.config_json
)
2263 config
= d
.get('config')
2264 keyring
= d
.get('keyring')
2266 if 'config' in ctx
and ctx
.config
:
2268 with
open(ctx
.config
, 'r') as f
:
2270 except FileNotFoundError
:
2271 raise Error('config file: %s does not exist' % ctx
.config
)
2273 if 'key' in ctx
and ctx
.key
:
2274 keyring
= '[%s]\n\tkey = %s\n' % (ctx
.name
, ctx
.key
)
2275 elif 'keyring' in ctx
and ctx
.keyring
:
2277 with
open(ctx
.keyring
, 'r') as f
:
2279 except FileNotFoundError
:
2280 raise Error('keyring file: %s does not exist' % ctx
.keyring
)
2282 return config
, keyring
2285 def get_container_binds(ctx
, fsid
, daemon_type
, daemon_id
):
2286 # type: (CephadmContext, str, str, Union[int, str, None]) -> List[List[str]]
2289 if daemon_type
== CephIscsi
.daemon_type
:
2290 binds
.extend(CephIscsi
.get_container_binds())
2291 elif daemon_type
== CustomContainer
.daemon_type
:
2293 cc
= CustomContainer
.init(ctx
, fsid
, daemon_id
)
2294 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2295 binds
.extend(cc
.get_container_binds(data_dir
))
2300 def get_container_mounts(ctx
, fsid
, daemon_type
, daemon_id
,
2302 # type: (CephadmContext, str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str]
2305 if daemon_type
in Ceph
.daemons
:
2307 run_path
= os
.path
.join('/var/run/ceph', fsid
)
2308 if os
.path
.exists(run_path
):
2309 mounts
[run_path
] = '/var/run/ceph:z'
2310 log_dir
= get_log_dir(fsid
, ctx
.log_dir
)
2311 mounts
[log_dir
] = '/var/log/ceph:z'
2312 crash_dir
= '/var/lib/ceph/%s/crash' % fsid
2313 if os
.path
.exists(crash_dir
):
2314 mounts
[crash_dir
] = '/var/lib/ceph/crash:z'
2316 if daemon_type
in Ceph
.daemons
and daemon_id
:
2317 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2318 if daemon_type
== 'rgw':
2319 cdata_dir
= '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id
)
2321 cdata_dir
= '/var/lib/ceph/%s/ceph-%s' % (daemon_type
, daemon_id
)
2322 if daemon_type
!= 'crash':
2323 mounts
[data_dir
] = cdata_dir
+ ':z'
2325 mounts
[data_dir
+ '/config'] = '/etc/ceph/ceph.conf:z'
2326 if daemon_type
in ['rbd-mirror', 'cephfs-mirror', 'crash']:
2327 # these do not search for their keyrings in a data directory
2328 mounts
[data_dir
+ '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type
, daemon_id
)
2330 if daemon_type
in ['mon', 'osd']:
2331 mounts
['/dev'] = '/dev' # FIXME: narrow this down?
2332 mounts
['/run/udev'] = '/run/udev'
2333 if daemon_type
== 'osd':
2334 mounts
['/sys'] = '/sys' # for numa.cc, pick_address, cgroups, ...
2335 # selinux-policy in the container may not match the host.
2336 if HostFacts(ctx
).selinux_enabled
:
2337 selinux_folder
= '/var/lib/ceph/%s/selinux' % fsid
2338 if not os
.path
.exists(selinux_folder
):
2339 os
.makedirs(selinux_folder
, mode
=0o755)
2340 mounts
[selinux_folder
] = '/sys/fs/selinux:ro'
2341 mounts
['/run/lvm'] = '/run/lvm'
2342 mounts
['/run/lock/lvm'] = '/run/lock/lvm'
2345 if ctx
.shared_ceph_folder
: # make easy manager modules/ceph-volume development
2346 ceph_folder
= pathify(ctx
.shared_ceph_folder
)
2347 if os
.path
.exists(ceph_folder
):
2348 mounts
[ceph_folder
+ '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume'
2349 mounts
[ceph_folder
+ '/src/pybind/mgr'] = '/usr/share/ceph/mgr'
2350 mounts
[ceph_folder
+ '/src/python-common/ceph'] = '/usr/lib/python3.6/site-packages/ceph'
2351 mounts
[ceph_folder
+ '/monitoring/grafana/dashboards'] = '/etc/grafana/dashboards/ceph-dashboard'
2352 mounts
[ceph_folder
+ '/monitoring/prometheus/alerts'] = '/etc/prometheus/ceph'
2354 logger
.error('{}{}{}'.format(termcolor
.red
,
2355 'Ceph shared source folder does not exist.',
2357 except AttributeError:
2360 if daemon_type
in Monitoring
.components
and daemon_id
:
2361 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2362 if daemon_type
== 'prometheus':
2363 mounts
[os
.path
.join(data_dir
, 'etc/prometheus')] = '/etc/prometheus:Z'
2364 mounts
[os
.path
.join(data_dir
, 'data')] = '/prometheus:Z'
2365 elif daemon_type
== 'node-exporter':
2366 mounts
['/proc'] = '/host/proc:ro'
2367 mounts
['/sys'] = '/host/sys:ro'
2368 mounts
['/'] = '/rootfs:ro'
2369 elif daemon_type
== 'grafana':
2370 mounts
[os
.path
.join(data_dir
, 'etc/grafana/grafana.ini')] = '/etc/grafana/grafana.ini:Z'
2371 mounts
[os
.path
.join(data_dir
, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
2372 mounts
[os
.path
.join(data_dir
, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
2373 mounts
[os
.path
.join(data_dir
, 'data/grafana.db')] = '/var/lib/grafana/grafana.db:Z'
2374 elif daemon_type
== 'alertmanager':
2375 mounts
[os
.path
.join(data_dir
, 'etc/alertmanager')] = '/etc/alertmanager:Z'
2377 if daemon_type
== NFSGanesha
.daemon_type
:
2379 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2380 nfs_ganesha
= NFSGanesha
.init(ctx
, fsid
, daemon_id
)
2381 mounts
.update(nfs_ganesha
.get_container_mounts(data_dir
))
2383 if daemon_type
== HAproxy
.daemon_type
:
2385 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2386 mounts
.update(HAproxy
.get_container_mounts(data_dir
))
2388 if daemon_type
== CephIscsi
.daemon_type
:
2390 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2391 log_dir
= get_log_dir(fsid
, ctx
.log_dir
)
2392 mounts
.update(CephIscsi
.get_container_mounts(data_dir
, log_dir
))
2394 if daemon_type
== Keepalived
.daemon_type
:
2396 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2397 mounts
.update(Keepalived
.get_container_mounts(data_dir
))
2399 if daemon_type
== CustomContainer
.daemon_type
:
2401 cc
= CustomContainer
.init(ctx
, fsid
, daemon_id
)
2402 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2403 mounts
.update(cc
.get_container_mounts(data_dir
))
2408 def get_container(ctx
: CephadmContext
,
2409 fsid
: str, daemon_type
: str, daemon_id
: Union
[int, str],
2410 privileged
: bool = False,
2411 ptrace
: bool = False,
2412 container_args
: Optional
[List
[str]] = None) -> 'CephContainer':
2413 entrypoint
: str = ''
2415 ceph_args
: List
[str] = []
2416 envs
: List
[str] = []
2417 host_network
: bool = True
2419 if container_args
is None:
2421 if daemon_type
in ['mon', 'osd']:
2422 # mon and osd need privileged in order for libudev to query devices
2424 if daemon_type
== 'rgw':
2425 entrypoint
= '/usr/bin/radosgw'
2426 name
= 'client.rgw.%s' % daemon_id
2427 elif daemon_type
== 'rbd-mirror':
2428 entrypoint
= '/usr/bin/rbd-mirror'
2429 name
= 'client.rbd-mirror.%s' % daemon_id
2430 elif daemon_type
== 'cephfs-mirror':
2431 entrypoint
= '/usr/bin/cephfs-mirror'
2432 name
= 'client.cephfs-mirror.%s' % daemon_id
2433 elif daemon_type
== 'crash':
2434 entrypoint
= '/usr/bin/ceph-crash'
2435 name
= 'client.crash.%s' % daemon_id
2436 elif daemon_type
in ['mon', 'mgr', 'mds', 'osd']:
2437 entrypoint
= '/usr/bin/ceph-' + daemon_type
2438 name
= '%s.%s' % (daemon_type
, daemon_id
)
2439 elif daemon_type
in Monitoring
.components
:
2441 elif daemon_type
== NFSGanesha
.daemon_type
:
2442 entrypoint
= NFSGanesha
.entrypoint
2443 name
= '%s.%s' % (daemon_type
, daemon_id
)
2444 envs
.extend(NFSGanesha
.get_container_envs())
2445 elif daemon_type
== HAproxy
.daemon_type
:
2446 name
= '%s.%s' % (daemon_type
, daemon_id
)
2447 elif daemon_type
== Keepalived
.daemon_type
:
2448 name
= '%s.%s' % (daemon_type
, daemon_id
)
2449 envs
.extend(Keepalived
.get_container_envs())
2450 container_args
.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
2451 elif daemon_type
== CephIscsi
.daemon_type
:
2452 entrypoint
= CephIscsi
.entrypoint
2453 name
= '%s.%s' % (daemon_type
, daemon_id
)
2454 # So the container can modprobe iscsi_target_mod and have write perms
2455 # to configfs we need to make this a privileged container.
2457 elif daemon_type
== CustomContainer
.daemon_type
:
2458 cc
= CustomContainer
.init(ctx
, fsid
, daemon_id
)
2459 entrypoint
= cc
.entrypoint
2460 host_network
= False
2461 envs
.extend(cc
.get_container_envs())
2462 container_args
.extend(cc
.get_container_args())
2464 if daemon_type
in Monitoring
.components
:
2465 uid
, gid
= extract_uid_gid_monitoring(ctx
, daemon_type
)
2469 # FIXME: disable cpu/memory limits for the time being (not supported
2470 # by ubuntu 18.04 kernel!)
2472 container_args
.extend(monitoring_args
)
2473 elif daemon_type
== 'crash':
2474 ceph_args
= ['-n', name
]
2475 elif daemon_type
in Ceph
.daemons
:
2476 ceph_args
= ['-n', name
, '-f']
2478 # if using podman, set -d, --conmon-pidfile & --cidfile flags
2479 # so service can have Type=Forking
2480 if isinstance(ctx
.container_engine
, Podman
):
2481 runtime_dir
= '/run'
2482 container_args
.extend([
2483 '-d', '--log-driver', 'journald',
2485 runtime_dir
+ '/ceph-%s@%s.%s.service-pid' % (fsid
, daemon_type
, daemon_id
),
2487 runtime_dir
+ '/ceph-%s@%s.%s.service-cid' % (fsid
, daemon_type
, daemon_id
),
2489 if ctx
.container_engine
.version
>= CGROUPS_SPLIT_PODMAN_VERSION
:
2490 container_args
.append('--cgroups=split')
2492 return CephContainer(
2495 entrypoint
=entrypoint
,
2496 args
=ceph_args
+ get_daemon_args(ctx
, fsid
, daemon_type
, daemon_id
),
2497 container_args
=container_args
,
2498 volume_mounts
=get_container_mounts(ctx
, fsid
, daemon_type
, daemon_id
),
2499 bind_mounts
=get_container_binds(ctx
, fsid
, daemon_type
, daemon_id
),
2500 cname
='ceph-%s-%s.%s' % (fsid
, daemon_type
, daemon_id
),
2502 privileged
=privileged
,
2504 host_network
=host_network
,
2508 def extract_uid_gid(ctx
, img
='', file_path
='/var/lib/ceph'):
2509 # type: (CephadmContext, str, Union[str, List[str]]) -> Tuple[int, int]
2514 if isinstance(file_path
, str):
2521 out
= CephContainer(
2525 args
=['-c', '%u %g', fp
]
2527 uid
, gid
= out
.split(' ')
2528 return int(uid
), int(gid
)
2529 except RuntimeError:
2531 raise RuntimeError('uid/gid not found')
2534 def deploy_daemon(ctx
, fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
2535 config
=None, keyring
=None,
2539 # type: (CephadmContext, str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
2542 if any([port_in_use(ctx
, port
) for port
in ports
]):
2543 raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports
)), daemon_type
))
2545 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2546 if reconfig
and not os
.path
.exists(data_dir
):
2547 raise Error('cannot reconfig, data path %s does not exist' % data_dir
)
2548 if daemon_type
== 'mon' and not os
.path
.exists(data_dir
):
2552 tmp_keyring
= write_tmp(keyring
, uid
, gid
)
2555 tmp_config
= write_tmp(config
, uid
, gid
)
2558 create_daemon_dirs(ctx
, fsid
, daemon_type
, daemon_id
, uid
, gid
)
2559 mon_dir
= get_data_dir(fsid
, ctx
.data_dir
, 'mon', daemon_id
)
2560 log_dir
= get_log_dir(fsid
, ctx
.log_dir
)
2564 entrypoint
='/usr/bin/ceph-mon',
2567 '-i', str(daemon_id
),
2569 '-c', '/tmp/config',
2570 '--keyring', '/tmp/keyring',
2571 ] + get_daemon_args(ctx
, fsid
, 'mon', daemon_id
),
2573 log_dir
: '/var/log/ceph:z',
2574 mon_dir
: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id
),
2575 tmp_keyring
.name
: '/tmp/keyring:z',
2576 tmp_config
.name
: '/tmp/config:z',
2581 with
open(mon_dir
+ '/config', 'w') as f
:
2582 os
.fchown(f
.fileno(), uid
, gid
)
2583 os
.fchmod(f
.fileno(), 0o600)
2586 # dirs, conf, keyring
2589 fsid
, daemon_type
, daemon_id
,
2594 if daemon_type
== CephadmDaemon
.daemon_type
:
2595 port
= next(iter(ports
), None) # get first tcp port provided or None
2597 if ctx
.config_json
== '-':
2598 config_js
= get_parm('-')
2600 config_js
= get_parm(ctx
.config_json
)
2601 assert isinstance(config_js
, dict)
2603 cephadm_exporter
= CephadmDaemon(ctx
, fsid
, daemon_id
, port
)
2604 cephadm_exporter
.deploy_daemon_unit(config_js
)
2607 deploy_daemon_units(ctx
, fsid
, uid
, gid
, daemon_type
, daemon_id
,
2608 c
, osd_fsid
=osd_fsid
, ports
=ports
)
2610 raise RuntimeError('attempting to deploy a daemon without a container image')
2612 if not os
.path
.exists(data_dir
+ '/unit.created'):
2613 with
open(data_dir
+ '/unit.created', 'w') as f
:
2614 os
.fchmod(f
.fileno(), 0o600)
2615 os
.fchown(f
.fileno(), uid
, gid
)
2616 f
.write('mtime is time the daemon deployment was created\n')
2618 with
open(data_dir
+ '/unit.configured', 'w') as f
:
2619 f
.write('mtime is time we were last configured\n')
2620 os
.fchmod(f
.fileno(), 0o600)
2621 os
.fchown(f
.fileno(), uid
, gid
)
2623 update_firewalld(ctx
, daemon_type
)
2625 # Open ports explicitly required for the daemon
2628 fw
.open_ports(ports
)
2631 if reconfig
and daemon_type
not in Ceph
.daemons
:
2632 # ceph daemons do not need a restart; others (presumably) do to pick
2634 call_throws(ctx
, ['systemctl', 'reset-failed',
2635 get_unit_name(fsid
, daemon_type
, daemon_id
)])
2636 call_throws(ctx
, ['systemctl', 'restart',
2637 get_unit_name(fsid
, daemon_type
, daemon_id
)])
def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, background=False):
    # type: (CephadmContext, IO[str], CephContainer, Optional[str], Optional[bool]) -> None
    """Append the shell commands that (re)start ``container`` to a unit.run file.

    :param file_obj: open text file the commands are appended to
    :param container: CephContainer whose rm/run command lines are emitted
    :param comment: optional text written as a '#' comment above the commands
    :param background: when True, run the container in the background ('&')
    """
    if comment:
        # Sometimes adding a comment, especially if there are multiple containers in one
        # unit file, makes it easier to read and grok.
        file_obj.write('# ' + comment + '\n')
    # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
    # FIX: shlex-quote the args here too; the unquoted join was inconsistent with
    # the quoted variants below and would break the generated script if any
    # argument contained shell metacharacters.
    file_obj.write('! ' + ' '.join([shlex.quote(a) for a in container.rm_cmd()]) + ' 2> /dev/null\n')
    # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
    if isinstance(ctx.container_engine, Podman):
        file_obj.write(
            '! '
            + ' '.join([shlex.quote(a) for a in container.rm_cmd(storage=True)])
            + ' 2> /dev/null\n')

    # container run command
    file_obj.write(
        ' '.join([shlex.quote(a) for a in container.run_cmd()])
        + (' &' if background else '') + '\n')
2661 def deploy_daemon_units(
2662 ctx
: CephadmContext
,
2667 daemon_id
: Union
[int, str],
2669 enable
: bool = True,
2671 osd_fsid
: Optional
[str] = None,
2672 ports
: Optional
[List
[int]] = None,
2675 data_dir
= get_data_dir(fsid
, ctx
.data_dir
, daemon_type
, daemon_id
)
2676 with
open(data_dir
+ '/unit.run.new', 'w') as f
, \
2677 open(data_dir
+ '/unit.meta.new', 'w') as metaf
:
2680 if daemon_type
in Ceph
.daemons
:
2681 install_path
= find_program('install')
2682 f
.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path
=install_path
, fsid
=fsid
, uid
=uid
, gid
=gid
))
2685 if daemon_type
== 'osd':
2686 # osds have a pre-start step
2688 simple_fn
= os
.path
.join('/etc/ceph/osd',
2689 '%s-%s.json.adopted-by-cephadm' % (daemon_id
, osd_fsid
))
2690 if os
.path
.exists(simple_fn
):
2691 f
.write('# Simple OSDs need chown on startup:\n')
2692 for n
in ['block', 'block.db', 'block.wal']:
2693 p
= os
.path
.join(data_dir
, n
)
2694 f
.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p
=p
, uid
=uid
, gid
=gid
))
2696 prestart
= CephContainer(
2699 entrypoint
='/usr/sbin/ceph-volume',
2702 str(daemon_id
), osd_fsid
,
2706 volume_mounts
=get_container_mounts(ctx
, fsid
, daemon_type
, daemon_id
),
2707 bind_mounts
=get_container_binds(ctx
, fsid
, daemon_type
, daemon_id
),
2708 cname
='ceph-%s-%s.%s-activate' % (fsid
, daemon_type
, daemon_id
),
2709 memory_request
=ctx
.memory_request
,
2710 memory_limit
=ctx
.memory_limit
,
2712 _write_container_cmd_to_bash(ctx
, f
, prestart
, 'LVM OSDs use ceph-volume lvm activate')
2713 elif daemon_type
== NFSGanesha
.daemon_type
:
2714 # add nfs to the rados grace db
2715 nfs_ganesha
= NFSGanesha
.init(ctx
, fsid
, daemon_id
)
2716 prestart
= nfs_ganesha
.get_rados_grace_container('add')
2717 _write_container_cmd_to_bash(ctx
, f
, prestart
, 'add daemon to rados grace')
2718 elif daemon_type
== CephIscsi
.daemon_type
:
2719 f
.write(' '.join(CephIscsi
.configfs_mount_umount(data_dir
, mount
=True)) + '\n')
2720 ceph_iscsi
= CephIscsi
.init(ctx
, fsid
, daemon_id
)
2721 tcmu_container
= ceph_iscsi
.get_tcmu_runner_container()
2722 _write_container_cmd_to_bash(ctx
, f
, tcmu_container
, 'iscsi tcmu-runnter container', background
=True)
2723 elif daemon_type
== Keepalived
.daemon_type
:
2724 f
.write(Keepalived
.get_prestart())
2726 _write_container_cmd_to_bash(ctx
, f
, c
, '%s.%s' % (daemon_type
, str(daemon_id
)))
2728 # some metadata about the deploy
2729 meta
: Dict
[str, Any
] = {}
2730 if 'meta_json' in ctx
and ctx
.meta_json
:
2731 meta
= json
.loads(ctx
.meta_json
) or {}
2733 'memory_request': int(ctx
.memory_request
) if ctx
.memory_request
else None,
2734 'memory_limit': int(ctx
.memory_limit
) if ctx
.memory_limit
else None,
2736 if not meta
.get('ports'):
2737 meta
['ports'] = ports
2738 metaf
.write(json
.dumps(meta
, indent
=4) + '\n')
2740 os
.fchmod(f
.fileno(), 0o600)
2741 os
.fchmod(metaf
.fileno(), 0o600)
2742 os
.rename(data_dir
+ '/unit.run.new',
2743 data_dir
+ '/unit.run')
2744 os
.rename(data_dir
+ '/unit.meta.new',
2745 data_dir
+ '/unit.meta')
2747 # post-stop command(s)
2748 with
open(data_dir
+ '/unit.poststop.new', 'w') as f
:
2749 if daemon_type
== 'osd':
2751 poststop
= CephContainer(
2754 entrypoint
='/usr/sbin/ceph-volume',
2756 'lvm', 'deactivate',
2757 str(daemon_id
), osd_fsid
,
2760 volume_mounts
=get_container_mounts(ctx
, fsid
, daemon_type
, daemon_id
),
2761 bind_mounts
=get_container_binds(ctx
, fsid
, daemon_type
, daemon_id
),
2762 cname
='ceph-%s-%s.%s-deactivate' % (fsid
, daemon_type
,
2765 _write_container_cmd_to_bash(ctx
, f
, poststop
, 'deactivate osd')
2766 elif daemon_type
== NFSGanesha
.daemon_type
:
2767 # remove nfs from the rados grace db
2768 nfs_ganesha
= NFSGanesha
.init(ctx
, fsid
, daemon_id
)
2769 poststop
= nfs_ganesha
.get_rados_grace_container('remove')
2770 _write_container_cmd_to_bash(ctx
, f
, poststop
, 'remove daemon from rados grace')
2771 elif daemon_type
== CephIscsi
.daemon_type
:
2772 # make sure we also stop the tcmu container
2773 ceph_iscsi
= CephIscsi
.init(ctx
, fsid
, daemon_id
)
2774 tcmu_container
= ceph_iscsi
.get_tcmu_runner_container()
2775 f
.write('! ' + ' '.join(tcmu_container
.stop_cmd()) + '\n')
2776 f
.write(' '.join(CephIscsi
.configfs_mount_umount(data_dir
, mount
=False)) + '\n')
2777 os
.fchmod(f
.fileno(), 0o600)
2778 os
.rename(data_dir
+ '/unit.poststop.new',
2779 data_dir
+ '/unit.poststop')
2782 with
open(data_dir
+ '/unit.image.new', 'w') as f
:
2783 f
.write(c
.image
+ '\n')
2784 os
.fchmod(f
.fileno(), 0o600)
2785 os
.rename(data_dir
+ '/unit.image.new',
2786 data_dir
+ '/unit.image')
2789 install_base_units(ctx
, fsid
)
2790 unit
= get_unit_file(ctx
, fsid
)
2791 unit_file
= 'ceph-%s@.service' % (fsid
)
2792 with
open(ctx
.unit_dir
+ '/' + unit_file
+ '.new', 'w') as f
:
2794 os
.rename(ctx
.unit_dir
+ '/' + unit_file
+ '.new',
2795 ctx
.unit_dir
+ '/' + unit_file
)
2796 call_throws(ctx
, ['systemctl', 'daemon-reload'])
2798 unit_name
= get_unit_name(fsid
, daemon_type
, daemon_id
)
2799 call(ctx
, ['systemctl', 'stop', unit_name
],
2800 verbosity
=CallVerbosity
.DEBUG
)
2801 call(ctx
, ['systemctl', 'reset-failed', unit_name
],
2802 verbosity
=CallVerbosity
.DEBUG
)
2804 call_throws(ctx
, ['systemctl', 'enable', unit_name
])
2806 call_throws(ctx
, ['systemctl', 'start', unit_name
])
2809 class Firewalld(object):
2810 def __init__(self
, ctx
):
2811 # type: (CephadmContext) -> None
2813 self
.available
= self
.check()
2817 self
.cmd
= find_executable('firewall-cmd')
2819 logger
.debug('firewalld does not appear to be present')
2821 (enabled
, state
, _
) = check_unit(self
.ctx
, 'firewalld.service')
2823 logger
.debug('firewalld.service is not enabled')
2825 if state
!= 'running':
2826 logger
.debug('firewalld.service is not running')
2829 logger
.info('firewalld ready')
2832 def enable_service_for(self
, daemon_type
):
2833 # type: (str) -> None
2834 if not self
.available
:
2835 logger
.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type
)
2838 if daemon_type
== 'mon':
2840 elif daemon_type
in ['mgr', 'mds', 'osd']:
2842 elif daemon_type
== NFSGanesha
.daemon_type
:
2848 raise RuntimeError('command not defined')
2850 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--query-service', svc
], verbosity
=CallVerbosity
.DEBUG
)
2852 logger
.info('Enabling firewalld service %s in current zone...' % svc
)
2853 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--add-service', svc
])
2856 'unable to add service %s to current zone: %s' % (svc
, err
))
2858 logger
.debug('firewalld service %s is enabled in current zone' % svc
)
2860 def open_ports(self
, fw_ports
):
2861 # type: (List[int]) -> None
2862 if not self
.available
:
2863 logger
.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports
)
2867 raise RuntimeError('command not defined')
2869 for port
in fw_ports
:
2870 tcp_port
= str(port
) + '/tcp'
2871 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--query-port', tcp_port
], verbosity
=CallVerbosity
.DEBUG
)
2873 logger
.info('Enabling firewalld port %s in current zone...' % tcp_port
)
2874 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--add-port', tcp_port
])
2876 raise RuntimeError('unable to add port %s to current zone: %s' %
2879 logger
.debug('firewalld port %s is enabled in current zone' % tcp_port
)
2881 def close_ports(self
, fw_ports
):
2882 # type: (List[int]) -> None
2883 if not self
.available
:
2884 logger
.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports
)
2888 raise RuntimeError('command not defined')
2890 for port
in fw_ports
:
2891 tcp_port
= str(port
) + '/tcp'
2892 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--query-port', tcp_port
], verbosity
=CallVerbosity
.DEBUG
)
2894 logger
.info('Disabling port %s in current zone...' % tcp_port
)
2895 out
, err
, ret
= call(self
.ctx
, [self
.cmd
, '--permanent', '--remove-port', tcp_port
])
2897 raise RuntimeError('unable to remove port %s from current zone: %s' %
2900 logger
.info(f
'Port {tcp_port} disabled')
2902 logger
.info(f
'firewalld port {tcp_port} already closed')
2904 def apply_rules(self
):
2906 if not self
.available
:
2910 raise RuntimeError('command not defined')
2912 call_throws(self
.ctx
, [self
.cmd
, '--reload'])
def update_firewalld(ctx, daemon_type):
    # type: (CephadmContext, str) -> None
    """Open the firewalld service and any well-known ports for a daemon type."""
    firewall = Firewalld(ctx)

    firewall.enable_service_for(daemon_type)

    # monitoring daemons (prometheus etc.) publish their ports in a port map
    fw_ports = list(Monitoring.port_map.get(daemon_type, []))

    firewall.open_ports(fw_ports)
    firewall.apply_rules()
2930 def install_base_units(ctx
, fsid
):
2931 # type: (CephadmContext, str) -> None
2933 Set up ceph.target and ceph-$fsid.target units.
2936 existed
= os
.path
.exists(ctx
.unit_dir
+ '/ceph.target')
2937 with
open(ctx
.unit_dir
+ '/ceph.target.new', 'w') as f
:
2939 'Description=All Ceph clusters and services\n'
2942 'WantedBy=multi-user.target\n')
2943 os
.rename(ctx
.unit_dir
+ '/ceph.target.new',
2944 ctx
.unit_dir
+ '/ceph.target')
2946 # we disable before enable in case a different ceph.target
2947 # (from the traditional package) is present; while newer
2948 # systemd is smart enough to disable the old
2949 # (/lib/systemd/...) and enable the new (/etc/systemd/...),
2950 # some older versions of systemd error out with EEXIST.
2951 call_throws(ctx
, ['systemctl', 'disable', 'ceph.target'])
2952 call_throws(ctx
, ['systemctl', 'enable', 'ceph.target'])
2953 call_throws(ctx
, ['systemctl', 'start', 'ceph.target'])
2956 existed
= os
.path
.exists(ctx
.unit_dir
+ '/ceph-%s.target' % fsid
)
2957 with
open(ctx
.unit_dir
+ '/ceph-%s.target.new' % fsid
, 'w') as f
:
2960 'Description=Ceph cluster {fsid}\n'
2961 'PartOf=ceph.target\n'
2962 'Before=ceph.target\n'
2965 'WantedBy=multi-user.target ceph.target\n'.format(
2968 os
.rename(ctx
.unit_dir
+ '/ceph-%s.target.new' % fsid
,
2969 ctx
.unit_dir
+ '/ceph-%s.target' % fsid
)
2971 call_throws(ctx
, ['systemctl', 'enable', 'ceph-%s.target' % fsid
])
2972 call_throws(ctx
, ['systemctl', 'start', 'ceph-%s.target' % fsid
])
2974 # logrotate for the cluster
2975 with
open(ctx
.logrotate_dir
+ '/ceph-%s' % fsid
, 'w') as f
:
2977 This is a bit sloppy in that the killall/pkill will touch all ceph daemons
2978 in all containers, but I don't see an elegant way to send SIGHUP *just* to
2979 the daemons for this cluster. (1) systemd kill -s will get the signal to
2980 podman, but podman will exit. (2) podman kill will get the signal to the
2981 first child (bash), but that isn't the ceph daemon. This is simpler and
2984 f
.write("""# created by cephadm
2985 /var/log/ceph/%s/*.log {
2991 killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror cephfs-mirror || pkill -1 -x 'ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror|cephfs-mirror' || true
3000 def get_unit_file(ctx
, fsid
):
3001 # type: (CephadmContext, str) -> str
3003 if isinstance(ctx
.container_engine
, Podman
):
3004 extra_args
= ('ExecStartPre=-/bin/rm -f %t/%n-pid %t/%n-cid\n'
3005 'ExecStopPost=-/bin/rm -f %t/%n-pid %t/%n-cid\n'
3007 'PIDFile=%t/%n-pid\n')
3008 if ctx
.container_engine
.version
>= CGROUPS_SPLIT_PODMAN_VERSION
:
3009 extra_args
+= 'Delegate=yes\n'
3011 docker
= isinstance(ctx
.container_engine
, Docker
)
3012 u
= """# generated by cephadm
3014 Description=Ceph %i for {fsid}
3017 # http://www.freedesktop.org/wiki/Software/systemd/NetworkTarget
3018 # these can be removed once ceph-mon will dynamically change network
3020 After=network-online.target local-fs.target time-sync.target{docker_after}
3021 Wants=network-online.target local-fs.target time-sync.target
3024 PartOf=ceph-{fsid}.target
3025 Before=ceph-{fsid}.target
3030 EnvironmentFile=-/etc/environment
3031 ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run
3032 ExecStop=-{container_path} stop ceph-{fsid}-%i
3033 ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop
3039 StartLimitInterval=30min
3043 WantedBy=ceph-{fsid}.target
3044 """.format(container_path
=ctx
.container_engine
.path
,
3046 data_dir
=ctx
.data_dir
,
3047 extra_args
=extra_args
,
3048 # if docker, we depend on docker.service
3049 docker_after
=' docker.service' if docker
else '',
3050 docker_requires
='Requires=docker.service\n' if docker
else '')
3054 ##################################
3057 class CephContainer
:
3059 ctx
: CephadmContext
,
3062 args
: List
[str] = [],
3063 volume_mounts
: Dict
[str, str] = {},
3065 container_args
: List
[str] = [],
3066 envs
: Optional
[List
[str]] = None,
3067 privileged
: bool = False,
3068 ptrace
: bool = False,
3069 bind_mounts
: Optional
[List
[List
[str]]] = None,
3070 init
: Optional
[bool] = None,
3071 host_network
: bool = True,
3072 memory_request
: Optional
[str] = None,
3073 memory_limit
: Optional
[str] = None,
3077 self
.entrypoint
= entrypoint
3079 self
.volume_mounts
= volume_mounts
3081 self
.container_args
= container_args
3083 self
.privileged
= privileged
3084 self
.ptrace
= ptrace
3085 self
.bind_mounts
= bind_mounts
if bind_mounts
else []
3086 self
.init
= init
if init
else ctx
.container_init
3087 self
.host_network
= host_network
3088 self
.memory_request
= memory_request
3089 self
.memory_limit
= memory_limit
3091 def run_cmd(self
) -> List
[str]:
3092 cmd_args
: List
[str] = [
3093 str(self
.ctx
.container_engine
.path
),
3099 if isinstance(self
.ctx
.container_engine
, Podman
):
3100 # podman adds the container *name* to /etc/hosts (for 127.0.1.1)
3101 # by default, which makes python's socket.getfqdn() return that
3102 # instead of a valid hostname.
3103 cmd_args
.append('--no-hosts')
3104 if os
.path
.exists('/etc/ceph/podman-auth.json'):
3105 cmd_args
.append('--authfile=/etc/ceph/podman-auth.json')
3108 '-e', 'CONTAINER_IMAGE=%s' % self
.image
,
3109 '-e', 'NODE_NAME=%s' % get_hostname(),
3111 vols
: List
[str] = []
3112 binds
: List
[str] = []
3114 if self
.memory_request
:
3115 cmd_args
.extend(['-e', 'POD_MEMORY_REQUEST', str(self
.memory_request
)])
3116 if self
.memory_limit
:
3117 cmd_args
.extend(['-e', 'POD_MEMORY_LIMIT', str(self
.memory_limit
)])
3118 cmd_args
.extend(['--memory', str(self
.memory_limit
)])
3120 if self
.host_network
:
3121 cmd_args
.append('--net=host')
3123 cmd_args
.extend(['--entrypoint', self
.entrypoint
])
3127 # let OSD etc read block devs that haven't been chowned
3128 '--group-add=disk'])
3129 if self
.ptrace
and not self
.privileged
:
3130 # if privileged, the SYS_PTRACE cap is already added
3131 # in addition, --cap-add and --privileged are mutually
3132 # exclusive since podman >= 2.0
3133 cmd_args
.append('--cap-add=SYS_PTRACE')
3135 cmd_args
.append('--init')
3136 envs
+= ['-e', 'CEPH_USE_RANDOM_NONCE=1']
3138 cmd_args
.extend(['--name', self
.cname
])
3140 for env
in self
.envs
:
3141 envs
.extend(['-e', env
])
3144 [['-v', '%s:%s' % (host_dir
, container_dir
)]
3145 for host_dir
, container_dir
in self
.volume_mounts
.items()], [])
3146 binds
= sum([['--mount', '{}'.format(','.join(bind
))]
3147 for bind
in self
.bind_mounts
], [])
3150 cmd_args
+ self
.container_args
+ \
3151 envs
+ vols
+ binds
+ \
3152 [self
.image
] + self
.args
# type: ignore
3154 def shell_cmd(self
, cmd
: List
[str]) -> List
[str]:
3155 cmd_args
: List
[str] = [
3156 str(self
.ctx
.container_engine
.path
),
3162 '-e', 'CONTAINER_IMAGE=%s' % self
.image
,
3163 '-e', 'NODE_NAME=%s' % get_hostname(),
3165 vols
: List
[str] = []
3166 binds
: List
[str] = []
3168 if self
.host_network
:
3169 cmd_args
.append('--net=host')
3173 # let OSD etc read block devs that haven't been chowned
3177 cmd_args
.append('--init')
3178 envs
+= ['-e', 'CEPH_USE_RANDOM_NONCE=1']
3180 for env
in self
.envs
:
3181 envs
.extend(['-e', env
])
3184 [['-v', '%s:%s' % (host_dir
, container_dir
)]
3185 for host_dir
, container_dir
in self
.volume_mounts
.items()], [])
3186 binds
= sum([['--mount', '{}'.format(','.join(bind
))]
3187 for bind
in self
.bind_mounts
], [])
3189 return cmd_args
+ self
.container_args
+ envs
+ vols
+ binds
+ [
3190 '--entrypoint', cmd
[0],
3194 def exec_cmd(self
, cmd
):
3195 # type: (List[str]) -> List[str]
3197 str(self
.ctx
.container_engine
.path
),
3199 ] + self
.container_args
+ [
3203 def rm_cmd(self
, storage
=False):
3204 # type: (bool) -> List[str]
3206 str(self
.ctx
.container_engine
.path
),
3210 ret
.append('--storage')
3211 ret
.append(self
.cname
)
3215 # type () -> List[str]
3217 str(self
.ctx
.container_engine
.path
),
3222 def run(self
, timeout
=DEFAULT_TIMEOUT
):
3223 # type: (Optional[int]) -> str
3224 out
, _
, _
= call_throws(self
.ctx
, self
.run_cmd(),
3225 desc
=self
.entrypoint
, timeout
=timeout
)
3228 ##################################
def command_version(ctx):
    # type: (CephadmContext) -> int
    """Run `ceph --version` inside the configured container image, print the
    output on success, and return the command's exit status."""
    c = CephContainer(ctx, ctx.image, 'ceph', ['--version'])
    out, err, ret = call(ctx, c.run_cmd(), desc=c.entrypoint)
    if not ret:
        print(out.strip())
    return ret
3240 ##################################
def command_pull(ctx):
    # type: (CephadmContext) -> int
    """Pull the configured container image, then inspect it and print its
    metadata; returns the inspect step's exit status."""

    _pull_image(ctx, ctx.image)
    return command_inspect_image(ctx)
def _pull_image(ctx, image):
    # type: (CephadmContext, str) -> None
    """Pull a container image, retrying a few known-transient failures.

    :raises RuntimeError: on a non-transient pull error, or after all
        retries are exhausted.
    """
    logger.info('Pulling container image %s...' % image)

    # stderr substrings that indicate a transient failure worth retrying
    ignorelist = [
        'error creating read-write layer with ID',
        'net/http: TLS handshake timeout',
        'Digest did not match, expected',
    ]

    cmd = [ctx.container_engine.path, 'pull', image]
    # podman can authenticate against the registry with a stored auth file
    if isinstance(ctx.container_engine, Podman) and os.path.exists('/etc/ceph/podman-auth.json'):
        cmd.append('--authfile=/etc/ceph/podman-auth.json')
    cmd_str = ' '.join(cmd)

    # retry with increasing back-off on transient errors only
    for sleep_secs in [1, 4, 25]:
        out, err, ret = call(ctx, cmd)
        if not ret:
            return

        if not any(pattern in err for pattern in ignorelist):
            raise RuntimeError('Failed command: %s' % cmd_str)

        logger.info('`%s` failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
        time.sleep(sleep_secs)

    raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
3279 ##################################
3283 def command_inspect_image(ctx
):
3284 # type: (CephadmContext) -> int
3285 out
, err
, ret
= call_throws(ctx
, [
3286 ctx
.container_engine
.path
, 'inspect',
3287 '--format', '{{.ID}},{{.RepoDigests}}',
3291 info_from
= get_image_info_from_inspect(out
.strip(), ctx
.image
)
3293 ver
= CephContainer(ctx
, ctx
.image
, 'ceph', ['--version']).run().strip()
3294 info_from
['ceph_version'] = ver
3296 print(json
.dumps(info_from
, indent
=4, sort_keys
=True))
def normalize_image_digest(digest):
    # type: (str) -> str
    """Qualify an unqualified container image name with DEFAULT_REGISTRY.

    normal case:
      ceph/ceph -> docker.io/ceph/ceph
    edge cases that shouldn't ever come up:
      ubuntu -> docker.io/ubuntu (ubuntu alias for library/ubuntu)
    no change:
      quay.ceph.io/ceph/ceph -> no change
      docker.io/ubuntu -> no change
    """
    bits = digest.split('/')
    # The first path component names a registry only if it looks like a host:
    # it contains '.' or ':' (e.g. 'docker.io', 'myreg:5000') or is the
    # literal 'localhost'.  The previous check
    # ("'.' not in bits[0] or len(bits) < 3") wrongly re-prefixed already
    # qualified two-component names such as 'docker.io/ubuntu', producing
    # 'docker.io/docker.io/ubuntu', contrary to the contract above.
    if '.' not in bits[0] and ':' not in bits[0] and bits[0] != 'localhost':
        digest = DEFAULT_REGISTRY + '/' + digest
    return digest
3314 def get_image_info_from_inspect(out
, image
):
3315 # type: (str, str) -> Dict[str, Union[str,List[str]]]
3316 image_id
, digests
= out
.split(',', 1)
3318 raise Error('inspect {}: empty result'.format(image
))
3320 'image_id': normalize_container_id(image_id
)
3321 } # type: Dict[str, Union[str,List[str]]]
3323 r
['repo_digests'] = list(map(normalize_image_digest
, digests
[1:-1].split(' ')))
3326 ##################################
def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
    """Determine whether the given string is a valid subnet

    :param subnets: subnet string, a single definition or comma separated list of CIDR subnets
    :returns: return code, IP version list of the subnets and msg describing any errors validation errors
    """

    rc = 0
    versions = set()
    errors = []
    for candidate in subnets.split(','):
        # ensure the format of the string is as expected address/netmask
        if not re.search(r'\/\d+$', candidate):
            rc = 1
            errors.append(f'{candidate} is not in CIDR format (address/netmask)')
            continue
        try:
            versions.add(ipaddress.ip_network(candidate).version)
        except ValueError as e:
            rc = 1
            errors.append(f'{candidate} invalid: {str(e)}')

    return rc, list(versions), ', '.join(errors)
def unwrap_ipv6(address):
    # type: (str) -> str
    """Strip surrounding square brackets from an IPv6 literal, if present."""
    bracketed = address.startswith('[') and address.endswith(']')
    return address[1:-1] if bracketed else address
def wrap_ipv6(address):
    # type: (str) -> str
    """Wrap a bare IPv6 literal in square brackets; pass anything else through.

    Hostnames, IPv4 addresses and already-bracketed strings do not parse as
    IPv6 addresses, so they fall through unchanged via the ValueError path.
    """
    try:
        needs_brackets = ipaddress.ip_address(address).version == 6
    except ValueError:
        needs_brackets = False

    return f'[{address}]' if needs_brackets else address
def is_ipv6(address):
    # type: (str) -> bool
    """Return True if the (possibly bracket-wrapped) address is an IPv6 literal."""
    candidate = unwrap_ipv6(address)
    try:
        return ipaddress.ip_address(candidate).version == 6
    except ValueError:
        logger.warning('Address: {} is not a valid IP address'.format(candidate))
        return False
3388 def prepare_mon_addresses(
3390 ) -> Tuple
[str, bool, Optional
[str]]:
3391 r
= re
.compile(r
':(\d+)$')
3396 ipv6
= is_ipv6(ctx
.mon_ip
)
3398 ctx
.mon_ip
= wrap_ipv6(ctx
.mon_ip
)
3399 hasport
= r
.findall(ctx
.mon_ip
)
3401 port
= int(hasport
[0])
3403 addr_arg
= '[v1:%s]' % ctx
.mon_ip
3405 addr_arg
= '[v2:%s]' % ctx
.mon_ip
3407 logger
.warning('Using msgr2 protocol for unrecognized port %d' %
3409 addr_arg
= '[v2:%s]' % ctx
.mon_ip
3410 base_ip
= ctx
.mon_ip
[0:-(len(str(port
))) - 1]
3411 check_ip_port(ctx
, base_ip
, port
)
3413 base_ip
= ctx
.mon_ip
3414 addr_arg
= '[v2:%s:3300,v1:%s:6789]' % (ctx
.mon_ip
, ctx
.mon_ip
)
3415 check_ip_port(ctx
, ctx
.mon_ip
, 3300)
3416 check_ip_port(ctx
, ctx
.mon_ip
, 6789)
3418 addr_arg
= ctx
.mon_addrv
3419 if addr_arg
[0] != '[' or addr_arg
[-1] != ']':
3420 raise Error('--mon-addrv value %s must use square backets' %
3422 ipv6
= addr_arg
.count('[') > 1
3423 for addr
in addr_arg
[1:-1].split(','):
3424 hasport
= r
.findall(addr
)
3426 raise Error('--mon-addrv value %s must include port number' %
3428 port
= int(hasport
[0])
3429 # strip off v1: or v2: prefix
3430 addr
= re
.sub(r
'^\w+:', '', addr
)
3431 base_ip
= addr
[0:-(len(str(port
))) - 1]
3432 check_ip_port(ctx
, base_ip
, port
)
3434 raise Error('must specify --mon-ip or --mon-addrv')
3435 logger
.debug('Base mon IP is %s, final addrv is %s' % (base_ip
, addr_arg
))
3438 if not ctx
.skip_mon_network
:
3439 # make sure IP is configured locally, and then figure out the
3441 for net
, ifaces
in list_networks(ctx
).items():
3443 for iface
, ls
in ifaces
.items():
3445 if ipaddress
.ip_address(unwrap_ipv6(base_ip
)) in \
3446 [ipaddress
.ip_address(ip
) for ip
in ips
]:
3448 logger
.info('Mon IP %s is in CIDR network %s' % (base_ip
,
3452 raise Error('Failed to infer CIDR network for mon ip %s; pass '
3453 '--skip-mon-network to configure it later' % base_ip
)
3455 return (addr_arg
, ipv6
, mon_network
)
def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]:
    """Validate --cluster-network and report whether it contains IPv6 subnets.

    The cluster network may not exist on this node, so all we can do is
    validate that the address given is a valid ipv4 or ipv6 subnet.

    :returns: (cluster network string or '', True when any subnet is IPv6)
    :raises Error: when the supplied --cluster-network fails validation
    """
    if not ctx.cluster_network:
        logger.info('- internal network (--cluster-network) has not '
                    'been provided, OSD replication will default to '
                    'the public_network')
        return '', False

    rc, versions, err_msg = check_subnet(ctx.cluster_network)
    if rc:
        raise Error(f'Invalid --cluster-network parameter: {err_msg}')

    return ctx.cluster_network, 6 in versions
3477 def create_initial_keys(
3478 ctx
: CephadmContext
,
3481 ) -> Tuple
[str, str, str, Any
, Any
]: # type: ignore
3485 # create some initial keys
3486 logger
.info('Creating initial keys...')
3487 mon_key
= CephContainer(
3490 entrypoint
='/usr/bin/ceph-authtool',
3491 args
=['--gen-print-key'],
3493 admin_key
= CephContainer(
3496 entrypoint
='/usr/bin/ceph-authtool',
3497 args
=['--gen-print-key'],
3499 mgr_key
= CephContainer(
3502 entrypoint
='/usr/bin/ceph-authtool',
3503 args
=['--gen-print-key'],
3506 keyring
= ('[mon.]\n'
3508 '\tcaps mon = allow *\n'
3511 '\tcaps mon = allow *\n'
3512 '\tcaps mds = allow *\n'
3513 '\tcaps mgr = allow *\n'
3514 '\tcaps osd = allow *\n'
3517 '\tcaps mon = profile mgr\n'
3518 '\tcaps mds = allow *\n'
3519 '\tcaps osd = allow *\n'
3520 % (mon_key
, admin_key
, mgr_id
, mgr_key
))
3522 admin_keyring
= write_tmp('[client.admin]\n'
3523 '\tkey = ' + admin_key
+ '\n',
3527 bootstrap_keyring
= write_tmp(keyring
, uid
, gid
)
3528 return (mon_key
, mgr_key
, admin_key
,
3529 bootstrap_keyring
, admin_keyring
)
3532 def create_initial_monmap(
3533 ctx
: CephadmContext
,
3536 mon_id
: str, mon_addr
: str
3538 logger
.info('Creating initial monmap...')
3539 monmap
= write_tmp('', 0, 0)
3540 out
= CephContainer(
3543 entrypoint
='/usr/bin/monmaptool',
3548 '--addv', mon_id
, mon_addr
,
3552 monmap
.name
: '/tmp/monmap:z',
3555 logger
.debug(f
'monmaptool for {mon_id} {mon_addr} on {out}')
3557 # pass monmap file to ceph user for use by ceph-mon --mkfs below
3558 os
.fchown(monmap
.fileno(), uid
, gid
)
3562 def prepare_create_mon(
3563 ctx
: CephadmContext
,
3565 fsid
: str, mon_id
: str,
3566 bootstrap_keyring_path
: str,
3569 logger
.info('Creating mon...')
3570 create_daemon_dirs(ctx
, fsid
, 'mon', mon_id
, uid
, gid
)
3571 mon_dir
= get_data_dir(fsid
, ctx
.data_dir
, 'mon', mon_id
)
3572 log_dir
= get_log_dir(fsid
, ctx
.log_dir
)
3573 out
= CephContainer(
3576 entrypoint
='/usr/bin/ceph-mon',
3582 '--monmap', '/tmp/monmap',
3583 '--keyring', '/tmp/keyring',
3584 ] + get_daemon_args(ctx
, fsid
, 'mon', mon_id
),
3586 log_dir
: '/var/log/ceph:z',
3587 mon_dir
: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id
),
3588 bootstrap_keyring_path
: '/tmp/keyring:z',
3589 monmap_path
: '/tmp/monmap:z',
3592 logger
.debug(f
'create mon.{mon_id} on {out}')
3593 return (mon_dir
, log_dir
)
3597 ctx
: CephadmContext
,
3599 fsid
: str, mon_id
: str
3601 mon_c
= get_container(ctx
, fsid
, 'mon', mon_id
)
3602 ctx
.meta_json
= json
.dumps({'service_name': 'mon'})
3603 deploy_daemon(ctx
, fsid
, 'mon', mon_id
, mon_c
, uid
, gid
,
3604 config
=None, keyring
=None)
3608 ctx
: CephadmContext
,
3609 mon_id
: str, mon_dir
: str,
3610 admin_keyring_path
: str, config_path
: str
3612 logger
.info('Waiting for mon to start...')
3616 entrypoint
='/usr/bin/ceph',
3620 mon_dir
: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id
),
3621 admin_keyring_path
: '/etc/ceph/ceph.client.admin.keyring:z',
3622 config_path
: '/etc/ceph/ceph.conf:z',
3626 # wait for the service to become available
3627 def is_mon_available():
3629 timeout
= ctx
.timeout
if ctx
.timeout
else 60 # seconds
3630 out
, err
, ret
= call(ctx
, c
.run_cmd(),
3635 is_available(ctx
, 'mon', is_mon_available
)
3639 ctx
: CephadmContext
,
3641 fsid
: str, mgr_id
: str, mgr_key
: str,
3642 config
: str, clifunc
: Callable
3644 logger
.info('Creating mgr...')
3645 mgr_keyring
= '[mgr.%s]\n\tkey = %s\n' % (mgr_id
, mgr_key
)
3646 mgr_c
= get_container(ctx
, fsid
, 'mgr', mgr_id
)
3647 # Note:the default port used by the Prometheus node exporter is opened in fw
3648 ctx
.meta_json
= json
.dumps({'service_name': 'mgr'})
3649 deploy_daemon(ctx
, fsid
, 'mgr', mgr_id
, mgr_c
, uid
, gid
,
3650 config
=config
, keyring
=mgr_keyring
, ports
=[9283])
3652 # wait for the service to become available
3653 logger
.info('Waiting for mgr to start...')
3655 def is_mgr_available():
3657 timeout
= ctx
.timeout
if ctx
.timeout
else 60 # seconds
3659 out
= clifunc(['status', '-f', 'json-pretty'], timeout
=timeout
)
3661 return j
.get('mgrmap', {}).get('available', False)
3662 except Exception as e
:
3663 logger
.debug('status failed: %s' % e
)
3665 is_available(ctx
, 'mgr', is_mgr_available
)
3669 ctx
: CephadmContext
,
3670 cli
: Callable
, wait_for_mgr_restart
: Callable
3673 cli(['cephadm', 'set-user', ctx
.ssh_user
])
3676 logger
.info('Using provided ssh config...')
3678 pathify(ctx
.ssh_config
.name
): '/tmp/cephadm-ssh-config:z',
3680 cli(['cephadm', 'set-ssh-config', '-i', '/tmp/cephadm-ssh-config'], extra_mounts
=mounts
)
3682 if ctx
.ssh_private_key
and ctx
.ssh_public_key
:
3683 logger
.info('Using provided ssh keys...')
3685 pathify(ctx
.ssh_private_key
.name
): '/tmp/cephadm-ssh-key:z',
3686 pathify(ctx
.ssh_public_key
.name
): '/tmp/cephadm-ssh-key.pub:z'
3688 cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts
=mounts
)
3689 cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts
=mounts
)
3691 logger
.info('Generating ssh key...')
3692 cli(['cephadm', 'generate-key'])
3693 ssh_pub
= cli(['cephadm', 'get-pub-key'])
3695 with
open(ctx
.output_pub_ssh_key
, 'w') as f
:
3697 logger
.info('Wrote public SSH key to %s' % ctx
.output_pub_ssh_key
)
3699 logger
.info('Adding key to %s@localhost authorized_keys...' % ctx
.ssh_user
)
3701 s_pwd
= pwd
.getpwnam(ctx
.ssh_user
)
3703 raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx
.ssh_user
))
3704 ssh_uid
= s_pwd
.pw_uid
3705 ssh_gid
= s_pwd
.pw_gid
3706 ssh_dir
= os
.path
.join(s_pwd
.pw_dir
, '.ssh')
3708 if not os
.path
.exists(ssh_dir
):
3709 makedirs(ssh_dir
, ssh_uid
, ssh_gid
, 0o700)
3711 auth_keys_file
= '%s/authorized_keys' % ssh_dir
3714 if os
.path
.exists(auth_keys_file
):
3715 with
open(auth_keys_file
, 'r') as f
:
3716 f
.seek(0, os
.SEEK_END
)
3718 f
.seek(f
.tell() - 1, os
.SEEK_SET
) # go to last char
3719 if f
.read() != '\n':
3722 with
open(auth_keys_file
, 'a') as f
:
3723 os
.fchown(f
.fileno(), ssh_uid
, ssh_gid
) # just in case we created it
3724 os
.fchmod(f
.fileno(), 0o600) # just in case we created it
3727 f
.write(ssh_pub
.strip() + '\n')
3729 host
= get_hostname()
3730 logger
.info('Adding host %s...' % host
)
3732 args
= ['orch', 'host', 'add', host
]
3734 args
.append(ctx
.mon_ip
)
3736 except RuntimeError as e
:
3737 raise Error('Failed to add host <%s>: %s' % (host
, e
))
3739 for t
in ['mon', 'mgr']:
3740 if not ctx
.orphan_initial_daemons
:
3741 logger
.info('Deploying %s service with default placement...' % t
)
3742 cli(['orch', 'apply', t
])
3744 logger
.info('Deploying unmanaged %s service...' % t
)
3745 cli(['orch', 'apply', t
, '--unmanaged'])
3747 if not ctx
.orphan_initial_daemons
:
3748 logger
.info('Deploying crash service with default placement...')
3749 cli(['orch', 'apply', 'crash'])
3751 if not ctx
.skip_monitoring_stack
:
3752 logger
.info('Enabling mgr prometheus module...')
3753 cli(['mgr', 'module', 'enable', 'prometheus'])
3754 for t
in ['prometheus', 'grafana', 'node-exporter', 'alertmanager']:
3755 logger
.info('Deploying %s service with default placement...' % t
)
3756 cli(['orch', 'apply', t
])
def enable_cephadm_mgr_module(
    cli: Callable, wait_for_mgr_restart: Callable
) -> None:
    """Turn on the cephadm mgr module and select it as orchestrator backend.

    ``cli`` runs a ceph CLI command inside a container.  Enabling a mgr
    module restarts the mgr, so we block on ``wait_for_mgr_restart``
    before pointing the orchestrator at cephadm.
    """
    logger.info('Enabling cephadm module...')
    cli('mgr module enable cephadm'.split())
    wait_for_mgr_restart()
    logger.info('Setting orchestrator backend to cephadm...')
    cli('orch set backend cephadm'.split())
3770 def prepare_dashboard(
3771 ctx
: CephadmContext
,
3773 cli
: Callable
, wait_for_mgr_restart
: Callable
3776 # Configure SSL port (cephadm only allows to configure dashboard SSL port)
3777 # if the user does not want to use SSL he can change this setting once the cluster is up
3778 cli(['config', 'set', 'mgr', 'mgr/dashboard/ssl_server_port', str(ctx
.ssl_dashboard_port
)])
3780 # configuring dashboard parameters
3781 logger
.info('Enabling the dashboard module...')
3782 cli(['mgr', 'module', 'enable', 'dashboard'])
3783 wait_for_mgr_restart()
3785 # dashboard crt and key
3786 if ctx
.dashboard_key
and ctx
.dashboard_crt
:
3787 logger
.info('Using provided dashboard certificate...')
3789 pathify(ctx
.dashboard_crt
.name
): '/tmp/dashboard.crt:z',
3790 pathify(ctx
.dashboard_key
.name
): '/tmp/dashboard.key:z'
3792 cli(['dashboard', 'set-ssl-certificate', '-i', '/tmp/dashboard.crt'], extra_mounts
=mounts
)
3793 cli(['dashboard', 'set-ssl-certificate-key', '-i', '/tmp/dashboard.key'], extra_mounts
=mounts
)
3795 logger
.info('Generating a dashboard self-signed certificate...')
3796 cli(['dashboard', 'create-self-signed-cert'])
3798 logger
.info('Creating initial admin user...')
3799 password
= ctx
.initial_dashboard_password
or generate_password()
3800 tmp_password_file
= write_tmp(password
, uid
, gid
)
3801 cmd
= ['dashboard', 'ac-user-create', ctx
.initial_dashboard_user
, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password']
3802 if not ctx
.dashboard_password_noupdate
:
3803 cmd
.append('--pwd-update-required')
3804 cli(cmd
, extra_mounts
={pathify(tmp_password_file
.name
): '/tmp/dashboard.pw:z'})
3805 logger
.info('Fetching dashboard port number...')
3806 out
= cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
3809 # Open dashboard port
3811 fw
.open_ports([port
])
3814 logger
.info('Ceph Dashboard is now available at:\n\n'
3815 '\t URL: https://%s:%s/\n'
3817 '\tPassword: %s\n' % (
3819 ctx
.initial_dashboard_user
,
def prepare_bootstrap_config(
    ctx: CephadmContext,
    fsid: str, mon_addr: str, image: str
) -> str:
    """Render the minimal ceph.conf used to bootstrap the cluster.

    Starts from any user-supplied config (--config), forces the global
    fsid / mon_host / container_image settings, disables insecure
    global_id reclaim unless the user configured it explicitly, and
    performs a custom-registry login when one was requested.

    :return: the rendered config file contents as a string.
    """
    parser = read_config(ctx.config)
    for section in ('global', 'mon'):
        if not parser.has_section(section):
            parser.add_section(section)
    parser.set('global', 'fsid', fsid)
    parser.set('global', 'mon_host', mon_addr)
    parser.set('global', 'container_image', image)
    # respect either spelling of the option if the user already set it
    user_set_reclaim = (
        parser.has_option('mon', 'auth_allow_insecure_global_id_reclaim')
        or parser.has_option('mon', 'auth allow insecure global id reclaim')
    )
    if not user_set_reclaim:
        parser.set('mon', 'auth_allow_insecure_global_id_reclaim', 'false')
    buf = StringIO()
    parser.write(buf)
    config = buf.getvalue()

    if ctx.registry_json or ctx.registry_url:
        command_registry_login(ctx)

    return config
3852 def finish_bootstrap_config(
3853 ctx
: CephadmContext
,
3856 mon_id
: str, mon_dir
: str,
3857 mon_network
: Optional
[str], ipv6
: bool,
3859 cluster_network
: Optional
[str], ipv6_cluster_network
: bool
3862 if not ctx
.no_minimize_config
:
3863 logger
.info('Assimilating anything we can from ceph.conf...')
3865 'config', 'assimilate-conf',
3866 '-i', '/var/lib/ceph/mon/ceph-%s/config' % mon_id
3868 mon_dir
: '/var/lib/ceph/mon/ceph-%s:z' % mon_id
3870 logger
.info('Generating new minimal ceph.conf...')
3872 'config', 'generate-minimal-conf',
3873 '-o', '/var/lib/ceph/mon/ceph-%s/config' % mon_id
3875 mon_dir
: '/var/lib/ceph/mon/ceph-%s:z' % mon_id
3877 # re-read our minimized config
3878 with
open(mon_dir
+ '/config', 'r') as f
:
3880 logger
.info('Restarting the monitor...')
3884 get_unit_name(fsid
, 'mon', mon_id
)
3888 logger
.info(f
'Setting mon public_network to {mon_network}')
3889 cli(['config', 'set', 'mon', 'public_network', mon_network
])
3892 logger
.info(f
'Setting cluster_network to {cluster_network}')
3893 cli(['config', 'set', 'global', 'cluster_network', cluster_network
])
3895 if ipv6
or ipv6_cluster_network
:
3896 logger
.info('Enabling IPv6 (ms_bind_ipv6) binding')
3897 cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true'])
3899 with
open(ctx
.output_config
, 'w') as f
:
3901 logger
.info('Wrote config to %s' % ctx
.output_config
)
3906 def command_bootstrap(ctx
):
3907 # type: (CephadmContext) -> int
3909 if not ctx
.output_config
:
3910 ctx
.output_config
= os
.path
.join(ctx
.output_dir
, 'ceph.conf')
3911 if not ctx
.output_keyring
:
3912 ctx
.output_keyring
= os
.path
.join(ctx
.output_dir
,
3913 'ceph.client.admin.keyring')
3914 if not ctx
.output_pub_ssh_key
:
3915 ctx
.output_pub_ssh_key
= os
.path
.join(ctx
.output_dir
, 'ceph.pub')
3917 # verify output files
3918 for f
in [ctx
.output_config
, ctx
.output_keyring
,
3919 ctx
.output_pub_ssh_key
]:
3920 if not ctx
.allow_overwrite
:
3921 if os
.path
.exists(f
):
3922 raise Error('%s already exists; delete or pass '
3923 '--allow-overwrite to overwrite' % f
)
3924 dirname
= os
.path
.dirname(f
)
3925 if dirname
and not os
.path
.exists(dirname
):
3926 fname
= os
.path
.basename(f
)
3927 logger
.info(f
'Creating directory {dirname} for {fname}')
3929 # use makedirs to create intermediate missing dirs
3930 os
.makedirs(dirname
, 0o755)
3931 except PermissionError
:
3932 raise Error(f
'Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.')
3934 if not ctx
.skip_prepare_host
:
3935 command_prepare_host(ctx
)
3937 logger
.info('Skip prepare_host')
3940 fsid
= ctx
.fsid
or make_fsid()
3941 hostname
= get_hostname()
3942 if '.' in hostname
and not ctx
.allow_fqdn_hostname
:
3943 raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname
, hostname
.split('.')[0]))
3944 mon_id
= ctx
.mon_id
or hostname
3945 mgr_id
= ctx
.mgr_id
or generate_service_id()
3946 logger
.info('Cluster fsid: %s' % fsid
)
3948 lock
= FileLock(ctx
, fsid
)
3951 (addr_arg
, ipv6
, mon_network
) = prepare_mon_addresses(ctx
)
3952 cluster_network
, ipv6_cluster_network
= prepare_cluster_network(ctx
)
3954 config
= prepare_bootstrap_config(ctx
, fsid
, addr_arg
, ctx
.image
)
3956 if not ctx
.skip_pull
:
3957 _pull_image(ctx
, ctx
.image
)
3959 image_ver
= CephContainer(ctx
, ctx
.image
, 'ceph', ['--version']).run().strip()
3960 logger
.info(f
'Ceph version: {image_ver}')
3961 image_release
= image_ver
.split()[4]
3963 not ctx
.allow_mismatched_release
3964 and image_release
not in [DEFAULT_IMAGE_RELEASE
, LATEST_STABLE_RELEASE
]
3967 f
'Container release {image_release} != cephadm release {DEFAULT_IMAGE_RELEASE}; please use matching version of cephadm (pass --allow-mismatched-release to continue anyway)'
3970 logger
.info('Extracting ceph user uid/gid from container image...')
3971 (uid
, gid
) = extract_uid_gid(ctx
)
3973 # create some initial keys
3974 (mon_key
, mgr_key
, admin_key
, bootstrap_keyring
, admin_keyring
) = \
3975 create_initial_keys(ctx
, uid
, gid
, mgr_id
)
3977 monmap
= create_initial_monmap(ctx
, uid
, gid
, fsid
, mon_id
, addr_arg
)
3978 (mon_dir
, log_dir
) = \
3979 prepare_create_mon(ctx
, uid
, gid
, fsid
, mon_id
,
3980 bootstrap_keyring
.name
, monmap
.name
)
3982 with
open(mon_dir
+ '/config', 'w') as f
:
3983 os
.fchown(f
.fileno(), uid
, gid
)
3984 os
.fchmod(f
.fileno(), 0o600)
3987 make_var_run(ctx
, fsid
, uid
, gid
)
3988 create_mon(ctx
, uid
, gid
, fsid
, mon_id
)
3990 # config to issue various CLI commands
3991 tmp_config
= write_tmp(config
, uid
, gid
)
3993 # a CLI helper to reduce our typing
3994 def cli(cmd
, extra_mounts
={}, timeout
=DEFAULT_TIMEOUT
):
3995 # type: (List[str], Dict[str, str], Optional[int]) -> str
3997 log_dir
: '/var/log/ceph:z',
3998 admin_keyring
.name
: '/etc/ceph/ceph.client.admin.keyring:z',
3999 tmp_config
.name
: '/etc/ceph/ceph.conf:z',
4001 for k
, v
in extra_mounts
.items():
4003 timeout
= timeout
or ctx
.timeout
4004 return CephContainer(
4007 entrypoint
='/usr/bin/ceph',
4009 volume_mounts
=mounts
,
4010 ).run(timeout
=timeout
)
4012 wait_for_mon(ctx
, mon_id
, mon_dir
, admin_keyring
.name
, tmp_config
.name
)
4014 finish_bootstrap_config(ctx
, fsid
, config
, mon_id
, mon_dir
,
4015 mon_network
, ipv6
, cli
,
4016 cluster_network
, ipv6_cluster_network
)
4019 with
open(ctx
.output_keyring
, 'w') as f
:
4020 os
.fchmod(f
.fileno(), 0o600)
4021 f
.write('[client.admin]\n'
4022 '\tkey = ' + admin_key
+ '\n')
4023 logger
.info('Wrote keyring to %s' % ctx
.output_keyring
)
4026 create_mgr(ctx
, uid
, gid
, fsid
, mgr_id
, mgr_key
, config
, cli
)
4028 def json_loads_retry(cli_func
):
4029 for sleep_secs
in [1, 4, 4]:
4031 return json
.loads(cli_func())
4032 except json
.JSONDecodeError
:
4033 logger
.debug('Invalid JSON. Retrying in %s seconds...' % sleep_secs
)
4034 time
.sleep(sleep_secs
)
4035 return json
.loads(cli_func())
4037 # wait for mgr to restart (after enabling a module)
4038 def wait_for_mgr_restart():
4039 # first get latest mgrmap epoch from the mon. try newer 'mgr
4040 # stat' command first, then fall back to 'mgr dump' if
4043 j
= json_loads_retry(lambda: cli(['mgr', 'stat']))
4045 j
= json_loads_retry(lambda: cli(['mgr', 'dump']))
4048 # wait for mgr to have it
4049 logger
.info('Waiting for the mgr to restart...')
4051 def mgr_has_latest_epoch():
4054 out
= cli(['tell', 'mgr', 'mgr_status'])
4056 return j
['mgrmap_epoch'] >= epoch
4057 except Exception as e
:
4058 logger
.debug('tell mgr mgr_status failed: %s' % e
)
4060 is_available(ctx
, 'mgr epoch %d' % epoch
, mgr_has_latest_epoch
)
4062 enable_cephadm_mgr_module(cli
, wait_for_mgr_restart
)
4065 if not ctx
.skip_ssh
:
4066 prepare_ssh(ctx
, cli
, wait_for_mgr_restart
)
4068 if ctx
.registry_url
and ctx
.registry_username
and ctx
.registry_password
:
4069 cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', ctx
.registry_url
, '--force'])
4070 cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', ctx
.registry_username
, '--force'])
4071 cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', ctx
.registry_password
, '--force'])
4073 cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx
.container_init
), '--force'])
4075 if ctx
.with_exporter
:
4076 cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true'])
4077 if ctx
.exporter_config
:
4078 logger
.info('Applying custom cephadm exporter settings')
4079 # validated within the parser, so we can just apply to the store
4080 with tempfile
.NamedTemporaryFile(buffering
=0) as tmp
:
4081 tmp
.write(json
.dumps(ctx
.exporter_config
).encode('utf-8'))
4083 tmp
.name
: '/tmp/exporter-config.json:z'
4085 cli(['cephadm', 'set-exporter-config', '-i', '/tmp/exporter-config.json'], extra_mounts
=mounts
)
4086 logger
.info('-> Use ceph orch apply cephadm-exporter to deploy')
4088 # generate a default SSL configuration for the exporter(s)
4089 logger
.info('Generating a default cephadm exporter configuration (self-signed)')
4090 cli(['cephadm', 'generate-exporter-config'])
4092 # deploy the service (commented out until the cephadm changes are in the ceph container build)
4093 logger
.info('Deploying cephadm exporter service with default placement...')
4094 cli(['orch', 'apply', 'cephadm-exporter'])
4096 if not ctx
.skip_dashboard
:
4097 prepare_dashboard(ctx
, uid
, gid
, cli
, wait_for_mgr_restart
)
4100 logger
.info('Applying %s to cluster' % ctx
.apply_spec
)
4102 with
open(ctx
.apply_spec
) as f
:
4104 if 'hostname:' in line
:
4105 line
= line
.replace('\n', '')
4106 split
= line
.split(': ')
4107 if split
[1] != hostname
:
4108 logger
.info('Adding ssh key to %s' % split
[1])
4110 ssh_key
= '/etc/ceph/ceph.pub'
4111 if ctx
.ssh_public_key
:
4112 ssh_key
= ctx
.ssh_public_key
.name
4113 out
, err
, code
= call_throws(ctx
, ['sudo', '-u', ctx
.ssh_user
, 'ssh-copy-id', '-f', '-i', ssh_key
, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx
.ssh_user
, split
[1])])
4116 mounts
[pathify(ctx
.apply_spec
)] = '/tmp/spec.yml:z'
4118 out
= cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts
=mounts
)
4121 logger
.info('You can access the Ceph CLI with:\n\n'
4122 '\tsudo %s shell --fsid %s -c %s -k %s\n' % (
4126 ctx
.output_keyring
))
4127 logger
.info('Please consider enabling telemetry to help improve Ceph:\n\n'
4128 '\tceph telemetry on\n\n'
4129 'For more information see:\n\n'
4130 '\thttps://docs.ceph.com/docs/pacific/mgr/telemetry/\n')
4131 logger
.info('Bootstrap complete.')
4134 ##################################
4137 def command_registry_login(ctx
: CephadmContext
):
4138 if ctx
.registry_json
:
4139 logger
.info('Pulling custom registry login info from %s.' % ctx
.registry_json
)
4140 d
= get_parm(ctx
.registry_json
)
4141 if d
.get('url') and d
.get('username') and d
.get('password'):
4142 ctx
.registry_url
= d
.get('url')
4143 ctx
.registry_username
= d
.get('username')
4144 ctx
.registry_password
= d
.get('password')
4145 registry_login(ctx
, ctx
.registry_url
, ctx
.registry_username
, ctx
.registry_password
)
4147 raise Error('json provided for custom registry login did not include all necessary fields. '
4148 'Please setup json file as\n'
4150 ' "url": "REGISTRY_URL",\n'
4151 ' "username": "REGISTRY_USERNAME",\n'
4152 ' "password": "REGISTRY_PASSWORD"\n'
4154 elif ctx
.registry_url
and ctx
.registry_username
and ctx
.registry_password
:
4155 registry_login(ctx
, ctx
.registry_url
, ctx
.registry_username
, ctx
.registry_password
)
4157 raise Error('Invalid custom registry arguments received. To login to a custom registry include '
4158 '--registry-url, --registry-username and --registry-password '
4159 'options or --registry-json option')
def registry_login(ctx: CephadmContext, url, username, password):
    """Log the container engine into a custom registry.

    For podman, credentials are stored in /etc/ceph/podman-auth.json and
    that file is chmod'ed to 0600 since it holds the registry secret.

    :raises Error: when the engine's login command fails.
    """
    logger.info('Logging into custom registry.')
    try:
        engine = ctx.container_engine
        # NOTE(review): passing the password via -p exposes it in the
        # process list; podman/docker support --password-stdin which
        # avoids that.  Left as-is here to preserve behavior.
        cmd = [engine.path, 'login',
               '-u', username, '-p', password,
               url]
        if isinstance(engine, Podman):
            cmd.append('--authfile=/etc/ceph/podman-auth.json')
        out, _, _ = call_throws(ctx, cmd)
        if isinstance(engine, Podman):
            os.chmod('/etc/ceph/podman-auth.json', 0o600)
    except Exception as e:
        # fixed: chain the underlying exception so the root cause of the
        # login failure is preserved in the traceback
        raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username)) from e
4178 ##################################
def extract_uid_gid_monitoring(ctx, daemon_type):
    # type: (CephadmContext, str) -> Tuple[int, int]
    """Return the (uid, gid) that the given monitoring daemon runs as.

    node-exporter always runs as the static 'nobody' ids (65534); the
    other daemons are probed by inspecting the ownership of well-known
    paths inside their container images.

    :raises Error: for daemon types with no known uid/gid mapping.
    """
    if daemon_type == 'node-exporter':
        return 65534, 65534
    # paths whose ownership identifies the daemon's runtime user
    probe_paths = {
        'prometheus': '/etc/prometheus',
        'grafana': '/var/lib/grafana',
        'alertmanager': ['/etc/alertmanager', '/etc/prometheus'],
    }
    if daemon_type not in probe_paths:
        raise Error('{} not implemented yet'.format(daemon_type))
    uid, gid = extract_uid_gid(ctx, file_path=probe_paths[daemon_type])
    return uid, gid
4198 def command_deploy(ctx
):
4199 # type: (CephadmContext) -> None
4200 daemon_type
, daemon_id
= ctx
.name
.split('.', 1)
4202 lock
= FileLock(ctx
, ctx
.fsid
)
4205 if daemon_type
not in get_supported_daemons():
4206 raise Error('daemon type %s not recognized' % daemon_type
)
4209 unit_name
= get_unit_name(ctx
.fsid
, daemon_type
, daemon_id
)
4210 container_name
= 'ceph-%s-%s.%s' % (ctx
.fsid
, daemon_type
, daemon_id
)
4211 (_
, state
, _
) = check_unit(ctx
, unit_name
)
4212 if state
== 'running' or is_container_running(ctx
, container_name
):
4216 logger
.info('%s daemon %s ...' % ('Reconfig', ctx
.name
))
4218 logger
.info('%s daemon %s ...' % ('Redeploy', ctx
.name
))
4220 logger
.info('%s daemon %s ...' % ('Deploy', ctx
.name
))
4222 # Get and check ports explicitly required to be opened
4223 daemon_ports
= [] # type: List[int]
4225 # only check port in use if not reconfig or redeploy since service
4226 # we are redeploying/reconfiguring will already be using the port
4227 if not ctx
.reconfig
and not redeploy
:
4229 daemon_ports
= list(map(int, ctx
.tcp_ports
.split()))
4231 if daemon_type
in Ceph
.daemons
:
4232 config
, keyring
= get_config_and_keyring(ctx
)
4233 uid
, gid
= extract_uid_gid(ctx
)
4234 make_var_run(ctx
, ctx
.fsid
, uid
, gid
)
4236 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
,
4237 ptrace
=ctx
.allow_ptrace
)
4238 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4239 config
=config
, keyring
=keyring
,
4240 osd_fsid
=ctx
.osd_fsid
,
4241 reconfig
=ctx
.reconfig
,
4244 elif daemon_type
in Monitoring
.components
:
4245 # monitoring daemon - prometheus, grafana, alertmanager, node-exporter
4247 if not ctx
.reconfig
and not redeploy
:
4248 daemon_ports
.extend(Monitoring
.port_map
[daemon_type
])
4250 # make sure provided config-json is sufficient
4251 config
= get_parm(ctx
.config_json
) # type: ignore
4252 required_files
= Monitoring
.components
[daemon_type
].get('config-json-files', list())
4253 required_args
= Monitoring
.components
[daemon_type
].get('config-json-args', list())
4255 if not config
or not all(c
in config
.get('files', {}).keys() for c
in required_files
): # type: ignore
4256 raise Error('{} deployment requires config-json which must '
4257 'contain file content for {}'.format(daemon_type
.capitalize(), ', '.join(required_files
)))
4259 if not config
or not all(c
in config
.keys() for c
in required_args
): # type: ignore
4260 raise Error('{} deployment requires config-json which must '
4261 'contain arg for {}'.format(daemon_type
.capitalize(), ', '.join(required_args
)))
4263 uid
, gid
= extract_uid_gid_monitoring(ctx
, daemon_type
)
4264 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4265 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4266 reconfig
=ctx
.reconfig
,
4269 elif daemon_type
== NFSGanesha
.daemon_type
:
4270 if not ctx
.reconfig
and not redeploy
:
4271 daemon_ports
.extend(NFSGanesha
.port_map
.values())
4273 config
, keyring
= get_config_and_keyring(ctx
)
4274 # TODO: extract ganesha uid/gid (997, 994) ?
4275 uid
, gid
= extract_uid_gid(ctx
)
4276 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4277 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4278 config
=config
, keyring
=keyring
,
4279 reconfig
=ctx
.reconfig
,
4282 elif daemon_type
== CephIscsi
.daemon_type
:
4283 config
, keyring
= get_config_and_keyring(ctx
)
4284 uid
, gid
= extract_uid_gid(ctx
)
4285 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4286 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4287 config
=config
, keyring
=keyring
,
4288 reconfig
=ctx
.reconfig
,
4291 elif daemon_type
== HAproxy
.daemon_type
:
4292 haproxy
= HAproxy
.init(ctx
, ctx
.fsid
, daemon_id
)
4293 uid
, gid
= haproxy
.extract_uid_gid_haproxy()
4294 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4295 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4296 reconfig
=ctx
.reconfig
,
4299 elif daemon_type
== Keepalived
.daemon_type
:
4300 keepalived
= Keepalived
.init(ctx
, ctx
.fsid
, daemon_id
)
4301 uid
, gid
= keepalived
.extract_uid_gid_keepalived()
4302 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4303 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
, uid
, gid
,
4304 reconfig
=ctx
.reconfig
,
4307 elif daemon_type
== CustomContainer
.daemon_type
:
4308 cc
= CustomContainer
.init(ctx
, ctx
.fsid
, daemon_id
)
4309 if not ctx
.reconfig
and not redeploy
:
4310 daemon_ports
.extend(cc
.ports
)
4311 c
= get_container(ctx
, ctx
.fsid
, daemon_type
, daemon_id
,
4312 privileged
=cc
.privileged
,
4313 ptrace
=ctx
.allow_ptrace
)
4314 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, c
,
4315 uid
=cc
.uid
, gid
=cc
.gid
, config
=None,
4316 keyring
=None, reconfig
=ctx
.reconfig
,
4319 elif daemon_type
== CephadmDaemon
.daemon_type
:
4320 # get current user gid and uid
4323 config_js
= get_parm(ctx
.config_json
) # type: Dict[str, str]
4324 if not daemon_ports
:
4325 logger
.info('cephadm-exporter will use default port ({})'.format(CephadmDaemon
.default_port
))
4326 daemon_ports
= [CephadmDaemon
.default_port
]
4328 CephadmDaemon
.validate_config(config_js
)
4330 deploy_daemon(ctx
, ctx
.fsid
, daemon_type
, daemon_id
, None,
4331 uid
, gid
, ports
=daemon_ports
)
4334 raise Error('daemon type {} not implemented in command_deploy function'
4335 .format(daemon_type
))
4337 ##################################
def command_run(ctx):
    # type: (CephadmContext) -> int
    """Run a daemon's container in the foreground (`cephadm run`).

    Parses '<daemon_type>.<daemon_id>' out of ctx.name, builds the
    daemon's container, and executes its run command bounded by
    ctx.timeout.  Returns the command's exit code.
    """
    daemon_type, daemon_id = ctx.name.split('.', 1)
    ctr = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
    return call_timeout(ctx, ctr.run_cmd(), ctx.timeout)
4348 ##################################
def fsid_conf_mismatch(ctx):
    # type: (CephadmContext) -> bool
    """Return True when the supplied config names a different fsid.

    Scans each config line for an 'fsid = ' entry and compares it
    against ctx.fsid; any mismatching entry means the config belongs to
    another cluster.
    """
    config, _ = get_config_and_keyring(ctx)
    if not config:
        return False
    expected = 'fsid = ' + ctx.fsid
    return any(
        'fsid = ' in line.strip() and expected != line.strip()
        for line in config.split('\n')
    )
4365 def command_shell(ctx
):
4366 # type: (CephadmContext) -> int
4367 if fsid_conf_mismatch(ctx
):
4368 raise Error('fsid does not match ceph conf')
4371 make_log_dir(ctx
, ctx
.fsid
)
4374 (daemon_type
, daemon_id
) = ctx
.name
.split('.', 1)
4376 daemon_type
= ctx
.name
4379 daemon_type
= 'osd' # get the most mounts
4382 if daemon_id
and not ctx
.fsid
:
4383 raise Error('must pass --fsid to specify cluster')
4385 # use /etc/ceph files by default, if present. we do this instead of
4386 # making these defaults in the arg parser because we don't want an error
4387 # if they don't exist.
4388 if not ctx
.keyring
and os
.path
.exists(SHELL_DEFAULT_KEYRING
):
4389 ctx
.keyring
= SHELL_DEFAULT_KEYRING
4391 container_args
: List
[str] = ['-i']
4392 mounts
= get_container_mounts(ctx
, ctx
.fsid
, daemon_type
, daemon_id
,
4393 no_config
=True if ctx
.config
else False)
4394 binds
= get_container_binds(ctx
, ctx
.fsid
, daemon_type
, daemon_id
)
4396 mounts
[pathify(ctx
.config
)] = '/etc/ceph/ceph.conf:z'
4398 mounts
[pathify(ctx
.keyring
)] = '/etc/ceph/ceph.keyring:z'
4400 for _mount
in ctx
.mount
:
4401 split_src_dst
= _mount
.split(':')
4402 mount
= pathify(split_src_dst
[0])
4403 filename
= os
.path
.basename(split_src_dst
[0])
4404 if len(split_src_dst
) > 1:
4405 dst
= split_src_dst
[1] + ':z' if len(split_src_dst
) == 3 else split_src_dst
[1]
4408 mounts
[mount
] = '/mnt/{}:z'.format(filename
)
4410 command
= ctx
.command
4416 '-e', 'PS1=%s' % CUSTOM_PS1
,
4419 home
= os
.path
.join(ctx
.data_dir
, ctx
.fsid
, 'home')
4420 if not os
.path
.exists(home
):
4421 logger
.debug('Creating root home at %s' % home
)
4422 makedirs(home
, 0, 0, 0o660)
4423 if os
.path
.exists('/etc/skel'):
4424 for f
in os
.listdir('/etc/skel'):
4425 if f
.startswith('.bash'):
4426 shutil
.copyfile(os
.path
.join('/etc/skel', f
),
4427 os
.path
.join(home
, f
))
4428 mounts
[home
] = '/root'
4433 entrypoint
='doesnotmatter',
4435 container_args
=container_args
,
4436 volume_mounts
=mounts
,
4440 command
= c
.shell_cmd(command
)
4442 return call_timeout(ctx
, command
, ctx
.timeout
)
4444 ##################################
4448 def command_enter(ctx
):
4449 # type: (CephadmContext) -> int
4451 raise Error('must pass --fsid to specify cluster')
4452 (daemon_type
, daemon_id
) = ctx
.name
.split('.', 1)
4453 container_args
= ['-i'] # type: List[str]
4455 command
= ctx
.command
4461 '-e', 'PS1=%s' % CUSTOM_PS1
,
4466 entrypoint
='doesnotmatter',
4467 container_args
=container_args
,
4468 cname
='ceph-%s-%s.%s' % (ctx
.fsid
, daemon_type
, daemon_id
),
4470 command
= c
.exec_cmd(command
)
4471 return call_timeout(ctx
, command
, ctx
.timeout
)
4473 ##################################
def command_ceph_volume(ctx):
    # type: (CephadmContext) -> None
    """Run ceph-volume inside a privileged container (`cephadm ceph-volume`)."""
    if ctx.fsid:
        make_log_dir(ctx, ctx.fsid)

        lock = FileLock(ctx, ctx.fsid)
        lock.acquire()

    (uid, gid) = (0, 0)  # ceph-volume runs as root
    mounts = get_container_mounts(ctx, ctx.fsid, 'osd', None)

    tmp_config = None
    tmp_keyring = None

    (config, keyring) = get_config_and_keyring(ctx)

    if config:
        # tmp config file, mounted at the canonical path inside the container
        tmp_config = write_tmp(config, uid, gid)
        mounts[tmp_config.name] = '/etc/ceph/ceph.conf:z'

    if keyring:
        # tmp keyring file for the bootstrap-osd identity
        tmp_keyring = write_tmp(keyring, uid, gid)
        mounts[tmp_keyring.name] = '/var/lib/ceph/bootstrap-osd/ceph.keyring:z'

    c = CephContainer(
        ctx,
        image=ctx.image,
        entrypoint='/usr/sbin/ceph-volume',
        envs=ctx.env,
        args=ctx.command,
        privileged=True,
        volume_mounts=mounts,
    )

    verbosity = CallVerbosity.VERBOSE if ctx.log_output else CallVerbosity.VERBOSE_ON_FAILURE
    out, err, code = call_throws(ctx, c.run_cmd(), verbosity=verbosity)
    if not code:
        print(out)
4518 ##################################
def command_unit(ctx):
    # type: (CephadmContext) -> None
    """Run a systemctl subcommand against one daemon's unit (`cephadm unit`)."""
    if not ctx.fsid:
        raise Error('must pass --fsid to specify cluster')

    unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name)

    call_throws(ctx, [
        'systemctl',
        ctx.command,
        unit_name],
        verbosity=CallVerbosity.VERBOSE,
        desc='')
4537 ##################################
def command_logs(ctx):
    # type: (CephadmContext) -> None
    """Show journalctl output for one daemon's unit (`cephadm logs`)."""
    if not ctx.fsid:
        raise Error('must pass --fsid to specify cluster')

    unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name)

    cmd = [find_program('journalctl'), '-u', unit_name]
    if ctx.command:
        # extra journalctl args passed through verbatim (e.g. -f, -n)
        cmd.extend(ctx.command)

    # call this directly, without our wrapper, so that we get an unmolested
    # stdout with logger prefixing.
    logger.debug('Running command: %s' % ' '.join(cmd))
    subprocess.call(cmd)  # type: ignore
4558 ##################################
def list_networks(ctx):
    # type: (CephadmContext) -> Dict[str,Dict[str,List[str]]]
    """Return host networks as {network/cidr: {interface: [addresses]}}."""
    # sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag,
    # so we'll need to use a regex to parse 'ip' command output.
    #
    # out, _, _ = call_throws(['ip', '-j', 'route', 'ls'])
    # j = json.loads(out)
    res = _list_ipv4_networks(ctx)
    res.update(_list_ipv6_networks(ctx))
    return res
def _list_ipv4_networks(ctx: CephadmContext):
    """Collect IPv4 networks by parsing `ip route ls` output."""
    execstr: Optional[str] = find_executable('ip')
    if not execstr:
        raise FileNotFoundError("unable to find 'ip' command")
    out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'])
    return _parse_ipv4_route(out)
4584 def _parse_ipv4_route(out
):
4585 r
= {} # type: Dict[str,Dict[str,List[str]]]
4586 p
= re
.compile(r
'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
4587 for line
in out
.splitlines():
4596 if iface
not in r
[net
]:
4598 r
[net
][iface
].append(ip
)
def _list_ipv6_networks(ctx: CephadmContext):
    """Collect IPv6 networks by parsing `ip -6 route ls` and `ip -6 addr ls`."""
    execstr: Optional[str] = find_executable('ip')
    if not execstr:
        raise FileNotFoundError("unable to find 'ip' command")
    routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'])
    ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'])
    return _parse_ipv6_route(routes, ips)
4611 def _parse_ipv6_route(routes
, ips
):
4612 r
= {} # type: Dict[str,Dict[str,List[str]]]
4613 route_p
= re
.compile(r
'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
4614 ip_p
= re
.compile(r
'^\s+inet6 (\S+)/(.*)scope (.*)$')
4615 iface_p
= re
.compile(r
'^(\d+): (\S+): (.*)$')
4616 for line
in routes
.splitlines():
4617 m
= route_p
.findall(line
)
4618 if not m
or m
[0][0].lower() == 'default':
4621 if '/' not in net
: # only consider networks with a mask
4626 if iface
not in r
[net
]:
4630 for line
in ips
.splitlines():
4631 m
= ip_p
.findall(line
)
4633 m
= iface_p
.findall(line
)
4635 # drop @... suffix, if present
4636 iface
= m
[0][1].split('@')[0]
4639 # find the network it belongs to
4640 net
= [n
for n
in r
.keys()
4641 if ipaddress
.ip_address(ip
) in ipaddress
.ip_network(n
)]
4644 r
[net
[0]][iface
].append(ip
)
def command_list_networks(ctx):
    # type: (CephadmContext) -> None
    """Print detected host networks as pretty JSON (`cephadm list-networks`)."""
    print(json.dumps(list_networks(ctx), indent=4))
4654 ##################################
def command_ls(ctx):
    # type: (CephadmContext) -> None
    """Print all daemons on this host as pretty JSON (`cephadm ls`)."""
    daemons = list_daemons(ctx, detail=not ctx.no_detail,
                           legacy_dir=ctx.legacy_dir)
    print(json.dumps(daemons, indent=4))
def with_units_to_int(v: str) -> int:
    """Convert a human-readable size ('123', '4K', '2.5MiB', '10B') to bytes.

    Accepts an optional binary ('iB') or plain ('B') byte marker and an
    optional K/M/G/T multiplier (case-insensitive, powers of 1024).
    """
    # strip the byte marker first so only the multiplier letter remains
    if v.endswith('iB'):
        v = v[:-2]
    elif v.endswith('B'):
        v = v[:-1]
    multipliers = {
        'K': 1024,
        'M': 1024 ** 2,
        'G': 1024 ** 3,
        'T': 1024 ** 4,
    }
    mult = multipliers.get(v[-1].upper(), 1)
    if mult != 1:
        v = v[:-1]
    return int(float(v) * mult)
4685 def list_daemons(ctx
, detail
=True, legacy_dir
=None):
4686 # type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]]
4687 host_version
: Optional
[str] = None
4689 container_path
= ctx
.container_engine
.path
4691 data_dir
= ctx
.data_dir
4692 if legacy_dir
is not None:
4693 data_dir
= os
.path
.abspath(legacy_dir
+ data_dir
)
4695 # keep track of ceph versions we see
4696 seen_versions
= {} # type: Dict[str, Optional[str]]
4698 # keep track of image digests
4699 seen_digests
= {} # type: Dict[str, List[str]]
4701 # keep track of memory usage we've seen
4702 seen_memusage
= {} # type: Dict[str, int]
4703 out
, err
, code
= call(
4705 [container_path
, 'stats', '--format', '{{.ID}},{{.MemUsage}}', '--no-stream'],
4706 verbosity
=CallVerbosity
.DEBUG
4708 seen_memusage_cid_len
= 0
4710 for line
in out
.splitlines():
4711 (cid
, usage
) = line
.split(',')
4712 (used
, limit
) = usage
.split(' / ')
4713 seen_memusage
[cid
] = with_units_to_int(used
)
4714 if not seen_memusage_cid_len
:
4715 seen_memusage_cid_len
= len(cid
)
4718 if os
.path
.exists(data_dir
):
4719 for i
in os
.listdir(data_dir
):
4720 if i
in ['mon', 'osd', 'mds', 'mgr']:
4722 for j
in os
.listdir(os
.path
.join(data_dir
, i
)):
4725 (cluster
, daemon_id
) = j
.split('-', 1)
4726 fsid
= get_legacy_daemon_fsid(ctx
,
4727 cluster
, daemon_type
, daemon_id
,
4728 legacy_dir
=legacy_dir
)
4729 legacy_unit_name
= 'ceph-%s@%s' % (daemon_type
, daemon_id
)
4730 val
: Dict
[str, Any
] = {
4732 'name': '%s.%s' % (daemon_type
, daemon_id
),
4733 'fsid': fsid
if fsid
is not None else 'unknown',
4734 'systemd_unit': legacy_unit_name
,
4737 (val
['enabled'], val
['state'], _
) = \
4738 check_unit(ctx
, legacy_unit_name
)
4739 if not host_version
:
4741 out
, err
, code
= call(ctx
,
4743 verbosity
=CallVerbosity
.DEBUG
)
4744 if not code
and out
.startswith('ceph version '):
4745 host_version
= out
.split(' ')[2]
4748 val
['host_version'] = host_version
4751 fsid
= str(i
) # convince mypy that fsid is a str here
4752 for j
in os
.listdir(os
.path
.join(data_dir
, i
)):
4753 if '.' in j
and os
.path
.isdir(os
.path
.join(data_dir
, fsid
, j
)):
4755 (daemon_type
, daemon_id
) = j
.split('.', 1)
4756 unit_name
= get_unit_name(fsid
,
4762 'style': 'cephadm:v1',
4765 'systemd_unit': unit_name
,
4769 (val
['enabled'], val
['state'], _
) = \
4770 check_unit(ctx
, unit_name
)
4774 image_digests
= None
4779 container_path
, 'inspect',
4780 '--format', '{{.Id}},{{.Config.Image}},{{.Image}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}',
4781 'ceph-%s-%s' % (fsid
, j
)
4783 out
, err
, code
= call(ctx
, cmd
, verbosity
=CallVerbosity
.DEBUG
)
4785 (container_id
, image_name
, image_id
, start
,
4786 version
) = out
.strip().split(',')
4787 image_id
= normalize_container_id(image_id
)
4788 daemon_type
= name
.split('.', 1)[0]
4789 start_stamp
= try_convert_datetime(start
)
4791 # collect digests for this image id
4792 image_digests
= seen_digests
.get(image_id
)
4793 if not image_digests
:
4794 out
, err
, code
= call(
4797 container_path
, 'image', 'inspect', image_id
,
4798 '--format', '{{.RepoDigests}}',
4800 verbosity
=CallVerbosity
.DEBUG
)
4802 image_digests
= out
.strip()[1:-1].split(' ')
4803 seen_digests
[image_id
] = image_digests
4805 # identify software version inside the container (if we can)
4806 if not version
or '.' not in version
:
4807 version
= seen_versions
.get(image_id
, None)
4808 if daemon_type
== NFSGanesha
.daemon_type
:
4809 version
= NFSGanesha
.get_version(ctx
, container_id
)
4810 if daemon_type
== CephIscsi
.daemon_type
:
4811 version
= CephIscsi
.get_version(ctx
, container_id
)
4813 if daemon_type
in Ceph
.daemons
:
4814 out
, err
, code
= call(ctx
,
4815 [container_path
, 'exec', container_id
,
4817 verbosity
=CallVerbosity
.DEBUG
)
4819 out
.startswith('ceph version '):
4820 version
= out
.split(' ')[2]
4821 seen_versions
[image_id
] = version
4822 elif daemon_type
== 'grafana':
4823 out
, err
, code
= call(ctx
,
4824 [container_path
, 'exec', container_id
,
4825 'grafana-server', '-v'],
4826 verbosity
=CallVerbosity
.DEBUG
)
4828 out
.startswith('Version '):
4829 version
= out
.split(' ')[1]
4830 seen_versions
[image_id
] = version
4831 elif daemon_type
in ['prometheus',
4834 version
= Monitoring
.get_version(ctx
, container_id
, daemon_type
)
4835 seen_versions
[image_id
] = version
4836 elif daemon_type
== 'haproxy':
4837 out
, err
, code
= call(ctx
,
4838 [container_path
, 'exec', container_id
,
4840 verbosity
=CallVerbosity
.DEBUG
)
4842 out
.startswith('HA-Proxy version '):
4843 version
= out
.split(' ')[2]
4844 seen_versions
[image_id
] = version
4845 elif daemon_type
== 'keepalived':
4846 out
, err
, code
= call(ctx
,
4847 [container_path
, 'exec', container_id
,
4848 'keepalived', '--version'],
4849 verbosity
=CallVerbosity
.DEBUG
)
4851 err
.startswith('Keepalived '):
4852 version
= err
.split(' ')[1]
4853 if version
[0] == 'v':
4854 version
= version
[1:]
4855 seen_versions
[image_id
] = version
4856 elif daemon_type
== CustomContainer
.daemon_type
:
4857 # Because a custom container can contain
4858 # everything, we do not know which command
4859 # to execute to get the version.
4862 logger
.warning('version for unknown daemon type %s' % daemon_type
)
4864 vfile
= os
.path
.join(data_dir
, fsid
, j
, 'unit.image') # type: ignore
4866 with
open(vfile
, 'r') as f
:
4867 image_name
= f
.read().strip() or None
4872 mfile
= os
.path
.join(data_dir
, fsid
, j
, 'unit.meta') # type: ignore
4874 with
open(mfile
, 'r') as f
:
4875 meta
= json
.loads(f
.read())
4880 val
['container_id'] = container_id
4881 val
['container_image_name'] = image_name
4882 val
['container_image_id'] = image_id
4883 val
['container_image_digests'] = image_digests
4885 val
['memory_usage'] = seen_memusage
.get(container_id
[0:seen_memusage_cid_len
])
4886 val
['version'] = version
4887 val
['started'] = start_stamp
4888 val
['created'] = get_file_timestamp(
4889 os
.path
.join(data_dir
, fsid
, j
, 'unit.created')
4891 val
['deployed'] = get_file_timestamp(
4892 os
.path
.join(data_dir
, fsid
, j
, 'unit.image'))
4893 val
['configured'] = get_file_timestamp(
4894 os
.path
.join(data_dir
, fsid
, j
, 'unit.configured'))
def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
    # type: (CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str]
    """Return the list_daemons() entry matching fsid and name.

    Raises Error when no daemon on this host matches.
    """
    for d in list_daemons(ctx, detail=detail, legacy_dir=legacy_dir):
        if d['fsid'] != fsid or d['name'] != name:
            continue
        return d
    raise Error('Daemon not found: {}. See `cephadm ls`'.format(name))
4912 ##################################
def command_adopt(ctx):
    # type: (CephadmContext) -> None
    """Adopt a legacy-deployed daemon into cephadm management (`cephadm adopt`)."""
    if not ctx.skip_pull:
        _pull_image(ctx, ctx.image)

    (daemon_type, daemon_id) = ctx.name.split('.', 1)

    # legacy check
    if ctx.style != 'legacy':
        raise Error('adoption of style %s not implemented' % ctx.style)

    # lock
    fsid = get_legacy_daemon_fsid(ctx,
                                  ctx.cluster,
                                  daemon_type,
                                  daemon_id,
                                  legacy_dir=ctx.legacy_dir)
    if not fsid:
        raise Error('could not detect legacy fsid; set fsid in ceph.conf')
    lock = FileLock(ctx, fsid)
    lock.acquire()

    # call correct adoption
    if daemon_type in Ceph.daemons:
        command_adopt_ceph(ctx, daemon_type, daemon_id, fsid)
    elif daemon_type == 'prometheus':
        command_adopt_prometheus(ctx, daemon_id, fsid)
    elif daemon_type == 'grafana':
        command_adopt_grafana(ctx, daemon_id, fsid)
    elif daemon_type == 'node-exporter':
        raise Error('adoption of node-exporter not implemented')
    elif daemon_type == 'alertmanager':
        command_adopt_alertmanager(ctx, daemon_id, fsid)
    else:
        raise Error('daemon type %s not recognized' % daemon_type)
class AdoptOsd(object):
    """Probe a legacy OSD data dir to discover its fsid and objectstore type."""

    def __init__(self, ctx, osd_data_dir, osd_id):
        # type: (CephadmContext, str, str) -> None
        self.ctx = ctx
        self.osd_data_dir = osd_data_dir
        self.osd_id = osd_id

    def check_online_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Read fsid/type from a mounted (online) OSD data directory."""
        osd_fsid, osd_type = None, None

        path = os.path.join(self.osd_data_dir, 'fsid')
        try:
            with open(path, 'r') as f:
                osd_fsid = f.read().strip()
            logger.info('Found online OSD at %s' % path)
        except IOError:
            logger.info('Unable to read OSD fsid from %s' % path)
        if os.path.exists(os.path.join(self.osd_data_dir, 'type')):
            with open(os.path.join(self.osd_data_dir, 'type')) as f:
                osd_type = f.read().strip()
        else:
            logger.info('"type" file missing for OSD data dir')

        return osd_fsid, osd_type

    def check_offline_lvm_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Ask `ceph-volume lvm list` (in a container) about this OSD id."""
        osd_fsid, osd_type = None, None

        c = CephContainer(
            self.ctx,
            image=self.ctx.image,
            entrypoint='/usr/sbin/ceph-volume',
            args=['lvm', 'list', '--format=json'],
            privileged=True
        )
        out, err, code = call_throws(self.ctx, c.run_cmd())
        if not code:
            try:
                js = json.loads(out)
                if self.osd_id in js:
                    logger.info('Found offline LVM OSD {}'.format(self.osd_id))
                    osd_fsid = js[self.osd_id][0]['tags']['ceph.osd_fsid']
                    for device in js[self.osd_id]:
                        if device['tags']['ceph.type'] == 'block':
                            osd_type = 'bluestore'
                            break
                        if device['tags']['ceph.type'] == 'data':
                            osd_type = 'filestore'
                            break
            except ValueError as e:
                logger.info('Invalid JSON in ceph-volume lvm list: {}'.format(e))

        return osd_fsid, osd_type

    def check_offline_simple_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Look for a ceph-volume 'simple' JSON descriptor for this OSD id."""
        osd_fsid, osd_type = None, None

        osd_file = glob('/etc/ceph/osd/{}-[a-f0-9-]*.json'.format(self.osd_id))
        if len(osd_file) == 1:
            with open(osd_file[0], 'r') as f:
                try:
                    js = json.loads(f.read())
                    logger.info('Found offline simple OSD {}'.format(self.osd_id))
                    osd_fsid = js['fsid']
                    osd_type = js['type']
                    if osd_type != 'filestore':
                        # need this to be mounted for the adopt to work, as it
                        # needs to move files from this directory
                        call_throws(self.ctx, ['mount', js['data']['path'], self.osd_data_dir])
                except ValueError as e:
                    logger.info('Invalid JSON in {}: {}'.format(osd_file, e))

        return osd_fsid, osd_type
5033 def command_adopt_ceph(ctx
, daemon_type
, daemon_id
, fsid
):
5034 # type: (CephadmContext, str, str, str) -> None
5036 (uid
, gid
) = extract_uid_gid(ctx
)
5038 data_dir_src
= ('/var/lib/ceph/%s/%s-%s' %
5039 (daemon_type
, ctx
.cluster
, daemon_id
))
5040 data_dir_src
= os
.path
.abspath(ctx
.legacy_dir
+ data_dir_src
)
5042 if not os
.path
.exists(data_dir_src
):
5043 raise Error("{}.{} data directory '{}' does not exist. "
5044 'Incorrect ID specified, or daemon already adopted?'.format(
5045 daemon_type
, daemon_id
, data_dir_src
))
5048 if daemon_type
== 'osd':
5049 adopt_osd
= AdoptOsd(ctx
, data_dir_src
, daemon_id
)
5050 osd_fsid
, osd_type
= adopt_osd
.check_online_osd()
5052 osd_fsid
, osd_type
= adopt_osd
.check_offline_lvm_osd()
5054 osd_fsid
, osd_type
= adopt_osd
.check_offline_simple_osd()
5056 raise Error('Unable to find OSD {}'.format(daemon_id
))
5057 logger
.info('objectstore_type is %s' % osd_type
)
5059 if osd_type
== 'filestore':
5060 raise Error('FileStore is not supported by cephadm')
5062 # NOTE: implicit assumption here that the units correspond to the
5063 # cluster we are adopting based on the /etc/{defaults,sysconfig}/ceph
5065 unit_name
= 'ceph-%s@%s' % (daemon_type
, daemon_id
)
5066 (enabled
, state
, _
) = check_unit(ctx
, unit_name
)
5067 if state
== 'running':
5068 logger
.info('Stopping old systemd unit %s...' % unit_name
)
5069 call_throws(ctx
, ['systemctl', 'stop', unit_name
])
5071 logger
.info('Disabling old systemd unit %s...' % unit_name
)
5072 call_throws(ctx
, ['systemctl', 'disable', unit_name
])
5075 logger
.info('Moving data...')
5076 data_dir_dst
= make_data_dir(ctx
, fsid
, daemon_type
, daemon_id
,
5078 move_files(ctx
, glob(os
.path
.join(data_dir_src
, '*')),
5081 logger
.debug('Remove dir `%s`' % (data_dir_src
))
5082 if os
.path
.ismount(data_dir_src
):
5083 call_throws(ctx
, ['umount', data_dir_src
])
5084 os
.rmdir(data_dir_src
)
5086 logger
.info('Chowning content...')
5087 call_throws(ctx
, ['chown', '-c', '-R', '%d.%d' % (uid
, gid
), data_dir_dst
])
5089 if daemon_type
== 'mon':
5090 # rename *.ldb -> *.sst, in case they are coming from ubuntu
5091 store
= os
.path
.join(data_dir_dst
, 'store.db')
5093 if os
.path
.exists(store
):
5094 for oldf
in os
.listdir(store
):
5095 if oldf
.endswith('.ldb'):
5096 newf
= oldf
.replace('.ldb', '.sst')
5097 oldp
= os
.path
.join(store
, oldf
)
5098 newp
= os
.path
.join(store
, newf
)
5099 logger
.debug('Renaming %s -> %s' % (oldp
, newp
))
5100 os
.rename(oldp
, newp
)
5102 logger
.info('Renamed %d leveldb *.ldb files to *.sst',
5104 if daemon_type
== 'osd':
5105 for n
in ['block', 'block.db', 'block.wal']:
5106 p
= os
.path
.join(data_dir_dst
, n
)
5107 if os
.path
.exists(p
):
5108 logger
.info('Chowning %s...' % p
)
5109 os
.chown(p
, uid
, gid
)
5110 # disable the ceph-volume 'simple' mode files on the host
5111 simple_fn
= os
.path
.join('/etc/ceph/osd',
5112 '%s-%s.json' % (daemon_id
, osd_fsid
))
5113 if os
.path
.exists(simple_fn
):
5114 new_fn
= simple_fn
+ '.adopted-by-cephadm'
5115 logger
.info('Renaming %s -> %s', simple_fn
, new_fn
)
5116 os
.rename(simple_fn
, new_fn
)
5117 logger
.info('Disabling host unit ceph-volume@ simple unit...')
5118 call(ctx
, ['systemctl', 'disable',
5119 'ceph-volume@simple-%s-%s.service' % (daemon_id
, osd_fsid
)])
5121 # assume this is an 'lvm' c-v for now, but don't error
5123 logger
.info('Disabling host unit ceph-volume@ lvm unit...')
5124 call(ctx
, ['systemctl', 'disable',
5125 'ceph-volume@lvm-%s-%s.service' % (daemon_id
, osd_fsid
)])
5128 config_src
= '/etc/ceph/%s.conf' % (ctx
.cluster
)
5129 config_src
= os
.path
.abspath(ctx
.legacy_dir
+ config_src
)
5130 config_dst
= os
.path
.join(data_dir_dst
, 'config')
5131 copy_files(ctx
, [config_src
], config_dst
, uid
=uid
, gid
=gid
)
5134 logger
.info('Moving logs...')
5135 log_dir_src
= ('/var/log/ceph/%s-%s.%s.log*' %
5136 (ctx
.cluster
, daemon_type
, daemon_id
))
5137 log_dir_src
= os
.path
.abspath(ctx
.legacy_dir
+ log_dir_src
)
5138 log_dir_dst
= make_log_dir(ctx
, fsid
, uid
=uid
, gid
=gid
)
5139 move_files(ctx
, glob(log_dir_src
),
5143 logger
.info('Creating new units...')
5144 make_var_run(ctx
, fsid
, uid
, gid
)
5145 c
= get_container(ctx
, fsid
, daemon_type
, daemon_id
)
5146 deploy_daemon_units(ctx
, fsid
, uid
, gid
, daemon_type
, daemon_id
, c
,
5147 enable
=True, # unconditionally enable the new unit
5148 start
=(state
== 'running' or ctx
.force_start
),
5150 update_firewalld(ctx
, daemon_type
)
def command_adopt_prometheus(ctx, daemon_id, fsid):
    # type: (CephadmContext, str, str) -> None
    """Adopt a legacy prometheus daemon: stop it, copy config/data, redeploy."""
    daemon_type = 'prometheus'
    (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)

    _stop_and_disable(ctx, 'prometheus')

    data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
                                 uid=uid, gid=gid)

    # config
    config_src = os.path.abspath(ctx.legacy_dir + '/etc/prometheus/prometheus.yml')
    config_dst = os.path.join(data_dir_dst, 'etc/prometheus')
    makedirs(config_dst, uid, gid, 0o755)
    copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)

    # data
    data_src = os.path.abspath(ctx.legacy_dir + '/var/lib/prometheus/metrics/')
    data_dst = os.path.join(data_dir_dst, 'data')
    copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)

    make_var_run(ctx, fsid, uid, gid)
    c = get_container(ctx, fsid, daemon_type, daemon_id)
    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
    update_firewalld(ctx, daemon_type)
def command_adopt_grafana(ctx, daemon_id, fsid):
    # type: (CephadmContext, str, str) -> None
    """Adopt a legacy grafana daemon: stop it, copy config/certs/data, redeploy."""
    daemon_type = 'grafana'
    (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)

    _stop_and_disable(ctx, 'grafana-server')

    data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
                                 uid=uid, gid=gid)

    # config
    config_src = os.path.abspath(ctx.legacy_dir + '/etc/grafana/grafana.ini')
    config_dst = os.path.join(data_dir_dst, 'etc/grafana')
    makedirs(config_dst, uid, gid, 0o755)
    copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)

    prov_src = os.path.abspath(ctx.legacy_dir + '/etc/grafana/provisioning/')
    prov_dst = os.path.join(data_dir_dst, 'etc/grafana')
    copy_tree(ctx, [prov_src], prov_dst, uid=uid, gid=gid)

    # cert
    cert = '/etc/grafana/grafana.crt'
    key = '/etc/grafana/grafana.key'
    if os.path.exists(cert) and os.path.exists(key):
        cert_src = os.path.abspath(ctx.legacy_dir + '/etc/grafana/grafana.crt')
        makedirs(os.path.join(data_dir_dst, 'etc/grafana/certs'), uid, gid, 0o755)
        cert_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_file')
        copy_files(ctx, [cert_src], cert_dst, uid=uid, gid=gid)

        key_src = os.path.abspath(ctx.legacy_dir + '/etc/grafana/grafana.key')
        key_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_key')
        copy_files(ctx, [key_src], key_dst, uid=uid, gid=gid)

        # rewrite grafana.ini so it points at the copied cert/key
        _adjust_grafana_ini(os.path.join(config_dst, 'grafana.ini'))
    else:
        logger.debug('Skipping ssl, missing cert {} or key {}'.format(cert, key))

    # data - possible custom dashboards/plugins
    data_src = os.path.abspath(ctx.legacy_dir + '/var/lib/grafana/')
    data_dst = os.path.join(data_dir_dst, 'data')
    copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)

    make_var_run(ctx, fsid, uid, gid)
    c = get_container(ctx, fsid, daemon_type, daemon_id)
    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
    update_firewalld(ctx, daemon_type)
def command_adopt_alertmanager(ctx, daemon_id, fsid):
    # type: (CephadmContext, str, str) -> None
    """Adopt a legacy alertmanager daemon: stop it, copy config/data, redeploy."""
    daemon_type = 'alertmanager'
    (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)

    _stop_and_disable(ctx, 'prometheus-alertmanager')

    data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
                                 uid=uid, gid=gid)

    # config
    config_src = os.path.abspath(ctx.legacy_dir + '/etc/prometheus/alertmanager.yml')
    config_dst = os.path.join(data_dir_dst, 'etc/alertmanager')
    makedirs(config_dst, uid, gid, 0o755)
    copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)

    # data
    data_src = os.path.abspath(ctx.legacy_dir + '/var/lib/prometheus/alertmanager/')
    data_dst = os.path.join(data_dir_dst, 'etc/alertmanager/data')
    copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)

    make_var_run(ctx, fsid, uid, gid)
    c = get_container(ctx, fsid, daemon_type, daemon_id)
    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
    update_firewalld(ctx, daemon_type)
5266 def _adjust_grafana_ini(filename
):
5267 # type: (str) -> None
5269 # Update cert_file, cert_key pathnames in server section
5270 # ConfigParser does not preserve comments
5272 with
open(filename
, 'r') as grafana_ini
:
5273 lines
= grafana_ini
.readlines()
5274 with
open('{}.new'.format(filename
), 'w') as grafana_ini
:
5275 server_section
= False
5277 if line
.startswith('['):
5278 server_section
= False
5279 if line
.startswith('[server]'):
5280 server_section
= True
5282 line
= re
.sub(r
'^cert_file.*',
5283 'cert_file = /etc/grafana/certs/cert_file', line
)
5284 line
= re
.sub(r
'^cert_key.*',
5285 'cert_key = /etc/grafana/certs/cert_key', line
)
5286 grafana_ini
.write(line
)
5287 os
.rename('{}.new'.format(filename
), filename
)
5288 except OSError as err
:
5289 raise Error('Cannot update {}: {}'.format(filename
, err
))
def _stop_and_disable(ctx, unit_name):
    # type: (CephadmContext, str) -> None
    """Stop a legacy systemd unit if running, and disable it if enabled."""
    (enabled, state, _) = check_unit(ctx, unit_name)
    if state == 'running':
        logger.info('Stopping old systemd unit %s...' % unit_name)
        call_throws(ctx, ['systemctl', 'stop', unit_name])
    if enabled:
        logger.info('Disabling old systemd unit %s...' % unit_name)
        call_throws(ctx, ['systemctl', 'disable', unit_name])
5303 ##################################
def command_rm_daemon(ctx):
    # type: (CephadmContext) -> None
    """Remove one daemon: stop/disable its unit and remove its data dir.

    For mon/osd/prometheus the data dir is renamed into <data>/<fsid>/removed
    instead of being deleted, unless --force-delete-data is given.
    """
    lock = FileLock(ctx, ctx.fsid)
    lock.acquire()

    (daemon_type, daemon_id) = ctx.name.split('.', 1)
    unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name)

    if daemon_type in ['mon', 'osd'] and not ctx.force:
        raise Error('must pass --force to proceed: '
                    'this command may destroy precious data!')

    call(ctx, ['systemctl', 'stop', unit_name],
         verbosity=CallVerbosity.DEBUG)
    call(ctx, ['systemctl', 'reset-failed', unit_name],
         verbosity=CallVerbosity.DEBUG)
    call(ctx, ['systemctl', 'disable', unit_name],
         verbosity=CallVerbosity.DEBUG)
    data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_id)
    if daemon_type in ['mon', 'osd', 'prometheus'] and \
       not ctx.force_delete_data:
        # rename it out of the way -- do not delete
        backup_dir = os.path.join(ctx.data_dir, ctx.fsid, 'removed')
        if not os.path.exists(backup_dir):
            makedirs(backup_dir, 0, 0, DATA_DIR_MODE)
        dirname = '%s.%s_%s' % (daemon_type, daemon_id,
                                datetime.datetime.utcnow().strftime(DATEFMT))
        os.rename(data_dir,
                  os.path.join(backup_dir, dirname))
    else:
        if daemon_type == CephadmDaemon.daemon_type:
            CephadmDaemon.uninstall(ctx, ctx.fsid, daemon_type, daemon_id)
        call_throws(ctx, ['rm', '-rf', data_dir])
5340 ##################################
def command_rm_cluster(ctx):
    # type: (CephadmContext) -> None
    """Remove all daemons, units, data, and logs belonging to ctx.fsid."""
    if not ctx.force:
        raise Error('must pass --force to proceed: '
                    'this command may destroy precious data!')

    lock = FileLock(ctx, ctx.fsid)
    lock.acquire()

    # stop + disable individual daemon units
    for d in list_daemons(ctx, detail=False):
        if d['fsid'] != ctx.fsid:
            continue
        if d['style'] != 'cephadm:v1':
            continue
        unit_name = get_unit_name(ctx.fsid, d['name'])
        call(ctx, ['systemctl', 'stop', unit_name],
             verbosity=CallVerbosity.DEBUG)
        call(ctx, ['systemctl', 'reset-failed', unit_name],
             verbosity=CallVerbosity.DEBUG)
        call(ctx, ['systemctl', 'disable', unit_name],
             verbosity=CallVerbosity.DEBUG)

    # cluster units
    for unit_name in ['ceph-%s.target' % ctx.fsid]:
        call(ctx, ['systemctl', 'stop', unit_name],
             verbosity=CallVerbosity.DEBUG)
        call(ctx, ['systemctl', 'reset-failed', unit_name],
             verbosity=CallVerbosity.DEBUG)
        call(ctx, ['systemctl', 'disable', unit_name],
             verbosity=CallVerbosity.DEBUG)

    slice_name = 'system-%s.slice' % (('ceph-%s' % ctx.fsid).replace('-', '\\x2d'))
    call(ctx, ['systemctl', 'stop', slice_name],
         verbosity=CallVerbosity.DEBUG)

    # rm units
    call_throws(ctx, ['rm', '-f', ctx.unit_dir +  # noqa: W504
                      '/ceph-%s@.service' % ctx.fsid])
    call_throws(ctx, ['rm', '-f', ctx.unit_dir +  # noqa: W504
                      '/ceph-%s.target' % ctx.fsid])
    call_throws(ctx, ['rm', '-rf',
                      ctx.unit_dir + '/ceph-%s.target.wants' % ctx.fsid])
    # rm data
    call_throws(ctx, ['rm', '-rf', ctx.data_dir + '/' + ctx.fsid])

    if not ctx.keep_logs:
        # rm logs
        call_throws(ctx, ['rm', '-rf', ctx.log_dir + '/' + ctx.fsid])
        call_throws(ctx, ['rm', '-rf', ctx.log_dir +  # noqa: W504
                          '/*.wants/ceph-%s@*' % ctx.fsid])

    # rm logrotate config
    call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/ceph-%s' % ctx.fsid])

    # clean up config, keyring, and pub key files
    files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring']

    if os.path.exists(files[0]):
        # only remove these files when they belong to this cluster
        valid_fsid = False
        with open(files[0]) as f:
            if ctx.fsid in f.read():
                valid_fsid = True
        if valid_fsid:
            for n in range(0, len(files)):
                if os.path.exists(files[n]):
                    os.remove(files[n])
5411 ##################################
def check_time_sync(ctx, enabler=None):
    # type: (CephadmContext, Optional[Packager]) -> bool
    """Return True when one of the known time-sync systemd units is active."""
    units = [
        'chrony.service',  # 18.04 (at least)
        'chronyd.service',  # el / opensuse
        'systemd-timesyncd.service',
        'ntpd.service',  # el7 (at least)
        'ntp.service',  # 18.04 (at least)
        'ntpsec.service',  # 20.04 (at least) / buster
    ]
    if not check_units(ctx, units, enabler):
        logger.warning('No time sync service is running; checked for %s' % units)
        return False
    return True
def command_check_host(ctx: CephadmContext) -> None:
    """Validate host prerequisites: container engine, required binaries,
    active time synchronization, and (optionally) the expected hostname.

    Collects all problems and raises a single Error listing them; logs
    'Host looks OK' when nothing is wrong.
    """
    container_path = ctx.container_engine.path

    errors = []
    commands = ['systemctl', 'lvcreate']

    try:
        check_container_engine(ctx)
        logger.info('podman|docker (%s) is present' % container_path)
    except Error as e:
        errors.append(str(e))

    for command in commands:
        try:
            find_program(command)
            logger.info('%s is present' % command)
        except ValueError:
            errors.append('%s binary does not appear to be installed' % command)

    # check for configured+running chronyd or ntp
    if not check_time_sync(ctx):
        errors.append('No time synchronization is active')

    if 'expect_hostname' in ctx and ctx.expect_hostname:
        if get_hostname().lower() != ctx.expect_hostname.lower():
            errors.append('hostname "%s" does not match expected hostname "%s"' % (
                get_hostname(), ctx.expect_hostname))
        else:
            # BUGFIX: only log a match when the hostnames actually match;
            # previously this was logged even after recording a mismatch error.
            logger.info('Hostname "%s" matches what is expected.',
                        ctx.expect_hostname)

    if errors:
        raise Error('\nERROR: '.join(errors))

    logger.info('Host looks OK')
5465 ##################################
def command_prepare_host(ctx: CephadmContext) -> None:
    """Install missing prerequisites (container engine, lvm2, chrony),
    adjust the hostname if requested, then repeat the host check."""
    logger.info('Verifying podman|docker is present...')
    try:
        check_container_engine(ctx)
    except Error as e:
        logger.warning(str(e))
        pkg = create_packager(ctx)
        pkg.install_podman()

    logger.info('Verifying lvm2 is present...')
    if not find_executable('lvcreate'):
        pkg = create_packager(ctx)
        pkg.install(['lvm2'])

    logger.info('Verifying time synchronization is in place...')
    if not check_time_sync(ctx):
        pkg = create_packager(ctx)
        pkg.install(['chrony'])
        # check again, and this time try to enable
        # the service
        check_time_sync(ctx, enabler=pkg)

    if 'expect_hostname' in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname():
        logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), ctx.expect_hostname))
        call_throws(ctx, ['hostname', ctx.expect_hostname])
        with open('/etc/hostname', 'w') as f:
            f.write(ctx.expect_hostname + '\n')

    logger.info('Repeating the final host check...')
    command_check_host(ctx)
5503 ##################################
class CustomValidation(argparse.Action):
    """argparse action validating daemon names and exporter configs."""

    def _check_name(self, values):
        # a daemon name must look like <type>.<id>
        try:
            (daemon_type, daemon_id) = values.split('.', 1)
        except ValueError:
            raise argparse.ArgumentError(self,
                                         'must be of the format <type>.<id>. For example, osd.1 or prometheus.myhost.com')

        daemons = get_supported_daemons()
        if daemon_type not in daemons:
            raise argparse.ArgumentError(self,
                                         'name must declare the type of daemon e.g. '
                                         '{}'.format(', '.join(daemons)))

    def __call__(self, parser, namespace, values, option_string=None):
        if self.dest == 'name':
            self._check_name(values)
            setattr(namespace, self.dest, values)
        elif self.dest == 'exporter_config':
            cfg = get_parm(values)
            # run the class' validate method, and convert to an argparse error
            # if problems are found
            try:
                CephadmDaemon.validate_config(cfg)
            except Error as e:
                raise argparse.ArgumentError(self,
                                             str(e))
            setattr(namespace, self.dest, cfg)
5536 ##################################
def get_distro():
    # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
    """Parse /etc/os-release into (id, version_id, version_codename).

    Values are lower-cased; any field absent from the file stays None.
    """
    wanted = {'ID': None, 'VERSION_ID': None, 'VERSION_CODENAME': None}
    with open('/etc/os-release', 'r') as f:
        for raw_line in f.readlines():
            entry = raw_line.strip()
            # skip comments and anything that is not a KEY=VALUE pair
            if '=' not in entry or entry.startswith('#'):
                continue
            key, value = entry.split('=', 1)
            # strip one level of surrounding double quotes, if present
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            if key in wanted:
                wanted[key] = value.lower()
    return wanted['ID'], wanted['VERSION_ID'], wanted['VERSION_CODENAME']
class Packager(object):
    """Base class for distro-specific package/repo helpers.

    At most one of (stable, version, branch) selects what to install:
    a named stable release, an explicit x.y.z version, or a dev branch
    (optionally pinned with `commit`). All may be None.
    """

    def __init__(self, ctx: CephadmContext,
                 stable=None, version=None, branch=None, commit=None):
        # enforce the mutually-exclusive selection described above
        assert \
            (stable and not version and not branch and not commit) or \
            (not stable and version and not branch and not commit) or \
            (not stable and not version and branch) or \
            (not stable and not version and not branch and not commit)
        self.ctx = ctx
        self.stable = stable
        self.version = version
        self.branch = branch
        self.commit = commit

    def add_repo(self):
        raise NotImplementedError

    def rm_repo(self):
        raise NotImplementedError

    def install(self, ls):
        raise NotImplementedError

    def install_podman(self):
        raise NotImplementedError

    def query_shaman(self, distro, distro_version, branch, commit):
        """Fetch repo-file contents for a dev build via shaman -> chacra."""
        logger.info('Fetching repo metadata from shaman and chacra...')
        shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
            distro=distro,
            distro_version=distro_version,
            branch=branch,
            sha1=commit or 'latest',
            arch=get_arch()  # helper defined elsewhere in this file
        )
        try:
            shaman_response = urlopen(shaman_url)
        except HTTPError as err:
            logger.error('repository not found in shaman (might not be available yet)')
            raise Error('%s, failed to fetch %s' % (err, shaman_url))
        chacra_url = ''
        try:
            # shaman redirects to the concrete chacra repo file
            chacra_url = shaman_response.geturl()
            chacra_response = urlopen(chacra_url)
        except HTTPError as err:
            logger.error('repository not found in chacra (might not be available yet)')
            raise Error('%s, failed to fetch %s' % (err, chacra_url))
        return chacra_response.read().decode('utf-8')

    def repo_gpgkey(self):
        """Return (url, name) of the GPG key the repo should trust.

        FIX: every caller unpacks two values (`url, name = self.repo_gpgkey()`,
        see Apt.add_repo); the custom --gpg-url path previously returned a
        bare string, which would be unpacked character-wise and crash.
        """
        if self.ctx.gpg_url:
            return self.ctx.gpg_url, 'manual'
        if self.stable or self.version:
            return 'https://download.ceph.com/keys/release.asc', 'release'
        else:
            return 'https://download.ceph.com/keys/autobuild.asc', 'autobuild'

    def enable_service(self, service):
        """
        Start and enable the service (typically using systemd).
        """
        call_throws(self.ctx, ['systemctl', 'enable', '--now', service])
class Apt(Packager):
    """Packager implementation for Debian/Ubuntu (apt)."""

    DISTRO_NAMES = {
        'ubuntu': 'ubuntu',
        'debian': 'debian',
    }

    def __init__(self, ctx: CephadmContext,
                 stable, version, branch, commit,
                 distro, distro_version, distro_codename):
        super(Apt, self).__init__(ctx, stable=stable, version=version,
                                  branch=branch, commit=commit)
        self.ctx = ctx
        self.distro = self.DISTRO_NAMES[distro]
        self.distro_codename = distro_codename
        self.distro_version = distro_version

    def repo_path(self):
        return '/etc/apt/sources.list.d/ceph.list'

    def add_repo(self):
        # Install the signing key first, then the sources.list entry.
        url, name = self.repo_gpgkey()
        logger.info('Installing repo GPG key from %s...' % url)
        try:
            response = urlopen(url)
        except HTTPError as err:
            logger.error('failed to fetch GPG repo key from %s: %s' % (url, err))
            raise Error('failed to fetch GPG key')
        key = response.read().decode('utf-8')
        with open('/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name, 'w') as f:
            f.write(key)

        if self.version:
            content = 'deb %s/debian-%s/ %s main\n' % (
                self.ctx.repo_url, self.version, self.distro_codename)
        elif self.stable:
            content = 'deb %s/debian-%s/ %s main\n' % (
                self.ctx.repo_url, self.stable, self.distro_codename)
        else:
            # dev build: shaman/chacra hand back the full repo file contents
            content = self.query_shaman(self.distro, self.distro_codename, self.branch,
                                        self.commit)

        logger.info('Installing repo file at %s...' % self.repo_path())
        with open(self.repo_path(), 'w') as f:
            f.write(content)

    def rm_repo(self):
        # remove both possible key names; only one was ever installed
        for name in ['autobuild', 'release']:
            p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
            if os.path.exists(p):
                logger.info('Removing repo GPG key %s...' % p)
                os.unlink(p)

        if os.path.exists(self.repo_path()):
            logger.info('Removing repo at %s...' % self.repo_path())
            os.unlink(self.repo_path())

        if self.distro == 'ubuntu':
            self.rm_kubic_repo()

    def install(self, ls):
        logger.info('Installing packages %s...' % ls)
        call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls)

    def install_podman(self):
        if self.distro == 'ubuntu':
            # podman is not in the stock Ubuntu archives; add the kubic repo
            logger.info('Setting up repo for podman...')
            self.add_kubic_repo()
            call_throws(self.ctx, ['apt-get', 'update'])

        logger.info('Attempting podman install...')
        try:
            self.install(['podman'])
        except Error:
            logger.info('Podman did not work. Falling back to docker...')
            self.install(['docker.io'])

    def kubic_repo_url(self):
        return 'https://download.opensuse.org/repositories/devel:/kubic:/' \
               'libcontainers:/stable/xUbuntu_%s/' % self.distro_version

    def kubic_repo_path(self):
        return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list'

    # NOTE: 'kubric' is a long-standing typo in these two method names; kept
    # as-is since other code may call them by this name.
    def kubric_repo_gpgkey_url(self):
        return '%s/Release.key' % self.kubic_repo_url()

    def kubric_repo_gpgkey_path(self):
        return '/etc/apt/trusted.gpg.d/kubic.release.gpg'

    def add_kubic_repo(self):
        url = self.kubric_repo_gpgkey_url()
        logger.info('Installing repo GPG key from %s...' % url)
        try:
            response = urlopen(url)
        except HTTPError as err:
            logger.error('failed to fetch GPG repo key from %s: %s' % (url, err))
            raise Error('failed to fetch GPG key')
        key = response.read().decode('utf-8')
        tmp_key = write_tmp(key, 0, 0)
        keyring = self.kubric_repo_gpgkey_path()
        call_throws(self.ctx, ['apt-key', '--keyring', keyring, 'add', tmp_key.name])

        logger.info('Installing repo file at %s...' % self.kubic_repo_path())
        content = 'deb %s /\n' % self.kubic_repo_url()
        with open(self.kubic_repo_path(), 'w') as f:
            f.write(content)

    def rm_kubic_repo(self):
        keyring = self.kubric_repo_gpgkey_path()
        if os.path.exists(keyring):
            logger.info('Removing repo GPG key %s...' % keyring)
            os.unlink(keyring)

        p = self.kubic_repo_path()
        if os.path.exists(p):
            logger.info('Removing repo at %s...' % p)
            os.unlink(p)
class YumDnf(Packager):
    """Packager implementation for RPM distros using yum/dnf."""

    # distro id -> (shaman distro name, repo code prefix)
    DISTRO_NAMES = {
        'centos': ('centos', 'el'),
        'rhel': ('centos', 'el'),
        'scientific': ('centos', 'el'),
        'fedora': ('fedora', 'fc'),
    }

    def __init__(self, ctx: CephadmContext,
                 stable, version, branch, commit,
                 distro, distro_version):
        super(YumDnf, self).__init__(ctx, stable=stable, version=version,
                                     branch=branch, commit=commit)
        self.ctx = ctx
        self.major = int(distro_version.split('.')[0])
        self.distro_normalized = self.DISTRO_NAMES[distro][0]
        self.distro_code = self.DISTRO_NAMES[distro][1] + str(self.major)
        # dnf replaced yum on fc30+/el8+
        if (self.distro_code == 'fc' and self.major >= 30) or \
           (self.distro_code == 'el' and self.major >= 8):
            self.tool = 'dnf'
        else:
            self.tool = 'yum'

    def custom_repo(self, **kw):
        """
        Repo files need special care in that a whole line should not be present
        if there is no value for it. Because we were using `format()` we could
        not conditionally add a line for a repo file. So the end result would
        contain a key with a missing value (say if we were passing `None`).

        For example, it could look like::

            [ceph repo]
            name= ceph repo
            proxy=
            gpgcheck=

        Which breaks. This function allows us to conditionally add lines,
        preserving an order and be more careful.

        Previously, and for historical purposes, this is how the template used
        to look.
        """
        lines = []

        # by using tuples (vs a dict) we preserve the order of what we want to
        # return, like starting with a [repo name]
        tmpl = (
            ('reponame', '[%s]'),
            ('name', 'name=%s'),
            ('baseurl', 'baseurl=%s'),
            ('enabled', 'enabled=%s'),
            ('gpgcheck', 'gpgcheck=%s'),
            ('_type', 'type=%s'),
            ('gpgkey', 'gpgkey=%s'),
            ('proxy', 'proxy=%s'),
            ('priority', 'priority=%s'),
        )

        for line in tmpl:
            tmpl_key, tmpl_value = line  # key values from tmpl

            # ensure that there is an actual value (not None nor empty string)
            if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''):
                lines.append(tmpl_value % kw.get(tmpl_key))

        return '\n'.join(lines)

    def repo_path(self):
        return '/etc/yum.repos.d/ceph.repo'

    def repo_baseurl(self):
        # type: () -> str
        assert self.stable or self.version
        if self.version:
            return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version,
                                     self.distro_code)
        else:
            return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable,
                                     self.distro_code)

    def add_repo(self):
        if self.stable or self.version:
            # official download.ceph.com layout: arch, noarch and SRPMS repos
            content = ''
            for n, t in {
                    'Ceph': '$basearch',
                    'Ceph-noarch': 'noarch',
                    'Ceph-source': 'SRPMS'}.items():
                content += '[%s]\n' % (n)
                content += self.custom_repo(
                    name='Ceph %s' % n,
                    baseurl=self.repo_baseurl() + '/' + t,
                    enabled=1,
                    gpgcheck=1,
                    gpgkey=self.repo_gpgkey()[0],
                )
                content += '\n\n'
        else:
            # dev build: shaman returns the full repo file contents
            content = self.query_shaman(self.distro_normalized, self.major,
                                        self.branch,
                                        self.commit)

        logger.info('Writing repo to %s...' % self.repo_path())
        with open(self.repo_path(), 'w') as f:
            f.write(content)

        if self.distro_code.startswith('el'):
            logger.info('Enabling EPEL...')
            call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release'])

    def rm_repo(self):
        if os.path.exists(self.repo_path()):
            os.unlink(self.repo_path())

    def install(self, ls):
        logger.info('Installing packages %s...' % ls)
        call_throws(self.ctx, [self.tool, 'install', '-y'] + ls)

    def install_podman(self):
        self.install(['podman'])
class Zypper(Packager):
    """Packager implementation for SUSE distros (zypper)."""

    DISTRO_NAMES = [
        'sles',
        'opensuse-tumbleweed',
        'opensuse-leap'
    ]

    def __init__(self, ctx: CephadmContext,
                 stable, version, branch, commit,
                 distro, distro_version):
        super(Zypper, self).__init__(ctx, stable=stable, version=version,
                                     branch=branch, commit=commit)
        self.ctx = ctx
        self.tool = 'zypper'
        self.distro = 'opensuse'
        # default; tumbleweed has no usable VERSION_ID
        self.distro_version = '15.1'
        if 'tumbleweed' not in distro and distro_version is not None:
            self.distro_version = distro_version

    def custom_repo(self, **kw):
        """
        See YumDnf for format explanation.
        """
        lines = []

        # by using tuples (vs a dict) we preserve the order of what we want to
        # return, like starting with a [repo name]
        tmpl = (
            ('reponame', '[%s]'),
            ('name', 'name=%s'),
            ('baseurl', 'baseurl=%s'),
            ('enabled', 'enabled=%s'),
            ('gpgcheck', 'gpgcheck=%s'),
            ('_type', 'type=%s'),
            ('gpgkey', 'gpgkey=%s'),
            ('proxy', 'proxy=%s'),
            ('priority', 'priority=%s'),
        )

        for line in tmpl:
            tmpl_key, tmpl_value = line  # key values from tmpl

            # ensure that there is an actual value (not None nor empty string)
            if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''):
                lines.append(tmpl_value % kw.get(tmpl_key))

        return '\n'.join(lines)

    def repo_path(self):
        return '/etc/zypp/repos.d/ceph.repo'

    def repo_baseurl(self):
        # type: () -> str
        assert self.stable or self.version
        if self.version:
            # BUG FIX: this branch previously interpolated self.stable, which
            # is None when only --version is given, yielding 'rpm-None' URLs.
            return '%s/rpm-%s/%s' % (self.ctx.repo_url,
                                     self.version, self.distro)
        else:
            return '%s/rpm-%s/%s' % (self.ctx.repo_url,
                                     self.stable, self.distro)

    def add_repo(self):
        if self.stable or self.version:
            content = ''
            for n, t in {
                    'Ceph': '$basearch',
                    'Ceph-noarch': 'noarch',
                    'Ceph-source': 'SRPMS'}.items():
                content += '[%s]\n' % (n)
                content += self.custom_repo(
                    name='Ceph %s' % n,
                    baseurl=self.repo_baseurl() + '/' + t,
                    enabled=1,
                    gpgcheck=1,
                    gpgkey=self.repo_gpgkey()[0],
                )
                content += '\n\n'
        else:
            # dev build: shaman returns the full repo file contents
            content = self.query_shaman(self.distro, self.distro_version,
                                        self.branch,
                                        self.commit)

        logger.info('Writing repo to %s...' % self.repo_path())
        with open(self.repo_path(), 'w') as f:
            f.write(content)

    def rm_repo(self):
        if os.path.exists(self.repo_path()):
            os.unlink(self.repo_path())

    def install(self, ls):
        logger.info('Installing packages %s...' % ls)
        call_throws(self.ctx, [self.tool, 'in', '-y'] + ls)

    def install_podman(self):
        self.install(['podman'])
def create_packager(ctx: CephadmContext,
                    stable=None, version=None, branch=None, commit=None):
    """Instantiate the Packager subclass matching the host's distribution.

    Raises Error when the detected distro is not supported.
    """
    distro, distro_version, distro_codename = get_distro()
    selection = dict(stable=stable, version=version, branch=branch, commit=commit)
    if distro in YumDnf.DISTRO_NAMES:
        return YumDnf(ctx, distro=distro, distro_version=distro_version,
                      **selection)
    if distro in Apt.DISTRO_NAMES:
        return Apt(ctx, distro=distro, distro_version=distro_version,
                   distro_codename=distro_codename, **selection)
    if distro in Zypper.DISTRO_NAMES:
        return Zypper(ctx, distro=distro, distro_version=distro_version,
                      **selection)
    raise Error('Distro %s version %s not supported' % (distro, distro_version))
def command_add_repo(ctx: CephadmContext):
    """Install a ceph package repo for a stable release, version or dev build."""
    if ctx.version and ctx.release:
        raise Error('you can specify either --release or --version but not both')
    if not ctx.version and not ctx.release and not ctx.dev and not ctx.dev_commit:
        raise Error('please supply a --release, --version, --dev or --dev-commit argument')
    if ctx.version:
        try:
            # sanity-check the x.y.z form before building repo paths from it
            (x, y, z) = ctx.version.split('.')
        except Exception:
            raise Error('version must be in the form x.y.z (e.g., 15.2.0)')

    pkg = create_packager(ctx, stable=ctx.release,
                          version=ctx.version,
                          branch=ctx.dev,
                          commit=ctx.dev_commit)
    pkg.add_repo()
def command_rm_repo(ctx: CephadmContext):
    """Remove any ceph package repo previously installed on this host."""
    create_packager(ctx).rm_repo()
def command_install(ctx: CephadmContext):
    """Install the requested packages via the distro's native packager."""
    create_packager(ctx).install(ctx.packages)
6016 ##################################
def get_ipv4_address(ifname):
    # type: (str) -> str
    """Return 'address/prefix' for the interface's IPv4 address, or '' if none."""
    def _extract(sock, offset):
        # Issue the SIOCGIF* ioctl for ifname and decode the sockaddr payload
        # (bytes 20-24 hold the in_addr within the returned ifreq struct).
        return socket.inet_ntop(
            socket.AF_INET,
            fcntl.ioctl(
                sock.fileno(),
                offset,
                struct.pack('256s', bytes(ifname[:15], 'utf-8'))
            )[20:24])

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        addr = _extract(s, 35093)  # '0x8915' = SIOCGIFADDR
        dq_mask = _extract(s, 35099)  # 0x891b = SIOCGIFNETMASK
    except OSError:
        # interface does not have an ipv4 address
        return ''

    # convert the dotted-quad netmask to a CIDR prefix length
    dec_mask = sum([bin(int(i)).count('1')
                    for i in dq_mask.split('.')])
    return '{}/{}'.format(addr, dec_mask)
def get_ipv6_address(ifname):
    # type: (str) -> str
    """Return 'address/prefix' for the interface's IPv6 address, or '' if none."""
    if not os.path.exists('/proc/net/if_inet6'):
        return ''

    raw = read_file(['/proc/net/if_inet6'])
    data = raw.splitlines()
    # based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html
    # field 0 is ipv6, field 2 is scope
    for iface_setting in data:
        field = iface_setting.split()
        if field[-1] == ifname:
            ipv6_raw = field[0]
            # reinsert the ':' separators the kernel's flat hex format omits
            ipv6_fmtd = ':'.join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)])
            # apply naming rules using ipaddress module
            ipv6 = ipaddress.ip_address(ipv6_fmtd)
            return '{}/{}'.format(str(ipv6), int('0x{}'.format(field[2]), 16))
    return ''
def bytes_to_human(num, mode='decimal'):
    # type: (float, str) -> str
    """Convert a bytes value into its human-readable form.

    :param num: number, in bytes, to convert
    :param mode: Either decimal (default) or binary to determine divisor
    :returns: string representing the bytes value in a more readable format
    """
    if mode == 'binary':
        unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
        divisor = 1024.0
        yotta = 'YiB'
    else:
        unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
        divisor = 1000.0
        yotta = 'YB'

    for unit in unit_list:
        if abs(num) < divisor:
            return '%3.1f%s' % (num, unit)
        num /= divisor
    # fell through every unit: report in the largest one
    return '%.1f%s' % (num, yotta)
def read_file(path_list, file_name=''):
    # type: (List[str], str) -> str
    """Return the stripped content of the first file found in `path_list`.

    :param path_list: list of file paths to search
    :param file_name: optional file_name to be applied to a file path
    :returns: content of the file or 'Unknown'
    """
    for base_path in path_list:
        file_path = os.path.join(base_path, file_name) if file_name else base_path
        if not os.path.exists(file_path):
            continue
        with open(file_path, 'r') as f:
            try:
                content = f.read().strip()
            except OSError:
                # sysfs may expose the file, but reads on some devices
                # (e.g. virtio) can still fail
                content = 'Unknown'
            return content
    return 'Unknown'
6112 ##################################
    # Search paths for DMI, NIC, SELinux and AppArmor metadata
    _dmi_path_list = ['/sys/class/dmi/id']
    _nic_path_list = ['/sys/class/net']
    _selinux_path_list = ['/etc/selinux/config']
    _apparmor_path_list = ['/etc/apparmor']
    # Map raw sysfs vendor strings to a readable name (virtio reports a PCI id)
    _disk_vendor_workarounds = {
        '0x1af4': 'Virtio Block Device'
    }
    def __init__(self, ctx: CephadmContext):
        self.ctx: CephadmContext = ctx
        self.cpu_model: str = 'Unknown'
        self.cpu_count: int = 0
        self.cpu_cores: int = 0
        self.cpu_threads: int = 0
        self.interfaces: Dict[str, Any] = {}

        # cached /proc/meminfo lines, parsed on demand by _get_mem_data()
        self._meminfo: List[str] = read_file(['/proc/meminfo']).splitlines()
        self._get_cpuinfo()
        self._process_nics()
        self.arch: str = platform.processor()
        self.kernel: str = platform.release()
    def _get_cpuinfo(self):
        # type: () -> None
        """Determine cpu information via /proc/cpuinfo"""
        raw = read_file(['/proc/cpuinfo'])
        output = raw.splitlines()
        cpu_set = set()

        for line in output:
            field = [f.strip() for f in line.split(':')]
            if 'model name' in line:
                self.cpu_model = field[1]
            if 'physical id' in line:
                # one distinct id per physical socket
                cpu_set.add(field[1])
            if 'siblings' in line:
                self.cpu_threads = int(field[1].strip())
            if 'cpu cores' in line:
                self.cpu_cores = int(field[1].strip())

        self.cpu_count = len(cpu_set)
6158 def _get_block_devs(self
):
6159 # type: () -> List[str]
6160 """Determine the list of block devices by looking at /sys/block"""
6161 return [dev
for dev
in os
.listdir('/sys/block')
6162 if not dev
.startswith('dm')]
    def _get_devs_by_type(self, rota='0'):
        # type: (str) -> List[str]
        """Filter block devices by a given rotational attribute (0=flash, 1=spinner)"""
        devs = list()
        for blk_dev in self._get_block_devs():
            rot_path = '/sys/block/{}/queue/rotational'.format(blk_dev)
            rot_value = read_file([rot_path])
            if rot_value == rota:
                devs.append(blk_dev)
        return devs
    @property
    def operating_system(self):
        # type: () -> str
        """Determine OS version"""
        raw_info = read_file(['/etc/os-release'])
        os_release = raw_info.splitlines()
        rel_str = 'Unknown'
        rel_dict = dict()

        for line in os_release:
            if '=' in line:
                var_name, var_value = line.split('=')
                rel_dict[var_name] = var_value.strip('"')

        # Would normally use PRETTY_NAME, but NAME and VERSION are more
        # consistent
        if all(_v in rel_dict for _v in ['NAME', 'VERSION']):
            rel_str = '{} {}'.format(rel_dict['NAME'], rel_dict['VERSION'])
        return rel_str
6198 """Return the hostname"""
6199 return platform
.node()
    @property
    def subscribed(self):
        # type: () -> str
        """Highlevel check to see if the host is subscribed to receive updates/support"""
        def _red_hat():
            # type: () -> str
            # RHEL 7 and RHEL 8: a subscribed host holds entitlement pem pairs
            entitlements_dir = '/etc/pki/entitlement'
            if os.path.exists(entitlements_dir):
                pems = glob('{}/*.pem'.format(entitlements_dir))
                if len(pems) >= 2:
                    return 'Yes'
            return 'No'

        os_name = self.operating_system
        if os_name.upper().startswith('RED HAT'):
            return _red_hat()

        return 'Unknown'
    @property
    def hdd_count(self):
        # type: () -> int
        """Return a count of HDDs (spinners)"""
        return len(self._get_devs_by_type(rota='1'))
    def _get_capacity(self, dev):
        # type: (str) -> int
        """Determine the size of a given device

        Size is the sysfs block count multiplied by the logical block size.
        """
        size_path = os.path.join('/sys/block', dev, 'size')
        size_blocks = int(read_file([size_path]))
        blk_path = os.path.join('/sys/block', dev, 'queue', 'logical_block_size')
        blk_count = int(read_file([blk_path]))
        return size_blocks * blk_count
    def _get_capacity_by_type(self, rota='0'):
        # type: (str) -> int
        """Return the total capacity of a category of device (flash or hdd)"""
        devs = self._get_devs_by_type(rota=rota)
        capacity = 0
        for dev in devs:
            capacity += self._get_capacity(dev)
        return capacity
    def _dev_list(self, dev_list):
        # type: (List[str]) -> List[Dict[str, object]]
        """Return a 'pretty' name list for each device in the `dev_list`"""
        dev_list_out = []
        for dev in dev_list:
            disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
            disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
            disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
            vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
            # substitute readable names for known raw vendor ids (e.g. virtio)
            disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
            disk_size_bytes = self._get_capacity(dev)
            dev_list_out.append({
                'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
                'vendor': disk_vendor,
                'model': disk_model,
                'rev': disk_rev,
                'wwid': disk_wwid,
                'dev_name': dev,
                'disk_size_bytes': disk_size_bytes,
            })
        return dev_list_out
    @property
    def hdd_list(self):
        # type: () -> List[Dict[str, object]]
        """Return a list of devices that are HDDs (spinners)"""
        devs = self._get_devs_by_type(rota='1')
        return self._dev_list(devs)

    @property
    def flash_list(self):
        # type: () -> List[Dict[str, object]]
        """Return a list of devices that are flash based (SSD, NVMe)"""
        devs = self._get_devs_by_type(rota='0')
        return self._dev_list(devs)

    @property
    def hdd_capacity_bytes(self):
        # type: () -> int
        """Return the total capacity for all HDD devices (bytes)"""
        return self._get_capacity_by_type(rota='1')

    @property
    def hdd_capacity(self):
        # type: () -> str
        """Return the total capacity for all HDD devices (human readable format)"""
        return bytes_to_human(self.hdd_capacity_bytes)

    @property
    def cpu_load(self):
        # type: () -> Dict[str, float]
        """Return the cpu load average data for the host"""
        raw = read_file(['/proc/loadavg']).strip()
        data = raw.split()
        return {
            '1min': float(data[0]),
            '5min': float(data[1]),
            '15min': float(data[2]),
        }

    @property
    def flash_count(self):
        # type: () -> int
        """Return the number of flash devices in the system (SSD, NVMe)"""
        return len(self._get_devs_by_type(rota='0'))

    @property
    def flash_capacity_bytes(self):
        # type: () -> int
        """Return the total capacity for all flash devices (bytes)"""
        return self._get_capacity_by_type(rota='0')

    @property
    def flash_capacity(self):
        # type: () -> str
        """Return the total capacity for all Flash devices (human readable format)"""
        return bytes_to_human(self.flash_capacity_bytes)
    def _process_nics(self):
        # type: () -> None
        """Look at the NIC devices and extract network related metadata"""
        # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h
        hw_lookup = {
            '1': 'ethernet',
            '32': 'infiniband',
            '772': 'loopback',
        }

        for nic_path in HostFacts._nic_path_list:
            if not os.path.exists(nic_path):
                continue
            for iface in os.listdir(nic_path):

                # names of devices stacked below/above this one (bonds, vlans)
                lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
                upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]

                try:
                    mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
                except ValueError:
                    mtu = 0

                operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
                try:
                    speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
                except (OSError, ValueError):
                    # OSError : device doesn't support the ethtool get_link_ksettings
                    # ValueError : raised when the read fails, and returns Unknown
                    #
                    # Either way, we show a -1 when speed isn't available
                    speed = -1

                if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
                    nic_type = 'bridge'
                elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
                    nic_type = 'bonding'
                else:
                    nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')

                # a 'device' link means the interface is backed by hardware
                dev_link = os.path.join(nic_path, iface, 'device')
                if os.path.exists(dev_link):
                    iftype = 'physical'
                    driver_path = os.path.join(dev_link, 'driver')
                    if os.path.exists(driver_path):
                        driver = os.path.basename(os.path.realpath(driver_path))
                    else:
                        driver = 'Unknown'
                else:
                    iftype = 'logical'
                    driver = ''

                self.interfaces[iface] = {
                    'mtu': mtu,
                    'upper_devs_list': upper_devs_list,
                    'lower_devs_list': lower_devs_list,
                    'operstate': operstate,
                    'iftype': iftype,
                    'nic_type': nic_type,
                    'driver': driver,
                    'speed': speed,
                    'ipv4_address': get_ipv4_address(iface),
                    'ipv6_address': get_ipv6_address(iface),
                }
6392 def nic_count(self
):
6394 """Return a total count of all physical NICs detected in the host"""
6396 for iface
in self
.interfaces
:
6397 if self
.interfaces
[iface
]['iftype'] == 'physical':
6398 phys_devs
.append(iface
)
6399 return len(phys_devs
)
    def _get_mem_data(self, field_name):
        # type: (str) -> int
        # Return the kB figure for `field_name` from the cached /proc/meminfo
        # snapshot; 0 when the field is absent.
        for line in self._meminfo:
            if line.startswith(field_name):
                _d = line.split()
                return int(_d[1])
        return 0

    @property
    def memory_total_kb(self):
        # type: () -> int
        """Determine the memory installed (kb)"""
        return self._get_mem_data('MemTotal')

    @property
    def memory_free_kb(self):
        # type: () -> int
        """Determine the memory free (not cache, immediately usable)"""
        return self._get_mem_data('MemFree')

    @property
    def memory_available_kb(self):
        # type: () -> int
        """Determine the memory available to new applications without swapping"""
        return self._get_mem_data('MemAvailable')
6430 """Determine server vendor from DMI data in sysfs"""
6431 return read_file(HostFacts
._dmi
_path
_list
, 'sys_vendor')
6436 """Determine server model information from DMI data in sysfs"""
6437 family
= read_file(HostFacts
._dmi
_path
_list
, 'product_family')
6438 product
= read_file(HostFacts
._dmi
_path
_list
, 'product_name')
6439 if family
== 'Unknown' and product
:
6440 return '{}'.format(product
)
6442 return '{} ({})'.format(family
, product
)
6445 def bios_version(self
):
6447 """Determine server BIOS version from DMI data in sysfs"""
6448 return read_file(HostFacts
._dmi
_path
_list
, 'bios_version')
6451 def bios_date(self
):
6453 """Determine server BIOS date from DMI data in sysfs"""
6454 return read_file(HostFacts
._dmi
_path
_list
, 'bios_date')
    @property
    def timestamp(self):
        # type: () -> float
        """Return the current time as Epoch seconds"""
        return time.time()

    @property
    def system_uptime(self):
        # type: () -> float
        """Return the system uptime (in secs)"""
        raw_time = read_file(['/proc/uptime'])
        up_secs, _ = raw_time.split()
        return float(up_secs)
    @property
    def kernel_security(self):
        # type: () -> Dict[str, str]
        """Determine the security features enabled in the kernel - SELinux, AppArmor"""
        def _fetch_selinux() -> Dict[str, str]:
            """Read the selinux config file to determine state"""
            security = {}
            for selinux_path in HostFacts._selinux_path_list:
                if os.path.exists(selinux_path):
                    selinux_config = read_file([selinux_path]).splitlines()
                    security['type'] = 'SELinux'
                    for line in selinux_config:
                        if line.strip().startswith('#'):
                            continue
                        k, v = line.split('=')
                        security[k] = v
                    if security['SELINUX'].lower() == 'disabled':
                        security['description'] = 'SELinux: Disabled'
                    else:
                        security['description'] = 'SELinux: Enabled({}, {})'.format(security['SELINUX'], security['SELINUXTYPE'])
            return security

        def _fetch_apparmor() -> Dict[str, str]:
            """Read the apparmor profiles directly, returning an overview of AppArmor status"""
            security = {}
            for apparmor_path in HostFacts._apparmor_path_list:
                if os.path.exists(apparmor_path):
                    security['type'] = 'AppArmor'
                    security['description'] = 'AppArmor: Enabled'
                    try:
                        profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
                    except OSError:
                        pass
                    else:
                        # tally profiles by their mode, e.g. '... (enforce)'
                        summary = {}  # type: Dict[str, int]
                        for line in profiles.split('\n'):
                            item, mode = line.split(' ')
                            mode = mode.strip('()')
                            if mode in summary:
                                summary[mode] += 1
                            else:
                                summary[mode] = 0
                        summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()])
                        security = {**security, **summary}  # type: ignore
                        security['description'] += '({})'.format(summary_str)
                    return security
            return security

        ret = {}
        if os.path.exists('/sys/kernel/security/lsm'):
            lsm = read_file(['/sys/kernel/security/lsm']).strip()
            if 'selinux' in lsm:
                ret = _fetch_selinux()
            elif 'apparmor' in lsm:
                ret = _fetch_apparmor()
            else:
                return {
                    'type': 'Unknown',
                    'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor'
                }

        if ret:
            return ret

        return {
            'type': 'None',
            'description': 'Linux Security Module framework is not available'
        }
    @property
    def selinux_enabled(self):
        # type: () -> bool
        # True when the active LSM is SELinux and it is not reported disabled.
        return (self.kernel_security['type'] == 'SELinux') and \
               (self.kernel_security['description'] != 'SELinux: Disabled')
    @property
    def kernel_parameters(self):
        # type: () -> Dict[str, str]
        """Get kernel parameters required/used in Ceph clusters"""

        k_param = {}
        out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
        if out:
            param_list = out.split('\n')
            param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list}

            # return only desired parameters
            if 'net.ipv4.ip_nonlocal_bind' in param_dict:
                k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind']

        return k_param
6565 """Return the attributes of this HostFacts object as json"""
6567 k
: getattr(self
, k
) for k
in dir(self
)
6568 if not k
.startswith('_')
6569 and isinstance(getattr(self
, k
), (float, int, str, list, dict, tuple))
6571 return json
.dumps(data
, indent
=2, sort_keys
=True)
6573 ##################################
def command_gather_facts(ctx: CephadmContext):
    """gather_facts is intended to provide host related metadata to the caller"""
    host = HostFacts(ctx)
    print(host.dump())
6581 ##################################
def command_verify_prereqs(ctx: CephadmContext):
    """Fail fast when a service type's kernel prerequisites are missing."""
    if ctx.service_type in ('haproxy', 'keepalived'):
        # both daemons may bind addresses that are not yet local to this host
        out, err, code = call(
            ctx, ['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind']
        )
        if out.strip() != '1':
            raise Error('net.ipv4.ip_nonlocal_bind not set to 1')
6592 ##################################
    # the background task categories this cache tracks state for
    task_types = ['disks', 'daemons', 'host', 'http_server']
    def __init__(self):
        self.started_epoch_secs = time.time()
        # every task starts 'inactive' until its thread reports in
        self.tasks = {
            'daemons': 'inactive',
            'disks': 'inactive',
            'host': 'inactive',
            'http_server': 'inactive',
        }
        self.errors = []
        self.disks = {}
        self.daemons = {}
        self.host = {}
        self.lock = RLock()
    @property
    def health(self):
        # Snapshot of exporter health: uptime anchor, task states, errors.
        return {
            'started_epoch_secs': self.started_epoch_secs,
            'tasks': self.tasks,
            'errors': self.errors,
        }
    def to_json(self):
        # Full cache dump used by the /metadata endpoint.
        return {
            'health': self.health,
            'host': self.host,
            'daemons': self.daemons,
            'disks': self.disks,
        }
    def update_health(self, task_type, task_status, error_msg=None):
        # Record a task's status (and optional error) under the shared lock.
        assert task_type in CephadmCache.task_types
        with self.lock:
            self.tasks[task_type] = task_status
            if error_msg:
                self.errors.append(error_msg)
    def update_task(self, task_type, content):
        # Merge `content` into the named task's cache dict under the lock.
        assert task_type in CephadmCache.task_types
        assert isinstance(content, dict)
        with self.lock:
            current = getattr(self, task_type)
            for k in content:
                current[k] = content[k]
            setattr(self, task_type, current)
class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
    """Threaded HTTP server for the metadata exporter (thread per request)."""
    allow_reuse_address = True
    daemon_threads = True
    # populated by the daemon before serving — shared task/metadata state
    cephadm_cache: CephadmCache
    # bearer token callers must present (checked by Decorators.authorize)
    token: str
class CephadmDaemonHandler(BaseHTTPRequestHandler):
    # BaseHTTPRequestHandler sets `server` to the dispatching server instance
    server: CephadmHTTPServer
    api_version = 'v1'
    # the only GET paths (besides '/') this handler will serve
    valid_routes = [
        f'/{api_version}/metadata',
        f'/{api_version}/metadata/health',
        f'/{api_version}/metadata/disks',
        f'/{api_version}/metadata/daemons',
        f'/{api_version}/metadata/host',
    ]
    class Decorators:
        @classmethod
        def authorize(cls, f):
            """Implement a basic token check.

            The token is installed at deployment time and must be provided to
            ensure we only respond to callers who know our token i.e. mgr
            """
            @wraps(f)
            def wrapper(self, *args, **kwargs):
                auth = self.headers.get('Authorization', None)
                if auth != 'Bearer ' + self.server.token:
                    self.send_error(401)
                else:
                    f(self, *args, **kwargs)
            return wrapper
    def _help_page(self):
        # Static HTML index describing the available endpoints; served at '/'.
        # NOTE(review): parts of the CSS block were reconstructed from the
        # surviving fragments — confirm against the deployed template.
        return """<!DOCTYPE html>
<html>
<head><title>cephadm metadata exporter</title></head>
<style>
body {{
    font-family: sans-serif;
    font-size: 0.8em;
}}
table {{
    border-width: 0px;
    border-spacing: 0px;
}}
td,th {{
    padding: 5px 10px;
}}
th {{
    background: PowderBlue;
}}
</style>
<body>
    <h1>cephadm metadata exporter {api_version}</h1>
    <table>
      <tr><th>Endpoint</th><th>Methods</th><th>Response</th><th>Description</th></tr>
      <tr><td><a href='{api_version}/metadata'>{api_version}/metadata</a></td><td>GET</td><td>JSON</td><td>Return <b>all</b> metadata for the host</td></tr>
      <tr><td><a href='{api_version}/metadata/daemons'>{api_version}/metadata/daemons</a></td><td>GET</td><td>JSON</td><td>Return daemon and systemd states for ceph daemons (ls)</td></tr>
      <tr><td><a href='{api_version}/metadata/disks'>{api_version}/metadata/disks</a></td><td>GET</td><td>JSON</td><td>show disk inventory (ceph-volume)</td></tr>
      <tr><td><a href='{api_version}/metadata/health'>{api_version}/metadata/health</a></td><td>GET</td><td>JSON</td><td>Show current health of the exporter sub-tasks</td></tr>
      <tr><td><a href='{api_version}/metadata/host'>{api_version}/metadata/host</a></td><td>GET</td><td>JSON</td><td>Show host metadata (gather-facts)</td></tr>
    </table>
</body>
</html>""".format(api_version=CephadmDaemonHandler.api_version)
    def _fetch_root(self):
        # Serve the HTML help/index page for a bare '/' request.
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(self._help_page().encode('utf-8'))
    @Decorators.authorize
    def do_GET(self):
        """Handle *all* GET requests"""
        if self.path == '/':
            # provide a html response if someone hits the root url, to document the
            # available api endpoints
            return self._fetch_root()
        elif self.path in CephadmDaemonHandler.valid_routes:
            u = self.path.split('/')[-1]
            data = json.dumps({})
            status_code = 200

            tasks = self.server.cephadm_cache.health.get('tasks', {})
            assert tasks

            # We're using the http status code to help indicate thread health
            # - 200 (OK): request successful
            # - 204 (No Content): access to a cache relating to a dead thread
            # - 206 (Partial content): one or more theads are inactive
            # - 500 (Server Error): all threads inactive
            if u == 'metadata':
                data = json.dumps(self.server.cephadm_cache.to_json())
                if all([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']):
                    # All the subtasks are dead!
                    status_code = 500
                elif any([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']):
                    status_code = 206

            # Individual GETs against the a tasks endpoint will also return a 503 if the corresponding thread is inactive
            # NOTE(review): the comment above says 503, but the scheme in the
            # header comment says 204 — the dead-thread status code below was
            # reconstructed as 204; confirm against upstream.
            elif u == 'daemons':
                data = json.dumps(self.server.cephadm_cache.daemons)
                if tasks['daemons'] == 'inactive':
                    status_code = 204
            elif u == 'disks':
                data = json.dumps(self.server.cephadm_cache.disks)
                if tasks['disks'] == 'inactive':
                    status_code = 204
            elif u == 'host':
                data = json.dumps(self.server.cephadm_cache.host)
                if tasks['host'] == 'inactive':
                    status_code = 204

            # a GET against health will always return a 200, since the op is always successful
            elif u == 'health':
                data = json.dumps(self.server.cephadm_cache.health)

            self.send_response(status_code)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(data.encode('utf-8'))
        else:
            # Unknown path: tell the caller which routes exist.
            bad_request_msg = 'Valid URLs are: {}'.format(', '.join(CephadmDaemonHandler.valid_routes))
            self.send_response(404, message=bad_request_msg)  # reason
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8'))
6781 def log_message(self
, format
, *args
):
6782 rqst
= ' '.join(str(a
) for a
in args
)
6783 logger
.info(f
'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}')
class CephadmDaemon():
    """cephadm exporter daemon: serves host/daemon/disk metadata over HTTPS.

    Runs background scraper threads (host facts, daemon list, ceph-volume
    inventory) that feed a CephadmCache, fronted by a token-authenticated
    TLS HTTP server.
    """

    # service/daemon identity used for the systemd unit name
    daemon_type = 'cephadm-exporter'
    # NOTE(review): the values of default_port/key_name/crt_name/loop_delay
    # were not visible in this copy; reconstructed from upstream — confirm.
    default_port = 9443
    key_name = 'key'
    crt_name = 'crt'
    token_name = 'token'
    # keys that must be present (as strings) in the deploy-time config
    config_requirements = [
        key_name,
        crt_name,
        token_name,
    ]
    # seconds slept per supervision loop iteration
    loop_delay = 1
    # seconds between worker-thread liveness checks in run()
    thread_check_interval = 5

    def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None):
        self.ctx = ctx
        self.fsid = fsid
        self.daemon_id = daemon_id
        if not port:
            # fall back to the class default when no port was requested
            self.port = CephadmDaemon.default_port
        else:
            self.port = port
        self.workers: List[Thread] = []          # scraper threads started by run()
        self.http_server: CephadmHTTPServer      # created in run()
        self.stop = False                        # set by shutdown() to stop scrapers
        self.cephadm_cache = CephadmCache()
        self.errors: List[str] = []              # populated by can_run checks
        # bearer token installed at deploy time alongside the key/cert
        self.token = read_file([os.path.join(self.daemon_path, CephadmDaemon.token_name)])
    @classmethod
    def validate_config(cls, config):
        """Validate the deploy-time config dict for the exporter.

        :param config: dict expected to contain key/crt/token (strings) and
                       optionally 'port'
        :raises Error: when required fields are missing, or when any field
                       fails validation (all problems reported together)
        """
        reqs = ', '.join(CephadmDaemon.config_requirements)
        errors = []
        if not config or not all([k_name in config for k_name in CephadmDaemon.config_requirements]):
            raise Error(f'config must contain the following fields : {reqs}')
        if not all([isinstance(config[k_name], str) for k_name in CephadmDaemon.config_requirements]):
            errors.append(f'the following fields must be strings: {reqs}')

        crt = config[CephadmDaemon.crt_name]
        key = config[CephadmDaemon.key_name]
        token = config[CephadmDaemon.token_name]

        # sanity-check PEM framing of the provided cert/key material
        if not crt.startswith('-----BEGIN CERTIFICATE-----') or not crt.endswith('-----END CERTIFICATE-----\n'):
            errors.append('crt field is not a valid SSL certificate')
        if not key.startswith('-----BEGIN PRIVATE KEY-----') or not key.endswith('-----END PRIVATE KEY-----\n'):
            errors.append('key is not a valid SSL private key')
        # NOTE(review): the length check below was not visible in this copy;
        # reconstructed as `< 8` per upstream (which admits exactly 8 chars,
        # slightly at odds with the message) — confirm.
        if len(token) < 8:
            errors.append("'token' must be more than 8 characters long")

        if 'port' in config:
            try:
                p = int(str(config['port']))
                if p <= 1024:
                    raise ValueError
            except (TypeError, ValueError):
                errors.append('port must be an integer > 1024')

        if errors:
            raise Error('Parameter errors : {}'.format(', '.join(errors)))
    @property
    def port_active(self):
        # True when something is already bound to our configured port
        return port_in_use(self.ctx, self.port)
    @property
    def can_run(self):
        """Check preconditions for starting; accumulate problems in self.errors.

        :return: True when the port is free and key/cert/token are all present
        """
        # if port is in use
        if self.port_active:
            self.errors.append(f'TCP port {self.port} already in use, unable to bind')
        if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.key_name)):
            self.errors.append(f"Key file '{CephadmDaemon.key_name}' is missing from {self.daemon_path}")
        if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.crt_name)):
            self.errors.append(f"Certificate file '{CephadmDaemon.crt_name}' is missing from {self.daemon_path}")
        # read_file() returns 'Unknown' when the token file could not be read
        if self.token == 'Unknown':
            self.errors.append(f"Authentication token '{CephadmDaemon.token_name}' is missing from {self.daemon_path}")
        return len(self.errors) == 0
    @staticmethod
    def _unit_name(fsid, daemon_id):
        # systemd unit filename for this exporter instance
        return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
    @property
    def unit_name(self):
        # systemd unit filename for *this* instance (fsid + daemon_id)
        return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
    @property
    def daemon_path(self):
        # data directory for this daemon instance, e.g.
        # <data_dir>/<fsid>/cephadm-exporter.<id>
        return os.path.join(
            self.ctx.data_dir,
            self.fsid,
            f'{self.daemon_type}.{self.daemon_id}'
        )
    @property
    def binary_path(self):
        # resolved path of this very script (cephadm copies itself per cluster)
        path = os.path.realpath(__file__)
        assert os.path.isfile(path)
        return path
    def _handle_thread_exception(self, exc, thread_type):
        """Record a scraper-thread exception in the cache and mark it failed.

        :param exc: the exception raised inside the scraper thread
        :param thread_type: cache attribute name ('host', 'disks', 'daemons')
        """
        e_msg = f'{exc.__class__.__name__} exception: {str(exc)}'
        thread_info = getattr(self.cephadm_cache, thread_type)
        errors = thread_info.get('scrape_errors', [])
        errors.append(e_msg)
        logger.error(e_msg)
        logger.exception(exc)  # full traceback for post-mortem
        self.cephadm_cache.update_task(
            thread_type, {
                'scrape_errors': errors,
                'active': False,
            })
    def _scrape_host_facts(self, refresh_interval=10):
        """Scraper thread body: refresh host facts (gather-facts) periodically.

        Loops until self.stop is set or a scrape raises; sleeps loop_delay
        seconds per iteration and re-scrapes every refresh_interval seconds.

        :param refresh_interval: seconds between scrapes
        """
        ctr = 0
        exception_encountered = False

        while True:

            if self.stop or exception_encountered:
                break

            if ctr >= refresh_interval:
                ctr = 0
                logger.debug('executing host-facts scrape')
                errors = []
                s_time = time.time()

                try:
                    facts = HostFacts(self.ctx)
                except Exception as e:
                    # record the failure and let the loop terminate
                    self._handle_thread_exception(e, 'host')
                    exception_encountered = True
                else:
                    elapsed = time.time() - s_time
                    try:
                        data = json.loads(facts.dump())
                    except json.decoder.JSONDecodeError:
                        errors.append('host-facts provided invalid JSON')
                        logger.warning(errors[-1])
                        data = {}
                    self.cephadm_cache.update_task(
                        'host', {
                            'scrape_timestamp': s_time,
                            'scrape_duration_secs': elapsed,
                            'scrape_errors': errors,
                            'data': data,
                        })
                    logger.debug(f'completed host-facts scrape - {elapsed}s')

            time.sleep(CephadmDaemon.loop_delay)
            ctr += CephadmDaemon.loop_delay

        logger.info('host-facts thread stopped')
    def _scrape_ceph_volume(self, refresh_interval=15):
        """Scraper thread body: refresh disk inventory via ceph-volume.

        Invokes command_ceph_volume() in-process with stdout captured, then
        parses its JSON output into the cache. Loops until self.stop is set
        or a scrape raises.

        :param refresh_interval: seconds between scrapes
        """
        # we're invoking the ceph_volume command, so we need to set the args that it
        # expects to use
        self.ctx.command = 'inventory --format=json'.split()
        self.ctx.fsid = self.fsid
        self.ctx.log_output = False

        ctr = 0
        exception_encountered = False

        while True:
            if self.stop or exception_encountered:
                break

            if ctr >= refresh_interval:
                ctr = 0
                logger.debug('executing ceph-volume scrape')
                errors = []
                s_time = time.time()
                stream = io.StringIO()
                try:
                    # capture the command's stdout instead of printing it
                    with redirect_stdout(stream):
                        command_ceph_volume(self.ctx)
                except Exception as e:
                    self._handle_thread_exception(e, 'disks')
                    exception_encountered = True
                else:
                    elapsed = time.time() - s_time

                    # if the call to ceph-volume returns junk with the
                    # json, it won't parse
                    stdout = stream.getvalue()

                    data = []
                    if stdout:
                        try:
                            data = json.loads(stdout)
                        except json.decoder.JSONDecodeError:
                            errors.append('ceph-volume thread provided bad json data')
                            logger.warning(errors[-1])
                    else:
                        errors.append('ceph-volume did not return any data')
                        logger.warning(errors[-1])

                    self.cephadm_cache.update_task(
                        'disks', {
                            'scrape_timestamp': s_time,
                            'scrape_duration_secs': elapsed,
                            'scrape_errors': errors,
                            'data': data,
                        })

                    logger.debug(f'completed ceph-volume scrape - {elapsed}s')
            time.sleep(CephadmDaemon.loop_delay)
            ctr += CephadmDaemon.loop_delay

        logger.info('ceph-volume thread stopped')
    def _scrape_list_daemons(self, refresh_interval=20):
        """Scraper thread body: refresh daemon/systemd state (cephadm ls).

        Loops until self.stop is set or a scrape raises.

        :param refresh_interval: seconds between scrapes
        """
        ctr = 0
        exception_encountered = False
        while True:
            if self.stop or exception_encountered:
                break

            if ctr >= refresh_interval:
                ctr = 0
                logger.debug('executing list-daemons scrape')
                errors = []
                s_time = time.time()

                try:
                    # list daemons should ideally be invoked with a fsid
                    data = list_daemons(self.ctx)
                except Exception as e:
                    self._handle_thread_exception(e, 'daemons')
                    exception_encountered = True
                else:
                    if not isinstance(data, list):
                        errors.append('list-daemons did not supply a list?')
                        logger.warning(errors[-1])
                        data = []
                    elapsed = time.time() - s_time
                    self.cephadm_cache.update_task(
                        'daemons', {
                            'scrape_timestamp': s_time,
                            'scrape_duration_secs': elapsed,
                            'scrape_errors': errors,
                            'data': data,
                        })
                    logger.debug(f'completed list-daemons scrape - {elapsed}s')

            time.sleep(CephadmDaemon.loop_delay)
            ctr += CephadmDaemon.loop_delay
        logger.info('list-daemons thread stopped')
    def _create_thread(self, target, name, refresh_interval=None):
        """Start a named daemon thread and mark it healthy in the cache.

        :param target: callable run by the thread
        :param name: thread/task name used by the health cache
        :param refresh_interval: when given, passed as the target's only arg
        :return: the started Thread
        """
        if refresh_interval:
            t = Thread(target=target, args=(refresh_interval,))
        else:
            t = Thread(target=target)
        t.daemon = True
        t.name = name
        self.cephadm_cache.update_health(name, 'active')
        t.start()

        start_msg = f'Started {name} thread'
        if refresh_interval:
            logger.info(f'{start_msg}, with a refresh interval of {refresh_interval}s')
        else:
            logger.info(f'{start_msg}')
        return t
7063 def reload(self
, *args
):
7064 """reload -HUP received
7066 This is a placeholder function only, and serves to provide the hook that could
7067 be exploited later if the exporter evolves to incorporate a config file
7069 logger
.info('Reload request received - ignoring, no action needed')
    def shutdown(self, *args):
        """Signal handler for SIGTERM/SIGINT: stop scrapers and the server."""
        logger.info('Shutdown request received')
        # flag the scraper loops to exit, then stop serve_forever()
        self.stop = True
        self.http_server.shutdown()
7077 logger
.info(f
"cephadm exporter starting for FSID '{self.fsid}'")
7078 if not self
.can_run
:
7079 logger
.error('Unable to start the exporter daemon')
7080 for e
in self
.errors
:
7084 # register signal handlers for running under systemd control
7085 signal
.signal(signal
.SIGTERM
, self
.shutdown
)
7086 signal
.signal(signal
.SIGINT
, self
.shutdown
)
7087 signal
.signal(signal
.SIGHUP
, self
.reload)
7088 logger
.debug('Signal handlers attached')
7090 host_facts
= self
._create
_thread
(self
._scrape
_host
_facts
, 'host', 5)
7091 self
.workers
.append(host_facts
)
7093 daemons
= self
._create
_thread
(self
._scrape
_list
_daemons
, 'daemons', 20)
7094 self
.workers
.append(daemons
)
7096 disks
= self
._create
_thread
(self
._scrape
_ceph
_volume
, 'disks', 20)
7097 self
.workers
.append(disks
)
7099 self
.http_server
= CephadmHTTPServer(('0.0.0.0', self
.port
), CephadmDaemonHandler
) # IPv4 only
7100 self
.http_server
.socket
= ssl
.wrap_socket(self
.http_server
.socket
,
7101 keyfile
=os
.path
.join(self
.daemon_path
, CephadmDaemon
.key_name
),
7102 certfile
=os
.path
.join(self
.daemon_path
, CephadmDaemon
.crt_name
),
7105 self
.http_server
.cephadm_cache
= self
.cephadm_cache
7106 self
.http_server
.token
= self
.token
7107 server_thread
= self
._create
_thread
(self
.http_server
.serve_forever
, 'http_server')
7108 logger
.info(f
'https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}')
7111 while server_thread
.is_alive():
7115 if ctr
>= CephadmDaemon
.thread_check_interval
:
7117 for worker
in self
.workers
:
7118 if self
.cephadm_cache
.tasks
[worker
.name
] == 'inactive':
7120 if not worker
.is_alive():
7121 logger
.warning(f
'{worker.name} thread not running')
7122 stop_time
= datetime
.datetime
.now().strftime('%Y/%m/%d %H:%M:%S')
7123 self
.cephadm_cache
.update_health(worker
.name
, 'inactive', f
'{worker.name} stopped at {stop_time}')
7125 time
.sleep(CephadmDaemon
.loop_delay
)
7126 ctr
+= CephadmDaemon
.loop_delay
7128 logger
.info('Main http server thread stopped')
    @property
    def unit_run(self):
        """Shell fragment run by the systemd unit to launch the exporter.

        NOTE(review): the opening of this property was lost in transit; the
        'set -e' preamble below was reconstructed from upstream — confirm.
        """
        return """set -e
{py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format(
            py3=shutil.which('python3'),
            bin_path=self.binary_path,
            fsid=self.fsid,
            daemon_id=self.daemon_id,
            port=self.port)
    @property
    def unit_file(self):
        """systemd unit definition for this exporter instance.

        Under docker the unit additionally orders/requires docker.service.
        NOTE(review): some [Unit]/[Service]/[Install] lines were lost in
        transit and reconstructed from upstream — confirm section bodies.
        """
        docker = isinstance(self.ctx.container_engine, Docker)
        return """#generated by cephadm
[Unit]
Description=cephadm exporter service for cluster {fsid}
After=network-online.target{docker_after}
Wants=network-online.target
{docker_requires}
PartOf=ceph-{fsid}.target
Before=ceph-{fsid}.target

[Service]
Type=forking
ExecStart=/bin/bash {daemon_path}/unit.run
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=ceph-{fsid}.target
""".format(fsid=self.fsid,
           daemon_path=self.daemon_path,
           # if docker, we depend on docker.service
           docker_after=' docker.service' if docker else '',
           docker_requires='Requires=docker.service\n' if docker else '')
    def deploy_daemon_unit(self, config=None):
        """deploy a specific unit file for cephadm

        The normal deploy_daemon_units doesn't apply for this
        daemon since it's not a container, so we just create a
        simple service definition and add it to the fsid's target

        :param config: dict of filename -> contents (key/crt/token) to write
        :raises Error: when no config is provided
        """
        if not config:
            raise Error('Attempting to deploy cephadm daemon without a config')
        assert isinstance(config, dict)

        # Create the required config files in the daemons dir, with restricted permissions
        for filename in config:
            with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), 'w') as f:
                f.write(config[filename])

        # When __file__ is <stdin> we're being invoked over remoto via the orchestrator, so
        # we pick up the file from where the orchestrator placed it - otherwise we'll
        # copy it to the binary location for this cluster
        if not __file__ == '<stdin>':
            shutil.copy(__file__,
                        self.binary_path)

        with open(os.path.join(self.daemon_path, 'unit.run'), 'w') as f:
            f.write(self.unit_run)

        # write the unit file via a .new temp name, then atomically rename
        with open(
                os.path.join(self.ctx.unit_dir,
                             f'{self.unit_name}.new'),
                'w') as f:
            f.write(self.unit_file)
            os.rename(
                os.path.join(self.ctx.unit_dir, f'{self.unit_name}.new'),
                os.path.join(self.ctx.unit_dir, self.unit_name))

        # pick up the new unit, clear any prior failed state, enable + start
        call_throws(self.ctx, ['systemctl', 'daemon-reload'])
        call(self.ctx, ['systemctl', 'stop', self.unit_name],
             verbosity=CallVerbosity.DEBUG)
        call(self.ctx, ['systemctl', 'reset-failed', self.unit_name],
             verbosity=CallVerbosity.DEBUG)
        call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name])
    @classmethod
    def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id):
        """Remove the exporter's systemd unit and close its firewall port.

        Best-effort: parses the recorded unit.run to discover the port,
        closes it via firewalld, then removes the unit file and reloads
        systemd. Logs and returns early when unit.run is unreadable.
        """
        unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
        unit_path = os.path.join(ctx.unit_dir, unit_name)
        unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run')
        try:
            # strip the trailing ' &' that unit_run appends to the command
            with open(unit_run, 'r') as u:
                contents = u.read().strip(' &')
        except OSError:
            logger.warning(f'Unable to access the unit.run file @ {unit_run}')
            return

        port = None
        for line in contents.split('\n'):
            if '--port ' in line:
                try:
                    port = int(line.split('--port ')[-1])
                except ValueError:
                    logger.warning('Unexpected format in unit.run file: port is not numeric')
                    logger.warning('Unable to remove the systemd file and close the port')
                    return
                break

        if port:
            fw = Firewalld(ctx)
            try:
                fw.close_ports([port])
            except RuntimeError:
                logger.error(f'Unable to close port {port}')

        stdout, stderr, rc = call(ctx, ['rm', '-f', unit_path])
        if rc:
            logger.error(f'Unable to remove the systemd file @ {unit_path}')
        else:
            logger.info(f'removed systemd unit file @ {unit_path}')
            stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload'])
def command_exporter(ctx: CephadmContext):
    """CLI entry point for 'cephadm exporter': run the metadata exporter.

    :raises Error: when the given fsid has no data directory on this host
    """
    exporter = CephadmDaemon(ctx, ctx.fsid, daemon_id=ctx.id, port=ctx.port)

    # refuse to run for a cluster this host has never hosted
    if ctx.fsid not in os.listdir(ctx.data_dir):
        raise Error(f"cluster fsid '{ctx.fsid}' not found in '{ctx.data_dir}'")

    exporter.run()
7260 ##################################
def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool:
    """Return True when the named unit is wanted by the subsystem's target.

    Checks for the symlink systemd creates under
    <UNIT_DIR>/<subsystem>.target.wants/ when a unit is enabled.
    """
    return os.path.exists(
        os.path.join(
            UNIT_DIR,
            f'{subsystem}.target.wants',
            target_name))
def command_maintenance(ctx: CephadmContext):
    """CLI entry point for host maintenance enter/exit.

    'enter' disables and stops the cluster's ceph-<fsid>.target;
    anything else re-enables and starts it. Returns a human-readable
    'success'/'failed'/'skipped' status string.

    :raises Error: when --fsid was not supplied
    """
    if not ctx.fsid:
        raise Error('must pass --fsid to specify cluster')

    target = f'ceph-{ctx.fsid}.target'

    if ctx.maintenance_action.lower() == 'enter':
        logger.info('Requested to place host into maintenance')
        if systemd_target_state(target):
            _out, _err, code = call(ctx,
                                    ['systemctl', 'disable', target],
                                    verbosity=CallVerbosity.DEBUG)
            if code:
                logger.error(f'Failed to disable the {target} target')
                return 'failed - to disable the target'
            else:
                # stopping a target waits by default
                _out, _err, code = call(ctx,
                                        ['systemctl', 'stop', target],
                                        verbosity=CallVerbosity.DEBUG)
                if code:
                    logger.error(f'Failed to stop the {target} target')
                    # NOTE(review): message says 'disable' although the stop
                    # failed - matches the visible original; confirm upstream
                    return 'failed - to disable the target'
                else:
                    return f'success - systemd target {target} disabled'
        else:
            return 'skipped - target already disabled'

    else:
        logger.info('Requested to exit maintenance state')
        # exit maintenance request
        if not systemd_target_state(target):
            _out, _err, code = call(ctx,
                                    ['systemctl', 'enable', target],
                                    verbosity=CallVerbosity.DEBUG)
            if code:
                logger.error(f'Failed to enable the {target} target')
                return 'failed - unable to enable the target'
            else:
                # starting a target waits by default
                _out, _err, code = call(ctx,
                                        ['systemctl', 'start', target],
                                        verbosity=CallVerbosity.DEBUG)
                if code:
                    logger.error(f'Failed to start the {target} target')
                    return 'failed - unable to start the target'
                else:
                    return f'success - systemd target {target} enabled and started'
7325 ##################################
7329 # type: () -> argparse.ArgumentParser
7330 parser
= argparse
.ArgumentParser(
7331 description
='Bootstrap Ceph daemons with systemd and containers.',
7332 formatter_class
=argparse
.ArgumentDefaultsHelpFormatter
)
7333 parser
.add_argument(
7335 help='container image. Can also be set via the "CEPHADM_IMAGE" '
7337 parser
.add_argument(
7339 action
='store_true',
7340 help='use docker instead of podman')
7341 parser
.add_argument(
7344 help='base directory for daemon data')
7345 parser
.add_argument(
7348 help='base directory for daemon logs')
7349 parser
.add_argument(
7351 default
=LOGROTATE_DIR
,
7352 help='location of logrotate configuration files')
7353 parser
.add_argument(
7356 help='base directory for systemd units')
7357 parser
.add_argument(
7359 action
='store_true',
7360 help='Show debug-level log messages')
7361 parser
.add_argument(
7364 default
=DEFAULT_TIMEOUT
,
7365 help='timeout in seconds')
7366 parser
.add_argument(
7369 default
=DEFAULT_RETRY
,
7370 help='max number of retries')
7371 parser
.add_argument(
7375 help='set environment variable')
7376 parser
.add_argument(
7377 '--no-container-init',
7378 action
='store_true',
7379 default
=not CONTAINER_INIT
,
7380 help='Do not run podman/docker with `--init`')
7382 subparsers
= parser
.add_subparsers(help='sub-command')
7384 parser_version
= subparsers
.add_parser(
7385 'version', help='get ceph version from container')
7386 parser_version
.set_defaults(func
=command_version
)
7388 parser_pull
= subparsers
.add_parser(
7389 'pull', help='pull latest image version')
7390 parser_pull
.set_defaults(func
=command_pull
)
7392 parser_inspect_image
= subparsers
.add_parser(
7393 'inspect-image', help='inspect local container image')
7394 parser_inspect_image
.set_defaults(func
=command_inspect_image
)
7396 parser_ls
= subparsers
.add_parser(
7397 'ls', help='list daemon instances on this host')
7398 parser_ls
.set_defaults(func
=command_ls
)
7399 parser_ls
.add_argument(
7401 action
='store_true',
7402 help='Do not include daemon status')
7403 parser_ls
.add_argument(
7406 help='base directory for legacy daemon data')
7408 parser_list_networks
= subparsers
.add_parser(
7409 'list-networks', help='list IP networks')
7410 parser_list_networks
.set_defaults(func
=command_list_networks
)
7412 parser_adopt
= subparsers
.add_parser(
7413 'adopt', help='adopt daemon deployed with a different tool')
7414 parser_adopt
.set_defaults(func
=command_adopt
)
7415 parser_adopt
.add_argument(
7418 help='daemon name (type.id)')
7419 parser_adopt
.add_argument(
7422 help='deployment style (legacy, ...)')
7423 parser_adopt
.add_argument(
7426 help='cluster name')
7427 parser_adopt
.add_argument(
7430 help='base directory for legacy daemon data')
7431 parser_adopt
.add_argument(
7433 help='Additional configuration information in JSON format')
7434 parser_adopt
.add_argument(
7436 action
='store_true',
7437 help='Do not configure firewalld')
7438 parser_adopt
.add_argument(
7440 action
='store_true',
7441 help='do not pull the latest image before adopting')
7442 parser_adopt
.add_argument(
7444 action
='store_true',
7445 help='start newly adoped daemon, even if it was not running previously')
7446 parser_adopt
.add_argument(
7448 action
='store_true',
7449 default
=CONTAINER_INIT
,
7450 help=argparse
.SUPPRESS
)
7452 parser_rm_daemon
= subparsers
.add_parser(
7453 'rm-daemon', help='remove daemon instance')
7454 parser_rm_daemon
.set_defaults(func
=command_rm_daemon
)
7455 parser_rm_daemon
.add_argument(
7458 action
=CustomValidation
,
7459 help='daemon name (type.id)')
7460 parser_rm_daemon
.add_argument(
7463 help='cluster FSID')
7464 parser_rm_daemon
.add_argument(
7466 action
='store_true',
7467 help='proceed, even though this may destroy valuable data')
7468 parser_rm_daemon
.add_argument(
7469 '--force-delete-data',
7470 action
='store_true',
7471 help='delete valuable daemon data instead of making a backup')
7473 parser_rm_cluster
= subparsers
.add_parser(
7474 'rm-cluster', help='remove all daemons for a cluster')
7475 parser_rm_cluster
.set_defaults(func
=command_rm_cluster
)
7476 parser_rm_cluster
.add_argument(
7479 help='cluster FSID')
7480 parser_rm_cluster
.add_argument(
7482 action
='store_true',
7483 help='proceed, even though this may destroy valuable data')
7484 parser_rm_cluster
.add_argument(
7486 action
='store_true',
7487 help='do not remove log files')
7489 parser_run
= subparsers
.add_parser(
7490 'run', help='run a ceph daemon, in a container, in the foreground')
7491 parser_run
.set_defaults(func
=command_run
)
7492 parser_run
.add_argument(
7495 help='daemon name (type.id)')
7496 parser_run
.add_argument(
7499 help='cluster FSID')
7501 parser_shell
= subparsers
.add_parser(
7502 'shell', help='run an interactive shell inside a daemon container')
7503 parser_shell
.set_defaults(func
=command_shell
)
7504 parser_shell
.add_argument(
7506 help='cluster FSID')
7507 parser_shell
.add_argument(
7509 help='daemon name (type.id)')
7510 parser_shell
.add_argument(
7512 help='ceph.conf to pass through to the container')
7513 parser_shell
.add_argument(
7515 help='ceph.keyring to pass through to the container')
7516 parser_shell
.add_argument(
7518 help=('mount a file or directory in the container. '
7519 'Support multiple mounts. '
7520 'ie: `--mount /foo /bar:/bar`. '
7521 'When no destination is passed, default is /mnt'),
7523 parser_shell
.add_argument(
7527 help='set environment variable')
7528 parser_shell
.add_argument(
7529 'command', nargs
=argparse
.REMAINDER
,
7530 help='command (optional)')
7532 parser_enter
= subparsers
.add_parser(
7533 'enter', help='run an interactive shell inside a running daemon container')
7534 parser_enter
.set_defaults(func
=command_enter
)
7535 parser_enter
.add_argument(
7537 help='cluster FSID')
7538 parser_enter
.add_argument(
7541 help='daemon name (type.id)')
7542 parser_enter
.add_argument(
7543 'command', nargs
=argparse
.REMAINDER
,
7546 parser_ceph_volume
= subparsers
.add_parser(
7547 'ceph-volume', help='run ceph-volume inside a container')
7548 parser_ceph_volume
.set_defaults(func
=command_ceph_volume
)
7549 parser_ceph_volume
.add_argument(
7551 help='cluster FSID')
7552 parser_ceph_volume
.add_argument(
7554 help='JSON file with config and (client.bootrap-osd) key')
7555 parser_ceph_volume
.add_argument(
7557 help='ceph conf file')
7558 parser_ceph_volume
.add_argument(
7560 help='ceph.keyring to pass through to the container')
7561 parser_ceph_volume
.add_argument(
7563 action
='store_true',
7565 help='suppress ceph volume output from the log')
7566 parser_ceph_volume
.add_argument(
7567 'command', nargs
=argparse
.REMAINDER
,
7570 parser_unit
= subparsers
.add_parser(
7571 'unit', help="operate on the daemon's systemd unit")
7572 parser_unit
.set_defaults(func
=command_unit
)
7573 parser_unit
.add_argument(
7575 help='systemd command (start, stop, restart, enable, disable, ...)')
7576 parser_unit
.add_argument(
7578 help='cluster FSID')
7579 parser_unit
.add_argument(
7582 help='daemon name (type.id)')
7584 parser_logs
= subparsers
.add_parser(
7585 'logs', help='print journald logs for a daemon container')
7586 parser_logs
.set_defaults(func
=command_logs
)
7587 parser_logs
.add_argument(
7589 help='cluster FSID')
7590 parser_logs
.add_argument(
7593 help='daemon name (type.id)')
7594 parser_logs
.add_argument(
7595 'command', nargs
='*',
7596 help='additional journalctl args')
7598 parser_bootstrap
= subparsers
.add_parser(
7599 'bootstrap', help='bootstrap a cluster (mon + mgr daemons)')
7600 parser_bootstrap
.set_defaults(func
=command_bootstrap
)
7601 parser_bootstrap
.add_argument(
7603 help='ceph conf file to incorporate')
7604 parser_bootstrap
.add_argument(
7607 help='mon id (default: local hostname)')
7608 parser_bootstrap
.add_argument(
7610 help='mon IPs (e.g., [v2:localipaddr:3300,v1:localipaddr:6789])')
7611 parser_bootstrap
.add_argument(
7614 parser_bootstrap
.add_argument(
7617 help='mgr id (default: randomly generated)')
7618 parser_bootstrap
.add_argument(
7620 help='cluster FSID')
7621 parser_bootstrap
.add_argument(
7623 default
='/etc/ceph',
7624 help='directory to write config, keyring, and pub key files')
7625 parser_bootstrap
.add_argument(
7627 help='location to write keyring file with new cluster admin and mon keys')
7628 parser_bootstrap
.add_argument(
7630 help='location to write conf file to connect to new cluster')
7631 parser_bootstrap
.add_argument(
7632 '--output-pub-ssh-key',
7633 help="location to write the cluster's public SSH key")
7634 parser_bootstrap
.add_argument(
7636 action
='store_true',
7637 help='skip setup of ssh key on local host')
7638 parser_bootstrap
.add_argument(
7639 '--initial-dashboard-user',
7641 help='Initial user for the dashboard')
7642 parser_bootstrap
.add_argument(
7643 '--initial-dashboard-password',
7644 help='Initial password for the initial dashboard user')
7645 parser_bootstrap
.add_argument(
7646 '--ssl-dashboard-port',
7649 help='Port number used to connect with dashboard using SSL')
7650 parser_bootstrap
.add_argument(
7652 type=argparse
.FileType('r'),
7653 help='Dashboard key')
7654 parser_bootstrap
.add_argument(
7656 type=argparse
.FileType('r'),
7657 help='Dashboard certificate')
7659 parser_bootstrap
.add_argument(
7661 type=argparse
.FileType('r'),
7663 parser_bootstrap
.add_argument(
7664 '--ssh-private-key',
7665 type=argparse
.FileType('r'),
7666 help='SSH private key')
7667 parser_bootstrap
.add_argument(
7669 type=argparse
.FileType('r'),
7670 help='SSH public key')
7671 parser_bootstrap
.add_argument(
7674 help='set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
7676 parser_bootstrap
.add_argument(
7677 '--skip-mon-network',
7678 action
='store_true',
7679 help='set mon public_network based on bootstrap mon ip')
7680 parser_bootstrap
.add_argument(
7682 action
='store_true',
7683 help='do not enable the Ceph Dashboard')
7684 parser_bootstrap
.add_argument(
7685 '--dashboard-password-noupdate',
7686 action
='store_true',
7687 help='stop forced dashboard password change')
7688 parser_bootstrap
.add_argument(
7689 '--no-minimize-config',
7690 action
='store_true',
7691 help='do not assimilate and minimize the config file')
7692 parser_bootstrap
.add_argument(
7693 '--skip-ping-check',
7694 action
='store_true',
7695 help='do not verify that mon IP is pingable')
7696 parser_bootstrap
.add_argument(
7698 action
='store_true',
7699 help='do not pull the latest image before bootstrapping')
7700 parser_bootstrap
.add_argument(
7702 action
='store_true',
7703 help='Do not configure firewalld')
7704 parser_bootstrap
.add_argument(
7705 '--allow-overwrite',
7706 action
='store_true',
7707 help='allow overwrite of existing --output-* config/keyring/ssh files')
7708 parser_bootstrap
.add_argument(
7709 '--allow-fqdn-hostname',
7710 action
='store_true',
7711 help='allow hostname that is fully-qualified (contains ".")')
7712 parser_bootstrap
.add_argument(
7713 '--allow-mismatched-release',
7714 action
='store_true',
7715 help="allow bootstrap of ceph that doesn't match this version of cephadm")
7716 parser_bootstrap
.add_argument(
7717 '--skip-prepare-host',
7718 action
='store_true',
7719 help='Do not prepare host')
7720 parser_bootstrap
.add_argument(
7721 '--orphan-initial-daemons',
7722 action
='store_true',
7723 help='Set mon and mgr service to `unmanaged`, Do not create the crash service')
7724 parser_bootstrap
.add_argument(
7725 '--skip-monitoring-stack',
7726 action
='store_true',
7727 help='Do not automatically provision monitoring stack (prometheus, grafana, alertmanager, node-exporter)')
7728 parser_bootstrap
.add_argument(
7730 help='Apply cluster spec after bootstrap (copy ssh key, add hosts and apply services)')
7732 parser_bootstrap
.add_argument(
7733 '--shared_ceph_folder',
7734 metavar
='CEPH_SOURCE_FOLDER',
7735 help='Development mode. Several folders in containers are volumes mapped to different sub-folders in the ceph source folder')
7737 parser_bootstrap
.add_argument(
7739 help='url for custom registry')
7740 parser_bootstrap
.add_argument(
7741 '--registry-username',
7742 help='username for custom registry')
7743 parser_bootstrap
.add_argument(
7744 '--registry-password',
7745 help='password for custom registry')
7746 parser_bootstrap
.add_argument(
7748 help='json file with custom registry login info (URL, Username, Password)')
7749 parser_bootstrap
.add_argument(
7751 action
='store_true',
7752 default
=CONTAINER_INIT
,
7753 help=argparse
.SUPPRESS
)
7754 parser_bootstrap
.add_argument(
7756 action
='store_true',
7757 help='Automatically deploy cephadm metadata exporter to each node')
7758 parser_bootstrap
.add_argument(
7759 '--exporter-config',
7760 action
=CustomValidation
,
7761 help=f
'Exporter configuration information in JSON format (providing: {", ".join(CephadmDaemon.config_requirements)}, port information)')
7762 parser_bootstrap
.add_argument(
7763 '--cluster-network',
7764 help='subnet to use for cluster replication, recovery and heartbeats (in CIDR notation network/mask)')
7766 parser_deploy
= subparsers
.add_parser(
7767 'deploy', help='deploy a daemon')
7768 parser_deploy
.set_defaults(func
=command_deploy
)
7769 parser_deploy
.add_argument(
7772 action
=CustomValidation
,
7773 help='daemon name (type.id)')
7774 parser_deploy
.add_argument(
7777 help='cluster FSID')
7778 parser_deploy
.add_argument(
7780 help='config file for new daemon')
7781 parser_deploy
.add_argument(
7783 help='Additional configuration information in JSON format')
7784 parser_deploy
.add_argument(
7786 help='keyring for new daemon')
7787 parser_deploy
.add_argument(
7789 help='key for new daemon')
7790 parser_deploy
.add_argument(
7792 help='OSD uuid, if creating an OSD container')
7793 parser_deploy
.add_argument(
7795 action
='store_true',
7796 help='Do not configure firewalld')
7797 parser_deploy
.add_argument(
7799 help='List of tcp ports to open in the host firewall')
7800 parser_deploy
.add_argument(
7802 action
='store_true',
7803 help='Reconfigure a previously deployed daemon')
7804 parser_deploy
.add_argument(
7806 action
='store_true',
7807 help='Allow SYS_PTRACE on daemon container')
7808 parser_deploy
.add_argument(
7810 action
='store_true',
7811 default
=CONTAINER_INIT
,
7812 help=argparse
.SUPPRESS
)
7813 parser_deploy
.add_argument(
7815 help='Container memory request/target'
7817 parser_deploy
.add_argument(
7819 help='Container memory hard limit'
7821 parser_deploy
.add_argument(
7823 help='JSON dict of additional metadata'
7826 parser_check_host
= subparsers
.add_parser(
7827 'check-host', help='check host configuration')
7828 parser_check_host
.set_defaults(func
=command_check_host
)
7829 parser_check_host
.add_argument(
7830 '--expect-hostname',
7831 help='Check that hostname matches an expected value')
7833 parser_prepare_host
= subparsers
.add_parser(
7834 'prepare-host', help='prepare a host for cephadm use')
7835 parser_prepare_host
.set_defaults(func
=command_prepare_host
)
7836 parser_prepare_host
.add_argument(
7837 '--expect-hostname',
7838 help='Set hostname')
7840 parser_add_repo
= subparsers
.add_parser(
7841 'add-repo', help='configure package repository')
7842 parser_add_repo
.set_defaults(func
=command_add_repo
)
7843 parser_add_repo
.add_argument(
7845 help='use latest version of a named release (e.g., {})'.format(LATEST_STABLE_RELEASE
))
7846 parser_add_repo
.add_argument(
7848 help='use specific upstream version (x.y.z)')
7849 parser_add_repo
.add_argument(
7851 help='use specified bleeding edge build from git branch or tag')
7852 parser_add_repo
.add_argument(
7854 help='use specified bleeding edge build from git commit')
7855 parser_add_repo
.add_argument(
7857 help='specify alternative GPG key location')
7858 parser_add_repo
.add_argument(
7860 default
='https://download.ceph.com',
7861 help='specify alternative repo location')
7864 parser_rm_repo
= subparsers
.add_parser(
7865 'rm-repo', help='remove package repository configuration')
7866 parser_rm_repo
.set_defaults(func
=command_rm_repo
)
7868 parser_install
= subparsers
.add_parser(
7869 'install', help='install ceph package(s)')
7870 parser_install
.set_defaults(func
=command_install
)
7871 parser_install
.add_argument(
7872 'packages', nargs
='*',
7873 default
=['cephadm'],
7876 parser_registry_login
= subparsers
.add_parser(
7877 'registry-login', help='log host into authenticated registry')
7878 parser_registry_login
.set_defaults(func
=command_registry_login
)
7879 parser_registry_login
.add_argument(
7881 help='url for custom registry')
7882 parser_registry_login
.add_argument(
7883 '--registry-username',
7884 help='username for custom registry')
7885 parser_registry_login
.add_argument(
7886 '--registry-password',
7887 help='password for custom registry')
7888 parser_registry_login
.add_argument(
7890 help='json file with custom registry login info (URL, Username, Password)')
7891 parser_registry_login
.add_argument(
7893 help='cluster FSID')
7895 parser_gather_facts
= subparsers
.add_parser(
7896 'gather-facts', help='gather and return host related information (JSON format)')
7897 parser_gather_facts
.set_defaults(func
=command_gather_facts
)
7899 parser_exporter
= subparsers
.add_parser(
7900 'exporter', help='Start cephadm in exporter mode (web service), providing host/daemon/disk metadata')
7901 parser_exporter
.add_argument(
7905 help='fsid of the cephadm exporter to run against')
7906 parser_exporter
.add_argument(
7909 default
=int(CephadmDaemon
.default_port
),
7910 help='port number for the cephadm exporter service')
7911 parser_exporter
.add_argument(
7914 default
=get_hostname().split('.')[0],
7915 help='daemon identifer for the exporter')
7916 parser_exporter
.set_defaults(func
=command_exporter
)
7918 parser_maintenance
= subparsers
.add_parser(
7919 'host-maintenance', help='Manage the maintenance state of a host')
7920 parser_maintenance
.add_argument(
7922 help='cluster FSID')
7923 parser_maintenance
.add_argument(
7924 'maintenance_action',
7926 choices
=['enter', 'exit'],
7927 help='Maintenance action - enter maintenance, or exit maintenance')
7928 parser_maintenance
.set_defaults(func
=command_maintenance
)
7930 parser_verify_prereqs
= subparsers
.add_parser(
7932 help='verify system prerequisites for a given service are met on this host')
7933 parser_verify_prereqs
.set_defaults(func
=command_verify_prereqs
)
7934 parser_verify_prereqs
.add_argument(
7937 help='service type of service to whose prereqs will be checked')
def _parse_args(av: List[str]) -> argparse.Namespace:
    """Parse the cephadm argument vector into a namespace.

    :param av: raw argument vector (typically ``sys.argv[1:]``)
    :return: parsed namespace; ``container_init``/``no_container_init``
             are normalized so exactly one of them is True.
    :raises SystemExit: via ``parser.error`` when both container-init
             flags are passed together.
    """
    parser = _get_parser()

    args = parser.parse_args(av)
    # argparse keeps a literal '--' separator as the first element of the
    # remainder-style 'command' list; drop it so the command runs as typed.
    if 'command' in args and args.command and args.command[0] == '--':
        args.command.pop(0)

    # workaround argparse to deprecate the subparser `--container-init` flag
    # container_init and no_container_init must always be mutually exclusive
    container_init_args = ('--container-init', '--no-container-init')
    if set(container_init_args).issubset(av):
        # both flags given explicitly: contradictory, reject
        parser.error('argument %s: not allowed with argument %s' % (container_init_args))
    elif '--container-init' in av:
        # deprecated positive flag given: derive the negative from it
        args.no_container_init = not args.container_init
    else:
        # only the negative flag (or neither) given: derive the positive
        args.container_init = not args.no_container_init
    assert args.container_init is not args.no_container_init

    return args
def cephadm_init_ctx(args: List[str]) -> Optional[CephadmContext]:
    """Build a CephadmContext populated from the parsed argument vector.

    :param args: raw argument vector (typically ``sys.argv[1:]``)
    :return: a context object carrying the parsed arguments
    """
    ctx = CephadmContext()
    ctx.set_args(_parse_args(args))
    # The Optional return type matches callers that treat a falsy context
    # as an initialization failure.
    return ctx
def cephadm_init(args: List[str]) -> Optional[CephadmContext]:
    """Initialize cephadm: parse args and configure process-wide logging.

    :param args: raw argument vector (typically ``sys.argv[1:]``)
    :return: the initialized context, or None when no subcommand was given
    """
    # rebind the module-level `logger` placeholder declared at file top
    global logger
    ctx = cephadm_init_ctx(args)
    assert ctx is not None

    # Logger configuration
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    dictConfig(logging_config)
    logger = logging.getLogger()

    # NOTE(review): reconstructed guard — upstream gates console DEBUG
    # output on the --verbose flag; confirm against the full source.
    if ctx.verbose:
        for handler in logger.handlers:
            if handler.name == 'console':
                handler.setLevel(logging.DEBUG)

    if not ctx.has_function():
        sys.stderr.write('No command specified; pass -h or --help for usage\n')
        return None

    return ctx
def main() -> None:
    """Entry point: require root, initialize, then dispatch the subcommand."""

    # cephadm manipulates system state (containers, systemd units), so it
    # must run as root
    if os.geteuid() != 0:
        sys.stderr.write('ERROR: cephadm should be run as root\n')
        sys.exit(1)

    # When the script is piped into python3, the caller may prepend an
    # `injected_argv = [...]` line (see the module docstring); fall back to
    # the real command line otherwise.
    av: List[str] = []
    try:
        av = injected_argv  # type: ignore
    except NameError:
        av = sys.argv[1:]

    ctx = cephadm_init(av)
    if not ctx:  # error, exit
        sys.exit(1)

    try:
        # podman or docker?
        ctx.container_engine = find_container_engine(ctx)
        # these commands must run even on a host without a container engine
        if ctx.func not in \
                [command_check_host, command_prepare_host, command_add_repo]:
            check_container_engine(ctx)
        # command handler
        r = ctx.func(ctx)
    except Error as e:
        if ctx.verbose:
            raise
        logger.error('ERROR: %s' % e)
        sys.exit(1)
    if not r:
        r = 0
    sys.exit(r)
8026 if __name__
== '__main__':