+import hashlib
import logging
import random
from typing import List, Optional, Callable, TypeVar, Tuple, NamedTuple, Dict
name: str = ''
ip: Optional[str] = None
ports: List[int] = []
+ rank: Optional[int] = None
+ rank_generation: Optional[int] = None
def __str__(self) -> str:
res = self.daemon_type + ':' + self.hostname
other = []
+ if self.rank is not None:
+ other.append(f'rank={self.rank}.{self.rank_generation}')
if self.network:
other.append(f'network={self.network}')
if self.name:
other.append(f'name={self.name}')
if self.ports:
- other.append(f'{self.ip or "*"}:{self.ports[0] if len(self.ports) == 1 else ",".join(map(str, self.ports))}')
+ other.append(f'{self.ip or "*"}:{",".join(map(str, self.ports))}')
if other:
res += '(' + ' '.join(other) + ')'
return res
self.name,
self.ip,
[p + n for p in self.ports],
+ self.rank,
+ self.rank_generation,
+ )
+
+ def assign_rank(self, rank: int, gen: int) -> 'DaemonPlacement':
+ return DaemonPlacement(
+ self.daemon_type,
+ self.hostname,
+ self.network,
+ self.name,
+ self.ip,
+ self.ports,
+ rank,
+ gen,
+ )
+
+ def assign_name(self, name: str) -> 'DaemonPlacement':
+ return DaemonPlacement(
+ self.daemon_type,
+ self.hostname,
+ self.network,
+ name,
+ self.ip,
+ self.ports,
+ self.rank,
+ self.rank_generation,
+ )
+
+ def assign_rank_generation(
+ self,
+ rank: int,
+ rank_map: Dict[int, Dict[int, Optional[str]]]
+ ) -> 'DaemonPlacement':
+ if rank not in rank_map:
+ rank_map[rank] = {}
+ gen = 0
+ else:
+ gen = max(rank_map[rank].keys()) + 1
+ rank_map[rank][gen] = None
+ return DaemonPlacement(
+ self.daemon_type,
+ self.hostname,
+ self.network,
+ self.name,
+ self.ip,
+ self.ports,
+ rank,
+ gen,
)
def matches_daemon(self, dd: DaemonDescription) -> bool:
return False
return True
+ def matches_rank_map(
+ self,
+ dd: DaemonDescription,
+ rank_map: Optional[Dict[int, Dict[int, Optional[str]]]],
+ ranks: List[int]
+ ) -> bool:
+ if rank_map is None:
+ # daemon should have no rank
+ return dd.rank is None
+
+ if dd.rank is None:
+ return False
+
+ if dd.rank not in rank_map:
+ return False
+ if dd.rank not in ranks:
+ return False
+
+ # must be the highest/newest rank_generation
+ if dd.rank_generation != max(rank_map[dd.rank].keys()):
+ return False
+
+ # must be *this* daemon
+ return rank_map[dd.rank][dd.rank_generation] == dd.daemon_id
+
class HostAssignment(object):
allow_colo: bool = False,
primary_daemon_type: Optional[str] = None,
per_host_daemon_type: Optional[str] = None,
+ rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] = None,
):
assert spec
self.spec = spec # type: ServiceSpec
self.allow_colo = allow_colo
self.per_host_daemon_type = per_host_daemon_type
self.ports_start = spec.get_port_start()
+ self.rank_map = rank_map
def hosts_by_label(self, label: str) -> List[orchestrator.HostSpec]:
return [h for h in self.hosts if label in h.labels]
per_host = 1 + ((count - 1) // len(candidates))
candidates = expand_candidates(candidates, per_host)
- # consider active (primary) daemons first
- daemons = [
- d for d in self.daemons if d.is_active and d.daemon_type == self.primary_daemon_type
- ] + [
- d for d in self.daemons if not d.is_active and d.daemon_type == self.primary_daemon_type
- ]
+ # consider (preserve) existing daemons in a particular order...
+ daemons = sorted(
+ [
+ d for d in self.daemons if d.daemon_type == self.primary_daemon_type
+ ],
+ key=lambda d: (
+ not d.is_active, # active before standby
+ d.rank is not None, # ranked first, then non-ranked
+ d.rank, # low ranks
+ 0 - (d.rank_generation or 0), # newer generations first
+ )
+ )
# sort candidates into existing/used slots that already have a
# daemon, and others (the rest)
existing_standby: List[orchestrator.DaemonDescription] = []
existing_slots: List[DaemonPlacement] = []
to_remove: List[orchestrator.DaemonDescription] = []
- others = candidates.copy()
+ ranks: List[int] = list(range(len(candidates)))
+ others: List[DaemonPlacement] = candidates.copy()
for dd in daemons:
found = False
for p in others:
- if p.matches_daemon(dd):
+ if p.matches_daemon(dd) and p.matches_rank_map(dd, self.rank_map, ranks):
others.remove(p)
if dd.is_active:
existing_active.append(dd)
else:
existing_standby.append(dd)
+ if dd.rank is not None:
+ assert dd.rank_generation is not None
+ p = p.assign_rank(dd.rank, dd.rank_generation)
+ ranks.remove(dd.rank)
existing_slots.append(p)
found = True
break
existing = existing_active + existing_standby
- # If we don't have <count> the list of candidates is definitive.
- if count is None:
- logger.debug('Provided hosts: %s' % candidates)
- return self.place_per_host_daemons(candidates, others, to_remove)
+ # build to_add
+ if not count:
+ to_add = others
+ else:
+ # The number of new slots that need to be selected in order to fulfill count
+ need = count - len(existing)
+
+ # we don't need any additional placements
+ if need <= 0:
+ to_remove.extend(existing[count:])
+ del existing_slots[count:]
+ return self.place_per_host_daemons(existing_slots, [], to_remove)
- # The number of new slots that need to be selected in order to fulfill count
- need = count - len(existing)
+ if need > 0:
+ to_add = others[:need]
- # we don't need any additional placements
- if need <= 0:
- to_remove.extend(existing[count:])
- del existing_slots[count:]
- return self.place_per_host_daemons(existing_slots, [], to_remove)
+ if self.rank_map is not None:
+ # assign unused ranks (and rank_generations) to to_add
+ assert len(ranks) >= len(to_add)
+ for i in range(len(to_add)):
+ to_add[i] = to_add[i].assign_rank_generation(ranks[i], self.rank_map)
+
+ # If we don't have <count> the list of candidates is definitive.
+ if count is None:
+ final = existing_slots + to_add
+ logger.debug('Provided hosts: %s' % final)
+ return self.place_per_host_daemons(final, to_add, to_remove)
- # ask the scheduler to select additional slots
- to_add = others[:need]
logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
existing, to_add))
return self.place_per_host_daemons(existing_slots + to_add, to_add, to_remove)
for h in old:
if self.filter_new_host(h.hostname):
ls.append(h)
- else:
- logger.info(
- f"Filtered out host {h.hostname}: could not verify host allowed virtual ips")
if len(old) > len(ls):
logger.debug('Filtered %s down to %s' % (old, ls))
# shuffle for pseudo random selection
# gen seed off of self.spec to make shuffling deterministic
- seed = hash(self.spec.service_name())
- random.Random(seed).shuffle(ls)
-
+ seed = int(
+ hashlib.sha1(self.spec.service_name().encode('utf-8')).hexdigest(),
+ 16
+ ) % (2 ** 32)
+ final = sorted(ls)
+ random.Random(seed).shuffle(final)
return ls