]> git.proxmox.com Git - ceph.git/blob - ceph/src/cephadm/tests/fixtures.py
a3f46253976e921a28d1d3640a814996efb65e86
[ceph.git] / ceph / src / cephadm / tests / fixtures.py
1
2 import mock
3 from mock import patch
4 import pytest
5
6 import os
7 import time
8
9 with patch('builtins.open', create=True):
10 from importlib.machinery import SourceFileLoader
11 cd = SourceFileLoader('cephadm', 'cephadm').load_module()
12
13
14 def _daemon_path():
15 return os.getcwd()
16
17
18 def _mock_scrape_host(obj, interval):
19 try:
20 raise ValueError("wah")
21 except Exception as e:
22 obj._handle_thread_exception(e, 'host')
23
24
25 def _mock_run(obj):
26 t = obj._create_thread(obj._scrape_host_facts, 'host', 5)
27 time.sleep(1)
28 if not t.is_alive():
29 obj.cephadm_cache.update_health('host', "inactive", "host thread stopped")
30
31
32 @pytest.fixture
33 def exporter():
34 with mock.patch('cephadm.CephadmDaemon.daemon_path', _daemon_path()), \
35 mock.patch('cephadm.CephadmDaemon.can_run', return_value=True), \
36 mock.patch('cephadm.CephadmDaemon.run', _mock_run), \
37 mock.patch('cephadm.CephadmDaemon._scrape_host_facts', _mock_scrape_host):
38
39 ctx = cd.CephadmContext()
40 exporter = cd.CephadmDaemon(ctx, fsid='foobar', daemon_id='test')
41 assert exporter.token == 'MyAccessToken'
42 yield exporter