]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import logging |
2 | import random | |
f67539c2 | 3 | from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict |
e306af50 TL |
4 | |
5 | import orchestrator | |
f67539c2 | 6 | from ceph.deployment.service_spec import ServiceSpec |
f6b5b4d7 | 7 | from orchestrator._interface import DaemonDescription |
e306af50 TL |
8 | from orchestrator import OrchestratorValidationError |
9 | ||
10 | logger = logging.getLogger(__name__) | |
f6b5b4d7 | 11 | T = TypeVar('T') |
e306af50 | 12 | |
f91f0fd5 | 13 | |
f67539c2 TL |
class DaemonPlacement(NamedTuple):
    """One concrete placement slot: a daemon of a given type on one host,
    optionally pinned to a network, daemon name, IP and port list."""
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []

    def __str__(self) -> str:
        # Gather the optional attributes that belong in the parenthesized part.
        details = []
        if self.network:
            details.append(f'network={self.network}')
        if self.name:
            details.append(f'name={self.name}')
        if self.ports:
            if len(self.ports) == 1:
                port_str = str(self.ports[0])
            else:
                port_str = ",".join(map(str, self.ports))
            details.append(f'{self.ip or "*"}:{port_str}')
        label = self.daemon_type + ':' + self.hostname
        if details:
            label += '(' + ' '.join(details) + ')'
        return label

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        """Return a copy of this placement with every port shifted up by ``n``."""
        shifted = [port + n for port in self.ports]
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            shifted,
        )

    def matches_daemon(self, dd: DaemonDescription) -> bool:
        """Return True if the existing daemon ``dd`` occupies this slot."""
        if (self.daemon_type, self.hostname) != (dd.daemon_type, dd.hostname):
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        # Ports/IP only constrain the match when this slot specifies ports.
        if self.ports and (self.ports != dd.ports or self.ip != dd.ip):
            return False
        return True
e306af50 TL |
59 | |
60 | ||
class HostAssignment(object):
    """Compute daemon placement for one service spec.

    Given the spec, the host inventory, the daemons that already exist and
    (optionally) per-host network information, work out which placement
    slots the service should occupy, which daemons must be added and which
    removed.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        # The daemon type actually being scheduled (may differ from the
        # service type, e.g. a service that deploys a different daemon).
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        # host -> subnet -> interface -> IPs.  Treated as read-only here, so
        # the shared mutable default {} is never modified.
        self.networks = networks
        self.allow_colo = allow_colo
        # A secondary daemon type that must run exactly once per chosen host.
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        """Return all hosts carrying the given label."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return the names of all known hosts."""
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        """Raise OrchestratorValidationError if the placement cannot be satisfied."""
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
                self.spec.placement.count_per_host is not None
                and self.spec.placement.count_per_host > 1
                and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            # Every explicitly listed host must exist in the inventory.
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
            self,
            slots: List[DaemonPlacement],
            to_add: List[DaemonPlacement],
            to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        """Extend the placement with one per_host_daemon_type slot per host.

        Existing per-host daemons that still match a selected host keep their
        slot; per-host daemons on hosts no longer selected are queued for
        removal.  No-op when per_host_daemon_type is unset.
        """
        if self.per_host_daemon_type:
            # One slot per distinct hostname that received a primary slot.
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        # Slot already occupied; don't add a duplicate.
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        return slots, to_add, to_remove

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of HostPlacementSpec taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        Returns (all slots, slots to add, daemons to remove).
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            # Replicate each slot `num` times, shifting the ports of each
            # replica so co-located daemons don't collide on ports.
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            # Spread <count> daemons as evenly as possible (ceiling division).
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider active (primary) daemons first
        daemons = [
            d for d in self.daemons if d.is_active and d.daemon_type == self.primary_daemon_type
        ] + [
            d for d in self.daemons if not d.is_active and d.daemon_type == self.primary_daemon_type
        ]

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        others = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                # Daemon no longer matches any candidate slot.
                to_remove.append(dd)

        existing = existing_active + existing_standby

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s' % candidates)
            return self.place_per_host_daemons(candidates, others, to_remove)

        # The number of new slots that need to be selected in order to fulfill count
        need = count - len(existing)

        # we don't need any additional placements
        if need <= 0:
            # Trim surplus daemons; active ones sort first so they survive.
            to_remove.extend(existing[count:])
            del existing_slots[count:]
            return self.place_per_host_daemons(existing_slots, [], to_remove)

        # ask the scheduler to select additional slots
        to_add = others[:need]
        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        """Return the lowest IP the host has on the first subnet that yields
        any IPs, or None if the host has no IP on any of the subnets."""
        for subnet in subnets:
            ips: List[str] = []
            # BUGFIX: the loop target must not shadow the accumulator.  The
            # previous `for iface, ips in ...` rebound `ips` to the interface
            # list stored in self.networks and then extended that shared list
            # with itself, corrupting the cached network data.
            for _iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    def get_candidates(self) -> List[DaemonPlacement]:
        """Return every slot the placement spec allows, filtered by network
        reachability and filter_new_host, shuffled deterministically."""
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
                self.spec.placement.count is not None
                or self.spec.placement.count_per_host is not None
        ):
            # A bare count means any host is a candidate.
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
                else:
                    logger.info(
                        f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # shuffle for pseudo random selection
        # gen seed off of self.spec to make shuffling deterministic
        seed = hash(self.spec.service_name())
        random.Random(seed).shuffle(ls)

        return ls