DEFAULT_IMAGE='docker.io/ceph/ceph:v15'
DEFAULT_IMAGE_IS_MASTER=False
-LATEST_STABLE_RELEASE='octopus'
-DATA_DIR='/var/lib/ceph'
-LOG_DIR='/var/log/ceph'
-LOCK_DIR='/run/cephadm'
-LOGROTATE_DIR='/etc/logrotate.d'
-UNIT_DIR='/etc/systemd/system'
-LOG_DIR_MODE=0o770
-DATA_DIR_MODE=0o700
+LATEST_STABLE_RELEASE = 'octopus'
+DATA_DIR = '/var/lib/ceph'
+LOG_DIR = '/var/log/ceph'
+LOCK_DIR = '/run/cephadm'
+LOGROTATE_DIR = '/etc/logrotate.d'
+UNIT_DIR = '/etc/systemd/system'
+LOG_DIR_MODE = 0o770
+DATA_DIR_MODE = 0o700
CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker
-CUSTOM_PS1=r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT=None # in seconds
-DEFAULT_RETRY=10
-SHELL_DEFAULT_CONF='/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING='/etc/ceph/ceph.client.admin.keyring'
+CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
+DEFAULT_TIMEOUT = None # in seconds
+DEFAULT_RETRY = 10
+SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
+SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
"""
You can invoke cephadm in two ways:
import argparse
import datetime
import fcntl
+import ipaddress
import json
import logging
import os
import platform
+import pwd
import random
import re
import select
import time
import errno
try:
- from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable
+ from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
pass
import uuid
else:
from urllib2 import urlopen, HTTPError
+if sys.version_info > (3, 0):
+ unicode = str
+
container_path = ''
cached_stdin = None
red = '\033[31m'
end = '\033[0m'
+
class Error(Exception):
pass
+
class TimeoutExpired(Error):
pass
##################################
+
class Ceph(object):
daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
'crash')
##################################
+
class Monitoring(object):
"""Define the configs for the monitoring containers"""
##################################
+
class NFSGanesha(object):
"""Defines a NFS-Ganesha container"""
# type: (str, Union[int, str]) -> NFSGanesha
return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
- @staticmethod
- def port_in_use():
- # type () -> None
- for (srv, port) in NFSGanesha.port_map.items():
- if port_in_use(port):
- msg = 'TCP port {} required for {} is already in use'.format(port, srv)
- raise Error(msg)
-
@staticmethod
def get_container_mounts(data_dir):
# type: (str) -> Dict[str, str]
volume_mounts = self.get_container_mounts(data_dir)
envs = self.get_container_envs()
- logger.info('Creating RADOS grace for action: %s' % (action))
+ logger.info('Creating RADOS grace for action: %s' % action)
c = CephContainer(
image=self.image,
entrypoint=entrypoint,
args=args,
volume_mounts=volume_mounts,
- cname=self.get_container_name(desc='grace-%s' % (action)),
+ cname=self.get_container_name(desc='grace-%s' % action),
envs=envs
)
return c
##################################
+
class CephIscsi(object):
"""Defines a Ceph-Iscsi container"""
mounts['/dev/log'] = '/dev/log:z'
return mounts
+ @staticmethod
+ def get_container_binds():
+ # type: () -> List[List[str]]
+ """Return the extra bind mounts an iSCSI container needs.
+
+ Each inner list holds the fields of one mount specification; the
+ fields are later joined with ',' to form a single `--mount` value.
+ """
+ binds = []
+ # bind /lib/modules to the same path inside the container, read-only
+ lib_modules = ['type=bind',
+ 'source=/lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true']
+ binds.append(lib_modules)
+ return binds
+
@staticmethod
def get_version(container_id):
# type: (str) -> Optional[str]
[container_path, 'exec', container_id,
'/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"])
if code == 0:
- version = out
+ version = out.strip()
return version
def validate(self):
"umount {0}; fi".format(mount_path)
return cmd.split()
+ def get_tcmu_runner_container(self):
+ # type: () -> CephContainer
+ """Build the container that runs tcmu-runner for this iSCSI daemon.
+
+ Starts from the container returned by get_container() for this
+ daemon and overrides its entrypoint, volume mounts and name.
+ """
+ tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id)
+ tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
+ # replace the /dev/log mount with a mount of all of /dev; pop() raises
+ # KeyError if no '/dev/log' mount is present
+ tcmu_container.volume_mounts.pop("/dev/log")
+ tcmu_container.volume_mounts["/dev"] = "/dev:z"
+ # NOTE(review): presumably desc='tcmu' yields a name distinct from the
+ # daemon's own container -- confirm against get_container_name()
+ tcmu_container.cname = self.get_container_name(desc='tcmu')
+ return tcmu_container
+
##################################
+
def get_supported_daemons():
# type: () -> List[str]
supported_daemons = list(Ceph.daemons)
##################################
+
def attempt_bind(s, address, port):
# type: (socket.socket, str, int) -> None
try:
finally:
s.close()
+
def port_in_use(port_num):
# type: (int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
else:
return False
+
def check_ip_port(ip, port):
# type: (str, int) -> None
if not args.skip_ping_check:
except NameError:
TimeoutError = OSError
+
class Timeout(TimeoutError):
"""
Raised when the lock could not be acquired in *timeout*
class FileLock(object):
- def __init__(self, name, timeout = -1):
+ def __init__(self, name, timeout=-1):
if not os.path.exists(LOCK_DIR):
os.mkdir(LOCK_DIR, 0o700)
self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
lock_id, lock_filename, poll_intervall
)
time.sleep(poll_intervall)
- except:
+ except: # noqa
# Something did go wrong, so decrement the counter.
self._lock_counter = max(0, self._lock_counter - 1)
raise
return _Acquire_ReturnProxy(lock = self)
- def release(self, force = False):
+ def release(self, force=False):
"""
Releases the file lock.
Please note, that the lock is only completly released, if the lock
return None
def __del__(self):
- self.release(force = True)
+ self.release(force=True)
return None
-
def _acquire(self):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(self._lock_file, open_mode)
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
fd = self._lock_file_fd
self._lock_file_fd = None
- fcntl.flock(fd, fcntl.LOCK_UN)
- os.close(fd)
+ fcntl.flock(fd, fcntl.LOCK_UN) # type: ignore
+ os.close(fd) # type: ignore
return None
end_time = start_time + timeout
while not stop:
if end_time and (time.time() >= end_time):
- logger.info(desc + ':timeout after %s seconds' % timeout)
stop = True
- process.kill()
+ if process.poll() is None:
+ logger.info(desc + ':timeout after %s seconds' % timeout)
+ process.kill()
if reads and process.poll() is not None:
# we want to stop, but first read off anything remaining
# on stdout/stderr
assert False
except (IOError, OSError):
pass
+ logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s'
+ % (time.time()-start_time, stop, process.poll(), reads))
returncode = process.wait()
##################################
+
def is_available(what, func):
# type: (str, Callable[[], bool]) -> None
"""
:param func: the callable object that determines availability
"""
retry = args.retry
- logger.info('Waiting for %s...' % (what))
+ logger.info('Waiting for %s...' % what)
num = 1
while True:
if func():
logger.info('%s is available'
- % (what))
+ % what)
break
elif num > retry:
raise Error('%s not available after %s tries'
return cp
+
def pathify(p):
# type: (str) -> str
p = os.path.expanduser(p)
return os.path.abspath(p)
+
def get_file_timestamp(fn):
# type: (str) -> Optional[str]
try:
except Exception as e:
return None
+
def try_convert_datetime(s):
# type: (str) -> Optional[str]
# This is super irritating because
pass
return None
+
def get_podman_version():
# type: () -> Tuple[int, ...]
if 'podman' not in container_path:
out, _, _ = call_throws([container_path, '--version'])
return _parse_podman_version(out)
+
def _parse_podman_version(out):
# type: (str) -> Tuple[int, ...]
_, _, version_str = out.strip().split()
# type: () -> str
return socket.gethostname()
+
def get_fqdn():
# type: () -> str
return socket.getfqdn() or socket.gethostname()
+
def get_arch():
# type: () -> str
return platform.uname().machine
+
def generate_service_id():
# type: () -> str
return get_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase)
for _ in range(6))
+
def generate_password():
# type: () -> str
return ''.join(random.choice(string.ascii_lowercase + string.digits)
for i in range(10))
+
def normalize_container_id(i):
# type: (str) -> str
# docker adds the sha256: prefix, but AFAICS both
i = i[len(prefix):]
return i
+
def make_fsid():
# type: () -> str
return str(uuid.uuid1())
+
def is_fsid(s):
# type: (str) -> bool
try:
return False
return True
+
def infer_fsid(func):
"""
If we only find a single fsid in /var/lib/ceph/*, use that
logger.debug('Using specified fsid: %s' % args.fsid)
return func()
- fsids = set()
+ fsids_set = set()
daemon_list = list_daemons(detail=False)
for daemon in daemon_list:
- if 'name' not in args or not args.name:
- fsids.add(daemon['fsid'])
+ if not is_fsid(daemon['fsid']):
+ # 'unknown' fsid
+ continue
+ elif 'name' not in args or not args.name:
+ # args.name not specified
+ fsids_set.add(daemon['fsid'])
elif daemon['name'] == args.name:
- fsids.add(daemon['fsid'])
- fsids = list(fsids)
+ # args.name is a match
+ fsids_set.add(daemon['fsid'])
+ fsids = sorted(fsids_set)
if not fsids:
# some commands do not always require an fsid
return _infer_fsid
+
def infer_config(func):
"""
If we find a MON daemon, use the config from that container
return _infer_config
+
def _get_default_image():
if DEFAULT_IMAGE_IS_MASTER:
warn = '''This is a development version of cephadm.
logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
return DEFAULT_IMAGE
+
def infer_image(func):
"""
Use the most recent ceph image
return _infer_image
+
def default_image(func):
@wraps(func)
def _default_image():
return _default_image
+
def get_last_local_ceph_image():
"""
:return: The most recent local ceph image (already pulled)
return r
return None
+
def write_tmp(s, uid, gid):
# type: (str, int, int) -> Any
tmp_f = tempfile.NamedTemporaryFile(mode='w',
return tmp_f
+
def makedirs(dir, uid, gid, mode):
# type: (str, int, int, int) -> None
if not os.path.exists(dir):
os.chown(dir, uid, gid)
os.chmod(dir, mode) # the above is masked by umask...
+
def get_data_dir(fsid, t, n):
# type: (str, str, Union[int, str]) -> str
return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
+
def get_log_dir(fsid):
# type: (str) -> str
return os.path.join(args.log_dir, fsid)
+
def make_data_dir_base(fsid, uid, gid):
# type: (str, int, int) -> str
data_dir_base = os.path.join(args.data_dir, fsid)
DATA_DIR_MODE)
return data_dir_base
+
def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
- # type: (str, str, Union[int, str], int, int) -> str
- if not uid or not gid:
- (uid, gid) = extract_uid_gid()
+ # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str
+ if uid is None or gid is None:
+ uid, gid = extract_uid_gid()
make_data_dir_base(fsid, uid, gid)
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
+
def make_log_dir(fsid, uid=None, gid=None):
- # type: (str, int, int) -> str
- if not uid or not gid:
- (uid, gid) = extract_uid_gid()
+ # type: (str, Optional[int], Optional[int]) -> str
+ if uid is None or gid is None:
+ uid, gid = extract_uid_gid()
log_dir = get_log_dir(fsid)
makedirs(log_dir, uid, gid, LOG_DIR_MODE)
return log_dir
+
def make_var_run(fsid, uid, gid):
# type: (str, int, int) -> None
call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
'/var/run/ceph/%s' % fsid])
+
def copy_tree(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Copy a directory tree from src to dst
"""
def copy_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Copy a files from src to dst
"""
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
def move_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Move files from src to dst
"""
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
## copied from distutils ##
def find_executable(executable, path=None):
"""Tries to find 'executable' in the directories listed in 'path'.
return f
return None
+
def find_program(filename):
# type: (str) -> str
name = find_executable(filename)
raise ValueError('%s not found' % filename)
return name
+
def get_unit_name(fsid, daemon_type, daemon_id=None):
# type: (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
+
def get_unit_name_by_daemon_name(fsid, name):
daemon = get_daemon_description(fsid, name)
try:
except KeyError:
raise Error('Failed to get unit name for {}'.format(daemon))
+
def check_unit(unit_name):
# type: (str) -> Tuple[bool, str, bool]
# NOTE: we ignore the exit code here because systemctl outputs
state = 'unknown'
return (enabled, state, installed)
+
def check_units(units, enabler=None):
# type: (List[str], Optional[Packager]) -> bool
for u in units:
enabler.enable_service(u)
return False
+
def get_legacy_config_fsid(cluster, legacy_dir=None):
- # type: (str, str) -> Optional[str]
+ # type: (str, Optional[str]) -> Optional[str]
config_file = '/etc/ceph/%s.conf' % cluster
if legacy_dir is not None:
config_file = os.path.abspath(legacy_dir + config_file)
return config.get('global', 'fsid')
return None
+
def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
- # type: (str, str, Union[int, str], str) -> Optional[str]
+ # type: (str, str, Union[int, str], Optional[str]) -> Optional[str]
fsid = None
if daemon_type == 'osd':
try:
fsid = get_legacy_config_fsid(cluster, legacy_dir=legacy_dir)
return fsid
+
def get_daemon_args(fsid, daemon_type, daemon_id):
# type: (str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
peers = config.get('peers', list()) # type: ignore
for peer in peers:
r += ["--cluster.peer={}".format(peer)]
+ # some alertmanager, by default, look elsewhere for a config
+ r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
return r
+
def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
config=None, keyring=None):
# type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
-
# populate the config directory for the component from the config-json
for fname in required_files:
if 'files' in config: # type: ignore
ceph_iscsi = CephIscsi.init(fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+
def get_parm(option):
# type: (str) -> Dict[str, str]
else:
return js
+
def get_config_and_keyring():
# type: () -> Tuple[Optional[str], Optional[str]]
config = None
with open(args.keyring, 'r') as f:
keyring = f.read()
- return (config, keyring)
+ return config, keyring
+
+
+def get_container_binds(fsid, daemon_type, daemon_id):
+ # type: (str, str, Union[int, str, None]) -> List[List[str]]
+ """Return the bind mounts (one list of mount fields each) for a daemon.
+
+ Only the iSCSI daemon type contributes binds; every other daemon type
+ gets an empty list. fsid is accepted but not used here.
+ """
+ binds = list()
+
+ if daemon_type == CephIscsi.daemon_type:
+ # an iSCSI daemon must be identified by a non-empty daemon_id
+ assert daemon_id
+ binds.extend(CephIscsi.get_container_binds())
+
+ return binds
+
def get_container_mounts(fsid, daemon_type, daemon_id,
no_config=False):
mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
elif daemon_type == 'alertmanager':
- mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/alertmanager:Z'
+ mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/etc/alertmanager:Z'
if daemon_type == NFSGanesha.daemon_type:
assert daemon_id
return mounts
+
def get_container(fsid, daemon_type, daemon_id,
privileged=False,
ptrace=False,
entrypoint = ''
name = ''
- ceph_args = [] # type: List[str]
+ ceph_args = [] # type: List[str]
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(daemon_type)
- m = Monitoring.components[daemon_type] # type: ignore
- metadata = m.get('image', dict()) # type: ignore
monitoring_args = [
'--user',
str(uid),
# FIXME: disable cpu/memory limits for the time being (not supported
# by ubuntu 18.04 kernel!)
- #'--cpus',
- #metadata.get('cpus', '2'),
- #'--memory',
- #metadata.get('memory', '4GB')
]
container_args.extend(monitoring_args)
elif daemon_type == 'crash':
elif daemon_type in Ceph.daemons:
ceph_args = ['-n', name, '-f']
- envs=[] # type: List[str]
+ envs = [] # type: List[str]
if daemon_type == NFSGanesha.daemon_type:
envs.extend(NFSGanesha.get_container_envs())
args=ceph_args + get_daemon_args(fsid, daemon_type, daemon_id),
container_args=container_args,
volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
envs=envs,
privileged=privileged,
ptrace=ptrace,
)
+
def extract_uid_gid(img='', file_path='/var/lib/ceph'):
- # type: (str, str) -> Tuple[int, int]
+ # type: (str, Union[str, List[str]]) -> Tuple[int, int]
if not img:
img = args.image
- out = CephContainer(
- image=img,
- entrypoint='stat',
- args=['-c', '%u %g', file_path]
- ).run()
- (uid, gid) = out.split(' ')
- return (int(uid), int(gid))
+ if isinstance(file_path, str):
+ paths = [file_path]
+ else:
+ paths = file_path
+
+ for fp in paths:
+ try:
+ out = CephContainer(
+ image=img,
+ entrypoint='stat',
+ args=['-c', '%u %g', fp]
+ ).run()
+ uid, gid = out.split(' ')
+ return int(uid), int(gid)
+ except RuntimeError:
+ pass
+ raise RuntimeError('uid/gid not found')
+
def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
config=None, keyring=None,
osd_fsid=None,
- reconfig=False):
- # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool]) -> None
+ reconfig=False,
+ ports=None):
+ # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
+
+ ports = ports or []
+ if any([port_in_use(port) for port in ports]):
+ raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
+
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
if reconfig and not os.path.exists(data_dir):
raise Error('cannot reconfig, data path %s does not exist' % data_dir)
update_firewalld(daemon_type)
+ # Open ports explicitly required for the daemon
+ if ports:
+ fw = Firewalld()
+ fw.open_ports(ports)
+ fw.apply_rules()
+
if reconfig and daemon_type not in Ceph.daemons:
# ceph daemons do not need a restart; others (presumably) do to pick
# up the new config
call_throws(['systemctl', 'restart',
get_unit_name(fsid, daemon_type, daemon_id)])
+def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
+ # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
+ """Write the shell lines that (re)start a container to file_obj.
+
+ Emits an optional '# comment' line, one or two best-effort removal
+ commands for a left-over container of the same name, and finally the
+ run command. With background=True the run command is suffixed with
+ ' &'.
+ """
+ if comment:
+ # Sometimes adding a comment, especially if there are multiple containers in one
+ # unit file, makes it easier to read and grok.
+ file_obj.write('# ' + comment + '\n')
+ # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
+ # The leading '! ' inverts the command's exit status -- presumably so a
+ # failed removal does not abort a script that runs under `set -e`.
+ file_obj.write('! '+ ' '.join(container.rm_cmd()) + '\n')
+ # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
+ if 'podman' in container_path:
+ file_obj.write('! '+ ' '.join(container.rm_cmd(storage=True)) + '\n')
+
+ # container run command
+ file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
+
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
# cmd
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
with open(data_dir + '/unit.run.new', 'w') as f:
+ f.write('set -e\n')
# pre-start cmd(s)
if daemon_type == 'osd':
# osds have a pre-start step
assert osd_fsid
- f.write('# Simple OSDs need chown on startup:\n')
- for n in ['block', 'block.db', 'block.wal']:
- p = os.path.join(data_dir, n)
- f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
- f.write('# LVM OSDs use ceph-volume lvm activate:\n')
- prestart = CephContainer(
- image=args.image,
- entrypoint='/usr/sbin/ceph-volume',
- args=[
- 'lvm', 'activate',
- str(daemon_id), osd_fsid,
- '--no-systemd'
- ],
- privileged=True,
- volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
- cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
- )
- f.write(' '.join(prestart.run_cmd()) + '\n')
+ simple_fn = os.path.join('/etc/ceph/osd',
+ '%s-%s.json.adopted-by-cephadm' % (daemon_id, osd_fsid))
+ if os.path.exists(simple_fn):
+ f.write('# Simple OSDs need chown on startup:\n')
+ for n in ['block', 'block.db', 'block.wal']:
+ p = os.path.join(data_dir, n)
+ f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
+ else:
+ f.write('# LVM OSDs use ceph-volume lvm activate:\n')
+ prestart = CephContainer(
+ image=args.image,
+ entrypoint='/usr/sbin/ceph-volume',
+ args=[
+ 'lvm', 'activate',
+ str(daemon_id), osd_fsid,
+ '--no-systemd'
+ ],
+ privileged=True,
+ volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+ cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
+ )
+ f.write(' '.join(prestart.run_cmd()) + '\n')
elif daemon_type == NFSGanesha.daemon_type:
# add nfs to the rados grace db
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
f.write(' '.join(prestart.run_cmd()) + '\n')
elif daemon_type == CephIscsi.daemon_type:
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
+ ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+ _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
if daemon_type in Ceph.daemons:
install_path = find_program('install')
f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
- # container run command
- f.write(' '.join(c.run_cmd()) + '\n')
+ _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id)))
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.run.new',
data_dir + '/unit.run')
],
privileged=True,
volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type,
daemon_id),
)
poststop = nfs_ganesha.get_rados_grace_container('remove')
f.write(' '.join(poststop.run_cmd()) + '\n')
elif daemon_type == CephIscsi.daemon_type:
+ # make sure we also stop the tcmu container
+ ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+ f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n')
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.poststop.new',
if start:
call_throws(['systemctl', 'start', unit_name])
-def update_firewalld(daemon_type):
- # type: (str) -> None
- if args.skip_firewalld:
- return
- cmd = find_executable('firewall-cmd')
- if not cmd:
- logger.debug('firewalld does not appear to be present')
- return
- (enabled, state, _) = check_unit('firewalld.service')
- if not enabled:
- logger.debug('firewalld.service is not enabled')
- return
-
- fw_services = []
- fw_ports = []
- if daemon_type == 'mon':
- fw_services.append('ceph-mon')
- elif daemon_type in ['mgr', 'mds', 'osd']:
- fw_services.append('ceph')
- if daemon_type == 'mgr':
- fw_ports.append(8080) # dashboard
- fw_ports.append(8443) # dashboard
- fw_ports.append(9283) # mgr/prometheus exporter
- elif daemon_type in Monitoring.port_map.keys():
- fw_ports.extend(Monitoring.port_map[daemon_type]) # prometheus etc
- elif daemon_type == NFSGanesha.daemon_type:
- fw_services.append('nfs')
- for svc in fw_services:
- out, err, ret = call([cmd, '--permanent', '--query-service', svc])
+
+class Firewalld(object):
+ def __init__(self):
+ # type: () -> None
+ self.available = self.check()
+
+ def check(self):
+ # type: () -> bool
+ self.cmd = find_executable('firewall-cmd')
+ if not self.cmd:
+ logger.debug('firewalld does not appear to be present')
+ return False
+ (enabled, state, _) = check_unit('firewalld.service')
+ if not enabled:
+ logger.debug('firewalld.service is not enabled')
+ return False
+ if state != "running":
+ logger.debug('firewalld.service is not running')
+ return False
+
+ logger.info("firewalld ready")
+ return True
+
+ def enable_service_for(self, daemon_type):
+ # type: (str) -> None
+ if not self.available:
+ logger.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type)
+ return
+
+ if daemon_type == 'mon':
+ svc = 'ceph-mon'
+ elif daemon_type in ['mgr', 'mds', 'osd']:
+ svc = 'ceph'
+ elif daemon_type == NFSGanesha.daemon_type:
+ svc = 'nfs'
+ else:
+ return
+
+ out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbose_on_failure=False)
if ret:
logger.info('Enabling firewalld service %s in current zone...' % svc)
- out, err, ret = call([cmd, '--permanent', '--add-service', svc])
+ out, err, ret = call([self.cmd, '--permanent', '--add-service', svc])
if ret:
raise RuntimeError(
'unable to add service %s to current zone: %s' % (svc, err))
else:
logger.debug('firewalld service %s is enabled in current zone' % svc)
- for port in fw_ports:
- tcp_port = str(port) + '/tcp'
- out, err, ret = call([cmd, '--permanent', '--query-port', tcp_port])
- if ret:
- logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
- out, err, ret = call([cmd, '--permanent', '--add-port', tcp_port])
+
+ def open_ports(self, fw_ports):
+ # type: (List[int]) -> None
+ if not self.available:
+ logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports)
+ return
+
+ for port in fw_ports:
+ tcp_port = str(port) + '/tcp'
+ out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
if ret:
- raise RuntimeError('unable to add port %s to current zone: %s' %
- (tcp_port, err))
- else:
- logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
- call_throws([cmd, '--reload'])
+ logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
+ out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port])
+ if ret:
+ raise RuntimeError('unable to add port %s to current zone: %s' %
+ (tcp_port, err))
+ else:
+ logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
+
+ def apply_rules(self):
+ # type: () -> None
+ if not self.available:
+ return
+
+ call_throws([self.cmd, '--reload'])
+
+
+def update_firewalld(daemon_type):
+    # type: (str) -> None
+    """Enable the firewalld service and open the ports daemon_type needs.
+
+    Enables the firewalld service that matches the daemon type, opens the
+    ports listed for it in Monitoring.port_map (if any) and reloads the
+    firewall rules. Does nothing when args.skip_firewalld is set.
+    """
+    # The inline implementation this function replaces returned early on
+    # args.skip_firewalld; keep honoring that opt-out. The membership test
+    # covers commands whose argument parser does not define the option.
+    if 'skip_firewalld' in args and args.skip_firewalld:
+        return
+
+    firewall = Firewalld()
+
+    firewall.enable_service_for(daemon_type)
+
+    fw_ports = []  # type: List[int]
+
+    if daemon_type in Monitoring.port_map:
+        fw_ports.extend(Monitoring.port_map[daemon_type])  # prometheus etc
+
+    firewall.open_ports(fw_ports)
+    firewall.apply_rules()
def install_base_units(fsid):
# type: (str) -> None
}
""" % fsid)
+
def get_unit_file(fsid):
# type: (str) -> str
u = """# generated by cephadm
##################################
+
class CephContainer:
def __init__(self,
image,
container_args=[],
envs=None,
privileged=False,
- ptrace=False):
- # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool) -> None
+ ptrace=False,
+ bind_mounts=None):
+ # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool, Optional[List[List[str]]]) -> None
self.image = image
self.entrypoint = entrypoint
self.args = args
self.envs = envs
self.privileged = privileged
self.ptrace = ptrace
+ self.bind_mounts = bind_mounts if bind_mounts else []
def run_cmd(self):
# type: () -> List[str]
- vols = [] # type: List[str]
- envs = [] # type: List[str]
- cname = [] # type: List[str]
- entrypoint = [] # type: List[str]
+ vols = [] # type: List[str]
+ envs = [] # type: List[str]
+ cname = [] # type: List[str]
+ binds = [] # type: List[str]
+ entrypoint = [] # type: List[str]
if self.entrypoint:
entrypoint = ['--entrypoint', self.entrypoint]
- priv = [] # type: List[str]
+ priv = [] # type: List[str]
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
+ binds = sum([['--mount', '{}'.format(','.join(bind))]
+ for bind in self.bind_mounts],[])
envs = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
'--ipc=host',
] + self.container_args + priv + \
cname + envs + \
- vols + entrypoint + \
+ vols + binds + entrypoint + \
[
self.image
] + self.args # type: ignore
def shell_cmd(self, cmd):
# type: (List[str]) -> List[str]
- priv = [] # type: List[str]
+ priv = [] # type: List[str]
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk']
- vols = [] # type: List[str]
+ vols = [] # type: List[str]
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
+ binds = [] # type: List[str]
+ binds = sum([['--mount', '{}'.format(','.join(bind))]
+ for bind in self.bind_mounts], [])
envs = [
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
'--rm',
'--net=host',
'--ipc=host',
- ] + self.container_args + priv + envs + vols + [
+ ] + self.container_args + priv + envs + vols + binds + [
'--entrypoint', cmd[0],
self.image
] + cmd[1:]
self.cname,
] + cmd
+ def rm_cmd(self, storage=False):
+ # type: (bool) -> List[str]
+ """Return the argv that force-removes this container by name.
+
+ With storage=True, '--storage' is inserted before the container name.
+ """
+ ret = [
+ str(container_path),
+ 'rm', '-f',
+ ]
+ if storage:
+ ret.append('--storage')
+ ret.append(self.cname)
+ return ret
+
+ def stop_cmd(self):
+ # type: () -> List[str]
+ """Return the argv that stops this container by name."""
+ ret = [
+ str(container_path),
+ 'stop', self.cname,
+ ]
+ return ret
+
def run(self, timeout=DEFAULT_TIMEOUT):
# type: (Optional[int]) -> str
logger.debug(self.run_cmd())
##################################
+
@infer_image
def command_version():
# type: () -> int
##################################
+
@infer_image
def command_pull():
# type: () -> int
- logger.info('Pulling latest %s...' % args.image)
- call_throws([container_path, 'pull', args.image])
+
+ _pull_image(args.image)
return command_inspect_image()
+
+def _pull_image(image):
+    # type: (str) -> None
+    """Pull a container image, retrying failures that look transient.
+
+    The pull is attempted up to three times, waiting 1 and then 4 seconds
+    between attempts. Only failures whose stderr contains one of the known
+    transient patterns are retried.
+
+    Raises RuntimeError when the pull fails with a non-transient error, or
+    when every attempt failed.
+    """
+    logger.info('Pulling container image %s...' % image)
+
+    # stderr fragments that mark a failure as transient
+    ignorelist = [
+        "error creating read-write layer with ID",
+        "net/http: TLS handshake timeout",
+        "Digest did not match, expected",
+    ]
+
+    cmd = [container_path, 'pull', image]
+    cmd_str = ' '.join(cmd)
+
+    # Seconds to wait before the 2nd and 3rd attempt. Nothing is waited for
+    # after the last attempt: no retry follows it, so announcing one and
+    # sleeping would only delay the error.
+    retry_delays = [1, 4]
+
+    for attempt in range(len(retry_delays) + 1):
+        out, err, ret = call(cmd)
+        if not ret:
+            return
+
+        if not any(pattern in err for pattern in ignorelist):
+            raise RuntimeError('Failed command: %s' % cmd_str)
+
+        if attempt == len(retry_delays):
+            break
+
+        sleep_secs = retry_delays[attempt]
+        logger.info('"%s" failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
+        time.sleep(sleep_secs)
+
+    raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
##################################
+
@infer_image
def command_inspect_image():
# type: () -> int
##################################
+def unwrap_ipv6(address):
+ # type: (str) -> str
+ """Strip one pair of enclosing square brackets from address, if present.
+
+ An address that is not both prefixed with '[' and suffixed with ']' is
+ returned unchanged.
+ """
+ if address.startswith('[') and address.endswith(']'):
+ return address[1:-1]
+ return address
+
+
+def is_ipv6(address):
+    # type: (str) -> bool
+    """Return True if address is an IPv6 address.
+
+    Accepts a bare address ('::1', '10.0.0.1'), a bracketed address
+    ('[::1]') and either form followed by a port ('[::1]:3300',
+    '10.0.0.1:3300'). Anything that is not a valid IP address logs a
+    warning and yields False.
+    """
+    # '[addr]:port' -> '[addr]' and 'a.b.c.d:port' -> 'a.b.c.d'. A bare IPv6
+    # address always has two or more colons and is left alone, so its last
+    # group is never mistaken for a port.
+    if address.startswith('[') or address.count(':') == 1:
+        address = re.sub(r':\d+$', '', address)
+    address = unwrap_ipv6(address)
+    try:
+        return ipaddress.ip_address(unicode(address)).version == 6
+    except ValueError:
+        logger.warning("Address: {} isn't a valid IP address".format(address))
+        return False
+
+
@default_image
def command_bootstrap():
# type: () -> int
mon_id = args.mon_id or hostname
mgr_id = args.mgr_id or generate_service_id()
logging.info('Cluster fsid: %s' % fsid)
+ ipv6 = False
l = FileLock(fsid)
l.acquire()
# ip
r = re.compile(r':(\d+)$')
- base_ip = None
+ base_ip = ''
if args.mon_ip:
+ ipv6 = is_ipv6(args.mon_ip)
hasport = r.findall(args.mon_ip)
if hasport:
port = int(hasport[0])
if addr_arg[0] != '[' or addr_arg[-1] != ']':
raise Error('--mon-addrv value %s must use square backets' %
addr_arg)
+ ipv6 = addr_arg.count('[') > 1
for addr in addr_arg[1:-1].split(','):
hasport = r.findall(addr)
if not hasport:
# make sure IP is configured locally, and then figure out the
# CIDR network
for net, ips in list_networks().items():
- if base_ip in ips:
+ if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \
+ [ipaddress.ip_address(unicode(ip)) for ip in ips]:
mon_network = net
logger.info('Mon IP %s is in CIDR network %s' % (base_ip,
mon_network))
cp.write(cpf)
config = cpf.getvalue()
+ if args.registry_json or args.registry_url:
+ command_registry_login()
+
if not args.skip_pull:
- logger.info('Pulling latest %s container...' % args.image)
- call_throws([container_path, 'pull', args.image])
+ _pull_image(args.image)
logger.info('Extracting ceph user uid/gid from container image...')
(uid, gid) = extract_uid_gid()
# wait for the service to become available
def is_mon_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 30 # seconds
+ timeout=args.timeout if args.timeout else 60 # seconds
out, err, ret = call(c.run_cmd(),
desc=c.entrypoint,
timeout=timeout)
logger.info('Setting mon public_network...')
cli(['config', 'set', 'mon', 'public_network', mon_network])
+ if ipv6:
+ logger.info('Enabling IPv6 (ms_bind_ipv6)')
+ cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true'])
+
# create mgr
logger.info('Creating mgr...')
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
mgr_c = get_container(fsid, 'mgr', mgr_id)
+ # Note: the default port used by the mgr Prometheus module (9283) is opened in fw
deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring)
+ config=config, keyring=mgr_keyring, ports=[9283])
# output files
with open(args.output_keyring, 'w') as f:
logger.info('Waiting for mgr to start...')
def is_mgr_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 30 # seconds
+ timeout=args.timeout if args.timeout else 60 # seconds
try:
out = cli(['status', '-f', 'json-pretty'], timeout=timeout)
j = json.loads(out)
# ssh
if not args.skip_ssh:
+ cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user])
+
logger.info('Enabling cephadm module...')
cli(['mgr', 'module', 'enable', 'cephadm'])
wait_for_mgr_restart()
f.write(ssh_pub)
logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key)
- logger.info('Adding key to root@localhost\'s authorized_keys...')
- if not os.path.exists('/root/.ssh'):
- os.mkdir('/root/.ssh', 0o700)
- auth_keys_file = '/root/.ssh/authorized_keys'
+ logger.info('Adding key to %s@localhost\'s authorized_keys...' % args.ssh_user)
+ try:
+ s_pwd = pwd.getpwnam(args.ssh_user)
+ except KeyError as e:
+ raise Error('Cannot find uid/gid for ssh-user: %s' % (args.ssh_user))
+ ssh_uid = s_pwd.pw_uid
+ ssh_gid = s_pwd.pw_gid
+ ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
+
+ if not os.path.exists(ssh_dir):
+ makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
+
+ auth_keys_file = '%s/authorized_keys' % ssh_dir
add_newline = False
+
if os.path.exists(auth_keys_file):
with open(auth_keys_file, 'r') as f:
f.seek(0, os.SEEK_END)
f.seek(f.tell()-1, os.SEEK_SET) # go to last char
if f.read() != '\n':
add_newline = True
+
with open(auth_keys_file, 'a') as f:
+ os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
os.fchmod(f.fileno(), 0o600) # just in case we created it
if add_newline:
f.write('\n')
host = get_hostname()
logger.info('Adding host %s...' % host)
- cli(['orch', 'host', 'add', host])
+ try:
+ cli(['orch', 'host', 'add', host])
+ except RuntimeError as e:
+ raise Error('Failed to add host <%s>: %s' % (host, e))
if not args.orphan_initial_daemons:
for t in ['mon', 'mgr', 'crash']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
+ if args.registry_url and args.registry_username and args.registry_password:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', args.registry_url, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force'])
+
if not args.skip_dashboard:
+ # Configure SSL port (cephadm only allows to configure dashboard SSL port)
+ # if the user does not want to use SSL he can change this setting once the cluster is up
+ cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(args.ssl_dashboard_port)])
+
+ # configuring dashboard parameters
logger.info('Enabling the dashboard module...')
cli(['mgr', 'module', 'enable', 'dashboard'])
wait_for_mgr_restart()
out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
port = int(out)
+ # Open dashboard port
+ fw = Firewalld()
+ fw.open_ports([port])
+ fw.apply_rules()
+
logger.info('Ceph Dashboard is now available at:\n\n'
'\t URL: https://%s:%s/\n'
'\t User: %s\n'
get_fqdn(), port,
args.initial_dashboard_user,
password))
-
+
if args.apply_spec:
logger.info('Applying %s to cluster' % args.apply_spec)
ssh_key = '/etc/ceph/ceph.pub'
if args.ssh_public_key:
ssh_key = args.ssh_public_key.name
- out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, 'root@%s' % split[1]])
+ out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])])
mounts = {}
mounts[pathify(args.apply_spec)] = '/tmp/spec.yml:z'
##################################
+def command_registry_login():
+    # type: () -> int
+    """Log in to a custom registry using the parsed command-line arguments.
+
+    Credentials come from the file named by ``args.registry_json`` when that
+    option is set (the values read are copied onto ``args``), otherwise from
+    ``args.registry_url`` / ``args.registry_username`` /
+    ``args.registry_password``.
+
+    :return: 0 once the login has been performed
+    :raises Error: if the JSON lacks a field or the options are incomplete
+    """
+    if args.registry_json:
+        logger.info("Pulling custom registry login info from %s." % args.registry_json)
+        login_info = get_parm(args.registry_json)
+        url = login_info.get('url')
+        username = login_info.get('username')
+        password = login_info.get('password')
+        if not (url and username and password):
+            raise Error("json provided for custom registry login did not include all necessary fields. "
+                        "Please setup json file as\n"
+                        "{\n"
+                        " \"url\": \"REGISTRY_URL\",\n"
+                        " \"username\": \"REGISTRY_USERNAME\",\n"
+                        " \"password\": \"REGISTRY_PASSWORD\"\n"
+                        "}\n")
+        # keep the values on args so later steps can read them
+        args.registry_url = url
+        args.registry_username = username
+        args.registry_password = password
+    elif not (args.registry_url and args.registry_username and args.registry_password):
+        raise Error("Invalid custom registry arguments received. To login to a custom registry include "
+                    "--registry-url, --registry-username and --registry-password "
+                    "options or --registry-json option")
+
+    registry_login(args.registry_url, args.registry_username, args.registry_password)
+    return 0
+
+def registry_login(url, username, password):
+    # type: (str, str, str) -> None
+    """Log the container engine into a custom registry.
+
+    :param url: registry to log in to
+    :param username: account name passed with ``-u``
+    :param password: password passed with ``-p``
+    :raises Error: if the login command fails; the message names the
+        registry and user that were actually passed in
+    """
+    logger.info("Logging into custom registry.")
+    # NOTE(review): '-p' puts the password on the command line; consider
+    # '--password-stdin' if call_throws can feed stdin -- TODO confirm.
+    login_cmd = [container_path, 'login',
+                 '-u', username,
+                 '-p', password,
+                 url]
+    try:
+        call_throws(login_cmd)
+    except Exception:
+        # 'Exception' rather than a bare except, so KeyboardInterrupt and
+        # SystemExit are no longer turned into a login error.
+        raise Error("Failed to login to custom registry @ %s as %s with given password" % (url, username))
+
+##################################
+
+
def extract_uid_gid_monitoring(daemon_type):
# type: (str) -> Tuple[int, int]
elif daemon_type == 'grafana':
uid, gid = extract_uid_gid(file_path='/var/lib/grafana')
elif daemon_type == 'alertmanager':
- uid, gid = extract_uid_gid(file_path='/etc/alertmanager')
+ uid, gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
raise Error("{} not implemented yet".format(daemon_type))
return uid, gid
else:
logger.info('%s daemon %s ...' % ('Deploy', args.name))
+ # Get and check ports explicitly required to be opened
+ daemon_ports = [] # type: List[int]
+ if args.tcp_ports:
+ daemon_ports = list(map(int, args.tcp_ports.split()))
+
if daemon_type in Ceph.daemons:
config, keyring = get_config_and_keyring()
uid, gid = extract_uid_gid()
make_var_run(args.fsid, uid, gid)
+
c = get_container(args.fsid, daemon_type, daemon_id,
ptrace=args.allow_ptrace)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
osd_fsid=args.osd_fsid,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type in Monitoring.components:
# monitoring daemon - prometheus, grafana, alertmanager, node-exporter
# Default Checks
if not args.reconfig and not redeploy:
- daemon_ports = Monitoring.port_map[daemon_type] # type: List[int]
- if any([port_in_use(port) for port in daemon_ports]):
- raise Error("TCP Port(s) '{}' required for {} is already in use".format(",".join(map(str, daemon_ports)), daemon_type))
+ daemon_ports.extend(Monitoring.port_map[daemon_type])
# make sure provided config-json is sufficient
config = get_parm(args.config_json) # type: ignore
uid, gid = extract_uid_gid_monitoring(daemon_type)
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type == NFSGanesha.daemon_type:
if not args.reconfig and not redeploy:
- NFSGanesha.port_in_use()
+ daemon_ports.extend(NFSGanesha.port_map.values())
+
config, keyring = get_config_and_keyring()
# TODO: extract ganesha uid/gid (997, 994) ?
uid, gid = extract_uid_gid()
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type == CephIscsi.daemon_type:
config, keyring = get_config_and_keyring()
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
else:
raise Error("{} not implemented in command_deploy function".format(daemon_type))
##################################
+
@infer_image
def command_run():
# type: () -> int
##################################
+
@infer_fsid
@infer_config
@infer_image
container_args = [] # type: List[str]
mounts = get_container_mounts(args.fsid, daemon_type, daemon_id,
no_config=True if args.config else False)
+ binds = get_container_binds(args.fsid, daemon_type, daemon_id)
if args.config:
mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z'
if args.keyring:
args=[],
container_args=container_args,
volume_mounts=mounts,
+ bind_mounts=binds,
envs=args.env,
privileged=True)
command = c.shell_cmd(command)
##################################
+
@infer_fsid
def command_enter():
# type: () -> int
##################################
+
@infer_fsid
@infer_image
def command_ceph_volume():
##################################
+
@infer_fsid
def command_unit():
# type: () -> None
##################################
+
@infer_fsid
def command_logs():
# type: () -> None
##################################
+
def list_networks():
# type: () -> Dict[str,List[str]]
#j = json.loads(out)
#for x in j:
+ res = _list_ipv4_networks()
+ res.update(_list_ipv6_networks())
+ return res
+
+
+def _list_ipv4_networks():
out, _, _ = call_throws([find_executable('ip'), 'route', 'ls'])
- return _parse_ip_route(out)
+ return _parse_ipv4_route(out)
-def _parse_ip_route(out):
+
+def _parse_ipv4_route(out):
r = {} # type: Dict[str,List[str]]
p = re.compile(r'^(\S+) (.*)scope link (.*)src (\S+)')
for line in out.splitlines():
r[net].append(ip)
return r
+
+def _list_ipv6_networks():
+    # type: () -> Dict[str, List[str]]
+    """Run ``ip -6 route ls`` and ``ip -6 addr ls`` and parse their output.
+
+    :return: the mapping produced by ``_parse_ipv6_route``
+    """
+    route_cmd = [find_executable('ip'), '-6', 'route', 'ls']
+    routes, _, _ = call_throws(route_cmd)
+    addr_cmd = [find_executable('ip'), '-6', 'addr', 'ls']
+    ips, _, _ = call_throws(addr_cmd)
+    return _parse_ipv6_route(routes, ips)
+
+
+def _parse_ipv6_route(routes, ips):
+    # type: (str, str) -> Dict[str, List[str]]
+    """Map each IPv6 route destination to the local addresses inside it.
+
+    :param routes: route listing text (as passed by ``_list_ipv6_networks``)
+    :param ips: address listing text (as passed by ``_list_ipv6_networks``)
+    :return: dict keyed by every non-default route destination that matched
+        the route pattern. Each value lists the ``inet6`` addresses that
+        fall inside that destination. An address is recorded under the
+        first matching destination only; one that matches none is dropped.
+    """
+    route_re = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
+    addr_re = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
+    networks = {}  # type: Dict[str,List[str]]
+
+    # pass 1: collect the destinations, each starting with no addresses
+    for route_line in routes.splitlines():
+        found = route_re.match(route_line)
+        if found is None:
+            continue
+        dest = found.group(1)
+        if dest.lower() == 'default':
+            continue
+        networks.setdefault(dest, [])
+
+    # pass 2: file every address under the network that contains it
+    for addr_line in ips.splitlines():
+        found = addr_re.match(addr_line)
+        if found is None:
+            continue
+        ip = found.group(1)
+        containing = []
+        for dest in networks:
+            if ipaddress.ip_address(unicode(ip)) in ipaddress.ip_network(unicode(dest)):
+                containing.append(dest)
+        if containing:
+            networks[containing[0]].append(ip)
+
+    return networks
+
+
def command_list_networks():
# type: () -> None
r = list_networks()
##################################
+
def command_ls():
# type: () -> None
ls = list_daemons(detail=not args.no_detail,
legacy_dir=args.legacy_dir)
print(json.dumps(ls, indent=4))
+
def list_daemons(detail=True, legacy_dir=None):
# type: (bool, Optional[str]) -> List[Dict[str, str]]
host_version = None
# type: () -> None
if not args.skip_pull:
- logger.info('Pulling latest %s container...' % args.image)
- call_throws([container_path, 'pull', args.image])
+ _pull_image(args.image)
(daemon_type, daemon_id) = args.name.split('.', 1)
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_grafana(daemon_id, fsid):
# type: (str, str) -> None
else:
logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key))
-
# data - possible custom dashboards/plugins
data_src = '/var/lib/grafana/'
data_src = os.path.abspath(args.legacy_dir + data_src)
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_alertmanager(daemon_id, fsid):
# type: (str, str) -> None
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def _adjust_grafana_ini(filename):
# type: (str) -> None
##################################
+
def command_rm_cluster():
# type: () -> None
if not args.force:
return False
return True
+
def command_check_host():
# type: () -> None
+ global container_path
+
errors = []
commands = ['systemctl', 'lvcreate']
if args.docker:
- container_path = find_program('docker')
+ container_path = find_program('docker')
else:
for i in CONTAINER_PREFERENCE:
try:
##################################
+
def command_prepare_host():
# type: () -> None
logger.info('Verifying podman|docker is present...')
##################################
+
class CustomValidation(argparse.Action):
def _check_name(self, values):
##################################
+
def get_distro():
# type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
distro = None
distro_codename = val.lower()
return distro, distro_version, distro_codename
+
class Packager(object):
def __init__(self, stable=None, version=None, branch=None, commit=None):
assert \
logging.info('Podman did not work. Falling back to docker...')
self.install(['docker.io'])
+
class YumDnf(Packager):
DISTRO_NAMES = {
'centos': ('centos', 'el'),
if self.distro_code.startswith('el'):
logger.info('Enabling EPEL...')
call_throws([self.tool, 'install', '-y', 'epel-release'])
- if self.distro_code == 'el8':
- # we also need Ken's copr repo, at least for now
- logger.info('Enabling supplementary copr repo ktdreyer/ceph-el8...')
- call_throws(['dnf', 'copr', 'enable', '-y', 'ktdreyer/ceph-el8'])
def rm_repo(self):
if os.path.exists(self.repo_path()):
os.unlink(self.repo_path())
- if self.distro_code == 'el8':
- logger.info('Disabling supplementary copr repo ktdreyer/ceph-el8...')
- call_throws(['dnf', 'copr', 'disable', '-y', 'ktdreyer/ceph-el8'])
def install(self, ls):
logger.info('Installing packages %s...' % ls)
commit=args.dev_commit)
pkg.add_repo()
+
def command_rm_repo():
pkg = create_packager()
pkg.rm_repo()
+
def command_install():
pkg = create_packager()
pkg.install(args.packages)
##################################
+
def _get_parser():
# type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(
help='ceph.keyring to pass through to the container')
parser_shell.add_argument(
'--mount', '-m',
- help='file or directory path that will be mounted in container /mnt')
+ help='mount a file or directory under /mnt in the container')
parser_shell.add_argument(
'--env', '-e',
action='append',
parser_bootstrap.add_argument(
'--initial-dashboard-password',
help='Initial password for the initial dashboard user')
-
+ parser_bootstrap.add_argument(
+ '--ssl-dashboard-port',
+ type=int,
+ default = 8443,
+ help='Port number used to connect with dashboard using SSL')
parser_bootstrap.add_argument(
'--dashboard-key',
type=argparse.FileType('r'),
'--ssh-public-key',
type=argparse.FileType('r'),
help='SSH public key')
+ parser_bootstrap.add_argument(
+ '--ssh-user',
+ default='root',
+ help='set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
parser_bootstrap.add_argument(
'--skip-mon-network',
'--apply-spec',
help='Apply cluster spec after bootstrap (copy ssh key, add hosts and apply services)')
-
parser_bootstrap.add_argument(
'--shared_ceph_folder',
metavar='CEPH_SOURCE_FOLDER',
help='Development mode. Several folders in containers are volumes mapped to different sub-folders in the ceph source folder')
+ parser_bootstrap.add_argument(
+ '--registry-url',
+ help='url for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-username',
+ help='username for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-password',
+ help='password for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-json',
+ help='json file with custom registry login info (URL, Username, Password)')
+
parser_deploy = subparsers.add_parser(
'deploy', help='deploy a daemon')
parser_deploy.set_defaults(func=command_deploy)
'--skip-firewalld',
action='store_true',
help='Do not configure firewalld')
+ parser_deploy.add_argument(
+ '--tcp-ports',
+ help='List of tcp ports to open in the host firewall')
parser_deploy.add_argument(
'--reconfig',
action='store_true',
default=['cephadm'],
help='packages')
+ parser_registry_login = subparsers.add_parser(
+ 'registry-login', help='log host into authenticated registry')
+ parser_registry_login.set_defaults(func=command_registry_login)
+ parser_registry_login.add_argument(
+ '--registry-url',
+ help='url for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-username',
+ help='username for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-password',
+ help='password for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-json',
+ help='json file with custom registry login info (URL, Username, Password)')
+ parser_registry_login.add_argument(
+ '--fsid',
+ help='cluster FSID')
+
return parser
+
def _parse_args(av):
parser = _get_parser()
args = parser.parse_args(av)
args.command.pop(0)
return args
+
if __name__ == "__main__":
# allow argv to be injected
try:
- av = injected_argv # type: ignore
+ av = injected_argv # type: ignore
except NameError:
av = sys.argv[1:]
args = _parse_args(av)