]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/dashboard/helper.py
import quincy beta 17.1.0
[ceph.git] / ceph / qa / tasks / mgr / dashboard / helper.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=W0212,too-many-return-statements,too-many-public-methods
3 from __future__ import absolute_import
4
5 import json
6 import logging
7 import random
8 import re
9 import string
10 import time
11 from collections import namedtuple
12 from typing import List
13
14 import requests
15 from tasks.mgr.mgr_test_case import MgrTestCase
16 from teuthology.exceptions import \
17 CommandFailedError # pylint: disable=import-error
18
19 from . import DEFAULT_API_VERSION
20
21 log = logging.getLogger(__name__)
22
23
24 class DashboardTestCase(MgrTestCase):
25 # Display full error diffs
26 maxDiff = None
27
28 # Increased x3 (20 -> 60)
29 TIMEOUT_HEALTH_CLEAR = 60
30
31 MGRS_REQUIRED = 2
32 MDSS_REQUIRED = 1
33 REQUIRE_FILESYSTEM = True
34 CLIENTS_REQUIRED = 1
35 CEPHFS = False
36 ORCHESTRATOR = False
37 ORCHESTRATOR_TEST_DATA = {
38 'inventory': [
39 {
40 'name': 'test-host0',
41 'addr': '1.2.3.4',
42 'devices': [
43 {
44 'path': '/dev/sda',
45 }
46 ]
47 },
48 {
49 'name': 'test-host1',
50 'addr': '1.2.3.5',
51 'devices': [
52 {
53 'path': '/dev/sdb',
54 }
55 ]
56 }
57 ],
58 'daemons': [
59 {
60 'nodename': 'test-host0',
61 'daemon_type': 'mon',
62 'daemon_id': 'a'
63 },
64 {
65 'nodename': 'test-host0',
66 'daemon_type': 'mgr',
67 'daemon_id': 'x'
68 },
69 {
70 'nodename': 'test-host0',
71 'daemon_type': 'osd',
72 'daemon_id': '0'
73 },
74 {
75 'nodename': 'test-host1',
76 'daemon_type': 'osd',
77 'daemon_id': '1'
78 }
79 ]
80 }
81
82 _session = None # type: requests.sessions.Session
83 _token = None
84 _resp = None # type: requests.models.Response
85 _loggedin = False
86 _base_uri = None
87
88 AUTO_AUTHENTICATE = True
89
90 AUTH_ROLES = ['administrator']
91
92 @classmethod
93 def create_user(cls, username, password, roles=None,
94 force_password=True, cmd_args=None):
95 # pylint: disable=too-many-arguments
96 """
97 :param username: The name of the user.
98 :type username: str
99 :param password: The password.
100 :type password: str
101 :param roles: A list of roles.
102 :type roles: list
103 :param force_password: Force the use of the specified password. This
104 will bypass the password complexity check. Defaults to 'True'.
105 :type force_password: bool
106 :param cmd_args: Additional command line arguments for the
107 'ac-user-create' command.
108 :type cmd_args: None | list[str]
109 """
110 try:
111 cls._ceph_cmd(['dashboard', 'ac-user-show', username])
112 cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
113 except CommandFailedError as ex:
114 if ex.exitstatus != 2:
115 raise ex
116
117 user_create_args = [
118 'dashboard', 'ac-user-create', username
119 ]
120 if force_password:
121 user_create_args.append('--force-password')
122 if cmd_args:
123 user_create_args.extend(cmd_args)
124 cls._ceph_cmd_with_secret(user_create_args, password)
125 if roles:
126 set_roles_args = ['dashboard', 'ac-user-set-roles', username]
127 for idx, role in enumerate(roles):
128 if isinstance(role, str):
129 set_roles_args.append(role)
130 else:
131 assert isinstance(role, dict)
132 rolename = 'test_role_{}'.format(idx)
133 try:
134 cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
135 cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
136 except CommandFailedError as ex:
137 if ex.exitstatus != 2:
138 raise ex
139 cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
140 for mod, perms in role.items():
141 args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
142 args.extend(perms)
143 cls._ceph_cmd(args)
144 set_roles_args.append(rolename)
145 cls._ceph_cmd(set_roles_args)
146
147 @classmethod
148 def create_pool(cls, name, pg_num, pool_type, application='rbd'):
149 data = {
150 'pool': name,
151 'pg_num': pg_num,
152 'pool_type': pool_type,
153 'application_metadata': [application]
154 }
155 if pool_type == 'erasure':
156 data['flags'] = ['ec_overwrites']
157 cls._task_post("/api/pool", data)
158
159 @classmethod
160 def login(cls, username, password, set_cookies=False):
161 if cls._loggedin:
162 cls.logout()
163 cls._post('/api/auth', {'username': username,
164 'password': password}, set_cookies=set_cookies)
165 cls._assertEq(cls._resp.status_code, 201)
166 cls._token = cls.jsonBody()['token']
167 cls._loggedin = True
168
169 @classmethod
170 def logout(cls, set_cookies=False):
171 if cls._loggedin:
172 cls._post('/api/auth/logout', set_cookies=set_cookies)
173 cls._assertEq(cls._resp.status_code, 200)
174 cls._token = None
175 cls._loggedin = False
176
177 @classmethod
178 def delete_user(cls, username, roles=None):
179 if roles is None:
180 roles = []
181 cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
182 for idx, role in enumerate(roles):
183 if isinstance(role, dict):
184 cls._ceph_cmd(['dashboard', 'ac-role-delete', 'test_role_{}'.format(idx)])
185
186 @classmethod
187 def RunAs(cls, username, password, roles=None, force_password=True,
188 cmd_args=None, login=True):
189 # pylint: disable=too-many-arguments
190 def wrapper(func):
191 def execute(self, *args, **kwargs):
192 self.create_user(username, password, roles,
193 force_password, cmd_args)
194 if login:
195 self.login(username, password)
196 res = func(self, *args, **kwargs)
197 if login:
198 self.logout()
199 self.delete_user(username, roles)
200 return res
201
202 return execute
203
204 return wrapper
205
206 @classmethod
207 def set_jwt_token(cls, token):
208 cls._token = token
209
210 @classmethod
211 def setUpClass(cls):
212 super(DashboardTestCase, cls).setUpClass()
213 cls._assign_ports("dashboard", "ssl_server_port")
214 cls._load_module("dashboard")
215 cls.update_base_uri()
216
217 if cls.CEPHFS:
218 cls.mds_cluster.clear_firewall()
219
220 # To avoid any issues with e.g. unlink bugs, we destroy and recreate
221 # the filesystem rather than just doing a rm -rf of files
222 cls.mds_cluster.mds_stop()
223 cls.mds_cluster.mds_fail()
224 cls.mds_cluster.delete_all_filesystems()
225 cls.fs = None # is now invalid!
226
227 cls.fs = cls.mds_cluster.newfs(create=True)
228 cls.fs.mds_restart()
229
230 # In case some test messed with auth caps, reset them
231 # pylint: disable=not-an-iterable
232 client_mount_ids = [m.client_id for m in cls.mounts]
233 for client_id in client_mount_ids:
234 cls.mds_cluster.mon_manager.raw_cluster_cmd_result(
235 'auth', 'caps', "client.{0}".format(client_id),
236 'mds', 'allow',
237 'mon', 'allow r',
238 'osd', 'allow rw pool={0}'.format(cls.fs.get_data_pool_name()))
239
240 # wait for mds restart to complete...
241 cls.fs.wait_for_daemons()
242
243 if cls.ORCHESTRATOR:
244 cls._load_module("test_orchestrator")
245
246 cmd = ['orch', 'set', 'backend', 'test_orchestrator']
247 cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
248
249 cmd = ['test_orchestrator', 'load_data', '-i', '-']
250 cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(
251 cls.ORCHESTRATOR_TEST_DATA
252 ))
253
254 cls._token = None
255 cls._session = requests.Session()
256 cls._resp = None
257
258 cls.create_user('admin', 'admin', cls.AUTH_ROLES)
259 if cls.AUTO_AUTHENTICATE:
260 cls.login('admin', 'admin')
261
262 @classmethod
263 def update_base_uri(cls):
264 if cls._base_uri is None:
265 cls._base_uri = cls._get_uri("dashboard").rstrip('/')
266
267 def setUp(self):
268 super(DashboardTestCase, self).setUp()
269 if not self._loggedin and self.AUTO_AUTHENTICATE:
270 self.login('admin', 'admin')
271 self.wait_for_health_clear(self.TIMEOUT_HEALTH_CLEAR)
272
273 @classmethod
274 def tearDownClass(cls):
275 super(DashboardTestCase, cls).tearDownClass()
276
277 # pylint: disable=inconsistent-return-statements, too-many-arguments, too-many-branches
278 @classmethod
279 def _request(cls, url, method, data=None, params=None, version=DEFAULT_API_VERSION,
280 set_cookies=False):
281 url = "{}{}".format(cls._base_uri, url)
282 log.debug("Request %s to %s", method, url)
283 headers = {}
284 cookies = {}
285 if cls._token:
286 if set_cookies:
287 cookies['token'] = cls._token
288 else:
289 headers['Authorization'] = "Bearer {}".format(cls._token)
290 if version is None:
291 headers['Accept'] = 'application/json'
292 else:
293 headers['Accept'] = 'application/vnd.ceph.api.v{}+json'.format(version)
294
295 if set_cookies:
296 if method == 'GET':
297 cls._resp = cls._session.get(url, params=params, verify=False,
298 headers=headers, cookies=cookies)
299 elif method == 'POST':
300 cls._resp = cls._session.post(url, json=data, params=params,
301 verify=False, headers=headers, cookies=cookies)
302 elif method == 'DELETE':
303 cls._resp = cls._session.delete(url, json=data, params=params,
304 verify=False, headers=headers, cookies=cookies)
305 elif method == 'PUT':
306 cls._resp = cls._session.put(url, json=data, params=params,
307 verify=False, headers=headers, cookies=cookies)
308 else:
309 assert False
310 else:
311 if method == 'GET':
312 cls._resp = cls._session.get(url, params=params, verify=False,
313 headers=headers)
314 elif method == 'POST':
315 cls._resp = cls._session.post(url, json=data, params=params,
316 verify=False, headers=headers)
317 elif method == 'DELETE':
318 cls._resp = cls._session.delete(url, json=data, params=params,
319 verify=False, headers=headers)
320 elif method == 'PUT':
321 cls._resp = cls._session.put(url, json=data, params=params,
322 verify=False, headers=headers)
323 else:
324 assert False
325 try:
326 if not cls._resp.ok:
327 # Output response for easier debugging.
328 log.error("Request response: %s", cls._resp.text)
329 content_type = cls._resp.headers['content-type']
330 if re.match(r'^application/.*json',
331 content_type) and cls._resp.text and cls._resp.text != "":
332 return cls._resp.json()
333 return cls._resp.text
334 except ValueError as ex:
335 log.exception("Failed to decode response: %s", cls._resp.text)
336 raise ex
337
338 @classmethod
339 def _get(cls, url, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
340 return cls._request(url, 'GET', params=params, version=version, set_cookies=set_cookies)
341
342 @classmethod
343 def _view_cache_get(cls, url, retries=5):
344 retry = True
345 while retry and retries > 0:
346 retry = False
347 res = cls._get(url, version=DEFAULT_API_VERSION)
348 if isinstance(res, dict):
349 res = [res]
350 for view in res:
351 assert 'value' in view
352 if not view['value']:
353 retry = True
354 retries -= 1
355 if retries == 0:
356 raise Exception("{} view cache exceeded number of retries={}"
357 .format(url, retries))
358 return res
359
360 @classmethod
361 def _post(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
362 cls._request(url, 'POST', data, params, version=version, set_cookies=set_cookies)
363
364 @classmethod
365 def _delete(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
366 cls._request(url, 'DELETE', data, params, version=version, set_cookies=set_cookies)
367
368 @classmethod
369 def _put(cls, url, data=None, params=None, version=DEFAULT_API_VERSION, set_cookies=False):
370 cls._request(url, 'PUT', data, params, version=version, set_cookies=set_cookies)
371
372 @classmethod
373 def _assertEq(cls, v1, v2):
374 if not v1 == v2:
375 raise Exception("assertion failed: {} != {}".format(v1, v2))
376
377 @classmethod
378 def _assertIn(cls, v1, v2):
379 if v1 not in v2:
380 raise Exception("assertion failed: {} not in {}".format(v1, v2))
381
382 @classmethod
383 def _assertIsInst(cls, v1, v2):
384 if not isinstance(v1, v2):
385 raise Exception("assertion failed: {} not instance of {}".format(v1, v2))
386
387 # pylint: disable=too-many-arguments
388 @classmethod
389 def _task_request(cls, method, url, data, timeout, version=DEFAULT_API_VERSION,
390 set_cookies=False):
391 res = cls._request(url, method, data, version=version, set_cookies=set_cookies)
392 cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403, 404])
393
394 if cls._resp.status_code == 403:
395 return None
396
397 if cls._resp.status_code != 202:
398 log.debug("task finished immediately")
399 return res
400
401 cls._assertIn('name', res)
402 cls._assertIn('metadata', res)
403 task_name = res['name']
404 task_metadata = res['metadata']
405
406 retries = int(timeout)
407 res_task = None
408 while retries > 0 and not res_task:
409 retries -= 1
410 log.debug("task (%s, %s) is still executing", task_name, task_metadata)
411 time.sleep(1)
412 _res = cls._get('/api/task?name={}'.format(task_name), version=version)
413 cls._assertEq(cls._resp.status_code, 200)
414 executing_tasks = [task for task in _res['executing_tasks'] if
415 task['metadata'] == task_metadata]
416 finished_tasks = [task for task in _res['finished_tasks'] if
417 task['metadata'] == task_metadata]
418 if not executing_tasks and finished_tasks:
419 res_task = finished_tasks[0]
420
421 if retries <= 0:
422 raise Exception("Waiting for task ({}, {}) to finish timed out. {}"
423 .format(task_name, task_metadata, _res))
424
425 log.debug("task (%s, %s) finished", task_name, task_metadata)
426 if res_task['success']:
427 if method == 'POST':
428 cls._resp.status_code = 201
429 elif method == 'PUT':
430 cls._resp.status_code = 200
431 elif method == 'DELETE':
432 cls._resp.status_code = 204
433 return res_task['ret_value']
434
435 if 'status' in res_task['exception']:
436 cls._resp.status_code = res_task['exception']['status']
437 else:
438 cls._resp.status_code = 500
439 return res_task['exception']
440
441 @classmethod
442 def _task_post(cls, url, data=None, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
443 return cls._task_request('POST', url, data, timeout, version=version,
444 set_cookies=set_cookies)
445
446 @classmethod
447 def _task_delete(cls, url, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
448 return cls._task_request('DELETE', url, None, timeout, version=version,
449 set_cookies=set_cookies)
450
451 @classmethod
452 def _task_put(cls, url, data=None, timeout=60, version=DEFAULT_API_VERSION, set_cookies=False):
453 return cls._task_request('PUT', url, data, timeout, version=version,
454 set_cookies=set_cookies)
455
456 @classmethod
457 def cookies(cls):
458 return cls._resp.cookies
459
460 @classmethod
461 def jsonBody(cls):
462 return cls._resp.json()
463
464 @classmethod
465 def reset_session(cls):
466 cls._session = requests.Session()
467
468 def assertSubset(self, data, biggerData):
469 for key, value in data.items():
470 self.assertEqual(biggerData[key], value)
471
472 def assertJsonBody(self, data):
473 body = self._resp.json()
474 self.assertEqual(body, data)
475
476 def assertJsonSubset(self, data):
477 self.assertSubset(data, self._resp.json())
478
479 def assertSchema(self, data, schema):
480 try:
481 return _validate_json(data, schema)
482 except _ValError as e:
483 self.assertEqual(data, str(e))
484
485 def assertSchemaBody(self, schema):
486 self.assertSchema(self.jsonBody(), schema)
487
488 def assertBody(self, body):
489 self.assertEqual(self._resp.text, body)
490
491 def assertStatus(self, status):
492 if isinstance(status, list):
493 self.assertIn(self._resp.status_code, status)
494 else:
495 self.assertEqual(self._resp.status_code, status)
496
497 def assertHeaders(self, headers):
498 for name, value in headers.items():
499 self.assertIn(name, self._resp.headers)
500 self.assertEqual(self._resp.headers[name], value)
501
502 def assertError(self, code=None, component=None, detail=None):
503 body = self._resp.json()
504 if code:
505 self.assertEqual(body['code'], code)
506 if component:
507 self.assertEqual(body['component'], component)
508 if detail:
509 self.assertEqual(body['detail'], detail)
510
511 @classmethod
512 def _ceph_cmd(cls, cmd):
513 res = cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
514 log.debug("command result: %s", res)
515 return res
516
517 @classmethod
518 def _ceph_cmd_result(cls, cmd):
519 exitstatus = cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd)
520 log.debug("command exit status: %d", exitstatus)
521 return exitstatus
522
523 @classmethod
524 def _ceph_cmd_with_secret(cls, cmd: List[str], secret: str, return_exit_code: bool = False):
525 cmd.append('-i')
526 cmd.append('{}'.format(cls._ceph_create_tmp_file(secret)))
527 if return_exit_code:
528 return cls._ceph_cmd_result(cmd)
529 return cls._ceph_cmd(cmd)
530
531 @classmethod
532 def _ceph_create_tmp_file(cls, content: str) -> str:
533 """Create a temporary file in the remote cluster"""
534 file_name = ''.join(random.choices(string.ascii_letters + string.digits, k=20))
535 file_path = '/tmp/{}'.format(file_name)
536 cls._cmd(['sh', '-c', 'echo -n {} > {}'.format(content, file_path)])
537 return file_path
538
539 def set_config_key(self, key, value):
540 self._ceph_cmd(['config-key', 'set', key, value])
541
542 def get_config_key(self, key):
543 return self._ceph_cmd(['config-key', 'get', key])
544
545 @classmethod
546 def _cmd(cls, args):
547 return cls.mgr_cluster.admin_remote.run(args=args)
548
549 @classmethod
550 def _rbd_cmd(cls, cmd):
551 args = ['rbd']
552 args.extend(cmd)
553 cls._cmd(args)
554
555 @classmethod
556 def _radosgw_admin_cmd(cls, cmd):
557 args = ['radosgw-admin']
558 args.extend(cmd)
559 cls._cmd(args)
560
561 @classmethod
562 def _rados_cmd(cls, cmd):
563 args = ['rados']
564 args.extend(cmd)
565 cls._cmd(args)
566
567 @classmethod
568 def mons(cls):
569 out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('quorum_status')
570 j = json.loads(out)
571 return [mon['name'] for mon in j['monmap']['mons']]
572
573 @classmethod
574 def find_object_in_list(cls, key, value, iterable):
575 """
576 Get the first occurrence of an object within a list with
577 the specified key/value.
578 :param key: The name of the key.
579 :param value: The value to search for.
580 :param iterable: The list to process.
581 :return: Returns the found object or None.
582 """
583 for obj in iterable:
584 if key in obj and obj[key] == value:
585 return obj
586 return None
587
588
589 # TODP: pass defaults=(False,) to namedtuple() if python3.7
590 class JLeaf(namedtuple('JLeaf', ['typ', 'none'])):
591 def __new__(cls, typ, none=False):
592 return super().__new__(cls, typ, none)
593
594
595 JList = namedtuple('JList', ['elem_typ'])
596
597 JTuple = namedtuple('JTuple', ['elem_typs'])
598
599 JUnion = namedtuple('JUnion', ['elem_typs'])
600
601
602 class JObj(namedtuple('JObj', ['sub_elems', 'allow_unknown', 'none', 'unknown_schema'])):
603 def __new__(cls, sub_elems, allow_unknown=False, none=False, unknown_schema=None):
604 """
605 :type sub_elems: dict[str, JAny | JLeaf | JList | JObj | type]
606 :type allow_unknown: bool
607 :type none: bool
608 :type unknown_schema: int, str, JAny | JLeaf | JList | JObj
609 :return:
610 """
611 return super(JObj, cls).__new__(cls, sub_elems, allow_unknown, none, unknown_schema)
612
613
614 JAny = namedtuple('JAny', ['none'])
615
616 module_options_object_schema = JObj({
617 'name': str,
618 'type': str,
619 'level': str,
620 'flags': int,
621 'default_value': JAny(none=True),
622 'min': JAny(none=False),
623 'max': JAny(none=False),
624 'enum_allowed': JList(str),
625 'see_also': JList(str),
626 'desc': str,
627 'long_desc': str,
628 'tags': JList(str),
629 })
630
631 module_options_schema = JObj(
632 {},
633 allow_unknown=True,
634 unknown_schema=module_options_object_schema)
635
636 addrvec_schema = JList(JObj({
637 'addr': str,
638 'nonce': int,
639 'type': str
640 }))
641
642 devices_schema = JList(JObj({
643 'daemons': JList(str),
644 'devid': str,
645 'location': JList(JObj({
646 'host': str,
647 'dev': str,
648 'path': str
649 }))
650 }, allow_unknown=True))
651
652
653 class _ValError(Exception):
654 def __init__(self, msg, path):
655 path_str = ''.join('[{}]'.format(repr(p)) for p in path)
656 super(_ValError, self).__init__('In `input{}`: {}'.format(path_str, msg))
657
658
659 # pylint: disable=dangerous-default-value,inconsistent-return-statements,too-many-branches
660 def _validate_json(val, schema, path=[]):
661 """
662 >>> d = {'a': 1, 'b': 'x', 'c': range(10)}
663 ... ds = JObj({'a': int, 'b': str, 'c': JList(int)})
664 ... _validate_json(d, ds)
665 True
666 >>> _validate_json({'num': 1}, JObj({'num': JUnion([int,float])}))
667 True
668 >>> _validate_json({'num': 'a'}, JObj({'num': JUnion([int,float])}))
669 False
670 """
671 if isinstance(schema, JAny):
672 if not schema.none and val is None:
673 raise _ValError('val is None', path)
674 return True
675 if isinstance(schema, JLeaf):
676 if schema.none and val is None:
677 return True
678 if not isinstance(val, schema.typ):
679 raise _ValError('val not of type {}'.format(schema.typ), path)
680 return True
681 if isinstance(schema, JList):
682 if not isinstance(val, list):
683 raise _ValError('val="{}" is not a list'.format(val), path)
684 return all(_validate_json(e, schema.elem_typ, path + [i]) for i, e in enumerate(val))
685 if isinstance(schema, JTuple):
686 return all(_validate_json(val[i], typ, path + [i])
687 for i, typ in enumerate(schema.elem_typs))
688 if isinstance(schema, JUnion):
689 for typ in schema.elem_typs:
690 try:
691 if _validate_json(val, typ, path):
692 return True
693 except _ValError:
694 pass
695 return False
696 if isinstance(schema, JObj):
697 if val is None and schema.none:
698 return True
699 if val is None:
700 raise _ValError('val is None', path)
701 if not hasattr(val, 'keys'):
702 raise _ValError('val="{}" is not a dict'.format(val), path)
703 missing_keys = set(schema.sub_elems.keys()).difference(set(val.keys()))
704 if missing_keys:
705 raise _ValError('missing keys: {}'.format(missing_keys), path)
706 unknown_keys = set(val.keys()).difference(set(schema.sub_elems.keys()))
707 if not schema.allow_unknown and unknown_keys:
708 raise _ValError('unknown keys: {}'.format(unknown_keys), path)
709 result = all(
710 _validate_json(val[key], sub_schema, path + [key])
711 for key, sub_schema in schema.sub_elems.items()
712 )
713 if unknown_keys and schema.allow_unknown and schema.unknown_schema:
714 result += all(
715 _validate_json(val[key], schema.unknown_schema, path + [key])
716 for key in unknown_keys
717 )
718 return result
719 if schema in [str, int, float, bool]:
720 return _validate_json(val, JLeaf(schema), path)
721
722 assert False, str(path)