]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/simple/test_activate.py
update sources to 12.2.10
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / simple / test_activate.py
1 import os
2 import pytest
3 from ceph_volume.devices.simple import activate
4
5
6 class TestActivate(object):
7
8 def test_no_data_uuid(self, factory, tmpfile, is_root, monkeypatch, capture):
9 json_config = tmpfile(contents='{}')
10 args = factory(osd_id='0', osd_fsid='1234', json_config=json_config)
11 with pytest.raises(RuntimeError):
12 activate.Activate([]).activate(args)
13
14 def test_invalid_json_path(self):
15 os.environ['CEPH_VOLUME_SIMPLE_JSON_DIR'] = '/non/existing/path'
16 with pytest.raises(RuntimeError) as error:
17 activate.Activate(['1', 'asdf']).main()
18 assert 'RuntimeError: Expected JSON config path not found' in str(error)
19
20 def test_main_spits_help_with_no_arguments(self, capsys):
21 activate.Activate([]).main()
22 stdout, stderr = capsys.readouterr()
23 assert 'Activate OSDs by mounting devices previously configured' in stdout
24
25
26 class TestEnableSystemdUnits(object):
27
28 def test_nothing_is_activated(self, tmpfile, is_root, capsys):
29 json_config = tmpfile(contents='{}')
30 activation = activate.Activate(['--no-systemd', '--file', json_config, '0', '1234'], from_trigger=True)
31 activation.activate = lambda x: True
32 activation.main()
33 activation.enable_systemd_units('0', '1234')
34 out, err = capsys.readouterr()
35 assert 'Skipping enabling of `simple`' in out
36 assert 'Skipping masking of ceph-disk' in out
37 assert 'Skipping enabling and starting OSD simple' in out
38
39 def test_no_systemd_flag_is_true(self, tmpfile, is_root):
40 json_config = tmpfile(contents='{}')
41 activation = activate.Activate(['--no-systemd', '--file', json_config, '0', '1234'], from_trigger=True)
42 activation.activate = lambda x: True
43 activation.main()
44 assert activation.skip_systemd is True
45
46 def test_no_systemd_flag_is_false(self, tmpfile, is_root):
47 json_config = tmpfile(contents='{}')
48 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=True)
49 activation.activate = lambda x: True
50 activation.main()
51 assert activation.skip_systemd is False
52
53 def test_masks_ceph_disk(self, tmpfile, is_root, monkeypatch, capture):
54 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', capture)
55 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
56 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
57 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
58
59 json_config = tmpfile(contents='{}')
60 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
61 activation.activate = lambda x: True
62 activation.main()
63 activation.enable_systemd_units('0', '1234')
64 assert len(capture.calls) == 1
65
66 def test_enables_simple_unit(self, tmpfile, is_root, monkeypatch, capture):
67 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
68 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', capture)
69 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
70 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
71
72 json_config = tmpfile(contents='{}')
73 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
74 activation.activate = lambda x: True
75 activation.main()
76 activation.enable_systemd_units('0', '1234')
77 assert len(capture.calls) == 1
78 assert capture.calls[0]['args'] == ('0', '1234', 'simple')
79
80 def test_enables_osd_unit(self, tmpfile, is_root, monkeypatch, capture):
81 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
82 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
83 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', capture)
84 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
85
86 json_config = tmpfile(contents='{}')
87 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
88 activation.activate = lambda x: True
89 activation.main()
90 activation.enable_systemd_units('0', '1234')
91 assert len(capture.calls) == 1
92 assert capture.calls[0]['args'] == ('0',)
93
94 def test_starts_osd_unit(self, tmpfile, is_root, monkeypatch, capture):
95 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
96 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
97 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
98 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', capture)
99
100 json_config = tmpfile(contents='{}')
101 activation = activate.Activate(['--file', json_config, '0', '1234'], from_trigger=False)
102 activation.activate = lambda x: True
103 activation.main()
104 activation.enable_systemd_units('0', '1234')
105 assert len(capture.calls) == 1
106 assert capture.calls[0]['args'] == ('0',)
107
108
109 class TestValidateDevices(object):
110
111 def test_filestore_missing_journal(self):
112 activation = activate.Activate([])
113 with pytest.raises(RuntimeError) as error:
114 activation.validate_devices({'type': 'filestore', 'data': {}})
115 assert 'Unable to activate filestore OSD due to missing devices' in str(error)
116
117 def test_filestore_missing_data(self):
118 activation = activate.Activate([])
119 with pytest.raises(RuntimeError) as error:
120 activation.validate_devices({'type': 'filestore', 'journal': {}})
121 assert 'Unable to activate filestore OSD due to missing devices' in str(error)
122
123 def test_filestore_journal_device_found(self, capsys):
124 activation = activate.Activate([])
125 with pytest.raises(RuntimeError):
126 activation.validate_devices({'type': 'filestore', 'journal': {}})
127 stdout, stderr = capsys.readouterr()
128 assert "devices found: ['journal']" in stdout
129
130 def test_filestore_data_device_found(self, capsys):
131 activation = activate.Activate([])
132 with pytest.raises(RuntimeError):
133 activation.validate_devices({'type': 'filestore', 'data': {}})
134 stdout, stderr = capsys.readouterr()
135 assert "devices found: ['data']" in stdout
136
137 def test_filestore_with_all_devices(self):
138 activation = activate.Activate([])
139 result = activation.validate_devices({'type': 'filestore', 'journal': {}, 'data': {}})
140 assert result is True
141
142 def test_bluestore_with_all_devices(self):
143 activation = activate.Activate([])
144 result = activation.validate_devices({'type': 'bluestore', 'data': {}, 'block': {}})
145 assert result is True
146
147 def test_bluestore_is_default(self):
148 activation = activate.Activate([])
149 result = activation.validate_devices({'data': {}, 'block': {}})
150 assert result is True
151
152 def test_bluestore_data_device_found(self, capsys):
153 activation = activate.Activate([])
154 with pytest.raises(RuntimeError):
155 activation.validate_devices({'data': {}})
156 stdout, stderr = capsys.readouterr()
157 assert "devices found: ['data']" in stdout
158
159 def test_bluestore_missing_data(self):
160 activation = activate.Activate([])
161 with pytest.raises(RuntimeError) as error:
162 activation.validate_devices({'type': 'bluestore', 'block': {}})
163 assert 'Unable to activate bluestore OSD due to missing devices' in str(error)
164
165 def test_bluestore_block_device_found(self, capsys):
166 activation = activate.Activate([])
167 with pytest.raises(RuntimeError):
168 activation.validate_devices({'block': {}})
169 stdout, stderr = capsys.readouterr()
170 assert "devices found: ['block']" in stdout