]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/test_inventory.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / test_inventory.py
1 # -*- coding: utf-8 -*-
2
3 import pytest
4 from ceph_volume.util.device import Devices
5 from ceph_volume.util.lsmdisk import LSMDisk
6 import ceph_volume.util.lsmdisk as lsmdisk
7
8
@pytest.fixture
def device_report_keys(device_info):
    """Return the top-level keys of the JSON report for one mocked device.

    A single ``/dev/sdb`` entry is registered through the ``device_info``
    fixture, then the first element of ``Devices().json_report()`` is
    inspected and its keys are returned as a list.
    """
    # Shaped like one entry of the output of disk.get_devices().
    sdb_sys_api = {
        'human_readable_size': '1.82 TB',
        'locked': 0,
        'model': 'PERC H700',
        'nr_requests': '128',
        'partitions': {},
        'path': '/dev/sdb',
        'removable': '0',
        'rev': '2.10',
        'ro': '0',
        'rotational': '1',
        'sas_address': '',
        'sas_device_handle': '',
        'scheduler_mode': 'cfq',
        'sectors': 0,
        'sectorsize': '512',
        'size': 1999844147200.0,
        'support_discard': '',
        'vendor': 'DELL',
        'device_id': 'Vendor-Model-Serial',
    }
    device_info(devices={'/dev/sdb': sdb_sys_api})

    first_report = Devices().json_report()[0]
    return list(first_report.keys())
36
@pytest.fixture
def device_sys_api_keys(device_info):
    """Return the keys of the ``sys_api`` section for one mocked device.

    A single ``/dev/sdb`` entry is registered through the ``device_info``
    fixture; the ``sys_api`` mapping of the first element of
    ``Devices().json_report()`` supplies the returned list of keys.
    """
    # Shaped like one entry of the output of disk.get_devices().
    sdb_sys_api = {
        'human_readable_size': '1.82 TB',
        'locked': 0,
        'model': 'PERC H700',
        'nr_requests': '128',
        'partitions': {},
        'path': '/dev/sdb',
        'removable': '0',
        'rev': '2.10',
        'ro': '0',
        'rotational': '1',
        'sas_address': '',
        'sas_device_handle': '',
        'scheduler_mode': 'cfq',
        'sectors': 0,
        'sectorsize': '512',
        'size': 1999844147200.0,
        'support_discard': '',
        'vendor': 'DELL',
    }
    device_info(devices={'/dev/sdb': sdb_sys_api})

    first_report = Devices().json_report()[0]
    sys_api_section = first_report['sys_api']
    return list(sys_api_section.keys())
63
@pytest.fixture
def device_data(device_info):
    """Return the JSON report of a mocked device with hand-written lsm_data.

    A single ``/dev/sdb`` entry is registered through the ``device_info``
    fixture. The first device exposed by ``Devices()`` then has its
    ``lsm_data`` attribute overwritten with a fixed dictionary before its
    ``json_report()`` is returned.
    """
    # Shaped like one entry of the output of disk.get_devices().
    sdb_sys_api = {
        'human_readable_size': '1.82 TB',
        'locked': 0,
        'model': 'PERC H700',
        'nr_requests': '128',
        'partitions': {},
        'path': '/dev/sdb',
        'removable': '0',
        'rev': '2.10',
        'ro': '0',
        'rotational': '1',
        'sas_address': '',
        'sas_device_handle': '',
        'scheduler_mode': 'cfq',
        'sectors': 0,
        'sectorsize': '512',
        'size': 1999844147200.0,
        'support_discard': '',
        'vendor': 'DELL',
    }
    device_info(devices={'/dev/sdb': sdb_sys_api})

    # LED sub-section of the canned lsm_data payload.
    led_support = {
        "IDENTsupport": 'Supported',
        "IDENTstatus": 'Off',
        "FAILsupport": 'Supported',
        "FAILstatus": 'Off',
    }

    mocked_device = Devices().devices[0]
    mocked_device.lsm_data = {
        "serialNum": 'S2X9NX0H935283',
        "transport": 'SAS',
        "mediaType": 'HDD',
        "rpm": 10000,
        "linkSpeed": 6000,
        "health": 'Good',
        "ledSupport": led_support,
        "errors": [],
    }
    return mocked_device.json_report()
109
110
111 class TestInventory(object):
112
113 expected_keys = [
114 'ceph_device',
115 'path',
116 'rejected_reasons',
117 'sys_api',
118 'available',
119 'lvs',
120 'device_id',
121 'lsm_data',
122 ]
123
124 expected_sys_api_keys = [
125 'human_readable_size',
126 'locked',
127 'model',
128 'nr_requests',
129 'partitions',
130 'path',
131 'removable',
132 'rev',
133 'ro',
134 'rotational',
135 'sas_address',
136 'sas_device_handle',
137 'scheduler_mode',
138 'sectors',
139 'sectorsize',
140 'size',
141 'support_discard',
142 'vendor',
143 ]
144
145 expected_lsm_keys = [
146 'serialNum',
147 'transport',
148 'mediaType',
149 'rpm',
150 'linkSpeed',
151 'health',
152 'ledSupport',
153 'errors',
154 ]
155
156 def test_json_inventory_keys_unexpected(self, fake_call, device_report_keys):
157 for k in device_report_keys:
158 assert k in self.expected_keys, "unexpected key {} in report".format(k)
159
160 def test_json_inventory_keys_missing(self, fake_call, device_report_keys):
161 for k in self.expected_keys:
162 assert k in device_report_keys, "expected key {} in report".format(k)
163
164 def test_sys_api_keys_unexpected(self, fake_call, device_sys_api_keys):
165 for k in device_sys_api_keys:
166 assert k in self.expected_sys_api_keys, "unexpected key {} in sys_api field".format(k)
167
168 def test_sys_api_keys_missing(self, fake_call, device_sys_api_keys):
169 for k in self.expected_sys_api_keys:
170 assert k in device_sys_api_keys, "expected key {} in sys_api field".format(k)
171
172 def test_lsm_data_type_unexpected(self, fake_call, device_data):
173 assert isinstance(device_data['lsm_data'], dict), "lsm_data field must be of type dict"
174
175 def test_lsm_data_keys_unexpected(self, fake_call, device_data):
176 for k in device_data['lsm_data'].keys():
177 assert k in self.expected_lsm_keys, "unexpected key {} in lsm_data field".format(k)
178
179 def test_lsm_data_keys_missing(self, fake_call, device_data):
180 lsm_keys = device_data['lsm_data'].keys()
181 assert lsm_keys
182 for k in self.expected_lsm_keys:
183 assert k in lsm_keys, "expected key {} in lsm_data field".format(k)
184
185
@pytest.fixture
def lsm_info(monkeypatch):
    """Return an ``LSMDisk('/dev/sda')`` backed entirely by canned values.

    ``LSMDisk._query_lsm`` is replaced by a lookup into a fixed table, and
    the ``health_map``, ``transport_map`` and ``lsm_Disk`` names of the
    ``lsmdisk`` module are replaced by local stand-ins, all via
    ``monkeypatch`` so the patches are undone after the test.
    """
    # Canned answers keyed by the queried function name. The LED status 36
    # equals 4 | 32, i.e. LED_STATUS_IDENT_OFF | LED_STATUS_FAULT_OFF below.
    canned_answers = {
        'serial_num_get': "S2X9NX0H935283",
        'link_type_get': 6,
        'rpm_get': 0,
        'link_speed_get': 6000,
        'health_status_get': 2,
        'led_status_get': 36,
    }

    def mock_query_lsm(_, func, path):
        # Any query not listed above is answered with 'Unknown'.
        return canned_answers.get(func, 'Unknown')

    # The states and settings below mirror the libstoragemgmt code base:
    # c_binding/include/libstoragemgmt/libstoragemgmt_types.h at
    # https://github.com/libstorage/libstoragemgmt/
    health_names = {
        -1: "Unknown",
        0: "Fail",
        1: "Warn",
        2: "Good",
    }
    transport_names = {
        -1: "Unavailable",
        0: "Fibre Channel",
        2: "IBM SSA",
        3: "Serial Bus",
        4: "SCSI RDMA",
        5: "iSCSI",
        6: "SAS",
        7: "ADT (Tape)",
        8: "ATA/SATA",
        9: "USB",
        10: "SCSI over PCI-E",
        11: "PCI-E",
    }

    class MockLEDStates:
        # Bit flags for the LED status value.
        LED_STATUS_UNKNOWN = 1
        LED_STATUS_IDENT_ON = 2
        LED_STATUS_IDENT_OFF = 4
        LED_STATUS_IDENT_UNKNOWN = 8
        LED_STATUS_FAULT_ON = 16
        LED_STATUS_FAULT_OFF = 32
        LED_STATUS_FAULT_UNKNOWN = 64

    monkeypatch.setattr(LSMDisk, '_query_lsm', mock_query_lsm)
    monkeypatch.setattr(lsmdisk, 'health_map', health_names)
    monkeypatch.setattr(lsmdisk, 'transport_map', transport_names)
    monkeypatch.setattr(lsmdisk, 'lsm_Disk', MockLEDStates)

    return LSMDisk('/dev/sda')
237
238
239 class TestLSM(object):
240 def test_lsmdisk_health(self, lsm_info):
241 assert lsm_info.health == "Good"
242 def test_lsmdisk_transport(self, lsm_info):
243 assert lsm_info.transport == 'SAS'
244 def test_lsmdisk_mediatype(self, lsm_info):
245 assert lsm_info.media_type == 'Flash'
246 def test_lsmdisk_led_ident_support(self, lsm_info):
247 assert lsm_info.led_ident_support == 'Supported'
248 def test_lsmdisk_led_ident(self, lsm_info):
249 assert lsm_info.led_ident_state == 'Off'
250 def test_lsmdisk_led_fault_support(self, lsm_info):
251 assert lsm_info.led_fault_support == 'Supported'
252 def test_lsmdisk_led_fault(self, lsm_info):
253 assert lsm_info.led_fault_state == 'Off'
254 def test_lsmdisk_report(self, lsm_info):
255 assert isinstance(lsm_info.json_report(), dict)