]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_listing.py
2 from ceph_volume
.devices
import lvm
3 from ceph_volume
.api
import lvm
as api
class TestReadableTag(object):
    """Checks on the output of ``lvm.listing.readable_tag``."""

    def test_dots_get_replaced(self):
        """``'ceph.foo'`` is expected to come back as ``'foo'``."""
        assert lvm.listing.readable_tag('ceph.foo') == 'foo'

    def test_underscores_are_replaced_with_spaces(self):
        """``'ceph.long_tag'`` is expected to come back as ``'long tag'``."""
        assert lvm.listing.readable_tag('ceph.long_tag') == 'long tag'
17 class TestPrettyReport(object):
19 def test_is_empty(self
, capsys
):
20 lvm
.listing
.pretty_report({})
21 stdout
, stderr
= capsys
.readouterr()
def test_type_and_path_are_reported(self, capsys):
    """The pretty report prints the device type in brackets followed by its path."""
    report = {0: [{'type': 'data', 'path': '/dev/sda1'}]}
    lvm.listing.pretty_report(report)
    # only the captured standard output matters for this check
    captured_out, _captured_err = capsys.readouterr()
    assert '[data] /dev/sda1' in captured_out
def test_osd_id_header_is_reported(self, capsys):
    """The pretty report prints an ``osd.<id>`` header line for the reported key."""
    report = {0: [{'type': 'data', 'path': '/dev/sda1'}]}
    lvm.listing.pretty_report(report)
    # only the captured standard output matters for this check
    captured_out, _captured_err = capsys.readouterr()
    assert '====== osd.0 =======' in captured_out
34 def test_tags_are_included(self
, capsys
):
35 lvm
.listing
.pretty_report(
39 'tags': {'ceph.osd_id': '0'}
42 stdout
, stderr
= capsys
.readouterr()
43 assert 'osd id' in stdout
class TestList(object):
    """Drive ``lvm.listing.List([]).list`` with json and pretty formats.

    Every test requests the ``is_root`` and ``volumes`` fixtures; what they
    set up is defined outside this file (presumably root privileges and an
    empty volume collection -- confirm against the conftest).
    """

    def test_empty_full_json_zero_exit_status(self, is_root, volumes, factory, capsys):
        """JSON format without a device prints an empty JSON object."""
        lvm.listing.List([]).list(factory(format='json', device=None))
        out, _err = capsys.readouterr()
        assert out == '{}\n'

    def test_empty_device_json_zero_exit_status(self, is_root, volumes, factory, capsys):
        """JSON format for a single device prints an empty JSON object."""
        lvm.listing.List([]).list(factory(format='json', device='/dev/sda1'))
        out, _err = capsys.readouterr()
        assert out == '{}\n'

    def test_empty_full_zero_exit_status(self, is_root, volumes, factory):
        """Pretty format without a device terminates through ``SystemExit``."""
        # build the arguments outside the context manager so only the
        # listing call itself is allowed to raise
        listing_args = factory(format='pretty', device=None)
        with pytest.raises(SystemExit):
            lvm.listing.List([]).list(listing_args)

    def test_empty_device_zero_exit_status(self, is_root, volumes, factory):
        """Pretty format for a single device terminates through ``SystemExit``."""
        # build the arguments outside the context manager so only the
        # listing call itself is allowed to raise
        listing_args = factory(format='pretty', device='/dev/sda1')
        with pytest.raises(SystemExit):
            lvm.listing.List([]).list(listing_args)
71 class TestFullReport(object):
73 def test_no_ceph_lvs(self
, volumes
, monkeypatch
):
74 # ceph lvs are detected by looking into its tags
75 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/VolGroup/lv', lv_tags
={})
77 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
78 result
= lvm
.listing
.List([]).full_report()
81 def test_ceph_data_lv_reported(self
, volumes
, monkeypatch
):
82 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
84 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
86 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
87 result
= lvm
.listing
.List([]).full_report()
88 assert result
['0'][0]['name'] == 'volume1'
90 def test_ceph_journal_lv_reported(self
, volumes
, monkeypatch
):
91 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
92 journal_tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
94 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
96 lv_name
='journal', lv_uuid
='x', lv_path
='/dev/VolGroup/journal', lv_tags
=journal_tags
)
98 volumes
.append(journal
)
99 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
100 result
= lvm
.listing
.List([]).full_report()
101 assert result
['0'][0]['name'] == 'volume1'
102 assert result
['0'][1]['name'] == 'journal'
104 def test_ceph_wal_lv_reported(self
, volumes
, monkeypatch
):
105 tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
106 wal_tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
108 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
110 lv_name
='wal', lv_uuid
='x', lv_path
='/dev/VolGroup/wal', lv_tags
=wal_tags
)
113 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
114 result
= lvm
.listing
.List([]).full_report()
115 assert result
['0'][0]['name'] == 'volume1'
116 assert result
['0'][1]['name'] == 'wal'
118 def test_physical_journal_gets_reported(self
, volumes
, monkeypatch
):
119 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
121 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
123 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
124 monkeypatch
.setattr(lvm
.listing
.disk
, 'get_device_from_partuuid', lambda x
: '/dev/sda1')
125 result
= lvm
.listing
.List([]).full_report()
126 assert result
['0'][1]['path'] == '/dev/sda1'
127 assert result
['0'][1]['tags'] == {'PARTUUID': 'x'}
128 assert result
['0'][1]['type'] == 'journal'
130 def test_physical_wal_gets_reported(self
, volumes
, monkeypatch
):
131 tags
= 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
133 lv_name
='volume1', lv_uuid
='y', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
135 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
136 monkeypatch
.setattr(lvm
.listing
.disk
, 'get_device_from_partuuid', lambda x
: '/dev/sda1')
137 result
= lvm
.listing
.List([]).full_report()
138 assert result
['0'][1]['path'] == '/dev/sda1'
139 assert result
['0'][1]['tags'] == {'PARTUUID': 'x'}
140 assert result
['0'][1]['type'] == 'wal'
143 class TestSingleReport(object):
145 def test_not_a_ceph_lv(self
, volumes
, monkeypatch
):
146 # ceph lvs are detected by looking into its tags
148 lv_name
='lv', vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv', lv_tags
={})
150 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
151 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
154 def test_report_a_ceph_lv(self
, volumes
, monkeypatch
):
155 # ceph lvs are detected by looking into its tags
156 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
158 lv_name
='lv', vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
160 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
161 result
= lvm
.listing
.List([]).single_report('VolGroup/lv')
162 assert result
['0'][0]['name'] == 'lv'
163 assert result
['0'][0]['lv_tags'] == tags
164 assert result
['0'][0]['path'] == '/dev/VolGroup/lv'
166 def test_report_a_ceph_journal_device(self
, volumes
, monkeypatch
):
167 # ceph lvs are detected by looking into its tags
168 tags
= 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.journal_device=/dev/sda1'
170 lv_name
='lv', vg_name
='VolGroup', lv_path
='/dev/VolGroup/lv', lv_tags
=tags
)
172 monkeypatch
.setattr(lvm
.listing
.api
, 'Volumes', lambda: volumes
)
173 result
= lvm
.listing
.List([]).single_report('/dev/sda1')
174 assert result
['0'][0]['tags'] == {'PARTUUID': 'x'}
175 assert result
['0'][0]['type'] == 'journal'
176 assert result
['0'][0]['path'] == '/dev/sda1'