assert cephadm_module.migration_current == LAST_MIGRATION
assert cephadm_module.keys.keys['client.admin'].placement.label == '_admin'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_set_sane_value(cephadm_module: CephadmOrchestrator):
+ cephadm_module.migration_current = 0
+ cephadm_module.migration.set_sane_migration_current()
+ assert cephadm_module.migration_current == 0
+
+ cephadm_module.migration_current = LAST_MIGRATION
+ cephadm_module.migration.set_sane_migration_current()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+ cephadm_module.migration_current = None
+ cephadm_module.migration.set_sane_migration_current()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+ cephadm_module.migration_current = LAST_MIGRATION + 1
+ cephadm_module.migration.set_sane_migration_current()
+ assert cephadm_module.migration_current == 0
+
+ cephadm_module.migration_current = None
+ ongoing = cephadm_module.migration.is_migration_ongoing()
+ assert not ongoing
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+ cephadm_module.migration_current = LAST_MIGRATION + 1
+ ongoing = cephadm_module.migration.is_migration_ongoing()
+ assert ongoing
+ assert cephadm_module.migration_current == 0