# type: ignore
-import argparse
import mock
-import os
-import sys
+from mock import patch, call
import unittest
+import errno
+import socket
import pytest
-if sys.version_info >= (3, 3):
+with patch('builtins.open', create=True):
from importlib.machinery import SourceFileLoader
cd = SourceFileLoader('cephadm', 'cephadm').load_module()
-else:
- import imp
- cd = imp.load_source('cephadm', 'cephadm')
class TestCephAdm(object):
- def test_is_fsid(self):
+
+ def test_docker_unit_file(self):
+ cd.args = mock.Mock()
+ cd.container_path = '/usr/bin/docker'
+ r = cd.get_unit_file('9b9d7609-f4d5-4aba-94c8-effa764d96c9')
+ assert 'Requires=docker.service' in r
+ cd.container_path = '/usr/sbin/podman'
+ r = cd.get_unit_file('9b9d7609-f4d5-4aba-94c8-effa764d96c9')
+ assert 'Requires=docker.service' not in r
+
+ def test_attempt_bind(self):
+ cd.logger = mock.Mock()
+ address = None
+ port = 0
+
+ def os_error(errno):
+ _os_error = OSError()
+ _os_error.errno = errno
+ return _os_error
+
+ for side_effect, expected_exception in (
+ (os_error(errno.EADDRINUSE), cd.PortOccupiedError),
+ (os_error(errno.EAFNOSUPPORT), OSError),
+ (os_error(errno.EADDRNOTAVAIL), OSError),
+ (None, None),
+ ):
+ _socket = mock.Mock()
+ _socket.bind.side_effect = side_effect
+ try:
+ cd.attempt_bind(_socket, address, port)
+ except Exception as e:
+ assert isinstance(e, expected_exception)
+ else:
+ if expected_exception is not None:
+ assert False, '{} should not be None'.format(expected_exception)
+
+ @mock.patch('cephadm.attempt_bind')
+ def test_port_in_use(self, attempt_bind):
+
+ assert cd.port_in_use(9100) == False
+
+ attempt_bind.side_effect = cd.PortOccupiedError('msg')
+ assert cd.port_in_use(9100) == True
+
+ os_error = OSError()
+ os_error.errno = errno.EADDRNOTAVAIL
+ attempt_bind.side_effect = os_error
+ assert cd.port_in_use(9100) == False
+
+ os_error = OSError()
+ os_error.errno = errno.EAFNOSUPPORT
+ attempt_bind.side_effect = os_error
+ assert cd.port_in_use(9100) == False
+
+ @mock.patch('socket.socket')
+ @mock.patch('cephadm.args')
+ def test_check_ip_port_success(self, args, _socket):
+ args.skip_ping_check = False
+
+ for address, address_family in (
+ ('0.0.0.0', socket.AF_INET),
+ ('::', socket.AF_INET6),
+ ):
+ try:
+ cd.check_ip_port(address, 9100)
+ except:
+ assert False
+ else:
+ assert _socket.call_args == call(address_family, socket.SOCK_STREAM)
+
+ @mock.patch('socket.socket')
+ @mock.patch('cephadm.args')
+ def test_check_ip_port_failure(self, args, _socket):
+ args.skip_ping_check = False
+
+ def os_error(errno):
+ _os_error = OSError()
+ _os_error.errno = errno
+ return _os_error
+
+ for address, address_family in (
+ ('0.0.0.0', socket.AF_INET),
+ ('::', socket.AF_INET6),
+ ):
+ for side_effect, expected_exception in (
+ (os_error(errno.EADDRINUSE), cd.PortOccupiedError),
+ (os_error(errno.EADDRNOTAVAIL), OSError),
+ (os_error(errno.EAFNOSUPPORT), OSError),
+ (None, None),
+ ):
+ mock_socket_obj = mock.Mock()
+ mock_socket_obj.bind.side_effect = side_effect
+ _socket.return_value = mock_socket_obj
+ try:
+ cd.check_ip_port(address, 9100)
+ except Exception as e:
+ assert isinstance(e, expected_exception)
+ else:
+ assert side_effect is None
+
+
+ def test_is_not_fsid(self):
assert not cd.is_fsid('no-uuid')
+ def test_is_fsid(self):
+ assert cd.is_fsid('e863154d-33c7-4350-bca5-921e0467e55b')
+
def test__get_parser_image(self):
args = cd._parse_args(['--image', 'foo', 'version'])
assert args.image == 'foo'
'192.168.122.0/24': ['192.168.122.1']}
),
])
- def test_parse_ip_route(self, test_input, expected):
- assert cd._parse_ip_route(test_input) == expected
+ def test_parse_ipv4_route(self, test_input, expected):
+ assert cd._parse_ipv4_route(test_input) == expected
+
+ @pytest.mark.parametrize("test_routes, test_ips, expected", [
+ (
+"""
+::1 dev lo proto kernel metric 256 pref medium
+fdbc:7574:21fe:9200::/64 dev wlp2s0 proto ra metric 600 pref medium
+fdd8:591e:4969:6363::/64 dev wlp2s0 proto ra metric 600 pref medium
+fde4:8dba:82e1::/64 dev eth1 proto kernel metric 256 expires 1844sec pref medium
+fe80::/64 dev tun0 proto kernel metric 256 pref medium
+fe80::/64 dev wlp2s0 proto kernel metric 600 pref medium
+default dev tun0 proto static metric 50 pref medium
+default via fe80::2480:28ec:5097:3fe2 dev wlp2s0 proto ra metric 20600 pref medium
+""",
+"""
+1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 state UNKNOWN qlen 1000
+ inet6 ::1/128 scope host
+ valid_lft forever preferred_lft forever
+2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 state UP qlen 1000
+ inet6 fdd8:591e:4969:6363:4c52:cafe:8dd4:dc4/64 scope global temporary dynamic
+ valid_lft 86394sec preferred_lft 14394sec
+ inet6 fdbc:7574:21fe:9200:4c52:cafe:8dd4:dc4/64 scope global temporary dynamic
+ valid_lft 6745sec preferred_lft 3145sec
+ inet6 fdd8:591e:4969:6363:103a:abcd:af1f:57f3/64 scope global temporary deprecated dynamic
+ valid_lft 86394sec preferred_lft 0sec
+ inet6 fdbc:7574:21fe:9200:103a:abcd:af1f:57f3/64 scope global temporary deprecated dynamic
+ valid_lft 6745sec preferred_lft 0sec
+ inet6 fdd8:591e:4969:6363:a128:1234:2bdd:1b6f/64 scope global temporary deprecated dynamic
+ valid_lft 86394sec preferred_lft 0sec
+ inet6 fdbc:7574:21fe:9200:a128:1234:2bdd:1b6f/64 scope global temporary deprecated dynamic
+ valid_lft 6745sec preferred_lft 0sec
+ inet6 fdd8:591e:4969:6363:d581:4321:380b:3905/64 scope global temporary deprecated dynamic
+ valid_lft 86394sec preferred_lft 0sec
+ inet6 fdbc:7574:21fe:9200:d581:4321:380b:3905/64 scope global temporary deprecated dynamic
+ valid_lft 6745sec preferred_lft 0sec
+ inet6 fe80::1111:2222:3333:4444/64 scope link noprefixroute
+ valid_lft forever preferred_lft forever
+ inet6 fde4:8dba:82e1:0:ec4a:e402:e9df:b357/64 scope global temporary dynamic
+ valid_lft 1074sec preferred_lft 1074sec
+ inet6 fde4:8dba:82e1:0:5054:ff:fe72:61af/64 scope global dynamic mngtmpaddr
+ valid_lft 1074sec preferred_lft 1074sec
+12: tun0: <POINTOPOINT,MULTICAST,NOARP,UP,LOWER_UP> mtu 1500 state UNKNOWN qlen 100
+ inet6 fe80::cafe:cafe:cafe:cafe/64 scope link stable-privacy
+ valid_lft forever preferred_lft forever
+""",
+ {
+ "::1": ["::1"],
+ "fdbc:7574:21fe:9200::/64": ["fdbc:7574:21fe:9200:4c52:cafe:8dd4:dc4",
+ "fdbc:7574:21fe:9200:103a:abcd:af1f:57f3",
+ "fdbc:7574:21fe:9200:a128:1234:2bdd:1b6f",
+ "fdbc:7574:21fe:9200:d581:4321:380b:3905"],
+ "fdd8:591e:4969:6363::/64": ["fdd8:591e:4969:6363:4c52:cafe:8dd4:dc4",
+ "fdd8:591e:4969:6363:103a:abcd:af1f:57f3",
+ "fdd8:591e:4969:6363:a128:1234:2bdd:1b6f",
+ "fdd8:591e:4969:6363:d581:4321:380b:3905"],
+ "fde4:8dba:82e1::/64": ["fde4:8dba:82e1:0:ec4a:e402:e9df:b357",
+ "fde4:8dba:82e1:0:5054:ff:fe72:61af"],
+ "fe80::/64": ["fe80::1111:2222:3333:4444",
+ "fe80::cafe:cafe:cafe:cafe"]
+ }
+ )])
+ def test_parse_ipv6_route(self, test_routes, test_ips, expected):
+ assert cd._parse_ipv6_route(test_routes, test_ips) == expected
+
+ def test_is_ipv6(self):
+ cd.logger = mock.Mock()
+ for good in ("[::1]", "::1",
+ "fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"):
+ assert cd.is_ipv6(good)
+ for bad in ("127.0.0.1",
+ "ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffg",
+ "1:2:3:4:5:6:7:8:9", "fd00::1::1", "[fg::1]"):
+ assert not cd.is_ipv6(bad)
+
+ def test_unwrap_ipv6(self):
+ def unwrap_test(address, expected):
+ assert cd.unwrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '::1'), ('[::1]', '::1'),
+ ('[fde4:8dba:82e1:0:5054:ff:fe6a:357]', 'fde4:8dba:82e1:0:5054:ff:fe6a:357'),
+ ('can actually be any string', 'can actually be any string'),
+ ('[but needs to be stripped] ', '[but needs to be stripped] ')]
+ for address, expected in tests:
+ unwrap_test(address, expected)
+
+ def test_wrap_ipv6(self):
+ def wrap_test(address, expected):
+ assert cd.wrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '[::1]'), ('[::1]', '[::1]'),
+ ('fde4:8dba:82e1:0:5054:ff:fe6a:357',
+ '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'),
+ ('myhost.example.com', 'myhost.example.com'),
+ ('192.168.0.1', '192.168.0.1'),
+ ('', ''), ('fd00::1::1', 'fd00::1::1')]
+ for address, expected in tests:
+ wrap_test(address, expected)
+
+ @mock.patch('cephadm.call_throws')
+ @mock.patch('cephadm.get_parm')
+ def test_registry_login(self, get_parm, call_throws):
+
+ # test normal valid login with url, username and password specified
+ call_throws.return_value = '', '', 0
+ args = cd._parse_args(['registry-login', '--registry-url', 'sample-url', '--registry-username', 'sample-user', '--registry-password', 'sample-pass'])
+ cd.args = args
+ retval = cd.command_registry_login()
+ assert retval == 0
+
+ # test bad login attempt with invalid arguments given
+ args = cd._parse_args(['registry-login', '--registry-url', 'bad-args-url'])
+ cd.args = args
+ with pytest.raises(Exception) as e:
+ assert cd.command_registry_login()
+ assert str(e.value) == ('Invalid custom registry arguments received. To login to a custom registry include '
+ '--registry-url, --registry-username and --registry-password options or --registry-json option')
+
+ # test normal valid login with json file
+ get_parm.return_value = {"url": "sample-url", "username": "sample-username", "password": "sample-password"}
+ args = cd._parse_args(['registry-login', '--registry-json', 'sample-json'])
+ cd.args = args
+ retval = cd.command_registry_login()
+ assert retval == 0
+
+ # test bad login attempt with bad json file
+ get_parm.return_value = {"bad-json": "bad-json"}
+ args = cd._parse_args(['registry-login', '--registry-json', 'sample-json'])
+ cd.args = args
+ with pytest.raises(Exception) as e:
+ assert cd.command_registry_login()
+ assert str(e.value) == ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+
+ # test login attempt with valid arguments where login command fails
+ call_throws.side_effect = Exception
+ args = cd._parse_args(['registry-login', '--registry-url', 'sample-url', '--registry-username', 'sample-user', '--registry-password', 'sample-pass'])
+ cd.args = args
+ with pytest.raises(Exception) as e:
+ cd.command_registry_login()
+ assert str(e.value) == "Failed to login to custom registry @ sample-url as sample-user with given password"
+
+ def test_get_image_info_from_inspect(self):
+ # podman
+ out = """204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1,[docker.io/ceph/ceph@sha256:1cc9b824e1b076cdff52a9aa3f0cc8557d879fb2fbbba0cafed970aca59a3992]"""
+ r = cd.get_image_info_from_inspect(out, 'registry/ceph/ceph:latest')
+ assert r == {
+ 'image_id': '204a01f9b0b6710dd0c0af7f37ce7139c47ff0f0105d778d7104c69282dfbbf1',
+ 'repo_digest': 'docker.io/ceph/ceph@sha256:1cc9b824e1b076cdff52a9aa3f0cc8557d879fb2fbbba0cafed970aca59a3992'
+ }
+
+ # docker
+ out = """sha256:16f4549cf7a8f112bbebf7946749e961fbbd1b0838627fe619aab16bc17ce552,[quay.ceph.io/ceph-ci/ceph@sha256:4e13da36c1bd6780b312a985410ae678984c37e6a9493a74c87e4a50b9bda41f]"""
+ r = cd.get_image_info_from_inspect(out, 'registry/ceph/ceph:latest')
+ assert r == {
+ 'image_id': '16f4549cf7a8f112bbebf7946749e961fbbd1b0838627fe619aab16bc17ce552',
+ 'repo_digest': 'quay.ceph.io/ceph-ci/ceph@sha256:4e13da36c1bd6780b312a985410ae678984c37e6a9493a74c87e4a50b9bda41f'
+ }
+
+ def test_dict_get(self):
+ result = cd.dict_get({'a': 1}, 'a', require=True)
+ assert result == 1
+ result = cd.dict_get({'a': 1}, 'b')
+ assert result is None
+ result = cd.dict_get({'a': 1}, 'b', default=2)
+ assert result == 2
+
+ def test_dict_get_error(self):
+ with pytest.raises(cd.Error):
+ cd.dict_get({'a': 1}, 'b', require=True)
+
+ def test_dict_get_join(self):
+ result = cd.dict_get_join({'foo': ['a', 'b']}, 'foo')
+ assert result == 'a\nb'
+ result = cd.dict_get_join({'foo': [1, 2]}, 'foo')
+ assert result == '1\n2'
+ result = cd.dict_get_join({'bar': 'a'}, 'bar')
+ assert result == 'a'
+ result = cd.dict_get_join({'a': 1}, 'a')
+ assert result == 1
+
+ def test_last_local_images(self):
+ out = '''
+docker.io/ceph/daemon-base@
+docker.io/ceph/ceph:v15.2.5
+docker.io/ceph/daemon-base:octopus
+ '''
+ image = cd._filter_last_local_ceph_image(out)
+ assert image == 'docker.io/ceph/ceph:v15.2.5'
+
+
+class TestCustomContainer(unittest.TestCase):
+ cc: cd.CustomContainer
+
+ def setUp(self):
+ self.cc = cd.CustomContainer(
+ 'e863154d-33c7-4350-bca5-921e0467e55b',
+ 'container',
+ config_json={
+ 'entrypoint': 'bash',
+ 'gid': 1000,
+ 'args': [
+ '--no-healthcheck',
+ '-p 6800:6800'
+ ],
+ 'envs': ['SECRET=password'],
+ 'ports': [8080, 8443],
+ 'volume_mounts': {
+ '/CONFIG_DIR': '/foo/conf',
+ 'bar/config': '/bar:ro'
+ },
+ 'bind_mounts': [
+ [
+ 'type=bind',
+ 'source=/CONFIG_DIR',
+ 'destination=/foo/conf',
+ ''
+ ],
+ [
+ 'type=bind',
+ 'source=bar/config',
+ 'destination=/bar:ro',
+ 'ro=true'
+ ]
+ ]
+ },
+ image='docker.io/library/hello-world:latest'
+ )
+
+ def test_entrypoint(self):
+ self.assertEqual(self.cc.entrypoint, 'bash')
+
+ def test_uid_gid(self):
+ self.assertEqual(self.cc.uid, 65534)
+ self.assertEqual(self.cc.gid, 1000)
+
+ def test_ports(self):
+ self.assertEqual(self.cc.ports, [8080, 8443])
+
+ def test_get_container_args(self):
+ result = self.cc.get_container_args()
+ self.assertEqual(result, [
+ '--no-healthcheck',
+ '-p 6800:6800'
+ ])
+
+ def test_get_container_envs(self):
+ result = self.cc.get_container_envs()
+ self.assertEqual(result, ['SECRET=password'])
+
+ def test_get_container_mounts(self):
+ result = self.cc.get_container_mounts('/xyz')
+ self.assertDictEqual(result, {
+ '/CONFIG_DIR': '/foo/conf',
+ '/xyz/bar/config': '/bar:ro'
+ })
+
+ def test_get_container_binds(self):
+ result = self.cc.get_container_binds('/xyz')
+ self.assertEqual(result, [
+ [
+ 'type=bind',
+ 'source=/CONFIG_DIR',
+ 'destination=/foo/conf',
+ ''
+ ],
+ [
+ 'type=bind',
+ 'source=/xyz/bar/config',
+ 'destination=/bar:ro',
+ 'ro=true'
+ ]
+ ])
+
+
+class TestMonitoring(object):
+ @mock.patch('cephadm.call')
+ def test_get_version_alertmanager(self, _call):
+ ctx = mock.Mock()
+ daemon_type = 'alertmanager'
+
+ # binary `prometheus`
+ _call.return_value = '', '{}, version 0.16.1'.format(daemon_type), 0
+ version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)
+ assert version == '0.16.1'
+
+ # binary `prometheus-alertmanager`
+ _call.side_effect = (
+ ('', '', 1),
+ ('', '{}, version 0.16.1'.format(daemon_type), 0),
+ )
+ version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)
+ assert version == '0.16.1'
+
+ @mock.patch('cephadm.call')
+ def test_get_version_prometheus(self, _call):
+ ctx = mock.Mock()
+ daemon_type = 'prometheus'
+ _call.return_value = '', '{}, version 0.16.1'.format(daemon_type), 0
+ version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)
+ assert version == '0.16.1'
+
+ @mock.patch('cephadm.call')
+ def test_get_version_node_exporter(self, _call):
+ ctx = mock.Mock()
+ daemon_type = 'node-exporter'
+ _call.return_value = '', '{}, version 0.16.1'.format(daemon_type.replace('-', '_')), 0
+ version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)
+ assert version == '0.16.1'
+
+ @mock.patch('cephadm.os.fchown')
+ @mock.patch('cephadm.get_parm')
+ @mock.patch('cephadm.makedirs')
+ @mock.patch('cephadm.open')
+ @mock.patch('cephadm.make_log_dir')
+ @mock.patch('cephadm.make_data_dir')
+ @mock.patch('cephadm.args')
+ def test_create_daemon_dirs_prometheus(self, args, make_data_dir, make_log_dir, _open, makedirs,
+ get_parm, fchown):
+ """
+ Ensures the required and optional files given in the configuration are
+ created and mapped correctly inside the container. Tests absolute and
+ relative file paths given in the configuration.
+ """
+ args.data_dir = '/somedir'
+ fsid = 'aaf5a720-13fe-4a3b-82b9-2d99b7fd9704'
+ daemon_type = 'prometheus'
+ uid, gid = 50, 50
+ daemon_id = 'home'
+ files = {
+ 'files': {
+ 'prometheus.yml': 'foo',
+ '/etc/prometheus/alerting/ceph_alerts.yml': 'bar'
+ }
+ }
+ get_parm.return_value = files
+
+ cd.create_daemon_dirs(fsid,
+ daemon_type,
+ daemon_id,
+ uid,
+ gid,
+ config=None,
+ keyring=None)
+
+ prefix = '{data_dir}/{fsid}/{daemon_type}.{daemon_id}'.format(
+ data_dir=args.data_dir,
+ fsid=fsid,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id
+ )
+ assert _open.call_args_list == [
+ call('{}/etc/prometheus/prometheus.yml'.format(prefix), 'w',
+ encoding='utf-8'),
+ call('{}/etc/prometheus/alerting/ceph_alerts.yml'.format(prefix), 'w',
+ encoding='utf-8'),
+ ]
+ assert call().__enter__().write('foo') in _open.mock_calls
+ assert call().__enter__().write('bar') in _open.mock_calls