]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/schedule.py
dbe723875049e74a848ecafcac37985a83c15229
[ceph.git] / ceph / src / pybind / mgr / cephadm / schedule.py
1 import logging
2 import random
3 from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict
4
5 import orchestrator
6 from ceph.deployment.service_spec import ServiceSpec
7 from orchestrator._interface import DaemonDescription
8 from orchestrator import OrchestratorValidationError
9
# Logger for this module, named after the module itself.
logger = logging.getLogger(name=__name__)
# Generic type variable.
T = TypeVar("T")
12
13
class DaemonPlacement(NamedTuple):
    """A single slot a daemon may occupy.

    A slot is identified by daemon type and host. It may additionally carry a
    network, a daemon name, an IP address and a list of ports.
    """

    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []

    def __str__(self) -> str:
        """Render as ``type:host`` plus an optional parenthesised detail list."""
        details: List[str] = []
        if self.network:
            details.append(f'network={self.network}')
        if self.name:
            details.append(f'name={self.name}')
        if self.ports:
            # a single port is shown bare, several are comma-joined
            if len(self.ports) == 1:
                port_text = str(self.ports[0])
            else:
                port_text = ','.join(str(port) for port in self.ports)
            details.append(f'{self.ip or "*"}:{port_text}')
        head = f'{self.daemon_type}:{self.hostname}'
        return f'{head}({" ".join(details)})' if details else head

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        """Return a copy of this slot with every port shifted up by ``n``."""
        return self._replace(ports=[port + n for port in self.ports])

    def matches_daemon(self, dd: 'DaemonDescription') -> bool:
        """Tell whether the existing daemon ``dd`` occupies this slot.

        Type and host must match. The name must match ``dd.daemon_id`` only
        when this slot has a name. Ports and IP must match only when this
        slot has ports.
        """
        same_identity = (self.daemon_type == dd.daemon_type
                         and self.hostname == dd.hostname)
        if not same_identity:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if not self.ports:
            return True
        return self.ports == dd.ports and self.ip == dd.ip
59
60
class HostAssignment(object):
    """Compute where the daemons of one service should run.

    Given a service spec, the known hosts and the existing daemons,
    :meth:`place` returns the full list of slots, the slots that still need a
    new daemon, and the existing daemons that should be removed.
    """

    def __init__(self,
                 spec: 'ServiceSpec',
                 hosts: 'List[orchestrator.HostSpec]',
                 daemons: 'List[orchestrator.DaemonDescription]',
                 networks: Optional[Dict[str, Dict[str, Dict[str, List[str]]]]] = None,
                 filter_new_host: Optional[Callable[[str], bool]] = None,
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 ):
        """
        :param spec: the service spec being placed.
        :param hosts: all known hosts.
        :param daemons: existing daemons. Only those whose ``daemon_type`` is
            the primary or per-host type are considered by :meth:`place`.
        :param networks: ``{hostname: {subnet: {iface: [ip, ...]}}}``, used
            to pick a daemon IP when ``spec.networks`` is set. ``None`` (the
            default) means "no network information" and yields a fresh,
            per-instance empty dict.
        :param filter_new_host: optional predicate on a hostname. Candidates
            for which it returns a falsy value are dropped.
        :param allow_colo: permit more than one primary daemon per host.
        :param primary_daemon_type: daemon type to place. Defaults to
            ``spec.service_type``.
        :param per_host_daemon_type: if set, one daemon of this type is
            placed on every host that receives a primary slot.
        """
        assert spec
        self.spec = spec  # type: ServiceSpec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: 'List[orchestrator.HostSpec]' = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        # Avoid a shared mutable default: every instance gets its own dict.
        self.networks = networks if networks is not None else {}
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()

    def hosts_by_label(self, label: str) -> 'List[orchestrator.HostSpec]':
        """Return the known hosts carrying ``label``, in inventory order."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return the hostnames of all known hosts, in inventory order."""
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        """Check that the placement spec can be satisfied by the known hosts.

        :raises OrchestratorValidationError: if any of these hold:

            * ``count`` is 0;
            * ``count_per_host`` > 1 without ``allow_colo``;
            * explicit hosts are unknown;
            * the host pattern matches nothing;
            * the label matches no host.

            ``spec.validate()`` is called first and may raise on its own.
        """
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
        self,
        slots: 'List[DaemonPlacement]',
        to_add: 'List[DaemonPlacement]',
        to_remove: 'List[orchestrator.DaemonDescription]',
    ) -> 'Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]':
        """Add one ``per_host_daemon_type`` slot to every host present in ``slots``.

        Existing per-host daemons that match one of these slots are kept.
        Unmatched slots are appended to ``to_add``. Per-host daemons that
        match no slot are appended to ``to_remove``.

        The input lists are not modified; new lists are returned.
        Without ``per_host_daemon_type`` the arguments are returned as-is.

        :returns: ``(slots, to_add, to_remove)``.
        """
        if not self.per_host_daemon_type:
            return slots, to_add, to_remove

        # One slot per distinct host. dict.fromkeys keeps first-seen order,
        # unlike a set, whose str ordering varies between processes.
        host_slots = [
            DaemonPlacement(daemon_type=self.per_host_daemon_type,
                            hostname=hostname)
            for hostname in dict.fromkeys(s.hostname for s in slots)
        ]
        unmatched = list(host_slots)
        stale: 'List[orchestrator.DaemonDescription]' = []
        for dd in self.daemons:
            if dd.daemon_type != self.per_host_daemon_type:
                continue
            match = next((p for p in unmatched if p.matches_daemon(dd)), None)
            if match is None:
                stale.append(dd)
            else:
                # each slot can be claimed by at most one existing daemon
                unmatched.remove(match)

        return slots + host_slots, to_add + unmatched, to_remove + stale

    def place(self) -> 'Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]':
        """
        Compute the placement for the service, taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        :returns: ``(slots, to_add, to_remove)``:

            * ``slots``: every slot the service should occupy;
            * ``to_add``: the subset of slots that still needs a new daemon;
            * ``to_remove``: existing daemons that should be removed.

        :raises OrchestratorValidationError: if the spec fails :meth:`validate`.
        """
        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates = self.get_candidates()  # type: List[DaemonPlacement]

        def expand_candidates(ls: 'List[DaemonPlacement]', num: int) -> 'List[DaemonPlacement]':
            # Repeat the candidate list `num` times. Copy number k has every
            # port shifted by k, so each copy gets distinct port numbers.
            return [dp.renumber_ports(offset) for offset in range(num) for dp in ls]

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            per_host = self.spec.placement.count_per_host or 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            # ceil(count / len(candidates)) copies per host
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider active (primary) daemons first
        primary = [d for d in self.daemons if d.daemon_type == self.primary_daemon_type]
        daemons = [d for d in primary if d.is_active] + [d for d in primary if not d.is_active]

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        others = candidates.copy()
        for dd in daemons:
            found = False
            for p in others:
                if p.matches_daemon(dd):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    existing_slots.append(p)
                    found = True
                    break
            if not found:
                to_remove.append(dd)

        # Same order as existing_slots, because `daemons` is active-first.
        existing = existing_active + existing_standby

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s', candidates)
            return self.place_per_host_daemons(candidates, others, to_remove)

        # The number of new slots that need to be selected in order to fulfill count
        need = count - len(existing)

        # we don't need any additional placements
        if need <= 0:
            to_remove.extend(existing[count:])
            del existing_slots[count:]
            return self.place_per_host_daemons(existing_slots, [], to_remove)

        # select additional slots from the (shuffled) remaining candidates
        to_add = others[:need]
        logger.debug('Combine hosts with existing daemons %s + new hosts %s',
                     existing, to_add)
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        """Return an IP of ``hostname`` on the first subnet in ``subnets`` that has any.

        IPs from every interface on that subnet are pooled. The smallest one
        by plain string sort is returned. Returns ``None`` when no listed
        subnet has an IP on the host. ``self.networks`` is not modified.
        """
        for subnet in subnets:
            ips: List[str] = []
            # Use a distinct loop name so the accumulator is not shadowed.
            # The old code extended each interface's own list with itself.
            for iface_ips in self.networks.get(hostname, {}).get(subnet, {}).values():
                ips.extend(iface_ips)
            if ips:
                return sorted(ips)[0]
        return None

    @staticmethod
    def _shuffle_seed(service_name: str) -> int:
        """Return a 32-bit seed derived from ``service_name``.

        The seed is the same in every interpreter process. ``hash()`` of a
        str is salted per process (PYTHONHASHSEED), so it is not.
        """
        import hashlib  # local import: only needed for seeding
        digest = hashlib.sha1(service_name.encode('utf-8')).hexdigest()
        return int(digest, 16) % (2 ** 32)

    def get_candidates(self) -> 'List[DaemonPlacement]':
        """Build the candidate slot list for the primary daemon type.

        Candidates come from the first populated source among:

        * explicit hosts;
        * label;
        * host pattern;
        * all known hosts, when only count / count_per_host is set.

        When ``spec.networks`` is set, hosts without an IP on those networks
        are dropped. Hosts rejected by ``filter_new_host`` are dropped too.
        The result is shuffled with a seed derived from the service name.

        :raises OrchestratorValidationError: if the placement spec is empty.
        """
        placement = self.spec.placement
        if placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in placement.hosts
            ]
        elif placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(placement.label)
            ]
        elif placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            placement.count is not None
            or placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            with_ip = []
            for p in ls:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    with_ip.append(p._replace(ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )
            ls = with_ip

        if self.filter_new_host:
            old = ls
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
                else:
                    logger.info(
                        f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s', old, ls)

        # Shuffle for pseudo-random selection. The seed depends only on the
        # service name, so the order is the same across mgr processes.
        random.Random(self._shuffle_seed(self.spec.service_name())).shuffle(ls)

        return ls
314 random.Random(seed).shuffle(ls)
315
316 return ls