]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | """ |
2 | Generate Python files containing data Python models classes for | |
3 | all properties of the all CRDs in the file | |
4 | ||
5 | **Note**: generate_model_classes.py is independent of Rook or Ceph. It can be used for all | |
6 | CRDs. | |
7 | ||
8 | For example: | |
9 | python3 -m venv venv | |
10 | pip install -r requirements.txt | |
11 | python generate_model_classes.py <crds.yaml> <output-folder> | |
12 | python setup.py develop | |
13 | ||
14 | Usage: | |
15 | generate_model_classes.py <crds.yaml> <output-folder> | |
16 | """ | |
17 | import os | |
18 | from abc import ABC, abstractmethod | |
19 | from collections import OrderedDict | |
20effc67 TL |
20 | from typing import List, Union, Iterator, Optional, Dict, TypeVar, Callable |
21 | import copy | |
9f95a23c TL |
22 | |
23 | import yaml | |
24 | try: | |
25 | from dataclasses import dataclass | |
26 | except ImportError: | |
27 | from attr import dataclass # type: ignore | |
28 | ||
20effc67 TL |
29 | T = TypeVar('T') |
30 | K = TypeVar('K') | |
31 | ||
9f95a23c TL |
# Preamble that main() writes at the top of every generated module: a
# "do not modify" docstring followed by the imports the generated classes use.
header = '''"""
This file is automatically generated.
Do not modify.
"""

try:
    from typing import Any, Optional, Union, List
except ImportError:
    pass

from .._helper import _omit, CrdObject, CrdObjectList, CrdClass

'''
45 | ||
@dataclass  # type: ignore
class CRDBase(ABC):
    """Abstract base of all nodes in the parsed CRD schema tree.

    A node carries the schema ``name`` together with its ``nullable`` and
    ``required`` flags, and renders the pieces of generated source that are
    common to all node kinds: the property getter/setter pair, the
    ``__init__`` parameter line and the type comments.
    """
    name: str
    nullable: bool
    required: bool

    @property
    def py_name(self) -> str:
        """Return the schema name turned into a usable Python identifier.

        Hyphens become underscores.  ``exec`` and every reserved Python
        keyword (``class``, ``from``, ...) get a ``_1`` suffix, because the
        name is emitted as a method and parameter name in generated code and
        a keyword there would make the generated module fail to compile.
        """
        import keyword  # function-scope import keeps this block self-contained

        if self.name == 'exec':
            return 'exec_1'
        candidate = self.name.replace('-', '_')
        if keyword.iskeyword(candidate):
            return candidate + '_1'
        return candidate

    @property
    @abstractmethod
    def py_type(self):
        """Return the Python type expression used for this node."""
        ...

    @property
    def py_type_escaped(self):
        """Return ``py_type`` as it must appear inside ``_properties``."""
        return self.py_type

    @abstractmethod
    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
        """Yield the nodes below (and including) this one that need a toplevel."""
        ...

    @abstractmethod
    def toplevel(self) -> str:
        """Return the module-level source generated for this node."""
        ...

    def py_property(self) -> str:
        """Return the source of the getter/setter pair for this node."""
        return f"""
@property
def {self.py_name}(self):
    # type: () -> {self.py_property_return_type}
    return self._property_impl('{self.py_name}')

@{self.py_name}.setter
def {self.py_name}(self, new_val):
    # type: ({self.py_param_type}) -> None
    self._{self.py_name} = new_val
        """.strip()

    @property
    def py_param(self) -> str:
        """Return the ``__init__`` parameter line, including its type comment."""
        if not self.has_default:
            return f'{self.py_name}, # type: {self.py_param_type}'
        return f'{self.py_name}=_omit, # type: {self.py_param_type}'

    @property
    def has_default(self) -> bool:
        """Tell whether the ``__init__`` parameter gets a default value."""
        return not self.required

    @property
    def py_param_type(self) -> str:
        """Return the parameter type; ``Optional`` if nullable or not required."""
        return f'Optional[{self.py_type}]' if (self.nullable or not self.required) else self.py_type

    @property
    def py_property_return_type(self) -> str:
        """Return the getter's return type; ``Optional`` only if nullable."""
        return f'Optional[{self.py_type}]' if (self.nullable) else self.py_type
106 | ||
@dataclass
class CRDAttribute(CRDBase):
    """Leaf node of the schema tree: a value described only by its type name."""
    type: str
    default_value: str = '_omit'

    @property
    def py_param(self):
        """Render the ``__init__`` parameter line, using ``default_value`` if any."""
        if self.has_default:
            return f'{self.py_name}={self.default_value}, # type: {self.py_param_type}'
        return f'{self.py_name}, # type: {self.py_param_type}'

    @property
    def has_default(self):
        """True when the attribute is optional or carries an explicit default."""
        if not self.required:
            return True
        return self.default_value != '_omit'

    @property
    def py_type(self):
        """Translate the schema type name; unknown names raise ``KeyError``."""
        python_types = {
            'integer': 'int',
            'boolean': 'bool',
            'string': 'str',
            'object': 'Any',
            'number': 'float',
            'x-kubernetes-int-or-string': 'Union[int, str]',
        }
        return python_types[self.type]

    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
        """Attributes contribute no toplevel definitions: yield nothing."""
        return iter(())

    def toplevel(self):
        """Attributes have no module-level source."""
        return ''

    def __hash__(self):
        # Hash on the dataclass repr so attributes can be used as dict keys.
        return hash(repr(self))
141 | ||
9f95a23c TL |
142 | |
@dataclass
class CRDList(CRDBase):
    """Array node of the schema tree, rendered as a ``CrdObjectList`` subclass.

    ``py_name`` is inherited from ``CRDBase`` so list properties get the same
    identifier sanitising as every other node.
    """
    # Node describing one element of the list (built from the schema's ``items``).
    items: 'CRDClass'

    @property
    def py_type(self):
        """Name of the generated list class: capitalised name plus ``List``."""
        return self.name[0].upper() + self.name[1:] + 'List'

    @property
    def py_type_escaped(self):
        """Quoted class name, as the class may be defined later in the module."""
        return f"'{self.py_type}'"

    @property
    def _element_list_type(self):
        """Type expression accepted for the list value itself."""
        return f'Union[List[{self.items.py_type}], CrdObjectList]'

    @property
    def py_param_type(self):
        """Parameter type; ``Optional`` if nullable or not required."""
        inner = self._element_list_type
        return f'Optional[{inner}]' if (self.nullable or not self.required) else inner

    @property
    def py_property_return_type(self):
        """Getter return type; ``Optional`` only if nullable."""
        inner = self._element_list_type
        return f'Optional[{inner}]' if (self.nullable) else inner

    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
        """Yield everything the element node needs, then this list itself."""
        yield from self.items.flatten()
        yield self

    def toplevel(self):
        """Return the source of the generated list class."""
        py_type = self.items.py_type
        if py_type == 'Any':
            # Untyped elements: the generated class gets no item type.
            py_type = 'None'

        return f"""
class {self.py_type}(CrdObjectList):
{indent('_items_type = ' + py_type)}
""".strip()

    def __eq__(self, other):
        # Two lists are equal when they generate identical source.
        if type(self) != type(other):
            return False
        return self.toplevel() == other.toplevel()

    def __hash__(self):
        return hash(self.toplevel())
190 | ||
9f95a23c TL |
191 | |
@dataclass
class CRDClass(CRDBase):
    """Object node of the schema tree, rendered as one generated class."""
    attrs: List[Union[CRDAttribute, 'CRDClass']]
    base_class: str = 'CrdObject'

    def toplevel(self) -> str:
        """Return the full source of the generated class."""
        accessors = '\n\n'.join(attr.py_property() for attr in self.attrs)
        sections = (
            f'class {self.py_type}({self.base_class}):',
            indent(self.py_properties()),
            '',
            indent(self.py_init()),
            '',
            indent(accessors),
        )
        return '\n'.join(sections).strip()

    @property
    def sub_classes(self) -> List["CRDClass"]:
        """Direct attributes that are themselves object nodes."""
        return [attr for attr in self.attrs if isinstance(attr, CRDClass)]

    @property
    def py_type(self):
        """Name of the generated class: the schema name with its first letter upper-cased."""
        head, tail = self.name[0], self.name[1:]
        return head.upper() + tail

    @property
    def py_type_escaped(self):
        """Quoted class name, as the class may be defined later in the module."""
        return f"'{self.py_type}'"

    def py_properties(self):
        """Return the source of the ``_properties`` table of the class."""
        rows = []
        for attr in self.attrs:
            columns = (
                f"'{attr.name}'",
                f"'{attr.py_name}'",
                attr.py_type_escaped.replace('Any', 'object'),
                str(attr.required),
                str(attr.nullable),
            )
            rows.append('(' + ', '.join(columns) + ')')
        table = indent(',\n'.join(rows))
        return '_properties = [\n' + table + '\n]'

    def flatten(self) -> Iterator[Union['CRDClass', 'CRDList', 'CRDAttribute']]:
        """Yield what every attribute needs first, then this class."""
        for child in self.attrs:
            yield from child.flatten()
        yield self

    def py_init(self):
        """Return the source of ``__init__``; parameters without default come first."""
        ordered = sorted(self.attrs, key=lambda a: a.has_default)
        signature_lines = '\n'.join(attr.py_param for attr in ordered)
        forwarded = '\n'.join(f'{attr.py_name}={attr.py_name},' for attr in ordered)
        return f"""
def __init__(self,
{indent(signature_lines, indent=4+9)}
             ):
    super({self.py_type}, self).__init__(
{indent(forwarded, indent=8)}
    )
        """.strip()

    def __eq__(self, other):
        # Two classes are equal when they generate identical source.
        if type(self) != type(other):
            return False
        return self.toplevel() == other.toplevel()

    def __hash__(self):
        return hash(self.toplevel())
256 | ||
257 | ||
9f95a23c TL |
def indent(s, indent=4):
    """Return *s* with every line prefixed by *indent* spaces.

    Lines are split with ``str.splitlines``, so a trailing newline is dropped
    and empty lines still receive the prefix.
    """
    padding = ' ' * indent
    return '\n'.join(padding + line for line in s.splitlines())
260 | ||
261 | ||
def handle_property(elem_name, elem: dict, required: bool):
    """Convert one OpenAPI v3 schema element into a CRD model node.

    The shape of *elem* is tested in this order:

    * has ``properties``  -> ``CRDClass`` (children converted recursively,
      each marked required if listed in the element's ``required``)
    * has ``items``       -> ``CRDList`` whose element node is named
      ``<elem_name>Item``
    * has ``type``        -> ``CRDAttribute`` of that type
    * is an empty dict    -> ``CRDAttribute`` of type ``object``
    * has ``x-kubernetes-int-or-string`` -> ``CRDAttribute`` of that type

    :param elem_name: schema name of the element
    :param elem: the schema dict of the element
    :param required: whether the parent lists this element as required
    :raises ValueError: if *elem* matches none of the shapes above
    """
    nullable = elem.get('nullable', False)
    if 'properties' in elem:
        required_elems = elem.get('required', [])
        sub_props = [
            handle_property(k, v, k in required_elems)
            for k, v in elem['properties'].items()
        ]
        return CRDClass(elem_name, nullable, required, sub_props)
    if 'items' in elem:
        item = handle_property(elem_name + 'Item', elem['items'], False)
        return CRDList(elem_name, nullable, required, item)
    if 'type' in elem:
        return CRDAttribute(elem_name, nullable, required, elem['type'])
    if elem == {}:
        return CRDAttribute(elem_name, nullable, required, 'object')
    if 'x-kubernetes-int-or-string' in elem:
        return CRDAttribute(elem_name, nullable, required, 'x-kubernetes-int-or-string')

    # An explicit raise (not ``assert``) so the check survives ``python -O``.
    raise ValueError(f'unsupported schema element: {str((elem_name, elem))}')
280 | ||
20effc67 TL |
def spec_get_schema(c_dict: Dict) -> Dict:
    """Return the ``openAPIV3Schema`` dict of a CRD document.

    ``spec.validation.openAPIV3Schema`` is used when present; otherwise the
    schema of the single entry in ``spec.versions`` is returned.

    :raises RuntimeError: if ``spec.versions`` does not hold exactly one entry
    """
    try:
        schema = c_dict['spec']['validation']['openAPIV3Schema']
    except (KeyError, TypeError):
        pass
    else:
        return schema

    versions = c_dict['spec']['versions']
    if len(versions) != 1:
        version_names = [v["name"] for v in versions]
        raise RuntimeError(f'todo: {version_names}')
    return versions[0]["schema"]['openAPIV3Schema']
9f95a23c | 290 | |
def handle_crd(c_dict: dict) -> Optional[CRDClass]:
    """Build the toplevel ``CRDClass`` for one YAML document.

    :param c_dict: one parsed YAML document
    :return: the class model with ``CrdClass`` as base class, or ``None`` if
        the document lacks ``spec.names.kind`` or a schema (lookup raised
        ``KeyError``/``TypeError``)
    """
    try:
        name = c_dict['spec']['names']['kind']
        schema = spec_get_schema(c_dict)
    except (KeyError, TypeError):
        return None

    # Force ``spec`` to be the only required toplevel property.  A shallow
    # copy is used so the caller's parsed document is left untouched.
    schema = dict(schema, required=['spec'])
    c = handle_property(name, schema, True)

    # Add the standard toplevel fields the schema did not declare itself:
    # (name, required, type)
    standard_attrs = (
        ('apiVersion', True, 'string'),
        ('metadata', True, 'object'),
        ('status', False, 'object'),
    )
    declared = {a.name for a in c.attrs}
    for attr_name, attr_required, attr_type in standard_attrs:
        if attr_name not in declared:
            c.attrs.append(CRDAttribute(attr_name, False, attr_required, attr_type))
    return CRDClass(c.name, False, True, c.attrs, base_class='CrdClass')
9f95a23c TL |
306 | |
307 | ||
def local(yaml_filename):
    """Yield every YAML document contained in the file *yaml_filename*."""
    with open(yaml_filename) as stream:
        text = stream.read()
        for document in yaml.safe_load_all(text):
            try:
                yield document
            except AttributeError:
                # An AttributeError thrown into the generator at this yield
                # is ignored and iteration continues with the next document.
                pass
316 | ||
317 | ||
20effc67 TL |
def remove_duplicates_by(items: List[T], key: Callable[[T], K], unify: Callable[[T, T], T]) -> List[T]:
    """Merge all items sharing the same ``key(item)`` into one.

    The first item seen for a key fixes its position in the result; every
    later item with that key is folded in with ``unify(current, item)``.
    """
    merged: OrderedDict[K, T] = OrderedDict()
    for item in items:
        ident = key(item)
        merged[ident] = unify(merged[ident], item) if ident in merged else item
    return list(merged.values())
327 | ||
328 | ||
def remove_duplicates(items: List[T]) -> List[T]:
    """Return *items* without repeats, keeping the first occurrence of each."""
    first_seen = OrderedDict.fromkeys(items)
    return list(first_seen)
331 | ||
332 | ||
def unify_classes(left: CRDBase, right: CRDBase) -> CRDBase:
    """Merge two same-named nodes into a single, non-required node.

    * two classes: attributes of both are merged by name (recursively) and
      every resulting attribute is marked as not required
    * two attributes: must agree on type and default value
    * nodes of different kinds: collapse to an untyped ``object`` attribute
    * anything else (e.g. two lists): both must compare equal
    """
    assert left.name == right.name
    either_nullable = left.nullable or right.nullable

    if isinstance(left, CRDClass) and isinstance(right, CRDClass):
        assert left.py_type == right.py_type
        assert left.base_class == right.base_class
        merged_attrs = remove_duplicates_by(right.attrs + left.attrs, lambda a: a.name, unify_classes)  # type: ignore
        for attr in merged_attrs:
            # we have to set all required properties to False
            attr.required = False
        return CRDClass(
            name=left.name,
            nullable=either_nullable,
            required=False,
            attrs=merged_attrs,
            base_class=left.base_class
        )

    if isinstance(left, CRDAttribute) and isinstance(right, CRDAttribute):
        assert left.type == right.type
        assert left.name == right.name
        assert left.default_value == right.default_value
        return CRDAttribute(
            name=left.name,
            nullable=either_nullable,
            required=False,
            type=left.type,
            default_value=left.default_value
        )

    if type(left) != type(right):
        # handwaving
        return CRDAttribute(
            name=left.name,
            nullable=either_nullable,
            required=False,
            type='object'
        )

    assert left == right, (repr(left), repr(right))
    return left
373 | ||
374 | ||
def get_toplevels(crd: CRDBase) -> List[str]:
    """Return the source of every toplevel definition needed for *crd*.

    Flattened nodes are de-duplicated first, then nodes that share a
    ``py_type`` are merged with ``unify_classes``.
    """
    flattened = list(crd.flatten())
    unique_nodes: List[CRDBase] = remove_duplicates(flattened)
    merged_nodes = remove_duplicates_by(unique_nodes, lambda node: node.py_type, unify_classes)
    return [node.toplevel() for node in merged_nodes]
9f95a23c TL |
379 | |
380 | ||
def main(yaml_filename, outfolder):
    """Generate one Python module per CRD found in *yaml_filename*.

    For every document that ``handle_crd`` accepts, *outfolder* is created if
    needed (including missing parent directories), an empty ``__init__.py``
    is written, and ``<kind in lower case>.py`` is filled with the header and
    the generated classes.

    :param yaml_filename: path of the YAML file holding the CRDs
    :param outfolder: directory receiving the generated package
    """
    for crd in local(yaml_filename):
        valid_crd = handle_crd(crd)
        if valid_crd is None:
            continue

        os.makedirs(outfolder, exist_ok=True)
        with open(os.path.join(outfolder, '__init__.py'), 'w'):
            pass  # only (re)create the empty package marker

        # Render before opening the target so that a failure does not leave
        # a module behind that contains nothing but the header.
        classes = get_toplevels(valid_crd)
        module_path = os.path.join(outfolder, f'{valid_crd.name.lower()}.py')
        with open(module_path, 'w') as f:
            f.write(header)
            f.write('\n\n\n'.join(classes))
            f.write('\n')
396 | ||
397 | ||
if __name__ == '__main__':
    # Command-line entry point: arguments are parsed from the module docstring.
    from docopt import docopt

    args = docopt(__doc__)
    if args["<crds.yaml>"] == '-':
        # "-" selects standard input.
        yaml_filename = '/dev/stdin'
    else:
        yaml_filename = args["<crds.yaml>"]
    main(yaml_filename, args["<output-folder>"])