4 from typing
import List
, Optional
, Callable
, TypeVar
, Tuple
, NamedTuple
, Dict
7 from ceph
.deployment
.service_spec
import ServiceSpec
8 from orchestrator
._interface
import DaemonDescription
9 from orchestrator
import OrchestratorValidationError
11 logger
= logging
.getLogger(__name__
)
15 class DaemonPlacement(NamedTuple
):
18 network
: str = '' # for mons only
20 ip
: Optional
[str] = None
22 rank
: Optional
[int] = None
23 rank_generation
: Optional
[int] = None
25 def __str__(self
) -> str:
26 res
= self
.daemon_type
+ ':' + self
.hostname
28 if self
.rank
is not None:
29 other
.append(f
'rank={self.rank}.{self.rank_generation}')
31 other
.append(f
'network={self.network}')
33 other
.append(f
'name={self.name}')
35 other
.append(f
'{self.ip or "*"}:{",".join(map(str, self.ports))}')
37 res
+= '(' + ' '.join(other
) + ')'
40 def renumber_ports(self
, n
: int) -> 'DaemonPlacement':
41 return DaemonPlacement(
47 [p
+ n
for p
in self
.ports
],
52 def assign_rank(self
, rank
: int, gen
: int) -> 'DaemonPlacement':
53 return DaemonPlacement(
64 def assign_name(self
, name
: str) -> 'DaemonPlacement':
65 return DaemonPlacement(
76 def assign_rank_generation(
79 rank_map
: Dict
[int, Dict
[int, Optional
[str]]]
80 ) -> 'DaemonPlacement':
81 if rank
not in rank_map
:
85 gen
= max(rank_map
[rank
].keys()) + 1
86 rank_map
[rank
][gen
] = None
87 return DaemonPlacement(
98 def matches_daemon(self
, dd
: DaemonDescription
) -> bool:
99 if self
.daemon_type
!= dd
.daemon_type
:
101 if self
.hostname
!= dd
.hostname
:
103 # fixme: how to match against network?
104 if self
.name
and self
.name
!= dd
.daemon_id
:
107 if self
.ports
!= dd
.ports
:
113 def matches_rank_map(
115 dd
: DaemonDescription
,
116 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]],
120 # daemon should have no rank
121 return dd
.rank
is None
126 if dd
.rank
not in rank_map
:
128 if dd
.rank
not in ranks
:
131 # must be the highest/newest rank_generation
132 if dd
.rank_generation
!= max(rank_map
[dd
.rank
].keys()):
135 # must be *this* daemon
136 return rank_map
[dd
.rank
][dd
.rank_generation
] == dd
.daemon_id
139 class HostAssignment(object):
142 spec
, # type: ServiceSpec
143 hosts
: List
[orchestrator
.HostSpec
],
144 daemons
: List
[orchestrator
.DaemonDescription
],
145 networks
: Dict
[str, Dict
[str, Dict
[str, List
[str]]]] = {},
146 filter_new_host
=None, # type: Optional[Callable[[str],bool]]
147 allow_colo
: bool = False,
148 primary_daemon_type
: Optional
[str] = None,
149 per_host_daemon_type
: Optional
[str] = None,
150 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]] = None,
153 self
.spec
= spec
# type: ServiceSpec
154 self
.primary_daemon_type
= primary_daemon_type
or spec
.service_type
155 self
.hosts
: List
[orchestrator
.HostSpec
] = hosts
156 self
.filter_new_host
= filter_new_host
157 self
.service_name
= spec
.service_name()
158 self
.daemons
= daemons
159 self
.networks
= networks
160 self
.allow_colo
= allow_colo
161 self
.per_host_daemon_type
= per_host_daemon_type
162 self
.ports_start
= spec
.get_port_start()
163 self
.rank_map
= rank_map
165 def hosts_by_label(self
, label
: str) -> List
[orchestrator
.HostSpec
]:
166 return [h
for h
in self
.hosts
if label
in h
.labels
]
168 def get_hostnames(self
) -> List
[str]:
169 return [h
.hostname
for h
in self
.hosts
]
171 def validate(self
) -> None:
174 if self
.spec
.placement
.count
== 0:
175 raise OrchestratorValidationError(
176 f
'<count> can not be 0 for {self.spec.one_line_str()}')
179 self
.spec
.placement
.count_per_host
is not None
180 and self
.spec
.placement
.count_per_host
> 1
181 and not self
.allow_colo
183 raise OrchestratorValidationError(
184 f
'Cannot place more than one {self.spec.service_type} per host'
187 if self
.spec
.placement
.hosts
:
188 explicit_hostnames
= {h
.hostname
for h
in self
.spec
.placement
.hosts
}
189 unknown_hosts
= explicit_hostnames
.difference(set(self
.get_hostnames()))
191 raise OrchestratorValidationError(
192 f
'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')
194 if self
.spec
.placement
.host_pattern
:
195 pattern_hostnames
= self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
196 if not pattern_hostnames
:
197 raise OrchestratorValidationError(
198 f
'Cannot place {self.spec.one_line_str()}: No matching hosts')
200 if self
.spec
.placement
.label
:
201 label_hosts
= self
.hosts_by_label(self
.spec
.placement
.label
)
203 raise OrchestratorValidationError(
204 f
'Cannot place {self.spec.one_line_str()}: No matching '
205 f
'hosts for label {self.spec.placement.label}')
207 def place_per_host_daemons(
209 slots
: List
[DaemonPlacement
],
210 to_add
: List
[DaemonPlacement
],
211 to_remove
: List
[orchestrator
.DaemonDescription
],
212 ) -> Tuple
[List
[DaemonPlacement
], List
[DaemonPlacement
], List
[orchestrator
.DaemonDescription
]]:
213 if self
.per_host_daemon_type
:
215 DaemonPlacement(daemon_type
=self
.per_host_daemon_type
,
217 for hostname
in set([s
.hostname
for s
in slots
])
220 d
for d
in self
.daemons
if d
.daemon_type
== self
.per_host_daemon_type
226 if p
.matches_daemon(dd
):
234 return slots
, to_add
, to_remove
237 # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
239 Generate a list of HostPlacementSpec taking into account:
242 * hosts with existing daemons
244 * self.filter_new_host
249 count
= self
.spec
.placement
.count
251 # get candidate hosts based on [hosts, label, host_pattern]
252 candidates
= self
.get_candidates() # type: List[DaemonPlacement]
254 def expand_candidates(ls
: List
[DaemonPlacement
], num
: int) -> List
[DaemonPlacement
]:
256 for offset
in range(num
):
257 r
.extend([dp
.renumber_ports(offset
) for dp
in ls
])
260 # consider enough slots to fulfill target count-per-host or count
262 if self
.spec
.placement
.count_per_host
:
263 per_host
= self
.spec
.placement
.count_per_host
266 candidates
= expand_candidates(candidates
, per_host
)
267 elif self
.allow_colo
and candidates
:
268 per_host
= 1 + ((count
- 1) // len(candidates
))
269 candidates
= expand_candidates(candidates
, per_host
)
271 # consider (preserve) existing daemons in a particular order...
274 d
for d
in self
.daemons
if d
.daemon_type
== self
.primary_daemon_type
277 not d
.is_active
, # active before standby
278 d
.rank
is not None, # ranked first, then non-ranked
280 0 - (d
.rank_generation
or 0), # newer generations first
284 # sort candidates into existing/used slots that already have a
285 # daemon, and others (the rest)
286 existing_active
: List
[orchestrator
.DaemonDescription
] = []
287 existing_standby
: List
[orchestrator
.DaemonDescription
] = []
288 existing_slots
: List
[DaemonPlacement
] = []
289 to_remove
: List
[orchestrator
.DaemonDescription
] = []
290 ranks
: List
[int] = list(range(len(candidates
)))
291 others
: List
[DaemonPlacement
] = candidates
.copy()
295 if p
.matches_daemon(dd
) and p
.matches_rank_map(dd
, self
.rank_map
, ranks
):
298 existing_active
.append(dd
)
300 existing_standby
.append(dd
)
301 if dd
.rank
is not None:
302 assert dd
.rank_generation
is not None
303 p
= p
.assign_rank(dd
.rank
, dd
.rank_generation
)
304 ranks
.remove(dd
.rank
)
305 existing_slots
.append(p
)
311 existing
= existing_active
+ existing_standby
317 # The number of new slots that need to be selected in order to fulfill count
318 need
= count
- len(existing
)
320 # we don't need any additional placements
322 to_remove
.extend(existing
[count
:])
323 del existing_slots
[count
:]
324 return self
.place_per_host_daemons(existing_slots
, [], to_remove
)
327 to_add
= others
[:need
]
329 if self
.rank_map
is not None:
330 # assign unused ranks (and rank_generations) to to_add
331 assert len(ranks
) >= len(to_add
)
332 for i
in range(len(to_add
)):
333 to_add
[i
] = to_add
[i
].assign_rank_generation(ranks
[i
], self
.rank_map
)
335 # If we don't have <count> the list of candidates is definitive.
337 final
= existing_slots
+ to_add
338 logger
.debug('Provided hosts: %s' % final
)
339 return self
.place_per_host_daemons(final
, to_add
, to_remove
)
341 logger
.debug('Combine hosts with existing daemons %s + new hosts %s' % (
343 return self
.place_per_host_daemons(existing_slots
+ to_add
, to_add
, to_remove
)
345 def find_ip_on_host(self
, hostname
: str, subnets
: List
[str]) -> Optional
[str]:
346 for subnet
in subnets
:
348 for iface
, ips
in self
.networks
.get(hostname
, {}).get(subnet
, {}).items():
351 return sorted(ips
)[0]
354 def get_candidates(self
) -> List
[DaemonPlacement
]:
355 if self
.spec
.placement
.hosts
:
357 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
358 hostname
=h
.hostname
, network
=h
.network
, name
=h
.name
,
359 ports
=self
.ports_start
)
360 for h
in self
.spec
.placement
.hosts
362 elif self
.spec
.placement
.label
:
364 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
365 hostname
=x
.hostname
, ports
=self
.ports_start
)
366 for x
in self
.hosts_by_label(self
.spec
.placement
.label
)
368 elif self
.spec
.placement
.host_pattern
:
370 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
371 hostname
=x
, ports
=self
.ports_start
)
372 for x
in self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
375 self
.spec
.placement
.count
is not None
376 or self
.spec
.placement
.count_per_host
is not None
379 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
380 hostname
=x
.hostname
, ports
=self
.ports_start
)
384 raise OrchestratorValidationError(
385 "placement spec is empty: no hosts, no label, no pattern, no count")
388 if self
.spec
.networks
:
392 ip
= self
.find_ip_on_host(p
.hostname
, self
.spec
.networks
)
394 ls
.append(DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
395 hostname
=p
.hostname
, network
=p
.network
,
396 name
=p
.name
, ports
=p
.ports
, ip
=ip
))
399 f
'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
402 if self
.filter_new_host
:
406 if self
.filter_new_host(h
.hostname
):
408 if len(old
) > len(ls
):
409 logger
.debug('Filtered %s down to %s' % (old
, ls
))
411 # shuffle for pseudo random selection
412 # gen seed off of self.spec to make shuffling deterministic
414 hashlib
.sha1(self
.spec
.service_name().encode('utf-8')).hexdigest(),
418 random
.Random(seed
).shuffle(final
)