+from collections import OrderedDict
import errno
-try:
- from typing import Optional, List, Any
-except ImportError:
- pass # just for type checking
+import re
+from typing import Optional, List, Any, Dict
+
+
+def assert_valid_host(name: str) -> None:
+ p = re.compile('^[a-zA-Z0-9-]+$')
+ try:
+ assert len(name) <= 250, 'name is too long (max 250 chars)'
+ for part in name.split('.'):
+ assert len(part) > 0, '.-delimited name component must not be empty'
+ assert len(part) <= 63, '.-delimited name component must not be more than 63 chars'
+ assert p.match(part), 'name component must include only a-z, 0-9, and -'
+ except AssertionError as e:
+ raise SpecValidationError(str(e))
class SpecValidationError(Exception):
Information about hosts. Like e.g. ``kubectl get nodes``
"""
def __init__(self,
- hostname, # type: str
- addr=None, # type: Optional[str]
- labels=None, # type: Optional[List[str]]
- status=None, # type: Optional[str]
+ hostname: str,
+ addr: Optional[str] = None,
+ labels: Optional[List[str]] = None,
+ status: Optional[str] = None,
+ location: Optional[Dict[str, str]] = None,
):
self.service_type = 'host'
#: human readable status
self.status = status or '' # type: str
- def to_json(self) -> dict:
- return {
+ self.location = location
+
+ def validate(self) -> None:
+ assert_valid_host(self.hostname)
+
+ def to_json(self) -> Dict[str, Any]:
+ r: Dict[str, Any] = {
'hostname': self.hostname,
'addr': self.addr,
- 'labels': list(set((self.labels))),
+ 'labels': list(OrderedDict.fromkeys((self.labels))),
'status': self.status,
}
+ if self.location:
+ r['location'] = self.location
+ return r
@classmethod
def from_json(cls, host_spec: dict) -> 'HostSpec':
host_spec = cls.normalize_json(host_spec)
- _cls = cls(host_spec['hostname'],
- host_spec['addr'] if 'addr' in host_spec else None,
- list(set(host_spec['labels'])) if 'labels' in host_spec else None,
- host_spec['status'] if 'status' in host_spec else None)
+ _cls = cls(
+ host_spec['hostname'],
+ host_spec['addr'] if 'addr' in host_spec else None,
+ list(OrderedDict.fromkeys(
+ host_spec['labels'])) if 'labels' in host_spec else None,
+ host_spec['status'] if 'status' in host_spec else None,
+ host_spec.get('location'),
+ )
return _cls
@staticmethod
def normalize_json(host_spec: dict) -> dict:
labels = host_spec.get('labels')
- if labels is None:
- return host_spec
- if isinstance(labels, list):
- return host_spec
- if not isinstance(labels, str):
- raise SpecValidationError(f'Labels ({labels}) must be a string or list of strings')
- host_spec['labels'] = [labels]
+ if labels is not None:
+ if isinstance(labels, str):
+ host_spec['labels'] = [labels]
+ elif (
+ not isinstance(labels, list)
+ or any(not isinstance(v, str) for v in labels)
+ ):
+ raise SpecValidationError(
+ f'Labels ({labels}) must be a string or list of strings'
+ )
+
+ loc = host_spec.get('location')
+ if loc is not None:
+ if (
+ not isinstance(loc, dict)
+ or any(not isinstance(k, str) for k in loc.keys())
+ or any(not isinstance(v, str) for v in loc.values())
+ ):
+ raise SpecValidationError(
+ f'Location ({loc}) must be a dictionary of strings to strings'
+ )
+
return host_spec
def __repr__(self) -> str:
args.append(self.labels)
if self.status:
args.append(self.status)
+ if self.location:
+ args.append(self.location)
return "HostSpec({})".format(', '.join(map(repr, args)))
# Let's omit `status` for the moment, as it is still the very same host.
return self.hostname == other.hostname and \
self.addr == other.addr and \
- self.labels == other.labels
+ sorted(self.labels) == sorted(other.labels) and \
+ self.location == other.location