git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_batch.py
1 import pytest
2 from ceph_volume.devices.lvm import batch
3
4
class TestBatch(object):
    """Tests for ``batch.Batch``: construction, device listing and the
    disjoint-device-list check."""

    def test_batch_instance(self, is_root):
        """Smoke test: ``Batch([])`` can be built and ``main()`` called.

        There is no assertion; the test passes as long as nothing raises.
        The ``is_root`` fixture is requested but not used directly
        (presumably it satisfies a superuser check -- confirm in conftest).
        """
        instance = batch.Batch([])
        instance.main()

    def test_get_devices(self, monkeypatch):
        """``get_devices()`` reports only ``/dev/vdf`` for the stubbed table.

        The stub contains two otherwise identical disks; ``/dev/vdd`` has a
        partition and ``/dev/vdf`` has none. Only ``/dev/vdf`` is expected
        in the stripped output.
        """
        # Attributes shared by both fake disks.
        shared_attrs = {
            'removable': '0',
            'vendor': '0x1af4',
            'model': '',
            'sas_address': '',
            'sas_device_handle': '',
            'sectors': 0,
            'size': 21474836480.0,
            'support_discard': '',
            'rotational': '1',
            'scheduler_mode': 'mq-deadline',
            'sectorsize': '512',
            'human_readable_size': '20.00 GB',
        }

        partitioned_disk = dict(shared_attrs)
        partitioned_disk['partitions'] = {
            'vdd1': {
                'start': '2048',
                'sectors': '41940959',
                'sectorsize': 512,
                'size': '20.00 GB',
            },
        }
        partitioned_disk['path'] = '/dev/vdd'

        blank_disk = dict(shared_attrs)
        blank_disk['partitions'] = {}
        blank_disk['path'] = '/dev/vdf'

        fake_table = {
            '/dev/vdd': partitioned_disk,
            '/dev/vdf': blank_disk,
        }

        def fake_get_devices():
            return fake_table

        monkeypatch.setattr(
            'ceph_volume.devices.lvm.batch.disk.get_devices',
            fake_get_devices,
        )

        listing = batch.Batch([]).get_devices().strip()
        # NOTE(review): expected text is reproduced exactly as it appears in
        # this copy of the file; confirm the column spacing matches what
        # Batch.get_devices() actually emits.
        assert listing == '* /dev/vdf 20.00 GB rotational'

    def test_disjoint_device_lists(self, factory):
        """A device listed in both ``devices`` and ``db_devices`` is rejected.

        ``/dev/sdb`` appears in both lists, so
        ``_ensure_disjoint_device_lists()`` must raise with a message that
        contains 'Device lists are not disjoint'.
        """
        sda, sdb = [
            factory(used_by_ceph=False, available=True, abspath=path)
            for path in ("/dev/sda", "/dev/sdb")
        ]
        instance = batch.Batch([])
        instance.args.devices = [sda, sdb]
        instance.args.db_devices = [sdb]
        instance._filter_devices()

        with pytest.raises(Exception) as excinfo:
            instance._ensure_disjoint_device_lists()

        # Checked after the context manager exits, once excinfo is populated.
        assert 'Device lists are not disjoint' in str(excinfo.value)
69
70
class TestFilterDevices(object):
    """Tests for ``batch.filter_devices``, which returns a pair of
    (usable devices, filtered devices keyed by path)."""

    def test_filter_used_device(self, factory):
        """A single device already used by ceph is filtered out."""
        used = factory(used_by_ceph=True, abspath="/dev/sda")
        cli_args = factory(devices=[used], filtered_devices={})

        usable, rejected = batch.filter_devices(cli_args)

        assert not usable
        assert used.abspath in rejected

    def test_has_unused_devices(self, factory):
        """An unused, non-LVM device is returned and nothing is filtered."""
        free = factory(
            used_by_ceph=False,
            abspath="/dev/sda",
            rotational=False,
            is_lvm_member=False,
        )
        cli_args = factory(devices=[free], filtered_devices={})

        usable, rejected = batch.filter_devices(cli_args)

        assert free in usable
        assert not rejected

    def test_filter_device_used_as_a_journal(self, factory):
        """An SSD whose LV is tagged as a journal is filtered out.

        The accompanying HDD is already used by ceph, so no device remains
        usable.
        """
        journal_lv = factory(tags={"ceph.type": "journal"})
        spinner = factory(
            used_by_ceph=True,
            abspath="/dev/sda",
            rotational=True,
            is_lvm_member=True,
        )
        flash = factory(
            used_by_ceph=False,
            abspath="/dev/nvme0n1",
            rotational=False,
            is_lvm_member=True,
            lvs=[journal_lv],
        )
        cli_args = factory(devices=[spinner, flash], filtered_devices={})

        usable, rejected = batch.filter_devices(cli_args)

        assert not usable
        assert flash.abspath in rejected

    def test_last_device_is_not_filtered(self, factory):
        """With one used HDD and one free SSD, exactly one device is filtered
        and the result is non-empty."""
        spinner = factory(
            used_by_ceph=True,
            abspath="/dev/sda",
            rotational=True,
            is_lvm_member=True,
        )
        flash = factory(
            used_by_ceph=False,
            abspath="/dev/nvme0n1",
            rotational=False,
            is_lvm_member=False,
        )
        cli_args = factory(devices=[spinner, flash], filtered_devices={})

        usable, rejected = batch.filter_devices(cli_args)

        assert usable
        assert len(rejected) == 1