mock_bad_firewalld,
)
+from pyfakefs import fake_filesystem_unittest
+
with mock.patch('builtins.open', create=True):
from importlib.machinery import SourceFileLoader
cd = SourceFileLoader('cephadm', 'cephadm').load_module()
with pytest.raises(Exception):
cd.prepare_dashboard(ctx, 0, 0, lambda _, extra_mounts=None, ___=None : '5', lambda : None)
+ @mock.patch('cephadm.logger')
+ @mock.patch('cephadm.get_custom_config_files')
+ @mock.patch('cephadm.get_container')
+ def test_get_deployment_container(self, _get_container, _get_config, logger):
+ """
+ test get_deployment_container properly makes use of extra container args and custom conf files
+ """
+
+ ctx = cd.CephadmContext()
+ ctx.config_json = '-'
+ ctx.extra_container_args = [
+ '--pids-limit=12345',
+ '--something',
+ ]
+ ctx.data_dir = 'data'
+ _get_config.return_value = {'custom_config_files': [
+ {
+ 'mount_path': '/etc/testing.str',
+ 'content': 'this\nis\na\nstring',
+ }
+ ]}
+ _get_container.return_value = cd.CephContainer.for_daemon(
+ ctx,
+ fsid='9b9d7609-f4d5-4aba-94c8-effa764d96c9',
+ daemon_type='grafana',
+ daemon_id='host1',
+ entrypoint='',
+ args=[],
+ container_args=[],
+ volume_mounts={},
+ bind_mounts=[],
+ envs=[],
+ privileged=False,
+ ptrace=False,
+ host_network=True,
+ )
+ c = cd.get_deployment_container(ctx,
+ '9b9d7609-f4d5-4aba-94c8-effa764d96c9',
+ 'grafana',
+ 'host1',)
+
+ assert '--pids-limit=12345' in c.container_args
+ assert '--something' in c.container_args
+ assert os.path.join('data', '9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'custom_config_files', 'grafana.host1', 'testing.str') in c.volume_mounts
+ assert c.volume_mounts[os.path.join('data', '9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'custom_config_files', 'grafana.host1', 'testing.str')] == '/etc/testing.str'
+
+ @mock.patch('cephadm.logger')
+ @mock.patch('cephadm.get_custom_config_files')
+ def test_write_custom_conf_files(self, _get_config, logger, cephadm_fs):
+ """
+ test _write_custom_conf_files writes the conf files correctly
+ """
+
+ ctx = cd.CephadmContext()
+ ctx.config_json = '-'
+ ctx.data_dir = cd.DATA_DIR
+ _get_config.return_value = {'custom_config_files': [
+ {
+ 'mount_path': '/etc/testing.str',
+ 'content': 'this\nis\na\nstring',
+ },
+ {
+ 'mount_path': '/etc/testing.conf',
+ 'content': 'very_cool_conf_setting: very_cool_conf_value\nx: y',
+ },
+ {
+ 'mount_path': '/etc/no-content.conf',
+ },
+ ]}
+ cd._write_custom_conf_files(ctx, 'mon', 'host1', 'fsid', 0, 0)
+ with open(os.path.join(cd.DATA_DIR, 'fsid', 'custom_config_files', 'mon.host1', 'testing.str'), 'r') as f:
+ assert 'this\nis\na\nstring' == f.read()
+ with open(os.path.join(cd.DATA_DIR, 'fsid', 'custom_config_files', 'mon.host1', 'testing.conf'), 'r') as f:
+ assert 'very_cool_conf_setting: very_cool_conf_value\nx: y' == f.read()
+ with pytest.raises(FileNotFoundError):
+ open(os.path.join(cd.DATA_DIR, 'fsid', 'custom_config_files', 'mon.host1', 'no-content.conf'), 'r')
+
@mock.patch('cephadm.call_throws')
@mock.patch('cephadm.get_parm')
def test_registry_login(self, get_parm, call_throws):
# invalid IPv6 and valid subnets list
with pytest.raises(Exception):
- rc = cd.ip_in_sublets('fe80:2030:31:24', 'fe80::/64')
\ No newline at end of file
+ rc = cd.ip_in_sublets('fe80:2030:31:24', 'fe80::/64')
+
+class TestRescan(fake_filesystem_unittest.TestCase):
+
+ def setUp(self):
+ self.setUpPyfakefs()
+ self.fs.create_dir('/sys/class')
+ self.ctx = cd.CephadmContext()
+ self.ctx.func = cd.command_rescan_disks
+
+ def test_no_hbas(self):
+ out = cd.command_rescan_disks(self.ctx)
+ assert out == 'Ok. No compatible HBAs found'
+
+ def test_success(self):
+ self.fs.create_file('/sys/class/scsi_host/host0/scan')
+ self.fs.create_file('/sys/class/scsi_host/host1/scan')
+ out = cd.command_rescan_disks(self.ctx)
+ assert out.startswith('Ok. 2 adapters detected: 2 rescanned, 0 skipped, 0 failed')
+
+ def test_skip_usb_adapter(self):
+ self.fs.create_file('/sys/class/scsi_host/host0/scan')
+ self.fs.create_file('/sys/class/scsi_host/host1/scan')
+ self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='usb-storage')
+ out = cd.command_rescan_disks(self.ctx)
+ assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed')
+
+ def test_skip_unknown_adapter(self):
+ self.fs.create_file('/sys/class/scsi_host/host0/scan')
+ self.fs.create_file('/sys/class/scsi_host/host1/scan')
+ self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='unknown')
+ out = cd.command_rescan_disks(self.ctx)
+ assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed')