]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
bump version to 15.2.4-pve1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_batch.py
CommitLineData
494da23a 1import pytest
91327a77
AA
2from ceph_volume.devices.lvm import batch
3
4
11fdf7f2 5class TestBatch(object):
f64942e4
AA
6
7 def test_batch_instance(self, is_root):
8 b = batch.Batch([])
9 b.main()
10
11fdf7f2
TL
11 def test_get_devices(self, monkeypatch):
12 return_value = {
13 '/dev/vdd': {
14 'removable': '0',
15 'vendor': '0x1af4',
16 'model': '',
17 'sas_address': '',
18 'sas_device_handle': '',
19 'sectors': 0,
20 'size': 21474836480.0,
21 'support_discard': '',
22 'partitions': {
23 'vdd1': {
24 'start': '2048',
25 'sectors': '41940959',
26 'sectorsize': 512,
27 'size': '20.00 GB'
28 }
29 },
30 'rotational': '1',
31 'scheduler_mode': 'mq-deadline',
32 'sectorsize': '512',
33 'human_readable_size': '20.00 GB',
34 'path': '/dev/vdd'
35 },
36 '/dev/vdf': {
37 'removable': '0',
38 'vendor': '0x1af4',
39 'model': '',
40 'sas_address': '',
41 'sas_device_handle': '',
42 'sectors': 0,
43 'size': 21474836480.0,
44 'support_discard': '',
45 'partitions': {},
46 'rotational': '1',
47 'scheduler_mode': 'mq-deadline',
48 'sectorsize': '512',
49 'human_readable_size': '20.00 GB',
50 'path': '/dev/vdf'
51 }
52 }
53 monkeypatch.setattr('ceph_volume.devices.lvm.batch.disk.get_devices',
54 lambda: return_value)
55 b = batch.Batch([])
56 result = b.get_devices().strip()
57 assert result == '* /dev/vdf 20.00 GB rotational'
58
494da23a
TL
59 def test_disjoint_device_lists(self, factory):
60 device1 = factory(used_by_ceph=False, available=True, abspath="/dev/sda")
61 device2 = factory(used_by_ceph=False, available=True, abspath="/dev/sdb")
62 b = batch.Batch([])
63 b.args.devices = [device1, device2]
64 b.args.db_devices = [device2]
65 b._filter_devices()
66 with pytest.raises(Exception) as disjoint_ex:
67 b._ensure_disjoint_device_lists()
68 assert 'Device lists are not disjoint' in str(disjoint_ex.value)
69
f64942e4 70
91327a77
AA
71class TestFilterDevices(object):
72
73 def test_filter_used_device(self, factory):
74 device1 = factory(used_by_ceph=True, abspath="/dev/sda")
75 args = factory(devices=[device1], filtered_devices={})
11fdf7f2 76 result, filtered_devices = batch.filter_devices(args)
91327a77 77 assert not result
11fdf7f2 78 assert device1.abspath in filtered_devices
91327a77
AA
79
80 def test_has_unused_devices(self, factory):
81 device1 = factory(
82 used_by_ceph=False,
83 abspath="/dev/sda",
84 rotational=False,
85 is_lvm_member=False
86 )
87 args = factory(devices=[device1], filtered_devices={})
11fdf7f2 88 result, filtered_devices = batch.filter_devices(args)
91327a77 89 assert device1 in result
11fdf7f2 90 assert not filtered_devices
91327a77
AA
91
92 def test_filter_device_used_as_a_journal(self, factory):
93 hdd1 = factory(
94 used_by_ceph=True,
95 abspath="/dev/sda",
96 rotational=True,
97 is_lvm_member=True,
98 )
99 lv = factory(tags={"ceph.type": "journal"})
100 ssd1 = factory(
101 used_by_ceph=False,
102 abspath="/dev/nvme0n1",
103 rotational=False,
104 is_lvm_member=True,
105 lvs=[lv],
106 )
107 args = factory(devices=[hdd1, ssd1], filtered_devices={})
11fdf7f2 108 result, filtered_devices = batch.filter_devices(args)
91327a77 109 assert not result
11fdf7f2 110 assert ssd1.abspath in filtered_devices
91327a77
AA
111
112 def test_last_device_is_not_filtered(self, factory):
113 hdd1 = factory(
114 used_by_ceph=True,
115 abspath="/dev/sda",
116 rotational=True,
117 is_lvm_member=True,
118 )
119 ssd1 = factory(
120 used_by_ceph=False,
121 abspath="/dev/nvme0n1",
122 rotational=False,
123 is_lvm_member=False,
124 )
125 args = factory(devices=[hdd1, ssd1], filtered_devices={})
11fdf7f2 126 result, filtered_devices = batch.filter_devices(args)
91327a77 127 assert result
11fdf7f2 128 assert len(filtered_devices) == 1
92f5a8d4
TL
129
130 def test_no_auto_fails_on_unavailable_device(self, factory):
131 hdd1 = factory(
132 used_by_ceph=False,
133 abspath="/dev/sda",
134 rotational=True,
135 is_lvm_member=False,
136 available=True
137 )
138 ssd1 = factory(
139 used_by_ceph=True,
140 abspath="/dev/nvme0n1",
141 rotational=False,
142 is_lvm_member=True,
143 available=False
144 )
145 args = factory(devices=[hdd1], db_devices=[ssd1], filtered_devices={},
146 yes=True)
147 b = batch.Batch([])
148 b.args = args
149 with pytest.raises(RuntimeError) as ex:
150 b._filter_devices()
151 assert '1 devices were filtered in non-interactive mode, bailing out' in str(ex.value)