+ @pytest.mark.parametrize(
+ "spec, raise_exception, msg",
+ [
+ # Valid retention_time values (valid units: 'y', 'w', 'd', 'h', 'm', 's')
+ (PrometheusSpec(retention_time='1y'), False, ''),
+ (PrometheusSpec(retention_time=' 10w '), False, ''),
+ (PrometheusSpec(retention_time=' 1348d'), False, ''),
+ (PrometheusSpec(retention_time='2000h '), False, ''),
+ (PrometheusSpec(retention_time='173847m'), False, ''),
+ (PrometheusSpec(retention_time='200s'), False, ''),
+ (PrometheusSpec(retention_time=' '), False, ''), # default value will be used
+
+ # Invalid retention_time values
+ (PrometheusSpec(retention_time='100k'), True, '^Invalid retention time'), # invalid unit
+ (PrometheusSpec(retention_time='10'), True, '^Invalid retention time'), # no unit
+ (PrometheusSpec(retention_time='100.00y'), True, '^Invalid retention time'), # invalid value and valid unit
+ (PrometheusSpec(retention_time='100.00k'), True, '^Invalid retention time'), # invalid value and invalid unit
+ (PrometheusSpec(retention_time='---'), True, '^Invalid retention time'), # invalid value
+
+ # Valid retention_size values (valid units: 'B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB')
+ (PrometheusSpec(retention_size='123456789B'), False, ''),
+ (PrometheusSpec(retention_size=' 200KB'), False, ''),
+ (PrometheusSpec(retention_size='99999MB '), False, ''),
+ (PrometheusSpec(retention_size=' 10GB '), False, ''),
+ (PrometheusSpec(retention_size='100TB'), False, ''),
+ (PrometheusSpec(retention_size='500PB'), False, ''),
+ (PrometheusSpec(retention_size='200EB'), False, ''),
+ (PrometheusSpec(retention_size=' '), False, ''), # default value will be used
+
+ # Invalid retention_size values
+ (PrometheusSpec(retention_size='100b'), True, '^Invalid retention size'), # invalid unit (case sensitive)
+ (PrometheusSpec(retention_size='333kb'), True, '^Invalid retention size'), # invalid unit (case sensitive)
+ (PrometheusSpec(retention_size='2000'), True, '^Invalid retention size'), # no unit
+ (PrometheusSpec(retention_size='200.00PB'), True, '^Invalid retention size'), # invalid value and valid unit
+ (PrometheusSpec(retention_size='400.B'), True, '^Invalid retention size'), # invalid value and invalid unit
+ (PrometheusSpec(retention_size='10.000s'), True, '^Invalid retention size'), # invalid value and invalid unit
+ (PrometheusSpec(retention_size='...'), True, '^Invalid retention size'), # invalid value
+
+ # valid retention_size and valid retention_time
+ (PrometheusSpec(retention_time='1y', retention_size='100GB'), False, ''),
+ # invalid retention_time and valid retention_size
+ (PrometheusSpec(retention_time='1j', retention_size='100GB'), True, '^Invalid retention time'),
+ # valid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'),
+ # valid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'),
+ # invalid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1i', retention_size='100gb'), True, '^Invalid retention time'),
+ ])
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_apply_prometheus(self, spec: PrometheusSpec, raise_exception: bool, msg: str, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ if not raise_exception:
+ cephadm_module._apply(spec)
+ else:
+ with pytest.raises(OrchestratorError, match=msg):
+ cephadm_module._apply(spec)
+