5 from typing
import List
, Optional
, Callable
, TypeVar
, Tuple
, NamedTuple
, Dict
8 from ceph
.deployment
.service_spec
import ServiceSpec
9 from orchestrator
._interface
import DaemonDescription
10 from orchestrator
import OrchestratorValidationError
11 from .utils
import RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
17 class DaemonPlacement(NamedTuple
):
20 network
: str = '' # for mons only
22 ip
: Optional
[str] = None
24 rank
: Optional
[int] = None
25 rank_generation
: Optional
[int] = None
27 def __str__(self
) -> str:
28 res
= self
.daemon_type
+ ':' + self
.hostname
30 if self
.rank
is not None:
31 other
.append(f
'rank={self.rank}.{self.rank_generation}')
33 other
.append(f
'network={self.network}')
35 other
.append(f
'name={self.name}')
37 other
.append(f
'{self.ip or "*"}:{",".join(map(str, self.ports))}')
39 res
+= '(' + ' '.join(other
) + ')'
42 def renumber_ports(self
, n
: int) -> 'DaemonPlacement':
43 return DaemonPlacement(
49 [p
+ n
for p
in self
.ports
],
54 def assign_rank(self
, rank
: int, gen
: int) -> 'DaemonPlacement':
55 return DaemonPlacement(
66 def assign_name(self
, name
: str) -> 'DaemonPlacement':
67 return DaemonPlacement(
78 def assign_rank_generation(
81 rank_map
: Dict
[int, Dict
[int, Optional
[str]]]
82 ) -> 'DaemonPlacement':
83 if rank
not in rank_map
:
87 gen
= max(rank_map
[rank
].keys()) + 1
88 rank_map
[rank
][gen
] = None
89 return DaemonPlacement(
100 def matches_daemon(self
, dd
: DaemonDescription
) -> bool:
101 if self
.daemon_type
!= dd
.daemon_type
:
103 if self
.hostname
!= dd
.hostname
:
105 # fixme: how to match against network?
106 if self
.name
and self
.name
!= dd
.daemon_id
:
109 if self
.ports
!= dd
.ports
and dd
.ports
:
111 if self
.ip
!= dd
.ip
and dd
.ip
:
115 def matches_rank_map(
117 dd
: DaemonDescription
,
118 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]],
122 # daemon should have no rank
123 return dd
.rank
is None
128 if dd
.rank
not in rank_map
:
130 if dd
.rank
not in ranks
:
133 # must be the highest/newest rank_generation
134 if dd
.rank_generation
!= max(rank_map
[dd
.rank
].keys()):
137 # must be *this* daemon
138 return rank_map
[dd
.rank
][dd
.rank_generation
] == dd
.daemon_id
141 class HostAssignment(object):
144 spec
, # type: ServiceSpec
145 hosts
: List
[orchestrator
.HostSpec
],
146 unreachable_hosts
: List
[orchestrator
.HostSpec
],
147 draining_hosts
: List
[orchestrator
.HostSpec
],
148 daemons
: List
[orchestrator
.DaemonDescription
],
149 networks
: Dict
[str, Dict
[str, Dict
[str, List
[str]]]] = {},
150 filter_new_host
=None, # type: Optional[Callable[[str],bool]]
151 allow_colo
: bool = False,
152 primary_daemon_type
: Optional
[str] = None,
153 per_host_daemon_type
: Optional
[str] = None,
154 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]] = None,
157 self
.spec
= spec
# type: ServiceSpec
158 self
.primary_daemon_type
= primary_daemon_type
or spec
.service_type
159 self
.hosts
: List
[orchestrator
.HostSpec
] = hosts
160 self
.unreachable_hosts
: List
[orchestrator
.HostSpec
] = unreachable_hosts
161 self
.draining_hosts
: List
[orchestrator
.HostSpec
] = draining_hosts
162 self
.filter_new_host
= filter_new_host
163 self
.service_name
= spec
.service_name()
164 self
.daemons
= daemons
165 self
.networks
= networks
166 self
.allow_colo
= allow_colo
167 self
.per_host_daemon_type
= per_host_daemon_type
168 self
.ports_start
= spec
.get_port_start()
169 self
.rank_map
= rank_map
def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
    """Return the entries of ``self.hosts`` that carry ``label``.

    :param label: label to look for in each host's ``labels``
    :return: a new list of the matching hosts, in the same order as
        ``self.hosts``; empty when no host has the label
    """
    return [h for h in self.hosts if label in h.labels]
def get_hostnames(self) -> List[str]:
    """Return the ``hostname`` of every entry in ``self.hosts``, in order."""
    return [h.hostname for h in self.hosts]
177 def validate(self
) -> None:
180 if self
.spec
.placement
.count
== 0:
181 raise OrchestratorValidationError(
182 f
'<count> can not be 0 for {self.spec.one_line_str()}')
185 self
.spec
.placement
.count_per_host
is not None
186 and self
.spec
.placement
.count_per_host
> 1
187 and not self
.allow_colo
189 raise OrchestratorValidationError(
190 f
'Cannot place more than one {self.spec.service_type} per host'
193 if self
.spec
.placement
.hosts
:
194 explicit_hostnames
= {h
.hostname
for h
in self
.spec
.placement
.hosts
}
195 known_hosts
= self
.get_hostnames() + [h
.hostname
for h
in self
.draining_hosts
]
196 unknown_hosts
= explicit_hostnames
.difference(set(known_hosts
))
198 raise OrchestratorValidationError(
199 f
'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')
201 if self
.spec
.placement
.host_pattern
:
202 pattern_hostnames
= self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
203 if not pattern_hostnames
:
204 raise OrchestratorValidationError(
205 f
'Cannot place {self.spec.one_line_str()}: No matching hosts')
207 if self
.spec
.placement
.label
:
208 label_hosts
= self
.hosts_by_label(self
.spec
.placement
.label
)
210 raise OrchestratorValidationError(
211 f
'Cannot place {self.spec.one_line_str()}: No matching '
212 f
'hosts for label {self.spec.placement.label}')
214 def place_per_host_daemons(
216 slots
: List
[DaemonPlacement
],
217 to_add
: List
[DaemonPlacement
],
218 to_remove
: List
[orchestrator
.DaemonDescription
],
219 ) -> Tuple
[List
[DaemonPlacement
], List
[DaemonPlacement
], List
[orchestrator
.DaemonDescription
]]:
220 if self
.per_host_daemon_type
:
222 DaemonPlacement(daemon_type
=self
.per_host_daemon_type
,
224 for hostname
in set([s
.hostname
for s
in slots
])
227 d
for d
in self
.daemons
if d
.daemon_type
== self
.per_host_daemon_type
233 if p
.matches_daemon(dd
):
241 to_remove
= [d
for d
in to_remove
if d
.hostname
not in [
242 h
.hostname
for h
in self
.unreachable_hosts
]]
244 return slots
, to_add
, to_remove
247 # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
249 Generate a list of HostPlacementSpec taking into account:
252 * hosts with existing daemons
254 * self.filter_new_host
259 count
= self
.spec
.placement
.count
261 # get candidate hosts based on [hosts, label, host_pattern]
262 candidates
= self
.get_candidates() # type: List[DaemonPlacement]
263 if self
.primary_daemon_type
in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
:
264 # remove unreachable hosts that are not in maintenance so daemons
265 # on these hosts will be rescheduled
266 candidates
= self
.remove_non_maintenance_unreachable_candidates(candidates
)
268 def expand_candidates(ls
: List
[DaemonPlacement
], num
: int) -> List
[DaemonPlacement
]:
270 for offset
in range(num
):
271 r
.extend([dp
.renumber_ports(offset
) for dp
in ls
])
274 # consider enough slots to fulfill target count-per-host or count
276 if self
.spec
.placement
.count_per_host
:
277 per_host
= self
.spec
.placement
.count_per_host
280 candidates
= expand_candidates(candidates
, per_host
)
281 elif self
.allow_colo
and candidates
:
282 per_host
= 1 + ((count
- 1) // len(candidates
))
283 candidates
= expand_candidates(candidates
, per_host
)
285 # consider (preserve) existing daemons in a particular order...
288 d
for d
in self
.daemons
if d
.daemon_type
== self
.primary_daemon_type
291 not d
.is_active
, # active before standby
292 d
.rank
is not None, # ranked first, then non-ranked
294 0 - (d
.rank_generation
or 0), # newer generations first
298 # sort candidates into existing/used slots that already have a
299 # daemon, and others (the rest)
300 existing_active
: List
[orchestrator
.DaemonDescription
] = []
301 existing_standby
: List
[orchestrator
.DaemonDescription
] = []
302 existing_slots
: List
[DaemonPlacement
] = []
303 to_add
: List
[DaemonPlacement
] = []
304 to_remove
: List
[orchestrator
.DaemonDescription
] = []
305 ranks
: List
[int] = list(range(len(candidates
)))
306 others
: List
[DaemonPlacement
] = candidates
.copy()
310 if p
.matches_daemon(dd
) and p
.matches_rank_map(dd
, self
.rank_map
, ranks
):
313 existing_active
.append(dd
)
315 existing_standby
.append(dd
)
316 if dd
.rank
is not None:
317 assert dd
.rank_generation
is not None
318 p
= p
.assign_rank(dd
.rank
, dd
.rank_generation
)
319 ranks
.remove(dd
.rank
)
320 existing_slots
.append(p
)
326 # TODO: At some point we want to deploy daemons that are on offline hosts;
327 # the point at which we do this differs per daemon type. Stateless daemons we could
328 # do quickly to improve availability. Stateful daemons we might want to wait longer
329 # to see if the host comes back online.
331 existing
= existing_active
+ existing_standby
335 to_add
= [dd
for dd
in others
if dd
.hostname
not in [
336 h
.hostname
for h
in self
.unreachable_hosts
]]
338 # The number of new slots that need to be selected in order to fulfill count
339 need
= count
- len(existing
)
341 # we don't need any additional placements
343 to_remove
.extend(existing
[count
:])
344 del existing_slots
[count
:]
345 return self
.place_per_host_daemons(existing_slots
, [], to_remove
)
350 if dp
.hostname
not in [h
.hostname
for h
in self
.unreachable_hosts
]:
352 need
-= 1 # this is last use of need in this function so it can work as a counter
354 if self
.rank_map
is not None:
355 # assign unused ranks (and rank_generations) to to_add
356 assert len(ranks
) >= len(to_add
)
357 for i
in range(len(to_add
)):
358 to_add
[i
] = to_add
[i
].assign_rank_generation(ranks
[i
], self
.rank_map
)
360 logger
.debug('Combine hosts with existing daemons %s + new hosts %s' % (existing
, to_add
))
361 return self
.place_per_host_daemons(existing_slots
+ to_add
, to_add
, to_remove
)
363 def find_ip_on_host(self
, hostname
: str, subnets
: List
[str]) -> Optional
[str]:
364 for subnet
in subnets
:
366 # following is to allow loopback interfaces for both ipv4 and ipv6. Since we
367 # only have the subnet (and no IP) we assume default loopback IP address.
368 if ipaddress
.ip_network(subnet
).is_loopback
:
369 if ipaddress
.ip_network(subnet
).version
== 4:
370 ips
.append('127.0.0.1')
373 for iface
, iface_ips
in self
.networks
.get(hostname
, {}).get(subnet
, {}).items():
374 ips
.extend(iface_ips
)
376 return sorted(ips
)[0]
379 def get_candidates(self
) -> List
[DaemonPlacement
]:
380 if self
.spec
.placement
.hosts
:
382 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
383 hostname
=h
.hostname
, network
=h
.network
, name
=h
.name
,
384 ports
=self
.ports_start
)
385 for h
in self
.spec
.placement
.hosts
if h
.hostname
not in [dh
.hostname
for dh
in self
.draining_hosts
]
387 elif self
.spec
.placement
.label
:
389 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
390 hostname
=x
.hostname
, ports
=self
.ports_start
)
391 for x
in self
.hosts_by_label(self
.spec
.placement
.label
)
393 elif self
.spec
.placement
.host_pattern
:
395 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
396 hostname
=x
, ports
=self
.ports_start
)
397 for x
in self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
400 self
.spec
.placement
.count
is not None
401 or self
.spec
.placement
.count_per_host
is not None
404 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
405 hostname
=x
.hostname
, ports
=self
.ports_start
)
409 raise OrchestratorValidationError(
410 "placement spec is empty: no hosts, no label, no pattern, no count")
413 if self
.spec
.networks
:
417 ip
= self
.find_ip_on_host(p
.hostname
, self
.spec
.networks
)
419 ls
.append(DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
420 hostname
=p
.hostname
, network
=p
.network
,
421 name
=p
.name
, ports
=p
.ports
, ip
=ip
))
424 f
'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
427 if self
.filter_new_host
:
431 if self
.filter_new_host(h
.hostname
):
433 if len(old
) > len(ls
):
434 logger
.debug('Filtered %s down to %s' % (old
, ls
))
436 # now that we have the list of nodes candidates based on the configured
437 # placement, let's shuffle the list for node pseudo-random selection. For this,
438 # we generate a seed from the service name and we use to shuffle the candidates.
439 # This makes shuffling deterministic for the same service name.
441 hashlib
.sha1(self
.spec
.service_name().encode('utf-8')).hexdigest(),
443 ) % (2 ** 32) # truncate result to 32 bits
445 random
.Random(seed
).shuffle(final
)
448 def remove_non_maintenance_unreachable_candidates(self
, candidates
: List
[DaemonPlacement
]) -> List
[DaemonPlacement
]:
449 in_maintenance
: Dict
[str, bool] = {}
451 if h
.status
.lower() == 'maintenance':
452 in_maintenance
[h
.hostname
] = True
454 in_maintenance
[h
.hostname
] = False
455 unreachable_hosts
= [h
.hostname
for h
in self
.unreachable_hosts
]
457 c
for c
in candidates
if c
.hostname
not in unreachable_hosts
or in_maintenance
[c
.hostname
]]