]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_listing.py
2 from ceph_volume
.devices
import lvm
3 from ceph_volume
.api
import lvm
as api
6 class TestReadableTag(object):
8 def test_dots_get_replaced(self
):
9 result
= lvm
.listing
.readable_tag('ceph.foo')
10 assert result
== 'foo'
12 def test_underscores_are_replaced_with_spaces(self
):
13 result
= lvm
.listing
.readable_tag('ceph.long_tag')
14 assert result
== 'long tag'
17 class TestPrettyReport(object):
19 def test_is_empty(self
, capsys
):
20 lvm
.listing
.pretty_report({})
21 stdout
, stderr
= capsys
.readouterr()
24 def test_type_and_path_are_reported(self
, capsys
):
25 lvm
.listing
.pretty_report({0: [
26 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
28 stdout
, stderr
= capsys
.readouterr()
29 assert '[data] /dev/sda1' in stdout
31 def test_osd_id_header_is_reported(self
, capsys
):
32 lvm
.listing
.pretty_report({0: [
33 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
35 stdout
, stderr
= capsys
.readouterr()
36 assert '====== osd.0 =======' in stdout
38 def test_tags_are_included(self
, capsys
):
39 lvm
.listing
.pretty_report(
43 'tags': {'ceph.osd_id': '0'},
44 'devices': ['/dev/sda'],
47 stdout
, stderr
= capsys
.readouterr()
48 assert 'osd id' in stdout
50 def test_devices_are_comma_separated(self
, capsys
):
51 lvm
.listing
.pretty_report({0: [
52 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda', '/dev/sdb1']}
54 stdout
, stderr
= capsys
.readouterr()
55 assert '/dev/sda,/dev/sdb1' in stdout
58 class TestList(object):
60 def test_empty_full_json_zero_exit_status(self
, is_root
, volumes
, factory
, capsys
):
61 args
= factory(format
='json', device
=None)
62 lvm
.listing
.List([]).list(args
)
63 stdout
, stderr
= capsys
.readouterr()
64 assert stdout
== '{}\n'
66 def test_empty_device_json_zero_exit_status(self
, is_root
, volumes
, factory
, capsys
):
67 args
= factory(format
='json', device
='/dev/sda1')
68 lvm
.listing
.List([]).list(args
)
69 stdout
, stderr
= capsys
.readouterr()
70 assert stdout
== '{}\n'
72 def test_empty_full_zero_exit_status(self
, is_root
, volumes
, factory
):
73 args
= factory(format
='pretty', device
=None)
74 with pytest
.raises(SystemExit):
75 lvm
.listing
.List([]).list(args
)
77 def test_empty_device_zero_exit_status(self
, is_root
, volumes
, factory
):
78 args
= factory(format
='pretty', device
='/dev/sda1')
79 with pytest
.raises(SystemExit):
80 lvm
.listing
.List([]).list(args
)
83 class TestFullReport(object):
85 def test_no_ceph_lvs(self
, volumes
, monkeypatch
):
86 # ceph lvs are detected by looking into its tags
87 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/VolGroup/lv', lv_tags
={})
89 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
90 result
= lvm
.listing
.List([]).full_report()
93 def test_ceph_data_lv_reported(self
, volumes
, monkeypatch
):
94 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
96 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
98 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
99 result
= lvm
.listing
.List([]).full_report()
100 assert result
['0'][0]['name'] == 'volume1'
102 def test_ceph_journal_lv_reported(self
, volumes
, monkeypatch
):
103 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
104 journal_tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
106 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
107 journal
= api
.Volume(
108 lv_name
='journal', lv_uuid
='x', lv_path
='/dev/VolGroup/journal', lv_tags
=journal_tags
)
110 volumes
.append(journal
)
111 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
112 result
= lvm
.listing
.List([]).full_report()
113 assert result
['0'][0]['name'] == 'volume1'
114 assert result
['0'][1]['name'] == 'journal'
116 def test_ceph_wal_lv_reported(self
, volumes
, monkeypatch
):
117 tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
118 wal_tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
120 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
122 lv_name
='wal', lv_uuid
='x', lv_path
='/dev/VolGroup/wal', lv_tags
=wal_tags
)
125 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
126 result
= lvm
.listing
.List([]).full_report()
127 assert result
['0'][0]['name'] == 'volume1'
128 assert result
['0'][1]['name'] == 'wal'
130 def test_physical_journal_gets_reported(self
, volumes
, monkeypatch
):
131 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
133 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
135 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
136 monkeypatch
.setattr(lvm
.listing
.disk
, 'get_device_from_partuuid', lambda x
: '/dev/sda1')
137 result
= lvm
.listing
.List([]).full_report()
138 assert result
['0'][1]['path'] == '/dev/sda1'
139 assert result
['0'][1]['tags'] == {'PARTUUID': 'x'}
140 assert result
['0'][1]['type'] == 'journal'
142 def test_physical_wal_gets_reported(self
, volumes
, monkeypatch
):
143 tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
145 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
147 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
148 monkeypatch
.setattr(lvm
.listing
.disk
, 'get_device_from_partuuid', lambda x
: '/dev/sda1')
149 result
= lvm
.listing
.List([]).full_report()
150 assert result
['0'][1]['path'] == '/dev/sda1'
151 assert result
['0'][1]['tags'] == {'PARTUUID': 'x'}
152 assert result
['0'][1]['type'] == 'wal'
155 class TestSingleReport(object):
157 def test_not_a_ceph_lv(self
, volumes
, monkeypatch
):
158 # ceph lvs are detected by looking into its tags
160 lv_name
='lv', vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv', lv_tags
={})
162 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
163 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
166 def test_report_a_ceph_lv(self
, volumes
, monkeypatch
):
167 # ceph lvs are detected by looking into its tags
168 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
170 lv_name
='lv', vg_name
='VolGroup',
171 lv_uuid
='aaaa', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
174 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
175 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
176 assert result
['0'][0]['name'] == 'lv'
177 assert result
['0'][0]['lv_tags'] == tags
178 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
179 assert result
['0'][0]['devices'] == []
181 def test_report_a_ceph_journal_device(self
, volumes
, monkeypatch
):
182 # ceph lvs are detected by looking into its tags
183 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.journal_device=/dev/sda1'
185 lv_name
='lv', vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv',
186 lv_uuid
='aaa', lv_tags
=tags
)
188 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
189 result
= lvm
.listing
.List([]).single_report('/dev/sda1')
190 assert result
['0'][0]['tags'] == {'PARTUUID': 'x'}
191 assert result
['0'][0]['type'] == 'journal'
192 assert result
['0'][0]['path'] == '/dev/sda1'
194 def test_report_a_ceph_lv_with_devices(self
, volumes
, monkeypatch
):
195 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
197 lv_name
='lv', vg_name
='VolGroup',
198 lv_uuid
='aaaa', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
201 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
202 listing
= lvm
.listing
.List([])
204 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
205 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
207 result
= listing
.single_report('VolGroup/lv')
208 assert result
['0'][0]['name'] == 'lv'
209 assert result
['0'][0]['lv_tags'] == tags
210 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
211 assert result
['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1']
213 def test_report_a_ceph_lv_with_no_matching_devices(self
, volumes
, monkeypatch
):
214 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
216 lv_name
='lv', vg_name
='VolGroup',
217 lv_uuid
='aaaa', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
220 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
221 listing
= lvm
.listing
.List([])
223 {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
224 {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
226 result
= listing
.single_report('VolGroup/lv')
227 assert result
['0'][0]['name'] == 'lv'
228 assert result
['0'][0]['lv_tags'] == tags
229 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
230 assert result
['0'][0]['devices'] == []
233 class TestListingPVs(object):
237 {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
238 {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
241 def test_pvs_is_unset(self
, monkeypatch
):
242 monkeypatch
.setattr(lvm
.listing
.api
, 'get_api_pvs', lambda: self
.default_pvs
)
243 listing
= lvm
.listing
.List([])
244 assert listing
.pvs
== self
.default_pvs
246 def test_pvs_is_set(self
, monkeypatch
):
247 # keep it patched so that we can fail if this gets returned
248 monkeypatch
.setattr(lvm
.listing
.api
, 'get_api_pvs', lambda: self
.default_pvs
)
249 listing
= lvm
.listing
.List([])
251 assert listing
.pvs
== []