]>
Commit | Line | Data |
---|---|---|
b3b6e05e | 1 | import hashlib |
e306af50 TL |
2 | import logging |
3 | import random | |
f67539c2 | 4 | from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict |
e306af50 TL |
5 | |
6 | import orchestrator | |
f67539c2 | 7 | from ceph.deployment.service_spec import ServiceSpec |
f6b5b4d7 | 8 | from orchestrator._interface import DaemonDescription |
e306af50 TL |
9 | from orchestrator import OrchestratorValidationError |
10 | ||
11 | logger = logging.getLogger(__name__) | |
f6b5b4d7 | 12 | T = TypeVar('T') |
e306af50 | 13 | |
f91f0fd5 | 14 | |
f67539c2 TL |
class DaemonPlacement(NamedTuple):
    """A single concrete placement slot for a daemon: the daemon type and
    host, plus optional network/name/ip/ports and (for ranked services)
    a rank and rank generation."""
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
        """Render as 'type:host(extra ...)' for log/debug output."""
        extras = []
        if self.rank is not None:
            extras.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            extras.append(f'network={self.network}')
        if self.name:
            extras.append(f'name={self.name}')
        if self.ports:
            extras.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        label = self.daemon_type + ':' + self.hostname
        if extras:
            label += '(' + ' '.join(extras) + ')'
        return label

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        """Return a copy with every port shifted up by *n*."""
        return self._replace(ports=[p + n for p in self.ports])

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        """Return a copy carrying the given rank and rank generation."""
        return self._replace(rank=rank, rank_generation=gen)

    def assign_name(self, name: str) -> 'DaemonPlacement':
        """Return a copy with an explicit daemon name."""
        return self._replace(name=name)

    def assign_rank_generation(
            self,
            rank: int,
            rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
        """Return a copy assigned *rank* and the next unused generation for
        that rank, recording the new generation in *rank_map* (daemon id is
        left as None until one is known)."""
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return self._replace(rank=rank, rank_generation=gen)

    def matches_daemon(self, dd: DaemonDescription) -> bool:
        """True if the existing daemon *dd* occupies this placement slot."""
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports:
                return False
            if self.ip != dd.ip:
                return False
        return True

    def matches_rank_map(
            self,
            dd: DaemonDescription,
            rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
            ranks: List[int]
    ) -> bool:
        """True if *dd*'s rank state is consistent with *rank_map* and the
        still-available *ranks* (for unranked services, *dd* must simply
        carry no rank)."""
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map or dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id
137 | ||
e306af50 TL |
138 | |
class HostAssignment(object):
    """Compute daemon placements for one service spec.

    Given the known hosts, the currently deployed daemons, and the spec's
    placement directives, :meth:`place` decides which placement slots to
    keep, which daemons to add, and which to remove.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        # host -> subnet -> interface -> [ips]; read-only here, so the shared
        # mutable default is safe
        self.networks = networks
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        """Return the known hosts carrying *label*."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return the names of all known hosts."""
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        """Raise OrchestratorValidationError if the spec cannot be placed."""
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
                self.spec.placement.count_per_host is not None
                and self.spec.placement.count_per_host > 1
                and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
            self,
            slots: List[DaemonPlacement],
            to_add: List[DaemonPlacement],
            to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        """If a per-host daemon type is configured (e.g. a keepalived next to
        each primary daemon), extend slots/to_add/to_remove so that exactly
        one such daemon exists per occupied host.  Mutates and returns the
        given lists."""
        if self.per_host_daemon_type:
            # one slot per distinct host in the primary slots
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            # whatever slots remain unmatched need a new daemon
            to_add += host_slots

        return slots, to_add, to_remove

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate placement slots taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        Returns (all slots, slots to add, daemons to remove).
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            # replicate each candidate `num` times, bumping ports per copy so
            # colocated daemons don't collide
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,  # active before standby
                # NOTE(review): False sorts before True, so unranked daemons
                # come before ranked ones here; the original comment said
                # "ranked first" -- confirm the intended order
                d.rank is not None,
                d.rank,  # low ranks
                0 - (d.rank_generation or 0),  # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        ranks: List[int] = list(range(len(candidates)))
        others: List[DaemonPlacement] = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = others
        else:
            # The number of new slots that need to be selected in order to fulfill count
            need = count - len(existing)

            # we don't need any additional placements
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            to_add = others[:need]

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            final = existing_slots + to_add
            logger.debug('Provided hosts: %s' % final)
            return self.place_per_host_daemons(final, to_add, to_remove)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        """Return the lowest IP configured on *hostname* in the first subnet
        (in *subnets* order) that has any IPs, else None."""
        for subnet in subnets:
            ips: List[str] = []
            # fix: the original reused the name 'ips' for the per-interface
            # list, shadowing the accumulator -- earlier interfaces were
            # dropped and the last list was extended with itself
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    def get_candidates(self) -> List[DaemonPlacement]:
        """Return candidate placement slots for the primary daemon type,
        derived from the placement spec (explicit hosts, label, pattern, or
        all hosts for count-based specs), optionally bound to an IP and
        filtered, then deterministically shuffled."""
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
                self.spec.placement.count is not None
                or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # shuffle for pseudo random selection
        # gen seed off of self.spec to make shuffling deterministic
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        # fix: return the shuffled list; the original returned the
        # unshuffled 'ls', making the seeded shuffle above dead code
        return final