]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import logging |
2 | import random | |
f6b5b4d7 | 3 | from typing import List, Optional, Callable, Iterable, Tuple, TypeVar, Set |
e306af50 TL |
4 | |
5 | import orchestrator | |
6 | from ceph.deployment.service_spec import PlacementSpec, HostPlacementSpec, ServiceSpec | |
f6b5b4d7 | 7 | from orchestrator._interface import DaemonDescription |
e306af50 TL |
8 | from orchestrator import OrchestratorValidationError |
9 | ||
# Module-level logger for the scheduling code in this module.
logger = logging.getLogger(__name__)
# Generic element type used by the schedulers' type comments (List[T] -> List[T]).
T = TypeVar('T')
e306af50 TL |
12 | |
class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a ServiceSpec (stored as ``self.spec``)

    `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
    """

    def __init__(self, spec):
        # type: (ServiceSpec) -> None
        self.spec = spec

    def place(self, host_pool, count=None):
        # type: (List[T], Optional[int]) -> List[T]
        """
        Select up to *count* entries from *host_pool* (all of them when
        *count* is None).

        :raises NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError


class SimpleScheduler(BaseScheduler):
    """
    The most simple way to pick/schedule a set of hosts.
    1) Shuffle the provided host_pool (deterministically per service name)
    2) Select from list up to :count
    """

    def place(self, host_pool, count=None):
        # type: (List[T], Optional[int]) -> List[T]
        """
        Return up to *count* entries of *host_pool* in a pseudo-random order.

        The order depends only on ``self.spec.service_name()``, so the same
        service and pool always yield the same selection, even across
        process restarts. *host_pool* itself is never modified.

        :param host_pool: candidates to pick from.
        :param count: maximum number of entries to return; None returns all.
        :returns: a new list; empty when *host_pool* is empty.
        """
        if not host_pool:
            return []
        import hashlib  # local import: only needed to derive a stable seed

        # Copy so the caller's list is not reordered in place.
        shuffled = list(host_pool)
        # Seed from a digest of the service name rather than hash(): str
        # hashing is salted per interpreter process (PYTHONHASHSEED), which
        # would change the chosen hosts every time the process restarts.
        digest = hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest()
        seed = int(digest, 16) % (2 ** 32)
        # shuffle for pseudo random selection
        random.Random(seed).shuffle(shuffled)
        return shuffled[:count]
50 | ||
51 | ||
class HostAssignment(object):
    """
    Decide which hosts the daemons of one service should be placed on.

    Combines the service's placement spec (explicit hosts, label, host
    pattern, count), the hosts returned by ``get_hosts_func`` and the
    service's existing daemons returned by ``get_daemons_func``.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 get_hosts_func,  # type: Callable
                 get_daemons_func,  # type: Callable[[str],List[orchestrator.DaemonDescription]]
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 scheduler=None,  # type: Optional[BaseScheduler]
                 ):
        """
        :param spec: service spec whose ``placement`` is evaluated.
        :param get_hosts_func: returns known hostnames; it is called with no
            arguments, with a ``label=`` keyword, and is also handed to
            ``placement.filter_matching_hosts``.
        :param get_daemons_func: called once, here, with the service name;
            returns that service's existing daemons.
        :param filter_new_host: optional predicate on a hostname, applied only
            to candidate hosts that do not already run a daemon of the service.
        :param scheduler: picks hosts from a pool; defaults to SimpleScheduler.
        """
        assert spec and get_hosts_func and get_daemons_func
        self.spec = spec  # type: ServiceSpec
        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec)
        self.get_hosts_func = get_hosts_func
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        # Snapshot of the existing daemons, fetched once at construction time.
        self.daemons = get_daemons_func(self.service_name)

    def validate(self):
        """
        Check that the placement spec can be satisfied at all.

        :raises OrchestratorValidationError: when <count> is 0, when explicit
            hosts are unknown, or when a host pattern / label matches no host.
        """
        self.spec.validate()

        if self.spec.placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func()))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.filter_matching_hosts(self.get_hosts_func)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hostnames = self.get_hosts_func(label=self.spec.placement.label)
            if not label_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place(self):
        # type: () -> List[HostPlacementSpec]
        """
        Generate a list of HostPlacementSpec taking into account:

        * all known hosts
        * hosts with existing daemons
        * placement spec
        * self.filter_new_host

        :raises OrchestratorValidationError: if the placement spec is invalid.
        """
        self.validate()

        count = self.spec.placement.count

        # get candidates based on [hosts, label, host_pattern]
        candidates = self.get_candidates()

        # Without <count> the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s', candidates)
            return candidates

        # Prefer hosts that already have daemons of this service. This avoids
        # re-assigning to _new_ hosts and constant re-distribution of daemons
        # when new nodes are added to the cluster.
        hosts_with_daemons = self.hosts_with_daemons(candidates)

        # The number of additional hosts needed to fulfill <count>.
        need = count - len(hosts_with_daemons)

        # Candidates that do not have a daemon of this service on them yet.
        others = difference_hostspecs(candidates, hosts_with_daemons)

        if need < 0:
            # More hosts already carry daemons than <count> allows: pick a subset.
            return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)

        # The original note says this is exclusive to 'mon' daemons, to drop
        # hosts without a public network assigned.
        if self.filter_new_host:
            old = others
            others = [h for h in others if self.filter_new_host(h.hostname)]
            logger.debug('filtered %s down to %s', old, others)

        # ask the scheduler for up to <need> additional hosts
        others = self.scheduler.place(others, need)
        logger.debug('Combine hosts with existing daemons %s + new hosts %s',
                     hosts_with_daemons, others)
        # Hosts that already have a daemon come first; merging by hostname
        # yields the final list of HostPlacementSpec to deploy on.
        return list(merge_hostspecs(hosts_with_daemons, others))

    def get_hosts_with_active_daemon(self, hosts: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
        """
        Return the entries of *hosts* whose hostname runs an active daemon
        (``daemon.is_active``) of this service.

        Order follows ``self.daemons``; duplicates are removed, keeping the
        first occurrence.
        """
        active_hosts: List['HostPlacementSpec'] = []
        for daemon in self.daemons:
            if daemon.is_active:
                active_hosts.extend(h for h in hosts if h.hostname == daemon.hostname)
        # remove duplicates (first-seen order preserved) before returning
        return list(dict.fromkeys(active_hosts))

    def prefer_hosts_with_active_daemons(self, hosts: List[HostPlacementSpec], count) -> List[HostPlacementSpec]:
        """
        Pick up to *count* entries from *hosts*, taking hosts that run an
        active daemon first and filling up with the remaining ones.

        *hosts* is not modified.
        """
        active_hosts = self.get_hosts_with_active_daemon(hosts)
        if active_hosts and count > 0:
            if len(active_hosts) >= count:
                return self.scheduler.place(active_hosts, count)
            # Work on a copy so the caller's list is left untouched. Every
            # entry of active_hosts came from hosts, so remove() cannot fail.
            remaining = list(hosts)
            for host in active_hosts:
                remaining.remove(host)
            return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
                                        self.scheduler.place(remaining, count - len(active_hosts))))
        # ask the scheduler to return a set of hosts with a up to the value of <count>
        return self.scheduler.place(hosts, count)

    def add_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[HostPlacementSpec]:
        """Return the entries of *host_pool* that do not yet run a daemon of this service."""
        hosts_with_daemons = {d.hostname for d in self.daemons}
        return {host for host in host_pool if host.hostname not in hosts_with_daemons}

    def remove_daemon_hosts(self, host_pool: List[HostPlacementSpec]) -> Set[DaemonDescription]:
        """Return the existing daemons whose host is not part of *host_pool*."""
        # set membership instead of scanning a list once per daemon
        target_hosts = {h.hostname for h in host_pool}
        return {d for d in self.daemons if d.hostname not in target_hosts}

    def get_candidates(self) -> List[HostPlacementSpec]:
        """
        Return the candidate hosts selected by the placement spec, checking
        explicit hosts, then label, then host pattern.

        With none of those, all known hosts are candidates (legacy behavior
        equivalent to pattern ``*``), provided a <count> is set.

        :raises OrchestratorValidationError: if the placement has no hosts,
            label, pattern or count.
        """
        if self.spec.placement.hosts:
            # copy so callers cannot mutate the spec through the result
            return list(self.spec.placement.hosts)
        elif self.spec.placement.label:
            return [
                HostPlacementSpec(x, '', '')
                for x in self.get_hosts_func(label=self.spec.placement.label)
            ]
        elif self.spec.placement.host_pattern:
            return [
                HostPlacementSpec(x, '', '')
                for x in self.spec.placement.filter_matching_hosts(self.get_hosts_func)
            ]
        # If none of the above and also no <count>
        if self.spec.placement.count is None:
            raise OrchestratorValidationError("placement spec is empty: no hosts, no label, no pattern, no count")
        # backward compatibility: consider an empty placements to be the same pattern = *
        return [
            HostPlacementSpec(x, '', '')
            for x in self.get_hosts_func()
        ]

    def hosts_with_daemons(self, candidates: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
        """
        Prefer hosts with daemons. Otherwise we'll constantly schedule daemons
        on different hosts all the time. This is about keeping daemons where
        they are. This isn't about co-locating.

        Returns the entries of *candidates* (in their order) whose hostname
        already runs a daemon of this service.
        """
        hosts_with_daemons = {d.hostname for d in self.daemons}

        existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]

        logger.debug('Hosts with existing daemons: %s', existing)
        return existing
224 | ||
225 | ||
def merge_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> Iterable[HostPlacementSpec]:
    """
    Merge two lists of HostPlacementSpec by hostname. always returns `l` first.

    Every entry of ``l`` is yielded; an entry of ``r`` is yielded only when
    its hostname does not already occur in ``l``.

    >>> list(merge_hostspecs([HostPlacementSpec(hostname='h', name='x', network='')],
    ...                      [HostPlacementSpec(hostname='h', name='y', network='')]))
    [HostPlacementSpec(hostname='h', network='', name='x')]

    """
    taken = set()
    for spec in l:
        taken.add(spec.hostname)
    yield from l
    for spec in r:
        if spec.hostname not in taken:
            yield spec
238 | ||
239 | ||
def difference_hostspecs(l: List[HostPlacementSpec], r: List[HostPlacementSpec]) -> List[HostPlacementSpec]:
    """
    returns l "minus" r by hostname.

    Entries of ``l`` keep their order; an entry is dropped when any entry of
    ``r`` has the same hostname.

    >>> list(difference_hostspecs([HostPlacementSpec(hostname='h1', name='x', network=''),
    ...                            HostPlacementSpec(hostname='h2', name='y', network='')],
    ...                           [HostPlacementSpec(hostname='h2', name='', network='')]))
    [HostPlacementSpec(hostname='h1', network='', name='x')]

    """
    excluded = set()
    for spec in r:
        excluded.add(spec.hostname)
    return list(filter(lambda spec: spec.hostname not in excluded, l))
252 |