]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/simple/test_activate.py
3 from ceph_volume
.devices
.simple
import activate
6 class TestActivate(object):
8 def test_no_data_uuid(self
, factory
, tmpfile
, is_root
, monkeypatch
, capture
):
9 json_config
= tmpfile(contents
='{}')
10 args
= factory(osd_id
='0', osd_fsid
='1234', json_config
=json_config
)
11 with pytest
.raises(RuntimeError):
12 activate
.Activate([]).activate(args
)
14 def test_invalid_json_path(self
):
15 os
.environ
['CEPH_VOLUME_SIMPLE_JSON_DIR'] = '/non/existing/path'
16 with pytest
.raises(RuntimeError) as error
:
17 activate
.Activate(['1', 'asdf']).main()
18 assert 'Expected JSON config path not found' in str(error
.value
)
20 def test_main_spits_help_with_no_arguments(self
, capsys
):
21 activate
.Activate([]).main()
22 stdout
, stderr
= capsys
.readouterr()
23 assert 'Activate OSDs by mounting devices previously configured' in stdout
25 def test_activate_all(self
, is_root
, monkeypatch
):
27 make sure Activate calls activate for each file returned by glob
31 path
= os
.path
.dirname(glob
)
32 mocked_glob
.extend(['{}/{}.json'.format(path
, file_
) for file_
in
36 def mock_activate(self
, args
):
37 activate_files
.append(args
.json_config
)
38 monkeypatch
.setattr('glob.glob', mock_glob
)
39 monkeypatch
.setattr(activate
.Activate
, 'activate', mock_activate
)
40 activate
.Activate(['--all']).main()
41 assert activate_files
== mocked_glob
46 class TestEnableSystemdUnits(object):
48 def test_nothing_is_activated(self
, tmpfile
, is_root
, capsys
):
49 json_config
= tmpfile(contents
='{}')
50 activation
= activate
.Activate(['--no-systemd', '--file', json_config
, '0', '1234'], from_trigger
=True)
51 activation
.activate
= lambda x
: True
53 activation
.enable_systemd_units('0', '1234')
54 stdout
, stderr
= capsys
.readouterr()
55 assert 'Skipping enabling of `simple`' in stderr
56 assert 'Skipping masking of ceph-disk' in stderr
57 assert 'Skipping enabling and starting OSD simple' in stderr
59 def test_no_systemd_flag_is_true(self
, tmpfile
, is_root
):
60 json_config
= tmpfile(contents
='{}')
61 activation
= activate
.Activate(['--no-systemd', '--file', json_config
, '0', '1234'], from_trigger
=True)
62 activation
.activate
= lambda x
: True
64 assert activation
.skip_systemd
is True
66 def test_no_systemd_flag_is_false(self
, tmpfile
, is_root
):
67 json_config
= tmpfile(contents
='{}')
68 activation
= activate
.Activate(['--file', json_config
, '0', '1234'], from_trigger
=True)
69 activation
.activate
= lambda x
: True
71 assert activation
.skip_systemd
is False
73 def test_masks_ceph_disk(self
, tmpfile
, is_root
, monkeypatch
, capture
):
74 monkeypatch
.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', capture
)
75 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a
: True)
76 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a
: True)
77 monkeypatch
.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a
: True)
79 json_config
= tmpfile(contents
='{}')
80 activation
= activate
.Activate(['--file', json_config
, '0', '1234'], from_trigger
=False)
81 activation
.activate
= lambda x
: True
83 activation
.enable_systemd_units('0', '1234')
84 assert len(capture
.calls
) == 1
86 def test_enables_simple_unit(self
, tmpfile
, is_root
, monkeypatch
, capture
):
87 monkeypatch
.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a
: True)
88 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_volume', capture
)
89 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a
: True)
90 monkeypatch
.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a
: True)
92 json_config
= tmpfile(contents
='{}')
93 activation
= activate
.Activate(['--file', json_config
, '0', '1234'], from_trigger
=False)
94 activation
.activate
= lambda x
: True
96 activation
.enable_systemd_units('0', '1234')
97 assert len(capture
.calls
) == 1
98 assert capture
.calls
[0]['args'] == ('0', '1234', 'simple')
100 def test_enables_osd_unit(self
, tmpfile
, is_root
, monkeypatch
, capture
):
101 monkeypatch
.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a
: True)
102 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a
: True)
103 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_osd', capture
)
104 monkeypatch
.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a
: True)
106 json_config
= tmpfile(contents
='{}')
107 activation
= activate
.Activate(['--file', json_config
, '0', '1234'], from_trigger
=False)
108 activation
.activate
= lambda x
: True
110 activation
.enable_systemd_units('0', '1234')
111 assert len(capture
.calls
) == 1
112 assert capture
.calls
[0]['args'] == ('0',)
114 def test_starts_osd_unit(self
, tmpfile
, is_root
, monkeypatch
, capture
):
115 monkeypatch
.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a
: True)
116 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a
: True)
117 monkeypatch
.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a
: True)
118 monkeypatch
.setattr('ceph_volume.systemd.systemctl.start_osd', capture
)
120 json_config
= tmpfile(contents
='{}')
121 activation
= activate
.Activate(['--file', json_config
, '0', '1234'], from_trigger
=False)
122 activation
.activate
= lambda x
: True
124 activation
.enable_systemd_units('0', '1234')
125 assert len(capture
.calls
) == 1
126 assert capture
.calls
[0]['args'] == ('0',)
129 class TestValidateDevices(object):
131 def test_filestore_missing_journal(self
):
132 activation
= activate
.Activate([])
133 with pytest
.raises(RuntimeError) as error
:
134 activation
.validate_devices({'type': 'filestore', 'data': {}})
135 assert 'Unable to activate filestore OSD due to missing devices' in str(error
.value
)
137 def test_filestore_missing_data(self
):
138 activation
= activate
.Activate([])
139 with pytest
.raises(RuntimeError) as error
:
140 activation
.validate_devices({'type': 'filestore', 'journal': {}})
141 assert 'Unable to activate filestore OSD due to missing devices' in str(error
.value
)
143 def test_filestore_journal_device_found(self
, capsys
):
144 activation
= activate
.Activate([])
145 with pytest
.raises(RuntimeError):
146 activation
.validate_devices({'type': 'filestore', 'journal': {}})
147 stdout
, stderr
= capsys
.readouterr()
148 assert "devices found: ['journal']" in stderr
150 def test_filestore_data_device_found(self
, capsys
):
151 activation
= activate
.Activate([])
152 with pytest
.raises(RuntimeError):
153 activation
.validate_devices({'type': 'filestore', 'data': {}})
154 stdout
, stderr
= capsys
.readouterr()
155 assert "devices found: ['data']" in stderr
157 def test_filestore_with_all_devices(self
):
158 activation
= activate
.Activate([])
159 result
= activation
.validate_devices({'type': 'filestore', 'journal': {}, 'data': {}})
160 assert result
is True
162 def test_filestore_without_type(self
):
163 activation
= activate
.Activate([])
164 result
= activation
.validate_devices({'journal': {}, 'data': {}})
165 assert result
is True
167 def test_bluestore_with_all_devices(self
):
168 activation
= activate
.Activate([])
169 result
= activation
.validate_devices({'type': 'bluestore', 'data': {}, 'block': {}})
170 assert result
is True
172 def test_bluestore_without_type(self
):
173 activation
= activate
.Activate([])
174 result
= activation
.validate_devices({'data': {}, 'block': {}})
175 assert result
is True
177 def test_bluestore_is_default(self
):
178 activation
= activate
.Activate([])
179 result
= activation
.validate_devices({'data': {}, 'block': {}})
180 assert result
is True
182 def test_bluestore_data_device_found(self
, capsys
):
183 activation
= activate
.Activate([])
184 with pytest
.raises(RuntimeError):
185 activation
.validate_devices({'data': {}})
186 stdout
, stderr
= capsys
.readouterr()
187 assert "devices found: ['data']" in stderr
189 def test_bluestore_missing_data(self
):
190 activation
= activate
.Activate([])
191 with pytest
.raises(RuntimeError) as error
:
192 activation
.validate_devices({'type': 'bluestore', 'block': {}})
193 assert 'Unable to activate bluestore OSD due to missing devices' in str(error
.value
)
195 def test_bluestore_block_device_found(self
, capsys
):
196 activation
= activate
.Activate([])
197 with pytest
.raises(RuntimeError):
198 activation
.validate_devices({'block': {}})
199 stdout
, stderr
= capsys
.readouterr()
200 assert "devices found: ['block']" in stderr