]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_zap.py
import ceph 14.2.5
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_zap.py
1 import os
2 import pytest
3 from ceph_volume.api import lvm as api
4 from ceph_volume.devices.lvm import zap
5
6
7 class TestFindAssociatedDevices(object):
8
9 def test_no_lvs_found_that_match_id(self, volumes, monkeypatch, device_info):
10 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
11 tags = 'ceph.osd_id=9,ceph.journal_uuid=x,ceph.type=data'
12 osd = api.Volume(
13 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', vg_name='vg', lv_tags=tags)
14 volumes.append(osd)
15 with pytest.raises(RuntimeError):
16 zap.find_associated_devices(osd_id=10)
17
18 def test_no_lvs_found_that_match_fsid(self, volumes, monkeypatch, device_info):
19 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
20 tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
21 osd = api.Volume(
22 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', vg_name='vg', lv_tags=tags)
23 volumes.append(osd)
24 with pytest.raises(RuntimeError):
25 zap.find_associated_devices(osd_fsid='aaaa-lkjh')
26
27 def test_no_lvs_found_that_match_id_fsid(self, volumes, monkeypatch, device_info):
28 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
29 tags = 'ceph.osd_id=9,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
30 osd = api.Volume(
31 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', vg_name='vg', lv_tags=tags)
32 volumes.append(osd)
33 with pytest.raises(RuntimeError):
34 zap.find_associated_devices(osd_id='9', osd_fsid='aaaa-lkjh')
35
36 def test_no_ceph_lvs_found(self, volumes, monkeypatch):
37 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
38 osd = api.Volume(
39 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags='')
40 volumes.append(osd)
41 with pytest.raises(RuntimeError):
42 zap.find_associated_devices(osd_id=100)
43
44 def test_lv_is_matched_id(self, volumes, monkeypatch):
45 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
46 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
47 osd = api.Volume(
48 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
49 volumes.append(osd)
50 result = zap.find_associated_devices(osd_id='0')
51 assert result[0].abspath == '/dev/VolGroup/lv'
52
53 def test_lv_is_matched_fsid(self, volumes, monkeypatch):
54 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
55 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
56 osd = api.Volume(
57 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
58 volumes.append(osd)
59 result = zap.find_associated_devices(osd_fsid='asdf-lkjh')
60 assert result[0].abspath == '/dev/VolGroup/lv'
61
62 def test_lv_is_matched_id_fsid(self, volumes, monkeypatch):
63 monkeypatch.setattr(zap.api, 'Volumes', lambda: volumes)
64 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
65 osd = api.Volume(
66 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
67 volumes.append(osd)
68 result = zap.find_associated_devices(osd_id='0', osd_fsid='asdf-lkjh')
69 assert result[0].abspath == '/dev/VolGroup/lv'
70
71
72 class TestEnsureAssociatedLVs(object):
73
74 def test_nothing_is_found(self, volumes):
75 result = zap.ensure_associated_lvs(volumes)
76 assert result == []
77
78 def test_data_is_found(self, volumes):
79 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=data'
80 osd = api.Volume(
81 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/data', lv_tags=tags)
82 volumes.append(osd)
83 result = zap.ensure_associated_lvs(volumes)
84 assert result == ['/dev/VolGroup/data']
85
86 def test_block_is_found(self, volumes):
87 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
88 osd = api.Volume(
89 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
90 volumes.append(osd)
91 result = zap.ensure_associated_lvs(volumes)
92 assert result == ['/dev/VolGroup/block']
93
94 def test_success_message_for_fsid(self, factory, is_root, capsys):
95 cli_zap = zap.Zap([])
96 args = factory(devices=[], osd_id=None, osd_fsid='asdf-lkjh')
97 cli_zap.args = args
98 cli_zap.zap()
99 out, err = capsys.readouterr()
100 assert "Zapping successful for OSD: asdf-lkjh" in err
101
102 def test_success_message_for_id(self, factory, is_root, capsys):
103 cli_zap = zap.Zap([])
104 args = factory(devices=[], osd_id='1', osd_fsid=None)
105 cli_zap.args = args
106 cli_zap.zap()
107 out, err = capsys.readouterr()
108 assert "Zapping successful for OSD: 1" in err
109
110 def test_block_and_partition_are_found(self, volumes, monkeypatch):
111 monkeypatch.setattr(zap.disk, 'get_device_from_partuuid', lambda x: '/dev/sdb1')
112 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=block'
113 osd = api.Volume(
114 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/block', lv_tags=tags)
115 volumes.append(osd)
116 result = zap.ensure_associated_lvs(volumes)
117 assert '/dev/sdb1' in result
118 assert '/dev/VolGroup/block' in result
119
120 def test_journal_is_found(self, volumes):
121 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
122 osd = api.Volume(
123 lv_name='volume1', lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv', lv_tags=tags)
124 volumes.append(osd)
125 result = zap.ensure_associated_lvs(volumes)
126 assert result == ['/dev/VolGroup/lv']
127
128 def test_multiple_journals_are_found(self, volumes):
129 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=journal'
130 for i in range(3):
131 osd = api.Volume(
132 lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
133 volumes.append(osd)
134 result = zap.ensure_associated_lvs(volumes)
135 assert '/dev/VolGroup/lv0' in result
136 assert '/dev/VolGroup/lv1' in result
137 assert '/dev/VolGroup/lv2' in result
138
139 def test_multiple_dbs_are_found(self, volumes):
140 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.journal_uuid=x,ceph.type=db'
141 for i in range(3):
142 osd = api.Volume(
143 lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
144 volumes.append(osd)
145 result = zap.ensure_associated_lvs(volumes)
146 assert '/dev/VolGroup/lv0' in result
147 assert '/dev/VolGroup/lv1' in result
148 assert '/dev/VolGroup/lv2' in result
149
150 def test_multiple_wals_are_found(self, volumes):
151 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=wal'
152 for i in range(3):
153 osd = api.Volume(
154 lv_name='volume%s' % i, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % i, lv_tags=tags)
155 volumes.append(osd)
156 result = zap.ensure_associated_lvs(volumes)
157 assert '/dev/VolGroup/lv0' in result
158 assert '/dev/VolGroup/lv1' in result
159 assert '/dev/VolGroup/lv2' in result
160
161 def test_multiple_backing_devs_are_found(self, volumes):
162 for _type in ['journal', 'db', 'wal']:
163 tags = 'ceph.osd_id=0,ceph.osd_fsid=asdf-lkjh,ceph.wal_uuid=x,ceph.type=%s' % _type
164 osd = api.Volume(
165 lv_name='volume%s' % _type, lv_uuid='y', vg_name='', lv_path='/dev/VolGroup/lv%s' % _type, lv_tags=tags)
166 volumes.append(osd)
167 result = zap.ensure_associated_lvs(volumes)
168 assert '/dev/VolGroup/lvjournal' in result
169 assert '/dev/VolGroup/lvwal' in result
170 assert '/dev/VolGroup/lvdb' in result
171
172
173 class TestWipeFs(object):
174
175 def setup(self):
176 os.environ['CEPH_VOLUME_WIPEFS_INTERVAL'] = '0'
177
178 def test_works_on_second_try(self, stub_call):
179 os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2'
180 stub_call([('wiping /dev/sda', '', 1), ('', '', 0)])
181 result = zap.wipefs('/dev/sda')
182 assert result is None
183
184 def test_does_not_work_after_several_tries(self, stub_call):
185 os.environ['CEPH_VOLUME_WIPEFS_TRIES'] = '2'
186 stub_call([('wiping /dev/sda', '', 1), ('', '', 1)])
187 with pytest.raises(RuntimeError):
188 zap.wipefs('/dev/sda')
189
190 def test_does_not_work_default_tries(self, stub_call):
191 stub_call([('wiping /dev/sda', '', 1)]*8)
192 with pytest.raises(RuntimeError):
193 zap.wipefs('/dev/sda')