]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_batch.py
2 from ceph_volume
.devices
.lvm
import batch
5 class TestBatch(object):
7 def test_batch_instance(self
, is_root
):
11 def test_get_devices(self
, monkeypatch
):
18 'sas_device_handle': '',
20 'size': 21474836480.0,
21 'support_discard': '',
25 'sectors': '41940959',
31 'scheduler_mode': 'mq-deadline',
33 'human_readable_size': '20.00 GB',
41 'sas_device_handle': '',
43 'size': 21474836480.0,
44 'support_discard': '',
47 'scheduler_mode': 'mq-deadline',
49 'human_readable_size': '20.00 GB',
53 monkeypatch
.setattr('ceph_volume.devices.lvm.batch.disk.get_devices',
56 result
= b
.get_devices().strip()
57 assert result
== '* /dev/vdf 20.00 GB rotational'
59 def test_disjoint_device_lists(self
, factory
):
60 device1
= factory(used_by_ceph
=False, available
=True, abspath
="/dev/sda")
61 device2
= factory(used_by_ceph
=False, available
=True, abspath
="/dev/sdb")
63 b
.args
.devices
= [device1
, device2
]
64 b
.args
.db_devices
= [device2
]
66 with pytest
.raises(Exception) as disjoint_ex
:
67 b
._ensure
_disjoint
_device
_lists
()
68 assert 'Device lists are not disjoint' in str(disjoint_ex
.value
)
71 class TestFilterDevices(object):
73 def test_filter_used_device(self
, factory
):
74 device1
= factory(used_by_ceph
=True, abspath
="/dev/sda")
75 args
= factory(devices
=[device1
], filtered_devices
={})
76 result
, filtered_devices
= batch
.filter_devices(args
)
78 assert device1
.abspath
in filtered_devices
80 def test_has_unused_devices(self
, factory
):
87 args
= factory(devices
=[device1
], filtered_devices
={})
88 result
, filtered_devices
= batch
.filter_devices(args
)
89 assert device1
in result
90 assert not filtered_devices
92 def test_filter_device_used_as_a_journal(self
, factory
):
99 lv
= factory(tags
={"ceph.type": "journal"})
102 abspath
="/dev/nvme0n1",
107 args
= factory(devices
=[hdd1
, ssd1
], filtered_devices
={})
108 result
, filtered_devices
= batch
.filter_devices(args
)
110 assert ssd1
.abspath
in filtered_devices
112 def test_last_device_is_not_filtered(self
, factory
):
121 abspath
="/dev/nvme0n1",
125 args
= factory(devices
=[hdd1
, ssd1
], filtered_devices
={})
126 result
, filtered_devices
= batch
.filter_devices(args
)
128 assert len(filtered_devices
) == 1