import ipaddress
import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict

import orchestrator
from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES

logger = logging.getLogger(__name__)
T = TypeVar('T')


class DaemonPlacement(NamedTuple):
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
        res = self.daemon_type + ':' + self.hostname
        other = []
        if self.rank is not None:
            other.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            other.append(f'network={self.network}')
        if self.name:
            other.append(f'name={self.name}')
        if self.ports:
            other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        if other:
            res += '(' + ' '.join(other) + ')'
        return res

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            [p + n for p in self.ports],
            self.rank,
            self.rank_generation,
        )
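
    # Illustrative sketch (values are hypothetical): renumber_ports() shifts
    # every port by a fixed offset; place() uses this to give each colocated
    # daemon on a host its own ports, e.g.
    #
    #   str(DaemonPlacement('rgw', 'host1', ports=[80]).renumber_ports(1))
    #   -> 'rgw:host1(*:81)'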

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

    def assign_name(self, name: str) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            name,
            self.ip,
            self.ports,
            self.rank,
            self.rank_generation,
        )

    def assign_rank_generation(
        self,
        rank: int,
        rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )
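
    # Illustrative sketch (values are hypothetical): rank_map records, per
    # rank, every generation ever issued and which daemon (if any) owns it:
    #
    #   rank_map: Dict[int, Dict[int, Optional[str]]] = {}
    #   dp = DaemonPlacement('mds', 'host1')
    #   dp = dp.assign_rank_generation(0, rank_map)  # {0: {0: None}}
    #   dp = dp.assign_rank_generation(0, rank_map)  # {0: {0: None, 1: None}}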

    def matches_daemon(self, dd: DaemonDescription) -> bool:
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports and dd.ports:
                return False
            if self.ip != dd.ip and dd.ip:
                return False
        return True

    def matches_rank_map(
        self,
        dd: DaemonDescription,
        rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
        ranks: List[int]
    ) -> bool:
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map:
            return False
        if dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id
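
    # Illustrative sketch (values are hypothetical): given
    # rank_map == {0: {0: 'a', 1: 'b'}} and 0 in ranks, a daemon with rank=0
    # matches only if rank_generation == 1 (the newest generation) and
    # daemon_id == 'b'; daemons from older generations are left to be removed.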


class HostAssignment(object):

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 unreachable_hosts: List[orchestrator.HostSpec],
                 draining_hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.unreachable_hosts: List[orchestrator.HostSpec] = unreachable_hosts
        self.draining_hosts: List[orchestrator.HostSpec] = draining_hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        self.networks = networks
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            known_hosts = self.get_hostnames() + [h.hostname for h in self.draining_hosts]
            unknown_hosts = explicit_hostnames.difference(set(known_hosts))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')
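
    # Summary of the checks above: validate() fails fast on impossible
    # placements, i.e. an explicit count of 0, count_per_host > 1 without
    # allow_colo, explicitly listed hosts that are unknown, or a
    # label/pattern that matches no host.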

    def place_per_host_daemons(
        self,
        slots: List[DaemonPlacement],
        to_add: List[DaemonPlacement],
        to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        if self.per_host_daemon_type:
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        to_remove = [d for d in to_remove if d.hostname not in [
            h.hostname for h in self.unreachable_hosts]]

        return slots, to_add, to_remove
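
    # Descriptive note: when per_host_daemon_type is set, exactly one helper
    # slot is created per distinct hostname appearing in `slots`, existing
    # helper daemons are matched against those slots, and any unmatched helper
    # is scheduled for removal. (Pairing a per-host helper with a primary
    # daemon type, as an ingress-style service would, is the intended use; the
    # concrete service is an assumption, not stated in this file.)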

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of DaemonPlacement slots, taking into account:

        * all known hosts
        * hosts with existing daemons
        * the placement spec
        * self.filter_new_host
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]
        if self.primary_daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES:
            # remove unreachable hosts that are not in maintenance so daemons
            # on these hosts will be rescheduled
            candidates = self.remove_non_maintenance_unreachable_candidates(candidates)

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r
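
        # Illustrative sketch (values are hypothetical): expanding two
        # candidates to num=2 emits one copy of every slot per offset, with
        # ports shifted by that offset:
        #
        #   expand_candidates([rgw:a(*:80), rgw:b(*:80)], 2)
        #   -> [rgw:a(*:80), rgw:b(*:80), rgw:a(*:81), rgw:b(*:81)]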

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,  # active before standby
                d.rank is not None,  # ranked first, then non-ranked
                d.rank,  # low ranks
                0 - (d.rank_generation or 0),  # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_add: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        ranks: List[int] = list(range(len(candidates)))
        others: List[DaemonPlacement] = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        # TODO: At some point we want to deploy daemons that are on offline hosts;
        # the right time to do so differs per daemon type. Stateless daemons we
        # could redeploy quickly to improve availability. Stateful daemons we may
        # want to hold back longer, to see if the host comes back online.

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = [dd for dd in others if dd.hostname not in [
                h.hostname for h in self.unreachable_hosts]]
        else:
            # the number of new slots that need to be selected in order to fulfill count
            need = count - len(existing)

            # we don't need any additional placements
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            for dp in others:
                if need <= 0:
                    break
                if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
                    to_add.append(dp)
                    need -= 1  # this is the last use of need in this function, so it can serve as a counter

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        for subnet in subnets:
            ips: List[str] = []
            # the following is to allow loopback interfaces for both IPv4 and
            # IPv6. Since we only have the subnet (and no IP), we assume the
            # default loopback IP address.
            if ipaddress.ip_network(subnet).is_loopback:
                if ipaddress.ip_network(subnet).version == 4:
                    ips.append('127.0.0.1')
                else:
                    ips.append('::1')
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None
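
    # Illustrative sketch (values are hypothetical): self.networks maps
    # hostname -> subnet -> interface -> list of IPs, so with
    #
    #   networks = {'host1': {'10.0.0.0/24': {'eth0': ['10.0.0.2']}}}
    #
    # find_ip_on_host('host1', ['10.0.0.0/24']) returns '10.0.0.2' (the lowest
    # address, by sort order, on the first subnet that yields any IPs).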

    def get_candidates(self) -> List[DaemonPlacement]:
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts if h.hostname not in [dh.hostname for dh in self.draining_hosts]
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            self.spec.placement.count is not None
            or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # Now that we have the list of candidate nodes for the configured
        # placement, shuffle the list for pseudo-random node selection. The
        # shuffle is seeded with a hash of the service name, which makes it
        # deterministic for any given service.
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)  # truncate result to 32 bits
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        return final
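
    # A minimal sketch of the seeding idea above (the service name is
    # hypothetical):
    #
    #   seed = int(hashlib.sha1(b'rgw.foo').hexdigest(), 16) % (2 ** 32)
    #   random.Random(seed).shuffle(candidates)
    #
    # The same service always shuffles its candidates into the same order,
    # while different services land on different pseudo-random orders.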

    def remove_non_maintenance_unreachable_candidates(self, candidates: List[DaemonPlacement]) -> List[DaemonPlacement]:
        in_maintenance: Dict[str, bool] = {}
        for h in self.hosts:
            if h.status.lower() == 'maintenance':
                in_maintenance[h.hostname] = True
                continue
            in_maintenance[h.hostname] = False
        unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
        candidates = [
            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
        return candidates
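

# A minimal usage sketch (variable names are hypothetical, not part of this
# module): a scheduling pass builds a HostAssignment from the current cluster
# state and asks it where daemons should run:
#
#   all_slots, to_add, to_remove = HostAssignment(
#       spec=spec,                  # ServiceSpec with a placement section
#       hosts=inventory_hosts,      # every known HostSpec
#       unreachable_hosts=offline,  # hosts that cannot currently be reached
#       draining_hosts=draining,    # hosts being emptied of daemons
#       daemons=existing_daemons,   # DaemonDescriptions for this service
#   ).place()
#
# to_add and to_remove then drive the actual deploy and remove actions.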