#!/usr/bin/python3
DEFAULT_IMAGE='docker.io/ceph/ceph:v15'
-DATA_DIR='/var/lib/ceph'
-LOG_DIR='/var/log/ceph'
-LOCK_DIR='/run/cephadm'
-LOGROTATE_DIR='/etc/logrotate.d'
-UNIT_DIR='/etc/systemd/system'
-LOG_DIR_MODE=0o770
-DATA_DIR_MODE=0o700
+DEFAULT_IMAGE_IS_MASTER=False
+LATEST_STABLE_RELEASE = 'octopus'
+DATA_DIR = '/var/lib/ceph'
+LOG_DIR = '/var/log/ceph'
+LOCK_DIR = '/run/cephadm'
+LOGROTATE_DIR = '/etc/logrotate.d'
+UNIT_DIR = '/etc/systemd/system'
+LOG_DIR_MODE = 0o770
+DATA_DIR_MODE = 0o700
CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker
-CUSTOM_PS1=r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT=None # in seconds
-DEFAULT_RETRY=10
-SHELL_DEFAULT_CONF='/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING='/etc/ceph/ceph.client.admin.keyring'
+CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
+DEFAULT_TIMEOUT = None # in seconds
+DEFAULT_RETRY = 10
+SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
+SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
"""
You can invoke cephadm in two ways:
injected_stdin = '...'
"""
-
import argparse
import datetime
import fcntl
+import ipaddress
import json
import logging
+from logging.config import dictConfig
import os
import platform
+import pwd
import random
-import re
import select
import shutil
import socket
import tempfile
import time
import errno
+import struct
+from enum import Enum
try:
- from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable
+ from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
pass
+
+import re
import uuid
from functools import wraps
else:
from urllib2 import urlopen, HTTPError
+if sys.version_info > (3, 0):
+ unicode = str
+
container_path = ''
cached_stdin = None
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
+
+# Log and console output config, consumed by logging.config.dictConfig():
+# - 'console' handler: INFO and above to the terminal (no formatter)
+# - 'log_file' handler: DEBUG and above to a rotating file under LOG_DIR
+#   (~1 MB per file, one backup kept)
+logging_config = {
+    'version': 1,
+    'disable_existing_loggers': True,
+    'formatters': {
+        'cephadm': {
+            'format': '%(asctime)s %(levelname)s %(message)s'
+        },
+    },
+    'handlers': {
+        'console':{
+            'level':'INFO',
+            'class':'logging.StreamHandler',
+        },
+        'log_file': {
+            'level': 'DEBUG',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'cephadm',
+            'filename': '%s/cephadm.log' % LOG_DIR,
+            'maxBytes': 1024000,
+            'backupCount': 1,
+        }
+    },
+    'loggers': {
+        # root logger: everything flows through both handlers
+        '': {
+            'level': 'DEBUG',
+            'handlers': ['console', 'log_file'],
+        }
+    }
+}
+
+class termcolor:
+    """ANSI escape sequences for coloring terminal output."""
+    yellow = '\033[93m'
+    red = '\033[31m'
+    end = '\033[0m'  # reset all attributes
+
class Error(Exception):
pass
+
class TimeoutExpired(Error):
pass
##################################
+
class Ceph(object):
daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
'crash')
##################################
+
class Monitoring(object):
"""Define the configs for the monitoring containers"""
components = {
"prometheus": {
- "image": "prom/prometheus:latest",
+ "image": "docker.io/prom/prometheus:v2.18.1",
"cpus": '2',
"memory": '4GB',
"args": [
],
},
"node-exporter": {
- "image": "prom/node-exporter",
+ "image": "docker.io/prom/node-exporter:v0.18.1",
"cpus": "1",
"memory": "1GB",
"args": [
],
},
"grafana": {
- "image": "ceph/ceph-grafana:latest",
+ "image": "docker.io/ceph/ceph-grafana:6.7.4",
"cpus": "2",
"memory": "4GB",
"args": [],
],
},
"alertmanager": {
- "image": "prom/alertmanager",
+ "image": "docker.io/prom/alertmanager:v0.20.0",
"cpus": "2",
"memory": "2GB",
- "args": [],
+ "args": [
+ "--web.listen-address=:{}".format(port_map['alertmanager'][0]),
+ "--cluster.listen-address=:{}".format(port_map['alertmanager'][1]),
+ ],
"config-json-files": [
"alertmanager.yml",
],
##################################
+
class NFSGanesha(object):
"""Defines a NFS-Ganesha container"""
self.daemon_id = daemon_id
self.image = image
- def json_get(key, default=None, require=False):
- if require and not key in config_json.keys():
- raise Error('{} missing from config-json'.format(key))
- return config_json.get(key, default)
-
# config-json options
- self.pool = json_get('pool', require=True)
- self.namespace = json_get('namespace')
- self.files = json_get('files', {})
+ self.pool = dict_get(config_json, 'pool', require=True)
+ self.namespace = dict_get(config_json, 'namespace')
+ self.userid = dict_get(config_json, 'userid')
+ self.extra_args = dict_get(config_json, 'extra_args', [])
+ self.files = dict_get(config_json, 'files', {})
+ self.rgw = dict_get(config_json, 'rgw', {})
# validate the supplied args
self.validate()
# type: (str, Union[int, str]) -> NFSGanesha
return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
- @staticmethod
- def port_in_use():
- # type () -> None
- for (srv, port) in NFSGanesha.port_map.items():
- if port_in_use(port):
- msg = 'TCP port {} required for {} is already in use'.format(port, srv)
- raise Error(msg)
-
- @staticmethod
- def get_container_mounts(data_dir):
+ def get_container_mounts(self, data_dir):
# type: (str) -> Dict[str, str]
mounts = dict()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
mounts[os.path.join(data_dir, 'etc/ganesha')] = '/etc/ganesha:z'
+ if self.rgw:
+ cluster = self.rgw.get('cluster', 'ceph')
+ rgw_user = self.rgw.get('user', 'admin')
+ mounts[os.path.join(data_dir, 'keyring.rgw')] = \
+ '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
return mounts
@staticmethod
@staticmethod
def get_version(container_id):
- # type(str) -> Optional[str]
+ # type: (str) -> Optional[str]
version = None
out, err, code = call(
[container_path, 'exec', container_id,
return version
def validate(self):
- # type () -> None
+ # type: () -> None
if not is_fsid(self.fsid):
raise Error('not an fsid: %s' % self.fsid)
if not self.daemon_id:
if fname not in self.files:
raise Error('required file missing from config-json: %s' % fname)
+ # check for an RGW config
+ if self.rgw:
+ if not self.rgw.get('keyring'):
+ raise Error('RGW keyring is missing')
+ if not self.rgw.get('user'):
+ raise Error('RGW user is missing')
+
def get_daemon_name(self):
# type: () -> str
return '%s.%s' % (self.daemon_type, self.daemon_id)
cname = '%s-%s' % (cname, desc)
return cname
- def get_file_content(self, fname):
- # type: (str) -> str
- """Normalize the json file content into a string"""
- content = self.files.get(fname)
- if isinstance(content, list):
- content = '\n'.join(content)
- return content
+ def get_daemon_args(self):
+ # type: () -> List[str]
+ return self.daemon_args + self.extra_args
def create_daemon_dirs(self, data_dir, uid, gid):
# type: (str, int, int) -> None
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(config_dir, fname)
- config_content = self.get_file_content(fname)
+ config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config_content)
+ # write the RGW keyring
+ if self.rgw:
+ keyring_path = os.path.join(data_dir, 'keyring.rgw')
+ with open(keyring_path, 'w') as f:
+ os.fchmod(f.fileno(), 0o600)
+ os.fchown(f.fileno(), uid, gid)
+ f.write(self.rgw.get('keyring', ''))
+
def get_rados_grace_container(self, action):
# type: (str) -> CephContainer
"""Container for a ganesha action on the grace db"""
args=['--pool', self.pool]
if self.namespace:
args += ['--ns', self.namespace]
+ if self.userid:
+ args += ['--userid', self.userid]
args += [action, self.get_daemon_name()]
data_dir = get_data_dir(self.fsid, self.daemon_type, self.daemon_id)
volume_mounts = self.get_container_mounts(data_dir)
envs = self.get_container_envs()
- logger.info('Creating RADOS grace for action: %s' % (action))
+ logger.info('Creating RADOS grace for action: %s' % action)
c = CephContainer(
image=self.image,
entrypoint=entrypoint,
args=args,
volume_mounts=volume_mounts,
- cname=self.get_container_name(desc='grace-%s' % (action)),
+ cname=self.get_container_name(desc='grace-%s' % action),
envs=envs
)
return c
##################################
+
+class CephIscsi(object):
+    """Defines a Ceph-Iscsi container"""
+
+    daemon_type = 'iscsi'
+    entrypoint = '/usr/bin/rbd-target-api'
+
+    # files that must be present in the config-json 'files' dict
+    required_files = ['iscsi-gateway.cfg']
+
+    def __init__(self,
+                 fsid,
+                 daemon_id,
+                 config_json,
+                 image=DEFAULT_IMAGE):
+        # type: (str, Union[int, str], Dict, str) -> None
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # config-json options
+        self.files = dict_get(config_json, 'files', {})
+
+        # validate the supplied args
+        self.validate()
+
+    @classmethod
+    def init(cls, fsid, daemon_id):
+        # type: (str, Union[int, str]) -> CephIscsi
+        """Alternate constructor pulling config-json and image from cli args."""
+        return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+
+    @staticmethod
+    def get_container_mounts(data_dir, log_dir):
+        # type: (str, str) -> Dict[str, str]
+        """Map host paths (keys) to in-container paths (values)."""
+        mounts = dict()
+        mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
+        mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
+        mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
+        mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+        mounts[log_dir] = '/var/log/rbd-target-api:z'
+        mounts['/dev'] = '/dev'
+        return mounts
+
+    @staticmethod
+    def get_container_binds():
+        # type: () -> List[List[str]]
+        """Extra bind mounts; /lib/modules is exposed read-only."""
+        binds = []
+        lib_modules = ['type=bind',
+                       'source=/lib/modules',
+                       'destination=/lib/modules',
+                       'ro=true']
+        binds.append(lib_modules)
+        return binds
+
+    @staticmethod
+    def get_version(container_id):
+        # type: (str) -> Optional[str]
+        """Query the ceph_iscsi package version inside a running container.
+
+        Returns None if the exec fails (non-zero exit code).
+        """
+        version = None
+        out, err, code = call(
+            [container_path, 'exec', container_id,
+             '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"])
+        if code == 0:
+            version = out.strip()
+        return version
+
+    def validate(self):
+        # type: () -> None
+        """Raise Error if fsid/daemon_id/image or a required file is invalid."""
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error('required file missing from config-json: %s' % fname)
+
+    def get_daemon_name(self):
+        # type: () -> str
+        """Return '<daemon_type>.<daemon_id>'."""
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        """Return the container name, optionally suffixed with `desc`."""
+        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    def create_daemon_dirs(self, data_dir, uid, gid):
+        # type: (str, int, int) -> None
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Creating ceph-iscsi config...')
+        configfs_dir = os.path.join(data_dir, 'configfs')
+        makedirs(configfs_dir, uid, gid, 0o755)
+
+        # populate files from the config-json
+        for fname in self.files:
+            config_file = os.path.join(data_dir, fname)
+            config_content = dict_get_join(self.files, fname)
+            logger.info('Write file: %s' % (config_file))
+            with open(config_file, 'w') as f:
+                os.fchown(f.fileno(), uid, gid)
+                os.fchmod(f.fileno(), 0o600)  # owner read/write only
+                f.write(config_content)
+
+    @staticmethod
+    def configfs_mount_umount(data_dir, mount=True):
+        # type: (str, bool) -> List[str]
+        """Build an idempotent shell command to (un)mount configfs.
+
+        NOTE(review): the command string is split on whitespace, so it is
+        presumably executed via a shell wrapper by the caller — confirm.
+        """
+        mount_path = os.path.join(data_dir, 'configfs')
+        if mount:
+            cmd = "if ! grep -qs {0} /proc/mounts; then " \
+                  "mount -t configfs none {0}; fi".format(mount_path)
+        else:
+            cmd = "if grep -qs {0} /proc/mounts; then " \
+                  "umount {0}; fi".format(mount_path)
+        return cmd.split()
+
+    def get_tcmu_runner_container(self):
+        # type: () -> CephContainer
+        """Return a sidecar container that runs tcmu-runner for this daemon."""
+        tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id)
+        tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
+        tcmu_container.cname = self.get_container_name(desc='tcmu')
+        # remove extra container args for tcmu container.
+        # extra args could cause issue with forking service type
+        tcmu_container.container_args = []
+        return tcmu_container
+
+##################################
+
+
+class CustomContainer(object):
+    """Defines a custom container"""
+    daemon_type = 'container'
+
+    def __init__(self, fsid: str, daemon_id: Union[int, str],
+                 config_json: Dict, image: str) -> None:
+        """Store identity and read all options out of the config-json dict."""
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # config-json options
+        self.entrypoint = dict_get(config_json, 'entrypoint')
+        self.uid = dict_get(config_json, 'uid', 65534) # nobody
+        self.gid = dict_get(config_json, 'gid', 65534) # nobody
+        self.volume_mounts = dict_get(config_json, 'volume_mounts', {})
+        self.args = dict_get(config_json, 'args', [])
+        self.envs = dict_get(config_json, 'envs', [])
+        self.privileged = dict_get(config_json, 'privileged', False)
+        self.bind_mounts = dict_get(config_json, 'bind_mounts', [])
+        self.ports = dict_get(config_json, 'ports', [])
+        self.dirs = dict_get(config_json, 'dirs', [])
+        self.files = dict_get(config_json, 'files', {})
+
+    @classmethod
+    def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
+        """Alternate constructor pulling config-json and image from cli args."""
+        return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+
+    def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+        """
+        Create dirs/files below the container data directory.
+        """
+        logger.info('Creating custom container configuration '
+                    'dirs/files in {} ...'.format(data_dir))
+
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % data_dir)
+
+        for dir_path in self.dirs:
+            logger.info('Creating directory: {}'.format(dir_path))
+            # relative paths are created below data_dir
+            dir_path = os.path.join(data_dir, dir_path.strip('/'))
+            makedirs(dir_path, uid, gid, 0o755)
+
+        for file_path in self.files:
+            logger.info('Creating file: {}'.format(file_path))
+            content = dict_get_join(self.files, file_path)
+            file_path = os.path.join(data_dir, file_path.strip('/'))
+            with open(file_path, 'w', encoding='utf-8') as f:
+                os.fchown(f.fileno(), uid, gid)
+                os.fchmod(f.fileno(), 0o600)  # owner read/write only
+                f.write(content)
+
+    def get_daemon_args(self) -> List[str]:
+        """Custom containers take no daemon (entrypoint) arguments."""
+        return []
+
+    def get_container_args(self) -> List[str]:
+        """Return the container runtime arguments from the config-json."""
+        return self.args
+
+    def get_container_envs(self) -> List[str]:
+        """Return the environment variables from the config-json."""
+        return self.envs
+
+    def get_container_mounts(self, data_dir: str) -> Dict[str, str]:
+        """
+        Get the volume mounts. Relative source paths will be located below
+        `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+        Example:
+        {
+            /foo/conf: /conf
+            foo/conf: /conf
+        }
+        becomes
+        {
+            /foo/conf: /conf
+            /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf
+        }
+        """
+        mounts = {}
+        for source, destination in self.volume_mounts.items():
+            # os.path.join leaves absolute sources untouched
+            source = os.path.join(data_dir, source)
+            mounts[source] = destination
+        return mounts
+
+    def get_container_binds(self, data_dir: str) -> List[List[str]]:
+        """
+        Get the bind mounts. Relative `source=...` paths will be located below
+        `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+        Example:
+        [
+            'type=bind',
+            'source=lib/modules',
+            'destination=/lib/modules',
+            'ro=true'
+        ]
+        becomes
+        [
+            ...
+            'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules',
+            ...
+        ]
+        """
+        binds = self.bind_mounts.copy()
+        for bind in binds:
+            for index, value in enumerate(bind):
+                match = re.match(r'^source=(.+)$', value)
+                if match:
+                    # rewrite relative sources below the daemon data dir
+                    bind[index] = 'source={}'.format(os.path.join(
+                        data_dir, match.group(1)))
+        return binds
+
+##################################
+
+
+def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any:
+    """
+    Helper function to get a key from a dictionary.
+    :param d: The dictionary to process.
+    :param key: The name of the key to get.
+    :param default: The default value in case the key does not
+        exist. Default is `None`.
+    :param require: Set to `True` if the key is required. An
+        exception will be raised if the key does not exist in
+        the given dictionary.
+    :return: Returns the value of the given key.
+    :raises: :exc:`Error` if the given key does not exist
+        and `require` is set to `True`.
+    """
+    if require and key not in d.keys():
+        raise Error('{} missing from dict'.format(key))
+    return d.get(key, default)
+
+##################################
+
+
+def dict_get_join(d: Dict, key: str) -> Any:
+    """
+    Helper function to get the value of a given key from a dictionary.
+    `List` values will be converted to a string by joining them with a
+    line break.
+    :param d: The dictionary to process.
+    :param key: The name of the key to get.
+    :return: Returns the value of the given key. If it was a `list`, it
+        will be joined with a line break.
+    """
+    value = d.get(key)
+    if isinstance(value, list):
+        # non-string items are stringified before joining
+        value = '\n'.join(map(str, value))
+    return value
+
+##################################
+
+
def get_supported_daemons():
+    # type: () -> List[str]
+    """Return the list of all daemon types this tool can deploy."""
    supported_daemons = list(Ceph.daemons)
    supported_daemons.extend(Monitoring.components)
    supported_daemons.append(NFSGanesha.daemon_type)
+    supported_daemons.append(CephIscsi.daemon_type)
+    supported_daemons.append(CustomContainer.daemon_type)
+    # daemon type names must be unique across all the classes above
    assert len(supported_daemons) == len(set(supported_daemons))
    return supported_daemons
##################################
+
def attempt_bind(s, address, port):
- # type (str) -> None
+ # type: (socket.socket, str, int) -> None
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((address, port))
finally:
s.close()
+
def port_in_use(port_num):
- # type (int) -> bool
+ # type: (int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
- logger.info('Verifying port %d ...' % (port_num))
+ logger.info('Verifying port %d ...' % port_num)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
attempt_bind(s, '0.0.0.0', port_num)
else:
return False
+
def check_ip_port(ip, port):
+ # type: (str, int) -> None
if not args.skip_ping_check:
logger.info('Verifying IP %s port %d ...' % (ip, port))
- if ip.startswith('[') or '::' in ip:
+ if is_ipv6(ip):
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ ip = unwrap_ipv6(ip)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
except NameError:
TimeoutError = OSError
+
class Timeout(TimeoutError):
"""
Raised when the lock could not be acquired in *timeout*
class FileLock(object):
- def __init__(self, name, timeout = -1):
+ def __init__(self, name, timeout=-1):
if not os.path.exists(LOCK_DIR):
os.mkdir(LOCK_DIR, 0o700)
self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
lock_id, lock_filename, poll_intervall
)
time.sleep(poll_intervall)
- except:
+ except: # noqa
# Something did go wrong, so decrement the counter.
self._lock_counter = max(0, self._lock_counter - 1)
raise
return _Acquire_ReturnProxy(lock = self)
- def release(self, force = False):
+ def release(self, force=False):
"""
Releases the file lock.
Please note, that the lock is only completly released, if the lock
return None
def __del__(self):
- self.release(force = True)
+ self.release(force=True)
return None
-
def _acquire(self):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(self._lock_file, open_mode)
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
fd = self._lock_file_fd
self._lock_file_fd = None
- fcntl.flock(fd, fcntl.LOCK_UN)
- os.close(fd)
+ fcntl.flock(fd, fcntl.LOCK_UN) # type: ignore
+ os.close(fd) # type: ignore
return None
##################################
# Popen wrappers, lifted from ceph-volume
-def call(command, # type: List[str]
- desc=None, # type: Optional[str]
- verbose=False, # type: bool
- verbose_on_failure=True, # type: bool
- timeout=DEFAULT_TIMEOUT, # type: Optional[int]
- **kwargs):
+class CallVerbosity(Enum):
+    """How much of a child process's stdout/stderr call() should log."""
+    # no logging of the child's output at all
+    SILENT = 0
+    # log stdout/stderr to logger.debug
+    DEBUG = 1
+    # On a non-zero exit status, it will forcefully set
+    # logging ON for the terminal
+    VERBOSE_ON_FAILURE = 2
+    # log at info (instead of debug) level.
+    VERBOSE = 3
+
+
+def call(command: List[str],
+ desc: Optional[str] = None,
+ verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
+ **kwargs) -> Tuple[str, str, int]:
"""
Wrap subprocess.Popen to
- decode utf-8
- cleanly return out, err, returncode
- If verbose=True, log at info (instead of debug) level.
-
- :param verbose_on_failure: On a non-zero exit status, it will forcefully set
- logging ON for the terminal
:param timeout: timeout in seconds
"""
- if not desc:
+ if desc is None:
desc = command[0]
+ if desc:
+ desc += ': '
timeout = timeout or args.timeout
logger.debug("Running command: %s" % ' '.join(command))
end_time = start_time + timeout
while not stop:
if end_time and (time.time() >= end_time):
- logger.info(desc + ':timeout after %s seconds' % timeout)
stop = True
- process.kill()
+ if process.poll() is None:
+ logger.info(desc + 'timeout after %s seconds' % timeout)
+ process.kill()
if reads and process.poll() is not None:
# we want to stop, but first read off anything remaining
# on stdout/stderr
lines = message.split('\n')
out_buffer = lines.pop()
for line in lines:
- if verbose:
- logger.info(desc + ':stdout ' + line)
- else:
- logger.debug(desc + ':stdout ' + line)
+ if verbosity == CallVerbosity.VERBOSE:
+ logger.info(desc + 'stdout ' + line)
+ elif verbosity != CallVerbosity.SILENT:
+ logger.debug(desc + 'stdout ' + line)
elif fd == process.stderr.fileno():
err += message
message = err_buffer + message
lines = message.split('\n')
err_buffer = lines.pop()
for line in lines:
- if verbose:
- logger.info(desc + ':stderr ' + line)
- else:
- logger.debug(desc + ':stderr ' + line)
+ if verbosity == CallVerbosity.VERBOSE:
+ logger.info(desc + 'stderr ' + line)
+ elif verbosity != CallVerbosity.SILENT:
+ logger.debug(desc + 'stderr ' + line)
else:
assert False
except (IOError, OSError):
pass
+ if verbosity == CallVerbosity.VERBOSE:
+ logger.debug(desc + 'profile rt=%s, stop=%s, exit=%s, reads=%s'
+ % (time.time()-start_time, stop, process.poll(), reads))
returncode = process.wait()
if out_buffer != '':
- if verbose:
- logger.info(desc + ':stdout ' + out_buffer)
- else:
- logger.debug(desc + ':stdout ' + out_buffer)
+ if verbosity == CallVerbosity.VERBOSE:
+ logger.info(desc + 'stdout ' + out_buffer)
+ elif verbosity != CallVerbosity.SILENT:
+ logger.debug(desc + 'stdout ' + out_buffer)
if err_buffer != '':
- if verbose:
- logger.info(desc + ':stderr ' + err_buffer)
- else:
- logger.debug(desc + ':stderr ' + err_buffer)
+ if verbosity == CallVerbosity.VERBOSE:
+ logger.info(desc + 'stderr ' + err_buffer)
+ elif verbosity != CallVerbosity.SILENT:
+ logger.debug(desc + 'stderr ' + err_buffer)
- if returncode != 0 and verbose_on_failure and not verbose:
+ if returncode != 0 and verbosity == CallVerbosity.VERBOSE_ON_FAILURE:
# dump stdout + stderr
logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
for line in out.splitlines():
- logger.info(desc + ':stdout ' + line)
+ logger.info(desc + 'stdout ' + line)
for line in err.splitlines():
- logger.info(desc + ':stderr ' + line)
+ logger.info(desc + 'stderr ' + line)
return out, err, returncode
-def call_throws(command, **kwargs):
-    # type: (List[str], Any) -> Tuple[str, str, int]
-    out, err, ret = call(command, **kwargs)
+def call_throws(command: List[str],
+                desc: Optional[str] = None,
+                verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+                timeout: Optional[int] = DEFAULT_TIMEOUT,
+                **kwargs) -> Tuple[str, str, int]:
+    """Like call(), but raise RuntimeError when the command exits non-zero."""
+    out, err, ret = call(command, desc, verbosity, timeout, **kwargs)
    if ret:
        raise RuntimeError('Failed command: %s' % ' '.join(command))
    return out, err, ret
##################################
+
def is_available(what, func):
# type: (str, Callable[[], bool]) -> None
"""
:param func: the callable object that determines availability
"""
retry = args.retry
- logger.info('Waiting for %s...' % (what))
+ logger.info('Waiting for %s...' % what)
num = 1
while True:
if func():
+ logger.info('%s is available'
+ % what)
break
elif num > retry:
raise Error('%s not available after %s tries'
return cp
+
def pathify(p):
    # type: (str) -> str
-    if not p.startswith('/'):
-        return os.path.join(os.getcwd(), p)
-    return p
+    # expand a leading '~' and resolve relative to the current directory
+    p = os.path.expanduser(p)
+    return os.path.abspath(p)
+
def get_file_timestamp(fn):
+    # type: (str) -> Optional[str]
+    # Return the file's mtime as a UTC DATEFMT string, or None if the
+    # file cannot be stat'ed.
    try:
        mt = os.path.getmtime(fn)
        return datetime.datetime.fromtimestamp(
            mt, tz=datetime.timezone.utc
        ).strftime(DATEFMT)
-    except Exception as e:
+    except Exception:
        return None
+
def try_convert_datetime(s):
+ # type: (str) -> Optional[str]
# This is super irritating because
# 1) podman and docker use different formats
# 2) python's strptime can't parse either one
p = re.compile(r'(\.[\d]{6})[\d]*')
s = p.sub(r'\1', s)
- # replace trailling Z with -0000, since (on python 3.6.8) it won't parse
+ # replace trailing Z with -0000, since (on python 3.6.8) it won't parse
if s and s[-1] == 'Z':
s = s[:-1] + '-0000'
- # cut off the redundnat 'CST' part that strptime can't parse, if
+ # cut off the redundant 'CST' part that strptime can't parse, if
# present.
v = s.split(' ')
s = ' '.join(v[0:3])
pass
return None
+
def get_podman_version():
# type: () -> Tuple[int, ...]
if 'podman' not in container_path:
out, _, _ = call_throws([container_path, '--version'])
return _parse_podman_version(out)
+
def _parse_podman_version(out):
# type: (str) -> Tuple[int, ...]
_, _, version_str = out.strip().split()
# type: () -> str
return socket.gethostname()
+
def get_fqdn():
    # type: () -> str
+    # fall back to the bare hostname if getfqdn() returns an empty string
    return socket.getfqdn() or socket.gethostname()
+
def get_arch():
    # type: () -> str
+    # machine hardware name, e.g. 'x86_64'
    return platform.uname().machine
+
def generate_service_id():
    # type: () -> str
+    # '<hostname>.' plus 6 random lowercase letters
    return get_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase)
                                          for _ in range(6))
+
def generate_password():
    # type: () -> str
+    # NOTE: uses random, not secrets — not cryptographically strong
    return ''.join(random.choice(string.ascii_lowercase + string.digits)
                   for i in range(10))
+
def normalize_container_id(i):
# type: (str) -> str
# docker adds the sha256: prefix, but AFAICS both
i = i[len(prefix):]
return i
+
def make_fsid():
    # type: () -> str
+    # uuid1 is host/time based; the result is used as a cluster fsid
    return str(uuid.uuid1())
+
def is_fsid(s):
# type: (str) -> bool
try:
return False
return True
+
def infer_fsid(func):
"""
If we only find a single fsid in /var/lib/ceph/*, use that
logger.debug('Using specified fsid: %s' % args.fsid)
return func()
- fsids = set()
+ fsids_set = set()
daemon_list = list_daemons(detail=False)
for daemon in daemon_list:
- if 'name' not in args or not args.name:
- fsids.add(daemon['fsid'])
+ if not is_fsid(daemon['fsid']):
+ # 'unknown' fsid
+ continue
+ elif 'name' not in args or not args.name:
+ # args.name not specified
+ fsids_set.add(daemon['fsid'])
elif daemon['name'] == args.name:
- fsids.add(daemon['fsid'])
- fsids = list(fsids)
+ # args.name is a match
+ fsids_set.add(daemon['fsid'])
+ fsids = sorted(fsids_set)
if not fsids:
# some commands do not always require an fsid
return _infer_fsid
+
+def infer_config(func):
+    """
+    If we find a MON daemon, use the config from that container
+
+    Decorator: sets args.config before calling `func`, preferring
+    (1) an explicitly supplied --config, (2) the config of a mon
+    daemon of args.fsid, (3) SHELL_DEFAULT_CONF if it exists.
+    """
+    @wraps(func)
+    def _infer_config():
+        if args.config:
+            logger.debug('Using specified config: %s' % args.config)
+            return func()
+        config = None
+        if args.fsid:
+            name = args.name
+            if not name:
+                # no daemon name given: fall back to any mon of this cluster
+                daemon_list = list_daemons(detail=False)
+                for daemon in daemon_list:
+                    if daemon['name'].startswith('mon.'):
+                        name = daemon['name']
+                        break
+            if name:
+                config = '/var/lib/ceph/{}/{}/config'.format(args.fsid, name)
+        if config:
+            logger.info('Inferring config %s' % config)
+            args.config = config
+        elif os.path.exists(SHELL_DEFAULT_CONF):
+            logger.debug('Using default config: %s' % SHELL_DEFAULT_CONF)
+            args.config = SHELL_DEFAULT_CONF
+        return func()
+
+    return _infer_config
+
+
+def _get_default_image():
+    """Return DEFAULT_IMAGE, warning first if this is a dev (master) build."""
+    if DEFAULT_IMAGE_IS_MASTER:
+        warn = '''This is a development version of cephadm.
+For information regarding the latest stable release:
+    https://docs.ceph.com/docs/{}/cephadm/install
+'''.format(LATEST_STABLE_RELEASE)
+        for line in warn.splitlines():
+            logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
+    return DEFAULT_IMAGE
+
+
def infer_image(func):
"""
Use the most recent ceph image
if not args.image:
args.image = get_last_local_ceph_image()
if not args.image:
- args.image = DEFAULT_IMAGE
+ args.image = _get_default_image()
return func()
return _infer_image
+
def default_image(func):
+    """Decorator: default args.image from $CEPHADM_IMAGE, else DEFAULT_IMAGE."""
    @wraps(func)
    def _default_image():
        if not args.image:
            args.image = os.environ.get('CEPHADM_IMAGE')
        if not args.image:
-            args.image = DEFAULT_IMAGE
+            args.image = _get_default_image()
+
        return func()
    return _default_image
+
def get_last_local_ceph_image():
"""
:return: The most recent local ceph image (already pulled)
out, _, _ = call_throws(
[container_path, 'images',
'--filter', 'label=ceph=True',
- '--format', '{{.Repository}} {{.Tag}}'])
- out_lines = out.splitlines()
- if len(out_lines) > 0:
- repository, tag = out_lines[0].split()
- r = '{}:{}'.format(repository, tag)
- logger.info('Using recent ceph image %s' % r)
- return r
+ '--filter', 'dangling=false',
+ '--format', '{{.Repository}}@{{.Digest}}'])
+ return _filter_last_local_ceph_image(out)
+
+
+def _filter_last_local_ceph_image(out):
+    # type: (str) -> Optional[str]
+    """Return the first image with a digest from `images` output, or None."""
+    for image in out.splitlines():
+        # entries ending in '@' have no digest; skip them
+        if image and not image.endswith('@'):
+            logger.info('Using recent ceph image %s' % image)
+            return image
    return None
+
def write_tmp(s, uid, gid):
+ # type: (str, int, int) -> Any
tmp_f = tempfile.NamedTemporaryFile(mode='w',
prefix='ceph-tmp')
os.fchown(tmp_f.fileno(), uid, gid)
return tmp_f
+
def makedirs(dir, uid, gid, mode):
# type: (str, int, int, int) -> None
if not os.path.exists(dir):
os.chown(dir, uid, gid)
os.chmod(dir, mode) # the above is masked by umask...
+
def get_data_dir(fsid, t, n):
    # type: (str, str, Union[int, str]) -> str
+    # <data_dir>/<fsid>/<daemon_type>.<daemon_id>
    return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
+
def get_log_dir(fsid):
    # type: (str) -> str
+    # <log_dir>/<fsid>
    return os.path.join(args.log_dir, fsid)
+
def make_data_dir_base(fsid, uid, gid):
# type: (str, int, int) -> str
data_dir_base = os.path.join(args.data_dir, fsid)
DATA_DIR_MODE)
return data_dir_base
+
def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
-    # type: (str, str, Union[int, str], int, int) -> str
-    if not uid or not gid:
-        (uid, gid) = extract_uid_gid()
+    # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str
+    """Create (and chown) the daemon's data dir; return its path."""
+    if uid is None or gid is None:
+        uid, gid = extract_uid_gid()
    make_data_dir_base(fsid, uid, gid)
    data_dir = get_data_dir(fsid, daemon_type, daemon_id)
    makedirs(data_dir, uid, gid, DATA_DIR_MODE)
    return data_dir
+
def make_log_dir(fsid, uid=None, gid=None):
-    # type: (str, int, int) -> str
-    if not uid or not gid:
-        (uid, gid) = extract_uid_gid()
+    # type: (str, Optional[int], Optional[int]) -> str
+    """Create (and chown) the cluster log dir; return its path."""
+    if uid is None or gid is None:
+        uid, gid = extract_uid_gid()
    log_dir = get_log_dir(fsid)
    makedirs(log_dir, uid, gid, LOG_DIR_MODE)
    return log_dir
+
def make_var_run(fsid, uid, gid):
    # type: (str, int, int) -> None
+    # install(1) creates the dir with mode and ownership in a single step
    call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
                 '/var/run/ceph/%s' % fsid])
+
def copy_tree(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Copy a directory tree from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_dir in src:
def copy_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Copy a files from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_file in src:
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
def move_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, int, int) -> None
+ # type: (List[str], str, Optional[int], Optional[int]) -> None
"""
Move files from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_file in src:
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
## copied from distutils ##
def find_executable(executable, path=None):
"""Tries to find 'executable' in the directories listed in 'path'.
return f
return None
+
def find_program(filename):
# type: (str) -> str
name = find_executable(filename)
raise ValueError('%s not found' % filename)
return name
+
def get_unit_name(fsid, daemon_type, daemon_id=None):
# type: (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
+
+def get_unit_name_by_daemon_name(fsid, name):
+    # type: (str, str) -> str
+    """Return the systemd unit name recorded for daemon `name` in cluster `fsid`.
+
+    Raises Error when the daemon description carries no 'systemd_unit' key.
+    """
+    daemon = get_daemon_description(fsid, name)
+    try:
+        return daemon['systemd_unit']
+    except KeyError:
+        raise Error('Failed to get unit name for {}'.format(daemon))
+
+
def check_unit(unit_name):
# type: (str) -> Tuple[bool, str, bool]
# NOTE: we ignore the exit code here because systemctl outputs
installed = False
try:
out, err, code = call(['systemctl', 'is-enabled', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if code == 0:
enabled = True
installed = True
state = 'unknown'
try:
out, err, code = call(['systemctl', 'is-active', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
out = out.strip()
if out in ['active']:
state = 'running'
state = 'unknown'
return (enabled, state, installed)
+
def check_units(units, enabler=None):
# type: (List[str], Optional[Packager]) -> bool
for u in units:
enabler.enable_service(u)
return False
+
def get_legacy_config_fsid(cluster, legacy_dir=None):
- # type: (str, str) -> Optional[str]
+ # type: (str, Optional[str]) -> Optional[str]
config_file = '/etc/ceph/%s.conf' % cluster
if legacy_dir is not None:
config_file = os.path.abspath(legacy_dir + config_file)
return config.get('global', 'fsid')
return None
+
def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
- # type: (str, str, Union[int, str], str) -> Optional[str]
+ # type: (str, str, Union[int, str], Optional[str]) -> Optional[str]
fsid = None
if daemon_type == 'osd':
try:
fsid = get_legacy_config_fsid(cluster, legacy_dir=legacy_dir)
return fsid
+
def get_daemon_args(fsid, daemon_type, daemon_id):
# type: (str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
peers = config.get('peers', list()) # type: ignore
for peer in peers:
r += ["--cluster.peer={}".format(peer)]
+        # some alertmanager images, by default, look elsewhere for their config
+ r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
elif daemon_type == NFSGanesha.daemon_type:
- r += NFSGanesha.daemon_args
+ nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ r += nfs_ganesha.get_daemon_args()
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ r.extend(cc.get_daemon_args())
return r
+
def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
- config=None, keyring=None,
- reconfig=False):
- # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str], Optional[bool]) -> None
+ config=None, keyring=None):
+ # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
data_dir = make_data_dir(fsid, daemon_type, daemon_id, uid=uid, gid=gid)
make_log_dir(fsid, uid=uid, gid=gid)
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config)
+
if keyring:
keyring_path = os.path.join(data_dir, 'keyring')
with open(keyring_path, 'w') as f:
f.write(keyring)
if daemon_type in Monitoring.components.keys():
- config = get_parm(args.config_json) # type: ignore
+ config_json: Dict[str, Any] = get_parm(args.config_json)
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
# Set up directories specific to the monitoring component
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
-
# populate the config directory for the component from the config-json
for fname in required_files:
- if 'files' in config: # type: ignore
- if isinstance(config['files'][fname], list): # type: ignore
- content = '\n'.join(config['files'][fname]) # type: ignore
- else:
- content = config['files'][fname] # type: ignore
-
+ if 'files' in config_json: # type: ignore
+ content = dict_get_join(config_json['files'], fname)
with open(os.path.join(data_dir_root, config_dir, fname), 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(content)
- if daemon_type == NFSGanesha.daemon_type:
+ elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
nfs_ganesha.create_daemon_dirs(data_dir, uid, gid)
+ elif daemon_type == CephIscsi.daemon_type:
+ ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ cc.create_daemon_dirs(data_dir, uid, gid)
+
+
def get_parm(option):
# type: (str) -> Dict[str, str]
else:
return js
+
def get_config_and_keyring():
# type: () -> Tuple[Optional[str], Optional[str]]
config = None
with open(args.keyring, 'r') as f:
keyring = f.read()
- return (config, keyring)
+ return config, keyring
+
+
+def get_container_binds(fsid, daemon_type, daemon_id):
+    # type: (str, str, Union[int, str, None]) -> List[List[str]]
+    """Return extra `--mount` bind specs needed by the given daemon type.
+
+    Each bind is a list of comma-joinable parts (consumed by
+    CephContainer, which renders them as `--mount a,b,...`). Empty for
+    daemon types with no special binds.
+    """
+    binds = list()
+
+    if daemon_type == CephIscsi.daemon_type:
+        binds.extend(CephIscsi.get_container_binds())
+    elif daemon_type == CustomContainer.daemon_type:
+        # custom containers need their data dir to resolve bind sources
+        assert daemon_id
+        cc = CustomContainer.init(fsid, daemon_id)
+        data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+        binds.extend(cc.get_container_binds(data_dir))
+
+    return binds
+
def get_container_mounts(fsid, daemon_type, daemon_id,
no_config=False):
mounts['/run/lvm'] = '/run/lvm'
mounts['/run/lock/lvm'] = '/run/lock/lvm'
+ try:
+ if args.shared_ceph_folder: # make easy manager modules/ceph-volume development
+ ceph_folder = pathify(args.shared_ceph_folder)
+ if os.path.exists(ceph_folder):
+ mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume'
+ mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr'
+ mounts[ceph_folder + '/src/python-common/ceph'] = '/usr/lib/python3.6/site-packages/ceph'
+ mounts[ceph_folder + '/monitoring/grafana/dashboards'] = '/etc/grafana/dashboards/ceph-dashboard'
+ mounts[ceph_folder + '/monitoring/prometheus/alerts'] = '/etc/prometheus/ceph'
+ else:
+ logger.error('{}{}{}'.format(termcolor.red,
+ 'Ceph shared source folder does not exist.',
+ termcolor.end))
+ except AttributeError:
+ pass
+
if daemon_type in Monitoring.components and daemon_id:
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
if daemon_type == 'prometheus':
mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
elif daemon_type == 'alertmanager':
- mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/alertmanager:Z'
+ mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/etc/alertmanager:Z'
if daemon_type == NFSGanesha.daemon_type:
assert daemon_id
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
- mounts.update(NFSGanesha.get_container_mounts(data_dir))
+ nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ mounts.update(nfs_ganesha.get_container_mounts(data_dir))
+
+ if daemon_type == CephIscsi.daemon_type:
+ assert daemon_id
+ data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ log_dir = get_log_dir(fsid)
+ mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
+
+ if daemon_type == CustomContainer.daemon_type:
+ assert daemon_id
+ cc = CustomContainer.init(fsid, daemon_id)
+ data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ mounts.update(cc.get_container_mounts(data_dir))
return mounts
-def get_container(fsid, daemon_type, daemon_id,
- privileged=False,
- ptrace=False,
- container_args=[]):
- # type: (str, str, Union[int, str], bool, bool, List[str]) -> CephContainer
+
+def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str],
+ privileged: bool = False,
+ ptrace: bool = False,
+ container_args: Optional[List[str]] = None) -> 'CephContainer':
+ entrypoint: str = ''
+ name: str = ''
+ ceph_args: List[str] = []
+ envs: List[str] = []
+ host_network: bool = True
+
+ if container_args is None:
+ container_args = []
if daemon_type in ['mon', 'osd']:
# mon and osd need privileged in order for libudev to query devices
privileged = True
name = '%s.%s' % (daemon_type, daemon_id)
elif daemon_type in Monitoring.components:
entrypoint = ''
- name = ''
elif daemon_type == NFSGanesha.daemon_type:
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
- else:
- entrypoint = ''
- name = ''
+ envs.extend(NFSGanesha.get_container_envs())
+ elif daemon_type == CephIscsi.daemon_type:
+ entrypoint = CephIscsi.entrypoint
+ name = '%s.%s' % (daemon_type, daemon_id)
+ # So the container can modprobe iscsi_target_mod and have write perms
+ # to configfs we need to make this a privileged container.
+ privileged = True
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ entrypoint = cc.entrypoint
+ host_network = False
+ envs.extend(cc.get_container_envs())
+ container_args.extend(cc.get_container_args())
- ceph_args = [] # type: List[str]
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(daemon_type)
- m = Monitoring.components[daemon_type] # type: ignore
- metadata = m.get('image', dict()) # type: ignore
monitoring_args = [
'--user',
str(uid),
# FIXME: disable cpu/memory limits for the time being (not supported
# by ubuntu 18.04 kernel!)
- #'--cpus',
- #metadata.get('cpus', '2'),
- #'--memory',
- #metadata.get('memory', '4GB')
]
container_args.extend(monitoring_args)
elif daemon_type == 'crash':
elif daemon_type in Ceph.daemons:
ceph_args = ['-n', name, '-f']
- envs=[] # type: List[str]
- if daemon_type == NFSGanesha.daemon_type:
- envs.extend(NFSGanesha.get_container_envs())
+ # if using podman, set -d, --conmon-pidfile & --cidfile flags
+ # so service can have Type=Forking
+ if 'podman' in container_path:
+ runtime_dir = '/run'
+ container_args.extend(['-d',
+ '--conmon-pidfile',
+ runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, daemon_id),
+ '--cidfile',
+ runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id)])
return CephContainer(
image=args.image,
args=ceph_args + get_daemon_args(fsid, daemon_type, daemon_id),
container_args=container_args,
volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
envs=envs,
privileged=privileged,
ptrace=ptrace,
+ init=args.container_init,
+ host_network=host_network,
)
+
def extract_uid_gid(img='', file_path='/var/lib/ceph'):
- # type: (str, str) -> Tuple[int, int]
+ # type: (str, Union[str, List[str]]) -> Tuple[int, int]
if not img:
img = args.image
- out = CephContainer(
- image=img,
- entrypoint='stat',
- args=['-c', '%u %g', file_path]
- ).run()
- (uid, gid) = out.split(' ')
- return (int(uid), int(gid))
+ if isinstance(file_path, str):
+ paths = [file_path]
+ else:
+ paths = file_path
+
+ for fp in paths:
+ try:
+ out = CephContainer(
+ image=img,
+ entrypoint='stat',
+ args=['-c', '%u %g', fp]
+ ).run()
+ uid, gid = out.split(' ')
+ return int(uid), int(gid)
+ except RuntimeError:
+ pass
+ raise RuntimeError('uid/gid not found')
+
def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
config=None, keyring=None,
osd_fsid=None,
- reconfig=False):
- # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool]) -> None
+ reconfig=False,
+ ports=None):
+ # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
+
+ ports = ports or []
+ if any([port_in_use(port) for port in ports]):
+ raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
+
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
if reconfig and not os.path.exists(data_dir):
raise Error('cannot reconfig, data path %s does not exist' % data_dir)
update_firewalld(daemon_type)
+ # Open ports explicitly required for the daemon
+ if ports:
+ fw = Firewalld()
+ fw.open_ports(ports)
+ fw.apply_rules()
+
if reconfig and daemon_type not in Ceph.daemons:
# ceph daemons do not need a restart; others (presumably) do to pick
# up the new config
call_throws(['systemctl', 'restart',
get_unit_name(fsid, daemon_type, daemon_id)])
+def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
+    # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
+    """Append shell lines that clean up and then run `container` to an open
+    unit file.
+
+    Cleanup `rm` lines are prefixed with `!` so their failure is ignored by
+    the `set -e` script; `background=True` appends ` &` so the run command
+    does not block subsequent lines.
+    """
+    if comment:
+        # Sometimes adding a comment, especially if there are multiple containers in one
+        # unit file, makes it easier to read and grok.
+        file_obj.write('# ' + comment + '\n')
+    # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
+    file_obj.write('! '+ ' '.join(container.rm_cmd()) + ' 2> /dev/null\n')
+    # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
+    if 'podman' in container_path:
+        file_obj.write('! '+ ' '.join(container.rm_cmd(storage=True)) + ' 2> /dev/null\n')
+
+    # container run command
+    file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
+
+
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
# cmd
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
with open(data_dir + '/unit.run.new', 'w') as f:
+ f.write('set -e\n')
+
+ if daemon_type in Ceph.daemons:
+ install_path = find_program('install')
+ f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
+
# pre-start cmd(s)
if daemon_type == 'osd':
# osds have a pre-start step
assert osd_fsid
- prestart = CephContainer(
- image=args.image,
- entrypoint='/usr/sbin/ceph-volume',
- args=[
- 'lvm', 'activate',
- str(daemon_id), osd_fsid,
- '--no-systemd'
- ],
- privileged=True,
- volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
- cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
- )
- f.write(' '.join(prestart.run_cmd()) + '\n')
+ simple_fn = os.path.join('/etc/ceph/osd',
+ '%s-%s.json.adopted-by-cephadm' % (daemon_id, osd_fsid))
+ if os.path.exists(simple_fn):
+ f.write('# Simple OSDs need chown on startup:\n')
+ for n in ['block', 'block.db', 'block.wal']:
+ p = os.path.join(data_dir, n)
+ f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
+ else:
+ prestart = CephContainer(
+ image=args.image,
+ entrypoint='/usr/sbin/ceph-volume',
+ args=[
+ 'lvm', 'activate',
+ str(daemon_id), osd_fsid,
+ '--no-systemd'
+ ],
+ privileged=True,
+ volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+ cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
+ )
+ _write_container_cmd_to_bash(f, prestart, 'LVM OSDs use ceph-volume lvm activate')
elif daemon_type == NFSGanesha.daemon_type:
# add nfs to the rados grace db
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
prestart = nfs_ganesha.get_rados_grace_container('add')
- f.write(' '.join(prestart.run_cmd()) + '\n')
-
- # container run command
- f.write(' '.join(c.run_cmd()) + '\n')
+ _write_container_cmd_to_bash(f, prestart, 'add daemon to rados grace')
+ elif daemon_type == CephIscsi.daemon_type:
+ f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
+ ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+            _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runner container', background=True)
+
+ _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id)))
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.run.new',
data_dir + '/unit.run')
],
privileged=True,
volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type,
daemon_id),
)
- f.write(' '.join(poststop.run_cmd()) + '\n')
+ _write_container_cmd_to_bash(f, poststop, 'deactivate osd')
elif daemon_type == NFSGanesha.daemon_type:
# remove nfs from the rados grace db
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
poststop = nfs_ganesha.get_rados_grace_container('remove')
- f.write(' '.join(poststop.run_cmd()) + '\n')
+ _write_container_cmd_to_bash(f, poststop, 'remove daemon from rados grace')
+ elif daemon_type == CephIscsi.daemon_type:
+ # make sure we also stop the tcmu container
+ ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+ f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n')
+ f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.poststop.new',
data_dir + '/unit.poststop')
# systemd
install_base_units(fsid)
- unit = get_unit_file(fsid, uid, gid)
+ unit = get_unit_file(fsid)
unit_file = 'ceph-%s@.service' % (fsid)
with open(args.unit_dir + '/' + unit_file + '.new', 'w') as f:
f.write(unit)
unit_name = get_unit_name(fsid, daemon_type, daemon_id)
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if enable:
call_throws(['systemctl', 'enable', unit_name])
if start:
call_throws(['systemctl', 'start', unit_name])
-def update_firewalld(daemon_type):
- # type: (str) -> None
- if args.skip_firewalld:
- return
- cmd = find_executable('firewall-cmd')
- if not cmd:
- logger.debug('firewalld does not appear to be present')
- return
- (enabled, state, _) = check_unit('firewalld.service')
- if not enabled:
- logger.debug('firewalld.service is not enabled')
- return
-
- fw_services = []
- fw_ports = []
- if daemon_type == 'mon':
- fw_services.append('ceph-mon')
- elif daemon_type in ['mgr', 'mds', 'osd']:
- fw_services.append('ceph')
- if daemon_type == 'mgr':
- fw_ports.append(8080) # dashboard
- fw_ports.append(8443) # dashboard
- fw_ports.append(9283) # mgr/prometheus exporter
- elif daemon_type in Monitoring.port_map.keys():
- fw_ports.extend(Monitoring.port_map[daemon_type]) # prometheus etc
- elif daemon_type == NFSGanesha.daemon_type:
- fw_services.append('nfs')
- for svc in fw_services:
- out, err, ret = call([cmd, '--permanent', '--query-service', svc])
+
+class Firewalld(object):
+    """Wrapper around `firewall-cmd`; its mutating methods no-op when
+    firewalld is not installed, enabled, and running."""
+
+    def __init__(self):
+        # type: () -> None
+        # check() also sets self.cmd as a side effect
+        self.available = self.check()
+
+    def check(self):
+        # type: () -> bool
+        """Return True iff firewall-cmd exists and firewalld.service is
+        both enabled and running."""
+        self.cmd = find_executable('firewall-cmd')
+        if not self.cmd:
+            logger.debug('firewalld does not appear to be present')
+            return False
+        (enabled, state, _) = check_unit('firewalld.service')
+        if not enabled:
+            logger.debug('firewalld.service is not enabled')
+            return False
+        if state != "running":
+            logger.debug('firewalld.service is not running')
+            return False
+
+        logger.info("firewalld ready")
+        return True
+
+ def enable_service_for(self, daemon_type):
+ # type: (str) -> None
+ if not self.available:
+ logger.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type)
+ return
+
+ if daemon_type == 'mon':
+ svc = 'ceph-mon'
+ elif daemon_type in ['mgr', 'mds', 'osd']:
+ svc = 'ceph'
+ elif daemon_type == NFSGanesha.daemon_type:
+ svc = 'nfs'
+ else:
+ return
+
+ out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
if ret:
logger.info('Enabling firewalld service %s in current zone...' % svc)
- out, err, ret = call([cmd, '--permanent', '--add-service', svc])
+ out, err, ret = call([self.cmd, '--permanent', '--add-service', svc])
if ret:
raise RuntimeError(
'unable to add service %s to current zone: %s' % (svc, err))
else:
logger.debug('firewalld service %s is enabled in current zone' % svc)
- for port in fw_ports:
- tcp_port = str(port) + '/tcp'
- out, err, ret = call([cmd, '--permanent', '--query-port', tcp_port])
- if ret:
- logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
- out, err, ret = call([cmd, '--permanent', '--add-port', tcp_port])
+
+ def open_ports(self, fw_ports):
+ # type: (List[int]) -> None
+ if not self.available:
+ logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports)
+ return
+
+ for port in fw_ports:
+ tcp_port = str(port) + '/tcp'
+ out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
if ret:
- raise RuntimeError('unable to add port %s to current zone: %s' %
- (tcp_port, err))
- else:
- logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
- call_throws([cmd, '--reload'])
+ logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
+ out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port])
+ if ret:
+ raise RuntimeError('unable to add port %s to current zone: %s' %
+ (tcp_port, err))
+ else:
+ logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
+
+ def apply_rules(self):
+ # type: () -> None
+ if not self.available:
+ return
+
+ call_throws([self.cmd, '--reload'])
+
+
+def update_firewalld(daemon_type):
+    # type: (str) -> None
+    """Enable the firewalld service and open the ports appropriate for
+    `daemon_type`, then reload firewalld rules (no-op when firewalld is
+    unavailable)."""
+    firewall = Firewalld()
+
+    firewall.enable_service_for(daemon_type)
+
+    fw_ports = []
+
+    if daemon_type in Monitoring.port_map.keys():
+        fw_ports.extend(Monitoring.port_map[daemon_type]) # prometheus etc
+
+    firewall.open_ports(fw_ports)
+    firewall.apply_rules()
def install_base_units(fsid):
# type: (str) -> None
}
""" % fsid)
-def get_unit_file(fsid, uid, gid):
- # type: (str, int, int) -> str
- install_path = find_program('install')
+
+def get_unit_file(fsid):
+ # type: (str) -> str
+ extra_args = ''
+ if 'podman' in container_path:
+ extra_args = ('ExecStartPre=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
+ 'ExecStopPost=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
+ 'Type=forking\n'
+ 'PIDFile=/%t/%n-pid\n')
+
u = """# generated by cephadm
[Unit]
Description=Ceph %i for {fsid}
LimitNOFILE=1048576
LimitNPROC=1048576
EnvironmentFile=-/etc/environment
-ExecStartPre=-{container_path} rm ceph-{fsid}-%i
-ExecStartPre=-{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}
ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run
ExecStop=-{container_path} stop ceph-{fsid}-%i
ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop
Restart=on-failure
RestartSec=10s
TimeoutStartSec=120
-TimeoutStopSec=15
+TimeoutStopSec=120
StartLimitInterval=30min
StartLimitBurst=5
-
+{extra_args}
[Install]
WantedBy=ceph-{fsid}.target
""".format(
container_path=container_path,
- install_path=install_path,
fsid=fsid,
- uid=uid,
- gid=gid,
- data_dir=args.data_dir)
+ data_dir=args.data_dir,
+ extra_args=extra_args)
+
return u
##################################
+
class CephContainer:
def __init__(self,
- image,
- entrypoint,
- args=[],
- volume_mounts={},
- cname='',
- container_args=[],
- envs=None,
- privileged=False,
- ptrace=False):
- # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool) -> None
+ image: str,
+ entrypoint: str,
+ args: List[str] = [],
+ volume_mounts: Dict[str, str] = {},
+ cname: str = '',
+ container_args: List[str] = [],
+ envs: Optional[List[str]] = None,
+ privileged: bool = False,
+ ptrace: bool = False,
+ bind_mounts: Optional[List[List[str]]] = None,
+ init: bool = False,
+ host_network: bool = True,
+ ) -> None:
self.image = image
self.entrypoint = entrypoint
self.args = args
self.envs = envs
self.privileged = privileged
self.ptrace = ptrace
+ self.bind_mounts = bind_mounts if bind_mounts else []
+ self.init = init
+ self.host_network = host_network
- def run_cmd(self):
- # type: () -> List[str]
- vols = [] # type: List[str]
- envs = [] # type: List[str]
- cname = [] # type: List[str]
- entrypoint = [] # type: List[str]
- if self.entrypoint:
- entrypoint = ['--entrypoint', self.entrypoint]
+ def run_cmd(self) -> List[str]:
+ cmd_args: List[str] = [
+ str(container_path),
+ 'run',
+ '--rm',
+ '--ipc=host',
+ ]
+ envs: List[str] = [
+ '-e', 'CONTAINER_IMAGE=%s' % self.image,
+ '-e', 'NODE_NAME=%s' % get_hostname(),
+ ]
+ vols: List[str] = []
+ binds: List[str] = []
- priv = [] # type: List[str]
+ if self.host_network:
+ cmd_args.append('--net=host')
+ if self.entrypoint:
+ cmd_args.extend(['--entrypoint', self.entrypoint])
if self.privileged:
- priv = ['--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk']
- if self.ptrace:
- priv.append('--cap-add=SYS_PTRACE')
+ cmd_args.extend([
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk'])
+ if self.ptrace and not self.privileged:
+ # if privileged, the SYS_PTRACE cap is already added
+ # in addition, --cap-add and --privileged are mutually
+ # exclusive since podman >= 2.0
+ cmd_args.append('--cap-add=SYS_PTRACE')
+ if self.init:
+ cmd_args.append('--init')
+ if self.cname:
+ cmd_args.extend(['--name', self.cname])
+ if self.envs:
+ for env in self.envs:
+ envs.extend(['-e', env])
+
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
- envs = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
- ]
- if self.envs:
- for e in self.envs:
- envs.extend(['-e', e])
- cname = ['--name', self.cname] if self.cname else []
- return [
+ binds = sum([['--mount', '{}'.format(','.join(bind))]
+ for bind in self.bind_mounts], [])
+
+ return cmd_args + self.container_args + envs + vols + binds + [
+ self.image,
+ ] + self.args # type: ignore
+
+ def shell_cmd(self, cmd: List[str]) -> List[str]:
+ cmd_args: List[str] = [
str(container_path),
'run',
'--rm',
- '--net=host',
- ] + self.container_args + priv + \
- cname + envs + \
- vols + entrypoint + \
- [
- self.image
- ] + self.args # type: ignore
+ '--ipc=host',
+ ]
+ envs: List[str] = [
+ '-e', 'CONTAINER_IMAGE=%s' % self.image,
+ '-e', 'NODE_NAME=%s' % get_hostname(),
+ ]
+ vols: List[str] = []
+ binds: List[str] = []
- def shell_cmd(self, cmd):
- # type: (List[str]) -> List[str]
- priv = [] # type: List[str]
+ if self.host_network:
+ cmd_args.append('--net=host')
if self.privileged:
- priv = ['--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk']
- vols = [] # type: List[str]
+ cmd_args.extend([
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk',
+ ])
+ if self.envs:
+ for env in self.envs:
+ envs.extend(['-e', env])
+
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
- envs = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
- ]
- if self.envs:
- for e in self.envs:
- envs.extend(['-e', e])
- cmd_args = [] # type: List[str]
- if cmd:
- cmd_args = ['-c'] + cmd
- return [
- str(container_path),
- 'run',
- '--rm',
- '--net=host',
- ] + self.container_args + priv + envs + vols + [
+ binds = sum([['--mount', '{}'.format(','.join(bind))]
+ for bind in self.bind_mounts], [])
+
+ return cmd_args + self.container_args + envs + vols + binds + [
'--entrypoint', cmd[0],
- self.image
+ self.image,
] + cmd[1:]
def exec_cmd(self, cmd):
self.cname,
] + cmd
+    def rm_cmd(self, storage=False):
+        # type: (bool) -> List[str]
+        """Return the argv that force-removes this container by name;
+        `storage=True` adds podman's `--storage` flag for containers only
+        known to the storage layer."""
+        ret = [
+            str(container_path),
+            'rm', '-f',
+        ]
+        if storage:
+            ret.append('--storage')
+        ret.append(self.cname)
+        return ret
+
+    def stop_cmd(self):
+        # type: () -> List[str]
+        """Return the argv that stops this container by name."""
+        ret = [
+            str(container_path),
+            'stop', self.cname,
+        ]
+        return ret
+
def run(self, timeout=DEFAULT_TIMEOUT):
# type: (Optional[int]) -> str
- logger.debug(self.run_cmd())
out, _, _ = call_throws(
self.run_cmd(), desc=self.entrypoint, timeout=timeout)
return out
##################################
+
@infer_image
def command_version():
# type: () -> int
##################################
+
@infer_image
def command_pull():
# type: () -> int
- logger.info('Pulling latest %s...' % args.image)
- call_throws([container_path, 'pull', args.image])
+
+ _pull_image(args.image)
return command_inspect_image()
+
+def _pull_image(image):
+ # type: (str) -> None
+ logger.info('Pulling container image %s...' % image)
+
+ ignorelist = [
+ "error creating read-write layer with ID",
+ "net/http: TLS handshake timeout",
+ "Digest did not match, expected",
+ ]
+
+ cmd = [container_path, 'pull', image]
+ cmd_str = ' '.join(cmd)
+
+ for sleep_secs in [1, 4, 25]:
+ out, err, ret = call(cmd)
+ if not ret:
+ return
+
+ if not any(pattern in err for pattern in ignorelist):
+ raise RuntimeError('Failed command: %s' % cmd_str)
+
+        logger.info('%s failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
+ time.sleep(sleep_secs)
+
+ raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
##################################
+
@infer_image
def command_inspect_image():
# type: () -> int
out, err, ret = call_throws([
container_path, 'inspect',
- '--format', '{{.Id}}',
+ '--format', '{{.ID}},{{.RepoDigests}}',
args.image])
if ret:
return errno.ENOENT
- image_id = normalize_container_id(out.strip())
+ info_from = get_image_info_from_inspect(out.strip(), args.image)
+
ver = CephContainer(args.image, 'ceph', ['--version']).run().strip()
+ info_from['ceph_version'] = ver
+
+ print(json.dumps(info_from, indent=4, sort_keys=True))
+ return 0
+
+
+def get_image_info_from_inspect(out, image):
+    # type: (str, str) -> Dict[str, str]
+    """Parse `<id>,<repo digests>` output of `<container> inspect`.
+
+    Returns a dict with a normalized 'image_id' and, when repo digests are
+    present, the first digest under 'repo_digest'.
+    Raises Error when `out` is empty.
+    """
+    # check emptiness BEFORE splitting: ''.split(',', 1) yields [''] and the
+    # two-target unpack would raise ValueError instead of the intended Error
+    if not out:
+        raise Error('inspect {}: empty result'.format(image))
+    image_id, digests = out.split(',', 1)
    r = {
-        'image_id': image_id,
-        'ceph_version': ver,
+        'image_id': normalize_container_id(image_id)
    }
-    print(json.dumps(r, indent=4, sort_keys=True))
-    return 0
+    if digests:
+        # digests render as '[digest1 digest2 ...]': strip brackets, split
+        json_digests = digests[1:-1].split(' ')
+        if json_digests:
+            r['repo_digest'] = json_digests[0]
+    return r
+
##################################
+
+def unwrap_ipv6(address):
+    # type: (str) -> str
+    """Strip the surrounding square brackets from a bracketed IPv6 address;
+    return other strings unchanged."""
+    if address.startswith('[') and address.endswith(']'):
+        return address[1:-1]
+    return address
+
+
+def wrap_ipv6(address):
+ # type: (str) -> str
+
+ # We cannot assume it's already wrapped or even an IPv6 address if
+ # it's already wrapped it'll not pass (like if it's a hostname) and trigger
+ # the ValueError
+ try:
+ if ipaddress.ip_address(unicode(address)).version == 6:
+ return f"[{address}]"
+ except ValueError:
+ pass
+
+ return address
+
+
+def is_ipv6(address):
+ # type: (str) -> bool
+ address = unwrap_ipv6(address)
+ try:
+ return ipaddress.ip_address(unicode(address)).version == 6
+ except ValueError:
+ logger.warning("Address: {} isn't a valid IP address".format(address))
+ return False
+
+
@default_image
def command_bootstrap():
# type: () -> int
'--allow-overwrite to overwrite' % f)
dirname = os.path.dirname(f)
if dirname and not os.path.exists(dirname):
- raise Error('%s directory %s does not exist' % (f, dirname))
+ fname = os.path.basename(f)
+ logger.info(f"Creating directory {dirname} for {fname}")
+ try:
+ # use makedirs to create intermediate missing dirs
+ os.makedirs(dirname, 0o755)
+ except PermissionError:
+ raise Error(f"Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.")
+
if not args.skip_prepare_host:
command_prepare_host()
raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
mon_id = args.mon_id or hostname
mgr_id = args.mgr_id or generate_service_id()
- logging.info('Cluster fsid: %s' % fsid)
+ logger.info('Cluster fsid: %s' % fsid)
+ ipv6 = False
l = FileLock(fsid)
l.acquire()
# ip
r = re.compile(r':(\d+)$')
- base_ip = None
+ base_ip = ''
if args.mon_ip:
+ ipv6 = is_ipv6(args.mon_ip)
+ if ipv6:
+ args.mon_ip = wrap_ipv6(args.mon_ip)
hasport = r.findall(args.mon_ip)
if hasport:
port = int(hasport[0])
if addr_arg[0] != '[' or addr_arg[-1] != ']':
raise Error('--mon-addrv value %s must use square backets' %
addr_arg)
+ ipv6 = addr_arg.count('[') > 1
for addr in addr_arg[1:-1].split(','):
hasport = r.findall(addr)
if not hasport:
# make sure IP is configured locally, and then figure out the
# CIDR network
for net, ips in list_networks().items():
- if base_ip in ips:
+ if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \
+ [ipaddress.ip_address(unicode(ip)) for ip in ips]:
mon_network = net
logger.info('Mon IP %s is in CIDR network %s' % (base_ip,
mon_network))
cp.write(cpf)
config = cpf.getvalue()
+ if args.registry_json or args.registry_url:
+ command_registry_login()
+
if not args.skip_pull:
- logger.info('Pulling latest %s container...' % args.image)
- call_throws([container_path, 'pull', args.image])
+ _pull_image(args.image)
logger.info('Extracting ceph user uid/gid from container image...')
(uid, gid) = extract_uid_gid()
# wait for the service to become available
def is_mon_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 30 # seconds
+ timeout=args.timeout if args.timeout else 60 # seconds
out, err, ret = call(c.run_cmd(),
desc=c.entrypoint,
timeout=timeout)
logger.info('Setting mon public_network...')
cli(['config', 'set', 'mon', 'public_network', mon_network])
+ if ipv6:
+ logger.info('Enabling IPv6 (ms_bind_ipv6)')
+ cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true'])
+
# create mgr
logger.info('Creating mgr...')
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
mgr_c = get_container(fsid, 'mgr', mgr_id)
+ # Note:the default port used by the Prometheus node exporter is opened in fw
deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring)
+ config=config, keyring=mgr_keyring, ports=[9283])
# output files
with open(args.output_keyring, 'w') as f:
logger.info('Waiting for mgr to start...')
def is_mgr_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 30 # seconds
- out = cli(['status', '-f', 'json-pretty'], timeout=timeout)
- j = json.loads(out)
- return j.get('mgrmap', {}).get('available', False)
+ timeout=args.timeout if args.timeout else 60 # seconds
+ try:
+ out = cli(['status', '-f', 'json-pretty'], timeout=timeout)
+ j = json.loads(out)
+ return j.get('mgrmap', {}).get('available', False)
+ except Exception as e:
+ logger.debug('status failed: %s' % e)
+ return False
is_available('mgr', is_mgr_available)
# wait for mgr to restart (after enabling a module)
# ssh
if not args.skip_ssh:
+ cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user])
+
logger.info('Enabling cephadm module...')
cli(['mgr', 'module', 'enable', 'cephadm'])
wait_for_mgr_restart()
logger.info('Setting orchestrator backend to cephadm...')
cli(['orch', 'set', 'backend', 'cephadm'])
- logger.info('Generating ssh key...')
- cli(['cephadm', 'generate-key'])
- ssh_pub = cli(['cephadm', 'get-pub-key'])
-
- with open(args.output_pub_ssh_key, 'w') as f:
- f.write(ssh_pub)
- logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key)
-
- logger.info('Adding key to root@localhost\'s authorized_keys...')
- if not os.path.exists('/root/.ssh'):
- os.mkdir('/root/.ssh', 0o700)
- auth_keys_file = '/root/.ssh/authorized_keys'
- add_newline = False
- if os.path.exists(auth_keys_file):
- with open(auth_keys_file, 'r') as f:
- f.seek(0, os.SEEK_END)
- if f.tell() > 0:
- f.seek(f.tell()-1, os.SEEK_SET) # go to last char
- if f.read() != '\n':
- add_newline = True
- with open(auth_keys_file, 'a') as f:
- os.fchmod(f.fileno(), 0o600) # just in case we created it
- if add_newline:
- f.write('\n')
- f.write(ssh_pub.strip() + '\n')
+ if args.ssh_config:
+ logger.info('Using provided ssh config...')
+ mounts = {
+ pathify(args.ssh_config.name): '/tmp/cephadm-ssh-config:z',
+ }
+ cli(['cephadm', 'set-ssh-config', '-i', '/tmp/cephadm-ssh-config'], extra_mounts=mounts)
+
+ if args.ssh_private_key and args.ssh_public_key:
+ logger.info('Using provided ssh keys...')
+ mounts = {
+ pathify(args.ssh_private_key.name): '/tmp/cephadm-ssh-key:z',
+ pathify(args.ssh_public_key.name): '/tmp/cephadm-ssh-key.pub:z'
+ }
+ cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
+ cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
+ else:
+ logger.info('Generating ssh key...')
+ cli(['cephadm', 'generate-key'])
+ ssh_pub = cli(['cephadm', 'get-pub-key'])
+
+ with open(args.output_pub_ssh_key, 'w') as f:
+ f.write(ssh_pub)
+ logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key)
+
+ logger.info('Adding key to %s@localhost\'s authorized_keys...' % args.ssh_user)
+ try:
+ s_pwd = pwd.getpwnam(args.ssh_user)
+ except KeyError as e:
+ raise Error('Cannot find uid/gid for ssh-user: %s' % (args.ssh_user))
+ ssh_uid = s_pwd.pw_uid
+ ssh_gid = s_pwd.pw_gid
+ ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
+
+ if not os.path.exists(ssh_dir):
+ makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
+
+ auth_keys_file = '%s/authorized_keys' % ssh_dir
+ add_newline = False
+
+ if os.path.exists(auth_keys_file):
+ with open(auth_keys_file, 'r') as f:
+ f.seek(0, os.SEEK_END)
+ if f.tell() > 0:
+ f.seek(f.tell()-1, os.SEEK_SET) # go to last char
+ if f.read() != '\n':
+ add_newline = True
+
+ with open(auth_keys_file, 'a') as f:
+ os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
+ os.fchmod(f.fileno(), 0o600) # just in case we created it
+ if add_newline:
+ f.write('\n')
+ f.write(ssh_pub.strip() + '\n')
host = get_hostname()
logger.info('Adding host %s...' % host)
- cli(['orch', 'host', 'add', host])
+ try:
+ cli(['orch', 'host', 'add', host])
+ except RuntimeError as e:
+ raise Error('Failed to add host <%s>: %s' % (host, e))
if not args.orphan_initial_daemons:
for t in ['mon', 'mgr', 'crash']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
+ if args.registry_url and args.registry_username and args.registry_password:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', args.registry_url, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force'])
+
+ if args.container_init:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(args.container_init), '--force'])
+
if not args.skip_dashboard:
+ # Configure SSL port (cephadm only allows to configure dashboard SSL port)
+ # if the user does not want to use SSL he can change this setting once the cluster is up
+ cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(args.ssl_dashboard_port)])
+
+ # configuring dashboard parameters
logger.info('Enabling the dashboard module...')
cli(['mgr', 'module', 'enable', 'dashboard'])
wait_for_mgr_restart()
# dashboard crt and key
if args.dashboard_key and args.dashboard_crt:
logger.info('Using provided dashboard certificate...')
- mounts = {}
- mounts[pathify(args.dashboard_crt)] = '/tmp/dashboard.crt:z'
- mounts[pathify(args.dashboard_key)] = '/tmp/dashboard.key:z'
+ mounts = {
+ pathify(args.dashboard_crt.name): '/tmp/dashboard.crt:z',
+ pathify(args.dashboard_key.name): '/tmp/dashboard.key:z'
+ }
cli(['dashboard', 'set-ssl-certificate', '-i', '/tmp/dashboard.crt'], extra_mounts=mounts)
cli(['dashboard', 'set-ssl-certificate-key', '-i', '/tmp/dashboard.key'], extra_mounts=mounts)
else:
logger.info('Creating initial admin user...')
password = args.initial_dashboard_password or generate_password()
- cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, password, 'administrator', '--force-password']
+ tmp_password_file = write_tmp(password, uid, gid)
+ cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password']
if not args.dashboard_password_noupdate:
cmd.append('--pwd-update-required')
- cli(cmd)
+ cli(cmd, extra_mounts={pathify(tmp_password_file.name): '/tmp/dashboard.pw:z'})
logger.info('Fetching dashboard port number...')
out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
port = int(out)
+ # Open dashboard port
+ fw = Firewalld()
+ fw.open_ports([port])
+ fw.apply_rules()
+
logger.info('Ceph Dashboard is now available at:\n\n'
'\t URL: https://%s:%s/\n'
'\t User: %s\n'
args.initial_dashboard_user,
password))
+ if args.apply_spec:
+ logger.info('Applying %s to cluster' % args.apply_spec)
+
+ with open(args.apply_spec) as f:
+ for line in f:
+ if 'hostname:' in line:
+ line = line.replace('\n', '')
+ split = line.split(': ')
+ if split[1] != host:
+ logger.info('Adding ssh key to %s' % split[1])
+
+ ssh_key = '/etc/ceph/ceph.pub'
+ if args.ssh_public_key:
+ ssh_key = args.ssh_public_key.name
+ out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])])
+
+ mounts = {}
+ mounts[pathify(args.apply_spec)] = '/tmp/spec.yml:z'
+
+ out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
+ logger.info(out)
+
logger.info('You can access the Ceph CLI with:\n\n'
'\tsudo %s shell --fsid %s -c %s -k %s\n' % (
sys.argv[0],
##################################
def command_registry_login():
    # type: () -> int
    """Log the container engine into a custom registry, using either a JSON
    credentials file (--registry-json) or explicit --registry-* options."""
    if args.registry_json:
        logger.info("Pulling custom registry login info from %s." % args.registry_json)
        d = get_parm(args.registry_json)
        if not (d.get('url') and d.get('username') and d.get('password')):
            raise Error("json provided for custom registry login did not include all necessary fields. "
                        "Please setup json file as\n"
                        "{\n"
                        " \"url\": \"REGISTRY_URL\",\n"
                        " \"username\": \"REGISTRY_USERNAME\",\n"
                        " \"password\": \"REGISTRY_PASSWORD\"\n"
                        "}\n")
        # stash the parsed credentials back on args so later steps
        # (e.g. bootstrap storing them in the mgr config) can see them
        args.registry_url = d.get('url')
        args.registry_username = d.get('username')
        args.registry_password = d.get('password')
        registry_login(args.registry_url, args.registry_username, args.registry_password)
    elif args.registry_url and args.registry_username and args.registry_password:
        registry_login(args.registry_url, args.registry_username, args.registry_password)
    else:
        raise Error("Invalid custom registry arguments received. To login to a custom registry include "
                    "--registry-url, --registry-username and --registry-password "
                    "options or --registry-json option")
    return 0
+
def registry_login(url, username, password):
    # type: (str, str, str) -> None
    """Run `<engine> login` against the given registry.

    :raises Error: if the login command fails.
    """
    logger.info("Logging into custom registry.")
    try:
        call_throws([container_path, 'login',
                     '-u', username,
                     '-p', password,
                     url])
    # narrow from a bare `except:` so SystemExit/KeyboardInterrupt
    # are not swallowed
    except Exception:
        # report the url/username this call actually attempted (the
        # function parameters), not the global args values
        raise Error("Failed to login to custom registry @ %s as %s with given password" % (url, username))
+
+##################################
+
+
def extract_uid_gid_monitoring(daemon_type):
# type: (str) -> Tuple[int, int]
elif daemon_type == 'grafana':
uid, gid = extract_uid_gid(file_path='/var/lib/grafana')
elif daemon_type == 'alertmanager':
- uid, gid = extract_uid_gid(file_path='/etc/alertmanager')
+ uid, gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
raise Error("{} not implemented yet".format(daemon_type))
return uid, gid
@default_image
def command_deploy():
# type: () -> None
- (daemon_type, daemon_id) = args.name.split('.', 1)
+ daemon_type, daemon_id = args.name.split('.', 1)
l = FileLock(args.fsid)
l.acquire()
if daemon_type not in get_supported_daemons():
raise Error('daemon type %s not recognized' % daemon_type)
- logger.info('Deploying daemon %s.%s ...' % (daemon_type, daemon_id))
+ redeploy = False
+ unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
+ (_, state, _) = check_unit(unit_name)
+ if state == 'running':
+ redeploy = True
+
+ if args.reconfig:
+ logger.info('%s daemon %s ...' % ('Reconfig', args.name))
+ elif redeploy:
+ logger.info('%s daemon %s ...' % ('Redeploy', args.name))
+ else:
+ logger.info('%s daemon %s ...' % ('Deploy', args.name))
+
+ # Get and check ports explicitly required to be opened
+ daemon_ports = [] # type: List[int]
+ if args.tcp_ports:
+ daemon_ports = list(map(int, args.tcp_ports.split()))
if daemon_type in Ceph.daemons:
- (config, keyring) = get_config_and_keyring()
- (uid, gid) = extract_uid_gid()
+ config, keyring = get_config_and_keyring()
+ uid, gid = extract_uid_gid()
make_var_run(args.fsid, uid, gid)
+
c = get_container(args.fsid, daemon_type, daemon_id,
ptrace=args.allow_ptrace)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
osd_fsid=args.osd_fsid,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type in Monitoring.components:
# monitoring daemon - prometheus, grafana, alertmanager, node-exporter
- monitoring_args = [] # type: List[str]
-
# Default Checks
- if not args.reconfig:
- daemon_ports = Monitoring.port_map[daemon_type] # type: List[int]
- if any([port_in_use(port) for port in daemon_ports]):
- raise Error("TCP Port(s) '{}' required for {} is already in use".format(",".join(map(str, daemon_ports)), daemon_type))
+ if not args.reconfig and not redeploy:
+ daemon_ports.extend(Monitoring.port_map[daemon_type])
# make sure provided config-json is sufficient
config = get_parm(args.config_json) # type: ignore
raise Error("{} deployment requires config-json which must "
"contain arg for {}".format(daemon_type.capitalize(), ', '.join(required_args)))
-
uid, gid = extract_uid_gid_monitoring(daemon_type)
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type == NFSGanesha.daemon_type:
- NFSGanesha.port_in_use()
- (config, keyring) = get_config_and_keyring()
+ if not args.reconfig and not redeploy:
+ daemon_ports.extend(NFSGanesha.port_map.values())
+
+ config, keyring = get_config_and_keyring()
# TODO: extract ganesha uid/gid (997, 994) ?
- (uid, gid) = extract_uid_gid()
+ uid, gid = extract_uid_gid()
+ c = get_container(args.fsid, daemon_type, daemon_id)
+ deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ config=config, keyring=keyring,
+ reconfig=args.reconfig,
+ ports=daemon_ports)
+
+ elif daemon_type == CephIscsi.daemon_type:
+ config, keyring = get_config_and_keyring()
+ uid, gid = extract_uid_gid()
c = get_container(args.fsid, daemon_type, daemon_id)
deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
- reconfig=args.reconfig)
+ reconfig=args.reconfig,
+ ports=daemon_ports)
+
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(args.fsid, daemon_id)
+ if not args.reconfig and not redeploy:
+ daemon_ports.extend(cc.ports)
+ c = get_container(args.fsid, daemon_type, daemon_id,
+ privileged=cc.privileged,
+ ptrace=args.allow_ptrace)
+ deploy_daemon(args.fsid, daemon_type, daemon_id, c,
+ uid=cc.uid, gid=cc.gid, config=None,
+ keyring=None, reconfig=args.reconfig,
+ ports=daemon_ports)
+
else:
- raise Error("{} not implemented in command_deploy function".format(daemon_type))
+ raise Error('daemon type {} not implemented in command_deploy function'
+ .format(daemon_type))
##################################
+
@infer_image
def command_run():
# type: () -> int
##################################
+
@infer_fsid
+@infer_config
@infer_image
def command_shell():
# type: () -> int
# use /etc/ceph files by default, if present. we do this instead of
# making these defaults in the arg parser because we don't want an error
# if they don't exist.
- if not args.config and os.path.exists(SHELL_DEFAULT_CONF):
- args.config = SHELL_DEFAULT_CONF
if not args.keyring and os.path.exists(SHELL_DEFAULT_KEYRING):
args.keyring = SHELL_DEFAULT_KEYRING
container_args = [] # type: List[str]
mounts = get_container_mounts(args.fsid, daemon_type, daemon_id,
no_config=True if args.config else False)
+ binds = get_container_binds(args.fsid, daemon_type, daemon_id)
if args.config:
mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z'
if args.keyring:
mounts[pathify(args.keyring)] = '/etc/ceph/ceph.keyring:z'
+ if args.mount:
+ for _mount in args.mount:
+ split_src_dst = _mount.split(':')
+ mount = pathify(split_src_dst[0])
+ filename = os.path.basename(split_src_dst[0])
+ if len(split_src_dst) > 1:
+ dst = split_src_dst[1] + ':z' if len(split_src_dst) == 3 else split_src_dst[1]
+ mounts[mount] = dst
+ else:
+ mounts[mount] = '/mnt/{}:z'.format(filename)
if args.command:
command = args.command
else:
args=[],
container_args=container_args,
volume_mounts=mounts,
+ bind_mounts=binds,
envs=args.env,
privileged=True)
command = c.shell_cmd(command)
##################################
+
@infer_fsid
def command_enter():
# type: () -> int
'-e', 'LANG=C',
'-e', "PS1=%s" % CUSTOM_PS1,
]
- c = get_container(args.fsid, daemon_type, daemon_id,
- container_args=container_args)
+ c = CephContainer(
+ image=args.image,
+ entrypoint='doesnotmatter',
+ container_args=container_args,
+ cname='ceph-%s-%s.%s' % (args.fsid, daemon_type, daemon_id),
+ )
command = c.exec_cmd(command)
return call_timeout(command, args.timeout)
##################################
+
@infer_fsid
@infer_image
def command_ceph_volume():
if args.fsid:
make_log_dir(args.fsid)
+ l = FileLock(args.fsid)
+ l.acquire()
+
(uid, gid) = (0, 0) # ceph-volume runs as root
mounts = get_container_mounts(args.fsid, 'osd', None)
c = CephContainer(
image=args.image,
entrypoint='/usr/sbin/ceph-volume',
+ envs=args.env,
args=args.command,
privileged=True,
volume_mounts=mounts,
)
- out, err, code = call_throws(c.run_cmd(), verbose=True)
+ out, err, code = call_throws(c.run_cmd(), verbosity=CallVerbosity.VERBOSE)
if not code:
print(out)
##################################
+
@infer_fsid
def command_unit():
# type: () -> None
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
- (daemon_type, daemon_id) = args.name.split('.', 1)
- unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
+
+ unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
+
call_throws([
'systemctl',
args.command,
- unit_name])
+ unit_name],
+ verbosity=CallVerbosity.VERBOSE,
+ desc=''
+ )
##################################
+
@infer_fsid
def command_logs():
# type: () -> None
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
- (daemon_type, daemon_id) = args.name.split('.', 1)
- unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
+ unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
cmd = [find_program('journalctl')]
cmd.extend(['-u', unit_name])
##################################
+
def list_networks():
# type: () -> Dict[str,List[str]]
#j = json.loads(out)
#for x in j:
+ res = _list_ipv4_networks()
+ res.update(_list_ipv6_networks())
+ return res
+
+
def _list_ipv4_networks():
    """Return a map of local IPv4 CIDR networks to their addresses,
    parsed from 'ip route ls' output."""
    route_output, _, _ = call_throws([find_executable('ip'), 'route', 'ls'])
    return _parse_ipv4_route(route_output)
-def _parse_ip_route(out):
+
+def _parse_ipv4_route(out):
r = {} # type: Dict[str,List[str]]
p = re.compile(r'^(\S+) (.*)scope link (.*)src (\S+)')
for line in out.splitlines():
r[net].append(ip)
return r
+
def _list_ipv6_networks():
    """Return a map of local IPv6 CIDR networks to their addresses,
    parsed from 'ip -6 route ls' and 'ip -6 addr ls' output."""
    route_output, _, _ = call_throws([find_executable('ip'), '-6', 'route', 'ls'])
    addr_output, _, _ = call_throws([find_executable('ip'), '-6', 'addr', 'ls'])
    return _parse_ipv6_route(route_output, addr_output)
+
+
+def _parse_ipv6_route(routes, ips):
+ r = {} # type: Dict[str,List[str]]
+ route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
+ ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
+ for line in routes.splitlines():
+ m = route_p.findall(line)
+ if not m or m[0][0].lower() == 'default':
+ continue
+ net = m[0][0]
+ if net not in r:
+ r[net] = []
+
+ for line in ips.splitlines():
+ m = ip_p.findall(line)
+ if not m:
+ continue
+ ip = m[0][0]
+ # find the network it belongs to
+ net = [n for n in r.keys()
+ if ipaddress.ip_address(unicode(ip)) in ipaddress.ip_network(unicode(n))]
+ if net:
+ r[net[0]].append(ip)
+
+ return r
+
+
def command_list_networks():
# type: () -> None
r = list_networks()
##################################
+
def command_ls():
    # type: () -> None
    """Print a JSON listing of all ceph daemons (legacy and cephadm-style)
    found on this host."""
    daemons = list_daemons(detail=not args.no_detail,
                           legacy_dir=args.legacy_dir)
    print(json.dumps(daemons, indent=4))
+
def list_daemons(detail=True, legacy_dir=None):
# type: (bool, Optional[str]) -> List[Dict[str, str]]
host_version = None
fsid = get_legacy_daemon_fsid(
cluster, daemon_type, daemon_id,
legacy_dir=legacy_dir)
+ legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
i = {
'style': 'legacy',
'name': '%s.%s' % (daemon_type, daemon_id),
'fsid': fsid if fsid is not None else 'unknown',
+ 'systemd_unit': legacy_unit_name,
}
if detail:
- (i['enabled'], i['state'], _) = check_unit(
- 'ceph-%s@%s' % (daemon_type, daemon_id))
+ (i['enabled'], i['state'], _) = check_unit(legacy_unit_name)
if not host_version:
try:
out, err, code = call(['ceph', '-v'])
'style': 'cephadm:v1',
'name': name,
'fsid': fsid,
+ 'systemd_unit': unit_name,
}
if detail:
# get container id
'--format', '{{.Id}},{{.Config.Image}},{{%s}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}' % image_field,
'ceph-%s-%s' % (fsid, j)
],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if not code:
(container_id, image_name, image_id, start,
version) = out.strip().split(',')
version = seen_versions.get(image_id, None)
if daemon_type == NFSGanesha.daemon_type:
version = NFSGanesha.get_version(container_id)
+ if daemon_type == CephIscsi.daemon_type:
+ version = CephIscsi.get_version(container_id)
elif not version:
if daemon_type in Ceph.daemons:
out, err, code = call(
err.startswith('%s, version ' % cmd):
version = err.split(' ')[2]
seen_versions[image_id] = version
+ elif daemon_type == CustomContainer.daemon_type:
+ # Because a custom container can contain
+ # everything, we do not know which command
+ # to execute to get the version.
+ pass
else:
- logging.warning('version for unknown daemon type %s' % daemon_type)
+ logger.warning('version for unknown daemon type %s' % daemon_type)
else:
vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore
try:
ls.append(i)
- # /var/lib/rook
- # WRITE ME
return ls
def get_daemon_description(fsid, name, detail=False, legacy_dir=None):
    # type: (str, str, bool, Optional[str]) -> Dict[str, str]
    """Return the `cephadm ls`-style entry for one daemon of one cluster.

    :raises Error: if no daemon with that fsid and name exists on this host.
    """
    for entry in list_daemons(detail=detail, legacy_dir=legacy_dir):
        if entry['fsid'] == fsid and entry['name'] == name:
            return entry
    raise Error('Daemon not found: {}. See `cephadm ls`'.format(name))
+
+
##################################
@default_image
# type: () -> None
if not args.skip_pull:
- logger.info('Pulling latest %s container...' % args.image)
- call_throws([container_path, 'pull', args.image])
+ _pull_image(args.image)
(daemon_type, daemon_id) = args.name.split('.', 1)
raise Error('daemon type %s not recognized' % daemon_type)
class AdoptOsd(object):
    """Discover the fsid and objectstore type of a pre-cephadm OSD so it
    can be adopted into a cephadm-managed cluster.

    Callers try, in order: check_online_osd() (data dir already mounted),
    check_offline_lvm_osd() (ceph-volume lvm), check_offline_simple_osd()
    (ceph-disk 'simple' scan file).
    """

    def __init__(self, osd_data_dir, osd_id):
        # type: (str, str) -> None
        self.osd_data_dir = osd_data_dir  # e.g. /var/lib/ceph/osd/<cluster>-<id>
        self.osd_id = osd_id

    def check_online_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Read fsid and objectstore type directly from a mounted OSD data
        dir; returns (None, None) components for whatever is unreadable."""

        osd_fsid, osd_type = None, None

        path = os.path.join(self.osd_data_dir, 'fsid')
        try:
            with open(path, 'r') as f:
                osd_fsid = f.read().strip()
            logger.info("Found online OSD at %s" % path)
        except IOError:
            logger.info('Unable to read OSD fsid from %s' % path)
        if os.path.exists(os.path.join(self.osd_data_dir, 'type')):
            with open(os.path.join(self.osd_data_dir, 'type')) as f:
                osd_type = f.read().strip()
        else:
            logger.info('"type" file missing for OSD data dir')

        return osd_fsid, osd_type

    def check_offline_lvm_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Query 'ceph-volume lvm list' (run in a container) for this OSD id
        and derive fsid/type from the reported LV tags."""

        osd_fsid, osd_type = None, None

        c = CephContainer(
            image=args.image,
            entrypoint='/usr/sbin/ceph-volume',
            args=['lvm', 'list', '--format=json'],
            privileged=True
        )
        out, err, code = call_throws(c.run_cmd())
        if not code:
            try:
                js = json.loads(out)
                if self.osd_id in js:
                    logger.info("Found offline LVM OSD {}".format(self.osd_id))
                    osd_fsid = js[self.osd_id][0]['tags']['ceph.osd_fsid']
                    for device in js[self.osd_id]:
                        # a 'block' device implies bluestore, 'data' filestore
                        if device['tags']['ceph.type'] == 'block':
                            osd_type = 'bluestore'
                            break
                        if device['tags']['ceph.type'] == 'data':
                            osd_type = 'filestore'
                            break
            except ValueError as e:
                logger.info("Invalid JSON in ceph-volume lvm list: {}".format(e))

        return osd_fsid, osd_type

    def check_offline_simple_osd(self):
        # type: () -> Tuple[Optional[str], Optional[str]]
        """Look for a ceph-disk 'simple' scan JSON (/etc/ceph/osd/<id>-*.json)
        for this OSD id; mounts the data device for non-filestore OSDs."""

        osd_fsid, osd_type = None, None

        osd_file = glob("/etc/ceph/osd/{}-[a-f0-9-]*.json".format(self.osd_id))
        if len(osd_file) == 1:
            with open(osd_file[0], 'r') as f:
                try:
                    js = json.loads(f.read())
                    logger.info("Found offline simple OSD {}".format(self.osd_id))
                    osd_fsid = js["fsid"]
                    osd_type = js["type"]
                    if osd_type != "filestore":
                        # need this to be mounted for the adopt to work, as it
                        # needs to move files from this directory
                        call_throws(['mount', js["data"]["path"], self.osd_data_dir])
                except ValueError as e:
                    logger.info("Invalid JSON in {}: {}".format(osd_file, e))

        return osd_fsid, osd_type
+
def command_adopt_ceph(daemon_type, daemon_id, fsid):
# type: (str, str, str) -> None
(daemon_type, args.cluster, daemon_id))
data_dir_src = os.path.abspath(args.legacy_dir + data_dir_src)
+ if not os.path.exists(data_dir_src):
+ raise Error("{}.{} data directory '{}' does not exist. "
+ "Incorrect ID specified, or daemon alrady adopted?".format(
+ daemon_type, daemon_id, data_dir_src))
+
osd_fsid = None
if daemon_type == 'osd':
- path = os.path.join(data_dir_src, 'fsid')
- try:
- with open(path, 'r') as f:
- osd_fsid = f.read().strip()
- except IOError:
- raise Error('unable to read OSD fsid from %s' % path)
- os_type = None
- if os.path.exists(os.path.join(data_dir_src, 'type')):
- with open(os.path.join(data_dir_src, 'type')) as f:
- os_type = f.read().strip()
- else:
- raise Error('"type" file missing for OSD data dir')
- logger.info('objectstore_type is %s' % os_type)
- if os_type == 'filestore':
+ adopt_osd = AdoptOsd(data_dir_src, daemon_id)
+ osd_fsid, osd_type = adopt_osd.check_online_osd()
+ if not osd_fsid:
+ osd_fsid, osd_type = adopt_osd.check_offline_lvm_osd()
+ if not osd_fsid:
+ osd_fsid, osd_type = adopt_osd.check_offline_simple_osd()
+ if not osd_fsid:
+ raise Error('Unable to find OSD {}'.format(daemon_id))
+ logger.info('objectstore_type is %s' % osd_type)
+ assert osd_type
+ if osd_type == 'filestore':
raise Error('FileStore is not supported by cephadm')
# NOTE: implicit assumption here that the units correspond to the
logger.info('Renaming %s -> %s', simple_fn, new_fn)
os.rename(simple_fn, new_fn)
logger.info('Disabling host unit ceph-volume@ simple unit...')
- call_throws(['systemctl', 'disable',
- 'ceph-volume@simple-%s-%s.service' % (
- daemon_id, osd_fsid)])
+ call(['systemctl', 'disable',
+ 'ceph-volume@simple-%s-%s.service' % (daemon_id, osd_fsid)])
else:
# assume this is an 'lvm' c-v for now, but don't error
# out if it's not.
c = get_container(fsid, daemon_type, daemon_id)
deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, # unconditionally enable the new unit
- start=(state == 'running'),
+ start=(state == 'running' or args.force_start),
osd_fsid=osd_fsid)
update_firewalld(daemon_type)
config_src = '/etc/prometheus/prometheus.yml'
config_src = os.path.abspath(args.legacy_dir + config_src)
config_dst = os.path.join(data_dir_dst, 'etc/prometheus')
+ makedirs(config_dst, uid, gid, 0o755)
copy_files([config_src], config_dst, uid=uid, gid=gid)
# data
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_grafana(daemon_id, fsid):
# type: (str, str) -> None
else:
logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key))
-
# data - possible custom dashboards/plugins
data_src = '/var/lib/grafana/'
data_src = os.path.abspath(args.legacy_dir + data_src)
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_alertmanager(daemon_id, fsid):
# type: (str, str) -> None
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def _adjust_grafana_ini(filename):
# type: (str) -> None
l = FileLock(args.fsid)
l.acquire()
+ unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
+
(daemon_type, daemon_id) = args.name.split('.', 1)
if daemon_type in ['mon', 'osd'] and not args.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
- unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
+
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
if daemon_type in ['mon', 'osd', 'prometheus'] and \
not args.force_delete_data:
##################################
+
def command_rm_cluster():
# type: () -> None
if not args.force:
continue
unit_name = get_unit_name(args.fsid, d['name'])
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
# cluster units
for unit_name in ['ceph-%s.target' % args.fsid]:
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
slice_name = 'system-%s.slice' % (('ceph-%s' % args.fsid).replace('-',
'\\x2d'))
call(['systemctl', 'stop', slice_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
# rm units
call_throws(['rm', '-f', args.unit_dir +
# rm logrotate config
call_throws(['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid])
+ # clean up config, keyring, and pub key files
+ files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring']
+
+ if os.path.exists(files[0]):
+ valid_fsid = False
+ with open(files[0]) as f:
+ if args.fsid in f.read():
+ valid_fsid = True
+ if valid_fsid:
+ for n in range(0, len(files)):
+ if os.path.exists(files[n]):
+ os.remove(files[n])
+
##################################
'systemd-timesyncd.service',
'ntpd.service', # el7 (at least)
'ntp.service', # 18.04 (at least)
+ 'ntpsec.service', # 20.04 (at least) / buster
]
- if not check_units(units, enabler=None):
+ if not check_units(units, enabler):
logger.warning('No time sync service is running; checked for %s' % units)
return False
return True
+
def command_check_host():
# type: () -> None
- # caller already checked for docker/podman
- logger.info('podman|docker (%s) is present' % container_path)
+ global container_path
+ errors = []
commands = ['systemctl', 'lvcreate']
+ if args.docker:
+ container_path = find_program('docker')
+ else:
+ for i in CONTAINER_PREFERENCE:
+ try:
+ container_path = find_program(i)
+ break
+ except Exception as e:
+ logger.debug('Could not locate %s: %s' % (i, e))
+ if not container_path:
+ errors.append('Unable to locate any of %s' % CONTAINER_PREFERENCE)
+ else:
+ logger.info('podman|docker (%s) is present' % container_path)
+
for command in commands:
try:
find_program(command)
logger.info('%s is present' % command)
except ValueError:
- raise Error('%s binary does not appear to be installed' % command)
+ errors.append('%s binary does not appear to be installed' % command)
# check for configured+running chronyd or ntp
if not check_time_sync():
- raise Error('No time synchronization is active')
+ errors.append('No time synchronization is active')
if 'expect_hostname' in args and args.expect_hostname:
- if get_hostname() != args.expect_hostname:
- raise Error('hostname "%s" does not match expected hostname "%s"' % (
+ if get_hostname().lower() != args.expect_hostname.lower():
+ errors.append('hostname "%s" does not match expected hostname "%s"' % (
get_hostname(), args.expect_hostname))
logger.info('Hostname "%s" matches what is expected.',
args.expect_hostname)
+ if errors:
+ raise Error('\n'.join(errors))
+
logger.info('Host looks OK')
##################################
+
def command_prepare_host():
# type: () -> None
logger.info('Verifying podman|docker is present...')
##################################
+
class CustomValidation(argparse.Action):
def _check_name(self, values):
##################################
+
def get_distro():
+ # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
distro = None
distro_version = None
distro_codename = None
distro_codename = val.lower()
return distro, distro_version, distro_codename
+
class Packager(object):
def __init__(self, stable=None, version=None, branch=None, commit=None):
assert \
def query_shaman(self, distro, distro_version, branch, commit):
# query shaman
- logging.info('Fetching repo metadata from shaman and chacra...')
+ logger.info('Fetching repo metadata from shaman and chacra...')
shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
distro=distro,
distro_version=distro_version,
try:
shaman_response = urlopen(shaman_url)
except HTTPError as err:
- logging.error('repository not found in shaman (might not be available yet)')
+ logger.error('repository not found in shaman (might not be available yet)')
raise Error('%s, failed to fetch %s' % (err, shaman_url))
try:
chacra_url = shaman_response.geturl()
chacra_response = urlopen(chacra_url)
except HTTPError as err:
- logging.error('repository not found in chacra (might not be available yet)')
+ logger.error('repository not found in chacra (might not be available yet)')
raise Error('%s, failed to fetch %s' % (err, chacra_url))
return chacra_response.read().decode('utf-8')
branch=branch, commit=commit)
self.distro = self.DISTRO_NAMES[distro]
self.distro_codename = distro_codename
+ self.distro_version = distro_version
def repo_path(self):
return '/etc/apt/sources.list.d/ceph.list'
def add_repo(self):
url, name = self.repo_gpgkey()
- logging.info('Installing repo GPG key from %s...' % url)
+ logger.info('Installing repo GPG key from %s...' % url)
try:
response = urlopen(url)
except HTTPError as err:
- logging.error('failed to fetch GPG repo key from %s: %s' % (
+ logger.error('failed to fetch GPG repo key from %s: %s' % (
url, err))
raise Error('failed to fetch GPG key')
key = response.read().decode('utf-8')
content = self.query_shaman(self.distro, self.distro_codename, self.branch,
self.commit)
- logging.info('Installing repo file at %s...' % self.repo_path())
+ logger.info('Installing repo file at %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
for name in ['autobuild', 'release']:
p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
if os.path.exists(p):
- logging.info('Removing repo GPG key %s...' % p)
+ logger.info('Removing repo GPG key %s...' % p)
os.unlink(p)
if os.path.exists(self.repo_path()):
- logging.info('Removing repo at %s...' % self.repo_path())
+ logger.info('Removing repo at %s...' % self.repo_path())
os.unlink(self.repo_path())
+ if self.distro == 'ubuntu':
+ self.rm_kubic_repo()
+
def install(self, ls):
- logging.info('Installing packages %s...' % ls)
- call_throws(['apt', 'install', '-y'] + ls)
+ logger.info('Installing packages %s...' % ls)
+ call_throws(['apt-get', 'install', '-y'] + ls)
def install_podman(self):
if self.distro == 'ubuntu':
- logging.info('Setting up repo for pdoman...')
- self.install(['software-properties-common'])
- call_throws(['add-apt-repository', '-y', 'ppa:projectatomic/ppa'])
- call_throws(['apt', 'update'])
+ logger.info('Setting up repo for podman...')
+ self.add_kubic_repo()
+ call_throws(['apt-get', 'update'])
- logging.info('Attempting podman install...')
+ logger.info('Attempting podman install...')
try:
self.install(['podman'])
except Error as e:
- logging.info('Podman did not work. Falling back to docker...')
+ logger.info('Podman did not work. Falling back to docker...')
self.install(['docker.io'])
+ def kubic_repo_url(self):
+ return 'https://download.opensuse.org/repositories/devel:/kubic:/' \
+ 'libcontainers:/stable/xUbuntu_%s/' % self.distro_version
+
+ def kubic_repo_path(self):
+ return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list'
+
+ def kubric_repo_gpgkey_url(self):
+ return '%s/Release.key' % self.kubic_repo_url()
+
+ def kubric_repo_gpgkey_path(self):
+ return '/etc/apt/trusted.gpg.d/kubic.release.gpg'
+
+ def add_kubic_repo(self):
+ url = self.kubric_repo_gpgkey_url()
+ logger.info('Installing repo GPG key from %s...' % url)
+ try:
+ response = urlopen(url)
+ except HTTPError as err:
+ logger.error('failed to fetch GPG repo key from %s: %s' % (
+ url, err))
+ raise Error('failed to fetch GPG key')
+ key = response.read().decode('utf-8')
+ tmp_key = write_tmp(key, 0, 0)
+ keyring = self.kubric_repo_gpgkey_path()
+ call_throws(['apt-key', '--keyring', keyring, 'add', tmp_key.name])
+
+ logger.info('Installing repo file at %s...' % self.kubic_repo_path())
+ content = 'deb %s /\n' % self.kubic_repo_url()
+ with open(self.kubic_repo_path(), 'w') as f:
+ f.write(content)
+
+ def rm_kubic_repo(self):
+ keyring = self.kubric_repo_gpgkey_path()
+ if os.path.exists(keyring):
+ logger.info('Removing repo GPG key %s...' % keyring)
+ os.unlink(keyring)
+
+ p = self.kubic_repo_path()
+ if os.path.exists(p):
+ logger.info('Removing repo at %s...' % p)
+ os.unlink(p)
+
+
class YumDnf(Packager):
DISTRO_NAMES = {
'centos': ('centos', 'el'),
self.branch,
self.commit)
- logging.info('Writing repo to %s...' % self.repo_path())
+ logger.info('Writing repo to %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
if self.distro_code.startswith('el'):
logger.info('Enabling EPEL...')
call_throws([self.tool, 'install', '-y', 'epel-release'])
- if self.distro_code == 'el8':
- # we also need Ken's copr repo, at least for now
- logger.info('Enabling supplementary copr repo ktdreyer/ceph-el8...')
- call_throws(['dnf', 'copr', 'enable', '-y', 'ktdreyer/ceph-el8'])
def rm_repo(self):
if os.path.exists(self.repo_path()):
os.unlink(self.repo_path())
- if self.distro_code == 'el8':
- logger.info('Disabling supplementary copr repo ktdreyer/ceph-el8...')
- call_throws(['dnf', 'copr', 'disable', '-y', 'ktdreyer/ceph-el8'])
def install(self, ls):
logger.info('Installing packages %s...' % ls)
self.branch,
self.commit)
- logging.info('Writing repo to %s...' % self.repo_path())
+ logger.info('Writing repo to %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
def command_add_repo():
if args.version and args.release:
raise Error('you can specify either --release or --version but not both')
+ if not args.version and not args.release and not args.dev and not args.dev_commit:
+ raise Error('please supply a --release, --version, --dev or --dev-commit argument')
if args.version:
try:
(x, y, z) = args.version.split('.')
commit=args.dev_commit)
pkg.add_repo()
+
def command_rm_repo():
pkg = create_packager()
pkg.rm_repo()
+
def command_install():
pkg = create_packager()
pkg.install(args.packages)
##################################
def get_ipv4_address(ifname):
    # type: (str) -> str
    """Return the IPv4 address of *ifname* in CIDR form, or '' if it has none.

    Uses the SIOCGIFADDR / SIOCGIFNETMASK ioctls on a throwaway UDP socket,
    so no packets are ever sent.
    """
    SIOCGIFADDR = 35093     # 0x8915
    SIOCGIFNETMASK = 35099  # 0x891b

    def _ioctl_inet(sock, request):
        # the kernel expects a 256-byte ifreq buffer that starts with the
        # (at most 15 character) interface name; the address lives at 20:24
        ifreq = struct.pack('256s', bytes(ifname[:15], 'utf-8'))
        raw = fcntl.ioctl(sock.fileno(), request, ifreq)[20:24]
        return socket.inet_ntop(socket.AF_INET, raw)

    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        address = _ioctl_inet(probe, SIOCGIFADDR)
        netmask = _ioctl_inet(probe, SIOCGIFNETMASK)
    except OSError:
        # interface does not have an ipv4 address
        return ''

    # convert the dotted-quad netmask to a CIDR prefix length by counting bits
    prefix_len = sum(bin(int(octet)).count('1') for octet in netmask.split('.'))
    return '{}/{}'.format(address, prefix_len)
+
+
def get_ipv6_address(ifname):
    # type: (str) -> str
    """Return the first IPv6 address of *ifname* as addr/prefixlen, or ''."""
    if not os.path.exists('/proc/net/if_inet6'):
        # kernel has no IPv6 support enabled
        return ''

    raw = read_file(['/proc/net/if_inet6'])
    data = raw.splitlines()
    # based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html
    # each line: address ifindex prefixlen scope flags ifname
    # field 0 is the unseparated hex ipv6 address; field 2 is used below as
    # the (hex) prefix length; the last field is the device name
    for iface_setting in data:
        field = iface_setting.split()
        if field[-1] == ifname:
            ipv6_raw = field[0]
            # re-insert the ':' separators every 4 hex digits
            ipv6_fmtd = ":".join([ipv6_raw[_p:_p+4] for _p in range(0, len(field[0]),4)])
            # apply naming rules using ipaddress module
            ipv6 = ipaddress.ip_address(ipv6_fmtd)
            return "{}/{}".format(str(ipv6), int('0x{}'.format(field[2]), 16))
    return ''
+
+
def bytes_to_human(num, mode='decimal'):
    # type: (float, str) -> str
    """Convert a bytes value into its human-readable form.

    :param num: number, in bytes, to convert
    :param mode: Either decimal (default) or binary to determine divisor
    :returns: string representing the bytes value in a more readable format
    """
    if mode == 'binary':
        suffixes = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
        step = 1024.0
        top_unit = "YiB"
    else:
        suffixes = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
        step = 1000.0
        top_unit = "YB"

    value = num
    for suffix in suffixes:
        if abs(value) < step:
            return "%3.1f%s" % (value, suffix)
        value /= step
    # ran off the end of the table: express in yotta-units
    return "%.1f%s" % (value, top_unit)
+
+
def read_file(path_list, file_name=''):
    # type: (List[str], str) -> str
    """Returns the content of the first file found within the `path_list`

    :param path_list: list of file paths to search
    :param file_name: optional file_name to be applied to a file path
    :returns: content of the file or 'Unknown'
    """
    for base in path_list:
        candidate = os.path.join(base, file_name) if file_name else base
        if not os.path.exists(candidate):
            continue
        with open(candidate, 'r') as fh:
            try:
                return fh.read().strip()
            except OSError:
                # sysfs may populate the file, but for devices like
                # virtio reads can fail
                return "Unknown"
    return "Unknown"
+
+
+##################################
class HostFacts():
    """Gather host-level metadata (cpu, memory, disks, nics, OS, security)
    from /proc, /sys and DMI, and expose it via properties and a JSON dump."""

    _dmi_path_list = ['/sys/class/dmi/id']
    _nic_path_list = ['/sys/class/net']
    _selinux_path_list = ['/etc/selinux/config']
    _apparmor_path_list = ['/etc/apparmor']
    # sysfs sometimes reports a raw PCI id as the vendor; map known ids
    # to a human readable name
    _disk_vendor_workarounds = {
        "0x1af4": "Virtio Block Device"
    }

    def __init__(self):
        self.cpu_model = 'Unknown'
        self.cpu_count = 0
        self.cpu_cores = 0
        self.cpu_threads = 0
        self.interfaces = {}

        self._meminfo = read_file(['/proc/meminfo']).splitlines()
        self._get_cpuinfo()
        self._process_nics()
        self.arch = platform.processor()
        self.kernel = platform.release()

    def _get_cpuinfo(self):
        # type: () -> None
        """Determine cpu information via /proc/cpuinfo"""
        raw = read_file(['/proc/cpuinfo'])
        output = raw.splitlines()
        cpu_set = set()

        for line in output:
            field = [l.strip() for l in line.split(':')]
            if "model name" in line:
                self.cpu_model = field[1]
            if "physical id" in line:
                cpu_set.add(field[1])
            if "siblings" in line:
                self.cpu_threads = int(field[1].strip())
            if "cpu cores" in line:
                self.cpu_cores = int(field[1].strip())
        self.cpu_count = len(cpu_set)

    def _get_block_devs(self):
        # type: () -> List[str]
        """Determine the list of block devices by looking at /sys/block"""
        return [dev for dev in os.listdir('/sys/block')
                if not dev.startswith('dm')]

    def _get_devs_by_type(self, rota='0'):
        # type: (str) -> List[str]
        """Filter block devices by a given rotational attribute (0=flash, 1=spinner)"""
        devs = list()
        for blk_dev in self._get_block_devs():
            rot_path = '/sys/block/{}/queue/rotational'.format(blk_dev)
            rot_value = read_file([rot_path])
            if rot_value == rota:
                devs.append(blk_dev)
        return devs

    @property
    def operating_system(self):
        # type: () -> str
        """Determine OS version"""
        raw_info = read_file(['/etc/os-release'])
        os_release = raw_info.splitlines()
        rel_str = 'Unknown'
        rel_dict = dict()

        for line in os_release:
            if "=" in line:
                # split only on the first '=' so values containing '='
                # don't raise ValueError
                var_name, var_value = line.split('=', 1)
                rel_dict[var_name] = var_value.strip('"')

        # Would normally use PRETTY_NAME, but NAME and VERSION are more
        # consistent
        if all(_v in rel_dict for _v in ["NAME", "VERSION"]):
            rel_str = "{} {}".format(rel_dict['NAME'], rel_dict['VERSION'])
        return rel_str

    @property
    def hostname(self):
        # type: () -> str
        """Return the hostname"""
        return platform.node()

    @property
    def subscribed(self):
        # type: () -> str
        """Highlevel check to see if the host is subscribed to receive updates/support"""
        def _red_hat():
            # type: () -> str
            # RHEL 7 and RHEL 8
            entitlements_dir = '/etc/pki/entitlement'
            if os.path.exists(entitlements_dir):
                pems = glob('{}/*.pem'.format(entitlements_dir))
                if len(pems) >= 2:
                    return "Yes"

            return "No"

        os_name = self.operating_system
        if os_name.upper().startswith("RED HAT"):
            return _red_hat()

        return "Unknown"

    @property
    def hdd_count(self):
        # type: () -> int
        """Return a count of HDDs (spinners)"""
        return len(self._get_devs_by_type(rota='1'))

    def _get_capacity(self, dev):
        # type: (str) -> int
        """Determine the size of a given device (bytes)"""
        size_path = os.path.join('/sys/block', dev, 'size')
        size_blocks = int(read_file([size_path]))
        blk_path = os.path.join('/sys/block', dev, 'queue', 'logical_block_size')
        blk_count = int(read_file([blk_path]))
        return size_blocks * blk_count

    def _get_capacity_by_type(self, rota='0'):
        # type: (str) -> int
        """Return the total capacity of a category of device (flash or hdd)"""
        devs = self._get_devs_by_type(rota=rota)
        capacity = 0
        for dev in devs:
            capacity += self._get_capacity(dev)
        return capacity

    def _dev_list(self, dev_list):
        # type: (List[str]) -> List[Dict[str, object]]
        """Return a 'pretty' name list for each device in the `dev_list`"""
        disk_list = list()

        for dev in dev_list:
            disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
            disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
            disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
            vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
            disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
            disk_size_bytes = self._get_capacity(dev)
            disk_list.append({
                "description": "{} {} ({})".format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
                "vendor": disk_vendor,
                "model": disk_model,
                "rev": disk_rev,
                "wwid": disk_wwid,
                "dev_name": dev,
                "disk_size_bytes": disk_size_bytes,
                }
            )
        return disk_list

    @property
    def hdd_list(self):
        # type: () -> List[Dict[str, object]]
        """Return a list of devices that are HDDs (spinners)"""
        devs = self._get_devs_by_type(rota='1')
        return self._dev_list(devs)

    @property
    def flash_list(self):
        # type: () -> List[Dict[str, object]]
        """Return a list of devices that are flash based (SSD, NVMe)"""
        devs = self._get_devs_by_type(rota='0')
        return self._dev_list(devs)

    @property
    def hdd_capacity_bytes(self):
        # type: () -> int
        """Return the total capacity for all HDD devices (bytes)"""
        return self._get_capacity_by_type(rota='1')

    @property
    def hdd_capacity(self):
        # type: () -> str
        """Return the total capacity for all HDD devices (human readable format)"""
        return bytes_to_human(self.hdd_capacity_bytes)

    @property
    def cpu_load(self):
        # type: () -> Dict[str, float]
        """Return the cpu load average data for the host"""
        raw = read_file(['/proc/loadavg']).strip()
        data = raw.split()
        return {
            "1min": float(data[0]),
            "5min": float(data[1]),
            "15min": float(data[2]),
        }

    @property
    def flash_count(self):
        # type: () -> int
        """Return the number of flash devices in the system (SSD, NVMe)"""
        return len(self._get_devs_by_type(rota='0'))

    @property
    def flash_capacity_bytes(self):
        # type: () -> int
        """Return the total capacity for all flash devices (bytes)"""
        return self._get_capacity_by_type(rota='0')

    @property
    def flash_capacity(self):
        # type: () -> str
        """Return the total capacity for all Flash devices (human readable format)"""
        return bytes_to_human(self.flash_capacity_bytes)

    def _process_nics(self):
        # type: () -> None
        """Look at the NIC devices and extract network related metadata"""
        # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h
        hw_lookup = {
            "1": "ethernet",
            "32": "infiniband",
            "772": "loopback",
        }

        for nic_path in HostFacts._nic_path_list:
            if not os.path.exists(nic_path):
                continue
            for iface in os.listdir(nic_path):

                lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))]
                upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))]

                try:
                    mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
                except ValueError:
                    mtu = 0

                operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
                try:
                    speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
                except (OSError, ValueError):
                    # OSError : device doesn't support the ethtool get_link_ksettings
                    # ValueError : raised when the read fails, and returns Unknown
                    #
                    # Either way, we show a -1 when speed isn't available
                    speed = -1

                if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
                    nic_type = "bridge"
                elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
                    nic_type = "bonding"
                else:
                    nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), "Unknown")

                dev_link = os.path.join(nic_path, iface, 'device')
                if os.path.exists(dev_link):
                    iftype = 'physical'
                    driver_path = os.path.join(dev_link, 'driver')
                    if os.path.exists(driver_path):
                        driver = os.path.basename(
                            os.path.realpath(driver_path))
                    else:
                        driver = 'Unknown'

                else:
                    iftype = 'logical'
                    driver = ''

                self.interfaces[iface] = {
                    "mtu": mtu,
                    "upper_devs_list": upper_devs_list,
                    "lower_devs_list": lower_devs_list,
                    "operstate": operstate,
                    "iftype": iftype,
                    "nic_type": nic_type,
                    "driver": driver,
                    "speed": speed,
                    "ipv4_address": get_ipv4_address(iface),
                    "ipv6_address": get_ipv6_address(iface),
                }

    @property
    def nic_count(self):
        # type: () -> int
        """Return a total count of all physical NICs detected in the host"""
        phys_devs = []
        for iface in self.interfaces:
            if self.interfaces[iface]["iftype"] == 'physical':
                phys_devs.append(iface)
        return len(phys_devs)

    def _get_mem_data(self, field_name):
        # type: (str) -> int
        """Return the kB value of `field_name` from /proc/meminfo (0 if absent)"""
        for line in self._meminfo:
            if line.startswith(field_name):
                _d = line.split()
                return int(_d[1])
        return 0

    @property
    def memory_total_kb(self):
        # type: () -> int
        """Determine the memory installed (kb)"""
        return self._get_mem_data('MemTotal')

    @property
    def memory_free_kb(self):
        # type: () -> int
        """Determine the memory free (not cache, immediately usable)"""
        return self._get_mem_data('MemFree')

    @property
    def memory_available_kb(self):
        # type: () -> int
        """Determine the memory available to new applications without swapping"""
        return self._get_mem_data('MemAvailable')

    @property
    def vendor(self):
        # type: () -> str
        """Determine server vendor from DMI data in sysfs"""
        return read_file(HostFacts._dmi_path_list, "sys_vendor")

    @property
    def model(self):
        # type: () -> str
        """Determine server model information from DMI data in sysfs"""
        family = read_file(HostFacts._dmi_path_list, "product_family")
        product = read_file(HostFacts._dmi_path_list, "product_name")
        if family == 'Unknown' and product:
            return "{}".format(product)

        return "{} ({})".format(family, product)

    @property
    def bios_version(self):
        # type: () -> str
        """Determine server BIOS version from DMI data in sysfs"""
        return read_file(HostFacts._dmi_path_list, "bios_version")

    @property
    def bios_date(self):
        # type: () -> str
        """Determine server BIOS date from DMI data in sysfs"""
        return read_file(HostFacts._dmi_path_list, "bios_date")

    @property
    def timestamp(self):
        # type: () -> float
        """Return the current time as Epoch seconds"""
        return time.time()

    @property
    def system_uptime(self):
        # type: () -> float
        """Return the system uptime (in secs)"""
        raw_time = read_file(['/proc/uptime'])
        up_secs, _ = raw_time.split()
        return float(up_secs)

    def kernel_security(self):
        # type: () -> Dict[str, str]
        """Determine the security features enabled in the kernel - SELinux, AppArmor"""
        def _fetch_selinux():
            # type: () -> Dict[str, str]
            """Read the selinux config file to determine state"""
            security = {}
            for selinux_path in HostFacts._selinux_path_list:
                if os.path.exists(selinux_path):
                    selinux_config = read_file([selinux_path]).splitlines()
                    security['type'] = 'SELinux'
                    for line in selinux_config:
                        if line.strip().startswith('#'):
                            continue
                        # split only on the first '=' and skip malformed
                        # lines, so odd config files can't raise ValueError
                        if '=' not in line:
                            continue
                        k, v = line.split('=', 1)
                        security[k] = v
                    # use .get(): a minimal config may omit these keys
                    if security.get('SELINUX', '').lower() == "disabled":
                        security['description'] = "SELinux: Disabled"
                    else:
                        security['description'] = "SELinux: Enabled({}, {})".format(security.get('SELINUX'), security.get('SELINUXTYPE'))
            return security

        def _fetch_apparmor():
            # type: () -> Dict[str, str]
            """Read the apparmor profiles directly, returning an overview of AppArmor status"""
            security = {}
            for apparmor_path in HostFacts._apparmor_path_list:
                if os.path.exists(apparmor_path):
                    security['type'] = "AppArmor"
                    security['description'] = "AppArmor: Enabled"
                    try:
                        profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
                    except OSError:
                        pass
                    else:
                        summary = {}  # type: Dict[str, int]
                        for line in profiles.split('\n'):
                            # lines look like '<profile> (<mode>)'; skip
                            # blank/malformed lines instead of raising
                            fields = line.split(' ')
                            if len(fields) != 2:
                                continue
                            item, mode = fields
                            mode = mode.strip('()')
                            if mode in summary:
                                summary[mode] += 1
                            else:
                                # first profile seen in this mode counts as
                                # 1, not 0 (fixes an off-by-one undercount)
                                summary[mode] = 1
                        summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()])
                        security = {**security, **summary}  # type: ignore
                        security['description'] += "({})".format(summary_str)

            return security

        if os.path.exists('/sys/kernel/security/lsm'):
            lsm = read_file(['/sys/kernel/security/lsm']).strip()
            if 'selinux' in lsm:
                return _fetch_selinux()
            elif 'apparmor' in lsm:
                return _fetch_apparmor()
            else:
                return {
                    "type": "Unknown",
                    "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor"
                }

        return {
            "type": "None",
            "description": "Linux Security Module framework is not available"
        }

    @property
    def kernel_parameters(self):
        # type: () -> Dict[str, str]
        """Get kernel parameters required/used in Ceph clusters"""

        k_param = {}
        out, _, _ = call_throws(['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
        if out:
            param_list = out.split('\n')
            param_dict = { param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list}

            # return only desired parameters
            if 'net.ipv4.ip_nonlocal_bind' in param_dict:
                k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind']

        return k_param

    def dump(self):
        # type: () -> str
        """Return the attributes of this HostFacts object as json"""
        data = {k: getattr(self, k) for k in dir(self)
                if not k.startswith('_') and
                isinstance(getattr(self, k),
                           (float, int, str, list, dict, tuple))
                }
        return json.dumps(data, indent=2, sort_keys=True)
+
+##################################
+
def command_gather_facts():
    """Print host-related metadata, as JSON, for the caller."""
    print(HostFacts().dump())
+
+
+##################################
+
+
def _get_parser():
# type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(
type=int,
default=DEFAULT_RETRY,
help='max number of retries')
+ parser.add_argument(
+ '--env', '-e',
+ action='append',
+ default=[],
+ help='set environment variable')
subparsers = parser.add_subparsers(help='sub-command')
'--skip-pull',
action='store_true',
help='do not pull the latest image before adopting')
+ parser_adopt.add_argument(
+ '--force-start',
+ action='store_true',
+ help="start newly adoped daemon, even if it wasn't running previously")
+ parser_adopt.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_rm_daemon = subparsers.add_parser(
'rm-daemon', help='remove daemon instance')
parser_shell.add_argument(
'--keyring', '-k',
help='ceph.keyring to pass through to the container')
+ parser_shell.add_argument(
+ '--mount', '-m',
+ help=("mount a file or directory in the container. "
+ "Support multiple mounts. "
+ "ie: `--mount /foo /bar:/bar`. "
+ "When no destination is passed, default is /mnt"),
+ nargs='+')
parser_shell.add_argument(
'--env', '-e',
action='append',
default=[],
help='set environment variable')
parser_shell.add_argument(
- 'command', nargs='*',
+ 'command', nargs=argparse.REMAINDER,
help='command (optional)')
parser_enter = subparsers.add_parser(
required=True,
help='daemon name (type.id)')
parser_enter.add_argument(
- 'command', nargs='*',
+ 'command', nargs=argparse.REMAINDER,
help='command')
parser_ceph_volume = subparsers.add_parser(
'--keyring', '-k',
help='ceph.keyring to pass through to the container')
parser_ceph_volume.add_argument(
- 'command', nargs='+',
+ 'command', nargs=argparse.REMAINDER,
help='command')
parser_unit = subparsers.add_parser(
parser_bootstrap.add_argument(
'--initial-dashboard-password',
help='Initial password for the initial dashboard user')
-
+ parser_bootstrap.add_argument(
+ '--ssl-dashboard-port',
+ type=int,
+ default = 8443,
+ help='Port number used to connect with dashboard using SSL')
parser_bootstrap.add_argument(
'--dashboard-key',
+ type=argparse.FileType('r'),
help='Dashboard key')
parser_bootstrap.add_argument(
'--dashboard-crt',
+ type=argparse.FileType('r'),
help='Dashboard certificate')
+ parser_bootstrap.add_argument(
+ '--ssh-config',
+ type=argparse.FileType('r'),
+ help='SSH config')
+ parser_bootstrap.add_argument(
+ '--ssh-private-key',
+ type=argparse.FileType('r'),
+ help='SSH private key')
+ parser_bootstrap.add_argument(
+ '--ssh-public-key',
+ type=argparse.FileType('r'),
+ help='SSH public key')
+ parser_bootstrap.add_argument(
+ '--ssh-user',
+ default='root',
+ help='set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
+
parser_bootstrap.add_argument(
'--skip-mon-network',
action='store_true',
'--skip-monitoring-stack',
action='store_true',
help='Do not automatically provision monitoring stack (prometheus, grafana, alertmanager, node-exporter)')
+ parser_bootstrap.add_argument(
+ '--apply-spec',
+ help='Apply cluster spec after bootstrap (copy ssh key, add hosts and apply services)')
+
+ parser_bootstrap.add_argument(
+ '--shared_ceph_folder',
+ metavar='CEPH_SOURCE_FOLDER',
+ help='Development mode. Several folders in containers are volumes mapped to different sub-folders in the ceph source folder')
+
+ parser_bootstrap.add_argument(
+ '--registry-url',
+ help='url for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-username',
+ help='username for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-password',
+ help='password for custom registry')
+ parser_bootstrap.add_argument(
+ '--registry-json',
+ help='json file with custom registry login info (URL, Username, Password)')
+ parser_bootstrap.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_deploy = subparsers.add_parser(
'deploy', help='deploy a daemon')
'--skip-firewalld',
action='store_true',
help='Do not configure firewalld')
+ parser_deploy.add_argument(
+ '--tcp-ports',
+ help='List of tcp ports to open in the host firewall')
parser_deploy.add_argument(
'--reconfig',
action='store_true',
'--allow-ptrace',
action='store_true',
help='Allow SYS_PTRACE on daemon container')
+ parser_deploy.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_check_host = subparsers.add_parser(
'check-host', help='check host configuration')
parser_add_repo.set_defaults(func=command_add_repo)
parser_add_repo.add_argument(
'--release',
- help='use latest version of a named release (e.g., octopus)')
+ help='use latest version of a named release (e.g., {})'.format(LATEST_STABLE_RELEASE))
parser_add_repo.add_argument(
'--version',
help='use specific upstream version (x.y.z)')
default=['cephadm'],
help='packages')
+ parser_registry_login = subparsers.add_parser(
+ 'registry-login', help='log host into authenticated registry')
+ parser_registry_login.set_defaults(func=command_registry_login)
+ parser_registry_login.add_argument(
+ '--registry-url',
+ help='url for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-username',
+ help='username for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-password',
+ help='password for custom registry')
+ parser_registry_login.add_argument(
+ '--registry-json',
+ help='json file with custom registry login info (URL, Username, Password)')
+ parser_registry_login.add_argument(
+ '--fsid',
+ help='cluster FSID')
+
+ parser_gather_facts = subparsers.add_parser(
+ 'gather-facts', help='gather and return host related information (JSON format)')
+ parser_gather_facts.set_defaults(func=command_gather_facts)
+
return parser
+
def _parse_args(av):
parser = _get_parser()
- return parser.parse_args(av)
+ # Parse cephadm's CLI argv and normalize the remainder-style 'command'
+ # argument before handing the argparse Namespace back to the caller.
+ args = parser.parse_args(av)
+ # argparse keeps a leading '--' separator in remainder positionals;
+ # drop it so `cephadm shell -- ceph status` forwards only the command.
+ # NOTE(review): assumes 'command' is the REMAINDER positional declared
+ # in _get_parser() — confirm there (declaration not visible in this chunk).
+ if 'command' in args and args.command and args.command[0] == "--":
+ args.command.pop(0)
+ return args
+
if __name__ == "__main__":
+
+ # root?
+ if os.geteuid() != 0:
+ sys.stderr.write('ERROR: cephadm should be run as root\n')
+ sys.exit(1)
+
+ # Logger configuration
+ if not os.path.exists(LOG_DIR):
+ os.makedirs(LOG_DIR)
+ dictConfig(logging_config)
+ logger = logging.getLogger()
+
# allow argv to be injected
try:
- av = injected_argv # type: ignore
+ av = injected_argv # type: ignore
except NameError:
av = sys.argv[1:]
+ logger.debug("%s\ncephadm %s" % ("-" * 80, av))
args = _parse_args(av)
+ # More verbose console output
if args.verbose:
- logging.basicConfig(level=logging.DEBUG)
- else:
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger('cephadm')
-
- # root?
- if os.geteuid() != 0:
- sys.stderr.write('ERROR: cephadm should be run as root\n')
- sys.exit(1)
-
- # podman or docker?
- if args.docker:
- container_path = find_program('docker')
- else:
- for i in CONTAINER_PREFERENCE:
- try:
- container_path = find_program(i)
- break
- except Exception as e:
- logger.debug('Could not locate %s: %s' % (i, e))
- if not container_path and args.func != command_prepare_host:
- sys.stderr.write('Unable to locate any of %s\n' % CONTAINER_PREFERENCE)
- sys.exit(1)
+ for handler in logger.handlers:
+ if handler.name == "console":
+ handler.setLevel(logging.DEBUG)
if 'func' not in args:
sys.stderr.write('No command specified; pass -h or --help for usage\n')
sys.exit(1)
+ # podman or docker?
+ if args.func != command_check_host:
+ if args.docker:
+ container_path = find_program('docker')
+ else:
+ for i in CONTAINER_PREFERENCE:
+ try:
+ container_path = find_program(i)
+ break
+ except Exception as e:
+ logger.debug('Could not locate %s: %s' % (i, e))
+ if not container_path and args.func != command_prepare_host\
+ and args.func != command_add_repo:
+ sys.stderr.write('Unable to locate any of %s\n' % CONTAINER_PREFERENCE)
+ sys.exit(1)
+
try:
r = args.func()
except Error as e: