]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/dashboard/helper.py
bump version to 18.2.4-pve3
[ceph.git] / ceph / qa / tasks / mgr / dashboard / helper.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=W0212,too-many-return-statements,too-many-public-methods
from __future__ import absolute_import

import functools
import json
import logging
import random
import re
import shlex
import string
import time
from collections import namedtuple
from typing import List

import requests
from tasks.mgr.mgr_test_case import MgrTestCase
from teuthology.exceptions import \
    CommandFailedError  # pylint: disable=import-error

from . import DEFAULT_API_VERSION
20
21 log = logging.getLogger(__name__)
22
23
class DashboardTestCase(MgrTestCase):
    """
    Base class for tests that talk to the dashboard REST API.

    The HTTP session, the JWT token returned by ``/api/auth`` and the most
    recent response are stored as class attributes. The request helpers are
    therefore classmethods, and the ``assert*`` helpers always inspect the
    response of the last request (``_resp``).
    """
    # Display full error diffs
    maxDiff = None

    # Increased x3 (20 -> 60)
    TIMEOUT_HEALTH_CLEAR = 60

    # NOTE(review): the *_REQUIRED / REQUIRE_FILESYSTEM values are not read in
    # this class; presumably they are consumed by the base test case.
    MGRS_REQUIRED = 2
    MDSS_REQUIRED = 1
    REQUIRE_FILESYSTEM = True
    CLIENTS_REQUIRED = 1
    # When True, setUpClass() destroys and recreates the filesystem.
    CEPHFS = False
    # When True, setUpClass() enables the 'test_orchestrator' backend and
    # loads ORCHESTRATOR_TEST_DATA into it.
    ORCHESTRATOR = False
    ORCHESTRATOR_TEST_DATA = {
        'inventory': [
            {
                'name': 'test-host0',
                'addr': '1.2.3.4',
                'devices': [
                    {
                        'path': '/dev/sda',
                    }
                ]
            },
            {
                'name': 'test-host1',
                'addr': '1.2.3.5',
                'devices': [
                    {
                        'path': '/dev/sdb',
                    }
                ]
            }
        ],
        'daemons': [
            {
                'nodename': 'test-host0',
                'daemon_type': 'mon',
                'daemon_id': 'a'
            },
            {
                'nodename': 'test-host0',
                'daemon_type': 'mgr',
                'daemon_id': 'x'
            },
            {
                'nodename': 'test-host0',
                'daemon_type': 'osd',
                'daemon_id': '0'
            },
            {
                'nodename': 'test-host1',
                'daemon_type': 'osd',
                'daemon_id': '1'
            }
        ]
    }

    _session = None  # type: requests.sessions.Session
    _token = None
    _resp = None  # type: requests.models.Response
    _loggedin = False
    _base_uri = None

    # When True, setUpClass()/setUp() log in as 'admin' automatically.
    AUTO_AUTHENTICATE = True

    # Roles assigned to the 'admin' user created in setUpClass().
    AUTH_ROLES = ['administrator']

    @classmethod
    def create_user(cls, username, password, roles=None,
                    force_password=True, cmd_args=None):
        # pylint: disable=too-many-arguments
        """
        Create a dashboard user, replacing an existing user of the same name.

        Each entry of ``roles`` is either the name of an existing role (str)
        or a dict mapping a scope to a list of permissions. For a dict a
        temporary role named ``test_role_<index>`` is (re)created and
        assigned to the user.

        :param username: The name of the user.
        :type username: str
        :param password: The password.
        :type password: str
        :param roles: A list of roles.
        :type roles: list
        :param force_password: Force the use of the specified password. This
            will bypass the password complexity check. Defaults to 'True'.
        :type force_password: bool
        :param cmd_args: Additional command line arguments for the
            'ac-user-create' command.
        :type cmd_args: None | list[str]
        """
        try:
            cls._ceph_cmd(['dashboard', 'ac-user-show', username])
            cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
        except CommandFailedError as ex:
            # Exit status 2 is tolerated (presumably "user does not exist");
            # anything else is a real failure.
            if ex.exitstatus != 2:
                raise

        user_create_args = [
            'dashboard', 'ac-user-create', username
        ]
        if force_password:
            user_create_args.append('--force-password')
        if cmd_args:
            user_create_args.extend(cmd_args)
        cls._ceph_cmd_with_secret(user_create_args, password)
        if roles:
            set_roles_args = ['dashboard', 'ac-user-set-roles', username]
            for idx, role in enumerate(roles):
                if isinstance(role, str):
                    set_roles_args.append(role)
                else:
                    assert isinstance(role, dict)
                    rolename = 'test_role_{}'.format(idx)
                    try:
                        cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
                        cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
                    except CommandFailedError as ex:
                        if ex.exitstatus != 2:
                            raise
                    cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
                    for mod, perms in role.items():
                        args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
                        args.extend(perms)
                        cls._ceph_cmd(args)
                    set_roles_args.append(rolename)
            cls._ceph_cmd(set_roles_args)

    @classmethod
    def create_pool(cls, name, pg_num, pool_type, application='rbd'):
        """
        Create a pool through ``POST /api/pool`` and wait for the task.

        Pools of type 'erasure' are created with the 'ec_overwrites' flag.
        """
        data = {
            'pool': name,
            'pg_num': pg_num,
            'pool_type': pool_type,
            'application_metadata': [application]
        }
        if pool_type == 'erasure':
            data['flags'] = ['ec_overwrites']
        cls._task_post("/api/pool", data)

    @classmethod
    def login(cls, username, password, set_cookies=False):
        """
        Authenticate against ``/api/auth`` and remember the returned token.

        An already logged-in session is logged out first. Raises if the
        response status is not 201.
        """
        if cls._loggedin:
            cls.logout()
        cls._post('/api/auth', {'username': username,
                                'password': password}, set_cookies=set_cookies)
        cls._assertEq(cls._resp.status_code, 201)
        cls._token = cls.jsonBody()['token']
        cls._loggedin = True

    @classmethod
    def logout(cls, set_cookies=False):
        """Log out and forget the token; does nothing when not logged in."""
        if cls._loggedin:
            cls._post('/api/auth/logout', set_cookies=set_cookies)
            cls._assertEq(cls._resp.status_code, 200)
            cls._token = None
            cls._loggedin = False

    @classmethod
    def delete_user(cls, username, roles=None):
        """
        Delete a user and the temporary ``test_role_<index>`` roles that
        create_user() generated for the dict entries of ``roles``.
        """
        if roles is None:
            roles = []
        cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
        for idx, role in enumerate(roles):
            if isinstance(role, dict):
                cls._ceph_cmd(['dashboard', 'ac-role-delete', 'test_role_{}'.format(idx)])

    @classmethod
    def RunAs(cls, username, password, roles=None, force_password=True,
              cmd_args=None, login=True):
        # pylint: disable=too-many-arguments
        """
        Decorator factory: run the decorated test method as a temporary user.

        The user is created before the test and, if ``login`` is true, logged
        in. Logout and user deletion run in ``finally`` blocks so a failing
        test does not leave the temporary user or its session behind for the
        tests that follow.
        """
        def wrapper(func):
            @functools.wraps(func)
            def execute(self, *args, **kwargs):
                self.create_user(username, password, roles,
                                 force_password, cmd_args)
                try:
                    if login:
                        self.login(username, password)
                    return func(self, *args, **kwargs)
                finally:
                    try:
                        if login:
                            self.logout()
                    finally:
                        # Delete the user even if the logout itself failed.
                        self.delete_user(username, roles)

            return execute

        return wrapper

    @classmethod
    def set_jwt_token(cls, token):
        """Override the token that is sent with subsequent requests."""
        cls._token = token

    @classmethod
    def setUpClass(cls):
        """
        Load the dashboard module, optionally reset CephFS and the test
        orchestrator, then create the 'admin' user and (if
        AUTO_AUTHENTICATE) log in.
        """
        super(DashboardTestCase, cls).setUpClass()
        cls._assign_ports("dashboard", "ssl_server_port")
        cls._load_module("dashboard")
        cls.update_base_uri()

        if cls.CEPHFS:
            cls.mds_cluster.clear_firewall()

            # To avoid any issues with e.g. unlink bugs, we destroy and recreate
            # the filesystem rather than just doing a rm -rf of files
            cls.mds_cluster.mds_stop()
            cls.mds_cluster.mds_fail()
            cls.mds_cluster.delete_all_filesystems()
            cls.fs = None  # is now invalid!

            cls.fs = cls.mds_cluster.newfs(create=True)
            cls.fs.mds_restart()

            # In case some test messed with auth caps, reset them
            # pylint: disable=not-an-iterable
            client_mount_ids = [m.client_id for m in cls.mounts]
            for client_id in client_mount_ids:
                cls.mds_cluster.mon_manager.raw_cluster_cmd_result(
                    'auth', 'caps', "client.{0}".format(client_id),
                    'mds', 'allow',
                    'mon', 'allow r',
                    'osd', 'allow rw pool={0}'.format(cls.fs.get_data_pool_name()))

            # wait for mds restart to complete...
            cls.fs.wait_for_daemons()

        if cls.ORCHESTRATOR:
            cls._load_module("test_orchestrator")

            cmd = ['orch', 'set', 'backend', 'test_orchestrator']
            cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)

            cmd = ['test_orchestrator', 'load_data', '-i', '-']
            cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(
                cls.ORCHESTRATOR_TEST_DATA
            ))

        cls._token = None
        cls._session = requests.Session()
        cls._resp = None

        cls.create_user('admin', 'admin', cls.AUTH_ROLES)
        if cls.AUTO_AUTHENTICATE:
            cls.login('admin', 'admin')

    @classmethod
    def update_base_uri(cls):
        """Resolve the dashboard base URI once and cache it in ``_base_uri``."""
        if cls._base_uri is None:
            cls._base_uri = cls._get_uri("dashboard").rstrip('/')

    def setUp(self):
        """Re-login as 'admin' if needed and wait for the cluster health to clear."""
        super(DashboardTestCase, self).setUp()
        if not self._loggedin and self.AUTO_AUTHENTICATE:
            self.login('admin', 'admin')
        self.wait_for_health_clear(self.TIMEOUT_HEALTH_CLEAR)

    @classmethod
    def tearDownClass(cls):
        super(DashboardTestCase, cls).tearDownClass()

    # pylint: disable=inconsistent-return-statements, too-many-arguments, too-many-branches
    @classmethod
    def _request(cls, url, method, data=None, params=None, version=DEFAULT_API_VERSION,
                 set_cookies=False, headers=None):
        """
        Send a request to the dashboard and store the response in ``_resp``.

        :param url: Path that is appended to the dashboard base URI.
        :param method: One of 'GET', 'POST', 'DELETE' or 'PUT'.
        :param data: JSON body (ignored for 'GET').
        :param params: Query parameters.
        :param version: API version for the ``Accept`` header; ``None``
            requests plain ``application/json``.
        :param set_cookies: Send the token as a 'token' cookie instead of a
            bearer ``Authorization`` header.
        :param headers: Additional request headers. The dict is not modified.
        :return: The decoded JSON body if the response has a JSON content
            type and a non-empty body, otherwise the body text.
        :raises AssertionError: For an unsupported ``method``.
        :raises ValueError: If a JSON body cannot be decoded.
        """
        url = "{}{}".format(cls._base_uri, url)
        log.debug("Request %s to %s", method, url)
        # Work on a copy so that the caller's dict is never modified.
        headers = dict(headers) if headers else {}
        cookies = {}
        if cls._token:
            if set_cookies:
                cookies['token'] = cls._token
            else:
                headers['Authorization'] = "Bearer {}".format(cls._token)
        if version is None:
            headers['Accept'] = 'application/json'
        else:
            headers['Accept'] = 'application/vnd.ceph.api.v{}+json'.format(version)

        # TLS verification is disabled for every request.
        request_kwargs = {'params': params, 'verify': False, 'headers': headers}
        if set_cookies:
            request_kwargs['cookies'] = cookies

        if method == 'GET':
            cls._resp = cls._session.get(url, **request_kwargs)
        elif method == 'POST':
            cls._resp = cls._session.post(url, json=data, **request_kwargs)
        elif method == 'DELETE':
            cls._resp = cls._session.delete(url, json=data, **request_kwargs)
        elif method == 'PUT':
            cls._resp = cls._session.put(url, json=data, **request_kwargs)
        else:
            # Raised explicitly so the check also holds when running with -O.
            raise AssertionError("unsupported HTTP method: {}".format(method))

        try:
            if not cls._resp.ok:
                # Output response for easier debugging.
                log.error("Request response: %s", cls._resp.text)
            # A response without a content type is treated as plain text.
            content_type = cls._resp.headers.get('content-type', '')
            if re.match(r'^application/.*json', content_type) and cls._resp.text:
                return cls._resp.json()
            return cls._resp.text
        except ValueError:
            log.exception("Failed to decode response: %s", cls._resp.text)
            raise

    @classmethod
    def _get(cls, url, params=None, version=DEFAULT_API_VERSION, set_cookies=False, headers=None):
        """Send a GET request and return the decoded response body."""
        return cls._request(url, 'GET', params=params, version=version,
                            set_cookies=set_cookies, headers=headers)

    @classmethod
    def _view_cache_get(cls, url, retries=5):
        """
        GET ``url`` until every returned view carries a truthy 'value'.

        :param url: Path of the endpoint to query.
        :param retries: Maximum number of requests to send.
        :return: The list of views (a single dict response is wrapped in a
            list).
        :raises Exception: If no attempt returned only populated views.
        """
        attempts_left = retries
        while attempts_left > 0:
            attempts_left -= 1
            res = cls._get(url, version=DEFAULT_API_VERSION)
            if isinstance(res, dict):
                res = [res]
            for view in res:
                assert 'value' in view
            # Succeeding on the very last attempt is still a success.
            if all(view['value'] for view in res):
                return res
        raise Exception("{} view cache exceeded number of retries={}"
                        .format(url, retries))

    @classmethod
    def _post(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
        """Send a POST request; inspect the result through ``_resp``."""
        cls._request(url, 'POST', data, params, version=version, set_cookies=set_cookies)

    @classmethod
    def _delete(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
        """Send a DELETE request; inspect the result through ``_resp``."""
        cls._request(url, 'DELETE', data, params, version=version, set_cookies=set_cookies)

    @classmethod
    def _put(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
        """Send a PUT request; inspect the result through ``_resp``."""
        cls._request(url, 'PUT', data, params, version=version, set_cookies=set_cookies)

    # The _assert* helpers are classmethods raising plain exceptions so they
    # can be used where no test instance (and thus no self.assert*) exists.
    @classmethod
    def _assertEq(cls, v1, v2):
        if not v1 == v2:
            raise Exception("assertion failed: {} != {}".format(v1, v2))

    @classmethod
    def _assertIn(cls, v1, v2):
        if v1 not in v2:
            raise Exception("assertion failed: {} not in {}".format(v1, v2))

    @classmethod
    def _assertIsInst(cls, v1, v2):
        if not isinstance(v1, v2):
            raise Exception("assertion failed: {} not instance of {}".format(v1, v2))

    # pylint: disable=too-many-arguments
    @classmethod
    def _task_request(cls, method, url, data, timeout, version=DEFAULT_API_VERSION,
                      set_cookies=False):
        """
        Send a request that may be executed as an asynchronous task and wait
        for the task to finish.

        A 202 response is followed by polling ``/api/task`` once per second,
        at most ``int(timeout)`` times. When the task has finished,
        ``_resp.status_code`` is rewritten to the status a synchronous call
        would have produced (201/200/204 on success, the status of the
        exception or 500 on failure).

        :return: ``None`` for a 403 response, the response body if the
            request finished immediately, otherwise the task's 'ret_value'
            on success or its 'exception' on failure.
        :raises Exception: On an unexpected status code or if the task did
            not finish in time.
        """
        res = cls._request(url, method, data, version=version, set_cookies=set_cookies)
        cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403, 404])

        if cls._resp.status_code == 403:
            return None

        if cls._resp.status_code != 202:
            log.debug("task finished immediately")
            return res

        cls._assertIn('name', res)
        cls._assertIn('metadata', res)
        task_name = res['name']
        task_metadata = res['metadata']

        retries = int(timeout)
        res_task = None
        _res = None
        while retries > 0 and res_task is None:
            retries -= 1
            log.debug("task (%s, %s) is still executing", task_name, task_metadata)
            time.sleep(1)
            _res = cls._get('/api/task?name={}'.format(task_name), version=version)
            cls._assertEq(cls._resp.status_code, 200)
            executing_tasks = [task for task in _res['executing_tasks'] if
                               task['metadata'] == task_metadata]
            finished_tasks = [task for task in _res['finished_tasks'] if
                              task['metadata'] == task_metadata]
            if not executing_tasks and finished_tasks:
                res_task = finished_tasks[0]

        # Decide on the result, not on the counter: a task that finishes on
        # the final poll must not be reported as timed out.
        if res_task is None:
            raise Exception("Waiting for task ({}, {}) to finish timed out. {}"
                            .format(task_name, task_metadata, _res))

        log.debug("task (%s, %s) finished", task_name, task_metadata)
        if res_task['success']:
            if method == 'POST':
                cls._resp.status_code = 201
            elif method == 'PUT':
                cls._resp.status_code = 200
            elif method == 'DELETE':
                cls._resp.status_code = 204
            return res_task['ret_value']

        if 'status' in res_task['exception']:
            cls._resp.status_code = res_task['exception']['status']
        else:
            cls._resp.status_code = 500
        return res_task['exception']

    @classmethod
    def _task_post(cls, url, data=None, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
        """POST via _task_request()."""
        return cls._task_request('POST', url, data, timeout, version=version,
                                 set_cookies=set_cookies)

    @classmethod
    def _task_delete(cls, url, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
        """DELETE (without body) via _task_request()."""
        return cls._task_request('DELETE', url, None, timeout, version=version,
                                 set_cookies=set_cookies)

    @classmethod
    def _task_put(cls, url, data=None, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
        """PUT via _task_request()."""
        return cls._task_request('PUT', url, data, timeout, version=version,
                                 set_cookies=set_cookies)

    @classmethod
    def cookies(cls):
        """Return the cookies of the last response."""
        return cls._resp.cookies

    @classmethod
    def jsonBody(cls):
        """Return the decoded JSON body of the last response."""
        return cls._resp.json()

    @classmethod
    def reset_session(cls):
        """Replace the HTTP session with a fresh one."""
        cls._session = requests.Session()

    def assertSubset(self, data, biggerData):
        """Assert that every key/value pair of ``data`` is in ``biggerData``."""
        for key, value in data.items():
            self.assertEqual(biggerData[key], value)

    def assertJsonBody(self, data):
        """Assert that the JSON body of the last response equals ``data``."""
        body = self._resp.json()
        self.assertEqual(body, data)

    def assertJsonSubset(self, data):
        """Assert that ``data`` is a subset of the last response's JSON body."""
        self.assertSubset(data, self._resp.json())

    def assertSchema(self, data, schema):
        """
        Validate ``data`` against ``schema`` and fail the test on a
        validation error.

        :return: The result of ``_validate_json``.
        """
        # NOTE(review): a False result (returned rather than raised for a
        # JUnion mismatch) is passed through and does not fail the test.
        try:
            return _validate_json(data, schema)
        except _ValError as e:
            # Guaranteed mismatch: fails the test and shows both the data
            # and the validation message.
            self.assertEqual(data, str(e))

    def assertSchemaBody(self, schema):
        """Validate the JSON body of the last response against ``schema``."""
        self.assertSchema(self.jsonBody(), schema)

    def assertBody(self, body):
        """Assert that the raw text of the last response equals ``body``."""
        self.assertEqual(self._resp.text, body)

    def assertStatus(self, status):
        """Assert the last status code equals ``status`` or is in the list ``status``."""
        if isinstance(status, list):
            self.assertIn(self._resp.status_code, status)
        else:
            self.assertEqual(self._resp.status_code, status)

    def assertHeaders(self, headers):
        """Assert that the last response carries all of the given headers."""
        for name, value in headers.items():
            self.assertIn(name, self._resp.headers)
            self.assertEqual(self._resp.headers[name], value)

    def assertError(self, code=None, component=None, detail=None):
        """
        Compare the 'code', 'component' and 'detail' fields of the last
        response's JSON body. Arguments that are falsy are not checked.
        """
        body = self._resp.json()
        if code:
            self.assertEqual(body['code'], code)
        if component:
            self.assertEqual(body['component'], component)
        if detail:
            self.assertEqual(body['detail'], detail)

    @classmethod
    def _ceph_cmd(cls, cmd):
        """Run a cluster command (list of arguments) and return its result."""
        res = cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
        log.debug("command result: %s", res)
        return res

    @classmethod
    def _ceph_cmd_result(cls, cmd):
        """Run a cluster command (list of arguments) and return its exit status."""
        exitstatus = cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd)
        log.debug("command exit status: %d", exitstatus)
        return exitstatus

    @classmethod
    def _ceph_cmd_with_secret(cls, cmd: List[str], secret: str, return_exit_code: bool = False):
        """
        Run a cluster command that reads a secret from a file (``-i <file>``).

        :param cmd: The command arguments. The list is not modified.
        :param secret: The secret that is written to a temporary remote file.
        :param return_exit_code: Return the exit status instead of the
            command result.
        """
        # Build a new list instead of appending to the caller's one.
        full_cmd = list(cmd) + ['-i', cls._ceph_create_tmp_file(secret)]
        if return_exit_code:
            return cls._ceph_cmd_result(full_cmd)
        return cls._ceph_cmd(full_cmd)

    @classmethod
    def _ceph_create_tmp_file(cls, content: str) -> str:
        """Create a temporary file in the remote cluster"""
        file_name = ''.join(random.choices(string.ascii_letters + string.digits, k=20))
        file_path = '/tmp/{}'.format(file_name)
        # Quote the content so whitespace and shell metacharacters end up in
        # the file verbatim; printf writes it without a trailing newline and
        # without interpreting options or escape sequences.
        cls._cmd(['sh', '-c', 'printf %s {} > {}'.format(shlex.quote(content),
                                                         shlex.quote(file_path))])
        return file_path

    def set_config_key(self, key, value):
        """Run ``config-key set <key> <value>``."""
        self._ceph_cmd(['config-key', 'set', key, value])

    def get_config_key(self, key):
        """Return the result of ``config-key get <key>``."""
        return self._ceph_cmd(['config-key', 'get', key])

    @classmethod
    def _cmd(cls, args):
        """Run an arbitrary command on the admin remote."""
        return cls.mgr_cluster.admin_remote.run(args=args)

    @classmethod
    def _rbd_cmd(cls, cmd):
        """Run ``rbd`` with the given arguments on the admin remote."""
        args = ['rbd']
        args.extend(cmd)
        cls._cmd(args)

    @classmethod
    def _radosgw_admin_cmd(cls, cmd):
        """Run ``radosgw-admin`` with the given arguments on the admin remote."""
        args = ['radosgw-admin']
        args.extend(cmd)
        cls._cmd(args)

    @classmethod
    def _rados_cmd(cls, cmd):
        """Run ``rados`` with the given arguments on the admin remote."""
        args = ['rados']
        args.extend(cmd)
        cls._cmd(args)

    @classmethod
    def mons(cls):
        """Return the monitor names listed in the monmap of ``quorum_status``."""
        out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        return [mon['name'] for mon in j['monmap']['mons']]

    @classmethod
    def find_object_in_list(cls, key, value, iterable):
        """
        Get the first occurrence of an object within a list with
        the specified key/value.
        :param key: The name of the key.
        :param value: The value to search for.
        :param iterable: The list to process.
        :return: Returns the found object or None.
        """
        for obj in iterable:
            if key in obj and obj[key] == value:
                return obj
        return None
589
590
# TODO: pass defaults=(False,) to namedtuple() once Python >= 3.7 can be assumed
class JLeaf(namedtuple('JLeaf', ['typ', 'none'])):
    """
    Schema node for a scalar value of type ``typ``.

    ``none`` tells whether ``None`` is accepted as well; it defaults to
    ``False``.
    """
    def __new__(cls, typ, none=False):
        return super(JLeaf, cls).__new__(cls, typ=typ, none=none)
595
596
# Schema node for a list whose elements all match ``elem_typ``.
JList = namedtuple('JList', 'elem_typ')

# Schema node for a sequence whose i-th element matches ``elem_typs[i]``.
JTuple = namedtuple('JTuple', 'elem_typs')

# Schema node for a value that matches at least one of ``elem_typs``.
JUnion = namedtuple('JUnion', 'elem_typs')
602
603
class JObj(namedtuple('JObj', ['sub_elems', 'allow_unknown', 'none', 'unknown_schema'])):
    """Schema node for a JSON object; only ``sub_elems`` is mandatory."""

    def __new__(cls, sub_elems, allow_unknown=False, none=False, unknown_schema=None):
        """
        :type sub_elems: dict[str, JAny | JLeaf | JList | JObj | type]
        :type allow_unknown: bool
        :type none: bool
        :type unknown_schema: int, str, JAny | JLeaf | JList | JObj
        :return:
        """
        fields = (sub_elems, allow_unknown, none, unknown_schema)
        return super().__new__(cls, *fields)
614
615
# Schema node that accepts any value; ``none`` tells whether None is allowed.
JAny = namedtuple('JAny', 'none')
617
# Schema of a single module option. 'default_value' may be null, while 'min'
# and 'max' accept any value except null.
module_options_object_schema = JObj({
    'name': str,
    'type': str,
    'level': str,
    'flags': int,
    'default_value': JAny(none=True),
    'min': JAny(none=False),
    'max': JAny(none=False),
    'enum_allowed': JList(str),
    'see_also': JList(str),
    'desc': str,
    'long_desc': str,
    'tags': JList(str),
})

# Object with arbitrary keys (no fixed ones) whose values must each match
# module_options_object_schema.
module_options_schema = JObj(
    {},
    allow_unknown=True,
    unknown_schema=module_options_object_schema)

# List of objects with exactly the keys 'addr', 'nonce' and 'type'.
addrvec_schema = JList(JObj({
    'addr': str,
    'nonce': int,
    'type': str
}))

# List of device objects. Additional keys are accepted on the device object
# itself (allow_unknown=True) but not on the entries of 'location'.
devices_schema = JList(JObj({
    'daemons': JList(str),
    'devid': str,
    'location': JList(JObj({
        'host': str,
        'dev': str,
        'path': str
    }))
}, allow_unknown=True))
653
654
655 class _ValError(Exception):
656 def __init__(self, msg, path):
657 path_str = ''.join('[{}]'.format(repr(p)) for p in path)
658 super(_ValError, self).__init__('In `input{}`: {}'.format(path_str, msg))
659
660
# pylint: disable=inconsistent-return-statements,too-many-branches
def _validate_json(val, schema, path=None):
    """
    Validate the decoded JSON value ``val`` against ``schema``.

    :param val: The value to check.
    :param schema: A JAny, JLeaf, JList, JTuple, JUnion or JObj instance, or
        one of the plain types str, int, float and bool.
    :param path: Keys/indexes leading to ``val``; only used for error
        messages.
    :return: True if ``val`` matches. False is returned (instead of an
        exception being raised) when no alternative of a JUnion matches.
    :raises _ValError: If ``val`` does not match the schema.
    :raises AssertionError: If ``schema`` is of an unsupported kind.

    >>> d = {'a': 1, 'b': 'x', 'c': list(range(10))}
    >>> ds = JObj({'a': int, 'b': str, 'c': JList(int)})
    >>> _validate_json(d, ds)
    True
    >>> _validate_json({'num': 1}, JObj({'num': JUnion([int,float])}))
    True
    >>> _validate_json({'num': 'a'}, JObj({'num': JUnion([int,float])}))
    False
    """
    if path is None:
        path = []
    if isinstance(schema, JAny):
        if not schema.none and val is None:
            raise _ValError('val is None', path)
        return True
    if isinstance(schema, JLeaf):
        if schema.none and val is None:
            return True
        if not isinstance(val, schema.typ):
            raise _ValError('val not of type {}'.format(schema.typ), path)
        return True
    if isinstance(schema, JList):
        if not isinstance(val, list):
            raise _ValError('val="{}" is not a list'.format(val), path)
        return all(_validate_json(e, schema.elem_typ, path + [i]) for i, e in enumerate(val))
    if isinstance(schema, JTuple):
        # Report malformed input as a validation error instead of letting an
        # IndexError/TypeError escape from the element access below.
        if not isinstance(val, (list, tuple)):
            raise _ValError('val="{}" is not a list'.format(val), path)
        if len(val) < len(schema.elem_typs):
            raise _ValError('val="{}" has fewer than {} elements'
                            .format(val, len(schema.elem_typs)), path)
        return all(_validate_json(val[i], typ, path + [i])
                   for i, typ in enumerate(schema.elem_typs))
    if isinstance(schema, JUnion):
        for typ in schema.elem_typs:
            try:
                if _validate_json(val, typ, path):
                    return True
            except _ValError:
                # This alternative does not match, try the next one.
                pass
        return False
    if isinstance(schema, JObj):
        if val is None and schema.none:
            return True
        if val is None:
            raise _ValError('val is None', path)
        if not hasattr(val, 'keys'):
            raise _ValError('val="{}" is not a dict'.format(val), path)
        missing_keys = set(schema.sub_elems.keys()).difference(set(val.keys()))
        if missing_keys:
            raise _ValError('missing keys: {}'.format(missing_keys), path)
        unknown_keys = set(val.keys()).difference(set(schema.sub_elems.keys()))
        if not schema.allow_unknown and unknown_keys:
            raise _ValError('unknown keys: {}'.format(unknown_keys), path)
        result = all(
            _validate_json(val[key], sub_schema, path + [key])
            for key, sub_schema in schema.sub_elems.items()
        )
        if unknown_keys and schema.allow_unknown and schema.unknown_schema:
            # Always validate the unknown keys (this may raise), then combine
            # the two results as booleans; adding them would turn a failed
            # check of the known keys into a truthy integer.
            unknown_ok = all(
                _validate_json(val[key], schema.unknown_schema, path + [key])
                for key in unknown_keys
            )
            result = result and unknown_ok
        return result
    if schema in [str, int, float, bool]:
        return _validate_json(val, JLeaf(schema), path)

    # Raised explicitly so an unsupported schema is also reported under -O.
    raise AssertionError(str(path))