8 from subprocess
import check_output
, CalledProcessError
10 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, IscsiServiceSpec
13 from typing
import Callable
, List
, Sequence
, Tuple
17 from ceph
.deployment
import inventory
18 from ceph
.deployment
.drive_group
import DriveGroupSpec
19 from mgr_module
import CLICommand
, HandleCommandResult
20 from mgr_module
import MgrModule
23 from orchestrator
import handle_orch_error
, raise_if_exception
26 class TestOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
28 This is an orchestrator implementation used for internal testing. It's meant for
29 development environments and integration testing.
31 It does not actually do anything.
33 The implementation is similar to the Rook orchestrator, but simpler.
36 @CLICommand('test_orchestrator load_data', perm
='w')
37 def _load_data(self
, inbuf
):
39 load dummy data into test orchestrator
42 data
= json
.loads(inbuf
)
44 return HandleCommandResult()
45 except json
.decoder
.JSONDecodeError
as e
:
46 msg
= 'Invalid JSON file: {}'.format(e
)
47 return HandleCommandResult(retval
=-errno
.EINVAL
, stderr
=msg
)
48 except orchestrator
.OrchestratorValidationError
as e
:
49 return HandleCommandResult(retval
=-errno
.EINVAL
, stderr
=str(e
))
54 def __init__(self
, *args
, **kwargs
):
55 super(TestOrchestrator
, self
).__init
__(*args
, **kwargs
)
57 self
._initialized
= threading
.Event()
58 self
._shutdown
= threading
.Event()
66 self
._initialized
.set()
68 while not self
._shutdown
.is_set():
69 self
._shutdown
.wait(5)
def _init_data(self, data=None):
    """Populate the in-memory inventory, service and daemon lists.

    :param data: mapping that may carry the keys ``inventory``,
        ``services`` and ``daemons``; each value is iterated and every
        entry is passed to the matching ``from_json`` constructor.
        A missing key yields an empty list.  ``None`` (the default) is
        treated like an empty mapping, so all three lists end up empty.
    """
    # The signature advertises ``data=None``; without this guard the
    # ``.get`` calls below would raise AttributeError on None.
    if data is None:
        data = {}

    self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                       for inventory_host in data.get('inventory', [])]
    self._services = [orchestrator.ServiceDescription.from_json(service)
                      for service in data.get('services', [])]
    self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                     for daemon in data.get('daemons', [])]
80 def get_inventory(self
, host_filter
=None, refresh
=False):
82 There is no guarantee which devices are returned by get_inventory.
84 if host_filter
and host_filter
.hosts
is not None:
85 assert isinstance(host_filter
.hosts
, list)
89 return list(filter(lambda host
: host
.name
in host_filter
.hosts
,
91 return self
._inventory
94 c_v_out
= check_output(['ceph-volume', 'inventory', '--format', 'json'])
97 . {tmpdir}/ceph-volume-virtualenv/bin/activate
98 ceph-volume inventory --format json
101 c_v_out
= check_output(cmd
.format(tmpdir
=os
.environ
.get('TMPDIR', '/tmp')), shell
=True)
102 except (OSError, CalledProcessError
):
103 c_v_out
= check_output(cmd
.format(tmpdir
='.'),shell
=True)
105 for out
in c_v_out
.splitlines():
107 devs
= inventory
.Devices
.from_json(json
.loads(out
))
108 return [orchestrator
.InventoryHost('localhost', devs
)]
109 self
.log
.error('c-v failed: ' + str(c_v_out
))
110 raise Exception('c-v failed')
112 def _get_ceph_daemons(self
):
113 # type: () -> List[orchestrator.DaemonDescription]
114 """ Return ceph daemons on the running host."""
115 types
= ("mds", "osd", "mon", "rgw", "mgr", "nfs", "iscsi")
116 out
= map(str, check_output(['ps', 'aux']).splitlines())
117 processes
= [p
for p
in out
if any(
118 [('ceph-{} '.format(t
) in p
) for t
in types
])]
123 m
= re
.search('ceph-([^ ]+)', p
)
125 _daemon_type
= m
.group(1)
127 raise AssertionError('Fail to determine daemon type from {}'.format(p
))
129 # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
130 patterns
= [r
'-i\s(\w+)', r
'--id[\s=](\w+)']
131 for pattern
in patterns
:
132 m
= re
.search(pattern
, p
)
134 daemon_id
= m
.group(1)
137 raise AssertionError('Fail to determine daemon ID from {}'.format(p
))
138 daemon
= orchestrator
.DaemonDescription(
139 daemon_type
=_daemon_type
, daemon_id
=daemon_id
, hostname
='localhost')
140 daemons
.append(daemon
)
144 def describe_service(self
, service_type
=None, service_name
=None, refresh
=False):
147 services
= self
._services
148 if service_type
is not None:
149 services
= list(filter(lambda s
: s
.spec
.service_type
== service_type
, services
))
151 # Deduce services from daemons running on localhost
152 all_daemons
= self
._get
_ceph
_daemons
()
154 for daemon_type
, daemons
in itertools
.groupby(all_daemons
, key
=lambda d
: d
.daemon_type
):
155 if service_type
is not None and service_type
!= daemon_type
:
157 daemon_size
= len(list(daemons
))
158 services
.append(orchestrator
.ServiceDescription(
160 service_type
=daemon_type
, # type: ignore
162 size
=daemon_size
, running
=daemon_size
))
164 def _filter_func(svc
):
165 if service_name
is not None and service_name
!= svc
.spec
.service_name():
169 return list(filter(_filter_func
, services
))
172 def list_daemons(self
, service_name
=None, daemon_type
=None, daemon_id
=None, host
=None, refresh
=False):
174 There is no guarantee which daemons are returned by list_daemons, except that
175 it returns the mgr we're running in.
178 daemon_types
= ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash", "nfs")
179 assert daemon_type
in daemon_types
, daemon_type
+ " unsupported"
181 daemons
= self
._daemons
if self
._daemons
else self
._get
_ceph
_daemons
()
184 if service_name
is not None and service_name
!= d
.service_name():
186 if daemon_type
is not None and daemon_type
!= d
.daemon_type
:
188 if daemon_id
is not None and daemon_id
!= d
.daemon_id
:
190 if host
is not None and host
!= d
.hostname
:
194 return list(filter(_filter_func
, daemons
))
196 def preview_drivegroups(self
, drive_group_name
=None, dg_specs
=None):
def create_osds(self, drive_group):
    # type: (DriveGroupSpec) -> str
    """Create OSDs as described by one drive group specification.

    CLI equivalent::

        $: ceph orch osd create -i <dg.file>

    Only a single spec may be present in the drivegroup file at a time.
    The work is delegated to ``self._create_osds`` and its result is
    returned unchanged.
    """
    outcome = self._create_osds(drive_group)
    return outcome
210 def _create_osds(self
, drive_group
):
211 # type: (DriveGroupSpec) -> str
213 drive_group
.validate()
214 all_hosts
= raise_if_exception(self
.get_hosts())
215 if not drive_group
.placement
.filter_matching_hostspecs(all_hosts
):
216 raise orchestrator
.OrchestratorValidationError('failed to match')
def apply_drivegroups(self, specs):
    # type: (List[DriveGroupSpec]) -> List[str]
    """Run ``self._create_osds`` on every drive group in *specs*, in order.

    Returns the per-spec results as a list with one entry per spec.
    """
    results = []
    for drive_group in specs:
        results.append(self._create_osds(drive_group))
    return results
225 def remove_daemons(self
, names
):
226 assert isinstance(names
, list)
230 def remove_service(self
, service_name
):
231 assert isinstance(service_name
, str)
235 def blink_device_light(self
, ident_fault
, on
, locations
):
236 assert ident_fault
in ("ident", "fault")
237 assert len(locations
)
241 def service_action(self
, action
, service_name
):
245 def daemon_action(self
, action
, daemon_name
, image
=None):
def add_daemon(self, spec: ServiceSpec):
    """Pretend to add a daemon for *spec*.

    The visible code deploys nothing; the result is a single-element list
    holding the value of ``spec.one_line_str()``.
    """
    summary = spec.one_line_str()
    return [summary]
def apply_nfs(self, spec):
    """Pretend to apply an NFS service spec.

    The visible code deploys nothing; the value of ``spec.one_line_str()``
    is returned unchanged.
    """
    summary = spec.one_line_str()
    return summary
def apply_iscsi(self, spec):
    # type: (IscsiServiceSpec) -> str
    """Pretend to apply an iSCSI service spec.

    The visible code deploys nothing; the value of ``spec.one_line_str()``
    is returned unchanged.
    """
    summary = spec.one_line_str()
    return summary
264 return [orchestrator
.HostSpec(i
.name
, i
.addr
, i
.labels
) for i
in self
._inventory
]
265 return [orchestrator
.HostSpec('localhost')]
268 def add_host(self
, spec
):
269 # type: (orchestrator.HostSpec) -> str
271 if host
== 'raise_validation_error':
272 raise orchestrator
.OrchestratorValidationError("MON count must be either 1, 3 or 5")
273 if host
== 'raise_error':
274 raise orchestrator
.OrchestratorError("host address is empty")
275 if host
== 'raise_bug':
276 raise ZeroDivisionError()
277 if host
== 'raise_not_implemented':
278 raise NotImplementedError()
279 if host
== 'raise_no_orchestrator':
280 raise orchestrator
.NoOrchestrator()
281 if host
== 'raise_import_error':
282 raise ImportError("test_orchestrator not enabled")
283 assert isinstance(host
, str)
287 def remove_host(self
, host
):
288 assert isinstance(host
, str)
def apply_mgr(self, spec):
    # type: (ServiceSpec) -> str
    """Sanity-check the placement of a mgr spec and echo its summary.

    Checks performed (as ``assert`` statements):

    * when ``spec.placement.hosts`` is non-empty, its length must equal
      ``spec.placement.count``;
    * every entry of ``spec.placement.hosts`` must be a ``str``.

    :return: the value of ``spec.one_line_str()``
    :raises AssertionError: if one of the checks above fails
    """
    placement = spec.placement
    hosts = placement.hosts

    # An empty/unset host list is accepted regardless of ``count``.
    if hosts:
        assert len(hosts) == placement.count
    assert all(isinstance(entry, str) for entry in hosts)

    return spec.one_line_str()
def apply_mon(self, spec):
    # type: (ServiceSpec) -> str
    """Sanity-check the placement of a mon spec and echo its summary.

    Checks performed (as ``assert`` statements), in this order:

    * when ``spec.placement.hosts`` is non-empty, its length must equal
      ``spec.placement.count``;
    * element ``[0]`` of every host entry must be a ``str``;
    * element ``[1]`` of every host entry must be a ``str`` or ``None``.

    :return: the value of ``spec.one_line_str()``
    :raises AssertionError: if one of the checks above fails
    """
    placement = spec.placement
    hosts = placement.hosts

    # An empty/unset host list is accepted regardless of ``count``.
    if hosts:
        assert len(hosts) == placement.count

    # Two separate passes: all first elements are checked before any
    # second element is looked at.
    assert all(isinstance(entry[0], str) for entry in hosts)
    assert all(entry[1] is None or isinstance(entry[1], str) for entry in hosts)

    return spec.one_line_str()