1 | """ |
2 | Copyright (C) 2015 Red Hat, Inc. | |
3 | ||
4 | LGPL2. See file COPYING. | |
5 | """ | |
6 | ||
7 | from contextlib import contextmanager | |
8 | import errno | |
9 | import fcntl | |
10 | import json | |
11 | import logging | |
12 | import os | |
13 | import re | |
14 | import struct | |
15 | import sys | |
16 | import threading | |
17 | import time | |
18 | import uuid | |
19 | ||
20 | from ceph_argparse import json_command | |
21 | ||
22 | import cephfs | |
23 | import rados | |
24 | ||
25 | ||
26 | class RadosError(Exception): | |
27 | """ | |
28 | Something went wrong talking to Ceph with librados | |
29 | """ | |
30 | pass | |
31 | ||
32 | ||
33 | RADOS_TIMEOUT = 10 | |
34 | SNAP_DIR = ".snap" | |
35 | ||
36 | log = logging.getLogger(__name__) | |
37 | ||
38 | ||
39 | # Reserved volume group name which we use in paths for volumes | |
40 | # that are not assigned to a group (i.e. created with group=None) | |
41 | NO_GROUP_NAME = "_nogroup" | |
42 | ||
43 | # Filename extensions for meta files. | |
44 | META_FILE_EXT = ".meta" | |
45 | ||
46 | class VolumePath(object): | |
47 | """ | |
48 | Identify a volume's path as group->volume | |
49 | The Volume ID is a unique identifier, but this is a much more | |
50 | helpful thing to pass around. | |
51 | """ | |
52 | def __init__(self, group_id, volume_id): | |
53 | self.group_id = group_id | |
54 | self.volume_id = volume_id | |
55 | assert self.group_id != NO_GROUP_NAME | |
56 | assert self.volume_id != "" and self.volume_id is not None | |
57 | ||
58 | def __str__(self): | |
59 | return "{0}/{1}".format(self.group_id, self.volume_id) | |
60 | ||
61 | ||
62 | class ClusterTimeout(Exception): | |
63 | """ | |
64 | Exception indicating that we timed out trying to talk to the Ceph cluster, | |
65 | either to the mons, or to any individual daemon that the mons indicate ought | |
66 | to be up but isn't responding to us. | |
67 | """ | |
68 | pass | |
69 | ||
70 | ||
71 | class ClusterError(Exception): | |
72 | """ | |
73 | Exception indicating that the cluster returned an error to a command that | |
74 | we thought should be successful based on our last knowledge of the cluster | |
75 | state. | |
76 | """ | |
77 | def __init__(self, action, result_code, result_str): | |
78 | self._action = action | |
79 | self._result_code = result_code | |
80 | self._result_str = result_str | |
81 | ||
82 | def __str__(self): | |
83 | return "Error {0} (\"{1}\") while {2}".format( | |
84 | self._result_code, self._result_str, self._action) | |
85 | ||
86 | ||
87 | class RankEvicter(threading.Thread): | |
88 | """ | |
89 | Thread for evicting client(s) from a particular MDS daemon instance. | |
90 | ||
91 | This is more complex than simply sending a command, because we have to | |
92 | handle cases where MDS daemons might not be fully up yet, and/or might | |
93 | be transiently unresponsive to commands. | |
94 | """ | |
95 | class GidGone(Exception): | |
96 | pass | |
97 | ||
98 | POLL_PERIOD = 5 | |
99 | ||
100 | def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout): | |
101 | """ | |
102 | :param client_spec: list of strings, used as filter arguments to "session evict" | |
103 | pass ["id=123"] to evict a single client with session id 123. | |
104 | """ | |
105 | self.rank = rank | |
106 | self.gid = gid | |
107 | self._mds_map = mds_map | |
108 | self._client_spec = client_spec | |
109 | self._volume_client = volume_client | |
110 | self._ready_timeout = ready_timeout | |
111 | self._ready_waited = 0 | |
112 | ||
113 | self.success = False | |
114 | self.exception = None | |
115 | ||
116 | super(RankEvicter, self).__init__() | |
117 | ||
118 | def _ready_to_evict(self): | |
119 | if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid: | |
120 | log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format( | |
121 | self._client_spec, self.rank, self.gid | |
122 | )) | |
123 | raise RankEvicter.GidGone() | |
124 | ||
125 | info = self._mds_map['info']["gid_{0}".format(self.gid)] | |
126 | log.debug("_ready_to_evict: state={0}".format(info['state'])) | |
127 | return info['state'] in ["up:active", "up:clientreplay"] | |
128 | ||
129 | def _wait_for_ready(self): | |
130 | """ | |
131 | Wait for that MDS rank to reach an active or clientreplay state, and | |
132 | not be laggy. | |
133 | """ | |
134 | while not self._ready_to_evict(): | |
135 | if self._ready_waited > self._ready_timeout: | |
136 | raise ClusterTimeout() | |
137 | ||
138 | time.sleep(self.POLL_PERIOD) | |
139 | self._ready_waited += self.POLL_PERIOD | |
140 | ||
141 | self._mds_map = self._volume_client._rados_command("mds dump", {}) | |
142 | ||
143 | def _evict(self): | |
144 | """ | |
145 | Run the eviction procedure. Return True on success; raise ClusterTimeout or ClusterError on failure. | |
146 | """ | |
147 | ||
148 | # Wait until the MDS is believed by the mon to be available for commands | |
149 | try: | |
150 | self._wait_for_ready() | |
151 | except self.GidGone: | |
152 | return True | |
153 | ||
154 | # Then send it an evict | |
155 | ret = errno.ETIMEDOUT | |
156 | while ret == errno.ETIMEDOUT: | |
157 | log.debug("mds_command: {0}, {1}".format( | |
158 | "%s" % self.gid, ["session", "evict"] + self._client_spec | |
159 | )) | |
160 | ret, outb, outs = self._volume_client.fs.mds_command( | |
161 | "%s" % self.gid, | |
162 | [json.dumps({ | |
163 | "prefix": "session evict", | |
164 | "filters": self._client_spec | |
165 | })], "") | |
166 | log.debug("mds_command: complete {0} {1}".format(ret, outs)) | |
167 | ||
168 | # If we get a clean response, great, it's gone from that rank. | |
169 | if ret == 0: | |
170 | return True | |
171 | elif ret == errno.ETIMEDOUT: | |
172 | # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error) | |
173 | self._mds_map = self._volume_client._rados_command("mds dump", {}) | |
174 | try: | |
175 | self._wait_for_ready() | |
176 | except self.GidGone: | |
177 | return True | |
178 | else: | |
179 | raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs) | |
180 | ||
181 | def run(self): | |
182 | try: | |
183 | self._evict() | |
184 | except Exception as e: | |
185 | self.success = False | |
186 | self.exception = e | |
187 | else: | |
188 | self.success = True | |
189 | ||
190 | ||
191 | class EvictionError(Exception): | |
192 | pass | |
193 | ||
194 | ||
195 | class CephFSVolumeClientError(Exception): | |
196 | """ | |
197 | Something went wrong talking to Ceph using CephFSVolumeClient. | |
198 | """ | |
199 | pass | |
200 | ||
201 | ||
202 | CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ | |
203 | ||
204 | CephFSVolumeClient Version History: | |
205 | ||
206 | * 1 - Initial version | |
207 | ||
208 | """ | |
209 | ||
210 | ||
211 | class CephFSVolumeClient(object): | |
212 | """ | |
213 | Combine libcephfs and librados interfaces to implement a | |
214 | 'Volume' concept implemented as a cephfs directory and | |
215 | client capabilities which restrict mount access to this | |
216 | directory. | |
217 | ||
218 | Additionally, volumes may be in a 'Group'. Conveniently, | |
219 | volumes are a lot like manila shares, and groups are a lot | |
220 | like manila consistency groups. | |
221 | ||
222 | Refer to volumes with VolumePath, which specifies the | |
223 | volume and group IDs (both strings). The group ID may | |
224 | be None. | |
225 | ||
226 | In general, functions in this class are allowed to raise rados.Error | |
227 | or cephfs.Error exceptions in unexpected situations. | |
228 | """ | |
229 | ||
230 | # Current version | |
231 | version = 1 | |
232 | # Earliest compatible version | |
233 | compat_version = 1 | |
234 | ||
235 | # Where shall we create our volumes? | |
236 | POOL_PREFIX = "fsvolume_" | |
237 | DEFAULT_VOL_PREFIX = "/volumes" | |
238 | DEFAULT_NS_PREFIX = "fsvolumens_" | |
239 | ||
240 | def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None): | |
241 | self.fs = None | |
242 | self.rados = None | |
243 | self.connected = False | |
244 | self.conf_path = conf_path | |
245 | self.cluster_name = cluster_name | |
246 | self.auth_id = auth_id | |
247 | self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX | |
248 | self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX | |
249 | # For flock'ing in cephfs, I want a unique ID to distinguish me | |
250 | # from any other manila-share services that are loading this module. | |
251 | # We could use pid, but that's unnecessarily weak: generate a | |
252 | # UUID | |
253 | self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0] | |
254 | ||
255 | # TODO: version the on-disk structures | |
256 | ||
257 | def recover(self): | |
258 | # Scan all auth keys to see if they're dirty: if they are, they have | |
259 | # state that might not have propagated to Ceph or to the related | |
260 | # volumes yet. | |
261 | ||
262 | # Important: we *always* acquire locks in the order auth->volume | |
263 | # That means a volume can never be dirty without the auth key | |
264 | # we're updating it with being dirty at the same time. | |
265 | ||
266 | # First list the auth IDs that have potentially dirty on-disk metadata | |
267 | log.debug("Recovering from partial auth updates (if any)...") | |
268 | ||
269 | try: | |
270 | dir_handle = self.fs.opendir(self.volume_prefix) | |
271 | except cephfs.ObjectNotFound: | |
272 | log.debug("Nothing to recover. No auth meta files.") | |
273 | return | |
274 | ||
275 | d = self.fs.readdir(dir_handle) | |
276 | auth_ids = [] | |
277 | ||
278 | if not d: | |
279 | log.debug("Nothing to recover. No auth meta files.") | |
280 | ||
281 | while d: | |
282 | # Identify auth IDs from auth meta filenames. The auth meta files | |
283 | # are named as "$<auth_id><meta filename extension>" | |
284 | regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT)) | |
285 | match = re.search(regex, d.d_name) | |
286 | if match: | |
287 | auth_ids.append(match.group(1)) | |
288 | ||
289 | d = self.fs.readdir(dir_handle) | |
290 | ||
291 | self.fs.closedir(dir_handle) | |
292 | ||
293 | # Key points based on ordering: | |
294 | # * Anything added in VMeta is already added in AMeta | |
295 | # * Anything added in Ceph is already added in VMeta | |
296 | # * Anything removed in VMeta is already removed in Ceph | |
297 | # * Anything removed in AMeta is already removed in VMeta | |
298 | ||
299 | # Deauthorization: because I only update metadata AFTER the | |
300 | # update of the next level down, I have the same ordering: things | |
301 | # which exist in the AMeta should also exist in the VMeta, and | |
302 | # should also exist in Ceph. The same recovery procedure that gets | |
303 | # me consistent after crashes during authorization will also work | |
304 | # during deauthorization. | |
305 | ||
306 | # Now for each auth ID, check for dirty flag and apply updates | |
307 | # if dirty flag is found | |
308 | for auth_id in auth_ids: | |
309 | with self._auth_lock(auth_id): | |
310 | auth_meta = self._auth_metadata_get(auth_id) | |
311 | if not auth_meta or not auth_meta['volumes']: | |
312 | # Clean up auth meta file | |
313 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
314 | continue | |
315 | if not auth_meta['dirty']: | |
316 | continue | |
317 | self._recover_auth_meta(auth_id, auth_meta) | |
318 | ||
319 | log.debug("Recovered from partial auth updates (if any).") | |
320 | ||
321 | def _recover_auth_meta(self, auth_id, auth_meta): | |
322 | """ | |
323 | Call me after locking the auth meta file. | |
324 | """ | |
325 | remove_volumes = [] | |
326 | ||
327 | for volume, volume_data in auth_meta['volumes'].items(): | |
328 | if not volume_data['dirty']: | |
329 | continue | |
330 | ||
331 | (group_id, volume_id) = volume.split('/') | |
332 | group_id = group_id if group_id != 'None' else None | |
333 | volume_path = VolumePath(group_id, volume_id) | |
334 | access_level = volume_data['access_level'] | |
335 | ||
336 | with self._volume_lock(volume_path): | |
337 | vol_meta = self._volume_metadata_get(volume_path) | |
338 | ||
339 | # No VMeta update indicates that there was no auth update | |
340 | # in Ceph either. So it's safe to remove corresponding | |
341 | # partial update in AMeta. | |
342 | if not vol_meta or auth_id not in vol_meta['auths']: | |
343 | remove_volumes.append(volume) | |
344 | continue | |
345 | ||
346 | want_auth = { | |
347 | 'access_level': access_level, | |
348 | 'dirty': False, | |
349 | } | |
350 | # VMeta update looks clean. Ceph auth update must have been | |
351 | # clean. | |
352 | if vol_meta['auths'][auth_id] == want_auth: | |
353 | continue | |
354 | ||
355 | readonly = access_level == 'r' | |
356 | self._authorize_volume(volume_path, auth_id, readonly) | |
357 | ||
358 | # Recovered from partial auth updates for the auth ID's access | |
359 | # to a volume. | |
360 | auth_meta['volumes'][volume]['dirty'] = False | |
361 | self._auth_metadata_set(auth_id, auth_meta) | |
362 | ||
363 | for volume in remove_volumes: | |
364 | del auth_meta['volumes'][volume] | |
365 | ||
366 | if not auth_meta['volumes']: | |
367 | # Clean up auth meta file | |
368 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
369 | return | |
370 | ||
371 | # Recovered from all partial auth updates for the auth ID. | |
372 | auth_meta['dirty'] = False | |
373 | self._auth_metadata_set(auth_id, auth_meta) | |
374 | ||
375 | ||
376 | def evict(self, auth_id, timeout=30, volume_path=None): | |
377 | """ | |
378 | Evict all clients based on the authorization ID and optionally based on | |
379 | the volume path mounted. Assumes that the authorization key has been | |
380 | revoked prior to calling this function. | |
381 | ||
382 | This operation can throw an exception if the mon cluster is unresponsive, or | |
383 | any individual MDS daemon is unresponsive for longer than the timeout passed in. | |
384 | """ | |
385 | ||
386 | client_spec = ["auth_name={0}".format(auth_id), ] | |
387 | if volume_path: | |
388 | client_spec.append("client_metadata.root={0}". | |
389 | format(self._get_path(volume_path))) | |
390 | ||
391 | log.info("evict clients with {0}".format(', '.join(client_spec))) | |
392 | ||
393 | mds_map = self._rados_command("mds dump", {}) | |
394 | ||
395 | up = {} | |
396 | for name, gid in mds_map['up'].items(): | |
397 | # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0" | |
398 | assert name.startswith("mds_") | |
399 | up[int(name[4:])] = gid | |
400 | ||
401 | # For all MDS ranks held by a daemon | |
402 | # Do the parallelism in python instead of using "tell mds.*", because | |
403 | # the latter doesn't give us per-mds output | |
404 | threads = [] | |
405 | for rank, gid in up.items(): | |
406 | thread = RankEvicter(self, client_spec, rank, gid, mds_map, | |
407 | timeout) | |
408 | thread.start() | |
409 | threads.append(thread) | |
410 | ||
411 | for t in threads: | |
412 | t.join() | |
413 | ||
414 | log.info("evict: joined all") | |
415 | ||
416 | for t in threads: | |
417 | if not t.success: | |
418 | msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". | |
419 | format(', '.join(client_spec), t.rank, t.gid, t.exception) | |
420 | ) | |
421 | log.error(msg) | |
422 | raise EvictionError(msg) | |
423 | ||
424 | def _get_path(self, volume_path): | |
425 | """ | |
426 | Determine the path within CephFS where this volume will live | |
427 | :return: absolute path (string) | |
428 | """ | |
429 | return os.path.join( | |
430 | self.volume_prefix, | |
431 | volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME, | |
432 | volume_path.volume_id) | |
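    # For illustration (not part of the original module): with the default
    # "/volumes" prefix, VolumePath(None, "vol1") maps to
    # "/volumes/_nogroup/vol1", and VolumePath("grp1", "vol1") maps to
    # "/volumes/grp1/vol1".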
433 | ||
434 | def _get_group_path(self, group_id): | |
435 | if group_id is None: | |
436 | raise ValueError("group_id may not be None") | |
437 | ||
438 | return os.path.join( | |
439 | self.volume_prefix, | |
440 | group_id | |
441 | ) | |
442 | ||
443 | def connect(self, premount_evict = None): | |
444 | """ | |
445 | ||
446 | :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers | |
447 | may want to use this to specify their own auth ID if they expect | |
448 | to be a unique instance and don't want to wait for caps to time | |
449 | out after failure of another instance of themselves. | |
450 | """ | |
451 | log.debug("Connecting to RADOS with config {0}...".format(self.conf_path)) | |
452 | self.rados = rados.Rados( | |
453 | name="client.{0}".format(self.auth_id), | |
454 | clustername=self.cluster_name, | |
455 | conffile=self.conf_path, | |
456 | conf={} | |
457 | ) | |
458 | self.rados.connect() | |
459 | ||
460 | log.debug("Connection to RADOS complete") | |
461 | ||
462 | log.debug("Connecting to cephfs...") | |
463 | self.fs = cephfs.LibCephFS(rados_inst=self.rados) | |
464 | log.debug("CephFS initializing...") | |
465 | self.fs.init() | |
466 | if premount_evict is not None: | |
467 | log.debug("Premount eviction of {0} starting".format(premount_evict)) | |
468 | self.evict(premount_evict) | |
469 | log.debug("Premount eviction of {0} completes".format(premount_evict)) | |
470 | log.debug("CephFS mounting...") | |
471 | self.fs.mount() | |
472 | log.debug("Connection to cephfs complete") | |
473 | ||
474 | # Recover from partial auth updates due to a previous | |
475 | # crash. | |
476 | self.recover() | |
477 | ||
478 | def get_mon_addrs(self): | |
479 | log.info("get_mon_addrs") | |
480 | result = [] | |
481 | mon_map = self._rados_command("mon dump") | |
482 | for mon in mon_map['mons']: | |
483 | ip_port = mon['addr'].split("/")[0] | |
484 | result.append(ip_port) | |
485 | ||
486 | return result | |
487 | ||
488 | def disconnect(self): | |
489 | log.info("disconnect") | |
490 | if self.fs: | |
491 | log.debug("Disconnecting cephfs...") | |
492 | self.fs.shutdown() | |
493 | self.fs = None | |
494 | log.debug("Disconnecting cephfs complete") | |
495 | ||
496 | if self.rados: | |
497 | log.debug("Disconnecting rados...") | |
498 | self.rados.shutdown() | |
499 | self.rados = None | |
500 | log.debug("Disconnecting rados complete") | |
501 | ||
502 | def __del__(self): | |
503 | self.disconnect() | |
504 | ||
505 | def _get_pool_id(self, osd_map, pool_name): | |
506 | # Maybe borrow the OSDMap wrapper class from calamari if more helpers | |
507 | # like this are needed. | |
508 | for pool in osd_map['pools']: | |
509 | if pool['pool_name'] == pool_name: | |
510 | return pool['pool'] | |
511 | ||
512 | return None | |
513 | ||
514 | def _create_volume_pool(self, pool_name): | |
515 | """ | |
516 | Idempotently create a pool for use as a CephFS data pool, with the given name | |
517 | ||
518 | :return: The ID of the created pool | |
519 | """ | |
520 | osd_map = self._rados_command('osd dump', {}) | |
521 | ||
522 | existing_id = self._get_pool_id(osd_map, pool_name) | |
523 | if existing_id is not None: | |
524 | log.info("Pool {0} already exists".format(pool_name)) | |
525 | return existing_id | |
526 | ||
527 | osd_count = len(osd_map['osds']) | |
528 | ||
529 | # We can't query the actual cluster config remotely, but since this is | |
530 | # just a heuristic we'll assume that the ceph.conf we have locally reflects | |
531 | # that in use in the rest of the cluster. | |
532 | pg_warn_max_per_osd = int(self.rados.conf_get('mon_pg_warn_max_per_osd')) | |
533 | ||
534 | other_pgs = 0 | |
535 | for pool in osd_map['pools']: | |
536 | if not pool['pool_name'].startswith(self.POOL_PREFIX): | |
537 | other_pgs += pool['pg_num'] | |
538 | ||
539 | # A basic heuristic for picking pg_num: work out the max number of | |
540 | # PGs we can have without tripping a warning, then subtract the number | |
541 | # of PGs already created by non-manila pools, then divide by ten. That'll | |
542 | # give you a reasonable result on a system where you have "a few" manila | |
543 | # shares. | |
544 | pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10 | |
545 | # TODO Alternatively, respect an override set by the user. | |
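        # Worked example of the heuristic above, with illustrative numbers
        # (not queried from any real cluster): mon_pg_warn_max_per_osd = 300,
        # 10 OSDs and 1000 PGs in non-manila pools gives
        # pg_num = ((300 * 10) - 1000) / 10 = 200.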
546 | ||
547 | self._rados_command( | |
548 | 'osd pool create', | |
549 | { | |
550 | 'pool': pool_name, | |
551 | 'pg_num': pg_num | |
552 | } | |
553 | ) | |
554 | ||
555 | osd_map = self._rados_command('osd dump', {}) | |
556 | pool_id = self._get_pool_id(osd_map, pool_name) | |
557 | ||
558 | if pool_id is None: | |
559 | # If the pool isn't there, that's either a ceph bug, or it's some outside influence | |
560 | # removing it right after we created it. | |
561 | log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format( | |
562 | pool_name, json.dumps(osd_map, indent=2) | |
563 | )) | |
564 | raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name)) | |
565 | else: | |
566 | return pool_id | |
567 | ||
568 | def create_group(self, group_id): | |
569 | # Prevent craftily-named volume groups from colliding with the meta | |
570 | # files. | |
571 | if group_id.endswith(META_FILE_EXT): | |
572 | raise ValueError("group ID cannot end with '{0}'.".format( | |
573 | META_FILE_EXT)) | |
574 | path = self._get_group_path(group_id) | |
575 | self._mkdir_p(path) | |
576 | ||
577 | def destroy_group(self, group_id): | |
578 | path = self._get_group_path(group_id) | |
579 | try: | |
580 | self.fs.stat(self.volume_prefix) | |
581 | except cephfs.ObjectNotFound: | |
582 | pass | |
583 | else: | |
584 | self.fs.rmdir(path) | |
585 | ||
586 | def _mkdir_p(self, path): | |
587 | try: | |
588 | self.fs.stat(path) | |
589 | except cephfs.ObjectNotFound: | |
590 | pass | |
591 | else: | |
592 | return | |
593 | ||
594 | parts = path.split(os.path.sep) | |
595 | ||
596 | for i in range(1, len(parts) + 1): | |
597 | subpath = os.path.join(*parts[0:i]) | |
598 | try: | |
599 | self.fs.stat(subpath) | |
600 | except cephfs.ObjectNotFound: | |
601 | self.fs.mkdir(subpath, 0o755) | |
602 | ||
603 | def create_volume(self, volume_path, size=None, data_isolated=False): | |
604 | """ | |
605 | Set up metadata, pools and auth for a volume. | |
606 | ||
607 | This function is idempotent. It is safe to call this again | |
608 | for an already-created volume, even if it is in use. | |
609 | ||
610 | :param volume_path: VolumePath instance | |
611 | :param size: In bytes, or None for no size limit | |
612 | :param data_isolated: If true, create a separate OSD pool for this volume | |
613 | :return: | |
614 | """ | |
615 | path = self._get_path(volume_path) | |
616 | log.info("create_volume: {0}".format(path)) | |
617 | ||
618 | self._mkdir_p(path) | |
619 | ||
620 | if size is not None: | |
621 | self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0) | |
622 | ||
623 | # data_isolated means create a separate pool for this volume | |
624 | if data_isolated: | |
625 | pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) | |
626 | log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name)) | |
627 | pool_id = self._create_volume_pool(pool_name) | |
628 | mds_map = self._rados_command("mds dump", {}) | |
629 | if pool_id not in mds_map['data_pools']: | |
630 | self._rados_command("mds add_data_pool", { | |
631 | 'pool': pool_name | |
632 | }) | |
633 | self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0) | |
634 | ||
635 | # enforce security isolation, use a separate namespace for this volume | |
636 | namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id) | |
637 | log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace)) | |
638 | self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0) | |
639 | ||
640 | # Create a volume meta file, if it does not already exist, to store | |
641 | # data about auth ids having access to the volume | |
642 | fd = self.fs.open(self._volume_metadata_path(volume_path), | |
643 | os.O_CREAT, 0o755) | |
644 | self.fs.close(fd) | |
645 | ||
646 | return { | |
647 | 'mount_path': path | |
648 | } | |
649 | ||
650 | def delete_volume(self, volume_path, data_isolated=False): | |
651 | """ | |
652 | Make a volume inaccessible to guests. This function is | |
653 | idempotent. This is the fast part of tearing down a volume: you must | |
654 | also later call purge_volume, which is the slow part. | |
655 | ||
656 | :param volume_path: Same identifier used in create_volume | |
657 | :return: | |
658 | """ | |
659 | ||
660 | path = self._get_path(volume_path) | |
661 | log.info("delete_volume: {0}".format(path)) | |
662 | ||
663 | # Create the trash folder if it doesn't already exist | |
664 | trash = os.path.join(self.volume_prefix, "_deleting") | |
665 | self._mkdir_p(trash) | |
666 | ||
667 | # We'll move it to here | |
668 | trashed_volume = os.path.join(trash, volume_path.volume_id) | |
669 | ||
670 | # Move the volume's data to the trash folder | |
671 | try: | |
672 | self.fs.stat(path) | |
673 | except cephfs.ObjectNotFound: | |
674 | log.warning("Trying to delete volume '{0}' but it's already gone".format( | |
675 | path)) | |
676 | else: | |
677 | self.fs.rename(path, trashed_volume) | |
678 | ||
679 | # Delete the volume meta file, if it's not already deleted | |
680 | vol_meta_path = self._volume_metadata_path(volume_path) | |
681 | try: | |
682 | self.fs.unlink(vol_meta_path) | |
683 | except cephfs.ObjectNotFound: | |
684 | pass | |
685 | ||
686 | def purge_volume(self, volume_path, data_isolated=False): | |
687 | """ | |
688 | Finish clearing up a volume that was previously passed to delete_volume. This | |
689 | function is idempotent. | |
690 | """ | |
691 | ||
692 | trash = os.path.join(self.volume_prefix, "_deleting") | |
693 | trashed_volume = os.path.join(trash, volume_path.volume_id) | |
694 | ||
695 | try: | |
696 | self.fs.stat(trashed_volume) | |
697 | except cephfs.ObjectNotFound: | |
698 | log.warning("Trying to purge volume '{0}' but it's already been purged".format( | |
699 | trashed_volume)) | |
700 | return | |
701 | ||
702 | def rmtree(root_path): | |
703 | log.debug("rmtree {0}".format(root_path)) | |
704 | dir_handle = self.fs.opendir(root_path) | |
705 | d = self.fs.readdir(dir_handle) | |
706 | while d: | |
707 | if d.d_name not in [".", ".."]: | |
708 | # Do not use os.path.join because it is sensitive | |
709 | # to string encoding; we just pass through dnames | |
710 | # as byte arrays | |
711 | d_full = "{0}/{1}".format(root_path, d.d_name) | |
712 | if d.is_dir(): | |
713 | rmtree(d_full) | |
714 | else: | |
715 | self.fs.unlink(d_full) | |
716 | ||
717 | d = self.fs.readdir(dir_handle) | |
718 | self.fs.closedir(dir_handle) | |
719 | ||
720 | self.fs.rmdir(root_path) | |
721 | ||
722 | rmtree(trashed_volume) | |
723 | ||
724 | if data_isolated: | |
725 | pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) | |
726 | osd_map = self._rados_command("osd dump", {}) | |
727 | pool_id = self._get_pool_id(osd_map, pool_name) | |
728 | mds_map = self._rados_command("mds dump", {}) | |
729 | if pool_id in mds_map['data_pools']: | |
730 | self._rados_command("mds remove_data_pool", { | |
731 | 'pool': pool_name | |
732 | }) | |
733 | self._rados_command("osd pool delete", | |
734 | { | |
735 | "pool": pool_name, | |
736 | "pool2": pool_name, | |
737 | "sure": "--yes-i-really-really-mean-it" | |
738 | }) | |
739 | ||
740 | def _get_ancestor_xattr(self, path, attr): | |
741 | """ | |
742 | Helper for reading layout information: if this xattr is missing | |
743 | on the requested path, keep checking parents until we find it. | |
744 | """ | |
745 | try: | |
746 | result = self.fs.getxattr(path, attr) | |
747 | if result == "": | |
748 | # Annoying! cephfs gives us empty instead of an error when attr not found | |
749 | raise cephfs.NoData() | |
750 | else: | |
751 | return result | |
752 | except cephfs.NoData: | |
753 | if path == "/": | |
754 | raise | |
755 | else: | |
756 | return self._get_ancestor_xattr(os.path.split(path)[0], attr) | |
757 | ||
758 | def _check_compat_version(self, compat_version): | |
759 | if self.version < compat_version: | |
760 | msg = ("The current version of CephFSVolumeClient, version {0} " | |
761 | "does not support the required feature. Need version {1} " | |
762 | "or greater".format(self.version, compat_version) | |
763 | ) | |
764 | log.error(msg) | |
765 | raise CephFSVolumeClientError(msg) | |
766 | ||
767 | def _metadata_get(self, path): | |
768 | """ | |
769 | Return a deserialized JSON object, or None | |
770 | """ | |
771 | fd = self.fs.open(path, "r") | |
772 | # TODO iterate instead of assuming file < 4MB | |
773 | read_bytes = self.fs.read(fd, 0, 4096 * 1024) | |
774 | self.fs.close(fd) | |
775 | if read_bytes: | |
776 | return json.loads(read_bytes) | |
777 | else: | |
778 | return None | |
779 | ||
780 | def _metadata_set(self, path, data): | |
781 | serialized = json.dumps(data) | |
782 | fd = self.fs.open(path, "w") | |
783 | try: | |
784 | self.fs.write(fd, serialized, 0) | |
785 | self.fs.fsync(fd, 0) | |
786 | finally: | |
787 | self.fs.close(fd) | |
788 | ||
789 | def _lock(self, path): | |
790 | @contextmanager | |
791 | def fn(): | |
792 | while True: | |
793 | fd = self.fs.open(path, os.O_CREAT, 0o755) | |
794 | self.fs.flock(fd, fcntl.LOCK_EX, self._id) | |
795 | ||
796 | # The locked file will be cleaned up sometime. It could be | |
797 | # unlinked, e.g. by another manila share instance, before the | |
798 | # lock was applied to it. Perform checks to detect this case | |
799 | # and retry if it happens. | |
800 | try: | |
801 | statbuf = self.fs.stat(path) | |
802 | except cephfs.ObjectNotFound: | |
803 | self.fs.close(fd) | |
804 | continue | |
805 | ||
806 | fstatbuf = self.fs.fstat(fd) | |
807 | if statbuf.st_ino == fstatbuf.st_ino: | |
808 | break | |
809 | ||
810 | try: | |
811 | yield | |
812 | finally: | |
813 | self.fs.flock(fd, fcntl.LOCK_UN, self._id) | |
814 | self.fs.close(fd) | |
815 | ||
816 | return fn() | |
817 | ||
818 | def _auth_metadata_path(self, auth_id): | |
819 | return os.path.join(self.volume_prefix, "${0}{1}".format( | |
820 | auth_id, META_FILE_EXT)) | |
821 | ||
822 | def _auth_lock(self, auth_id): | |
823 | return self._lock(self._auth_metadata_path(auth_id)) | |
824 | ||
825 | def _auth_metadata_get(self, auth_id): | |
826 | """ | |
827 | Call me with the metadata locked! | |
828 | ||
829 | Check whether an auth metadata structure can be decoded by the current | |
830 | version of CephFSVolumeClient. | |
831 | ||
832 | Return auth metadata that the current version of CephFSVolumeClient | |
833 | can decode. | |
834 | """ | |
835 | auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) | |
836 | ||
837 | if auth_metadata: | |
838 | self._check_compat_version(auth_metadata['compat_version']) | |
839 | ||
840 | return auth_metadata | |
841 | ||
842 | def _auth_metadata_set(self, auth_id, data): | |
843 | """ | |
844 | Call me with the metadata locked! | |
845 | ||
846 | Fsync the auth metadata. | |
847 | ||
848 | Add two version attributes to the auth metadata, | |
849 | 'compat_version', the minimum CephFSVolumeClient version that can | |
850 | decode the metadata, and 'version', the CephFSVolumeClient version | |
851 | that encoded the metadata. | |
852 | """ | |
853 | data['compat_version'] = 1 | |
854 | data['version'] = 1 | |
855 | return self._metadata_set(self._auth_metadata_path(auth_id), data) | |
856 | ||
857 | def _volume_metadata_path(self, volume_path): | |
858 | return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( | |
859 | volume_path.group_id if volume_path.group_id else "", | |
860 | volume_path.volume_id, | |
861 | META_FILE_EXT | |
862 | )) | |
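    # For illustration (IDs are hypothetical): with the default volume
    # prefix, auth metadata for auth ID "guest1" is kept in
    # "/volumes/$guest1.meta", and volume metadata for
    # VolumePath("grp1", "vol1") in "/volumes/_grp1:vol1.meta".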
863 | ||
864 | def _volume_lock(self, volume_path): | |
865 | """ | |
866 | Return a ContextManager which locks the authorization metadata for | |
867 | a particular volume, and persists a flag to the metadata indicating | |
868 | that it is currently locked, so that we can detect dirty situations | |
869 | during recovery. | |
870 | ||
871 | This lock isn't just to make access to the metadata safe: it's also | |
872 | designed to be used over the two-step process of checking the | |
873 | metadata and then responding to an authorization request, to | |
874 | ensure that at the point we respond the metadata hasn't changed | |
875 | in the background. It's key to how we avoid security holes | |
876 | resulting from races during that two-step process. | |
877 | """ | |
878 | return self._lock(self._volume_metadata_path(volume_path)) | |
879 | ||
880 | def _volume_metadata_get(self, volume_path): | |
881 | """ | |
882 | Call me with the metadata locked! | |
883 | ||
884 | Check whether a volume metadata structure can be decoded by the current | |
885 | version of CephFSVolumeClient. | |
886 | ||
887 | Return a volume_metadata structure that the current version of | |
888 | CephFSVolumeClient can decode. | |
889 | """ | |
890 | volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path)) | |
891 | ||
892 | if volume_metadata: | |
893 | self._check_compat_version(volume_metadata['compat_version']) | |
894 | ||
895 | return volume_metadata | |
896 | ||
897 | def _volume_metadata_set(self, volume_path, data): | |
898 | """ | |
899 | Call me with the metadata locked! | |
900 | ||
901 | Add two version attributes to the volume metadata, | |
902 | 'compat_version', the minimum CephFSVolumeClient version that can | |
903 | decode the metadata and 'version', the CephFSVolumeClient version | |
904 | that encoded the metadata. | |
905 | """ | |
906 | data['compat_version'] = 1 | |
907 | data['version'] = 1 | |
908 | return self._metadata_set(self._volume_metadata_path(volume_path), data) | |
909 | ||
910 | def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None): | |
911 | """ | |
912 | Get-or-create a Ceph auth identity for `auth_id` and grant them access | |
913 | to the volume at `volume_path`. | |
914 | :param volume_path: | |
915 | :param auth_id: | |
916 | :param readonly: | |
917 | :param tenant_id: Optionally provide a stringizable object to | |
918 | restrict any created cephx IDs to other callers | |
919 | passing the same tenant ID. | |
920 | :return: | |
921 | """ | |
922 | ||
923 | with self._auth_lock(auth_id): | |
924 | # Existing meta, or None, to be updated | |
925 | auth_meta = self._auth_metadata_get(auth_id) | |
926 | ||
927 | # volume data to be inserted | |
928 | volume_path_str = str(volume_path) | |
929 | volume = { | |
930 | volume_path_str : { | |
931 | # The access level at which the auth_id is authorized to | |
932 | # access the volume. | |
933 | 'access_level': 'r' if readonly else 'rw', | |
934 | 'dirty': True, | |
935 | } | |
936 | } | |
937 | if auth_meta is None: | |
938 | sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( | |
939 | auth_id, tenant_id | |
940 | )) | |
941 | log.debug("Authorize: no existing meta") | |
942 | auth_meta = { | |
943 | 'dirty': True, | |
944 | 'tenant_id': tenant_id.__str__() if tenant_id else None, | |
945 | 'volumes': volume | |
946 | } | |
947 | ||
948 | # Note: this is *not* guaranteeing that the key doesn't already | |
949 | # exist in Ceph: we are allowing VolumeClient tenants to | |
950 | # 'claim' existing Ceph keys. In order to prevent VolumeClient | |
951 | # tenants from reading e.g. client.admin keys, you need to | |
952 | # have configured your VolumeClient user (e.g. Manila) to | |
953 | # have mon auth caps that prevent it from accessing those keys | |
954 | # (e.g. limit it to only access keys with a manila.* prefix) | |
955 | else: | |
956 | # Disallow tenants to share auth IDs | |
957 | if auth_meta['tenant_id'].__str__() != tenant_id.__str__(): | |
958 | msg = "auth ID: {0} is already in use".format(auth_id) | |
959 | log.error(msg) | |
960 | raise CephFSVolumeClientError(msg) | |
961 | ||
962 | if auth_meta['dirty']: | |
963 | self._recover_auth_meta(auth_id, auth_meta) | |
964 | ||
965 | log.debug("Authorize: existing tenant {tenant}".format( | |
966 | tenant=auth_meta['tenant_id'] | |
967 | )) | |
968 | auth_meta['dirty'] = True | |
969 | auth_meta['volumes'].update(volume) | |
970 | ||
971 | self._auth_metadata_set(auth_id, auth_meta) | |
972 | ||
973 | with self._volume_lock(volume_path): | |
974 | key = self._authorize_volume(volume_path, auth_id, readonly) | |
975 | ||
976 | auth_meta['dirty'] = False | |
977 | auth_meta['volumes'][volume_path_str]['dirty'] = False | |
978 | self._auth_metadata_set(auth_id, auth_meta) | |
979 | ||
980 | if tenant_id: | |
981 | return { | |
982 | 'auth_key': key | |
983 | } | |
984 | else: | |
985 | # Caller wasn't multi-tenant aware: be safe and don't give | |
986 | # them a key | |
987 | return { | |
988 | 'auth_key': None | |
989 | } | |
990 | ||
991 | def _authorize_volume(self, volume_path, auth_id, readonly): | |
992 | vol_meta = self._volume_metadata_get(volume_path) | |
993 | ||
994 | access_level = 'r' if readonly else 'rw' | |
995 | auth = { | |
996 | auth_id: { | |
997 | 'access_level': access_level, | |
998 | 'dirty': True, | |
999 | } | |
1000 | } | |
1001 | ||
1002 | if vol_meta is None: | |
1003 | vol_meta = { | |
1004 | 'auths': auth | |
1005 | } | |
1006 | else: | |
1007 | vol_meta['auths'].update(auth) | |
1008 | self._volume_metadata_set(volume_path, vol_meta) | |
1009 | ||
1010 | key = self._authorize_ceph(volume_path, auth_id, readonly) | |
1011 | ||
1012 | vol_meta['auths'][auth_id]['dirty'] = False | |
1013 | self._volume_metadata_set(volume_path, vol_meta) | |
1014 | ||
1015 | return key | |
1016 | ||
1017 | def _authorize_ceph(self, volume_path, auth_id, readonly): | |
1018 | path = self._get_path(volume_path) | |
1019 | log.debug("Authorizing Ceph id '{0}' for path '{1}'".format( | |
1020 | auth_id, path | |
1021 | )) | |
1022 | ||
1023 | # First I need to work out what the data pool is for this share: | |
1024 | # read the layout | |
1025 | pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") | |
1026 | namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace") | |
1027 | ||
1028 | # Now construct auth capabilities that give the guest just enough | |
1029 | # permissions to access the share | |
1030 | client_entity = "client.{0}".format(auth_id) | |
1031 | want_access_level = 'r' if readonly else 'rw' | |
1032 | want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path) | |
1033 | want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( | |
1034 | want_access_level, pool_name, namespace) | |
1035 | ||
1036 | try: | |
1037 | existing = self._rados_command( | |
1038 | 'auth get', | |
1039 | { | |
1040 | 'entity': client_entity | |
1041 | } | |
1042 | ) | |
1043 | # FIXME: rados raising Error instead of ObjectNotFound in auth get failure | |
1044 | except rados.Error: | |
1045 | caps = self._rados_command( | |
1046 | 'auth get-or-create', | |
1047 | { | |
1048 | 'entity': client_entity, | |
1049 | 'caps': [ | |
1050 | 'mds', want_mds_cap, | |
1051 | 'osd', want_osd_cap, | |
1052 | 'mon', 'allow r'] | |
1053 | }) | |
1054 | else: | |
1055 | # entity exists, update it | |
1056 | cap = existing[0] | |
1057 | ||
1058 | # Construct auth caps that if present might conflict with the desired | |
1059 | # auth caps. | |
1060 | unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw' | |
1061 | unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) | |
1062 | unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( | |
1063 | unwanted_access_level, pool_name, namespace) | |
1064 | ||
1065 | def cap_update(orig, want, unwanted): | |
1066 | # Updates the existing auth caps such that there is a single | |
1067 | # occurrence of wanted auth caps and no occurrence of | |
1068 | # conflicting auth caps. | |
1069 | ||
1070 | cap_tokens = set(orig.split(",")) | |
1071 | ||
1072 | cap_tokens.discard(unwanted) | |
1073 | cap_tokens.add(want) | |
1074 | ||
1075 | return ",".join(cap_tokens) | |
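            # For example (illustrative cap strings): if the existing MDS caps
            # were "allow r path=/volumes/grp1/vol1" and rw access is now
            # wanted, cap_update() discards the conflicting read-only cap and
            # returns "allow rw path=/volumes/grp1/vol1".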
1076 | ||
1077 | osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap) | |
1078 | mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap) | |
1079 | ||
1080 | caps = self._rados_command( | |
1081 | 'auth caps', | |
1082 | { | |
1083 | 'entity': client_entity, | |
1084 | 'caps': [ | |
1085 | 'mds', mds_cap_str, | |
1086 | 'osd', osd_cap_str, | |
1087 | 'mon', cap['caps'].get('mon', 'allow r')] | |
1088 | }) | |
1089 | caps = self._rados_command( | |
1090 | 'auth get', | |
1091 | { | |
1092 | 'entity': client_entity | |
1093 | } | |
1094 | ) | |
1095 | ||
1096 | # Result expected like this: | |
1097 | # [ | |
1098 | # { | |
1099 | # "entity": "client.foobar", | |
1100 | # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", | |
1101 | # "caps": { | |
1102 | # "mds": "allow *", | |
1103 | # "mon": "allow *" | |
1104 | # } | |
1105 | # } | |
1106 | # ] | |
1107 | assert len(caps) == 1 | |
1108 | assert caps[0]['entity'] == client_entity | |
1109 | return caps[0]['key'] | |
1110 | ||
1111 | def deauthorize(self, volume_path, auth_id): | |
1112 | with self._auth_lock(auth_id): | |
1113 | # Existing meta, or None, to be updated | |
1114 | auth_meta = self._auth_metadata_get(auth_id) | |
1115 | ||
1116 | volume_path_str = str(volume_path) | |
1117 | if (auth_meta is None) or (not auth_meta['volumes']): | |
1118 | log.warn("deauthorized called for already-removed auth " | |
1119 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1120 | auth_id=auth_id, volume=volume_path.volume_id | |
1121 | )) | |
1122 | # Clean up the auth meta file of an auth ID | |
1123 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
1124 | return | |
1125 | ||
1126 | if volume_path_str not in auth_meta['volumes']: | |
1127 | log.warn("deauthorized called for already-removed auth " | |
1128 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1129 | auth_id=auth_id, volume=volume_path.volume_id | |
1130 | )) | |
1131 | return | |
1132 | ||
1133 | if auth_meta['dirty']: | |
1134 | self._recover_auth_meta(auth_id, auth_meta) | |
1135 | ||
1136 | auth_meta['dirty'] = True | |
1137 | auth_meta['volumes'][volume_path_str]['dirty'] = True | |
1138 | self._auth_metadata_set(auth_id, auth_meta) | |
1139 | ||
1140 | self._deauthorize_volume(volume_path, auth_id) | |
1141 | ||
1142 | # Filter out the volume we're deauthorizing | |
1143 | del auth_meta['volumes'][volume_path_str] | |
1144 | ||
1145 | # Clean up auth meta file | |
1146 | if not auth_meta['volumes']: | |
1147 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
1148 | return | |
1149 | ||
1150 | auth_meta['dirty'] = False | |
1151 | self._auth_metadata_set(auth_id, auth_meta) | |
1152 | ||
1153 | def _deauthorize_volume(self, volume_path, auth_id): | |
1154 | with self._volume_lock(volume_path): | |
1155 | vol_meta = self._volume_metadata_get(volume_path) | |
1156 | ||
1157 | if (vol_meta is None) or (auth_id not in vol_meta['auths']): | |
1158 | log.warn("deauthorized called for already-removed auth " | |
1159 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1160 | auth_id=auth_id, volume=volume_path.volume_id | |
1161 | )) | |
1162 | return | |
1163 | ||
1164 | vol_meta['auths'][auth_id]['dirty'] = True | |
1165 | self._volume_metadata_set(volume_path, vol_meta) | |
1166 | ||
1167 | self._deauthorize(volume_path, auth_id) | |
1168 | ||
1169 | # Remove the auth_id from the metadata *after* removing it | |
1170 | # from ceph, so that if we crashed here, we would actually | |
1171 | # recreate the auth ID during recovery (i.e. end up with | |
1172 | # a consistent state). | |
1173 | ||
1174 | # Filter out the auth we're removing | |
1175 | del vol_meta['auths'][auth_id] | |
1176 | self._volume_metadata_set(volume_path, vol_meta) | |
1177 | ||
1178 | def _deauthorize(self, volume_path, auth_id): | |
1179 | """ | |
1180 | The volume must still exist. | |
1181 | """ | |
1182 | client_entity = "client.{0}".format(auth_id) | |
1183 | path = self._get_path(volume_path) | |
1184 | pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") | |
1185 | namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace") | |
1186 | ||
1187 | # The auth_id might have read-only or read-write mount access for the | |
1188 | # volume path. | |
1189 | access_levels = ('r', 'rw') | |
1190 | want_mds_caps = {'allow {0} path={1}'.format(access_level, path) | |
1191 | for access_level in access_levels} | |
1192 | want_osd_caps = {'allow {0} pool={1} namespace={2}'.format( | |
1193 | access_level, pool_name, namespace) | |
1194 | for access_level in access_levels} | |
1195 | ||
1196 | try: | |
1197 | existing = self._rados_command( | |
1198 | 'auth get', | |
1199 | { | |
1200 | 'entity': client_entity | |
1201 | } | |
1202 | ) | |
1203 | ||
1204 | def cap_remove(orig, want): | |
1205 | cap_tokens = set(orig.split(",")) | |
1206 | return ",".join(cap_tokens.difference(want)) | |
1207 | ||
1208 | cap = existing[0] | |
1209 | osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps) | |
1210 | mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps) | |
1211 | if (not osd_cap_str) and (not mds_cap_str): | |
1212 | self._rados_command('auth del', {'entity': client_entity}, decode=False) | |
1213 | else: | |
1214 | self._rados_command( | |
1215 | 'auth caps', | |
1216 | { | |
1217 | 'entity': client_entity, | |
1218 | 'caps': [ | |
1219 | 'mds', mds_cap_str, | |
1220 | 'osd', osd_cap_str, | |
1221 | 'mon', cap['caps'].get('mon', 'allow r')] | |
1222 | }) | |
1223 | ||
1224 | # FIXME: rados raising Error instead of ObjectNotFound in auth get failure | |
1225 | except rados.Error: | |
1226 | # Already gone, great. | |
1227 | return | |
1228 | ||
1229 | def get_authorized_ids(self, volume_path): | |
1230 | """ | |
1231 | Expose a list of auth IDs that have access to a volume. | |
1232 | ||
1233 | return: a list of (auth_id, access_level) tuples, where | |
1234 | the access_level can be 'r' or 'rw'. | |
1235 | None if no auth ID is given access to the volume. | |
1236 | """ | |
1237 | with self._volume_lock(volume_path): | |
1238 | meta = self._volume_metadata_get(volume_path) | |
1239 | auths = [] | |
1240 | if not meta or not meta['auths']: | |
1241 | return None | |
1242 | ||
1243 | for auth, auth_data in meta['auths'].items(): | |
1244 | # Skip partial auth updates. | |
1245 | if not auth_data['dirty']: | |
1246 | auths.append((auth, auth_data['access_level'])) | |
1247 | ||
1248 | return auths | |
1249 | ||
1250 | def _rados_command(self, prefix, args=None, decode=True): | |
1251 | """ | |
1252 | Safer wrapper for ceph_argparse.json_command, which raises | |
1253 | Error exception instead of relying on caller to check return | |
1254 | codes. | |
1255 | ||
1256 | Error exception can result from: | |
1257 | * Timeout | |
1258 | * Actual legitimate errors | |
1259 | * Malformed JSON output | |
1260 | ||
1261 | return: Decoded object from ceph, or None if empty string returned. | |
1262 | If decode is False, return a string (the data returned by | |
1263 | ceph command) | |
1264 | """ | |
1265 | if args is None: | |
1266 | args = {} | |
1267 | ||
1268 | argdict = args.copy() | |
1269 | argdict['format'] = 'json' | |
1270 | ||
1271 | ret, outbuf, outs = json_command(self.rados, | |
1272 | prefix=prefix, | |
1273 | argdict=argdict, | |
1274 | timeout=RADOS_TIMEOUT) | |
1275 | if ret != 0: | |
1276 | raise rados.Error(outs) | |
1277 | else: | |
1278 | if decode: | |
1279 | if outbuf: | |
1280 | try: | |
1281 | return json.loads(outbuf) | |
1282 | except (ValueError, TypeError): | |
1283 | raise RadosError("Invalid JSON output for command {0}".format(argdict)) | |
1284 | else: | |
1285 | return None | |
1286 | else: | |
1287 | return outbuf | |
1288 | ||
1289 | def get_used_bytes(self, volume_path): | |
1290 | return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes")) | |
1291 | ||
1292 | def set_max_bytes(self, volume_path, max_bytes): | |
1293 | self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes', | |
1294 | max_bytes.__str__() if max_bytes is not None else "0", | |
1295 | 0) | |
1296 | ||
1297 | def _snapshot_path(self, dir_path, snapshot_name): | |
1298 | return os.path.join( | |
1299 | dir_path, SNAP_DIR, snapshot_name | |
1300 | ) | |
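    # For illustration (example path only):
    # _snapshot_path("/volumes/_nogroup/vol1", "snap1") returns
    # "/volumes/_nogroup/vol1/.snap/snap1".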
1301 | ||
1302 | def _snapshot_create(self, dir_path, snapshot_name): | |
1303 | # TODO: raise intelligible exception for clusters where snaps are disabled | |
1304 | self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755) | |
1305 | ||
1306 | def _snapshot_destroy(self, dir_path, snapshot_name): | |
1307 | """ | |
1308 | Remove a snapshot, or do nothing if it already doesn't exist. | |
1309 | """ | |
1310 | try: | |
1311 | self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name)) | |
1312 | except cephfs.ObjectNotFound: | |
1313 | log.warn("Snapshot was already gone: {0}".format(snapshot_name)) | |
1314 | ||
1315 | def create_snapshot_volume(self, volume_path, snapshot_name): | |
1316 | self._snapshot_create(self._get_path(volume_path), snapshot_name) | |
1317 | ||
1318 | def destroy_snapshot_volume(self, volume_path, snapshot_name): | |
1319 | self._snapshot_destroy(self._get_path(volume_path), snapshot_name) | |
1320 | ||
1321 | def create_snapshot_group(self, group_id, snapshot_name): | |
1322 | if group_id is None: | |
1323 | raise RuntimeError("Group ID may not be None") | |
1324 | ||
1325 | return self._snapshot_create(self._get_group_path(group_id), snapshot_name) | |
1326 | ||
1327 | def destroy_snapshot_group(self, group_id, snapshot_name): | |
1328 | if group_id is None: | |
1329 | raise RuntimeError("Group ID may not be None") | |
1330 | if snapshot_name is None: | |
1331 | raise RuntimeError("Snapshot name may not be None") | |
1332 | ||
1333 | return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name) | |
1334 | ||
1335 | def _cp_r(self, src, dst): | |
1336 | # TODO | |
1337 | raise NotImplementedError() | |
1338 | ||
1339 | def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name): | |
1340 | dest_fs_path = self._get_path(dest_volume_path) | |
1341 | src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) | |
1342 | ||
1343 | self._cp_r(src_snapshot_path, dest_fs_path) |