import os
from abc import ABC, abstractmethod
from collections import OrderedDict
-from typing import List, Union, Iterator, Optional
+from typing import List, Union, Iterator, Optional, Dict, TypeVar, Callable
+import copy
import yaml
try:
except ImportError:
from attr import dataclass # type: ignore
+T = TypeVar('T')
+K = TypeVar('K')
+
header = '''"""
This file is automatically generated.
Do not modify.
@property
def py_name(self):
- return self.name
+ if self.name == 'exec':
+ return 'exec_1'
+ return self.name.replace('-', '_')
@property
@abstractmethod
def py_type(self):
...
+ @property
+ def py_type_escaped(self):
+ return self.py_type
+
+
+ @abstractmethod
+ def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
+ ...
+
@abstractmethod
- def flatten(self):
+ def toplevel(self) -> str:
...
def py_property(self):
'string': 'str',
'object': 'Any',
'number': 'float',
+ 'x-kubernetes-int-or-string': 'Union[int, str]',
}[self.type]
- def flatten(self):
+ def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
yield from ()
def toplevel(self):
return ''
+ def __hash__(self):
+ return hash(repr(self))
+
@dataclass
class CRDList(CRDBase):
def py_type(self):
return self.name[0].upper() + self.name[1:] + 'List'
+ @property
+ def py_type_escaped(self):
+ return f"'{self.py_type}'"
+
@property
def py_param_type(self):
inner = f'Union[List[{self.items.py_type}], CrdObjectList]'
inner = f'Union[List[{self.items.py_type}], CrdObjectList]'
return f'Optional[{inner}]' if (self.nullable) else inner
- def flatten(self):
+ def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
yield from self.items.flatten()
yield self
{indent('_items_type = ' + py_type)}
""".strip()
+ def __eq__(self, other):
+ if type(self) != type(other):
+ return False
+ return self.toplevel() == other.toplevel()
+
+ def __hash__(self):
+ return hash(self.toplevel())
+
@dataclass
class CRDClass(CRDBase):
attrs: List[Union[CRDAttribute, 'CRDClass']]
base_class: str = 'CrdObject'
- def toplevel(self):
+ def toplevel(self) -> str:
ps = '\n\n'.join(a.py_property() for a in self.attrs)
return f"""class {self.py_type}({self.base_class}):
{indent(self.py_properties())}
def py_type(self):
return self.name[0].upper() + self.name[1:]
+ @property
+ def py_type_escaped(self):
+ return f"'{self.py_type}'"
+
+
def py_properties(self):
def a_to_tuple(a):
return ', '.join((f"'{a.name}'",
f"'{a.py_name}'",
- a.py_type.replace('Any', 'object'),
+ a.py_type_escaped.replace('Any', 'object'),
str(a.required),
str(a.nullable)))
attrlist = ',\n'.join([f'({a_to_tuple(a)})' for a in self.attrs])
return f"""_properties = [\n{indent(attrlist)}\n]"""
- def flatten(self) -> Iterator['CRDClass']:
+ def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
for sub_cls in self.attrs:
yield from sub_cls.flatten()
yield self
)
""".strip()
+ def __eq__(self, other):
+ if type(self) != type(other):
+ return False
+ return self.toplevel() == other.toplevel()
+
+ def __hash__(self):
+ return hash(self.toplevel())
+
+
def indent(s, indent=4):
return '\n'.join(' '*indent + l for l in s.splitlines())
return CRDAttribute(elem_name, nullable, required, elem['type'])
elif elem == {}:
return CRDAttribute(elem_name, nullable, required, 'object')
+ elif 'x-kubernetes-int-or-string' in elem:
+ return CRDAttribute(elem_name, nullable, required, 'x-kubernetes-int-or-string')
+
assert False, str((elem_name, elem))
+def spec_get_schema(c_dict: Dict) -> Dict:
+ try:
+ return c_dict['spec']['validation']['openAPIV3Schema']
+ except (KeyError, TypeError):
+ pass
+ versions = c_dict['spec']['versions']
+ if len(versions) != 1:
+ raise RuntimeError(f'todo: {[v["name"] for v in versions]}')
+ return c_dict['spec']['versions'][0]["schema"]['openAPIV3Schema']
-def handle_crd(c_dict) -> Optional[CRDClass]:
+def handle_crd(c_dict: dict) -> Optional[CRDClass]:
try:
name = c_dict['spec']['names']['kind']
- s = c_dict['spec']['validation']['openAPIV3Schema']
+ s = spec_get_schema(c_dict)
except (KeyError, TypeError):
return None
s['required'] = ['spec']
c = handle_property(name, s, True)
- k8s_attrs = [CRDAttribute('apiVersion', False, True, 'string'),
- CRDAttribute('metadata', False, True, 'object'),
- CRDAttribute('status', False, False, 'object')]
- return CRDClass(c.name, False, True, k8s_attrs + c.attrs, base_class='CrdClass')
+ if 'apiVersion' not in [a.name for a in c.attrs]:
+ c.attrs.append(CRDAttribute('apiVersion', False, True, 'string'))
+ if 'metadata' not in [a.name for a in c.attrs]:
+ c.attrs.append(CRDAttribute('metadata', False, True, 'object'))
+ if 'status' not in [a.name for a in c.attrs]:
+ c.attrs.append(CRDAttribute('status', False, False, 'object'))
+ return CRDClass(c.name, False, True, c.attrs, base_class='CrdClass')
def local(yaml_filename):
pass
-def remove_duplicates(items):
- return OrderedDict.fromkeys(items).keys()
-
-
-def get_toplevels(crd):
- elems = list(crd.flatten())
-
- def dup_elems(l):
- ds = set([x for x in l if l.count(x) > 1])
- return ds
-
- names = [t.name for t in elems]
- for dup_name in dup_elems(names):
- dups = set(e.toplevel() for e in elems if e.name == dup_name)
- assert len(dups) == 1, str(dups)
-
- return remove_duplicates(cls.toplevel() for cls in elems)
+def remove_duplicates_by(items: List[T], key: Callable[[T], K], unify: Callable[[T, T], T]) -> List[T]:
+ res: OrderedDict[K, T] = OrderedDict()
+ for i in items:
+ k = key(i)
+ if k in res:
+ res[k] = unify(res[k], i)
+ else:
+ res[k] = i
+ return list(res.values())
+
+
+def remove_duplicates(items: List[T]) -> List[T]:
+ return list(OrderedDict.fromkeys(items).keys())
+
+
+def unify_classes(left: CRDBase, right: CRDBase) -> CRDBase:
+ assert left.name == right.name
+
+ if isinstance(left, CRDClass) and isinstance(right, CRDClass):
+ assert left.py_type == right.py_type
+ assert left.base_class == right.base_class
+ ret = CRDClass(
+ name=left.name,
+ nullable=left.nullable or right.nullable,
+ required=False,
+ attrs=remove_duplicates_by(right.attrs + left.attrs, lambda a: a.name, unify_classes), # type: ignore
+ base_class=left.base_class
+ )
+ for a in ret.attrs:
+ # we have to set all required properties to False
+ a.required = False
+ return ret
+
+ elif isinstance(left, CRDAttribute) and isinstance(right, CRDAttribute):
+ assert left.type == right.type
+ assert left.name == right.name
+ assert left.default_value == right.default_value
+ return CRDAttribute(
+ name=left.name,
+ nullable=left.nullable or right.nullable,
+ required=False,
+ type=left.type,
+ default_value=left.default_value
+ )
+ elif type(left) != type(right):
+ # handwaving
+ return CRDAttribute(
+ name=left.name,
+ nullable=left.nullable or right.nullable,
+ required=False,
+ type='object'
+ )
+ else:
+ assert left == right, (repr(left), repr(right))
+ return left
+
+
+def get_toplevels(crd: CRDBase) -> List[str]:
+ elems: List[CRDBase] = remove_duplicates(list(crd.flatten()))
+ res = remove_duplicates_by(elems, lambda c: c.py_type, unify_classes)
+ return [e.toplevel() for e in res]
def main(yaml_filename, outfolder):