import ipaddress
import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict

import orchestrator
from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES

logger = logging.getLogger(__name__)
T = TypeVar('T')


class DaemonPlacement(NamedTuple):
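    """A planned placement for a single daemon: the target host plus the
    optional network, daemon name, IP, ports, and rank bookkeeping."""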
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
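        """Render e.g. 'mon:host1(rank=0.1 network=10.0.0.0/24 10.1.2.3:3300,6789)'."""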
        res = self.daemon_type + ':' + self.hostname
        other = []
        if self.rank is not None:
            other.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            other.append(f'network={self.network}')
        if self.name:
            other.append(f'name={self.name}')
        if self.ports:
            other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        if other:
            res += '(' + ' '.join(other) + ')'
        return res

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
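        """Return a copy with every port shifted up by n, so each co-located
        daemon on the same host can get its own port block."""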
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            [p + n for p in self.ports],
            self.rank,
            self.rank_generation,
        )

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

    def assign_name(self, name: str) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            name,
            self.ip,
            self.ports,
            self.rank,
            self.rank_generation,
        )

    def assign_rank_generation(
        self,
        rank: int,
        rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
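        """Take the next unused generation for this rank, record it in
        rank_map (daemon id not known yet, hence None), and return a copy
        carrying that (rank, generation) pair."""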
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

    def matches_daemon(self, dd: DaemonDescription) -> bool:
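        """Check whether an existing daemon could occupy this slot."""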
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports and dd.ports:
                return False
            if self.ip != dd.ip and dd.ip:
                return False
        return True

    def matches_rank_map(
        self,
        dd: DaemonDescription,
        rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
        ranks: List[int]
    ) -> bool:
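        """Check the daemon's rank state against this service's rank map: it
        must hold a wanted rank and the newest generation recorded for that
        rank, and be the exact daemon recorded for that generation."""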
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map:
            return False
        if dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id


class HostAssignment(object):
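    """Compute daemon placement for one service: which existing daemons to
    keep, which new slots to add, and which daemons to remove."""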

    def __init__(self,
                 spec: ServiceSpec,
                 hosts: List[orchestrator.HostSpec],
                 unreachable_hosts: List[orchestrator.HostSpec],
                 draining_hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 related_service_daemons: Optional[List[DaemonDescription]] = None,
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host: Optional[Callable[[str, ServiceSpec], bool]] = None,
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.unreachable_hosts: List[orchestrator.HostSpec] = unreachable_hosts
        self.draining_hosts: List[orchestrator.HostSpec] = draining_hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        self.related_service_daemons = related_service_daemons
        self.networks = networks
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
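        """Reject placements that cannot be satisfied, raising
        OrchestratorValidationError with a user-facing message."""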
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            known_hosts = self.get_hostnames() + [h.hostname for h in self.draining_hosts]
            unknown_hosts = explicit_hostnames.difference(set(known_hosts))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
        self,
        slots: List[DaemonPlacement],
        to_add: List[DaemonPlacement],
        to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
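        """If this service carries a per-host sidecar daemon type (for
        example, keepalived next to an ingress service's haproxy), add one
        slot of that type on every host that received a primary slot and
        reconcile against the sidecars that already exist."""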
        if self.per_host_daemon_type:
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        to_remove = [d for d in to_remove if d.hostname not in [
            h.hostname for h in self.unreachable_hosts]]

        return slots, to_add, to_remove

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of DaemonPlacement slots, taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]
        if self.primary_daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES:
            # remove unreachable hosts that are not in maintenance so daemons
            # on these hosts will be rescheduled
            candidates = self.remove_non_maintenance_unreachable_candidates(candidates)

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
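            # ceil(count / len(candidates)): e.g. count=5 over 2 hosts -> 3 per host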
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,  # active before standby
                d.rank is not None,  # ranked first, then non-ranked
                d.rank,  # low ranks
                0 - (d.rank_generation or 0),  # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_add: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        ranks: List[int] = list(range(len(candidates)))
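        # one candidate rank per slot; existing ranked daemons consume their
        # rank in the loop below, and leftover ranks are later assigned to
        # newly added slots (when the service uses a rank_map)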
        others: List[DaemonPlacement] = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        # TODO: At some point we want to deploy daemons that are on offline hosts;
        # exactly when differs per daemon type. Stateless daemons we could redeploy
        # quickly to improve availability. For stateful daemons we might want to
        # wait longer to see if the host comes back online.

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = [dd for dd in others if dd.hostname not in [
                h.hostname for h in self.unreachable_hosts]]
        else:
            # The number of new slots that need to be selected in order to fulfill count
            need = count - len(existing)

            # we don't need any additional placements
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            if self.related_service_daemons:
                # prefer to put daemons on the same host(s) as daemons of the
                # related service. Note that we only do this in preference to
                # picking arbitrary hosts to satisfy the count; we do not break
                # any deterministic placements in order to match the placement
                # of a related service.
                related_service_hosts = list(set(dd.hostname for dd in self.related_service_daemons))
                matching_dps = [dp for dp in others if dp.hostname in related_service_hosts]
                for dp in matching_dps:
                    if need <= 0:
                        break
                    if dp.hostname in related_service_hosts and dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
                        logger.debug(f'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
                        to_add.append(dp)
                        need -= 1  # from here on, need only serves as a counter
                # at this point, we've either met our placement quota entirely using hosts with related
                # service daemons, or we still need to place more. If we do need to place more,
                # we should make sure not to re-use hosts with related service daemons by filtering
                # them out from the "others" list
                if need > 0:
                    others = [dp for dp in others if dp.hostname not in related_service_hosts]

            for dp in others:
                if need <= 0:
                    break
                if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
                    to_add.append(dp)
                    need -= 1  # last use of need in this function, so it can serve as a counter

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
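        """Return an IP for this host on one of the given subnets, trying the
        subnets in order: collect the host's interface addresses on the subnet
        (plus the default loopback address for loopback subnets) and return
        the first in sort order, or None if no subnet yields an address."""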
        for subnet in subnets:
            ips: List[str] = []
            # The following allows loopback interfaces for both IPv4 and IPv6.
            # Since we only have the subnet (and no IP), we assume the default
            # loopback address.
            if ipaddress.ip_network(subnet).is_loopback:
                if ipaddress.ip_network(subnet).version == 4:
                    ips.append('127.0.0.1')
                else:
                    ips.append('::1')
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    def get_candidates(self) -> List[DaemonPlacement]:
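        """Build the candidate slots from the placement spec (explicit hosts,
        label, host pattern, or a bare count over all hosts), filter them by
        requested networks and filter_new_host, and return them in a
        deterministic pseudo-random order."""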
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            self.spec.placement.count is not None
            or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname, self.spec):
                    ls.append(h)
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # now that we have the list of candidate hosts for the configured
        # placement, shuffle the list for pseudo-random selection. The seed is
        # derived from the service name and used to shuffle the candidates, so
        # the shuffle is deterministic for the same service name.
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)  # truncate result to 32 bits
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        return final

    def remove_non_maintenance_unreachable_candidates(self, candidates: List[DaemonPlacement]) -> List[DaemonPlacement]:
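        """Drop candidates whose host is unreachable, unless that host is in
        maintenance mode."""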
        in_maintenance: Dict[str, bool] = {}
        for h in self.hosts:
            if h.status.lower() == 'maintenance':
                in_maintenance[h.hostname] = True
                continue
            in_maintenance[h.hostname] = False
        unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
        candidates = [
            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
        return candidates