]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/conftest.py
3 from ceph_volume
.api
import lvm
as lvm_api
4 from ceph_volume
import conf
, configuration
9 def __init__(self
, *a
, **kw
):
13 self
.return_values
= kw
.get('return_values', False)
14 self
.always_returns
= kw
.get('always_returns', False)
def __call__(self, *a, **kw):
    """
    Record one invocation and hand back the configured canned response.

    Every call is appended to ``self.calls`` as a dict holding the
    positional arguments under ``'args'`` and the keyword arguments under
    ``'kwargs'``.

    :returns: ``self.always_returns`` when it is truthy; otherwise, when
              ``self.return_values`` is truthy, its last item (which is
              removed from the list). Falls through, returning ``None``,
              when neither is set.
    """
    self.calls.append({'args': a, 'kwargs': kw})
    # a fixed response takes precedence over the queued ones
    if self.always_returns:
        return self.always_returns
    if self.return_values:
        # pop() consumes from the end, so queued values come back last-first
        return self.return_values.pop()
24 class Factory(object):
26 def __init__(self
, **kw
):
27 for k
, v
in kw
.items():
42 def fake_run(monkeypatch
):
44 monkeypatch
.setattr('ceph_volume.process.run', fake_run
)
49 def fake_call(monkeypatch
):
50 fake_call
= Capture(always_returns
=([], [], 0))
51 monkeypatch
.setattr('ceph_volume.process.call', fake_call
)
56 def fakedevice(factory
):
68 params
.update(dict(kw
))
69 return factory(**params
)
74 def stub_call(monkeypatch
):
76 Monkeypatches process.call, so that a caller can add behavior to the response
78 def apply(return_values
):
79 if isinstance(return_values
, tuple):
80 return_values
= [return_values
]
81 stubbed_call
= Capture(return_values
=return_values
)
82 monkeypatch
.setattr('ceph_volume.process.call', stubbed_call
)
88 @pytest.fixture(autouse
=True)
89 def reset_cluster_name(request
, monkeypatch
):
91 The globally available ``ceph_volume.conf.cluster`` might get mangled in
92 tests, make sure that after every test, it gets reset, preventing pollution
93 going into other tests later.
98 os
.environ
.pop('CEPH_CONF')
101 request
.addfinalizer(fin
)
105 def conf_ceph(monkeypatch
):
107 Monkeypatches ceph_volume.conf.ceph, which is meant to parse/read
108 a ceph.conf. The patching is naive, it allows one to set return values for
109 specific method calls.
113 monkeypatch
.setattr(conf
, 'ceph', stub
)
119 def conf_ceph_stub(monkeypatch
, tmpfile
):
121 Monkeypatches ceph_volume.conf.ceph with contents from a string that are
122 written to a temporary file and then is fed through the same ceph.conf
123 loading mechanisms for testing. Unlike ``conf_ceph`` which is just a fake,
124 we are actually loading values as seen on a ceph.conf file
126 This is useful when more complex ceph.conf's are needed. In the case of
127 just trying to validate a key/value behavior ``conf_ceph`` is better
131 conf_path
= tmpfile(contents
=contents
)
132 parser
= configuration
.load(conf_path
)
133 monkeypatch
.setattr(conf
, 'ceph', parser
)
139 def volumes(monkeypatch
):
140 monkeypatch
.setattr('ceph_volume.process.call', lambda x
, **kw
: ('', '', 0))
141 volumes
= lvm_api
.Volumes()
147 def volume_groups(monkeypatch
):
148 monkeypatch
.setattr('ceph_volume.process.call', lambda x
, **kw
: ('', '', 0))
149 vgs
= lvm_api
.VolumeGroups()
155 def stub_vgs(monkeypatch
, volume_groups
):
157 monkeypatch
.setattr(lvm_api
, 'get_api_vgs', lambda: vgs
)
162 def pvolumes(monkeypatch
):
163 monkeypatch
.setattr('ceph_volume.process.call', lambda x
, **kw
: ('', '', 0))
164 pvolumes
= lvm_api
.PVolumes()
def is_root(monkeypatch):
    """
    Patch ``os.getuid()`` so that ceph-volume's decorators that ensure a user
    is root (or is sudoing to superuser) can continue as-is

    :param monkeypatch: object whose ``setattr`` is called with the dotted
                        target ``'os.getuid'`` and the replacement callable
    """
    # the replacement takes no arguments and always reports uid 0
    monkeypatch.setattr('os.getuid', lambda: 0)
181 Create a temporary file, optionally filling it with contents, returns an
182 absolute path to the file when called
184 def generate_file(name
='file', contents
='', directory
=None):
185 directory
= directory
or str(tmpdir
)
186 path
= os
.path
.join(directory
, name
)
187 with
open(path
, 'w') as fp
:
194 def device_info(monkeypatch
):
195 def apply(devices
=None, lsblk
=None, lv
=None, blkid
=None, udevadm
=None):
196 devices
= devices
if devices
else {}
197 lsblk
= lsblk
if lsblk
else {}
198 blkid
= blkid
if blkid
else {}
199 udevadm
= udevadm
if udevadm
else {}
200 lv
= Factory(**lv
) if lv
else None
201 monkeypatch
.setattr("ceph_volume.sys_info.devices", {})
202 monkeypatch
.setattr("ceph_volume.util.device.disk.get_devices", lambda: devices
)
204 monkeypatch
.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path
: lv
)
206 monkeypatch
.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path
: None)
207 monkeypatch
.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name
, lv_uuid
: lv
)
208 monkeypatch
.setattr("ceph_volume.util.device.disk.lsblk", lambda path
: lsblk
)
209 monkeypatch
.setattr("ceph_volume.util.device.disk.blkid", lambda path
: blkid
)
210 monkeypatch
.setattr("ceph_volume.util.disk.udevadm_property", lambda *a
, **kw
: udevadm
)