]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/schedule.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / cephadm / schedule.py
CommitLineData
e306af50
TL
import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict

import orchestrator
from ceph.deployment.service_spec import ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
9
10logger = logging.getLogger(__name__)
f6b5b4d7 11T = TypeVar('T')
e306af50 12
f91f0fd5 13
f67539c2
TL
class DaemonPlacement(NamedTuple):
    """One candidate slot for a daemon: its type and host, plus optional
    network, daemon name, bound IP and ports."""

    daemon_type: str
    hostname: str
    network: str = ''  # for mons only
    name: str = ''
    ip: Optional[str] = None
    ports: List[int] = []

    def __str__(self) -> str:
        """Render as ``type:host`` followed by any set attributes in parentheses."""
        details = []
        if self.network:
            details.append(f'network={self.network}')
        if self.name:
            details.append(f'name={self.name}')
        if self.ports:
            if len(self.ports) == 1:
                port_text = f'{self.ports[0]}'
            else:
                port_text = ",".join(map(str, self.ports))
            details.append(f'{self.ip or "*"}:{port_text}')
        text = self.daemon_type + ':' + self.hostname
        if details:
            text += '(' + ' '.join(details) + ')'
        return text

    def renumber_ports(self, n: int) -> 'DaemonPlacement':
        """Return a copy of this placement with every port shifted by ``n``."""
        return self._replace(ports=[port + n for port in self.ports])

    def matches_daemon(self, dd: DaemonDescription) -> bool:
        """Return True if the existing daemon ``dd`` occupies this slot.

        Type and host must always agree. The name is compared only when this
        slot has one. Ports and the bound IP are compared only when this slot
        has ports.
        """
        if self.daemon_type != dd.daemon_type or self.hostname != dd.hostname:
            return False
        # fixme: how to match against network?
        if self.name and self.name != dd.daemon_id:
            return False
        if self.ports and (self.ports != dd.ports or self.ip != dd.ip):
            return False
        return True
e306af50
TL
59
60
class HostAssignment(object):
    """Decide where a service's daemons should run.

    Combines the service's placement spec, the known hosts and the daemons
    that already exist. The result is the ``(slots, to_add, to_remove)``
    lists returned by ``place()``.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 hosts: List[orchestrator.HostSpec],
                 daemons: List[orchestrator.DaemonDescription],
                 networks: Optional[Dict[str, Dict[str, Dict[str, List[str]]]]] = None,
                 filter_new_host: Optional[Callable[[str], bool]] = None,
                 allow_colo: bool = False,
                 primary_daemon_type: Optional[str] = None,
                 per_host_daemon_type: Optional[str] = None,
                 ):
        """
        :param spec: service whose daemons are being placed.
        :param hosts: all known hosts.
        :param daemons: daemons that currently exist for the service.
        :param networks: nested mapping looked up as
            ``networks[hostname][subnet][iface] -> list of IPs``
            (see ``find_ip_on_host``). Defaults to an empty mapping.
        :param filter_new_host: optional predicate on a hostname. Candidates
            for which it returns False are dropped.
        :param allow_colo: allow more than one primary daemon per host.
        :param primary_daemon_type: daemon type to place. Defaults to
            ``spec.service_type``.
        :param per_host_daemon_type: optional companion daemon type to place
            once on every host that receives a primary slot.
        """
        assert spec
        self.spec: ServiceSpec = spec
        self.primary_daemon_type = primary_daemon_type or spec.service_type
        self.hosts: List[orchestrator.HostSpec] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = daemons
        # Use a fresh dict per instance instead of one shared mutable default.
        self.networks: Dict[str, Dict[str, Dict[str, List[str]]]] = (
            networks if networks is not None else {}
        )
        self.allow_colo = allow_colo
        self.per_host_daemon_type = per_host_daemon_type
        self.ports_start = spec.get_port_start()

    def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
        """Return the known hosts that carry ``label``."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return the hostnames of all known hosts."""
        return [h.hostname for h in self.hosts]

    def validate(self) -> None:
        """Validate the spec and check that its placement is satisfiable.

        Calls ``spec.validate()``, then raises OrchestratorValidationError
        if any of these hold:

        - count is 0;
        - count_per_host > 1 while colocation is not allowed;
        - explicit hosts are unknown;
        - the host pattern matches no host;
        - the label matches no host.
        """
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if (
            self.spec.placement.count_per_host is not None
            and self.spec.placement.count_per_host > 1
            and not self.allow_colo
        ):
            raise OrchestratorValidationError(
                f'Cannot place more than one {self.spec.service_type} per host'
            )

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hosts = self.hosts_by_label(self.spec.placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place_per_host_daemons(
        self,
        slots: List[DaemonPlacement],
        to_add: List[DaemonPlacement],
        to_remove: List[orchestrator.DaemonDescription],
    ) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        """Add one ``per_host_daemon_type`` slot on every host used by ``slots``.

        Existing daemons of that type that match one of the new slots are
        kept. The others are appended to ``to_remove``. Slots without a
        matching daemon are appended to ``to_add``. Without a
        ``per_host_daemon_type`` the inputs are returned unchanged.
        New lists are returned and the arguments are not modified.
        """
        if not self.per_host_daemon_type:
            return slots, to_add, to_remove

        # One slot per distinct host, in first-seen order. A plain set gave a
        # per-process-random order here.
        host_slots = [
            DaemonPlacement(daemon_type=self.per_host_daemon_type,
                            hostname=hostname)
            for hostname in dict.fromkeys(s.hostname for s in slots)
        ]
        # Every host slot is part of the final layout, matched or not.
        slots = slots + host_slots
        to_remove = list(to_remove)
        for dd in self.daemons:
            if dd.daemon_type != self.per_host_daemon_type:
                continue
            match = next((p for p in host_slots if p.matches_daemon(dd)), None)
            if match is None:
                to_remove.append(dd)
            else:
                # Already satisfied by an existing daemon, so nothing to add.
                host_slots.remove(match)
        to_add = to_add + host_slots

        return slots, to_add, to_remove

    def place(self) -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]:
        """
        Compute the daemon layout for the service, taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        Returns ``(slots, to_add, to_remove)``:

        - ``slots``: every slot the service should occupy;
        - ``to_add``: the slots that have no matching existing daemon;
        - ``to_remove``: existing daemons that should be removed.

        Raises OrchestratorValidationError via ``validate()``/``get_candidates()``.
        """
        self.validate()

        count = self.spec.placement.count

        # get candidate hosts based on [hosts, label, host_pattern]
        candidates: List[DaemonPlacement] = self.get_candidates()

        def expand_candidates(ls: List[DaemonPlacement], num: int) -> List[DaemonPlacement]:
            # Repeat the candidate list ``num`` times. Copy ``offset`` has its
            # ports shifted by ``offset``, so colocated daemons get distinct ports.
            return [dp.renumber_ports(offset) for offset in range(num) for dp in ls]

        # consider enough slots to fulfill target count-per-host or count
        if count is None:
            per_host = self.spec.placement.count_per_host or 1
            candidates = expand_candidates(candidates, per_host)
        elif self.allow_colo and candidates:
            # ceil(count / len(candidates)) copies of each candidate
            per_host = 1 + ((count - 1) // len(candidates))
            candidates = expand_candidates(candidates, per_host)

        # consider active (primary) daemons first
        primary = [d for d in self.daemons if d.daemon_type == self.primary_daemon_type]
        daemons = [d for d in primary if d.is_active] + [d for d in primary if not d.is_active]

        # sort candidates into existing/used slots that already have a
        # daemon, and others (the rest)
        existing_active: List[orchestrator.DaemonDescription] = []
        existing_standby: List[orchestrator.DaemonDescription] = []
        existing_slots: List[DaemonPlacement] = []
        to_remove: List[orchestrator.DaemonDescription] = []
        others = candidates.copy()
        for dd in daemons:
            for p in others:
                if p.matches_daemon(dd):
                    others.remove(p)
                    if dd.is_active:
                        existing_active.append(dd)
                    else:
                        existing_standby.append(dd)
                    existing_slots.append(p)
                    break
            else:
                # no free slot matches this daemon
                to_remove.append(dd)

        # ``daemons`` lists actives before standbys, so ``existing`` and
        # ``existing_slots`` are index-aligned.
        existing = existing_active + existing_standby

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s', candidates)
            return self.place_per_host_daemons(candidates, others, to_remove)

        # The number of new slots that need to be selected in order to fulfill count
        need = count - len(existing)

        # we don't need any additional placements
        if need <= 0:
            to_remove.extend(existing[count:])
            del existing_slots[count:]
            return self.place_per_host_daemons(existing_slots, [], to_remove)

        # take the first `need` free (already shuffled) candidates
        to_add = others[:need]
        logger.debug('Combine hosts with existing daemons %s + new hosts %s',
                     existing, to_add)
        return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)

    def find_ip_on_host(self, hostname: str, subnets: List[str]) -> Optional[str]:
        """Return an IP of ``hostname`` in the first of ``subnets`` where it has one.

        Collects the IPs of all interfaces of ``hostname`` in that subnet
        from ``self.networks``. Returns the smallest of them by string order,
        or None if no subnet yields any IP.
        """
        for subnet in subnets:
            ips: List[str] = []
            # Keep the accumulator distinct from the per-interface list.
            # Reusing the name made ``ips.extend(ips)`` double (and mutate)
            # the interface's list in self.networks and drop other interfaces.
            for iface_ips in self.networks.get(hostname, {}).get(subnet, {}).values():
                ips.extend(iface_ips)
            if ips:
                return min(ips)
        return None

    def get_candidates(self) -> List[DaemonPlacement]:
        """Return the candidate slots for the primary daemon type.

        Hosts come from, in order of precedence:

        - the explicit host list;
        - the label;
        - the host pattern;
        - all known hosts, when only count/count_per_host is given.

        Candidates are narrowed to hosts with an IP in ``spec.networks`` (if
        set) and by ``filter_new_host`` (if set). Finally they are shuffled
        in an order seeded from the service name.

        Raises OrchestratorValidationError if the placement spec is empty.
        """
        if self.spec.placement.hosts:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=h.hostname, network=h.network, name=h.name,
                                ports=self.ports_start)
                for h in self.spec.placement.hosts
            ]
        elif self.spec.placement.label:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts_by_label(self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x, ports=self.ports_start)
                for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
            ]
        elif (
            self.spec.placement.count is not None
            or self.spec.placement.count_per_host is not None
        ):
            ls = [
                DaemonPlacement(daemon_type=self.primary_daemon_type,
                                hostname=x.hostname, ports=self.ports_start)
                for x in self.hosts
            ]
        else:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")

        # allocate an IP?
        if self.spec.networks:
            orig = ls
            ls = []
            for p in orig:
                ip = self.find_ip_on_host(p.hostname, self.spec.networks)
                if ip:
                    ls.append(p._replace(ip=ip))
                else:
                    logger.debug(
                        f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
                    )

        if self.filter_new_host:
            old = ls
            ls = []
            for h in old:
                if self.filter_new_host(h.hostname):
                    ls.append(h)
                else:
                    logger.info(
                        f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
            if len(old) > len(ls):
                logger.debug('Filtered %s down to %s', old, ls)

        # Shuffle for pseudo-random selection, seeded from the service name
        # so the order is deterministic. The builtin hash() of a str is
        # salted per process (PYTHONHASHSEED) and would change after every
        # restart, so use a stable digest. Sort first so the result does not
        # depend on the order in which hosts were supplied.
        seed = int(hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(), 16)
        ls.sort(key=lambda dp: (dp.hostname, dp.network, dp.name, dp.ip or '', dp.ports))
        random.Random(seed).shuffle(ls)

        return ls
253 ls = [
254 DaemonPlacement(daemon_type=self.primary_daemon_type,
255 hostname=h.hostname, network=h.network, name=h.name,
256 ports=self.ports_start)
257 for h in self.spec.placement.hosts
258 ]
e306af50 259 elif self.spec.placement.label:
f67539c2
TL
260 ls = [
261 DaemonPlacement(daemon_type=self.primary_daemon_type,
262 hostname=x.hostname, ports=self.ports_start)
f91f0fd5 263 for x in self.hosts_by_label(self.spec.placement.label)
e306af50 264 ]
f6b5b4d7 265 elif self.spec.placement.host_pattern:
f67539c2
TL
266 ls = [
267 DaemonPlacement(daemon_type=self.primary_daemon_type,
268 hostname=x, ports=self.ports_start)
f91f0fd5 269 for x in self.spec.placement.filter_matching_hostspecs(self.hosts)
e306af50 270 ]
f67539c2
TL
271 elif (
272 self.spec.placement.count is not None
273 or self.spec.placement.count_per_host is not None
274 ):
275 ls = [
276 DaemonPlacement(daemon_type=self.primary_daemon_type,
277 hostname=x.hostname, ports=self.ports_start)
278 for x in self.hosts
279 ]
280 else:
f91f0fd5
TL
281 raise OrchestratorValidationError(
282 "placement spec is empty: no hosts, no label, no pattern, no count")
e306af50 283
f67539c2
TL
284 # allocate an IP?
285 if self.spec.networks:
286 orig = ls.copy()
287 ls = []
288 for p in orig:
289 ip = self.find_ip_on_host(p.hostname, self.spec.networks)
290 if ip:
291 ls.append(DaemonPlacement(daemon_type=self.primary_daemon_type,
292 hostname=p.hostname, network=p.network,
293 name=p.name, ports=p.ports, ip=ip))
294 else:
295 logger.debug(
296 f'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
297 )
298
299 if self.filter_new_host:
300 old = ls.copy()
301 ls = []
302 for h in old:
303 if self.filter_new_host(h.hostname):
304 ls.append(h)
305 else:
306 logger.info(
307 f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
308 if len(old) > len(ls):
309 logger.debug('Filtered %s down to %s' % (old, ls))
f6b5b4d7 310
f67539c2
TL
311 # shuffle for pseudo random selection
312 # gen seed off of self.spec to make shuffling deterministic
313 seed = hash(self.spec.service_name())
314 random.Random(seed).shuffle(ls)
f6b5b4d7 315
f67539c2 316 return ls