# ceph/src/pybind/mgr/cephadm/schedule.py (ceph pacific 16.2.5)
# Source: git.proxmox.com Git - ceph.git
1 import hashlib
2 import logging
3 import random
4 from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict
5
6 import orchestrator
7 from ceph.deployment.service_spec import ServiceSpec
8 from orchestrator._interface import DaemonDescription
9 from orchestrator import OrchestratorValidationError
10
11 logger = logging.getLogger(__name__)
12 T = TypeVar('T')
13
14
class DaemonPlacement(NamedTuple):
    """A planned placement of one daemon: which daemon type goes on which
    host, plus optional network, name, IP, ports, and rank information.

    Instances are immutable (NamedTuple); the renumber_*/assign_* helpers
    return modified copies via ``_replace`` rather than mutating in place.
    """
    daemon_type: str
    hostname: str
    network: str = ''   # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []
    rank: Optional[int] = None
    rank_generation: Optional[int] = None

    def __str__(self) -> str:
        res = self.daemon_type + ':' + self.hostname
        other = []
        if self.rank is not None:
            other.append(f'rank={self.rank}.{self.rank_generation}')
        if self.network:
            other.append(f'network={self.network}')
        if self.name:
            other.append(f'name={self.name}')
        if self.ports:
            other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
        if other:
            res += '(' + ' '.join(other) + ')'
        return res

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        """Return a copy with every port shifted up by ``n``."""
        # _replace avoids the fragile positional field-by-field rebuild
        return self._replace(ports=[p + n for p in self.ports])

    def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
        """Return a copy carrying the given rank and rank_generation."""
        return self._replace(rank=rank, rank_generation=gen)

    def assign_name(self, name: str) -> 'DaemonPlacement':
        """Return a copy with an explicit daemon name."""
        return self._replace(name=name)

    def assign_rank_generation(
            self,
            rank: int,
            rank_map: Dict[int, Dict[int, Optional[str]]]
    ) -> 'DaemonPlacement':
        """Return a copy with ``rank`` and the next unused generation for
        that rank.

        ``rank_map`` is updated in place: the new generation is recorded
        with a value of None (no daemon id assigned yet).
        """
        if rank not in rank_map:
            rank_map[rank] = {}
            gen = 0
        else:
            gen = max(rank_map[rank].keys()) + 1
        rank_map[rank][gen] = None
        return self._replace(rank=rank, rank_generation=gen)

    def matches_daemon(self, dd: 'DaemonDescription') -> bool:
        """Does an existing daemon satisfy this placement slot?"""
        if self.daemon_type != dd.daemon_type:
            return False
        if self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports:
            if self.ports != dd.ports:
                return False
            if self.ip != dd.ip:
                return False
        return True

    def matches_rank_map(
            self,
            dd: 'DaemonDescription',
            rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
            ranks: List[int]
    ) -> bool:
        """Is the daemon's (rank, rank_generation) current and acceptable?

        A daemon matches only if it holds the newest generation of a rank
        that is still available in ``ranks``, and the rank_map records this
        exact daemon id for that generation.
        """
        if rank_map is None:
            # daemon should have no rank
            return dd.rank is None

        if dd.rank is None:
            return False

        if dd.rank not in rank_map:
            return False
        if dd.rank not in ranks:
            return False

        # must be the highest/newest rank_generation
        if dd.rank_generation != max(rank_map[dd.rank].keys()):
            return False

        # must be *this* daemon
        return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id
137
138
class HostAssignment(object):
    """Compute daemon placement for one service.

    Starting from a ServiceSpec and the current cluster state (known
    hosts, existing daemons, per-host network info), ``place()`` returns
    the full set of placement slots plus what to add and what to remove.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: 'List[orchestrator.HostSpec]',
                 daemons: 'List[orchestrator.DaemonDescription]',
                 networks: 'Optional[Dict[str, Dict[str, Dict[str, List[str]]]]]' = None,
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
                 ):
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: 'List[orchestrator.HostSpec]' = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        # None sentinel instead of a mutable `{}` default argument, which
        # would be shared across all HostAssignment instances
        self.networks = networks if networks is not None else {}
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()
        self.rank_map = rank_map

    def hosts_by_label(self, label: str) -> 'List[orchestrator.HostSpec]':
        """Return the known hosts carrying the given label."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return all known hostnames."""
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        """Raise OrchestratorValidationError if the spec cannot be placed."""
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
                self.spec.placement.count_per_host is not None
                and self.spec.placement.count_per_host > 1
                and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
            self,
            slots: 'List[DaemonPlacement]',
            to_add: 'List[DaemonPlacement]',
            to_remove: 'List[orchestrator.DaemonDescription]',
    ) -> 'Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]':
        """Layer one daemon of ``self.per_host_daemon_type`` per used host
        on top of the primary placement results.

        Existing per-host daemons on still-used hosts are kept; extras on
        unused hosts are queued for removal.  No-op if no per-host daemon
        type is configured.
        """
        if self.per_host_daemon_type:
            # one slot per distinct host that received a primary daemon
            host_slots = [
                DaemonPlacement(daemon_type=self.per_host_daemon_type,
                                hostname=hostname)
                for hostname in set([s.hostname for s in slots])
            ]
            existing = [
                d for d in self.daemons if d.daemon_type == self.per_host_daemon_type
            ]
            slots += host_slots
            for dd in existing:
                found = False
                for p in host_slots:
                    if p.matches_daemon(dd):
                        # slot already satisfied by an existing daemon
                        host_slots.remove(p)
                        found = True
                        break
                if not found:
                    to_remove.append(dd)
            to_add += host_slots

        return slots, to_add, to_remove

    def place(self):
        # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
        """
        Generate a list of DaemonPlacement slots taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        Returns (all slots, slots to add, daemons to remove).
        """

        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]

        def expand_candidates(ls: 'List[DaemonPlacement]', num: int) -> 'List[DaemonPlacement]':
            # replicate each candidate `num` times, offsetting ports so
            # colocated daemons on the same host do not collide
            r = []
            for offset in range(num):
                r.extend([dp.renumber_ports(offset) for dp in ls])
            return r

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            if self.spec.placement.count_per_host:
                per_host = self.spec.placement.count_per_host
            else:
                per_host = 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider (preserve) existing daemons in a particular order...
        daemons = sorted(
            [
                d for d in self.daemons if d.daemon_type == self.primary_daemon_type
            ],
            key=lambda d: (
                not d.is_active,                # active before standby
                d.rank is not None,             # ranked first, then non-ranked
                d.rank,                         # low ranks
                0 - (d.rank_generation or 0),   # newer generations first
            )
        )

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: 'List[orchestrator.DaemonDescription]' = []
        existing_standby: 'List[orchestrator.DaemonDescription]' = []
        existing_slots: 'List[DaemonPlacement]' = []
        to_remove: 'List[orchestrator.DaemonDescription]' = []
        ranks: List[int] = list(range(len(candidates)))
        others: 'List[DaemonPlacement]' = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    if dd.rank is not None:
                        assert dd.rank_generation is not None
                        p = p.assign_rank(dd.rank, dd.rank_generation)
                        ranks.remove(dd.rank)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        existing = existing_active + existing_standby

        # build to_add
        if not count:
            to_add = others
        else:
            # The number of new slots that need to be selected in order to
            # fulfill count
            need = count - len(existing)

            # we don't need any additional placements; trim any surplus
            if need <= 0:
                to_remove.extend(existing[count:])
                del existing_slots[count:]
                return self.place_per_host_daemons(existing_slots, [], to_remove)

            # need > 0 here (the original guarded this with a redundant if)
            to_add = others[:need]

        if self.rank_map is not None:
            # assign unused ranks (and rank_generations) to to_add
            assert len(ranks) >= len(to_add)
            for i in range(len(to_add)):
                to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            final = existing_slots + to_add
            logger.debug('Provided hosts: %s' % final)
            return self.place_per_host_daemons(final, to_add, to_remove)

        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, to_add))
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        """Return the lowest IP the host has on any of the given subnets,
        or None if it has none.
        """
        for subnet in subnets:
            ips: List[str] = []
            # BUG FIX: the loop variable must not shadow the accumulator;
            # the original `for iface, ips in ...: ips.extend(ips)`
            # self-extended (and thereby corrupted) the interface IP lists
            # stored inside self.networks.
            for iface, iface_ips in self.networks.get(hostname, {}).get(subnet, {}).items():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    def get_candidates(self) -> 'List[DaemonPlacement]':
        """Return candidate DaemonPlacement slots for the primary daemon
        type, derived from the placement spec, optionally narrowed to
        hosts with an IP in spec.networks and by filter_new_host, then
        deterministically shuffled.
        """
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
                self.spec.placement.count is not None
                or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls.copy()
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
                                              hostname=p.hostname, network=p.network,
                                              name=p.name, ports=p.ports, ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls.copy()
            ls = [h for h in old if self.filter_new_host(h.hostname)]
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s' % (old, ls))

        # shuffle for pseudo random selection
        # gen seed off of self.spec to make shuffling deterministic
        seed = int(
            hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
            16
        ) % (2 ** 32)
        final = sorted(ls)
        random.Random(seed).shuffle(final)
        # BUG FIX: return the shuffled list; the original returned the
        # pre-shuffle `ls`, silently discarding `final` and making the
        # deterministic shuffle above dead code.
        return final