import logging
import random
from typing import List, Optional, Callable, Iterable, TypeVar, Set

import orchestrator
from ceph.deployment.service_spec import PlacementSpec, HostPlacementSpec, ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError

logger = logging.getLogger(__name__)
T = TypeVar('T')


class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a ServiceSpec

    `place(host_pool)` needs to return a List[HostPlacementSpec]
    """

    def __init__(self, spec):
        # type: (ServiceSpec) -> None
        self.spec = spec

    def place(self, host_pool, count=None):
        # type: (List[T], Optional[int]) -> List[T]
        raise NotImplementedError


class SimpleScheduler(BaseScheduler):
    """
    The simplest way to pick/schedule a set of hosts:
    1) Shuffle the provided host_pool
    2) Select hosts from the list, up to ``count``
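
    A minimal sketch of the behavior (``ServiceSpec('mgr')`` is just an
    arbitrary example spec; the chosen order is stable for a given service
    name within one interpreter run):

    >>> spec = ServiceSpec('mgr')
    >>> len(SimpleScheduler(spec).place(['host1', 'host2', 'host3'], count=2))
    2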
37 """
38
39 def __init__(self, spec):
40 super(SimpleScheduler, self).__init__(spec)
41
42 def place(self, host_pool, count=None):
43 # type: (List[T], Optional[int]) -> List[T]
44 if not host_pool:
45 return []
46 host_pool = [x for x in host_pool]
47 # gen seed off of self.spec to make shuffling deterministic
48 seed = hash(self.spec.service_name())
49 # shuffle for pseudo random selection
50 random.Random(seed).shuffle(host_pool)
51 return host_pool[:count]
52

class HostAssignment(object):

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 get_daemons_func,  # type: Callable[[str], List[orchestrator.DaemonDescription]]
                 filter_new_host=None,  # type: Optional[Callable[[str], bool]]
                 scheduler=None,  # type: Optional[BaseScheduler]
                 ):
        assert spec and get_daemons_func
        self.spec = spec  # type: ServiceSpec
        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec)
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = get_daemons_func(self.service_name)

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        return [h.hostname for h in self.hosts]

    def validate(self):
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> cannot be 0 for {self.spec.one_line_str()}')

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place(self):
        # type: () -> List[HostPlacementSpec]
        """
        Generate a list of HostPlacementSpec, taking into account:

        * all known hosts
        * hosts with existing daemons
        * the placement spec
        * self.filter_new_host
        """

        self.validate()

        count = self.spec.placement.count

        # get candidates based on [hosts, label, host_pattern]
        candidates = self.get_candidates()

        # If we don't have a <count>, the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s' % candidates)
            # if asked to place an even number of mons, deploy one fewer
            if self.spec.service_type == 'mon' and (len(candidates) % 2) == 0:
                logger.info("deploying %s monitor(s) instead of %s so monitors may achieve consensus" % (
                    len(candidates) - 1, len(candidates)))
                return candidates[0:len(candidates) - 1]
            return candidates

        # if asked to place an even number of mons, deploy one fewer
        if self.spec.service_type == 'mon':
            # if count >= the number of candidates, then the number of
            # candidates is the determining factor in how many mons get placed
            if count >= len(candidates):
                if (len(candidates) % 2) == 0:
                    logger.info("deploying %s monitor(s) instead of %s so monitors may achieve consensus" % (
                        len(candidates) - 1, len(candidates)))
                    count = len(candidates) - 1
            # if count < the number of candidates, then count is the
            # determining factor in how many mons get placed
            else:
                if (count % 2) == 0:
                    logger.info(
                        "deploying %s monitor(s) instead of %s so monitors may achieve consensus" % (count - 1, count))
                    count = count - 1
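
        # Worked example of the adjustment above (illustrative numbers):
        # with count=4 and 5 candidates, count governs and is even, so 3
        # mons are placed; with count=6 and 4 candidates, the candidate
        # list governs and is even, so count becomes 3.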

        # prefer hosts that already have daemons of this service;
        # this avoids re-assigning to _new_ hosts and the constant
        # re-distribution of daemons when new nodes are added to the cluster
        hosts_with_daemons = self.hosts_with_daemons(candidates)

        # The number of hosts that need to be selected in order to fulfill count.
        need = count - len(hosts_with_daemons)
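        # e.g. (illustrative): with count=5 and daemons already on 3 of the
        # candidate hosts, need=2 additional hosts must be selected.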

        # hostspecs that do not have daemons on them but are still candidates.
        others = difference_hostspecs(candidates, hosts_with_daemons)

        # we already have more hosts with daemons than <count>
        if need < 0:
            return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)
        else:
            # exclusive to 'mon' daemons: filter out hosts that don't have
            # a public network assigned
            if self.filter_new_host:
                old = others
                others = [h for h in others if self.filter_new_host(h.hostname)]
                logger.debug('filtered %s down to %s' % (old, others))

            # ask the scheduler to return up to <need> additional hosts
            others = self.scheduler.place(others, need)
            logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
                hosts_with_daemons, others))
            # merge the hosts that already have the daemon with the newly
            # chosen hosts to get the final list of HostPlacementSpec to
            # deploy on
            return list(merge_hostspecs(hosts_with_daemons, others))

    def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
        active_hosts: List['HostPlacementSpec'] = []
        for daemon in self.daemons:
            if daemon.is_active:
                for h in hosts:
                    if h.hostname == daemon.hostname:
                        active_hosts.append(h)
        # remove duplicates before returning
        return list(dict.fromkeys(active_hosts))

    def prefer_hosts_with_active_daemons(self, hosts: List[HostPlacementSpec], count: int) -> List[HostPlacementSpec]:
        # prefer hosts with an active daemon, if possible
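        # e.g. (illustrative): with count=2 and 3 active hosts, two of the
        # active hosts are chosen; with count=3 and 1 active host, that
        # host is kept and two more are picked from the remainder.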
        active_hosts = self.get_hosts_with_active_daemon(hosts)
        if len(active_hosts) != 0 and count > 0:
            for host in active_hosts:
                hosts.remove(host)
            if len(active_hosts) >= count:
                return self.scheduler.place(active_hosts, count)
            else:
                return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
                                            self.scheduler.place(hosts, count - len(active_hosts))))
        # ask the scheduler to return up to <count> hosts
        return self.scheduler.place(hosts, count)

    def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[HostPlacementSpec]:
        hosts_with_daemons = {d.hostname for d in self.daemons}
        _add_daemon_hosts = set()
        for host in host_pool:
            if host.hostname not in hosts_with_daemons:
                _add_daemon_hosts.add(host)
        return _add_daemon_hosts

    def remove_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[DaemonDescription]:
        target_hosts = [h.hostname for h in host_pool]
        _remove_daemon_hosts = set()
        for d in self.daemons:
            if d.hostname not in target_hosts:
                _remove_daemon_hosts.add(d)
        return _remove_daemon_hosts

    def get_candidates(self) -> List[HostPlacementSpec]:
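        # Precedence (first match wins): explicit <hosts>, then <label>,
        # then <host_pattern>; an otherwise-empty placement that carries
        # only a <count> falls through to "all known hosts" below.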
        if self.spec.placement.hosts:
            return self.spec.placement.hosts
        elif self.spec.placement.label:
            return [
                HostPlacementSpec(x.hostname, '', '')
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            return [
                HostPlacementSpec(x, '', '')
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        # If none of the above matched and there is also no <count>
        if self.spec.placement.count is None:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")
        # backward compatibility: treat an empty placement as equivalent
        # to the host pattern '*'
        return [
            HostPlacementSpec(x.hostname, '', '')
            for x in self.hosts
        ]

    def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
        """
        Prefer hosts with daemons. Otherwise we'd constantly schedule daemons
        on different hosts. This is about keeping daemons where they are,
        not about co-locating.
        """
        hosts_with_daemons = {d.hostname for d in self.daemons}

        # filter the candidates down to hosts that already have a daemon
        existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]

        logger.debug('Hosts with existing daemons: {}'.format(existing))
        return existing


def merge_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> Iterable[HostPlacementSpec]:
    """
    Merge two lists of HostPlacementSpec by hostname. Always yields `l` first.

    >>> list(merge_hostspecs([HostPlacementSpec(hostname='h', name='x', network='')],
    ...                      [HostPlacementSpec(hostname='h', name='y', network='')]))
    [HostPlacementSpec(hostname='h', network='', name='x')]

    """
    l_names = {h.hostname for h in l}
    yield from l
    yield from (h for h in r if h.hostname not in l_names)


def difference_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
    """
    Returns `l` "minus" `r`, keyed by hostname.

    >>> difference_hostspecs([HostPlacementSpec(hostname='h1', name='x', network=''),
    ...                       HostPlacementSpec(hostname='h2', name='y', network='')],
    ...                      [HostPlacementSpec(hostname='h2', name='', network='')])
    [HostPlacementSpec(hostname='h1', network='', name='x')]

    """
    r_names = {h.hostname for h in r}
    return [h for h in l if h.hostname not in r_names]
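

def _example_host_assignment() -> None:
    # A minimal, hedged usage sketch (this helper and its inputs are
    # hypothetical, not part of the cephadm API): compute a placement for
    # a 'mgr' spec across three known hosts, assuming no daemons of that
    # service exist yet.
    spec = ServiceSpec('mgr', placement=PlacementSpec(count=2))
    hosts = [orchestrator.HostSpec(h) for h in ['host1', 'host2', 'host3']]
    assignment = HostAssignment(spec=spec,
                                hosts=hosts,
                                get_daemons_func=lambda service_name: [])
    # place() validates the spec, gathers candidates, and returns a
    # List[HostPlacementSpec]; here it deterministically picks two hosts.
    logger.debug('example placement: %s', assignment.place())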