import ipaddress
import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict

import orchestrator
from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
from .utils import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES

logger = logging.getLogger(__name__)
T = TypeVar('T')


class DaemonPlacement(NamedTuple):
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
        res = self.daemon_type + ':' + self.hostname
        other = []
        if self.rank is not None:
            other.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            other.append(f'network={self.network}')
        if self.name:
            other.append(f'name={self.name}')
        if self.ports:
            other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        if other:
            res += '(' + ' '.join(other) + ')'
        return res

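    # Illustrative only (values assumed, not taken from a real cluster): the
    # __str__ form above renders a placement such as
    #   DaemonPlacement('mgr', 'host1', ip='10.1.2.3', ports=[8443],
    #                   rank=0, rank_generation=2)
    # as 'mgr:host1(rank=0.2 10.1.2.3:8443)'.
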
    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            [p + n for p in self.ports],
            self.rank,
            self.rank_generation,
        )

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

    def assign_name(self, name: str) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            name,
            self.ip,
            self.ports,
            self.rank,
            self.rank_generation,
        )

    def assign_rank_generation(
            self,
            rank: int,
            rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

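    # A sketch of how rank_map is shaped (example values assumed, not from the
    # source): it is keyed by rank, then by rank_generation, and stores the
    # daemon_id once the daemon exists, e.g.
    #   {0: {0: 'nfs.foo.0.0.host1.abcdef'}, 1: {0: None}}
    # With that map, assign_rank_generation(1, rank_map) above would reserve
    # generation 1 for rank 1 (generation 0 already exists); a rank not yet in
    # the map starts at generation 0.
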
    def matches_daemon(self, dd: DaemonDescription) -> bool:
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports and dd.ports:
                return False
            if self.ip != dd.ip and dd.ip:
                return False
        return True

    def matches_rank_map(
            self,
            dd: DaemonDescription,
            rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
            ranks: List[int]
    ) -> bool:
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map:
            return False
        if dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id


class HostAssignment(object):

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 unreachable_hosts: List[orchestrator.HostSpec],
                 draining_hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.unreachable_hosts: List[orchestrator.HostSpec] = unreachable_hosts
        self.draining_hosts: List[orchestrator.HostSpec] = draining_hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        self.networks = networks
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

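    # Minimal usage sketch (hedged; variable names are assumed, and cephadm's
    # serve loop supplies the real arguments; PlacementSpec comes from
    # ceph.deployment.service_spec):
    #
    #   all_slots, to_add, to_remove = HostAssignment(
    #       spec=ServiceSpec('mgr', placement=PlacementSpec(count=3)),
    #       hosts=inventory_hosts,
    #       unreachable_hosts=[],
    #       draining_hosts=[],
    #       daemons=existing_mgr_daemons,
    #   ).place()
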
    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            known_hosts = self.get_hostnames() + [h.hostname for h in self.draining_hosts]
            unknown_hosts = explicit_hostnames.difference(set(known_hosts))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
            self,
            slots: List[DaemonPlacement],
            to_add: List[DaemonPlacement],
            to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        if self.per_host_daemon_type:
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        to_remove = [d for d in to_remove if d.hostname not in [
            h.hostname for h in self.unreachable_hosts]]

        return slots, to_add, to_remove

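    # Example of the per-host path above (hedged; ingress is the assumed user):
    # with per_host_daemon_type='keepalived', place_per_host_daemons() adds one
    # keepalived slot per distinct hostname that received a primary (e.g. haproxy)
    # slot, and schedules removal of per-host daemons on hosts that no longer
    # hold any primary slot.
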
    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of DaemonPlacement slots, taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host
        """
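        # Returns a (slots, to_add, to_remove) triple: every placement that
        # should exist, the subset that still needs to be created, and the
        # existing daemons that no longer match any slot.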

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]
        if self.primary_daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES:
            # remove unreachable hosts that are not in maintenance so daemons
            # on these hosts will be rescheduled
            candidates = self.remove_non_maintenance_unreachable_candidates(candidates)

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)
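        # Worked example (numbers assumed): with count=5, allow_colo=True and 2
        # candidate hosts, per_host = 1 + (5 - 1) // 2 = 3, so each host expands
        # into 3 slots (ports renumbered per slot); the loop below only adds as
        # many of these slots as are needed to reach count.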

        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,  # active before standby
                d.rank is not None,  # ranked first, then non-ranked
                d.rank,  # low ranks
                0 - (d.rank_generation or 0),  # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_add: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        ranks: List[int] = list(range(len(candidates)))
        others: List[DaemonPlacement] = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        # TODO: At some point we want to deploy daemons that are on offline hosts;
        # at what point we do this differs per daemon type. Stateless daemons we
        # could do quickly to improve availability. Stateful daemons we might want
        # to wait longer to see if the host comes back online.

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = [dd for dd in others if dd.hostname not in [
                h.hostname for h in self.unreachable_hosts]]
        else:
            # The number of new slots that need to be selected in order to fulfill count
            need = count - len(existing)

            # we don't need any additional placements
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            for dp in others:
                if need <= 0:
                    break
                if dp.hostname not in [h.hostname for h in self.unreachable_hosts]:
                    to_add.append(dp)
                    need -= 1  # this is the last use of need in this function, so it can work as a counter

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        for subnet in subnets:
            ips: List[str] = []
            # The following allows loopback interfaces for both IPv4 and IPv6. Since
            # we only have the subnet (and no IP), we assume the default loopback address.
            if ipaddress.ip_network(subnet).is_loopback:
                if ipaddress.ip_network(subnet).version == 4:
                    ips.append('127.0.0.1')
                else:
                    ips.append('::1')
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

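    # The networks structure consulted above is nested as
    # networks[hostname][subnet][interface] = [ip, ...]; for example (values
    # assumed): {'host1': {'10.0.0.0/24': {'eth0': ['10.0.0.5']}}} makes
    # find_ip_on_host('host1', ['10.0.0.0/24']) return '10.0.0.5'.
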
    def get_candidates(self) -> List[DaemonPlacement]:
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
                if h.hostname not in [dh.hostname for dh in self.draining_hosts]
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            self.spec.placement.count is not None
            or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # Now that we have the list of candidate hosts based on the configured
        # placement, shuffle the list for pseudo-random host selection. To do this,
        # we generate a seed from the service name and use it to shuffle the
        # candidates, which makes the shuffle deterministic for a given service name.
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)  # truncate result to 32 bits
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        return final

    def remove_non_maintenance_unreachable_candidates(self, candidates: List[DaemonPlacement]) -> List[DaemonPlacement]:
        in_maintenance: Dict[str, bool] = {}
        for h in self.hosts:
            if h.status.lower() == 'maintenance':
                in_maintenance[h.hostname] = True
                continue
            in_maintenance[h.hostname] = False
        unreachable_hosts = [h.hostname for h in self.unreachable_hosts]
        candidates = [
            c for c in candidates if c.hostname not in unreachable_hosts or in_maintenance[c.hostname]]
        return candidates