3 from typing
import List
, Optional
, Callable
, TypeVar
, Tuple
, NamedTuple
, Dict
6 from ceph
.deployment
.service_spec
import ServiceSpec
7 from orchestrator
._interface
import DaemonDescription
8 from orchestrator
import OrchestratorValidationError
10 logger
= logging
.getLogger(__name__
)
14 class DaemonPlacement(NamedTuple
):
17 network
: str = '' # for mons only
19 ip
: Optional
[str] = None
22 def __str__(self
) -> str:
23 res
= self
.daemon_type
+ ':' + self
.hostname
26 other
.append(f
'network={self.network}')
28 other
.append(f
'name={self.name}')
30 other
.append(f
'{self.ip or "*"}:{self.ports[0] if len(self.ports) == 1 else ",".join(map(str, self.ports))}')
32 res
+= '(' + ' '.join(other
) + ')'
35 def renumber_ports(self
, n
: int) -> 'DaemonPlacement':
36 return DaemonPlacement(
42 [p
+ n
for p
in self
.ports
],
45 def matches_daemon(self
, dd
: DaemonDescription
) -> bool:
46 if self
.daemon_type
!= dd
.daemon_type
:
48 if self
.hostname
!= dd
.hostname
:
50 # fixme: how to match against network?
51 if self
.name
and self
.name
!= dd
.daemon_id
:
54 if self
.ports
!= dd
.ports
:
61 class HostAssignment(object):
64 spec
, # type: ServiceSpec
65 hosts
: List
[orchestrator
.HostSpec
],
66 daemons
: List
[orchestrator
.DaemonDescription
],
67 networks
: Dict
[str, Dict
[str, Dict
[str, List
[str]]]] = {},
68 filter_new_host
=None, # type: Optional[Callable[[str],bool]]
69 allow_colo
: bool = False,
70 primary_daemon_type
: Optional
[str] = None,
71 per_host_daemon_type
: Optional
[str] = None,
74 self
.spec
= spec
# type: ServiceSpec
75 self
.primary_daemon_type
= primary_daemon_type
or spec
.service_type
76 self
.hosts
: List
[orchestrator
.HostSpec
] = hosts
77 self
.filter_new_host
= filter_new_host
78 self
.service_name
= spec
.service_name()
79 self
.daemons
= daemons
80 self
.networks
= networks
81 self
.allow_colo
= allow_colo
82 self
.per_host_daemon_type
= per_host_daemon_type
83 self
.ports_start
= spec
.get_port_start()
def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
    """Return the entries of ``self.hosts`` whose ``labels`` contain *label*.

    The relative order of ``self.hosts`` is preserved in the result.
    """
    labelled = []
    for host_spec in self.hosts:
        if label in host_spec.labels:
            labelled.append(host_spec)
    return labelled
def get_hostnames(self) -> List[str]:
    """Return the ``hostname`` of every entry in ``self.hosts``, in order."""
    names: List[str] = []
    for host_spec in self.hosts:
        names.append(host_spec.hostname)
    return names
91 def validate(self
) -> None:
94 if self
.spec
.placement
.count
== 0:
95 raise OrchestratorValidationError(
96 f
'<count> can not be 0 for {self.spec.one_line_str()}')
99 self
.spec
.placement
.count_per_host
is not None
100 and self
.spec
.placement
.count_per_host
> 1
101 and not self
.allow_colo
103 raise OrchestratorValidationError(
104 f
'Cannot place more than one {self.spec.service_type} per host'
107 if self
.spec
.placement
.hosts
:
108 explicit_hostnames
= {h
.hostname
for h
in self
.spec
.placement
.hosts
}
109 unknown_hosts
= explicit_hostnames
.difference(set(self
.get_hostnames()))
111 raise OrchestratorValidationError(
112 f
'Cannot place {self.spec.one_line_str()} on {", ".join(sorted(unknown_hosts))}: Unknown hosts')
114 if self
.spec
.placement
.host_pattern
:
115 pattern_hostnames
= self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
116 if not pattern_hostnames
:
117 raise OrchestratorValidationError(
118 f
'Cannot place {self.spec.one_line_str()}: No matching hosts')
120 if self
.spec
.placement
.label
:
121 label_hosts
= self
.hosts_by_label(self
.spec
.placement
.label
)
123 raise OrchestratorValidationError(
124 f
'Cannot place {self.spec.one_line_str()}: No matching '
125 f
'hosts for label {self.spec.placement.label}')
127 def place_per_host_daemons(
129 slots
: List
[DaemonPlacement
],
130 to_add
: List
[DaemonPlacement
],
131 to_remove
: List
[orchestrator
.DaemonDescription
],
132 ) -> Tuple
[List
[DaemonPlacement
], List
[DaemonPlacement
], List
[orchestrator
.DaemonDescription
]]:
133 if self
.per_host_daemon_type
:
135 DaemonPlacement(daemon_type
=self
.per_host_daemon_type
,
137 for hostname
in set([s
.hostname
for s
in slots
])
140 d
for d
in self
.daemons
if d
.daemon_type
== self
.per_host_daemon_type
146 if p
.matches_daemon(dd
):
154 return slots
, to_add
, to_remove
157 # type: () -> Tuple[List[DaemonPlacement], List[DaemonPlacement], List[orchestrator.DaemonDescription]]
159 Generate a list of HostPlacementSpec taking into account:
162 * hosts with existing daemons
164 * self.filter_new_host
169 count
= self
.spec
.placement
.count
171 # get candidate hosts based on [hosts, label, host_pattern]
172 candidates
= self
.get_candidates() # type: List[DaemonPlacement]
174 def expand_candidates(ls
: List
[DaemonPlacement
], num
: int) -> List
[DaemonPlacement
]:
176 for offset
in range(num
):
177 r
.extend([dp
.renumber_ports(offset
) for dp
in ls
])
180 # consider enough slots to fulfill target count-per-host or count
182 if self
.spec
.placement
.count_per_host
:
183 per_host
= self
.spec
.placement
.count_per_host
186 candidates
= expand_candidates(candidates
, per_host
)
187 elif self
.allow_colo
and candidates
:
188 per_host
= 1 + ((count
- 1) // len(candidates
))
189 candidates
= expand_candidates(candidates
, per_host
)
191 # consider active (primary) daemons first
193 d
for d
in self
.daemons
if d
.is_active
and d
.daemon_type
== self
.primary_daemon_type
195 d
for d
in self
.daemons
if not d
.is_active
and d
.daemon_type
== self
.primary_daemon_type
198 # sort candidates into existing/used slots that already have a
199 # daemon, and others (the rest)
200 existing_active
: List
[orchestrator
.DaemonDescription
] = []
201 existing_standby
: List
[orchestrator
.DaemonDescription
] = []
202 existing_slots
: List
[DaemonPlacement
] = []
203 to_remove
: List
[orchestrator
.DaemonDescription
] = []
204 others
= candidates
.copy()
208 if p
.matches_daemon(dd
):
211 existing_active
.append(dd
)
213 existing_standby
.append(dd
)
214 existing_slots
.append(p
)
220 existing
= existing_active
+ existing_standby
222 # If we don't have <count> the list of candidates is definitive.
224 logger
.debug('Provided hosts: %s' % candidates
)
225 return self
.place_per_host_daemons(candidates
, others
, to_remove
)
227 # The number of new slots that need to be selected in order to fulfill count
228 need
= count
- len(existing
)
230 # we don't need any additional placements
232 to_remove
.extend(existing
[count
:])
233 del existing_slots
[count
:]
234 return self
.place_per_host_daemons(existing_slots
, [], to_remove
)
236 # ask the scheduler to select additional slots
237 to_add
= others
[:need
]
238 logger
.debug('Combine hosts with existing daemons %s + new hosts %s' % (
240 return self
.place_per_host_daemons(existing_slots
+ to_add
, to_add
, to_remove
)
242 def find_ip_on_host(self
, hostname
: str, subnets
: List
[str]) -> Optional
[str]:
243 for subnet
in subnets
:
245 for iface
, ips
in self
.networks
.get(hostname
, {}).get(subnet
, {}).items():
248 return sorted(ips
)[0]
251 def get_candidates(self
) -> List
[DaemonPlacement
]:
252 if self
.spec
.placement
.hosts
:
254 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
255 hostname
=h
.hostname
, network
=h
.network
, name
=h
.name
,
256 ports
=self
.ports_start
)
257 for h
in self
.spec
.placement
.hosts
259 elif self
.spec
.placement
.label
:
261 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
262 hostname
=x
.hostname
, ports
=self
.ports_start
)
263 for x
in self
.hosts_by_label(self
.spec
.placement
.label
)
265 elif self
.spec
.placement
.host_pattern
:
267 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
268 hostname
=x
, ports
=self
.ports_start
)
269 for x
in self
.spec
.placement
.filter_matching_hostspecs(self
.hosts
)
272 self
.spec
.placement
.count
is not None
273 or self
.spec
.placement
.count_per_host
is not None
276 DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
277 hostname
=x
.hostname
, ports
=self
.ports_start
)
281 raise OrchestratorValidationError(
282 "placement spec is empty: no hosts, no label, no pattern, no count")
285 if self
.spec
.networks
:
289 ip
= self
.find_ip_on_host(p
.hostname
, self
.spec
.networks
)
291 ls
.append(DaemonPlacement(daemon_type
=self
.primary_daemon_type
,
292 hostname
=p
.hostname
, network
=p
.network
,
293 name
=p
.name
, ports
=p
.ports
, ip
=ip
))
296 f
'Skipping {p.hostname} with no IP in network(s) {self.spec.networks}'
299 if self
.filter_new_host
:
303 if self
.filter_new_host(h
.hostname
):
307 f
"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
308 if len(old
) > len(ls
):
309 logger
.debug('Filtered %s down to %s' % (old
, ls
))
311 # shuffle for pseudo random selection
312 # gen seed off of self.spec to make shuffling deterministic
313 seed
= hash(self
.spec
.service_name())
314 random
.Random(seed
).shuffle(ls
)