]>
git.proxmox.com Git - ceph.git/blob - ceph/src/cephadm/tests/fixtures.py
1f5f171214863ff577a2b6fa20277925b6c58fa4
6 from contextlib
import contextmanager
7 from pyfakefs
import fake_filesystem
9 from typing
import Callable
, Dict
, List
, Optional
12 with mock
.patch('builtins.open', create
=True):
13 from importlib
.machinery
import SourceFileLoader
14 cd
= SourceFileLoader('cephadm', 'cephadm').load_module()
18 docker
= mock
.Mock(cd
.Docker
)
19 docker
.path
= '/usr/bin/docker'
24 podman
= mock
.Mock(cd
.Podman
)
25 podman
.path
= '/usr/bin/podman'
26 podman
.version
= (2, 1, 0)
34 def mock_bad_firewalld():
35 def raise_bad_firewalld():
36 raise Exception('Called bad firewalld')
37 f
= mock
.Mock(cd
.Firewalld
)
38 f
.enable_service_for
= lambda _
: raise_bad_firewalld()
39 f
.apply_rules
= lambda : raise_bad_firewalld()
40 f
.open_ports
= lambda _
: raise_bad_firewalld()
42 def _mock_scrape_host(obj
, interval
):
44 raise ValueError("wah")
45 except Exception as e
:
46 obj
._handle
_thread
_exception
(e
, 'host')
50 t
= obj
._create
_thread
(obj
._scrape
_host
_facts
, 'host', 5)
53 obj
.cephadm_cache
.update_health('host', "inactive", "host thread stopped")
58 fs
: fake_filesystem
.FakeFilesystem
,
61 use pyfakefs to stub filesystem calls
66 with mock
.patch('os.fchown'), \
67 mock
.patch('os.fchmod'), \
68 mock
.patch('platform.processor', return_value
='x86_64'), \
69 mock
.patch('cephadm.extract_uid_gid', return_value
=(uid
, gid
)):
71 fs
.create_dir(cd
.DATA_DIR
)
72 fs
.create_dir(cd
.LOG_DIR
)
73 fs
.create_dir(cd
.LOCK_DIR
)
74 fs
.create_dir(cd
.LOGROTATE_DIR
)
75 fs
.create_dir(cd
.UNIT_DIR
)
83 container_engine
: Callable
= mock_podman(),
84 list_networks
: Optional
[Dict
[str,Dict
[str,List
[str]]]] = None,
85 hostname
: Optional
[str] = None,
88 :param cmd: cephadm command argv
89 :param container_engine: container engine mock (podman or docker)
90 :param list_networks: mock 'list-networks' return
91 :param hostname: mock 'socket.gethostname' return
96 with mock
.patch('cephadm.attempt_bind'), \
97 mock
.patch('cephadm.call', return_value
=('', '', 0)), \
98 mock
.patch('cephadm.call_timeout', return_value
=0), \
99 mock
.patch('cephadm.find_executable', return_value
='foo'), \
100 mock
.patch('cephadm.is_available', return_value
=True), \
101 mock
.patch('cephadm.get_container_info', return_value
=None), \
102 mock
.patch('cephadm.json_loads_retry', return_value
={'epoch' : 1}), \
103 mock
.patch('socket.gethostname', return_value
=hostname
):
104 ctx
: cd
.CephadmContext
= cd
.cephadm_init_ctx(cmd
)
105 ctx
.container_engine
= container_engine
106 if list_networks
is not None:
107 with mock
.patch('cephadm.list_networks', return_value
=list_networks
):