import logging
import random
-from typing import List, Optional, Callable
+from typing import List, Optional, Callable, Iterable, Tuple, TypeVar, Set
import orchestrator
from ceph.deployment.service_spec import PlacementSpec, HostPlacementSpec, ServiceSpec
+from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
logger = logging.getLogger(__name__)
+T = TypeVar('T')
class BaseScheduler(object):
"""
Base Scheduler Interface
- * requires a placement_spec
+ * requires a ServiceSpec
`place(host_pool)` needs to return a List[HostPlacementSpec, ..]
"""
- def __init__(self, placement_spec):
- # type: (PlacementSpec) -> None
- self.placement_spec = placement_spec
+ def __init__(self, spec):
+ # type: (ServiceSpec) -> None
+ self.spec = spec
def place(self, host_pool, count=None):
- # type: (List, Optional[int]) -> List[HostPlacementSpec]
+ # type: (List[T], Optional[int]) -> List[T]
raise NotImplementedError
1) Shuffle the provided host_pool
2) Select from list up to :count
"""
- def __init__(self, placement_spec):
- super(SimpleScheduler, self).__init__(placement_spec)
+ def __init__(self, spec):
+ super(SimpleScheduler, self).__init__(spec)
def place(self, host_pool, count=None):
- # type: (List, Optional[int]) -> List[HostPlacementSpec]
+ # type: (List[T], Optional[int]) -> List[T]
if not host_pool:
return []
host_pool = [x for x in host_pool]
+ # gen seed off of self.spec to make shuffling deterministic
+ seed = hash(self.spec.service_name())
# shuffle for pseudo random selection
- random.shuffle(host_pool)
+ random.Random(seed).shuffle(host_pool)
return host_pool[:count]
class HostAssignment(object):
- """
- A class to detect if hosts are being passed imperative or declarative
- If the spec is populated via the `hosts/hosts` field it will not load
- any hosts into the list.
- If the spec isn't populated, i.e. when only num or label is present (declarative)
- it will use the provided `get_host_func` to load it from the inventory.
-
- Schedulers can be assigned to pick hosts from the pool.
- """
def __init__(self,
spec, # type: ServiceSpec
get_hosts_func, # type: Callable
get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
-
filter_new_host=None, # type: Optional[Callable[[str],bool]]
scheduler=None, # type: Optional[BaseScheduler]
):
assert spec and get_hosts_func and get_daemons_func
self.spec = spec # type: ServiceSpec
- self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
+ self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec)
self.get_hosts_func = get_hosts_func
- self.get_daemons_func = get_daemons_func
self.filter_new_host = filter_new_host
self.service_name = spec.service_name()
-
+ self.daemons = get_daemons_func(self.service_name)
def validate(self):
self.spec.validate()
+ if self.spec.placement.count == 0:
+ raise OrchestratorValidationError(
+ f'<count> can not be 0 for {self.spec.one_line_str()}')
+
if self.spec.placement.hosts:
explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func()))
if unknown_hosts:
raise OrchestratorValidationError(
- f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
+ f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')
if self.spec.placement.host_pattern:
pattern_hostnames = self.spec.placement.filter_matching_hosts(self.get_hosts_func)
def place(self):
# type: () -> List[HostPlacementSpec]
"""
- Load hosts into the spec.placement.hosts container.
+ Generate a list of HostPlacementSpec taking into account:
+
+ * all known hosts
+ * hosts with existing daemons
+ * placement spec
+ * self.filter_new_host
"""
self.validate()
- # count == 0
- if self.spec.placement.count == 0:
- return []
+ count = self.spec.placement.count
- # respect any explicit host list
- if self.spec.placement.hosts and not self.spec.placement.count:
- logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
- return self.spec.placement.hosts
+ # get candidates based on [hosts, label, host_pattern]
+ candidates = self.get_candidates()
- # respect host_pattern
- if self.spec.placement.host_pattern:
- candidates = [
- HostPlacementSpec(x, '', '')
- for x in self.spec.placement.filter_matching_hosts(self.get_hosts_func)
- ]
- logger.debug('All hosts: {}'.format(candidates))
+ # If we don't have <count> the list of candidates is definitive.
+ if count is None:
+ logger.debug('Provided hosts: %s' % candidates)
return candidates
- count = 0
- if self.spec.placement.hosts and \
- self.spec.placement.count and \
- len(self.spec.placement.hosts) >= self.spec.placement.count:
- hosts = self.spec.placement.hosts
- logger.debug('place %d over provided host list: %s' % (
- count, hosts))
- count = self.spec.placement.count
+ # prefer hosts that already have services.
+ # this avoids re-assigning to _new_ hosts
+ # and constant re-distribution of hosts when new nodes are
+ # added to the cluster
+ hosts_with_daemons = self.hosts_with_daemons(candidates)
+
+ # The amount of hosts that need to be selected in order to fulfill count.
+ need = count - len(hosts_with_daemons)
+
+ # hostspecs that are do not have daemons on them but are still candidates.
+ others = difference_hostspecs(candidates, hosts_with_daemons)
+
+ # we don't need any additional hosts
+ if need < 0:
+ return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
+ else:
+ # exclusive to 'mon' daemons. Filter out hosts that don't have a public network assigned
+ if self.filter_new_host:
+ old = others
+ others = [h for h in others if self.filter_new_host(h.hostname)]
+ logger.debug('filtered %s down to %s' % (old, candidates))
+
+ # ask the scheduler to return a set of hosts with a up to the value of <count>
+ others = self.scheduler.place(others, need)
+ logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
+ hosts_with_daemons, others))
+ # if a host already has the anticipated daemon, merge it with the candidates
+ # to get a list of HostPlacementSpec that can be deployed on.
+ return list(merge_hostspecs(hosts_with_daemons, others))
+
+ def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
+ active_hosts: List['HostPlacementSpec'] = []
+ for daemon in self.daemons:
+ if daemon.is_active:
+ for h in hosts:
+ if h.hostname == daemon.hostname:
+ active_hosts.append(h)
+ # remove duplicates before returning
+ return list(dict.fromkeys(active_hosts))
+
+ def prefer_hosts_with_active_daemons(self, hosts: List[HostPlacementSpec], count) -> List[HostPlacementSpec]:
+ # try to prefer host with active daemon if possible
+ active_hosts = self.get_hosts_with_active_daemon(hosts)
+ if len(active_hosts) != 0 and count > 0:
+ for host in active_hosts:
+ hosts.remove(host)
+ if len(active_hosts) >= count:
+ return self.scheduler.place(active_hosts, count)
+ else:
+ return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
+ self.scheduler.place(hosts, count - len(active_hosts))))
+ # ask the scheduler to return a set of hosts with a up to the value of <count>
+ return self.scheduler.place(hosts, count)
+
+ def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[HostPlacementSpec]:
+ hosts_with_daemons = {d.hostname for d in self.daemons}
+ _add_daemon_hosts = set()
+ for host in host_pool:
+ if host.hostname not in hosts_with_daemons:
+ _add_daemon_hosts.add(host)
+ return _add_daemon_hosts
+
+ def remove_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[DaemonDescription]:
+ target_hosts = [h.hostname for h in host_pool]
+ _remove_daemon_hosts = set()
+ for d in self.daemons:
+ if d.hostname not in target_hosts:
+ _remove_daemon_hosts.add(d)
+ return _remove_daemon_hosts
+
+ def get_candidates(self) -> List[HostPlacementSpec]:
+ if self.spec.placement.hosts:
+ return self.spec.placement.hosts
elif self.spec.placement.label:
- hosts = [
+ return [
HostPlacementSpec(x, '', '')
for x in self.get_hosts_func(label=self.spec.placement.label)
]
- if not self.spec.placement.count:
- logger.debug('Labeled hosts: {}'.format(hosts))
- return hosts
- count = self.spec.placement.count
- logger.debug('place %d over label %s: %s' % (
- count, self.spec.placement.label, hosts))
- else:
- hosts = [
+ elif self.spec.placement.host_pattern:
+ return [
HostPlacementSpec(x, '', '')
- for x in self.get_hosts_func()
+ for x in self.spec.placement.filter_matching_hosts(self.get_hosts_func)
]
- if self.spec.placement.count:
- count = self.spec.placement.count
- else:
- # this should be a totally empty spec given all of the
- # alternative paths above.
- assert self.spec.placement.count is None
- assert not self.spec.placement.hosts
- assert not self.spec.placement.label
- count = 1
- logger.debug('place %d over all hosts: %s' % (count, hosts))
-
- # we need to select a subset of the candidates
-
- # if a partial host list is provided, always start with that
- if len(self.spec.placement.hosts) < count:
- chosen = self.spec.placement.hosts
- else:
- chosen = []
+ # If none of the above and also no <count>
+ if self.spec.placement.count is None:
+ raise OrchestratorValidationError("placement spec is empty: no hosts, no label, no pattern, no count")
+ # backward compatibility: consider an empty placements to be the same pattern = *
+ return [
+ HostPlacementSpec(x, '', '')
+ for x in self.get_hosts_func()
+ ]
+
+ def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
+ """
+ Prefer hosts with daemons. Otherwise we'll constantly schedule daemons
+ on different hosts all the time. This is about keeping daemons where
+ they are. This isn't about co-locating.
+ """
+ hosts_with_daemons = {d.hostname for d in self.daemons}
- # prefer hosts that already have services
- daemons = self.get_daemons_func(self.service_name)
- hosts_with_daemons = {d.hostname for d in daemons}
# calc existing daemons (that aren't already in chosen)
- chosen_hosts = [hs.hostname for hs in chosen]
- existing = [hs for hs in hosts
- if hs.hostname in hosts_with_daemons and \
- hs.hostname not in chosen_hosts]
- if len(chosen + existing) >= count:
- chosen = chosen + self.scheduler.place(
- existing,
- count - len(chosen))
- logger.debug('Hosts with existing daemons: {}'.format(chosen))
- return chosen
-
- need = count - len(existing + chosen)
- others = [hs for hs in hosts
- if hs.hostname not in hosts_with_daemons]
- if self.filter_new_host:
- old = others
- others = [h for h in others if self.filter_new_host(h.hostname)]
- logger.debug('filtered %s down to %s' % (old, hosts))
- chosen = chosen + self.scheduler.place(others, need)
- logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
- existing, chosen))
- return existing + chosen
+ existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]
+
+ logger.debug('Hosts with existing daemons: {}'.format(existing))
+ return existing
+
+
+def merge_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> Iterable[HostPlacementSpec]:
+ """
+ Merge two lists of HostPlacementSpec by hostname. always returns `l` first.
+
+ >>> list(merge_hostspecs([HostPlacementSpec(hostname='h', name='x', network='')],
+ ... [HostPlacementSpec(hostname='h', name='y', network='')]))
+ [HostPlacementSpec(hostname='h', network='', name='x')]
+
+ """
+ l_names = {h.hostname for h in l}
+ yield from l
+ yield from (h for h in r if h.hostname not in l_names)
+
+
+def difference_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
+ """
+ returns l "minus" r by hostname.
+
+ >>> list(difference_hostspecs([HostPlacementSpec(hostname='h1', name='x', network=''),
+ ... HostPlacementSpec(hostname='h2', name='y', network='')],
+ ... [HostPlacementSpec(hostname='h2', name='', network='')]))
+ [HostPlacementSpec(hostname='h1', network='', name='x')]
+
+ """
+ r_names = {h.hostname for h in r}
+ return [h for h in l if h.hostname not in r_names]
+