]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/rook/rook-client-python/generate_model_classes.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / rook / rook-client-python / generate_model_classes.py
index 760dd5e23e30d7b9d0bb3bdeb52f3e74ac5b3ee0..110995badb2eaf74975bb3b8611b327f71e53b3f 100644 (file)
@@ -17,7 +17,8 @@ Usage:
 import os
 from abc import ABC, abstractmethod
 from collections import OrderedDict
-from typing import List, Union, Iterator, Optional
+from typing import List, Union, Iterator, Optional, Dict, TypeVar, Callable
+import copy
 
 import yaml
 try:
@@ -25,6 +26,9 @@ try:
 except ImportError:
     from attr import dataclass  # type: ignore
 
+T = TypeVar('T')
+K = TypeVar('K')
+
 header = '''"""
 This file is automatically generated.
 Do not modify.
@@ -47,15 +51,26 @@ class CRDBase(ABC):
 
     @property
     def py_name(self):
-        return self.name
+        if self.name == 'exec':
+            return 'exec_1'
+        return self.name.replace('-', '_')
 
     @property
     @abstractmethod
     def py_type(self):
         ...
 
+    @property
+    def py_type_escaped(self):
+        return self.py_type
+
+
+    @abstractmethod
+    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
+        ...
+
     @abstractmethod
-    def flatten(self):
+    def toplevel(self) -> str:
         ...
 
     def py_property(self):
@@ -112,14 +127,18 @@ class CRDAttribute(CRDBase):
             'string': 'str',
             'object': 'Any',
             'number': 'float',
+            'x-kubernetes-int-or-string': 'Union[int, str]',
         }[self.type]
 
-    def flatten(self):
+    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
         yield from ()
 
     def toplevel(self):
         return ''
 
+    def __hash__(self):
+        return hash(repr(self))
+
 
 @dataclass
 class CRDList(CRDBase):
@@ -133,6 +152,10 @@ class CRDList(CRDBase):
     def py_type(self):
         return self.name[0].upper() + self.name[1:] + 'List'
 
+    @property
+    def py_type_escaped(self):
+        return f"'{self.py_type}'"
+
     @property
     def py_param_type(self):
         inner = f'Union[List[{self.items.py_type}], CrdObjectList]'
@@ -143,7 +166,7 @@ class CRDList(CRDBase):
         inner = f'Union[List[{self.items.py_type}], CrdObjectList]'
         return f'Optional[{inner}]' if (self.nullable) else inner
 
-    def flatten(self):
+    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
         yield from self.items.flatten()
         yield self
 
@@ -157,13 +180,21 @@ class {self.py_type}(CrdObjectList):
 {indent('_items_type = ' + py_type)}
 """.strip()
 
+    def __eq__(self, other):
+        if type(self) != type(other):
+            return False
+        return self.toplevel() == other.toplevel()
+
+    def __hash__(self):
+        return hash(self.toplevel())
+
 
 @dataclass
 class CRDClass(CRDBase):
     attrs: List[Union[CRDAttribute, 'CRDClass']]
     base_class: str = 'CrdObject'
 
-    def toplevel(self):
+    def toplevel(self) -> str:
         ps = '\n\n'.join(a.py_property() for a in self.attrs)
         return f"""class {self.py_type}({self.base_class}):
 {indent(self.py_properties())}        
@@ -181,18 +212,23 @@ class CRDClass(CRDBase):
     def py_type(self):
         return self.name[0].upper() + self.name[1:]
 
+    @property
+    def py_type_escaped(self):
+        return f"'{self.py_type}'"
+
+
     def py_properties(self):
         def a_to_tuple(a):
             return ', '.join((f"'{a.name}'",
                        f"'{a.py_name}'",
-                       a.py_type.replace('Any', 'object'),
+                       a.py_type_escaped.replace('Any', 'object'),
                        str(a.required),
                        str(a.nullable)))
 
         attrlist = ',\n'.join([f'({a_to_tuple(a)})' for a in self.attrs])
         return f"""_properties = [\n{indent(attrlist)}\n]"""
 
-    def flatten(self) -> Iterator['CRDClass']:
+    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
         for sub_cls in self.attrs:
             yield from sub_cls.flatten()
         yield self
@@ -210,6 +246,15 @@ def __init__(self,
     )
 """.strip()
 
+    def __eq__(self, other):
+        if type(self) != type(other):
+            return False
+        return self.toplevel() == other.toplevel()
+
+    def __hash__(self):
+        return hash(self.toplevel())
+
+
 def indent(s, indent=4):
     return '\n'.join(' '*indent + l for l in s.splitlines())
 
@@ -228,21 +273,36 @@ def handle_property(elem_name, elem: dict, required: bool):
         return CRDAttribute(elem_name, nullable, required, elem['type'])
     elif elem == {}:
         return CRDAttribute(elem_name, nullable, required, 'object')
+    elif 'x-kubernetes-int-or-string' in elem:
+        return CRDAttribute(elem_name, nullable, required, 'x-kubernetes-int-or-string')
+
     assert False, str((elem_name, elem))
 
+def spec_get_schema(c_dict: Dict) -> Dict:
+    try:
+        return c_dict['spec']['validation']['openAPIV3Schema']
+    except (KeyError, TypeError):
+        pass
+    versions = c_dict['spec']['versions']
+    if len(versions) != 1:
+        raise RuntimeError(f'todo: {[v["name"] for v in versions]}')
+    return c_dict['spec']['versions'][0]["schema"]['openAPIV3Schema']
 
-def handle_crd(c_dict) -> Optional[CRDClass]:
+def handle_crd(c_dict: dict) -> Optional[CRDClass]:
     try:
         name = c_dict['spec']['names']['kind']
-        s = c_dict['spec']['validation']['openAPIV3Schema']
+        s = spec_get_schema(c_dict)
     except (KeyError, TypeError):
         return None
     s['required'] = ['spec']
     c = handle_property(name, s, True)
-    k8s_attrs = [CRDAttribute('apiVersion', False, True, 'string'),
-                 CRDAttribute('metadata', False, True, 'object'),
-                 CRDAttribute('status', False, False, 'object')]
-    return CRDClass(c.name, False, True, k8s_attrs + c.attrs, base_class='CrdClass')
+    if 'apiVersion' not in [a.name for a in c.attrs]:
+        c.attrs.append(CRDAttribute('apiVersion', False, True, 'string'))
+    if 'metadata' not in [a.name for a in c.attrs]:
+        c.attrs.append(CRDAttribute('metadata', False, True, 'object'))
+    if 'status' not in [a.name for a in c.attrs]:
+        c.attrs.append(CRDAttribute('status', False, False, 'object'))
+    return CRDClass(c.name, False, True, c.attrs, base_class='CrdClass')
 
 
 def local(yaml_filename):
@@ -255,23 +315,67 @@ def local(yaml_filename):
                 pass
 
 
-def remove_duplicates(items):
-    return OrderedDict.fromkeys(items).keys()
-
-
-def get_toplevels(crd):
-    elems = list(crd.flatten())
-
-    def dup_elems(l):
-        ds = set([x for x in l if l.count(x) > 1])
-        return ds
-
-    names = [t.name for t in elems]
-    for dup_name in dup_elems(names):
-        dups = set(e.toplevel() for e in elems if e.name == dup_name)
-        assert len(dups) == 1, str(dups)
-        
-    return remove_duplicates(cls.toplevel() for cls in elems)
+def remove_duplicates_by(items: List[T], key: Callable[[T], K], unify: Callable[[T, T], T]) -> List[T]:
+    res: OrderedDict[K, T] = OrderedDict()
+    for i in items:
+        k = key(i)
+        if k in res:
+            res[k] = unify(res[k], i)
+        else:
+            res[k] = i
+    return list(res.values())
+
+
+def remove_duplicates(items: List[T]) -> List[T]:
+    return list(OrderedDict.fromkeys(items).keys())
+
+
+def unify_classes(left: CRDBase, right: CRDBase) -> CRDBase:
+    assert left.name == right.name
+
+    if isinstance(left, CRDClass) and isinstance(right, CRDClass):
+        assert left.py_type == right.py_type
+        assert left.base_class == right.base_class
+        ret = CRDClass(
+            name=left.name,
+            nullable=left.nullable or right.nullable,
+            required=False,
+            attrs=remove_duplicates_by(right.attrs + left.attrs, lambda a: a.name, unify_classes),  # type: ignore
+            base_class=left.base_class
+        )
+        for a in ret.attrs:
+            # we have to set all required properties to False
+            a.required = False
+        return ret
+
+    elif isinstance(left, CRDAttribute) and isinstance(right, CRDAttribute):
+        assert left.type == right.type
+        assert left.name == right.name
+        assert left.default_value == right.default_value
+        return CRDAttribute(
+            name=left.name,
+            nullable=left.nullable or right.nullable,
+            required=False,
+            type=left.type,
+            default_value=left.default_value
+        )
+    elif type(left) != type(right):
+        # handwaving
+        return CRDAttribute(
+            name=left.name,
+            nullable=left.nullable or right.nullable,
+            required=False,
+            type='object'
+        )
+    else:
+        assert left == right, (repr(left), repr(right))
+        return left
+
+
+def get_toplevels(crd: CRDBase) -> List[str]:
+    elems: List[CRDBase] = remove_duplicates(list(crd.flatten()))
+    res = remove_duplicates_by(elems, lambda c: c.py_type, unify_classes)
+    return [e.toplevel() for e in res]
 
 
 def main(yaml_filename, outfolder):