]>
git.proxmox.com Git - ceph.git/blob - ceph/src/cephadm/tests/fixtures.py
a3f46253976e921a28d1d3640a814996efb65e86
9 with
patch('builtins.open', create
=True):
10 from importlib
.machinery
import SourceFileLoader
11 cd
= SourceFileLoader('cephadm', 'cephadm').load_module()
18 def _mock_scrape_host(obj
, interval
):
20 raise ValueError("wah")
21 except Exception as e
:
22 obj
._handle
_thread
_exception
(e
, 'host')
26 t
= obj
._create
_thread
(obj
._scrape
_host
_facts
, 'host', 5)
29 obj
.cephadm_cache
.update_health('host', "inactive", "host thread stopped")
34 with mock
.patch('cephadm.CephadmDaemon.daemon_path', _daemon_path()), \
35 mock
.patch('cephadm.CephadmDaemon.can_run', return_value
=True), \
36 mock
.patch('cephadm.CephadmDaemon.run', _mock_run
), \
37 mock
.patch('cephadm.CephadmDaemon._scrape_host_facts', _mock_scrape_host
):
39 ctx
= cd
.CephadmContext()
40 exporter
= cd
.CephadmDaemon(ctx
, fsid
='foobar', daemon_id
='test')
41 assert exporter
.token
== 'MyAccessToken'