from cephadm.services.osd import OSDRemovalQueue, OSD
import pytest
from tests import mock
+from .fixtures import with_cephadm_module
from datetime import datetime
# rm_util.process_removal_queue()
pass
+ @pytest.mark.parametrize(
+ "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected",
+ [
+ # drain one at a time, one already draining
+ (1, [1], [1], [True], 0),
+ # drain one at a time, none draining yet
+ (1, [], [1, 2, 3], [True, True, True], 1),
+ # drain one at a time, one already draining, none ok-to-stop
+ (1, [1], [1], [False], 0),
+ # drain one at a time, none draining, one ok-to-stop
+ (1, [], [1, 2, 3], [False, False, True], 1),
+ # drain three at a time, one already draining, all ok-to-stop
+ (3, [1], [1, 2, 3], [True, True, True], 2),
+ # drain two at a time, none already draining, none ok-to-stop
+ (2, [], [1, 2, 3], [False, False, False], 0),
+ # drain two at a time, none already draining, none idling
+ (2, [], [], [], 0),
+ ]
+ )
+ def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected):
+ with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m:
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds):
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds):
+ with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
+ removal_queue = OSDRemovalQueue(m)
+ assert len(removal_queue._ready_to_drain_osds()) == expected
+
def test_ok_to_stop(self, rm_util):
rm_util.ok_to_stop([MockOSD(1)])
- rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']})
+ rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']},
+ error_ok=True)
def test_safe_to_destroy(self, rm_util):
rm_util.safe_to_destroy([1])
- rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy', 'ids': ['1']})
+ rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd safe-to-destroy',
+ 'ids': ['1']}, error_ok=True)
def test_destroy_osd(self, rm_util):
rm_util.destroy_osd(1)
"stopped": False,
"replace": False,
"force": False,
+ "zap": False,
"nodename": "node2",
"drain_started_at": "2020-09-14T11:41:53.960463",
"drain_stopped_at": None,