# ceph/src/pybind/mgr/cephadm/schedule.py (ceph pacific 16.2.5)
import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict

import orchestrator
from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError

logger = logging.getLogger(__name__)
T = TypeVar('T')


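# A DaemonPlacement is one planned slot for a daemon: which daemon type goes
# on which host, plus optional network, name, IP, ports, and rank details.
# As a NamedTuple it is immutable; the assign_*/renumber_* helpers below
# return modified copies rather than mutating in place.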
class DaemonPlacement(NamedTuple):
    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
        res = self.daemon_type + ':' + self.hostname
        other = []
        if self.rank is not None:
            other.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            other.append(f'network={self.network}')
        if self.name:
            other.append(f'name={self.name}')
        if self.ports:
            other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        if other:
            res += '(' + ' '.join(other) + ')'
        return res

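    # Illustrative (not from the source): a ranked slot with an IP and port
    # renders as, e.g., 'nfs:host1(rank=0.1 10.1.2.3:2049)'.
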
    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            [p + n for p in self.ports],
            self.rank,
            self.rank_generation,
        )

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

    def assign_name(self, name: str) -> 'DaemonPlacement':
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            name,
            self.ip,
            self.ports,
            self.rank,
            self.rank_generation,
        )

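    # rank_map maps rank -> rank_generation -> daemon_id (None until a daemon
    # has actually been created for that generation).  A new generation for an
    # existing rank is max(known generations) + 1; a brand-new rank starts at 0.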
    def assign_rank_generation(
            self,
            rank: int,
            rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return DaemonPlacement(
            self.daemon_type,
            self.hostname,
            self.network,
            self.name,
            self.ip,
            self.ports,
            rank,
            gen,
        )

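    # The matches_* predicates below decide whether an existing daemon may
    # keep occupying a candidate slot; place() schedules removal of daemons
    # that match no slot.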
    def matches_daemon(self, dd: DaemonDescription) -> bool:
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports:
                return False
            if self.ip != dd.ip:
                return False
        return True

    def matches_rank_map(
            self,
            dd: DaemonDescription,
            rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
            ranks: List[int]
    ) -> bool:
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map:
            return False
        if dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id


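# HostAssignment computes the scheduling decision for a single service spec:
# the full set of daemon slots that should exist, the subset that still needs
# to be created (to_add), and the running daemons that no longer match any
# slot (to_remove).  Apart from recording newly assigned rank generations in
# rank_map, it only computes; applying the result is the caller's job.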
class HostAssignment(object):

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Dict[str, Dict[str, Dict[str, List[str]]]] = {},
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        self.networks = networks
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

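    # For services that pair the primary daemon with exactly one auxiliary
    # daemon per host (per_host_daemon_type; in cephadm this covers e.g. the
    # keepalived side of an ingress service): give every host that received a
    # primary slot one auxiliary slot, and retire auxiliary daemons on hosts
    # that no longer have one.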
    def place_per_host_daemons(
            self,
            slots: List[DaemonPlacement],
            to_add: List[DaemonPlacement],
            to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        if self.per_host_daemon_type:
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        return slots, to_add, to_remove

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of DaemonPlacement slots, taking into account:

        * all known hosts
        * hosts with existing daemons
        * the placement spec
        * self.filter_new_host
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

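        # (worked example: count=5 over 2 candidate hosts gives
        # per_host = 1 + (4 // 2) = 3, i.e. 6 slots with ports renumbered per
        # extra copy; the surplus slot is trimmed by the <count> handling below)
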
        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,               # active before standby
                d.rank is not None,            # non-ranked before ranked (False sorts first)
                d.rank,                        # low ranks first
                0 - (d.rank_generation or 0),  # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        ranks: List[int] = list(range(len(candidates)))
        others: List[DaemonPlacement] = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = others
        else:
            # the number of new slots that need to be selected in order to fulfill count
            need = count - len(existing)

            # we don't need any additional placements
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            to_add = others[:need]

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        # If we don't have <count>, the list of candidates is definitive.
        if count is None:
            final = existing_slots + to_add
            logger.debug('Provided hosts: %s' % final)
            return self.place_per_host_daemons(final, to_add, to_remove)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

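    # self.networks maps host -> subnet -> interface -> list of IPs (per the
    # annotation in __init__); the helper below returns the lowest IP found
    # in the first subnet that has any, or None.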
    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        for subnet in subnets:
            ips: List[str] = []
            # note: the loop variable must not shadow the accumulator, or we
            # would extend the list with itself instead of collecting all IPs
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    def get_candidates(self) -> List[DaemonPlacement]:
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            self.spec.placement.count is not None
            or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # shuffle for pseudo random selection
        # gen seed off of self.spec to make shuffling deterministic
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        # return the shuffled list; returning the unshuffled `ls` here would
        # make the deterministic shuffle above dead code
        return final
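

# A minimal usage sketch (hypothetical inputs; in cephadm proper the hosts
# and daemons come from the mgr's inventory and the spec store):
#
#     spec = ServiceSpec.from_json({
#         'service_type': 'crash',
#         'placement': {'count': 2},
#     })
#     slots, to_add, to_remove = HostAssignment(
#         spec=spec,
#         hosts=inventory_hosts,      # List[orchestrator.HostSpec]
#         daemons=existing_daemons,   # List[orchestrator.DaemonDescription]
#     ).place()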