import fnmatch
+import os
import re
import enum
from collections import OrderedDict
_service_spec_from_json_validate = True
+class CustomConfig:
+ """
+ Class to specify custom config files to be mounted in daemon's container
+ """
+
+ _fields = ['content', 'mount_path']
+
+ def __init__(self, content: str, mount_path: str) -> None:
+ self.content: str = content
+ self.mount_path: str = mount_path
+ self.validate()
+
+ def to_json(self) -> Dict[str, Any]:
+ return {
+ 'content': self.content,
+ 'mount_path': self.mount_path,
+ }
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Any]) -> "CustomConfig":
+ for k in cls._fields:
+ if k not in data:
+ raise SpecValidationError(f'CustomConfig must have "{k}" field')
+ for k in data.keys():
+ if k not in cls._fields:
+ raise SpecValidationError(f'CustomConfig got unknown field "{k}"')
+ return cls(**data)
+
+ @property
+ def filename(self) -> str:
+ return os.path.basename(self.mount_path)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, CustomConfig):
+ return (
+ self.content == other.content
+ and self.mount_path == other.mount_path
+ )
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'CustomConfig({self.mount_path})'
+
+ def validate(self) -> None:
+ if not isinstance(self.content, str):
+ raise SpecValidationError(
+ f'CustomConfig content must be a string. Got {type(self.content)}')
+ if not isinstance(self.mount_path, str):
+ raise SpecValidationError(
+ f'CustomConfig content must be a string. Got {type(self.mount_path)}')
+
+
@contextmanager
def service_spec_allow_invalid_from_json() -> Iterator[None]:
"""
preview_only: bool = False,
networks: Optional[List[str]] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
#: See :ref:`orchestrator-cli-placement-spec`.
self.config = {k.replace(' ', '_'): v for k, v in config.items()}
self.extra_container_args: Optional[List[str]] = extra_container_args
+ self.custom_configs: Optional[List[CustomConfig]] = custom_configs
@classmethod
@handle_type_error
for k, v in json_spec.items():
if k == 'placement':
v = PlacementSpec.from_json(v)
+ if k == 'custom_configs':
+ v = [CustomConfig.from_json(c) for c in v]
if k == 'spec':
args.update(v)
continue
ret['networks'] = self.networks
if self.extra_container_args:
ret['extra_container_args'] = self.extra_container_args
+ if self.custom_configs:
+ ret['custom_configs'] = [c.to_json() for c in self.custom_configs]
c = {}
for key, val in sorted(self.__dict__.items(), key=lambda tpl: tpl[0]):
networks: Optional[List[str]] = None,
port: Optional[int] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'nfs'
super(NFSServiceSpec, self).__init__(
'nfs', service_id=service_id,
placement=placement, unmanaged=unmanaged, preview_only=preview_only,
- config=config, networks=networks, extra_container_args=extra_container_args)
+ config=config, networks=networks, extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
self.port = port
networks: Optional[List[str]] = None,
subcluster: Optional[str] = None, # legacy, only for from_json on upgrade
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'rgw', service_type
'rgw', service_id=service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config, networks=networks,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args, custom_configs=custom_configs)
#: The RGW realm associated with this service. Needs to be manually created
self.rgw_realm: Optional[str] = rgw_realm
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'iscsi'
super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only,
config=config, networks=networks,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
#: RADOS pool where ceph-iscsi config data is stored.
self.pool = pool
enable_stats: Optional[bool] = None,
keepalived_password: Optional[str] = None,
virtual_ip: Optional[str] = None,
+ virtual_ips_list: Optional[List[str]] = None,
virtual_interface_networks: Optional[List[str]] = [],
unmanaged: bool = False,
ssl: bool = False,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'ingress'
+
super(IngressSpec, self).__init__(
'ingress', service_id=service_id,
placement=placement, config=config,
networks=networks,
- extra_container_args=extra_container_args
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs
)
self.backend_service = backend_service
self.frontend_port = frontend_port
self.monitor_password = monitor_password
self.keepalived_password = keepalived_password
self.virtual_ip = virtual_ip
+ self.virtual_ips_list = virtual_ips_list
self.virtual_interface_networks = virtual_interface_networks or []
self.unmanaged = unmanaged
self.ssl = ssl
if not self.monitor_port:
raise SpecValidationError(
'Cannot add ingress: No monitor_port specified')
- if not self.virtual_ip:
+ if not self.virtual_ip and not self.virtual_ips_list:
raise SpecValidationError(
'Cannot add ingress: No virtual_ip provided')
+ if self.virtual_ip is not None and self.virtual_ips_list is not None:
+ raise SpecValidationError(
+ 'Cannot add ingress: Single and multiple virtual IPs specified')
yaml.add_representer(IngressSpec, ServiceSpec.yaml_representer)
preview_only: bool = False,
port: Optional[int] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type in ['grafana', 'node-exporter', 'prometheus', 'alertmanager',
'loki', 'promtail']
service_type, service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config,
- networks=networks, extra_container_args=extra_container_args)
+ networks=networks, extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
self.service_type = service_type
self.port = port
port: Optional[int] = None,
secure: bool = False,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'alertmanager'
super(AlertManagerSpec, self).__init__(
'alertmanager', service_id=service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config, networks=networks, port=port,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args, custom_configs=custom_configs)
# Custom configuration.
#
port: Optional[int] = None,
initial_admin_password: Optional[str] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'grafana'
super(GrafanaSpec, self).__init__(
'grafana', service_id=service_id,
placement=placement, unmanaged=unmanaged,
preview_only=preview_only, config=config, networks=networks, port=port,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args, custom_configs=custom_configs)
self.initial_admin_password = initial_admin_password
preview_only: bool = False,
port: Optional[int] = None,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'snmp-gateway'
placement=placement,
unmanaged=unmanaged,
preview_only=preview_only,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
self.service_type = service_type
self.snmp_version = snmp_version
unmanaged: bool = False,
preview_only: bool = False,
extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'mds'
super(MDSSpec, self).__init__('mds', service_id=service_id,
config=config,
unmanaged=unmanaged,
preview_only=preview_only,
- extra_container_args=extra_container_args)
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
def validate(self) -> None:
super(MDSSpec, self).validate()
yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer)
+
+
+class TunedProfileSpec():
+ def __init__(self,
+ profile_name: str,
+ placement: Optional[PlacementSpec] = None,
+ settings: Optional[Dict[str, str]] = None,
+ ):
+ self.profile_name = profile_name
+ self.placement = placement or PlacementSpec(host_pattern='*')
+ self.settings = settings or {}
+ self._last_updated: str = ''
+
+ @classmethod
+ def from_json(cls, spec: Dict[str, Any]) -> 'TunedProfileSpec':
+ data = {}
+ if 'profile_name' not in spec:
+ raise SpecValidationError('Tuned profile spec must include "profile_name" field')
+ data['profile_name'] = spec['profile_name']
+ if not isinstance(data['profile_name'], str):
+ raise SpecValidationError('"profile_name" field must be a string')
+ if 'placement' in spec:
+ data['placement'] = PlacementSpec.from_json(spec['placement'])
+ if 'settings' in spec:
+ data['settings'] = spec['settings']
+ return cls(**data)
+
+ def to_json(self) -> Dict[str, Any]:
+ res: Dict[str, Any] = {}
+ res['profile_name'] = self.profile_name
+ res['placement'] = self.placement.to_json()
+ res['settings'] = self.settings
+ return res
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, TunedProfileSpec):
+ if (
+ self.placement == other.placement
+ and self.profile_name == other.profile_name
+ and self.settings == other.settings
+ ):
+ return True
+ return False
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'TunedProfile({self.profile_name})'
+
+ def copy(self) -> 'TunedProfileSpec':
+ # for making deep copies so you can edit the settings in one without affecting the other
+ # mostly for testing purposes
+ return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy())