]> git.proxmox.com Git - ceph.git/blob - ceph/src/python-common/ceph/deployment/inventory.py
import 15.2.4
[ceph.git] / ceph / src / python-common / ceph / deployment / inventory.py
1 try:
2 from typing import List, Optional, Dict, Any
3 except ImportError:
4 pass # for type checking
5
6 import json
7
8
class Devices(object):
    """
    A container for Device instances with reporting

    Holds a list of ``Device`` objects and converts the whole collection
    to and from its JSON-compatible form (a list of dicts).
    """

    def __init__(self, devices):
        # type: (List[Device]) -> None
        self.devices = devices  # type: List[Device]

    def __eq__(self, other):
        # type: (Any) -> bool
        """
        Compare two containers by their serialized representation.

        Anything that is not a ``Devices`` instance yields
        ``NotImplemented`` so that e.g. ``devices == None`` evaluates to
        ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, Devices):
            return NotImplemented
        return self.to_json() == other.to_json()

    def to_json(self):
        # type: () -> List[dict]
        """Return the ``to_json()`` result of every contained device."""
        return [d.to_json() for d in self.devices]

    @classmethod
    def from_json(cls, input):
        # type: (List[Dict[str, Any]]) -> Devices
        """Build a container from an iterable of ``Device.from_json`` inputs."""
        return cls([Device.from_json(i) for i in input])

    def copy(self):
        # type: () -> Devices
        """
        Return a new container with its own list.

        The copy is shallow: the new list references the same device
        objects as this one.
        """
        return Devices(devices=list(self.devices))
33
34
class Device(object):
    """
    One device entry of an inventory.

    Stores the device path together with optional details (``sys_api``
    dict, availability flag, rejection reasons, LVs and device id) and
    converts itself to and from a plain dict.
    """

    # Keys emitted by to_json(). 'human_readable_type' is a read-only
    # property derived from sys_api, so from_json() never passes it to
    # the constructor.
    report_fields = [
        'rejected_reasons',
        'available',
        'path',
        'sys_api',
        'lvs',
        'human_readable_type',
        'device_id'
    ]

    def __init__(self,
                 path,  # type: str
                 sys_api=None,  # type: Optional[Dict[str, Any]]
                 available=None,  # type: Optional[bool]
                 rejected_reasons=None,  # type: Optional[List[str]]
                 lvs=None,  # type: Optional[List[str]]
                 device_id=None,  # type: Optional[str]
                 ):
        """
        Store the given attributes.

        ``sys_api`` and ``rejected_reasons`` are replaced by a fresh empty
        dict / list when ``None`` is passed; every other argument is stored
        exactly as given.
        """
        self.path = path
        self.sys_api = sys_api if sys_api is not None else {}  # type: Dict[str, Any]
        self.available = available
        self.rejected_reasons = rejected_reasons if rejected_reasons is not None else []
        self.lvs = lvs
        self.device_id = device_id

    def to_json(self):
        # type: () -> dict
        """
        Return a dict with one entry per name in ``report_fields``.

        Values are the attribute objects themselves (not copies), plus the
        computed ``human_readable_type``.
        """
        return {
            k: getattr(self, k) for k in self.report_fields
        }

    @classmethod
    def from_json(cls, input):
        # type: (Dict[str, Any]) -> Device
        """
        Build a Device from a dict such as the one produced by ``to_json()``.

        Keys missing from ``input`` are passed to the constructor as
        ``None``; a ``human_readable_type`` key is ignored.

        :raises ValueError: if ``input`` is not a dict.
        """
        if not isinstance(input, dict):
            # json.dumps() itself fails on values that are not JSON
            # serializable; fall back to repr() so that callers always get
            # the ValueError instead of an unrelated error.
            try:
                preview = json.dumps(input)
            except (TypeError, ValueError):
                preview = repr(input)
            raise ValueError('Device: Expected dict. Got `{}...`'.format(preview[:10]))
        ret = cls(
            **{
                key: input.get(key, None)
                for key in Device.report_fields
                if key != 'human_readable_type'
            }
        )
        return ret

    @property
    def human_readable_type(self):
        # type: () -> str
        """
        Return ``'hdd'``, ``'ssd'`` or ``"unknown"``.

        ``"unknown"`` is returned when ``sys_api`` is ``None`` or has no
        ``'rotational'`` key. Otherwise the result is ``'hdd'`` if that
        value equals the string ``"1"`` and ``'ssd'`` for any other value.
        """
        if self.sys_api is None or 'rotational' not in self.sys_api:
            return "unknown"
        return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd'