_service_spec_from_json_validate = True
+class ArgumentSpec:
+ """The ArgumentSpec type represents an argument that can be
+ passed to an underyling subsystem, like a container engine or
+ another command line tool.
+
+ The ArgumentSpec aims to be backwards compatible with the previous
+ form of argument, a single string. The string was always assumed
+ to be indentended to be split on spaces. For example:
+ `--cpus 8` becomes `["--cpus", "8"]`. This type is converted from
+ either a string or an json/yaml object. In the object form you
+ can choose if the string part should be split so an argument like
+ `--migrate-from=//192.168.5.22/My Documents` can be expressed.
+ """
+ _fields = ['argument', 'split']
+
+ class OriginalType(enum.Enum):
+ OBJECT = 0
+ STRING = 1
+
+ def __init__(
+ self,
+ argument: str,
+ split: bool = False,
+ *,
+ origin: OriginalType = OriginalType.OBJECT,
+ ) -> None:
+ self.argument = argument
+ self.split = bool(split)
+ # origin helps with round-tripping between inputs that
+ # are simple strings or objects (dicts)
+ self._origin = origin
+ self.validate()
+
+ def to_json(self) -> Union[str, Dict[str, Any]]:
+ """Return a json-safe represenation of the ArgumentSpec."""
+ if self._origin == self.OriginalType.STRING:
+ return self.argument
+ return {
+ 'argument': self.argument,
+ 'split': self.split,
+ }
+
+ def to_args(self) -> List[str]:
+ """Convert this ArgumentSpec into a list of arguments suitable for
+ adding to an argv-style command line.
+ """
+ if not self.split:
+ return [self.argument]
+ return [part for part in self.argument.split(" ") if part]
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, ArgumentSpec):
+ return (
+ self.argument == other.argument
+ and self.split == other.split
+ )
+ if isinstance(other, object):
+ # This is a workaround for silly ceph mgr object/type identity
+ # mismatches due to multiple python interpreters in use.
+ try:
+ argument = getattr(other, 'argument')
+ split = getattr(other, 'split')
+ return (self.argument == argument and self.split == split)
+ except AttributeError:
+ pass
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'ArgumentSpec({self.argument!r}, {self.split!r})'
+
+ def validate(self) -> None:
+ if not isinstance(self.argument, str):
+ raise SpecValidationError(
+ f'ArgumentSpec argument must be a string. Got {type(self.argument)}')
+ if not isinstance(self.split, bool):
+ raise SpecValidationError(
+ f'ArgumentSpec split must be a boolean. Got {type(self.split)}')
+
+ @classmethod
+ def from_json(cls, data: Union[str, Dict[str, Any]]) -> "ArgumentSpec":
+ """Convert a json-object (dict) to an ArgumentSpec."""
+ if isinstance(data, str):
+ return cls(data, split=True, origin=cls.OriginalType.STRING)
+ if 'argument' not in data:
+ raise SpecValidationError(f'ArgumentSpec must have an "argument" field')
+ for k in data.keys():
+ if k not in cls._fields:
+ raise SpecValidationError(f'ArgumentSpec got an unknown field {k!r}')
+ return cls(**data)
+
+ @staticmethod
+ def map_json(
+ values: Optional["ArgumentList"]
+ ) -> Optional[List[Union[str, Dict[str, Any]]]]:
+ """Given a list of ArgumentSpec objects return a json-safe
+ representation.of them."""
+ if values is None:
+ return None
+ return [v.to_json() for v in values]
+
+ @classmethod
+ def from_general_args(cls, data: "GeneralArgList") -> "ArgumentList":
+ """Convert a list of strs, dicts, or existing ArgumentSpec objects
+ to a list of only ArgumentSpec objects.
+ """
+ out: ArgumentList = []
+ for item in data:
+ if isinstance(item, (str, dict)):
+ out.append(cls.from_json(item))
+ elif isinstance(item, cls):
+ out.append(item)
+ elif hasattr(item, 'to_json'):
+ # This is a workaround for silly ceph mgr object/type identity
+ # mismatches due to multiple python interpreters in use.
+ # It should be safe because we already have to be able to
+ # round-trip between json/yaml.
+ out.append(cls.from_json(item.to_json()))
+ else:
+ raise SpecValidationError(f"Unknown type for argument: {type(item)}")
+ return out
+
+
+ArgumentList = List[ArgumentSpec]
+GeneralArgList = List[Union[str, Dict[str, Any], "ArgumentSpec"]]
+
+
class ServiceSpec(object):
"""
Details of service creation.
Request to the orchestrator for a cluster of daemons
- such as MDS, RGW, iscsi gateway, MONs, MGRs, Prometheus
+ such as MDS, RGW, iscsi gateway, nvmeof gateway, MONs, MGRs, Prometheus
This structure is supposed to be enough information to
start the services.
"""
- KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi loki promtail mds mgr mon nfs ' \
+ KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
- REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split()
+ REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
]
'osd': DriveGroupSpec,
'mds': MDSSpec,
'iscsi': IscsiServiceSpec,
+ 'nvmeof': NvmeofServiceSpec,
'alertmanager': AlertManagerSpec,
'ingress': IngressSpec,
'container': CustomContainerSpec,
unmanaged: bool = False,
preview_only: bool = False,
networks: Optional[List[str]] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
#: ``prometheus``) or (``container``) for custom containers.
self.service_type = service_type
- #: The name of the service. Required for ``iscsi``, ``mds``, ``nfs``, ``osd``, ``rgw``,
- #: ``container``, ``ingress``
+ #: The name of the service. Required for ``iscsi``, ``nvmeof``, ``mds``, ``nfs``, ``osd``,
+ #: ``rgw``, ``container``, ``ingress``
self.service_id = None
if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
if config:
self.config = {k.replace(' ', '_'): v for k, v in config.items()}
- self.extra_container_args: Optional[List[str]] = extra_container_args
- self.extra_entrypoint_args: Optional[List[str]] = extra_entrypoint_args
+ self.extra_container_args: Optional[ArgumentList] = None
+ self.extra_entrypoint_args: Optional[ArgumentList] = None
+ if extra_container_args:
+ self.extra_container_args = ArgumentSpec.from_general_args(
+ extra_container_args)
+ if extra_entrypoint_args:
+ self.extra_entrypoint_args = ArgumentSpec.from_general_args(
+ extra_entrypoint_args)
self.custom_configs: Optional[List[CustomConfig]] = custom_configs
@classmethod
if self.networks:
ret['networks'] = self.networks
if self.extra_container_args:
- ret['extra_container_args'] = self.extra_container_args
+ ret['extra_container_args'] = ArgumentSpec.map_json(
+ self.extra_container_args
+ )
if self.extra_entrypoint_args:
- ret['extra_entrypoint_args'] = self.extra_entrypoint_args
+ ret['extra_entrypoint_args'] = ArgumentSpec.map_json(
+ self.extra_entrypoint_args
+ )
if self.custom_configs:
ret['custom_configs'] = [c.to_json() for c in self.custom_configs]
networks: Optional[List[str]] = None,
port: Optional[int] = None,
virtual_ip: Optional[str] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ enable_haproxy_protocol: bool = False,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'nfs'
self.port = port
self.virtual_ip = virtual_ip
+ self.enable_haproxy_protocol = enable_haproxy_protocol
def get_port_start(self) -> List[int]:
if self.port:
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
subcluster: Optional[str] = None, # legacy, only for from_json on upgrade
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
rgw_realm_token: Optional[str] = None,
update_endpoints: Optional[bool] = False,
yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
+class NvmeofServiceSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'nvmeof',
+ service_id: Optional[str] = None,
+ name: Optional[str] = None,
+ group: Optional[str] = None,
+ port: Optional[int] = None,
+ pool: Optional[str] = None,
+ enable_auth: bool = False,
+ server_key: Optional[str] = None,
+ server_cert: Optional[str] = None,
+ client_key: Optional[str] = None,
+ client_cert: Optional[str] = None,
+ spdk_path: Optional[str] = None,
+ tgt_path: Optional[str] = None,
+ timeout: Optional[int] = 60,
+ conn_retries: Optional[int] = 10,
+ transports: Optional[str] = 'tcp',
+ transport_tcp_options: Optional[Dict[str, int]] =
+ {"in_capsule_data_size": 8192, "max_io_qpairs_per_ctrlr": 7},
+ tgt_cmd_extra_args: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'nvmeof'
+ super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only,
+ config=config, networks=networks,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ #: RADOS pool where ceph-nvmeof config data is stored.
+ self.pool = pool
+ #: ``port`` port of the nvmeof gateway
+ self.port = port or 5500
+ #: ``name`` name of the nvmeof gateway
+ self.name = name
+ #: ``group`` name of the nvmeof gateway
+ self.group = group
+ #: ``enable_auth`` enables user authentication on nvmeof gateway
+ self.enable_auth = enable_auth
+ #: ``server_key`` gateway server key
+ self.server_key = server_key or './server.key'
+ #: ``server_cert`` gateway server certificate
+ self.server_cert = server_cert or './server.crt'
+ #: ``client_key`` client key
+ self.client_key = client_key or './client.key'
+ #: ``client_cert`` client certificate
+ self.client_cert = client_cert or './client.crt'
+ #: ``spdk_path`` path to SPDK
+ self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt'
+ #: ``tgt_path`` nvmeof target path
+ self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt'
+ #: ``timeout`` ceph connectivity timeout
+ self.timeout = timeout
+ #: ``conn_retries`` ceph connection retries number
+ self.conn_retries = conn_retries
+ #: ``transports`` tcp
+ self.transports = transports
+ #: List of extra arguments for transports in the form opt=value
+ self.transport_tcp_options: Optional[Dict[str, int]] = transport_tcp_options
+ #: ``tgt_cmd_extra_args`` extra arguments for the nvmf_tgt process
+ self.tgt_cmd_extra_args = tgt_cmd_extra_args
+
+ def get_port_start(self) -> List[int]:
+ return [5500, 4420, 8009]
+
+ def validate(self) -> None:
+ # TODO: what other parameters should be validated as part of this function?
+ super(NvmeofServiceSpec, self).validate()
+
+ if not self.pool:
+ raise SpecValidationError('Cannot add NVMEOF: No Pool specified')
+
+ if self.enable_auth:
+ if not any([self.server_key, self.server_cert, self.client_key, self.client_cert]):
+ raise SpecValidationError(
+ 'enable_auth is true but client/server certificates are missing')
+
+ if self.transports not in ['tcp']:
+ raise SpecValidationError('Invalid transport. Valid values are tcp')
+
+
+yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer)
+
+
class IscsiServiceSpec(ServiceSpec):
def __init__(self,
service_type: str = 'iscsi',
preview_only: bool = False,
config: Optional[Dict[str, str]] = None,
networks: Optional[List[str]] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'iscsi'
virtual_ip: Optional[str] = None,
virtual_ips_list: Optional[List[str]] = None,
virtual_interface_networks: Optional[List[str]] = [],
+ use_keepalived_multicast: Optional[bool] = False,
+ vrrp_interface_network: Optional[str] = None,
+ first_virtual_router_id: Optional[int] = 50,
unmanaged: bool = False,
ssl: bool = False,
keepalive_only: bool = False,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ enable_haproxy_protocol: bool = False,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'ingress'
self.virtual_ip = virtual_ip
self.virtual_ips_list = virtual_ips_list
self.virtual_interface_networks = virtual_interface_networks or []
+ self.use_keepalived_multicast = use_keepalived_multicast
+ self.vrrp_interface_network = vrrp_interface_network
+ self.first_virtual_router_id = first_virtual_router_id
self.unmanaged = unmanaged
self.ssl = ssl
self.keepalive_only = keepalive_only
+ self.enable_haproxy_protocol = enable_haproxy_protocol
def get_port_start(self) -> List[int]:
ports = []
preview_only: bool = False,
image: Optional[str] = None,
entrypoint: Optional[str] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
uid: Optional[int] = None,
gid: Optional[int] = None,
volume_mounts: Optional[Dict[str, str]] = {},
- args: Optional[List[str]] = [], # args for the container runtime, not entrypoint
+ # args are for the container runtime, not entrypoint
+ args: Optional[GeneralArgList] = [],
envs: Optional[List[str]] = [],
privileged: Optional[bool] = False,
bind_mounts: Optional[List[List[str]]] = None,
unmanaged: bool = False,
preview_only: bool = False,
port: Optional[int] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type in ['grafana', 'node-exporter', 'prometheus', 'alertmanager',
networks: Optional[List[str]] = None,
port: Optional[int] = None,
secure: bool = False,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'alertmanager'
protocol: Optional[str] = 'https',
initial_admin_password: Optional[str] = None,
anonymous_access: Optional[bool] = True,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'grafana'
port: Optional[int] = None,
retention_time: Optional[str] = None,
retention_size: Optional[str] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'prometheus'
unmanaged: bool = False,
preview_only: bool = False,
port: Optional[int] = None,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'snmp-gateway'
config: Optional[Dict[str, str]] = None,
unmanaged: bool = False,
preview_only: bool = False,
- extra_container_args: Optional[List[str]] = None,
- extra_entrypoint_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
):
assert service_type == 'mds'
unmanaged: bool = False,
preview_only: bool = False,
networks: Optional[List[str]] = None,
- extra_container_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
custom_configs: Optional[List[CustomConfig]] = None,
crush_locations: Optional[Dict[str, List[str]]] = None,
):
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
- extra_container_args: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
):
assert service_type == 'ceph-exporter'