import errno
import json
import re
import os
import itertools
import threading
import functools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec

from typing import Callable, List, Sequence, Tuple

import six

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule


class TestCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., TestCompletion]
    """
    Decorator to make methods return
    a completion object that executes themselves.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper


def deferred_write(message):
    def inner(f):
        # type: (Callable) -> Callable[..., TestCompletion]

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return TestCompletion.with_progress(
                message=message,
                mgr=self,
                on_complete=lambda _: f(self, *args, **kwargs),
            )

        return wrapper
    return inner
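
# A minimal usage sketch (with a hypothetical method, not part of this module):
# a method decorated with @deferred_read returns a TestCompletion rather than a
# plain value, giving callers the asynchronous orchestrator interface:
#
#     @deferred_read
#     def get_things(self):
#         return ["thing"]
#
#     completion = orch.get_things()  # a TestCompletion, not a list
#     orch.process([completion])      # evaluate; the result becomes ["thing"]
#
# @deferred_write("msg") works the same way but additionally attaches a
# progress message via TestCompletion.with_progress().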


class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

    def process(self, completions):
        # type: (List[TestCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
    def _load_data(self, inbuf):
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
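
    # A minimal illustrative payload for `ceph test_orchestrator load_data -i <file>`.
    # The top-level keys are the ones _init_data() consumes; the per-entry fields
    # shown here are an assumption based on the attributes this module reads:
    #
    #     {
    #         "inventory": [{"name": "host0", "addr": "10.0.0.1", "devices": []}],
    #         "services": [],
    #         "daemons": [{"daemon_type": "mgr", "daemon_id": "x", "hostname": "host0"}]
    #     }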

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    def serve(self):
        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references
                                            if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def _init_data(self, data=None):
        data = data or {}  # guard against the default: None has no .get()
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')),
                                       shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            self.log.error(out)
            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """ Return ceph daemons on the running host."""
        types = ("mds", "osd", "mon", "rgw", "mgr")
        out = map(str, check_output(['ps', 'aux']).splitlines())
        processes = [p for p in out if any(
            [('ceph-{} '.format(t) in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)

        return daemons
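
    # Illustrative example of what the parsing above extracts from a `ps aux`
    # line (the command string is hypothetical):
    #
    #     "ceph  1234  ... /usr/bin/ceph-osd -i 0 --foreground"
    #
    # 'ceph-([^ ]+)' captures the daemon type "osd", and r'-i\s(\w+)' captures
    # the daemon ID "0".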

    @deferred_read
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            if service_type is not None:
                services = list(filter(lambda s: s.spec.service_type == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
            for daemon_type, daemons in itertools.groupby(all_daemons,
                                                          key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type=daemon_type,
                    ),
                    size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.spec.service_name():
                return False
            return True

        return list(filter(_filter_func, services))
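
    # Note: itertools.groupby() only groups *consecutive* items, e.g.
    #
    #     >>> [(k, len(list(g))) for k, g in itertools.groupby("aabba")]
    #     [('a', 2), ('b', 2), ('a', 1)]
    #
    # so the deduction above assumes daemons of the same type appear
    # contiguously in the `ps` output.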

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                     host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
        it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> TestCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='create_osds',
                mgr=self,
            )
        )
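
    # A minimal drive group file for the command shown in the docstring above
    # (an illustrative sketch; consult the DriveGroupSpec documentation for
    # the full schema):
    #
    #     service_type: osd
    #     service_id: example_dg
    #     placement:
    #       host_pattern: '*'
    #     data_devices:
    #       all: true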

    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> TestCompletion
        drive_group = specs[0]

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='apply_drivegroups',
                mgr=self,
            )
        )

    @deferred_write("remove_daemons")
    def remove_daemons(self, names):
        assert isinstance(names, list)

    @deferred_write("remove_service")
    def remove_service(self, service_name):
        assert isinstance(service_name, str)

    @deferred_write("blink_device_light")
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @deferred_write("service_action")
    def service_action(self, action, service_name):
        pass

    @deferred_write("daemon_action")
    def daemon_action(self, action, daemon_type, daemon_id):
        pass

    @deferred_write("Adding NFS service")
    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> None
        assert isinstance(spec.pool, str)

    @deferred_write("apply_nfs")
    def apply_nfs(self, spec):
        pass

    @deferred_write("add_mds")
    def add_mds(self, spec):
        pass

    @deferred_write("add_rgw")
    def add_rgw(self, spec):
        pass

    @deferred_read
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @deferred_write("add_host")
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> None
        host = spec.hostname
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, six.string_types)
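
    # These magic hostnames let integration tests exercise the error paths
    # above from the CLI, e.g. (assuming the standard orchestrator commands):
    #
    #     $ ceph orch host add raise_bug         # -> ZeroDivisionError
    #     $ ceph orch host add raise_no_support  # -> OrchestratorValidationError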

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, six.string_types)

    @deferred_write("apply_mgr")
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])

    @deferred_write("apply_mon")
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])