import errno
import logging
import time
-import yaml
+from copy import copy
from threading import Event
from functools import wraps
from ceph.deployment import inventory, translate
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection import selector
+from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.service_spec import \
HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
logger = logging.getLogger(__name__)
-DEFAULT_SSH_CONFIG = ('Host *\n'
- 'User root\n'
- 'StrictHostKeyChecking no\n'
- 'UserKnownHostsFile /dev/null\n')
+DEFAULT_SSH_CONFIG = """
+Host *
+ User root
+ StrictHostKeyChecking no
+ UserKnownHostsFile /dev/null
+ ConnectTimeout=30
+"""
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
def rm(self, service_name):
- # type: (str) -> None
- if service_name in self.specs:
+ # type: (str) -> bool
+ found = service_name in self.specs
+ if found:
del self.specs[service_name]
del self.spec_created[service_name]
self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
+ return found
- def find(self, service_name=None):
- # type: (Optional[str]) -> List[ServiceSpec]
+ def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
specs = []
for sn, spec in self.specs.items():
if not service_name or \
class HostCache():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
- self.mgr = mgr
+ self.mgr: CephadmOrchestrator = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
self.devices = {} # type: Dict[str, List[inventory.Device]]
'deps': deps,
'last_config': stamp,
}
-
+
def update_last_host_check(self, host):
# type: (str) -> None
self.last_host_check[host] = datetime.datetime.utcnow()
r.append(dd)
return r
+ def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
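+ """Yield (host, daemons) per host; daemons on hosts marked offline are copied with status -1 / 'host is offline'."""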
+ for host, dm in self.daemons.items():
+ if host in self.mgr.offline_hosts:
+ def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+ ret = copy(dd)
+ ret.status = -1
+ ret.status_desc = 'host is offline'
+ return ret
+ yield host, {name: set_offline(d) for name, d in dm.items()}
+ else:
+ yield host, dm
+
def get_daemons_by_service(self, service_name):
# type: (str) -> List[orchestrator.DaemonDescription]
result = [] # type: List[orchestrator.DaemonDescription]
def host_needs_daemon_refresh(self, host):
# type: (str) -> bool
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
+ return False
if host in self.daemon_refresh_queue:
self.daemon_refresh_queue.remove(host)
return True
def host_needs_device_refresh(self, host):
# type: (str) -> bool
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
+ return False
if host in self.device_refresh_queue:
self.device_refresh_queue.remove(host)
return True
if h not in self.inventory:
self.cache.rm_host(h)
+ # in-memory only.
+ self.offline_hosts: Set[str] = set()
+
def shutdown(self):
self.log.debug('shutdown')
self._worker_pool.close()
self._save_upgrade_state()
return
- def _check_hosts(self):
- self.log.debug('_check_hosts')
- bad_hosts = []
- hosts = self.inventory.keys()
- for host in hosts:
- if host not in self.inventory:
- continue
- self.log.debug(' checking %s' % host)
- try:
- out, err, code = self._run_cephadm(
- host, 'client', 'check-host', [],
- error_ok=True, no_fsid=True)
- if code:
- self.log.debug(' host %s failed check' % host)
- if self.warn_on_failed_host_check:
- bad_hosts.append('host %s failed check: %s' % (host, err))
- else:
- self.log.debug(' host %s ok' % host)
- except Exception as e:
- self.log.debug(' host %s failed check' % host)
- bad_hosts.append('host %s failed check: %s' % (host, e))
- if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
- del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
- if bad_hosts:
- self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
- 'severity': 'warning',
- 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
- 'count': len(bad_hosts),
- 'detail': bad_hosts,
- }
- self.set_health_checks(self.health_checks)
-
def _check_host(self, host):
if host not in self.inventory:
return
host_detail.append(
'stray host %s has %d stray daemons: %s' % (
host, len(missing_names), missing_names))
- if host_detail:
+ if self.warn_on_stray_hosts and host_detail:
self.health_checks['CEPHADM_STRAY_HOST'] = {
'severity': 'warning',
'summary': '%d stray host(s) with %s daemon(s) '
'count': len(host_detail),
'detail': host_detail,
}
- if daemon_detail:
+ if self.warn_on_stray_daemons and daemon_detail:
self.health_checks['CEPHADM_STRAY_DAEMON'] = {
'severity': 'warning',
'summary': '%d stray daemon(s) not managed by cephadm' % (
Returns the generic service name
"""
p = re.compile(r'(.*)\.%s.*' % (host))
- p.sub(r'\1', daemon_id)
return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
def _save_inventory(self):
conn.exit()
self._cons = {}
+ def offline_hosts_remove(self, host):
+ if host in self.offline_hosts:
+ self.offline_hosts.remove(host)
+
@staticmethod
def can_run():
if remoto is not None:
tmp_dir = TemporaryDirectory()
path = tmp_dir.name + '/key'
try:
- subprocess.call([
+ subprocess.check_call([
'/usr/bin/ssh-keygen',
'-C', 'ceph-%s' % self._cluster_fsid,
'-N', '',
if not addr and host in self.inventory:
addr = self.inventory[host].get('addr', host)
+ self.offline_hosts_remove(host)
+
try:
- conn, connr = self._get_connection(addr)
+ try:
+ conn, connr = self._get_connection(addr)
+ except IOError as e:
+ if error_ok:
+ self.log.exception('failed to establish ssh connection')
+ return [], ["Can't communicate with remote host, possibly because python3 is not installed there"], 1
+ raise
assert image or entity
if not image:
daemon_type = entity.split('.', 1)[0] # type: ignore
- if daemon_type in CEPH_TYPES:
+ if daemon_type in CEPH_TYPES or \
+ daemon_type == 'nfs':
# get container image
ret, image, err = self.mon_command({
'prefix': 'config get',
# this is a misleading exception as it seems to be thrown for
# any sort of connection failure, even those having nothing to
# do with "host not found" (e.g., ssh key permission denied).
+ self.offline_hosts.add(host)
user = 'root' if self.mode == 'root' else 'cephadm'
msg = f'Failed to connect to {host} ({addr}). ' \
f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
self.inventory[spec.hostname] = spec.to_json()
self._save_inventory()
self.cache.prime_empty_host(spec.hostname)
+ self.offline_hosts_remove(spec.hostname)
self.event.set() # refresh stray health check
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}'".format(spec.hostname)
hostname,
addr=info.get('addr', hostname),
labels=info.get('labels', []),
- status=info.get('status', ''),
+ status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
))
return r
self._refresh_host_daemons(host)
# <service_map>
sm = {} # type: Dict[str, orchestrator.ServiceDescription]
- for h, dm in self.cache.daemons.items():
+ for h, dm in self.cache.get_daemons_with_volatile_status():
for name, dd in dm.items():
if service_type and service_type != dd.daemon_type:
continue
continue
if dd.daemon_type == 'osd':
continue # ignore OSDs for now
- spec = None
if dd.service_name() in self.spec_store.specs:
spec = self.spec_store.specs[dd.service_name()]
+ else:
+ spec = ServiceSpec(
+ unmanaged=True,
+ service_type=dd.daemon_type,
+ service_id=dd.service_id(),
+ placement=PlacementSpec(
+ hosts=[dd.hostname]
+ )
+ )
if n not in sm:
sm[n] = orchestrator.ServiceDescription(
- service_name=n,
last_refresh=dd.last_refresh,
container_image_id=dd.container_image_id,
container_image_name=dd.container_image_name,
spec=spec,
)
- if spec:
+ if dd.service_name() in self.spec_store.specs:
sm[n].size = self._get_spec_size(spec)
sm[n].created = self.spec_store.spec_created[dd.service_name()]
+ if service_type == 'nfs':
+ spec = cast(NFSServiceSpec, spec)
+ sm[n].rados_config_location = spec.rados_config_location()
else:
sm[n].size = 0
if dd.status == 1:
if service_name is not None and service_name != n:
continue
sm[n] = orchestrator.ServiceDescription(
- service_name=n,
spec=spec,
size=self._get_spec_size(spec),
running=0,
)
- return [s for n, s in sm.items()]
+ if service_type == 'nfs':
+ spec = cast(NFSServiceSpec, spec)
+ sm[n].rados_config_location = spec.rados_config_location()
+ return list(sm.values())
@trivial_completion
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
for hostname, hi in self.inventory.items():
self._refresh_host_daemons(hostname)
result = []
- for h, dm in self.cache.daemons.items():
+ for h, dm in self.cache.get_daemons_with_volatile_status():
if host and h != host:
continue
for name, dd in dm.items():
@trivial_completion
def remove_service(self, service_name):
self.log.info('Remove service %s' % service_name)
- self.spec_store.rm(service_name)
- self._kick_serve_loop()
- return ['Removed service %s' % service_name]
+ found = self.spec_store.rm(service_name)
+ if found:
+ self._kick_serve_loop()
+ return ['Removed service %s' % service_name]
+ else:
+ # must be idempotent: still a success.
+ return [f'Failed to remove service. <{service_name}> was not found.']
@trivial_completion
def get_inventory(self, host_filter=None, refresh=False):
return blink(locs)
def get_osd_uuid_map(self, only_up=False):
- # type: (bool) -> Dict[str,str]
+ # type: (bool) -> Dict[str, str]
osd_map = self.get('osd_map')
r = {}
for o in osd_map['osds']:
# only include OSDs that have ever started in this map. this way
# an interrupted osd create can be repeated and succeed the second
# time around.
- if not only_up or o['up_from'] > 0:
- r[str(o['osd'])] = o['uuid']
+ osd_id = o.get('osd')
+ if osd_id is None:
+ raise OrchestratorError("Could not retrieve osd_id from osd_map")
+ if not only_up or (o['up_from'] > 0):
+ r[str(osd_id)] = o.get('uuid', '')
return r
@trivial_completion
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
return [self._apply(spec) for spec in specs]
+ def find_destroyed_osds(self) -> Dict[str, List[str]]:
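+ """Return a host -> osd-id list map of OSDs currently marked 'destroyed', so their ids can be reclaimed when the drives are replaced."""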
+ osd_host_map: Dict[str, List[str]] = dict()
+ ret, out, err = self.mon_command({
+ 'prefix': 'osd tree',
+ 'states': ['destroyed'],
+ 'format': 'json'
+ })
+ if ret != 0:
+ raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
+ try:
+ tree = json.loads(out)
+ except json.decoder.JSONDecodeError:
+ self.log.error(f"Could not decode json -> {out}")
+ return osd_host_map
+
+ nodes = tree.get('nodes', [])
+ for node in nodes:
+ if node.get('type') == 'host':
+ osd_host_map.update(
+ {node.get('name'): [str(_id) for _id in node.get('children', list())]}
+ )
+ return osd_host_map
+
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
- self.log.debug("Processing DriveGroup {}".format(drive_group))
+ self.log.debug(f"Processing DriveGroup {drive_group}")
+ ret = []
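+ # record destroyed osd ids per host so replacement OSDs can reclaim them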
+ drive_group.osd_id_claims = self.find_destroyed_osds()
+ self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+ for host, drive_selection in self.prepare_drivegroup(drive_group):
+ self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
+ cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
+ drive_group.osd_id_claims.get(host, []))
+ if not cmd:
+ self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
+ continue
+ ret_msg = self._create_osd(host, cmd,
+ replace_osd_ids=drive_group.osd_id_claims.get(host, []))
+ ret.append(ret_msg)
+ return ", ".join(ret)
+
+ def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
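+ """Resolve the DriveGroupSpec against the cached inventory and return (host, DriveSelection) pairs for every matching host."""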
# 1) use fn_filter to determine matching_hosts
matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
# 2) Map the inventory to the InventoryHost object
+ host_ds_map = []
+
+ # set osd_id_claims
def _find_inv_for_host(hostname: str, inventory_dict: dict):
# This is stupid and needs to be loaded with the host
return _inventory
raise OrchestratorError("No inventory found for host: {}".format(hostname))
- ret = []
- # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
+ # 3) iterate over matching_host and call DriveSelection
self.log.debug(f"Checking matching hosts -> {matching_hosts}")
for host in matching_hosts:
inventory_for_host = _find_inv_for_host(host, self.cache.devices)
self.log.debug(f"Found inventory for host {inventory_for_host}")
- drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
+ drive_selection = DriveSelection(drive_group, inventory_for_host)
self.log.debug(f"Found drive selection {drive_selection}")
- cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
- self.log.debug(f"translated to cmd {cmd}")
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
- continue
- self.log.info('Applying %s on host %s...' % (
- drive_group.service_name(), host))
- ret_msg = self._create_osd(host, cmd)
- ret.append(ret_msg)
- return ", ".join(ret)
-
- def _create_osd(self, host, cmd):
+ host_ds_map.append((host, drive_selection))
+ return host_ds_map
+
+ def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+ drive_selection: DriveSelection,
+ osd_id_claims: Optional[List[str]] = None,
+ preview: bool = False) -> Optional[str]:
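+ """Render the ceph-volume command for one host's DriveSelection; may be empty when no data devices match."""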
+ self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
+ cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
+ self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
+ return cmd
+
+ def preview_drivegroups(self, drive_group_name: Optional[str] = None,
+ dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
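+ """Run the matching drive groups through ceph-volume in preview mode and return the per-host JSON reports without creating OSDs."""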
+ # find drivegroups
+ if drive_group_name:
+ drive_groups = cast(List[DriveGroupSpec],
+ self.spec_store.find(service_name=drive_group_name))
+ elif dg_specs:
+ drive_groups = dg_specs
+ else:
+ drive_groups = []
+ ret_all = []
+ for drive_group in drive_groups:
+ drive_group.osd_id_claims = self.find_destroyed_osds()
+ self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
+ # prepare driveselection
+ for host, ds in self.prepare_drivegroup(drive_group):
+ cmd = self.driveselection_to_ceph_volume(drive_group, ds,
+ drive_group.osd_id_claims.get(host, []), preview=True)
+ if not cmd:
+ self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
+ continue
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+ if out:
+ concat_out = json.loads(" ".join(out))
+ ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
+ return ret_all
+ def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
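+ """Run a ceph-volume command on the host via cephadm, feeding it the bootstrap-osd config and keyring; returns (out, err, code)."""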
self._require_hosts(host)
# get bootstrap key
'keyring': keyring,
})
- before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-
split_cmd = cmd.split(' ')
_cmd = ['--config-json', '-', '--']
_cmd.extend(split_cmd)
_cmd,
stdin=j,
error_ok=True)
+ return out, err, code
+
+ def _create_osd(self, host, cmd, replace_osd_ids=None):
+ out, err, code = self._run_ceph_volume_command(host, cmd)
+
if code == 1 and ', it is already prepared' in '\n'.join(err):
# HACK: when we create against an existing LV, ceph-volume
# returns an error and the above message. To make this
'lvm', 'list',
'--format', 'json',
])
+ before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
osds_elems = json.loads('\n'.join(out))
fsid = self._cluster_fsid
osd_uuid_map = self.get_osd_uuid_map()
if osd['tags']['ceph.cluster_fsid'] != fsid:
self.log.debug('mismatched fsid, skipping %s' % osd)
continue
- if osd_id in before_osd_uuid_map:
- # this osd existed before we ran prepare
+ if osd_id in before_osd_uuid_map and osd_id not in (replace_osd_ids or []):
+ # this osd existed before we ran prepare and is not being replaced, so skip it
continue
if osd_id not in osd_uuid_map:
- self.log.debug('osd id %d does not exist in cluster' % osd_id)
+ self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
continue
- if osd_uuid_map[osd_id] != osd['tags']['ceph.osd_fsid']:
+ if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
self.log.debug('mismatched osd uuid (cluster has %s, osd '
'has %s)' % (
- osd_uuid_map[osd_id],
+ osd_uuid_map.get(osd_id),
osd['tags']['ceph.osd_fsid']))
continue
def _get_config_and_keyring(self, daemon_type, daemon_id,
keyring=None,
- extra_config=None):
+ extra_ceph_config=None):
# type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
# keyring
if not keyring:
ret, config, err = self.mon_command({
"prefix": "config generate-minimal-conf",
})
- if extra_config:
- config += extra_config
+ if extra_ceph_config:
+ config += extra_ceph_config
return {
'config': config,
osd_uuid_map=None):
if not extra_args:
extra_args = []
+ if not extra_config:
+ extra_config = {}
name = '%s.%s' % (daemon_type, daemon_id)
start_time = datetime.datetime.utcnow()
cephadm_config = self._get_config_and_keyring(
daemon_type, daemon_id,
keyring=keyring,
- extra_config=extra_config)
+ extra_ceph_config=extra_config.pop('config', ''))
+ if extra_config:
+ cephadm_config.update({'files': extra_config})
extra_args.extend(['--config-json', '-'])
# osd deployments needs an --osd-uuid arg
if daemon_type == 'osd':
if not osd_uuid_map:
osd_uuid_map = self.get_osd_uuid_map()
- osd_uuid = osd_uuid_map.get(daemon_id, None)
+ osd_uuid = osd_uuid_map.get(daemon_id)
if not osd_uuid:
raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
extra_args.extend(['--osd-fsid', osd_uuid])
'prometheus': self._create_prometheus,
'node-exporter': self._create_node_exporter,
'crash': self._create_crash,
+ 'iscsi': self._create_iscsi,
}
config_fns = {
'mds': self._config_mds,
'rgw': self._config_rgw,
'nfs': self._config_nfs,
+ 'iscsi': self._config_iscsi,
}
create_func = create_fns.get(daemon_type, None)
if not create_func:
args.append((daemon_id, host, network)) # type: ignore
elif daemon_type == 'nfs':
args.append((daemon_id, host, spec)) # type: ignore
+ elif daemon_type == 'iscsi':
+ args.append((daemon_id, host, spec)) # type: ignore
else:
args.append((daemon_id, host)) # type: ignore
return self._create_daemon('mon', name, host,
keyring=keyring,
- extra_config=extra_config)
+ extra_config={'config': extra_config})
def add_mon(self, spec):
# type: (ServiceSpec) -> orchestrator.Completion
'mgr': PlacementSpec(count=2),
'mds': PlacementSpec(count=2),
'rgw': PlacementSpec(count=2),
+ 'iscsi': PlacementSpec(count=1),
'rbd-mirror': PlacementSpec(count=2),
'nfs': PlacementSpec(count=1),
'grafana': PlacementSpec(count=1),
spec.service_name(), spec.placement.pretty_str()))
self.spec_store.save(spec)
self._kick_serve_loop()
- return "Scheduled %s update..." % spec.service_type
+ return "Scheduled %s update..." % spec.service_name()
@trivial_completion
def apply(self, specs: List[ServiceSpec]):
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mon_command({
'prefix': 'config set',
- 'who': 'client.rgw.' + spec.service_id,
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
'name': 'rgw_zone',
'value': spec.rgw_zone,
})
ret, out, err = self.mon_command({
'prefix': 'config set',
- 'who': 'client.rgw.' + spec.rgw_realm,
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
'name': 'rgw_realm',
'value': spec.rgw_realm,
})
- if spec.ssl:
- v = 'beast ssl_port=%d' % spec.get_port()
- else:
- v = 'beast port=%d' % spec.get_port()
ret, out, err = self.mon_command({
'prefix': 'config set',
- 'who': 'client.rgw.' + spec.service_id,
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
'name': 'rgw_frontends',
- 'value': v,
+ 'value': spec.rgw_frontends_config_value(),
})
+ if spec.rgw_frontend_ssl_certificate:
+ if isinstance(spec.rgw_frontend_ssl_certificate, list):
+ cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
+ else:
+ cert_data = spec.rgw_frontend_ssl_certificate
+ ret, out, err = self.mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
+ 'val': cert_data,
+ })
+
+ if spec.rgw_frontend_ssl_key:
+ if isinstance(spec.rgw_frontend_ssl_key, list):
+ key_data = '\n'.join(spec.rgw_frontend_ssl_key)
+ else:
+ key_data = spec.rgw_frontend_ssl_key
+ ret, out, err = self.mon_command({
+ 'prefix': 'config-key set',
+ 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
+ 'val': key_data,
+ })
+
+ logger.info('Saving service %s spec with placement %s' % (
+ spec.service_name(), spec.placement.pretty_str()))
+ self.spec_store.save(spec)
+
def _create_rgw(self, rgw_id, host):
ret, keyring, err = self.mon_command({
'prefix': 'auth get-or-create',
- 'entity': 'client.rgw.' + rgw_id,
- 'caps': ['mon', 'allow rw',
+ 'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
+ 'caps': ['mon', 'allow *',
'mgr', 'allow rw',
'osd', 'allow rwx'],
})
def apply_rgw(self, spec):
return self._apply(spec)
+ def add_iscsi(self, spec):
+ # type: (ServiceSpec) -> orchestrator.Completion
+ return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
+
+ def _config_iscsi(self, spec):
+ logger.info('Saving service %s spec with placement %s' % (
+ spec.service_name(), spec.placement.pretty_str()))
+ self.spec_store.save(spec)
+
+ def _create_iscsi(self, igw_id, host, spec):
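+ # create a cephx key for the gateway and ship a generated iscsi-gateway.cfg via extra_config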
+ ret, keyring, err = self.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': utils.name_to_config_section('iscsi') + '.' + igw_id,
+ 'caps': ['mon', 'allow rw',
+ 'osd', f'allow rwx pool={spec.pool}'],
+ })
+
+ api_secure = 'false' if spec.api_secure is None else spec.api_secure
+ igw_conf = f"""
+# generated by cephadm
+[config]
+cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
+pool = {spec.pool}
+trusted_ip_list = {spec.trusted_ip_list or ''}
+minimum_gateways = 1
+fqdn_enabled = {spec.fqdn_enabled or ''}
+api_port = {spec.api_port or ''}
+api_user = {spec.api_user or ''}
+api_password = {spec.api_password or ''}
+api_secure = {api_secure}
+"""
+ extra_config = {'iscsi-gateway.cfg': igw_conf}
+ return self._create_daemon('iscsi', igw_id, host, keyring=keyring,
+ extra_config=extra_config)
+
+ @trivial_completion
+ def apply_iscsi(self, spec):
+ return self._apply(spec)
+
def add_rbd_mirror(self, spec):
return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
self.set_store('grafana_crt', cert)
self.set_store('grafana_key', pkey)
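+ # the grafana cert is self-signed; tell the dashboard not to verify it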
+ self.mon_command({
+ 'prefix': 'dashboard set-grafana-api-ssl-verify',
+ 'value': 'false',
+ })
+
+
config_file = {
'files': {
"""
return self.rm_util.report
- @trivial_completion
- def list_specs(self, service_name=None):
- """
- Loads all entries from the service_spec mon_store root.
- """
- return self.spec_store.find(service_name=service_name)
-
class BaseScheduler(object):
"""
logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
existing, chosen))
return existing + chosen
-