def __init__(self, **kw):
# default flags
self.bluestore = False
- self.filestore = False
self.no_systemd = False
self.auto_detect_objectstore = None
for k, v in kw.items():
# test the negative side effect with an actual functional run, so we must
# setup a perfect scenario for this test to check it can really work
# with/without osd_id
- def test_no_osd_id_matches_fsid(self, is_root, monkeypatch, capture):
- FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
- lv_tags="ceph.osd_fsid=1234")
- volumes = []
- volumes.append(FooVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: volumes)
- monkeypatch.setattr(activate, 'activate_filestore', capture)
- args = Args(osd_id=None, osd_fsid='1234', filestore=True)
- activate.Activate([]).activate(args)
- assert capture.calls[0]['args'][0] == [FooVolume]
-
def test_no_osd_id_matches_fsid_bluestore(self, is_root, monkeypatch, capture):
FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
lv_tags="ceph.osd_fsid=1234")
activate.Activate([]).activate(args)
assert capture.calls[0]['args'][0] == [FooVolume]
- def test_no_osd_id_no_matching_fsid(self, is_root, monkeypatch, capture):
- FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo',
- lv_tags="ceph.osd_fsid=1111")
- volumes = []
- volumes.append(FooVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: [])
- monkeypatch.setattr(api, 'get_single_lv', lambda **kwargs: [])
- monkeypatch.setattr(activate, 'activate_filestore', capture)
-
- args = Args(osd_id=None, osd_fsid='2222')
- with pytest.raises(RuntimeError):
- activate.Activate([]).activate(args)
-
def test_osd_id_no_osd_fsid(self, is_root):
args = Args(osd_id=42, osd_fsid=None)
with pytest.raises(RuntimeError) as result:
activate.Activate([]).activate(args)
assert result.value.args[0] == 'Please provide both osd_id and osd_fsid'
- def test_filestore_no_systemd(self, is_root, monkeypatch, capture):
- monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
- fake_enable = Capture()
- fake_start_osd = Capture()
- monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
- monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
- monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
- JournalVolume = api.Volume(
- lv_name='journal',
- lv_path='/dev/vg/journal',
- lv_uuid='000',
- lv_tags=','.join([
- "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
- "ceph.journal_uuid=000", "ceph.type=journal",
- "ceph.osd_id=0", "ceph.osd_fsid=1234"])
- )
- DataVolume = api.Volume(
- lv_name='data',
- lv_path='/dev/vg/data',
- lv_uuid='001',
- lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
- "journal,ceph.journal_uuid=000,ceph.type=data," + \
- "ceph.osd_id=0,ceph.osd_fsid=1234")
- volumes = []
- volumes.append(DataVolume)
- volumes.append(JournalVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
-
- args = Args(osd_id=None, osd_fsid='1234', no_systemd=True, filestore=True)
- activate.Activate([]).activate(args)
- assert fake_enable.calls == []
- assert fake_start_osd.calls == []
-
- def test_filestore_no_systemd_autodetect(self, is_root, monkeypatch, capture):
- monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
- fake_enable = Capture()
- fake_start_osd = Capture()
- monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
- monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
- monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
- JournalVolume = api.Volume(
- lv_name='journal',
- lv_path='/dev/vg/journal',
- lv_uuid='000',
- lv_tags=','.join([
- "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
- "ceph.journal_uuid=000", "ceph.type=journal",
- "ceph.osd_id=0", "ceph.osd_fsid=1234"])
- )
- DataVolume = api.Volume(
- lv_name='data',
- lv_path='/dev/vg/data',
- lv_uuid='001',
- lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
- "journal,ceph.journal_uuid=000,ceph.type=data," + \
- "ceph.osd_id=0,ceph.osd_fsid=1234")
- volumes = []
- volumes.append(DataVolume)
- volumes.append(JournalVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
-
- args = Args(osd_id=None, osd_fsid='1234', no_systemd=True,
- filestore=True, auto_detect_objectstore=True)
- activate.Activate([]).activate(args)
- assert fake_enable.calls == []
- assert fake_start_osd.calls == []
-
- def test_filestore_systemd_autodetect(self, is_root, monkeypatch, capture):
- fake_enable = Capture()
- fake_start_osd = Capture()
- monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
- monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
- monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
- monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
- JournalVolume = api.Volume(
- lv_name='journal',
- lv_path='/dev/vg/journal',
- lv_uuid='000',
- lv_tags=','.join([
- "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
- "ceph.journal_uuid=000", "ceph.type=journal",
- "ceph.osd_id=0","ceph.osd_fsid=1234"])
- )
- DataVolume = api.Volume(
- lv_name='data',
- lv_path='/dev/vg/data',
- lv_uuid='001',
- lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
- "journal,ceph.journal_uuid=000,ceph.type=data," + \
- "ceph.osd_id=0,ceph.osd_fsid=1234")
- volumes = []
- volumes.append(DataVolume)
- volumes.append(JournalVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
-
- args = Args(osd_id=None, osd_fsid='1234', no_systemd=False,
- filestore=True, auto_detect_objectstore=False)
- activate.Activate([]).activate(args)
- assert fake_enable.calls != []
- assert fake_start_osd.calls != []
-
- def test_filestore_systemd(self, is_root, monkeypatch, capture):
- fake_enable = Capture()
- fake_start_osd = Capture()
- monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
- monkeypatch.setattr('ceph_volume.util.system.device_is_mounted', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.util.system.chown', lambda *a, **kw: True)
- monkeypatch.setattr('ceph_volume.process.run', lambda *a, **kw: True)
- monkeypatch.setattr(activate.systemctl, 'enable_volume', fake_enable)
- monkeypatch.setattr(activate.systemctl, 'start_osd', fake_start_osd)
- JournalVolume = api.Volume(
- lv_name='journal',
- lv_path='/dev/vg/journal',
- lv_uuid='000',
- lv_tags=','.join([
- "ceph.cluster_name=ceph", "ceph.journal_device=/dev/vg/journal",
- "ceph.journal_uuid=000", "ceph.type=journal",
- "ceph.osd_id=0","ceph.osd_fsid=1234"])
- )
- DataVolume = api.Volume(
- lv_name='data',
- lv_path='/dev/vg/data',
- lv_uuid='001',
- lv_tags="ceph.cluster_name=ceph,ceph.journal_device=/dev/vg/" + \
- "journal,ceph.journal_uuid=000,ceph.type=data," + \
- "ceph.osd_id=0,ceph.osd_fsid=1234")
- volumes = []
- volumes.append(DataVolume)
- volumes.append(JournalVolume)
- monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(volumes))
-
- args = Args(osd_id=None, osd_fsid='1234', no_systemd=False,
- filestore=True)
- activate.Activate([]).activate(args)
- assert fake_enable.calls != []
- assert fake_start_osd.calls != []
-
def test_bluestore_no_systemd(self, is_root, monkeypatch, capture):
monkeypatch.setattr('ceph_volume.configuration.load', lambda: None)
fake_enable = Capture()
activation.activate = capture
activation.main()
parsed_args = capture.calls[0]['args'][0]
- assert parsed_args.filestore is False
- assert parsed_args.bluestore is False
-
- def test_uses_filestore(self, capture):
- args = ['--filestore', '0', 'asdf-ljh-asdf']
- activation = activate.Activate(args)
- activation.activate = capture
- activation.main()
- parsed_args = capture.calls[0]['args'][0]
- assert parsed_args.filestore is True
assert parsed_args.bluestore is False
def test_uses_bluestore(self, capture):
activation.activate = capture
activation.main()
parsed_args = capture.calls[0]['args'][0]
- assert parsed_args.filestore is False
assert parsed_args.bluestore is True