]> git.proxmox.com Git - ceph.git/blob - ceph/src/python-common/ceph/deployment/inventory.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / python-common / ceph / deployment / inventory.py
1 try:
2 from typing import List, Optional, Dict, Any
3 except ImportError:
4 pass # for type checking
5
6 import json
7
8
class Devices(object):
    """
    A container for Device instances with reporting
    """

    def __init__(self, devices):
        # type: (List[Device]) -> None
        self.devices = devices  # type: List[Device]

    def __eq__(self, other: Any) -> bool:
        """
        Compare two containers by the JSON reports of their devices.

        Operands that are not ``Devices`` instances yield ``NotImplemented``
        so that Python falls back to its default handling (``==`` evaluates
        to False) instead of failing with AttributeError on a missing
        ``to_json``.
        """
        if not isinstance(other, Devices):
            return NotImplemented
        return self.to_json() == other.to_json()

    def to_json(self):
        # type: () -> List[dict]
        """Return the ``to_json()`` report of every contained device, in order."""
        return [d.to_json() for d in self.devices]

    @classmethod
    def from_json(cls, input):
        # type: (List[Dict[str, Any]]) -> Devices
        """Build a container by passing each entry of ``input`` to ``Device.from_json``."""
        return cls([Device.from_json(i) for i in input])

    def copy(self):
        # type: () -> Devices
        """
        Return a new container holding a new list of the same device objects.

        The copy is shallow: the list is duplicated, the devices are not.
        """
        return Devices(devices=list(self.devices))
33
34
class Device(object):
    """
    A single device entry together with its reporting fields.

    ``report_fields`` lists the attribute names that ``to_json`` emits.
    All of them except ``human_readable_type`` are also accepted by
    ``from_json`` and passed to the constructor; ``human_readable_type``
    is a read-only property derived from ``sys_api``.
    """

    report_fields = [
        'rejected_reasons',
        'available',
        'path',
        'sys_api',
        'lvs',
        'human_readable_type',
        'device_id',
        'lsm_data',
    ]

    def __init__(self,
                 path,  # type: str
                 sys_api=None,  # type: Optional[Dict[str, Any]]
                 available=None,  # type: Optional[bool]
                 rejected_reasons=None,  # type: Optional[List[str]]
                 lvs=None,  # type: Optional[List[str]]
                 device_id=None,  # type: Optional[str]
                 lsm_data=None,  # type: Optional[Dict[str, Dict[str, str]]]
                 ):
        # ``sys_api``, ``rejected_reasons`` and ``lsm_data`` are normalised to
        # empty containers when omitted; ``available``, ``lvs`` and
        # ``device_id`` are stored as given (possibly None).
        self.path = path
        self.sys_api = sys_api if sys_api is not None else {}  # type: Dict[str, Any]
        self.available = available
        self.rejected_reasons = rejected_reasons if rejected_reasons is not None else []
        self.lvs = lvs
        self.device_id = device_id
        self.lsm_data = lsm_data if lsm_data is not None else {}  # type: Dict[str, Dict[str, str]]

    def to_json(self):
        # type: () -> dict
        """Return a dict mapping every name in ``report_fields`` to its current value."""
        return {
            k: getattr(self, k) for k in self.report_fields
        }

    @classmethod
    def from_json(cls, input):
        # type: (Dict[str, Any]) -> Device
        """
        Build a Device from a dict as produced by ``to_json``.

        Keys missing from ``input`` are passed to the constructor as None.
        A non-empty ``rejected_reasons`` list is stored sorted.

        :raises ValueError: if ``input`` is not a dict
        """
        if not isinstance(input, dict):
            # The preview is only used for the error message. Values that
            # json cannot serialize must not mask the ValueError with a
            # TypeError, so fall back to repr() for them.
            try:
                preview = json.dumps(input)
            except (TypeError, ValueError):
                preview = repr(input)
            raise ValueError('Device: Expected dict. Got `{}...`'.format(preview[:10]))
        ret = cls(
            **{
                key: input.get(key, None)
                for key in Device.report_fields
                # derived property, not a constructor argument
                if key != 'human_readable_type'
            }
        )
        if ret.rejected_reasons:
            ret.rejected_reasons = sorted(ret.rejected_reasons)
        return ret

    @property
    def human_readable_type(self):
        # type: () -> str
        """
        Return "hdd", "ssd" or "unknown" based on ``sys_api['rotational']``.

        "unknown" is returned when ``sys_api`` is None or has no
        ``rotational`` key. Otherwise the value is compared against the
        string "1": a match yields "hdd", anything else yields "ssd".
        """
        if self.sys_api is None or 'rotational' not in self.sys_api:
            return "unknown"
        return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd'
89 if self.sys_api is None or 'rotational' not in self.sys_api:
90 return "unknown"
91 return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd'