1 from collections
import namedtuple
2 from rest
.app
.util
import memoize
4 from rest
.logger
import logger
8 CRUSH_RULE_TYPE_REPLICATED
= 1
9 CRUSH_RULE_TYPE_ERASURE
= 3
12 ServiceId
= namedtuple('ServiceId', ['fsid', 'service_type', 'service_id'])
20 CRUSH_RULE
= 'crush_rule'
25 class SyncObject(object):
27 An object from a Ceph cluster that we are maintaining
28 a copy of on the Calamari server.
30 We wrap these JSON-serializable objects in a python object to:
32 - Decorate them with things like id-to-entry dicts
33 - Have a generic way of seeing the version of an object
36 def __init__(self
, version
, data
):
37 self
.version
= version
43 Slight bastardization of cmp. Takes two versions,
44 and returns a cmp-like value, except that if versions
45 are not sortable we only return 0 or 1.
47 # Version is something unique per version (like a hash)
48 return 1 if a
!= b
else 0
51 class VersionedSyncObject(SyncObject
):
54 # Version is something numeric like an epoch
58 class OsdMap(VersionedSyncObject
):
61 def __init__(self
, version
, data
):
62 super(OsdMap
, self
).__init
__(version
, data
)
64 self
.osds_by_id
= dict([(o
['osd'], o
) for o
in data
['osds']])
65 self
.pools_by_id
= dict([(p
['pool'], p
) for p
in data
['pools']])
66 self
.osd_tree_node_by_id
= dict([(o
['id'], o
) for o
in data
['tree']['nodes'] if o
['id'] >= 0])
69 flags
= data
.get('flags', '').replace('pauserd,pausewr', 'pause')
70 tokenized_flags
= flags
.split(',')
72 self
.flags
= dict([(x
, x
in tokenized_flags
) for x
in OSD_FLAGS
])
76 self
.osd_tree_node_by_id
= {}
77 self
.flags
= dict([(x
, False) for x
in OSD_FLAGS
])
80 def osd_metadata(self
):
81 return self
.data
['osd_metadata']
84 def get_tree_nodes_by_id(self
):
85 return dict((n
["id"], n
) for n
in self
.data
['tree']["nodes"])
87 def _get_crush_rule_osds(self
, rule
):
88 nodes_by_id
= self
.get_tree_nodes_by_id()
90 def _gather_leaf_ids(node
):
92 return set([node
['id']])
95 for child_id
in node
['children']:
99 result |
= _gather_leaf_ids(nodes_by_id
[child_id
])
103 def _gather_descendent_ids(node
, typ
):
105 for child_id
in node
['children']:
106 child_node
= nodes_by_id
[child_id
]
107 if child_node
['type'] == typ
:
108 result
.add(child_node
['id'])
109 elif 'children' in child_node
:
110 result |
= _gather_descendent_ids(child_node
, typ
)
114 def _gather_osds(root
, steps
):
116 return set([root
['id']])
120 if step
['op'] == 'choose_firstn':
121 # Choose all descendents of the current node of type 'type'
122 d
= _gather_descendent_ids(root
, step
['type'])
123 for desc_node
in [nodes_by_id
[i
] for i
in d
]:
124 osds |
= _gather_osds(desc_node
, steps
[1:])
125 elif step
['op'] == 'chooseleaf_firstn':
126 # Choose all descendents of the current node of type 'type',
127 # and select all leaves beneath those
128 for desc_node
in [nodes_by_id
[i
] for i
in _gather_descendent_ids(root
, step
['type'])]:
129 # Short circuit another iteration to find the emit
130 # and assume anything we've done a chooseleaf on
131 # is going to be part of the selected set of osds
132 osds |
= _gather_leaf_ids(desc_node
)
133 elif step
['op'] == 'emit':
140 for i
, step
in enumerate(rule
['steps']):
141 if step
['op'] == 'take':
142 osds |
= _gather_osds(nodes_by_id
[step
['item']], rule
['steps'][i
+ 1:])
147 def osds_by_rule_id(self
):
149 for rule
in self
.data
['crush']['rules']:
150 result
[rule
['rule_id']] = list(self
._get
_crush
_rule
_osds
(rule
))
156 def osds_by_pool(self
):
158 Get the OSDS which may be used in this pool
160 :return dict of pool ID to OSD IDs in the pool
164 for pool_id
, pool
in self
.pools_by_id
.items():
166 for rule
in [r
for r
in self
.data
['crush']['rules'] if r
['ruleset'] == pool
['crush_ruleset']]:
167 if rule
['min_size'] <= pool
['size'] <= rule
['max_size']:
168 osds
= self
.osds_by_rule_id
[rule
['rule_id']]
171 # Fallthrough, the pool size didn't fall within any of the rules in its ruleset, Calamari
172 # doesn't understand. Just report all OSDs instead of failing horribly.
173 log
.error("Cannot determine OSDS for pool %s" % pool_id
)
174 osds
= self
.osds_by_id
.keys()
176 result
[pool_id
] = osds
184 A dict of OSD ID to list of pool IDs
186 osds
= dict([(osd_id
, []) for osd_id
in self
.osds_by_id
.keys()])
187 for pool_id
in self
.pools_by_id
.keys():
188 for in_pool_id
in self
.osds_by_pool
[pool_id
]:
189 osds
[in_pool_id
].append(pool_id
)
194 class FsMap(VersionedSyncObject
):
198 class MonMap(VersionedSyncObject
):
202 class MonStatus(VersionedSyncObject
):
205 def __init__(self
, version
, data
):
206 super(MonStatus
, self
).__init
__(version
, data
)
208 self
.mons_by_rank
= dict([(m
['rank'], m
) for m
in data
['monmap']['mons']])
210 self
.mons_by_rank
= {}
213 class PgSummary(SyncObject
):
215 A summary of the state of PGs in the cluster, reported by pool and by OSD.
220 class Health(SyncObject
):
224 class Config(SyncObject
):
228 class NotFound(Exception):
229 def __init__(self
, object_type
, object_id
):
230 self
.object_type
= object_type
231 self
.object_id
= object_id
234 return "Object of type %s with id %s not found" % (self
.object_type
, self
.object_id
)
237 # The objects that ClusterMonitor keeps copies of from the mon
238 SYNC_OBJECT_TYPES
= [FsMap
, OsdMap
, MonMap
, MonStatus
, PgSummary
, Health
, Config
]
239 SYNC_OBJECT_STR_TYPE
= dict((t
.str, t
) for t
in SYNC_OBJECT_TYPES
)
241 USER_REQUEST_COMPLETE
= 'complete'
242 USER_REQUEST_SUBMITTED
= 'submitted'
244 # List of allowable things to send as ceph commands to OSDs
245 OSD_IMPLEMENTED_COMMANDS
= ('scrub', 'deep_scrub', 'repair')
246 OSD_FLAGS
= ('pause', 'noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norecover', 'noscrub', 'nodeep-scrub')
248 # Severity codes for Calamari events
256 CRITICAL
: "CRITICAL",
259 RECOVERY
: "RECOVERY",
263 STR_TO_SEVERITY
= dict([(b
, a
) for (a
, b
) in SEVERITIES
.items()])
266 def severity_str(severity
):
267 return SEVERITIES
[severity
]
270 def severity_from_str(severitry_str
):
271 return STR_TO_SEVERITY
[severitry_str
]