5 from typing
import List
, Optional
, Callable
, TypeVar
, Tuple
, NamedTuple
, Dict
8 from ceph
.deployment
.service_spec
import ServiceSpec
9 from orchestrator
._interface
import DaemonDescription
10 from orchestrator
import OrchestratorValidationError
11 from .utils
import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
13 logger
= logging
.getLogger(__name__
)
17 class DaemonPlacement(NamedTuple
):
20 network
: str = '' # for mons only
22 ip
: Optional
[str] = None
24 rank
: Optional
[int] = None
25 rank_generation
: Optional
[int] = None
27 def __str__(self
) -> str:
28 res
= self
.daemon_type
+ ':' + self
.hostname
30 if self
.rank
is not None:
31 other
.append(f
'rank={self.rank}.{self.rank_generation}')
33 other
.append(f
'network={self.network}')
35 other
.append(f
'name={self.name}')
37 other
.append(f
'{self.ip or "*"}:{",".join(map(str, self.ports))}')
39 res
+= '(' + ' '.join(other
) + ')'
42 def renumber_ports(self
, n
: int) -> 'DaemonPlacement':
43 return DaemonPlacement(
49 [p
+ n
for p
in self
.ports
],
54 def assign_rank(self
, rank
: int, gen
: int) -> 'DaemonPlacement':
55 return DaemonPlacement(
66 def assign_name(self
, name
: str) -> 'DaemonPlacement':
67 return DaemonPlacement(
78 def assign_rank_generation(
81 rank_map
: Dict
[int, Dict
[int, Optional
[str]]]
82 ) -> 'DaemonPlacement':
83 if rank
not in rank_map
:
87 gen
= max(rank_map
[rank
].keys()) + 1
88 rank_map
[rank
][gen
] = None
89 return DaemonPlacement(
100 def matches_daemon(self
, dd
: DaemonDescription
) -> bool:
101 if self
.daemon_type
!= dd
.daemon_type
:
103 if self
.hostname
!= dd
.hostname
:
105 # fixme: how to match against network?
106 if self
.name
and self
.name
!= dd
.daemon_id
:
109 if self
.ports
!= dd
.ports
and dd
.ports
:
111 if self
.ip
!= dd
.ip
and dd
.ip
:
115 def matches_rank_map(
117 dd
: DaemonDescription
,
118 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]],
122 # daemon should have no rank
123 return dd
.rank
is None
128 if dd
.rank
not in rank_map
:
130 if dd
.rank
not in ranks
:
133 # must be the highest/newest rank_generation
134 if dd
.rank_generation
!= max(rank_map
[dd
.rank
].keys()):
137 # must be *this* daemon
138 return rank_map
[dd
.rank
][dd
.rank_generation
] == dd
.daemon_id
141 class HostAssignment(object):
145 hosts
: List
[orchestrator
.HostSpec
],
146 unreachable_hosts
: List
[orchestrator
.HostSpec
],
147 draining_hosts
: List
[orchestrator
.HostSpec
],
148 daemons
: List
[orchestrator
.DaemonDescription
],
149 related_service_daemons
: Optional
[List
[DaemonDescription
]] = None,
150 networks
: Dict
[str, Dict
[str, Dict
[str, List
[str]]]] = {},
151 filter_new_host
: Optional
[Callable
[[str, ServiceSpec
], bool]] = None,
152 allow_colo
: bool = False,
153 primary_daemon_type
: Optional
[str] = None,
154 per_host_daemon_type
: Optional
[str] = None,
155 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]] = None,
158 self
.spec
= spec
# type: ServiceSpec
159 self
.primary_daemon_type
= primary_daemon_type
or spec
.service_type
160 self
.hosts
: List
[orchestrator
.HostSpec
] = hosts
161 self
.unreachable_hosts
: List
[orchestrator
.HostSpec
] = unreachable_hosts
162 self
.draining_hosts
: List
[orchestrator
.HostSpec
] = draining_hosts
163 self
.filter_new_host
= filter_new_host
164 self
.service_name
= spec
.service_name()
165 self
.daemons
= daemons
166 self
.related_service_daemons
= related_service_daemons
167 self
.networks
= networks
168 self
.allow_colo
= allow_colo
169 self
.per_host_daemon_type
= per_host_daemon_type
170 self
.ports_start
= spec
.get_port_start()
171 self
.rank_map
= rank_map
173 def hosts_by_label(self
, label
: str) -> List
[orchestrator
.HostSpec
]:
174 return [h
for h
in self
.hosts
if label
in h
.labels
]
176 def get_hostnames(self
) -> List
[str]:
177 return [h
.hostname
for h
in self
.hosts
]
179 def validate(self
) -> None:
182 if self
.spec
.placement
.count
== 0:
183 raise OrchestratorValidationError(
184 f
'<count> can not be 0 for {self.spec.one_line_str()}')
187 self
.spec
.placement
.count_per_host
is not None
188 and self
.spec
.placement
.count_per_host
> 1
189 and not self
.allow_colo
191 raise OrchestratorValidationError(
192 f
'Cannot place more than one {self.spec.service_type} per host'
195 if self
.spec
.placement
.hosts
:
196 explicit_hostnames
= {h
.hostname
for h
in self
.spec
.placement
.hosts
}
197 known_hosts
= self
.get_hostnames() + [h
.hostname
for h
in self
.draining_hosts
]
198 unknown_hosts
= explicit_hostnames
.difference(set(known_hosts
))
200 raise OrchestratorValidationError(
201 f
'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')
203 if self
.spec
.placement
.host_pattern
:
204 pattern_hostnames
= self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
205 if not pattern_hostnames
:
206 raise OrchestratorValidationError(
207 f
'Cannot place {self.spec.one_line_str()}: No matching hosts')
209 if self
.spec
.placement
.label
:
210 label_hosts
= self
.hosts_by_label(self
.spec
.placement
.label
)
212 raise OrchestratorValidationError(
213 f
'Cannot place {self.spec.one_line_str()}: No matching '
214 f
'hosts for label {self.spec.placement.label}')
216 def place_per_host_daemons(
218 slots
: List
[DaemonPlacement
],
219 to_add
: List
[DaemonPlacement
],
220 to_remove
: List
[orchestrator
.DaemonDescription
],
221 ) -> Tuple
[List
[DaemonPlacement
], List
[DaemonPlacement
], List
[orchestrator
.DaemonDescription
]]:
222 if self
.per_host_daemon_type
:
224 DaemonPlacement(daemon_type
=self
.per_host_daemon_type
,
226 for hostname
in set([s
.hostname
for s
in slots
])
229 d
for d
in self
.daemons
if d
.daemon_type
== self
.per_host_daemon_type
235 if p
.matches_daemon(dd
):
243 to_remove
= [d
for d
in to_remove
if d
.hostname
not in [
244 h
.hostname
for h
in self
.unreachable_hosts
]]
246 return slots
, to_add
, to_remove
249 # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
251 Generate a list of HostPlacementSpec taking into account:
254 * hosts with existing daemons
256 * self.filter_new_host
261 count
= self
.spec
.placement
.count
263 # get candidate hosts based on [hosts, label, host_pattern]
264 candidates
= self
.get_candidates() # type: List[DaemonPlacement]
265 if self
.primary_daemon_type
in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
:
266 # remove unreachable hosts that are not in maintenance so daemons
267 # on these hosts will be rescheduled
268 candidates
= self
.remove_non_maintenance_unreachable_candidates(candidates
)
270 def expand_candidates(ls
: List
[DaemonPlacement
], num
: int) -> List
[DaemonPlacement
]:
272 for offset
in range(num
):
273 r
.extend([dp
.renumber_ports(offset
) for dp
in ls
])
276 # consider enough slots to fulfill target count-per-host or count
278 if self
.spec
.placement
.count_per_host
:
279 per_host
= self
.spec
.placement
.count_per_host
282 candidates
= expand_candidates(candidates
, per_host
)
283 elif self
.allow_colo
and candidates
:
284 per_host
= 1 + ((count
- 1) // len(candidates
))
285 candidates
= expand_candidates(candidates
, per_host
)
287 # consider (preserve) existing daemons in a particular order...
290 d
for d
in self
.daemons
if d
.daemon_type
== self
.primary_daemon_type
293 not d
.is_active
, # active before standby
294 d
.rank
is not None, # ranked first, then non-ranked
296 0 - (d
.rank_generation
or 0), # newer generations first
300 # sort candidates into existing/used slots that already have a
301 # daemon, and others (the rest)
302 existing_active
: List
[orchestrator
.DaemonDescription
] = []
303 existing_standby
: List
[orchestrator
.DaemonDescription
] = []
304 existing_slots
: List
[DaemonPlacement
] = []
305 to_add
: List
[DaemonPlacement
] = []
306 to_remove
: List
[orchestrator
.DaemonDescription
] = []
307 ranks
: List
[int] = list(range(len(candidates
)))
308 others
: List
[DaemonPlacement
] = candidates
.copy()
312 if p
.matches_daemon(dd
) and p
.matches_rank_map(dd
, self
.rank_map
, ranks
):
315 existing_active
.append(dd
)
317 existing_standby
.append(dd
)
318 if dd
.rank
is not None:
319 assert dd
.rank_generation
is not None
320 p
= p
.assign_rank(dd
.rank
, dd
.rank_generation
)
321 ranks
.remove(dd
.rank
)
322 existing_slots
.append(p
)
328 # TODO: At some point we want to deploy daemons that are on offline hosts
329 # at what point we do this differs per daemon type. Stateless daemons we could
330 # do quickly to improve availability. Stateful daemons we might want to wait longer
331 # to see if the host comes back online
333 existing
= existing_active
+ existing_standby
337 to_add
= [dd
for dd
in others
if dd
.hostname
not in [
338 h
.hostname
for h
in self
.unreachable_hosts
]]
340 # The number of new slots that need to be selected in order to fulfill count
341 need
= count
- len(existing
)
343 # we don't need any additional placements
345 to_remove
.extend(existing
[count
:])
346 del existing_slots
[count
:]
347 return self
.place_per_host_daemons(existing_slots
, [], to_remove
)
349 if self
.related_service_daemons
:
350 # prefer to put daemons on the same host(s) as daemons of the related service
351 # Note that we are only doing this over picking arbitrary hosts to satisfy
352 # the count. We are not breaking any deterministic placements in order to
353 # match the placement with a related service.
354 related_service_hosts
= list(set(dd
.hostname
for dd
in self
.related_service_daemons
))
355 matching_dps
= [dp
for dp
in others
if dp
.hostname
in related_service_hosts
]
356 for dp
in matching_dps
:
359 if dp
.hostname
in related_service_hosts
and dp
.hostname
not in [h
.hostname
for h
in self
.unreachable_hosts
]:
360 logger
.debug(f
'Preferring {dp.hostname} for service {self.service_name} as related daemons have been placed there')
362 need
-= 1 # this is last use of need so it can work as a counter
363 # at this point, we've either met our placement quota entirely using hosts with related
364 # service daemons, or we still need to place more. If we do need to place more,
365 # we should make sure not to re-use hosts with related service daemons by filtering
366 # them out from the "others" list
368 others
= [dp
for dp
in others
if dp
.hostname
not in related_service_hosts
]
373 if dp
.hostname
not in [h
.hostname
for h
in self
.unreachable_hosts
]:
375 need
-= 1 # this is last use of need in this function so it can work as a counter
377 if self
.rank_map
is not None:
378 # assign unused ranks (and rank_generations) to to_add
379 assert len(ranks
) >= len(to_add
)
380 for i
in range(len(to_add
)):
381 to_add
[i
] = to_add
[i
].assign_rank_generation(ranks
[i
], self
.rank_map
)
383 logger
.debug('Combine hosts with existing daemons %s + new hosts %s' % (existing
, to_add
))
384 return self
.place_per_host_daemons(existing_slots
+ to_add
, to_add
, to_remove
)
386 def find_ip_on_host(self
, hostname
: str, subnets
: List
[str]) -> Optional
[str]:
387 for subnet
in subnets
:
389 # following is to allow loopback interfaces for both ipv4 and ipv6. Since we
390 # only have the subnet (and no IP) we assume default loopback IP address.
391 if ipaddress
.ip_network(subnet
).is_loopback
:
392 if ipaddress
.ip_network(subnet
).version
== 4:
393 ips
.append('127.0.0.1')
396 for iface
, iface_ips
in self
.networks
.get(hostname
, {}).get(subnet
, {}).items():
397 ips
.extend(iface_ips
)
399 return sorted(ips
)[0]
402 def get_candidates(self
) -> List
[DaemonPlacement
]:
403 if self
.spec
.placement
.hosts
:
405 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
406 hostname
=h
.hostname
, network
=h
.network
, name
=h
.name
,
407 ports
=self
.ports_start
)
408 for h
in self
.spec
.placement
.hosts
if h
.hostname
not in [dh
.hostname
for dh
in self
.draining_hosts
]
410 elif self
.spec
.placement
.label
:
412 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
413 hostname
=x
.hostname
, ports
=self
.ports_start
)
414 for x
in self
.hosts_by_label(self
.spec
.placement
.label
)
416 elif self
.spec
.placement
.host_pattern
:
418 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
419 hostname
=x
, ports
=self
.ports_start
)
420 for x
in self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
423 self
.spec
.placement
.count
is not None
424 or self
.spec
.placement
.count_per_host
is not None
427 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
428 hostname
=x
.hostname
, ports
=self
.ports_start
)
432 raise OrchestratorValidationError(
433 "placement spec is empty: no hosts, no label, no pattern, no count")
436 if self
.spec
.networks
:
440 ip
= self
.find_ip_on_host(p
.hostname
, self
.spec
.networks
)
442 ls
.append(DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
443 hostname
=p
.hostname
, network
=p
.network
,
444 name
=p
.name
, ports
=p
.ports
, ip
=ip
))
447 f
'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
450 if self
.filter_new_host
:
454 if self
.filter_new_host(h
.hostname
, self
.spec
):
456 if len(old
) > len(ls
):
457 logger
.debug('Filtered %s down to %s' % (old
, ls
))
459 # now that we have the list of nodes candidates based on the configured
460 # placement, let's shuffle the list for node pseudo-random selection. For this,
461 # we generate a seed from the service name and we use to shuffle the candidates.
462 # This makes shuffling deterministic for the same service name.
464 hashlib
.sha1(self
.spec
.service_name().encode('utf-8')).hexdigest(),
466 ) % (2 ** 32) # truncate result to 32 bits
468 random
.Random(seed
).shuffle(final
)
471 def remove_non_maintenance_unreachable_candidates(self
, candidates
: List
[DaemonPlacement
]) -> List
[DaemonPlacement
]:
472 in_maintenance
: Dict
[str, bool] = {}
474 if h
.status
.lower() == 'maintenance':
475 in_maintenance
[h
.hostname
] = True
477 in_maintenance
[h
.hostname
] = False
478 unreachable_hosts
= [h
.hostname
for h
in self
.unreachable_hosts
]
480 c
for c
in candidates
if c
.hostname
not in unreachable_hosts
or in_maintenance
[c
.hostname
]]