3 from typing
import List
, Optional
, Callable
, Iterable
, TypeVar
, Set
6 from ceph
.deployment
.service_spec
import PlacementSpec
, HostPlacementSpec
, ServiceSpec
7 from orchestrator
._interface
import DaemonDescription
8 from orchestrator
import OrchestratorValidationError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a ServiceSpec

    `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
    """

    def __init__(self, spec):
        # type: (ServiceSpec) -> None
        # Subclasses read the spec back through ``self.spec`` (see
        # SimpleScheduler.place), so it has to be stored here.
        self.spec = spec

    def place(self, host_pool, count=None):
        # type: (List[T], Optional[int]) -> List[T]
        """
        Select entries from ``host_pool``, optionally limited by ``count``.

        The base class only defines the interface: calling it always raises
        NotImplementedError, subclasses must override it.
        """
        raise NotImplementedError
class SimpleScheduler(BaseScheduler):
    """
    The most simple way to pick/schedule a set of hosts.
    1) Shuffle the provided host_pool
    2) Select from list up to :count
    """

    def __init__(self, spec):
        super().__init__(spec)

    def place(self, host_pool, count=None):
        # type: (List[T], Optional[int]) -> List[T]
        """
        Return up to ``count`` entries of ``host_pool`` in shuffled order.

        ``host_pool`` itself is left untouched; an empty (or ``None``) pool
        yields an empty list. With ``count=None`` the whole shuffled pool is
        returned.
        """
        if not host_pool:
            return []
        # shuffle a copy so the caller's list keeps its order
        shuffled = list(host_pool)
        # Seed off of the service name to make shuffling deterministic.
        # The name is handed to Random() as a string instead of going through
        # hash(): str hashes are salted per interpreter process, which would
        # give a different order after every restart.
        seed = str(self.spec.service_name())
        # shuffle for pseudo random selection
        random.Random(seed).shuffle(shuffled)
        return shuffled[:count]
class HostAssignment(object):
    """
    Decide on which hosts the daemons of one service should be placed.

    The decision is based on the placement of the given ServiceSpec, the list
    of known hosts and the daemons of the service that already exist.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 hosts: List['orchestrator.HostSpec'],
                 get_daemons_func,  # type: Callable[[str],List[orchestrator.DaemonDescription]]
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 scheduler=None,  # type: Optional[BaseScheduler]
                 ):
        """
        :param spec: spec of the service that is going to be placed
        :param hosts: all hosts that are known
        :param get_daemons_func: called once with the service name, the
            result is kept as ``self.daemons``
        :param filter_new_host: optional predicate on a hostname; hosts that
            do not run a daemon yet are only considered if it returns true
        :param scheduler: picks the additional hosts; defaults to a
            SimpleScheduler built from ``spec``
        """
        assert spec and get_daemons_func
        self.spec = spec  # type: ServiceSpec
        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec)
        self.hosts: List['orchestrator.HostSpec'] = hosts
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()
        self.daemons = get_daemons_func(self.service_name)

    def hosts_by_label(self, label: str) -> List['orchestrator.HostSpec']:
        """Return the known hosts that carry ``label``."""
        return [h for h in self.hosts if label in h.labels]

    def get_hostnames(self) -> List[str]:
        """Return the hostnames of all known hosts."""
        return [h.hostname for h in self.hosts]

    def validate(self):
        """
        Check that the placement of the spec can be satisfied.

        :raises OrchestratorValidationError: if count is 0, if explicitly
            listed hosts are unknown, or if the host pattern / the label
            does not match any known host.
        """
        # NOTE(review): assumes the spec object provides validate() -- confirm
        self.spec.validate()

        placement = self.spec.placement

        if placement.count == 0:
            raise OrchestratorValidationError(
                f'<count> can not be 0 for {self.spec.one_line_str()}')

        if placement.hosts:
            explicit_hostnames = {h.hostname for h in placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hostnames()))
            # only complain if at least one of the requested hosts is unknown
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')

        if placement.host_pattern:
            pattern_hostnames = placement.filter_matching_hostspecs(self.hosts)
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if placement.label:
            label_hosts = self.hosts_by_label(placement.label)
            if not label_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def _mon_quorum_size(self, wanted):
        # type: (int) -> int
        """
        Return ``wanted`` reduced by one if it is a positive even number.

        The reduction is logged. Any other value is returned unchanged, so an
        empty candidate list never turns into a count of -1.
        """
        if wanted > 0 and wanted % 2 == 0:
            logger.info(
                "deploying %s monitor(s) instead of %s so monitors may achieve consensus",
                wanted - 1, wanted)
            return wanted - 1
        return wanted

    def place(self):
        # type: () -> List[HostPlacementSpec]
        """
        Generate a list of HostPlacementSpec taking into account:

        * the candidates derived from the placement (hosts, label, pattern)
        * hosts with existing daemons
        * self.filter_new_host

        :raises OrchestratorValidationError: if the placement is not valid
        """
        self.validate()

        count = self.spec.placement.count

        # get candidates based on [hosts, label, host_pattern]
        candidates = self.get_candidates()
        is_mon = self.spec.service_type == 'mon'

        # If we don't have <count> the list of candidates is definitive.
        if count is None:
            logger.debug('Provided hosts: %s', candidates)
            # if asked to place even number of mons, deploy 1 less
            if is_mon:
                keep = self._mon_quorum_size(len(candidates))
                if keep != len(candidates):
                    return candidates[0:keep]
            return candidates

        # if asked to place even number of mons, deploy 1 less
        if is_mon:
            available = len(candidates)
            if count >= available:
                # if count >= number of candidates then number of candidates
                # is determining factor in how many mons will be placed
                reduced = self._mon_quorum_size(available)
                if reduced != available:
                    count = reduced
            else:
                # if count < number of candidates then count is determining
                # factor in how many mons will get placed
                count = self._mon_quorum_size(count)

        # prefer hosts that already have services.
        # this avoids re-assigning to _new_ hosts
        # and constant re-distribution of hosts when new nodes are
        # added to the cluster
        hosts_with_daemons = self.hosts_with_daemons(candidates)

        # The amount of hosts that need to be selected in order to fulfill count.
        need = count - len(hosts_with_daemons)

        # hostspecs that do not have daemons on them but are still candidates.
        others = difference_hostspecs(candidates, hosts_with_daemons)

        # we don't need any additional hosts: there are more hosts with
        # daemons than requested, so only pick among those
        if need < 0:
            return self.prefer_hosts_with_active_daemons(hosts_with_daemons, count)

        # exclusive to 'mon' daemons. Filter out hosts that don't have a public network assigned
        if self.filter_new_host:
            old = others
            others = [h for h in others if self.filter_new_host(h.hostname)]
            logger.debug('filtered %s down to %s', old, others)

        # ask the scheduler to return a set of hosts with a up to the value of <count>
        others = self.scheduler.place(others, need)
        logger.debug('Combine hosts with existing daemons %s + new hosts %s',
                     hosts_with_daemons, others)
        # if a host already has the anticipated daemon, merge it with the candidates
        # to get a list of HostPlacementSpec that can be deployed on.
        return list(merge_hostspecs(hosts_with_daemons, others))

    def get_hosts_with_active_daemon(self, hosts: List['HostPlacementSpec']) -> List['HostPlacementSpec']:
        """
        Return the entries of ``hosts`` whose host runs an active daemon.

        The result follows the order of ``self.daemons`` and contains every
        entry only once.
        """
        # index the hosts once instead of scanning them for every daemon
        by_hostname = {}  # type: dict
        for h in hosts:
            by_hostname.setdefault(h.hostname, []).append(h)

        active_hosts: List['HostPlacementSpec'] = []
        for daemon in self.daemons:
            # NOTE(review): presumably daemons expose ``is_active`` -- confirm
            if daemon.is_active:
                active_hosts.extend(by_hostname.get(daemon.hostname, ()))
        # remove duplicates before returning
        return list(dict.fromkeys(active_hosts))

    def prefer_hosts_with_active_daemons(self, hosts: List['HostPlacementSpec'], count) -> List['HostPlacementSpec']:
        """
        Pick up to ``count`` of ``hosts``, hosts with an active daemon first.

        ``hosts`` is not modified.
        """
        # try to prefer host with active daemon if possible
        active_hosts = self.get_hosts_with_active_daemon(hosts)
        if not active_hosts or count <= 0:
            # ask the scheduler to return a set of hosts with a up to the value of <count>
            return self.scheduler.place(hosts, count)

        if len(active_hosts) >= count:
            return self.scheduler.place(active_hosts, count)

        # Not enough active hosts: take all of them and fill up from the
        # remaining ones. The remainder is a new list so that the caller's
        # list stays as it was.
        active = set(active_hosts)
        remaining = [h for h in hosts if h not in active]
        return list(merge_hostspecs(self.scheduler.place(active_hosts, count),
                                    self.scheduler.place(remaining, count - len(active_hosts))))

    def add_daemon_hosts(self, host_pool: List['HostPlacementSpec']) -> Set['HostPlacementSpec']:
        """Return the entries of ``host_pool`` whose host has no daemon yet."""
        hosts_with_daemons = {d.hostname for d in self.daemons}
        return {host for host in host_pool if host.hostname not in hosts_with_daemons}

    def remove_daemon_hosts(self, host_pool: List['HostPlacementSpec']) -> Set['DaemonDescription']:
        """Return the daemons running on a host that is not in ``host_pool``."""
        # a set keeps the membership test below cheap
        target_hosts = {h.hostname for h in host_pool}
        return {d for d in self.daemons if d.hostname not in target_hosts}

    def get_candidates(self) -> List['HostPlacementSpec']:
        """
        Return the hosts the placement allows.

        Explicit hosts win over a label, a label wins over a host pattern.
        If none of them is set, all known hosts are candidates, which
        requires a count.

        :raises OrchestratorValidationError: if the placement is empty
        """
        placement = self.spec.placement
        if placement.hosts:
            return placement.hosts
        elif placement.label:
            return [
                HostPlacementSpec(x.hostname, '', '')
                for x in self.hosts_by_label(placement.label)
            ]
        elif placement.host_pattern:
            return [
                HostPlacementSpec(x, '', '')
                for x in placement.filter_matching_hostspecs(self.hosts)
            ]
        # If none of the above and also no <count>
        if placement.count is None:
            raise OrchestratorValidationError(
                "placement spec is empty: no hosts, no label, no pattern, no count")
        # backward compatibility: consider an empty placements to be the same pattern = *
        return [
            HostPlacementSpec(x.hostname, '', '')
            for x in self.hosts
        ]

    def hosts_with_daemons(self, candidates: List['HostPlacementSpec']) -> List['HostPlacementSpec']:
        """
        Prefer hosts with daemons. Otherwise we'll constantly schedule daemons
        on different hosts all the time. This is about keeping daemons where
        they are. This isn't about co-locating.
        """
        hosts_with_daemons = {d.hostname for d in self.daemons}

        # calc existing daemons (that aren't already in chosen)
        existing = [hs for hs in candidates if hs.hostname in hosts_with_daemons]

        logger.debug('Hosts with existing daemons: %s', existing)
        return existing
def merge_hostspecs(l: List['HostPlacementSpec'], r: List['HostPlacementSpec']) -> Iterable['HostPlacementSpec']:
    """
    Merge two lists of HostPlacementSpec by hostname. always returns `l` first.

    Entries of `r` are only added if their hostname does not occur in `l`.

    >>> list(merge_hostspecs([HostPlacementSpec(hostname='h', name='x', network='')],
    ...                      [HostPlacementSpec(hostname='h', name='y', network='')]))
    [HostPlacementSpec(hostname='h', network='', name='x')]

    """
    l_names = {h.hostname for h in l}
    # everything from `l` comes first and unchanged
    yield from l
    yield from (h for h in r if h.hostname not in l_names)
def difference_hostspecs(l: List['HostPlacementSpec'], r: List['HostPlacementSpec']) -> List['HostPlacementSpec']:
    """
    returns l "minus" r by hostname.

    The order of `l` is kept.

    >>> list(difference_hostspecs([HostPlacementSpec(hostname='h1', name='x', network=''),
    ...                           HostPlacementSpec(hostname='h2', name='y', network='')],
    ...                           [HostPlacementSpec(hostname='h2', name='', network='')]))
    [HostPlacementSpec(hostname='h1', network='', name='x')]

    """
    r_names = {h.hostname for h in r}
    return [h for h in l if h.hostname not in r_names]