]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/conftest.py
3 from ceph_volume
.api
import lvm
as lvm_api
4 from ceph_volume
import conf
, configuration
9 def __init__(self
, *a
, **kw
):
13 self
.return_values
= kw
.get('return_values', False)
14 self
.always_returns
= kw
.get('always_returns', False)
16 def __call__(self
, *a
, **kw
):
17 self
.calls
.append({'args': a
, 'kwargs': kw
})
18 if self
.always_returns
:
19 return self
.always_returns
20 if self
.return_values
:
21 return self
.return_values
.pop()
24 class Factory(object):
26 def __init__(self
, **kw
):
27 for k
, v
in kw
.items():
42 def fake_run(monkeypatch
):
44 monkeypatch
.setattr('ceph_volume.process.run', fake_run
)
49 def fake_call(monkeypatch
):
50 fake_call
= Capture(always_returns
=([], [], 0))
51 monkeypatch
.setattr('ceph_volume.process.call', fake_call
)
56 def stub_call(monkeypatch
):
58 Monkeypatches process.call, so that a caller can add behavior to the response
60 def apply(return_values
):
61 if isinstance(return_values
, tuple):
62 return_values
= [return_values
]
63 stubbed_call
= Capture(return_values
=return_values
)
64 monkeypatch
.setattr('ceph_volume.process.call', stubbed_call
)
71 def conf_ceph(monkeypatch
):
73 Monkeypatches ceph_volume.conf.ceph, which is meant to parse/read
74 a ceph.conf. The patching is naive, it allows one to set return values for
75 specific method calls.
79 monkeypatch
.setattr(conf
, 'ceph', stub
)
85 def conf_ceph_stub(monkeypatch
, tmpfile
):
87 Monkeypatches ceph_volume.conf.ceph with contents from a string that are
88 written to a temporary file and then is fed through the same ceph.conf
89 loading mechanisms for testing. Unlike ``conf_ceph`` which is just a fake,
90 we are actually loading values as seen on a ceph.conf file
92 This is useful when more complex ceph.conf's are needed. In the case of
93 just trying to validate a key/value behavior ``conf_ceph`` is better
97 conf_path
= tmpfile(contents
=contents
)
98 parser
= configuration
.load(conf_path
)
99 monkeypatch
.setattr(conf
, 'ceph', parser
)
105 def volumes(monkeypatch
):
106 monkeypatch
.setattr('ceph_volume.process.call', lambda x
: ('', '', 0))
107 volumes
= lvm_api
.Volumes()
113 def volume_groups(monkeypatch
):
114 monkeypatch
.setattr('ceph_volume.process.call', lambda x
: ('', '', 0))
115 vgs
= lvm_api
.VolumeGroups()
121 def is_root(monkeypatch
):
123 Patch ``os.getuid()`` so that ceph-volume's decorators that ensure a user
124 is root (or is sudoing to superuser) can continue as-is
126 monkeypatch
.setattr('os.getuid', lambda: 0)
132 Create a temporary file, optionally filling it with contents, returns an
133 absolute path to the file when called
135 def generate_file(name
='file', contents
=''):
136 path
= os
.path
.join(str(tmpdir
), name
)
137 with
open(path
, 'w') as fp
: