]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
update source to 12.2.11
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_batch.py
CommitLineData
91327a77
AA
1from ceph_volume.devices.lvm import batch
2
3
f64942e4
AA
class TestBatchSmoke(object):
    """Smoke check for the ``batch.Batch`` entry point."""

    def test_batch_instance(self, is_root):
        """Build ``Batch`` with an empty argument list and run ``main()``.

        The test passes as long as neither step raises.
        """
        # ``is_root`` is a fixture defined outside this file; presumably it
        # satisfies the superuser requirement -- confirm in conftest.
        batch.Batch([]).main()
9
10
91327a77
AA
class TestFilterDevices(object):
    """Tests for ``batch.filter_devices`` driven by factory-built stand-ins.

    Every test hands ``filter_devices`` an ``args`` object exposing
    ``devices`` (the candidates) and ``filtered_devices`` (initially an
    empty dict), then checks the returned value and what was recorded in
    ``args.filtered_devices``.
    """

    def test_filter_used_device(self, factory):
        """A device with ``used_by_ceph=True`` is excluded and recorded."""
        used_disk = factory(used_by_ceph=True, abspath="/dev/sda")
        args = factory(devices=[used_disk], filtered_devices={})

        assert not batch.filter_devices(args)
        assert used_disk.abspath in args.filtered_devices

    def test_has_unused_devices(self, factory):
        """An unused, non-LVM device is returned and nothing is recorded."""
        free_disk = factory(
            used_by_ceph=False,
            abspath="/dev/sda",
            rotational=False,
            is_lvm_member=False
        )
        args = factory(devices=[free_disk], filtered_devices={})

        assert free_disk in batch.filter_devices(args)
        assert not args.filtered_devices

    def test_filter_device_used_as_a_journal(self, factory):
        """A device holding an LV tagged ``ceph.type=journal`` is recorded.

        The rotational device is already marked ``used_by_ceph``; the
        non-rotational one is not, but it carries the journal LV. The
        result must be empty and the non-rotational path must be recorded.
        """
        rotational_disk = factory(
            used_by_ceph=True,
            abspath="/dev/sda",
            rotational=True,
            is_lvm_member=True,
        )
        journal_lv = factory(tags={"ceph.type": "journal"})
        journal_disk = factory(
            used_by_ceph=False,
            abspath="/dev/nvme0n1",
            rotational=False,
            is_lvm_member=True,
            lvs=[journal_lv],
        )
        args = factory(
            devices=[rotational_disk, journal_disk],
            filtered_devices={},
        )

        assert not batch.filter_devices(args)
        assert journal_disk.abspath in args.filtered_devices

    def test_last_device_is_not_filtered(self, factory):
        """With one used and one free device, exactly one entry is recorded.

        The returned value must be non-empty and ``filtered_devices`` must
        contain a single entry.
        """
        rotational_disk = factory(
            used_by_ceph=True,
            abspath="/dev/sda",
            rotational=True,
            is_lvm_member=True,
        )
        free_disk = factory(
            used_by_ceph=False,
            abspath="/dev/nvme0n1",
            rotational=False,
            is_lvm_member=False,
        )
        args = factory(
            devices=[rotational_disk, free_disk],
            filtered_devices={},
        )

        assert batch.filter_devices(args)
        assert len(args.filtered_devices) == 1