]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
build: use dgit for download target
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_batch.py
CommitLineData
91327a77
AA
1from ceph_volume.devices.lvm import batch
2
3
11fdf7f2 4class TestBatch(object):
f64942e4
AA
5
6 def test_batch_instance(self, is_root):
7 b = batch.Batch([])
8 b.main()
9
11fdf7f2
TL
10 def test_get_devices(self, monkeypatch):
11 return_value = {
12 '/dev/vdd': {
13 'removable': '0',
14 'vendor': '0x1af4',
15 'model': '',
16 'sas_address': '',
17 'sas_device_handle': '',
18 'sectors': 0,
19 'size': 21474836480.0,
20 'support_discard': '',
21 'partitions': {
22 'vdd1': {
23 'start': '2048',
24 'sectors': '41940959',
25 'sectorsize': 512,
26 'size': '20.00 GB'
27 }
28 },
29 'rotational': '1',
30 'scheduler_mode': 'mq-deadline',
31 'sectorsize': '512',
32 'human_readable_size': '20.00 GB',
33 'path': '/dev/vdd'
34 },
35 '/dev/vdf': {
36 'removable': '0',
37 'vendor': '0x1af4',
38 'model': '',
39 'sas_address': '',
40 'sas_device_handle': '',
41 'sectors': 0,
42 'size': 21474836480.0,
43 'support_discard': '',
44 'partitions': {},
45 'rotational': '1',
46 'scheduler_mode': 'mq-deadline',
47 'sectorsize': '512',
48 'human_readable_size': '20.00 GB',
49 'path': '/dev/vdf'
50 }
51 }
52 monkeypatch.setattr('ceph_volume.devices.lvm.batch.disk.get_devices',
53 lambda: return_value)
54 b = batch.Batch([])
55 result = b.get_devices().strip()
56 assert result == '* /dev/vdf 20.00 GB rotational'
57
f64942e4 58
91327a77
AA
59class TestFilterDevices(object):
60
61 def test_filter_used_device(self, factory):
62 device1 = factory(used_by_ceph=True, abspath="/dev/sda")
63 args = factory(devices=[device1], filtered_devices={})
11fdf7f2 64 result, filtered_devices = batch.filter_devices(args)
91327a77 65 assert not result
11fdf7f2 66 assert device1.abspath in filtered_devices
91327a77
AA
67
68 def test_has_unused_devices(self, factory):
69 device1 = factory(
70 used_by_ceph=False,
71 abspath="/dev/sda",
72 rotational=False,
73 is_lvm_member=False
74 )
75 args = factory(devices=[device1], filtered_devices={})
11fdf7f2 76 result, filtered_devices = batch.filter_devices(args)
91327a77 77 assert device1 in result
11fdf7f2 78 assert not filtered_devices
91327a77
AA
79
80 def test_filter_device_used_as_a_journal(self, factory):
81 hdd1 = factory(
82 used_by_ceph=True,
83 abspath="/dev/sda",
84 rotational=True,
85 is_lvm_member=True,
86 )
87 lv = factory(tags={"ceph.type": "journal"})
88 ssd1 = factory(
89 used_by_ceph=False,
90 abspath="/dev/nvme0n1",
91 rotational=False,
92 is_lvm_member=True,
93 lvs=[lv],
94 )
95 args = factory(devices=[hdd1, ssd1], filtered_devices={})
11fdf7f2 96 result, filtered_devices = batch.filter_devices(args)
91327a77 97 assert not result
11fdf7f2 98 assert ssd1.abspath in filtered_devices
91327a77
AA
99
100 def test_last_device_is_not_filtered(self, factory):
101 hdd1 = factory(
102 used_by_ceph=True,
103 abspath="/dev/sda",
104 rotational=True,
105 is_lvm_member=True,
106 )
107 ssd1 = factory(
108 used_by_ceph=False,
109 abspath="/dev/nvme0n1",
110 rotational=False,
111 is_lvm_member=False,
112 )
113 args = factory(devices=[hdd1, ssd1], filtered_devices={})
11fdf7f2 114 result, filtered_devices = batch.filter_devices(args)
91327a77 115 assert result
11fdf7f2 116 assert len(filtered_devices) == 1