]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/simple/test_activate.py
import 12.2.13 release
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / simple / test_activate.py
1 import os
2 import pytest
3 from ceph_volume.devices.simple import activate
4
5
6 class TestActivate(object):
7
8 def test_no_data_uuid(self, factory, tmpfile, is_root, monkeypatch, capture):
9 json_config = tmpfile(contents='{}')
10 args = factory(osd_id='0', osd_fsid='1234', json_config=json_config)
11 with pytest.raises(RuntimeError):
12 activate.Activate([]).activate(args)
13
14 def test_invalid_json_path(self):
15 os.environ['CEPH_VOLUME_SIMPLE_JSON_DIR'] = '/non/existing/path'
16 with pytest.raises(RuntimeError) as error:
17 activate.Activate(['1', 'asdf']).main()
18 assert 'Expected JSON config path not found' in str(error.value)
19
20 def test_main_spits_help_with_no_arguments(self, capsys):
21 activate.Activate([]).main()
22 stdout, stderr = capsys.readouterr()
23 assert 'Activate OSDs by mounting devices previously configured' in stdout
24
25 def test_activate_all(self, is_root, monkeypatch):
26 '''
27 make sure Activate calls activate for each file returned by glob
28 '''
29 mocked_glob = []
30 def mock_glob(glob):
31 path = os.path.dirname(glob)
32 mocked_glob.extend(['{}/{}.json'.format(path, file_) for file_ in
33 ['1', '2', '3']])
34 return mocked_glob
35 activate_files = []
36 def mock_activate(self, args):
37 activate_files.append(args.json_config)
38 monkeypatch.setattr('glob.glob', mock_glob)
39 monkeypatch.setattr(activate.Activate, 'activate', mock_activate)
40 activate.Activate(['--all']).main()
41 assert activate_files == mocked_glob
42
43
44
45
46 class TestEnableSystemdUnits(object):
47
48 def test_nothing_is_activated(self, tmpfile, is_root, capsys):
49 json_config = tmpfile(contents='{}')
50 activation = activate.Activate(['--no-systemd', '--file', json_config, '0', '1234'], from_trigger=True)
51 activation.activate = lambda x: True
52 activation.main()
53 activation.enable_systemd_units('0', '1234')
54 stdout, stderr = capsys.readouterr()
55 assert 'Skipping enabling of `simple`' in stderr
56 assert 'Skipping masking of ceph-disk' in stderr
57 assert 'Skipping enabling and starting OSD simple' in stderr
58
59 def test_no_systemd_flag_is_true(self, tmpfile, is_root):
60 json_config = tmpfile(contents='{}')
61 activation = activate.Activate(['--no-systemd', '--file', json_config, '0', '1234'], from_trigger=True)
62 activation.activate = lambda x: True
63 activation.main()
64 assert activation.skip_systemd is True
65
66 def test_no_systemd_flag_is_false(self, tmpfile, is_root):
67 json_config = tmpfile(contents='{}')
68 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=True)
69 activation.activate = lambda x: True
70 activation.main()
71 assert activation.skip_systemd is False
72
73 def test_masks_ceph_disk(self, tmpfile, is_root, monkeypatch, capture):
74 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', capture)
75 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
76 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
77 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
78
79 json_config = tmpfile(contents='{}')
80 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
81 activation.activate = lambda x: True
82 activation.main()
83 activation.enable_systemd_units('0', '1234')
84 assert len(capture.calls) == 1
85
86 def test_enables_simple_unit(self, tmpfile, is_root, monkeypatch, capture):
87 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
88 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', capture)
89 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
90 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
91
92 json_config = tmpfile(contents='{}')
93 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
94 activation.activate = lambda x: True
95 activation.main()
96 activation.enable_systemd_units('0', '1234')
97 assert len(capture.calls) == 1
98 assert capture.calls[0]['args'] == ('0', '1234', 'simple')
99
100 def test_enables_osd_unit(self, tmpfile, is_root, monkeypatch, capture):
101 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
102 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
103 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', capture)
104 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
105
106 json_config = tmpfile(contents='{}')
107 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
108 activation.activate = lambda x: True
109 activation.main()
110 activation.enable_systemd_units('0', '1234')
111 assert len(capture.calls) == 1
112 assert capture.calls[0]['args'] == ('0',)
113
114 def test_starts_osd_unit(self, tmpfile, is_root, monkeypatch, capture):
115 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
116 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
117 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
118 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', capture)
119
120 json_config = tmpfile(contents='{}')
121 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
122 activation.activate = lambda x: True
123 activation.main()
124 activation.enable_systemd_units('0', '1234')
125 assert len(capture.calls) == 1
126 assert capture.calls[0]['args'] == ('0',)
127
128
129 class TestValidateDevices(object):
130
131 def test_filestore_missing_journal(self):
132 activation = activate.Activate([])
133 with pytest.raises(RuntimeError) as error:
134 activation.validate_devices({'type': 'filestore', 'data': {}})
135 assert 'Unable to activate filestore OSD due to missing devices' in str(error.value)
136
137 def test_filestore_missing_data(self):
138 activation = activate.Activate([])
139 with pytest.raises(RuntimeError) as error:
140 activation.validate_devices({'type': 'filestore', 'journal': {}})
141 assert 'Unable to activate filestore OSD due to missing devices' in str(error.value)
142
143 def test_filestore_journal_device_found(self, capsys):
144 activation = activate.Activate([])
145 with pytest.raises(RuntimeError):
146 activation.validate_devices({'type': 'filestore', 'journal': {}})
147 stdout, stderr = capsys.readouterr()
148 assert "devices found: ['journal']" in stderr
149
150 def test_filestore_data_device_found(self, capsys):
151 activation = activate.Activate([])
152 with pytest.raises(RuntimeError):
153 activation.validate_devices({'type': 'filestore', 'data': {}})
154 stdout, stderr = capsys.readouterr()
155 assert "devices found: ['data']" in stderr
156
157 def test_filestore_with_all_devices(self):
158 activation = activate.Activate([])
159 result = activation.validate_devices({'type': 'filestore', 'journal': {}, 'data': {}})
160 assert result is True
161
162 def test_filestore_without_type(self):
163 activation = activate.Activate([])
164 result = activation.validate_devices({'journal': {}, 'data': {}})
165 assert result is True
166
167 def test_bluestore_with_all_devices(self):
168 activation = activate.Activate([])
169 result = activation.validate_devices({'type': 'bluestore', 'data': {}, 'block': {}})
170 assert result is True
171
172 def test_bluestore_without_type(self):
173 activation = activate.Activate([])
174 result = activation.validate_devices({'data': {}, 'block': {}})
175 assert result is True
176
177 def test_bluestore_is_default(self):
178 activation = activate.Activate([])
179 result = activation.validate_devices({'data': {}, 'block': {}})
180 assert result is True
181
182 def test_bluestore_data_device_found(self, capsys):
183 activation = activate.Activate([])
184 with pytest.raises(RuntimeError):
185 activation.validate_devices({'data': {}})
186 stdout, stderr = capsys.readouterr()
187 assert "devices found: ['data']" in stderr
188
189 def test_bluestore_missing_data(self):
190 activation = activate.Activate([])
191 with pytest.raises(RuntimeError) as error:
192 activation.validate_devices({'type': 'bluestore', 'block': {}})
193 assert 'Unable to activate bluestore OSD due to missing devices' in str(error.value)
194
195 def test_bluestore_block_device_found(self, capsys):
196 activation = activate.Activate([])
197 with pytest.raises(RuntimeError):
198 activation.validate_devices({'block': {}})
199 stdout, stderr = capsys.readouterr()
200 assert "devices found: ['block']" in stderr