# Source: ceph.git (git.proxmox.com) - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_prepare.py
import pytest
from mock.mock import patch, Mock

from ceph_volume.api import lvm as api
from ceph_volume.devices import lvm
class TestLVM(object):
    """Smoke tests for the help text printed by ``lvm.main.LVM``.

    NOTE(review): the ``class`` line for these methods was not present in
    the extracted text (they take ``self`` but had no enclosing class). The
    name follows the ``Test<Command>`` pattern used by ``TestPrepare`` and
    ``TestActivate`` in this file -- confirm against upstream.
    """

    def test_main_spits_help_with_no_arguments(self, capsys):
        """An empty argv prints the top-level LVM description."""
        lvm.main.LVM([]).main()
        stdout, _ = capsys.readouterr()
        assert 'Use LVM and LVM-based technologies to deploy' in stdout

    def test_main_shows_activate_subcommands(self, capsys):
        """The help output lists the ``activate`` sub-command and its blurb."""
        lvm.main.LVM([]).main()
        stdout, _ = capsys.readouterr()
        # Trailing space keeps the match on the sub-command column rather
        # than on any longer word that merely starts with "activate".
        assert 'activate ' in stdout
        assert 'Discover and mount' in stdout

    def test_main_shows_prepare_subcommands(self, capsys):
        """The help output lists the ``prepare`` sub-command and its blurb."""
        lvm.main.LVM([]).main()
        stdout, _ = capsys.readouterr()
        assert 'prepare ' in stdout
        assert 'Format an LVM device' in stdout
class TestPrepareDevice(object):
    """Tests for the argument validation in ``Prepare.prepare_data_device``."""

    def test_cannot_use_device(self, factory):
        """An unusable ``data`` path makes the call raise ``RuntimeError``.

        The error text must name the offending path and explain what kind
        of input is accepted.
        """
        args = factory(data='/dev/var/foo')
        p = lvm.prepare.Prepare([])
        # Attach the args built above; without this assignment the
        # '/dev/var/foo' path never reaches the object under test and the
        # message assertions below cannot hold.
        p.args = args
        # Only the call under test sits inside ``raises`` so that a failure
        # during setup is not mistaken for the expected error.
        with pytest.raises(RuntimeError) as error:
            p.prepare_data_device('data', '0')
        message = str(error.value)
        assert 'Cannot use device (/dev/var/foo)' in message
        assert 'A vg/lv path or an existing device is needed' in message
class TestGetClusterFsid(object):
    """Checks where ``Prepare.get_cluster_fsid`` takes the fsid from."""

    def test_fsid_is_passed_in(self, factory):
        """An fsid set on the args object is returned unchanged."""
        explicit_args = factory(cluster_fsid='aaaa-1111')
        preparer = lvm.prepare.Prepare([])
        preparer.args = explicit_args
        fsid = preparer.get_cluster_fsid()
        assert fsid == 'aaaa-1111'

    def test_fsid_is_read_from_ceph_conf(self, factory, conf_ceph_stub):
        """With ``cluster_fsid=None`` the value from the stubbed conf is used."""
        conf_ceph_stub('[global]\nfsid = bbbb-2222')
        preparer = lvm.prepare.Prepare([])
        args_without_fsid = factory(cluster_fsid=None)
        preparer.args = args_without_fsid
        fsid = preparer.get_cluster_fsid()
        assert fsid == 'bbbb-2222'
class TestPrepare(object):
    """Tests for the ``lvm.prepare.Prepare`` sub-command.

    NOTE(review): the line numbers embedded in the extracted text skip one
    line inside each expected tags dict of the ``test_setup_device_*_passed``
    tests. The dicts below reproduce only the entries that were visible --
    confirm against upstream that no entry was lost.
    """

    @staticmethod
    def _setup_data_device(device_name):
        """Call ``setup_device`` for a 'data' device with a fresh tags dict."""
        # A new dict per call: the returned tags contain extra keys, so a
        # shared dict could leak state from one test into the next.
        return lvm.prepare.Prepare([]).setup_device(
            device_type='data',
            device_name=device_name,
            tags={'ceph.type': 'data'},
            size=0,
            slots=None,
        )

    @staticmethod
    def _fake_volume():
        """Build the ``api.Volume`` stand-in returned by the patched lvm API."""
        return api.Volume(
            lv_name='lv_foo',
            lv_path='/fake-path',
            vg_name='vg_foo',
            lv_tags='',
            lv_uuid='fake-uuid',
        )

    def test_main_spits_help_with_no_arguments(self, capsys):
        """An empty argv prints the sub-command description."""
        lvm.prepare.Prepare([]).main()
        stdout, _ = capsys.readouterr()
        assert 'Prepare an OSD by assigning an ID and FSID' in stdout

    def test_main_shows_full_help(self, capsys):
        """``--help`` exits and prints the full option descriptions."""
        with pytest.raises(SystemExit):
            lvm.prepare.Prepare(argv=['--help']).main()
        stdout, _ = capsys.readouterr()
        assert 'Use the bluestore objectstore' in stdout
        assert 'A physical device or logical' in stdout

    @patch('ceph_volume.devices.lvm.prepare.api.is_ceph_device')
    def test_safe_prepare_osd_already_created(self, m_is_ceph_device):
        """``safe_prepare`` refuses a device that is already a ceph device."""
        m_is_ceph_device.return_value = True
        with pytest.raises(RuntimeError) as error:
            prepare = lvm.prepare.Prepare(argv=[])
            # NOTE(review): assumes Prepare(argv=[]) does not populate
            # ``.args`` (the other tests in this file assign it explicitly),
            # so give it an object before setting ``.data`` -- confirm.
            prepare.args = Mock()
            prepare.args.data = '/dev/sdfoo'
            prepare.get_lv = Mock()
            prepare.safe_prepare()
        expected = 'skipping {}, it is already prepared'.format('/dev/sdfoo')
        assert expected in str(error.value)

    def test_setup_device_device_name_is_none(self):
        """Without a device name the path and uuid are empty, tags untouched."""
        result = self._setup_data_device(None)
        assert result == ('', '', {'ceph.type': 'data'})

    # Stacked @patch decorators inject mocks bottom-up: the decorator
    # closest to the function supplies the first mock argument.
    @patch('ceph_volume.api.lvm.Volume.set_tags')
    @patch('ceph_volume.devices.lvm.prepare.api.get_single_lv')
    def test_setup_device_lv_passed(self, m_get_single_lv, m_set_tags):
        """A ``vg/lv`` name yields the path and uuid of the looked-up LV."""
        m_get_single_lv.return_value = self._fake_volume()
        result = self._setup_data_device('vg_foo/lv_foo')

        assert result == ('/fake-path', 'fake-uuid', {
            'ceph.type': 'data',
            'ceph.data_uuid': 'fake-uuid',
            'ceph.data_device': '/fake-path',
        })

    @patch('ceph_volume.devices.lvm.prepare.api.create_lv')
    @patch('ceph_volume.api.lvm.Volume.set_tags')
    @patch('ceph_volume.util.disk.is_device')
    def test_setup_device_device_passed(self, m_is_device, m_set_tags, m_create_lv):
        """A raw device yields the path and uuid of the LV created on it."""
        fake_volume = self._fake_volume()
        m_is_device.return_value = True
        m_create_lv.return_value = fake_volume
        result = self._setup_data_device('/dev/sdx')

        assert result == ('/fake-path', 'fake-uuid', {
            'ceph.type': 'data',
            'ceph.data_uuid': 'fake-uuid',
            'ceph.data_device': '/fake-path',
        })

    @patch('ceph_volume.devices.lvm.prepare.Prepare.get_ptuuid')
    @patch('ceph_volume.devices.lvm.prepare.api.get_single_lv')
    def test_setup_device_partition_passed(self, m_get_single_lv, m_get_ptuuid):
        """When the LV lookup fails, the device path and its ptuuid are used."""
        m_get_single_lv.side_effect = ValueError()
        m_get_ptuuid.return_value = 'fake-uuid'
        result = self._setup_data_device('/dev/sdx')

        assert result == ('/dev/sdx', 'fake-uuid', {
            'ceph.type': 'data',
            'ceph.data_uuid': 'fake-uuid',
            'ceph.data_device': '/dev/sdx',
        })

    def test_invalid_osd_id_passed(self):
        """A non-numeric ``--osd-id`` makes ``main`` exit."""
        with pytest.raises(SystemExit):
            lvm.prepare.Prepare(argv=['--osd-id', 'foo']).main()
class TestActivate(object):
    """Tests for the help output of ``lvm.activate.Activate``."""

    def test_main_spits_help_with_no_arguments(self, capsys):
        """An empty argv prints the sub-command description."""
        lvm.activate.Activate([]).main()
        stdout, _ = capsys.readouterr()
        assert 'Activate OSDs by discovering them with' in stdout

    def test_main_shows_full_help(self, capsys):
        """``--help`` exits and prints both argument sections."""
        with pytest.raises(SystemExit):
            lvm.activate.Activate(argv=['--help']).main()
        stdout, _ = capsys.readouterr()
        # argparse renamed the "optional arguments" heading to "options" in
        # Python 3.10, so accept either spelling.
        # NOTE(review): assumes the help text is rendered by argparse.
        assert 'optional arguments' in stdout or 'options:' in stdout
        assert 'positional arguments' in stdout