]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/simple/test_activate.py
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / simple / test_activate.py
1 import os
2 import pytest
3 from ceph_volume.devices.simple import activate
4
5
6 class TestActivate(object):
7
8 def test_no_data_uuid(self, factory, is_root, monkeypatch, capture, fake_filesystem):
9 fake_filesystem.create_file('/tmp/json-config', contents='{}')
10 args = factory(osd_id='0', osd_fsid='1234', json_config='/tmp/json-config')
11 with pytest.raises(RuntimeError):
12 activate.Activate([]).activate(args)
13
14 def test_invalid_json_path(self):
15 os.environ['CEPH_VOLUME_SIMPLE_JSON_DIR'] = '/non/existing/path'
16 with pytest.raises(RuntimeError) as error:
17 activate.Activate(['1', 'asdf']).main()
18 assert 'Expected JSON config path not found' in str(error.value)
19
20 def test_main_spits_help_with_no_arguments(self, capsys):
21 activate.Activate([]).main()
22 stdout, stderr = capsys.readouterr()
23 assert 'Activate OSDs by mounting devices previously configured' in stdout
24
25 def test_activate_all(self, is_root, monkeypatch):
26 '''
27 make sure Activate calls activate for each file returned by glob
28 '''
29 mocked_glob = []
30 def mock_glob(glob):
31 path = os.path.dirname(glob)
32 mocked_glob.extend(['{}/{}.json'.format(path, file_) for file_ in
33 ['1', '2', '3']])
34 return mocked_glob
35 activate_files = []
36 def mock_activate(self, args):
37 activate_files.append(args.json_config)
38 monkeypatch.setattr('glob.glob', mock_glob)
39 monkeypatch.setattr(activate.Activate, 'activate', mock_activate)
40 activate.Activate(['--all']).main()
41 assert activate_files == mocked_glob
42
43
44
45
46 class TestEnableSystemdUnits(object):
47
48 def test_nothing_is_activated(self, is_root, capsys, fake_filesystem):
49 fake_filesystem.create_file('/tmp/json-config', contents='{}')
50 activation = activate.Activate(['--no-systemd', '--file', '/tmp/json-config', '0', '1234'], from_trigger=True)
51 activation.activate = lambda x: True
52 activation.main()
53 activation.enable_systemd_units('0', '1234')
54 stdout, stderr = capsys.readouterr()
55 assert 'Skipping enabling of `simple`' in stderr
56 assert 'Skipping masking of ceph-disk' in stderr
57 assert 'Skipping enabling and starting OSD simple' in stderr
58
59 def test_no_systemd_flag_is_true(self, is_root, fake_filesystem):
60 fake_filesystem.create_file('/tmp/json-config', contents='{}')
61 activation = activate.Activate(['--no-systemd', '--file', '/tmp/json-config', '0', '1234'], from_trigger=True)
62 activation.activate = lambda x: True
63 activation.main()
64 assert activation.skip_systemd is True
65
66 def test_no_systemd_flag_is_false(self, is_root, fake_filesystem):
67 fake_filesystem.create_file('/tmp/json-config', contents='{}')
68 activation = activate.Activate(['--file', '/tmp/json-config', '0', '1234'], from_trigger=True)
69 activation.activate = lambda x: True
70 activation.main()
71 assert activation.skip_systemd is False
72
73 def test_masks_ceph_disk(self, is_root, monkeypatch, capture, fake_filesystem):
74 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', capture)
75 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
76 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
77 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
78
79 fake_filesystem.create_file('/tmp/json-config', contents='{}')
80 activation = activate.Activate(['--file', '/tmp/json-config', '0', '1234'], from_trigger=False)
81 activation.activate = lambda x: True
82 activation.main()
83 activation.enable_systemd_units('0', '1234')
84 assert len(capture.calls) == 1
85
86 def test_enables_simple_unit(self, is_root, monkeypatch, capture, fake_filesystem):
87 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
88 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', capture)
89 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
90 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
91
92 fake_filesystem.create_file('/tmp/json-config', contents='{}')
93 activation = activate.Activate(['--file', '/tmp/json-config', '0', '1234'], from_trigger=False)
94 activation.activate = lambda x: True
95 activation.main()
96 activation.enable_systemd_units('0', '1234')
97 assert len(capture.calls) == 1
98 assert capture.calls[0]['args'] == ('0', '1234', 'simple')
99
100 def test_enables_osd_unit(self, is_root, monkeypatch, capture, fake_filesystem):
101 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
102 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
103 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', capture)
104 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', lambda *a: True)
105
106 fake_filesystem.create_file('/tmp/json-config', contents='{}')
107 activation = activate.Activate(['--file', '/tmp/json-config', '0', '1234'], from_trigger=False)
108 activation.activate = lambda x: True
109 activation.main()
110 activation.enable_systemd_units('0', '1234')
111 assert len(capture.calls) == 1
112 assert capture.calls[0]['args'] == ('0',)
113
114 def test_starts_osd_unit(self, is_root, monkeypatch, capture, fake_filesystem):
115 monkeypatch.setattr('ceph_volume.systemd.systemctl.mask_ceph_disk', lambda *a: True)
116 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_volume', lambda *a: True)
117 monkeypatch.setattr('ceph_volume.systemd.systemctl.enable_osd', lambda *a: True)
118 monkeypatch.setattr('ceph_volume.systemd.systemctl.start_osd', capture)
119
120 fake_filesystem.create_file('/tmp/json-config', contents='{}')
121 activation = activate.Activate(['--file', '/tmp/json-config', '0', '1234'], from_trigger=False)
122 activation.activate = lambda x: True
123 activation.main()
124 activation.enable_systemd_units('0', '1234')
125 assert len(capture.calls) == 1
126 assert capture.calls[0]['args'] == ('0',)
127
128
129 class TestValidateDevices(object):
130
131 def test_bluestore_with_all_devices(self):
132 activation = activate.Activate([])
133 result = activation.validate_devices({'type': 'bluestore', 'data': {}, 'block': {}})
134 assert result is True
135
136 def test_bluestore_without_type(self):
137 activation = activate.Activate([])
138 result = activation.validate_devices({'data': {}, 'block': {}})
139 assert result is True
140
141 def test_bluestore_is_default(self):
142 activation = activate.Activate([])
143 result = activation.validate_devices({'data': {}, 'block': {}})
144 assert result is True
145
146 def test_bluestore_data_device_found(self, capsys):
147 activation = activate.Activate([])
148 with pytest.raises(RuntimeError):
149 activation.validate_devices({'data': {}})
150 stdout, stderr = capsys.readouterr()
151 assert "devices found: ['data']" in stderr
152
153 def test_bluestore_missing_data(self):
154 activation = activate.Activate([])
155 with pytest.raises(RuntimeError) as error:
156 activation.validate_devices({'type': 'bluestore', 'block': {}})
157 assert 'Unable to activate bluestore OSD due to missing devices' in str(error.value)
158
159 def test_bluestore_block_device_found(self, capsys):
160 activation = activate.Activate([])
161 with pytest.raises(RuntimeError):
162 activation.validate_devices({'block': {}})
163 stdout, stderr = capsys.readouterr()
164 assert "devices found: ['block']" in stderr