from mgr_module import HandleCommandResult, MonCommandFailed
-from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec
+from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec, MONSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
"""
return False
- def primary_daemon_type(self) -> str:
+ def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
"""
This is the type of the primary (usually only) daemon to be deployed.
"""
return self.TYPE
- def per_host_daemon_type(self) -> Optional[str]:
+ def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
"""
If defined, this type of daemon will be deployed once for each host
containing one or more daemons of the primary type.
raise NotImplementedError()
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
- # if this is called for a service type where it hasn't explcitly been
+ # if this is called for a service type where it hasn't explicitly been
# defined, return empty Daemon Desc
return DaemonDescription()
return daemon_spec
- def _check_safe_to_destroy(self, mon_id: str) -> None:
+ def config(self, spec: ServiceSpec) -> None:
+ assert self.TYPE == spec.service_type
+ self.set_crush_locations(self.mgr.cache.get_daemons_by_type('mon'), spec)
+
+ def _get_quorum_status(self) -> Dict[Any, Any]:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'quorum_status',
})
try:
j = json.loads(out)
- except Exception:
- raise OrchestratorError('failed to parse quorum status')
+ except Exception as e:
+ raise OrchestratorError(f'failed to parse mon quorum status: {e}')
+ return j
- mons = [m['name'] for m in j['monmap']['mons']]
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
+ quorum_status = self._get_quorum_status()
+ mons = [m['name'] for m in quorum_status['monmap']['mons']]
if mon_id not in mons:
logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
mon_id, mons))
return
new_mons = [m for m in mons if m != mon_id]
- new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+ new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
if len(new_quorum) > len(new_mons) / 2:
logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
(mon_id, new_quorum, new_mons))
# super().post_remove(daemon)
pass
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ daemon_spec.final_config, daemon_spec.deps = super().generate_config(daemon_spec)
+
+ # realistically, we expect there to always be a mon spec
+ # in a real deployment, but the way teuthology deploys some daemons
+ # it's possible there might not be. For that reason we need to
+ # verify the service is present in the spec store.
+ if daemon_spec.service_name in self.mgr.spec_store:
+ mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ if mon_spec.crush_locations:
+ if daemon_spec.host in mon_spec.crush_locations:
+ # the --crush-location flag only supports a single bucket=loc pair so
+ # others will have to be handled later. The idea is to set the flag
+ # for the first bucket=loc pair in the list in order to facilitate
+ # replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
+ c_loc = mon_spec.crush_locations[daemon_spec.host][0]
+ daemon_spec.final_config['crush_location'] = c_loc
+
+ return daemon_spec.final_config, daemon_spec.deps
+
+ def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
+ logger.debug('Setting mon crush locations from spec')
+ if not daemon_descrs:
+ return
+ assert self.TYPE == spec.service_type
+ mon_spec = cast(MONSpec, spec)
+
+ if not mon_spec.crush_locations:
+ return
+
+ quorum_status = self._get_quorum_status()
+ mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
+ for dd in daemon_descrs:
+ assert dd.daemon_id is not None
+ assert dd.hostname is not None
+ if dd.hostname not in mon_spec.crush_locations:
+ continue
+ if dd.daemon_id not in mons_in_monmap:
+ continue
+ # expected format for crush_locations from the quorum status is
+ # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
+ try:
+ current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
+ except (KeyError, IndexError) as e:
+ logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
+ 'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
+ desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
+ logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
+ logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
+ if current_crush_locs != desired_crush_locs:
+ logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
+ try:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mon set_location',
+ 'name': dd.daemon_id,
+ 'args': mon_spec.crush_locations[dd.hostname]
+ })
+ except Exception as e:
+ logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
+
class MgrService(CephService):
TYPE = 'mgr'
if ports:
daemon_spec.ports = ports
+ daemon_spec.ports.append(self.mgr.service_discovery_port)
daemon_spec.keyring = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
def config(self, spec: RGWSpec) -> None: # type: ignore
assert self.TYPE == spec.service_type
- # set rgw_realm and rgw_zone, if present
+ # set rgw_realm rgw_zonegroup and rgw_zone, if present
if spec.rgw_realm:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
'name': 'rgw_realm',
'value': spec.rgw_realm,
})
+ if spec.rgw_zonegroup:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_zonegroup',
+ 'value': spec.rgw_zonegroup,
+ })
if spec.rgw_zone:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
else:
args.append(f"port={port}")
+ else:
+ raise OrchestratorError(f'Invalid rgw_frontend_type parameter: {ftype}. Valid values are: beast, civetweb.')
+
+ if spec.rgw_frontend_extra_args is not None:
+ args.extend(spec.rgw_frontend_extra_args)
+
frontend = f'{ftype} {" ".join(args)}'
ret, out, err = self.mgr.check_mon_command({
assert self.TYPE == daemon_spec.daemon_type
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
- if not self.mgr.cherrypy_thread:
+ if not self.mgr.http_server.agent:
raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
return daemon_spec
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ agent = self.mgr.http_server.agent
try:
- assert self.mgr.cherrypy_thread
- assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
- assert self.mgr.cherrypy_thread.server_port
+ assert agent
+ assert agent.ssl_certs.get_root_cert()
+ assert agent.server_port
except Exception:
raise OrchestratorError(
'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
cfg = {'target_ip': self.mgr.get_mgr_ip(),
- 'target_port': self.mgr.cherrypy_thread.server_port,
+ 'target_port': agent.server_port,
'refresh_period': self.mgr.agent_refresh_rate,
'listener_port': self.mgr.agent_starting_port,
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
- self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,
- 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ 'root_cert.pem': agent.ssl_certs.get_root_cert(),
'listener.crt': listener_cert,
'listener.key': listener_key,
}
- return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
- self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
+ agent.ssl_certs.get_root_cert(),
str(self.mgr.get_module_option('device_enhanced_scan'))])