]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_listing.py
2 from ceph_volume
.devices
import lvm
3 from ceph_volume
.api
import lvm
as api
5 # TODO: add tests for following commands -
7 # ceph-volume list <path-to-pv>
8 # ceph-volume list <path-to-vg>
9 # ceph-volume list <path-to-lv>
11 class TestReadableTag(object):
13 def test_dots_get_replaced(self
):
14 result
= lvm
.listing
.readable_tag('ceph.foo')
15 assert result
== 'foo'
17 def test_underscores_are_replaced_with_spaces(self
):
18 result
= lvm
.listing
.readable_tag('ceph.long_tag')
19 assert result
== 'long tag'
22 class TestPrettyReport(object):
24 def test_is_empty(self
, capsys
):
25 lvm
.listing
.pretty_report({})
26 stdout
, stderr
= capsys
.readouterr()
29 def test_type_and_path_are_reported(self
, capsys
):
30 lvm
.listing
.pretty_report({0: [
31 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
33 stdout
, stderr
= capsys
.readouterr()
34 assert '[data] /dev/sda1' in stdout
36 def test_osd_id_header_is_reported(self
, capsys
):
37 lvm
.listing
.pretty_report({0: [
38 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
40 stdout
, stderr
= capsys
.readouterr()
41 assert '====== osd.0 =======' in stdout
43 def test_tags_are_included(self
, capsys
):
44 lvm
.listing
.pretty_report(
48 'tags': {'ceph.osd_id': '0'},
49 'devices': ['/dev/sda'],
52 stdout
, stderr
= capsys
.readouterr()
53 assert 'osd id' in stdout
55 def test_devices_are_comma_separated(self
, capsys
):
56 lvm
.listing
.pretty_report({0: [
57 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda', '/dev/sdb1']}
59 stdout
, stderr
= capsys
.readouterr()
60 assert '/dev/sda,/dev/sdb1' in stdout
63 class TestList(object):
65 def test_empty_full_json_zero_exit_status(self
, is_root
,factory
,capsys
):
66 args
= factory(format
='json', device
=None)
67 lvm
.listing
.List([]).list(args
)
68 stdout
, stderr
= capsys
.readouterr()
69 assert stdout
== '{}\n'
71 def test_empty_device_json_zero_exit_status(self
, is_root
,factory
,capsys
):
72 args
= factory(format
='json', device
='/dev/sda1')
73 lvm
.listing
.List([]).list(args
)
74 stdout
, stderr
= capsys
.readouterr()
75 assert stdout
== '{}\n'
77 def test_empty_full_zero_exit_status(self
, is_root
, factory
):
78 args
= factory(format
='pretty', device
=None)
79 with pytest
.raises(SystemExit):
80 lvm
.listing
.List([]).list(args
)
82 def test_empty_device_zero_exit_status(self
, is_root
, factory
):
83 args
= factory(format
='pretty', device
='/dev/sda1')
84 with pytest
.raises(SystemExit):
85 lvm
.listing
.List([]).list(args
)
87 class TestFullReport(object):
89 def test_no_ceph_lvs(self
, monkeypatch
):
90 # ceph lvs are detected by looking into its tags
91 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/VolGroup/lv',
95 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
98 result
= lvm
.listing
.List([]).full_report()
101 def test_ceph_data_lv_reported(self
, monkeypatch
):
102 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
103 pv
= api
.PVolume(pv_name
='/dev/sda1', pv_tags
={}, pv_uuid
="0000",
104 vg_name
='VolGroup', lv_uuid
="aaaa")
105 osd
= api
.Volume(lv_name
='volume1', lv_uuid
='y', lv_tags
=tags
,
106 lv_path
='/dev/VolGroup/lv', vg_name
='VolGroup')
109 monkeypatch
.setattr(lvm
.listing
.api
, 'get_first_pv', lambda **kwargs
: pv
)
110 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
113 result
= lvm
.listing
.List([]).full_report()
114 assert result
['0'][0]['name'] == 'volume1'
116 def test_ceph_journal_lv_reported(self
, monkeypatch
):
117 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
118 journal_tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
119 pv
= api
.PVolume(pv_name
='/dev/sda1', pv_tags
={}, pv_uuid
="0000",
120 vg_name
="VolGroup", lv_uuid
="aaaa")
121 osd
= api
.Volume(lv_name
='volume1', lv_uuid
='y', lv_tags
=tags
,
122 lv_path
='/dev/VolGroup/lv', vg_name
='VolGroup')
123 journal
= api
.Volume(
124 lv_name
='journal', lv_uuid
='x', lv_tags
=journal_tags
,
125 lv_path
='/dev/VolGroup/journal', vg_name
='VolGroup')
128 volumes
.append(journal
)
129 monkeypatch
.setattr(lvm
.listing
.api
,'get_first_pv',lambda **kwargs
:pv
)
130 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
133 result
= lvm
.listing
.List([]).full_report()
134 assert result
['0'][0]['name'] == 'volume1'
135 assert result
['0'][1]['name'] == 'journal'
137 def test_ceph_wal_lv_reported(self
, monkeypatch
):
138 tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
139 wal_tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
140 osd
= api
.Volume(lv_name
='volume1', lv_uuid
='y', lv_tags
=tags
,
141 lv_path
='/dev/VolGroup/lv', vg_name
='VolGroup')
142 wal
= api
.Volume(lv_name
='wal', lv_uuid
='x', lv_tags
=wal_tags
,
143 lv_path
='/dev/VolGroup/wal', vg_name
='VolGroup')
147 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
150 result
= lvm
.listing
.List([]).full_report()
151 assert result
['0'][0]['name'] == 'volume1'
152 assert result
['0'][1]['name'] == 'wal'
154 @pytest.mark
.parametrize('type_', ['journal', 'db', 'wal'])
155 def test_physical_2nd_device_gets_reported(self
, type_
, monkeypatch
):
156 tags
= ('ceph.osd_id=0,ceph.{t}_uuid=x,ceph.type=data,'
157 'ceph.{t}_device=/dev/sda1').format(t
=type_
)
158 osd
= api
.Volume(lv_name
='volume1', lv_uuid
='y', lv_tags
=tags
,
159 vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv')
160 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
163 result
= lvm
.listing
.List([]).full_report()
164 assert result
['0'][1]['path'] == '/dev/sda1'
165 assert result
['0'][1]['tags'] == {'PARTUUID': 'x'}
166 assert result
['0'][1]['type'] == type_
169 class TestSingleReport(object):
171 def test_not_a_ceph_lv(self
, monkeypatch
):
172 # ceph lvs are detected by looking into its tags
173 lv
= api
.Volume(lv_name
='lv', lv_tags
={}, lv_path
='/dev/VolGroup/lv',
175 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
178 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
181 def test_report_a_ceph_lv(self
, monkeypatch
):
182 # ceph lvs are detected by looking into its tags
183 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
184 lv
= api
.Volume(lv_name
='lv', vg_name
='VolGroup', lv_uuid
='aaaa',
185 lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
188 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
191 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
192 assert result
['0'][0]['name'] == 'lv'
193 assert result
['0'][0]['lv_tags'] == tags
194 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
195 assert result
['0'][0]['devices'] == []
197 def test_report_a_ceph_journal_device(self
, monkeypatch
):
198 # ceph lvs are detected by looking into its tags
199 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,' + \
200 'ceph.journal_device=/dev/sda1'
201 lv
= api
.Volume(lv_name
='lv', lv_uuid
='aaa', lv_tags
=tags
,
202 lv_path
='/dev/VolGroup/lv', vg_name
='VolGroup')
203 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
204 [lv
] if 'tags' in kwargs
else [])
206 result
= lvm
.listing
.List([]).single_report('/dev/sda1')
207 assert result
['0'][0]['tags'] == {'PARTUUID': 'x'}
208 assert result
['0'][0]['type'] == 'journal'
209 assert result
['0'][0]['path'] == '/dev/sda1'
211 def test_report_a_ceph_lv_with_devices(self
, monkeypatch
):
214 tags
= 'ceph.osd_id=0,ceph.type=data'
215 pv1
= api
.PVolume(vg_name
="VolGroup", pv_name
='/dev/sda1',
216 pv_uuid
='', pv_tags
={}, lv_uuid
="aaaa")
217 pv2
= api
.PVolume(vg_name
="VolGroup", pv_name
='/dev/sdb1',
218 pv_uuid
='', pv_tags
={}, lv_uuid
="aaaa")
224 lv
= api
.Volume(lv_name
='lv', vg_name
='VolGroup',lv_uuid
='aaaa',
225 lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
228 monkeypatch
.setattr(lvm
.listing
.api
, 'get_pvs', lambda **kwargs
:
230 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
233 listing
= lvm
.listing
.List([])
235 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
236 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
239 result
= listing
.single_report('VolGroup/lv')
240 assert result
['0'][0]['name'] == 'lv'
241 assert result
['0'][0]['lv_tags'] == tags
242 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
243 assert result
['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1']
245 def test_report_a_ceph_lv_with_no_matching_devices(self
, monkeypatch
):
246 tags
= 'ceph.osd_id=0,ceph.type=data'
247 lv
= api
.Volume(lv_name
='lv', vg_name
='VolGroup', lv_uuid
='aaaa',
248 lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
251 monkeypatch
.setattr(lvm
.listing
.api
, 'get_lvs', lambda **kwargs
:
254 listing
= lvm
.listing
.List([])
256 {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '',
258 {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '',
261 result
= listing
.single_report('VolGroup/lv')
262 assert result
['0'][0]['name'] == 'lv'
263 assert result
['0'][0]['lv_tags'] == tags
264 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
265 assert result
['0'][0]['devices'] == []