]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | """ |
2 | Copyright (C) 2015 Red Hat, Inc. | |
3 | ||
4 | LGPL2. See file COPYING. | |
5 | """ | |
6 | ||
7 | from contextlib import contextmanager | |
8 | import errno | |
9 | import fcntl | |
10 | import json | |
11 | import logging | |
12 | import os | |
13 | import re | |
14 | import struct | |
15 | import sys | |
16 | import threading | |
17 | import time | |
18 | import uuid | |
19 | ||
20 | from ceph_argparse import json_command | |
21 | ||
22 | import cephfs | |
23 | import rados | |
24 | ||
25 | ||
class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados.
    """
    pass
31 | ||
32 | ||
# Timeout for librados operations (usage is at call sites outside this chunk;
# presumably seconds -- confirm where it is consumed).
RADOS_TIMEOUT = 10

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"
44 | ||
class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        # Callers express "no group" as None; the reserved on-disk group
        # name is an implementation detail and must never be passed in.
        assert self.group_id != NO_GROUP_NAME
        # A volume ID must be a non-empty, non-None identifier.
        assert self.volume_id is not None and self.volume_id != ""

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)
59 | ||
60 | ||
class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.

    Raised e.g. by RankEvicter._wait_for_ready() when an MDS rank does not
    become ready within the configured timeout.
    """
    pass
68 | ||
69 | ||
class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        # What we were trying to do, and what the cluster said about it.
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return 'Error {0} ("{1}") while {2}'.format(
            self._result_code, self._result_str, self._action)
84 | ||
85 | ||
class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        # Raised internally when the rank we are evicting from is no longer
        # held by our gid: the eviction is moot and treated as success.
        pass

    # Seconds between polls of the MDS map while waiting for the rank to
    # become available for commands.
    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
        """
        :param volume_client: CephFSVolumeClient used for "mds dump" refreshes
                              and for fs.mds_command()
        :param client_spec: list of strings, used as filter arguments to "session evict"
                            pass ["id=123"] to evict a single client with session id 123.
        :param rank: MDS rank the gid is expected to hold
        :param gid: MDS daemon gid to send the eviction to
        :param mds_map: initial "mds dump" output (refreshed while waiting)
        :param ready_timeout: max seconds to wait for the MDS to become ready
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        # Outcome of run(); inspected by the spawner after join().
        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        # If the rank is no longer associated with our gid, there is nothing
        # left to evict from this daemon instance.
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        # Only these states are treated as ready to accept the evict command.
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.

        :raises ClusterTimeout: if the rank is still not ready after
                                ready_timeout seconds of polling
        :raises RankEvicter.GidGone: if the rank stops being held by our gid
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            # Refresh our view of the cluster before re-checking.
            self._mds_map = self._volume_client._rados_command("mds dump", {})

    def _evict(self):
        """
        Run the eviction procedure. Return true on success, false on errors.
        """

        # Wait til the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client._rados_command("mds dump", {})
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        # Record the outcome on the instance: exceptions must not escape a
        # Thread's run(), so they are stashed for the spawner to re-raise.
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True
188 | ||
189 | ||
class EvictionError(Exception):
    """
    Raised by CephFSVolumeClient.evict() when one or more MDS ranks failed
    to evict the requested clients.
    """
    pass
192 | ||
193 | ||
class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.

    Raised e.g. when on-disk metadata requires a newer client version
    (see CephFSVolumeClient._check_compat_version).
    """
    pass
199 | ||
200 | ||
201 | CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ | |
202 | ||
203 | CephFSVolumeClient Version History: | |
204 | ||
205 | * 1 - Initial version | |
3efd9988 | 206 | * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient |
28e407b8 | 207 | * 3 - Allow volumes to be created without RADOS namespace isolation |
7c673cae FG |
208 | """ |
209 | ||
210 | ||
211 | class CephFSVolumeClient(object): | |
212 | """ | |
213 | Combine libcephfs and librados interfaces to implement a | |
214 | 'Volume' concept implemented as a cephfs directory and | |
215 | client capabilities which restrict mount access to this | |
216 | directory. | |
217 | ||
218 | Additionally, volumes may be in a 'Group'. Conveniently, | |
219 | volumes are a lot like manila shares, and groups are a lot | |
220 | like manila consistency groups. | |
221 | ||
222 | Refer to volumes with VolumePath, which specifies the | |
223 | volume and group IDs (both strings). The group ID may | |
224 | be None. | |
225 | ||
226 | In general, functions in this class are allowed raise rados.Error | |
227 | or cephfs.Error exceptions in unexpected situations. | |
228 | """ | |
229 | ||
230 | # Current version | |
28e407b8 | 231 | version = 3 |
7c673cae FG |
232 | |
233 | # Where shall we create our volumes? | |
234 | POOL_PREFIX = "fsvolume_" | |
235 | DEFAULT_VOL_PREFIX = "/volumes" | |
236 | DEFAULT_NS_PREFIX = "fsvolumens_" | |
237 | ||
238 | def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None): | |
239 | self.fs = None | |
240 | self.rados = None | |
241 | self.connected = False | |
242 | self.conf_path = conf_path | |
243 | self.cluster_name = cluster_name | |
244 | self.auth_id = auth_id | |
245 | self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX | |
246 | self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX | |
247 | # For flock'ing in cephfs, I want a unique ID to distinguish me | |
248 | # from any other manila-share services that are loading this module. | |
249 | # We could use pid, but that's unnecessary weak: generate a | |
250 | # UUID | |
251 | self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0] | |
252 | ||
253 | # TODO: version the on-disk structures | |
254 | ||
255 | def recover(self): | |
256 | # Scan all auth keys to see if they're dirty: if they are, they have | |
257 | # state that might not have propagated to Ceph or to the related | |
258 | # volumes yet. | |
259 | ||
260 | # Important: we *always* acquire locks in the order auth->volume | |
261 | # That means a volume can never be dirty without the auth key | |
262 | # we're updating it with being dirty at the same time. | |
263 | ||
264 | # First list the auth IDs that have potentially dirty on-disk metadata | |
265 | log.debug("Recovering from partial auth updates (if any)...") | |
266 | ||
267 | try: | |
268 | dir_handle = self.fs.opendir(self.volume_prefix) | |
269 | except cephfs.ObjectNotFound: | |
270 | log.debug("Nothing to recover. No auth meta files.") | |
271 | return | |
272 | ||
273 | d = self.fs.readdir(dir_handle) | |
274 | auth_ids = [] | |
275 | ||
276 | if not d: | |
277 | log.debug("Nothing to recover. No auth meta files.") | |
278 | ||
279 | while d: | |
280 | # Identify auth IDs from auth meta filenames. The auth meta files | |
281 | # are named as, "$<auth_id><meta filename extension>" | |
282 | regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT)) | |
283 | match = re.search(regex, d.d_name) | |
284 | if match: | |
285 | auth_ids.append(match.group(1)) | |
286 | ||
287 | d = self.fs.readdir(dir_handle) | |
288 | ||
289 | self.fs.closedir(dir_handle) | |
290 | ||
291 | # Key points based on ordering: | |
292 | # * Anything added in VMeta is already added in AMeta | |
293 | # * Anything added in Ceph is already added in VMeta | |
294 | # * Anything removed in VMeta is already removed in Ceph | |
295 | # * Anything removed in AMeta is already removed in VMeta | |
296 | ||
297 | # Deauthorization: because I only update metadata AFTER the | |
298 | # update of the next level down, I have the same ordering of | |
299 | # -> things which exist in the AMeta should also exist | |
300 | # in the VMeta, should also exist in Ceph, and the same | |
301 | # recovery procedure that gets me consistent after crashes | |
302 | # during authorization will also work during deauthorization | |
303 | ||
304 | # Now for each auth ID, check for dirty flag and apply updates | |
305 | # if dirty flag is found | |
306 | for auth_id in auth_ids: | |
307 | with self._auth_lock(auth_id): | |
308 | auth_meta = self._auth_metadata_get(auth_id) | |
309 | if not auth_meta or not auth_meta['volumes']: | |
310 | # Clean up auth meta file | |
311 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
312 | continue | |
313 | if not auth_meta['dirty']: | |
314 | continue | |
315 | self._recover_auth_meta(auth_id, auth_meta) | |
316 | ||
317 | log.debug("Recovered from partial auth updates (if any).") | |
318 | ||
319 | def _recover_auth_meta(self, auth_id, auth_meta): | |
320 | """ | |
321 | Call me after locking the auth meta file. | |
322 | """ | |
323 | remove_volumes = [] | |
324 | ||
325 | for volume, volume_data in auth_meta['volumes'].items(): | |
326 | if not volume_data['dirty']: | |
327 | continue | |
328 | ||
329 | (group_id, volume_id) = volume.split('/') | |
330 | group_id = group_id if group_id is not 'None' else None | |
331 | volume_path = VolumePath(group_id, volume_id) | |
332 | access_level = volume_data['access_level'] | |
333 | ||
334 | with self._volume_lock(volume_path): | |
335 | vol_meta = self._volume_metadata_get(volume_path) | |
336 | ||
337 | # No VMeta update indicates that there was no auth update | |
338 | # in Ceph either. So it's safe to remove corresponding | |
339 | # partial update in AMeta. | |
340 | if not vol_meta or auth_id not in vol_meta['auths']: | |
341 | remove_volumes.append(volume) | |
342 | continue | |
343 | ||
344 | want_auth = { | |
345 | 'access_level': access_level, | |
346 | 'dirty': False, | |
347 | } | |
348 | # VMeta update looks clean. Ceph auth update must have been | |
349 | # clean. | |
350 | if vol_meta['auths'][auth_id] == want_auth: | |
351 | continue | |
352 | ||
353 | readonly = True if access_level is 'r' else False | |
354 | self._authorize_volume(volume_path, auth_id, readonly) | |
355 | ||
356 | # Recovered from partial auth updates for the auth ID's access | |
357 | # to a volume. | |
358 | auth_meta['volumes'][volume]['dirty'] = False | |
359 | self._auth_metadata_set(auth_id, auth_meta) | |
360 | ||
361 | for volume in remove_volumes: | |
362 | del auth_meta['volumes'][volume] | |
363 | ||
364 | if not auth_meta['volumes']: | |
365 | # Clean up auth meta file | |
366 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
367 | return | |
368 | ||
369 | # Recovered from all partial auth updates for the auth ID. | |
370 | auth_meta['dirty'] = False | |
371 | self._auth_metadata_set(auth_id, auth_meta) | |
372 | ||
373 | ||
    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted. Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.

        :param auth_id: evict clients that authenticated with this auth name
        :param timeout: per-MDS readiness timeout in seconds, passed to RankEvicter
        :param volume_path: if set, only evict clients whose mount root is this volume
        :raises EvictionError: if eviction failed on any MDS rank
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self._rados_command("mds dump", {})

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        # Wait for every evictor before checking outcomes, so all ranks get
        # a chance to complete even if one fails early.
        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)
421 | ||
422 | def _get_path(self, volume_path): | |
423 | """ | |
424 | Determine the path within CephFS where this volume will live | |
425 | :return: absolute path (string) | |
426 | """ | |
427 | return os.path.join( | |
428 | self.volume_prefix, | |
429 | volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME, | |
430 | volume_path.volume_id) | |
431 | ||
432 | def _get_group_path(self, group_id): | |
433 | if group_id is None: | |
434 | raise ValueError("group_id may not be None") | |
435 | ||
436 | return os.path.join( | |
437 | self.volume_prefix, | |
438 | group_id | |
439 | ) | |
440 | ||
    def connect(self, premount_evict = None):
        """
        Open the librados connection, initialize and mount CephFS, then run
        crash recovery for partially-applied auth updates.

        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
        self.rados = rados.Rados(
            name="client.{0}".format(self.auth_id),
            clustername=self.cluster_name,
            conffile=self.conf_path,
            conf={}
        )
        self.rados.connect()

        log.debug("Connection to RADOS complete")

        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        # Eviction must happen after init (so mds_command works) but before
        # mount, so stale caps are gone before we start using the fs.
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount()
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()
475 | ||
476 | def get_mon_addrs(self): | |
477 | log.info("get_mon_addrs") | |
478 | result = [] | |
479 | mon_map = self._rados_command("mon dump") | |
480 | for mon in mon_map['mons']: | |
481 | ip_port = mon['addr'].split("/")[0] | |
482 | result.append(ip_port) | |
483 | ||
484 | return result | |
485 | ||
486 | def disconnect(self): | |
487 | log.info("disconnect") | |
488 | if self.fs: | |
489 | log.debug("Disconnecting cephfs...") | |
490 | self.fs.shutdown() | |
491 | self.fs = None | |
492 | log.debug("Disconnecting cephfs complete") | |
493 | ||
494 | if self.rados: | |
495 | log.debug("Disconnecting rados...") | |
496 | self.rados.shutdown() | |
497 | self.rados = None | |
498 | log.debug("Disconnecting rados complete") | |
499 | ||
    def __del__(self):
        # Best-effort cleanup of the rados/cephfs handles on garbage
        # collection; disconnect() is idempotent.
        self.disconnect()
502 | ||
503 | def _get_pool_id(self, osd_map, pool_name): | |
504 | # Maybe borrow the OSDMap wrapper class from calamari if more helpers | |
505 | # like this are needed. | |
506 | for pool in osd_map['pools']: | |
507 | if pool['pool_name'] == pool_name: | |
508 | return pool['pool'] | |
509 | ||
510 | return None | |
511 | ||
512 | def _create_volume_pool(self, pool_name): | |
513 | """ | |
514 | Idempotently create a pool for use as a CephFS data pool, with the given name | |
515 | ||
516 | :return The ID of the created pool | |
517 | """ | |
518 | osd_map = self._rados_command('osd dump', {}) | |
519 | ||
520 | existing_id = self._get_pool_id(osd_map, pool_name) | |
521 | if existing_id is not None: | |
522 | log.info("Pool {0} already exists".format(pool_name)) | |
523 | return existing_id | |
524 | ||
525 | osd_count = len(osd_map['osds']) | |
526 | ||
527 | # We can't query the actual cluster config remotely, but since this is | |
528 | # just a heuristic we'll assume that the ceph.conf we have locally reflects | |
529 | # that in use in the rest of the cluster. | |
3efd9988 | 530 | pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd')) |
7c673cae FG |
531 | |
532 | other_pgs = 0 | |
533 | for pool in osd_map['pools']: | |
534 | if not pool['pool_name'].startswith(self.POOL_PREFIX): | |
535 | other_pgs += pool['pg_num'] | |
536 | ||
537 | # A basic heuristic for picking pg_num: work out the max number of | |
538 | # PGs we can have without tripping a warning, then subtract the number | |
539 | # of PGs already created by non-manila pools, then divide by ten. That'll | |
540 | # give you a reasonable result on a system where you have "a few" manila | |
541 | # shares. | |
542 | pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10 | |
543 | # TODO Alternatively, respect an override set by the user. | |
544 | ||
545 | self._rados_command( | |
546 | 'osd pool create', | |
547 | { | |
548 | 'pool': pool_name, | |
549 | 'pg_num': pg_num | |
550 | } | |
551 | ) | |
552 | ||
553 | osd_map = self._rados_command('osd dump', {}) | |
554 | pool_id = self._get_pool_id(osd_map, pool_name) | |
555 | ||
556 | if pool_id is None: | |
557 | # If the pool isn't there, that's either a ceph bug, or it's some outside influence | |
558 | # removing it right after we created it. | |
559 | log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format( | |
560 | pool_name, json.dumps(osd_map, indent=2) | |
561 | )) | |
562 | raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name)) | |
563 | else: | |
564 | return pool_id | |
565 | ||
566 | def create_group(self, group_id): | |
567 | # Prevent craftily-named volume groups from colliding with the meta | |
568 | # files. | |
569 | if group_id.endswith(META_FILE_EXT): | |
570 | raise ValueError("group ID cannot end with '{0}'.".format( | |
571 | META_FILE_EXT)) | |
572 | path = self._get_group_path(group_id) | |
573 | self._mkdir_p(path) | |
574 | ||
    def destroy_group(self, group_id):
        """
        Remove the directory for a volume group.  If the volume prefix
        directory itself does not exist, there is nothing to remove.
        """
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            # The prefix was never created, so no group dirs can exist.
            pass
        else:
            self.fs.rmdir(path)
583 | ||
584 | def _mkdir_p(self, path): | |
585 | try: | |
586 | self.fs.stat(path) | |
587 | except cephfs.ObjectNotFound: | |
588 | pass | |
589 | else: | |
590 | return | |
591 | ||
592 | parts = path.split(os.path.sep) | |
593 | ||
594 | for i in range(1, len(parts) + 1): | |
595 | subpath = os.path.join(*parts[0:i]) | |
596 | try: | |
597 | self.fs.stat(subpath) | |
598 | except cephfs.ObjectNotFound: | |
599 | self.fs.mkdir(subpath, 0o755) | |
600 | ||
28e407b8 | 601 | def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True): |
7c673cae FG |
602 | """ |
603 | Set up metadata, pools and auth for a volume. | |
604 | ||
605 | This function is idempotent. It is safe to call this again | |
606 | for an already-created volume, even if it is in use. | |
607 | ||
608 | :param volume_path: VolumePath instance | |
609 | :param size: In bytes, or None for no size limit | |
610 | :param data_isolated: If true, create a separate OSD pool for this volume | |
28e407b8 | 611 | :param namespace_isolated: If true, use separate RADOS namespace for this volume |
7c673cae FG |
612 | :return: |
613 | """ | |
614 | path = self._get_path(volume_path) | |
615 | log.info("create_volume: {0}".format(path)) | |
616 | ||
617 | self._mkdir_p(path) | |
618 | ||
619 | if size is not None: | |
620 | self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0) | |
621 | ||
622 | # data_isolated means create a separate pool for this volume | |
623 | if data_isolated: | |
624 | pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id) | |
625 | log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name)) | |
626 | pool_id = self._create_volume_pool(pool_name) | |
627 | mds_map = self._rados_command("mds dump", {}) | |
628 | if pool_id not in mds_map['data_pools']: | |
629 | self._rados_command("mds add_data_pool", { | |
630 | 'pool': pool_name | |
631 | }) | |
632 | self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0) | |
633 | ||
28e407b8 AA |
634 | # enforce security isolation, use separate namespace for this volume |
635 | if namespace_isolated: | |
636 | namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id) | |
637 | log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace)) | |
638 | self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0) | |
639 | else: | |
640 | # If volume's namespace layout is not set, then the volume's pool | |
641 | # layout remains unset and will undesirably change with ancestor's | |
642 | # pool layout changes. | |
643 | pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") | |
644 | self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0) | |
7c673cae FG |
645 | |
646 | # Create a volume meta file, if it does not already exist, to store | |
647 | # data about auth ids having access to the volume | |
648 | fd = self.fs.open(self._volume_metadata_path(volume_path), | |
649 | os.O_CREAT, 0o755) | |
650 | self.fs.close(fd) | |
651 | ||
652 | return { | |
653 | 'mount_path': path | |
654 | } | |
655 | ||
    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests. This function is
        idempotent. This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :param data_isolated: accepted for symmetry with create_volume and
                              purge_volume; not used by this step
        :return: None
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            # Already trashed or purged: deleting again is a no-op.
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass
691 | ||
    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume. This
        function is idempotent.

        :param volume_path: Same identifier used in create_volume
        :param data_isolated: If true, also remove the volume's dedicated
                              OSD pool
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            # Recursive delete of a directory tree through libcephfs
            # (shutil.rmtree operates on the local filesystem and does not
            # apply here).
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                if d.d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte arrays
                    d_full = "{0}/{1}".format(root_path, d.d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            # Also unregister and delete the volume's dedicated pool.
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self._rados_command("mds dump", {})
            if pool_id in mds_map['data_pools']:
                self._rados_command("mds remove_data_pool", {
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "sure": "--yes-i-really-really-mean-it"
                                })
745 | ||
746 | def _get_ancestor_xattr(self, path, attr): | |
747 | """ | |
748 | Helper for reading layout information: if this xattr is missing | |
749 | on the requested path, keep checking parents until we find it. | |
750 | """ | |
751 | try: | |
752 | result = self.fs.getxattr(path, attr) | |
753 | if result == "": | |
754 | # Annoying! cephfs gives us empty instead of an error when attr not found | |
755 | raise cephfs.NoData() | |
756 | else: | |
757 | return result | |
758 | except cephfs.NoData: | |
759 | if path == "/": | |
760 | raise | |
761 | else: | |
762 | return self._get_ancestor_xattr(os.path.split(path)[0], attr) | |
763 | ||
764 | def _check_compat_version(self, compat_version): | |
765 | if self.version < compat_version: | |
766 | msg = ("The current version of CephFSVolumeClient, version {0} " | |
767 | "does not support the required feature. Need version {1} " | |
768 | "or greater".format(self.version, compat_version) | |
769 | ) | |
770 | log.error(msg) | |
771 | raise CephFSVolumeClientError(msg) | |
772 | ||
773 | def _metadata_get(self, path): | |
774 | """ | |
775 | Return a deserialized JSON object, or None | |
776 | """ | |
777 | fd = self.fs.open(path, "r") | |
778 | # TODO iterate instead of assuming file < 4MB | |
779 | read_bytes = self.fs.read(fd, 0, 4096 * 1024) | |
780 | self.fs.close(fd) | |
781 | if read_bytes: | |
782 | return json.loads(read_bytes) | |
783 | else: | |
784 | return None | |
785 | ||
786 | def _metadata_set(self, path, data): | |
787 | serialized = json.dumps(data) | |
788 | fd = self.fs.open(path, "w") | |
789 | try: | |
790 | self.fs.write(fd, serialized, 0) | |
791 | self.fs.fsync(fd, 0) | |
792 | finally: | |
793 | self.fs.close(fd) | |
794 | ||
    def _lock(self, path):
        """
        Return a context manager that holds an exclusive flock on the file
        at ``path`` (creating it if necessary) and releases it on exit.
        """
        @contextmanager
        def fn():
            while(1):
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked e.g., by an another manila share instance, before
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                # Same inode => the path still names the file we locked;
                # otherwise loop and lock the (re-)created file instead.
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()
823 | ||
824 | def _auth_metadata_path(self, auth_id): | |
825 | return os.path.join(self.volume_prefix, "${0}{1}".format( | |
826 | auth_id, META_FILE_EXT)) | |
827 | ||
    def _auth_lock(self, auth_id):
        # Context manager holding the lock on this auth ID's metadata file.
        return self._lock(self._auth_metadata_path(auth_id))
830 | ||
831 | def _auth_metadata_get(self, auth_id): | |
832 | """ | |
833 | Call me with the metadata locked! | |
834 | ||
835 | Check whether a auth metadata structure can be decoded by the current | |
836 | version of CephFSVolumeClient. | |
837 | ||
838 | Return auth metadata that the current version of CephFSVolumeClient | |
839 | can decode. | |
840 | """ | |
841 | auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id)) | |
842 | ||
843 | if auth_metadata: | |
844 | self._check_compat_version(auth_metadata['compat_version']) | |
845 | ||
846 | return auth_metadata | |
847 | ||
848 | def _auth_metadata_set(self, auth_id, data): | |
849 | """ | |
850 | Call me with the metadata locked! | |
851 | ||
852 | Fsync the auth metadata. | |
853 | ||
854 | Add two version attributes to the auth metadata, | |
855 | 'compat_version', the minimum CephFSVolumeClient version that can | |
856 | decode the metadata, and 'version', the CephFSVolumeClient version | |
857 | that encoded the metadata. | |
858 | """ | |
859 | data['compat_version'] = 1 | |
3efd9988 | 860 | data['version'] = self.version |
7c673cae FG |
861 | return self._metadata_set(self._auth_metadata_path(auth_id), data) |
862 | ||
863 | def _volume_metadata_path(self, volume_path): | |
864 | return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format( | |
865 | volume_path.group_id if volume_path.group_id else "", | |
866 | volume_path.volume_id, | |
867 | META_FILE_EXT | |
868 | )) | |
869 | ||
870 | def _volume_lock(self, volume_path): | |
871 | """ | |
872 | Return a ContextManager which locks the authorization metadata for | |
873 | a particular volume, and persists a flag to the metadata indicating | |
874 | that it is currently locked, so that we can detect dirty situations | |
875 | during recovery. | |
876 | ||
877 | This lock isn't just to make access to the metadata safe: it's also | |
878 | designed to be used over the two-step process of checking the | |
879 | metadata and then responding to an authorization request, to | |
880 | ensure that at the point we respond the metadata hasn't changed | |
881 | in the background. It's key to how we avoid security holes | |
882 | resulting from races during that problem , | |
883 | """ | |
884 | return self._lock(self._volume_metadata_path(volume_path)) | |
885 | ||
886 | def _volume_metadata_get(self, volume_path): | |
887 | """ | |
888 | Call me with the metadata locked! | |
889 | ||
890 | Check whether a volume metadata structure can be decoded by the current | |
891 | version of CephFSVolumeClient. | |
892 | ||
893 | Return a volume_metadata structure that the current version of | |
894 | CephFSVolumeClient can decode. | |
895 | """ | |
896 | volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path)) | |
897 | ||
898 | if volume_metadata: | |
899 | self._check_compat_version(volume_metadata['compat_version']) | |
900 | ||
901 | return volume_metadata | |
902 | ||
903 | def _volume_metadata_set(self, volume_path, data): | |
904 | """ | |
905 | Call me with the metadata locked! | |
906 | ||
907 | Add two version attributes to the volume metadata, | |
908 | 'compat_version', the minimum CephFSVolumeClient version that can | |
909 | decode the metadata and 'version', the CephFSVolumeClient version | |
910 | that encoded the metadata. | |
911 | """ | |
912 | data['compat_version'] = 1 | |
3efd9988 | 913 | data['version'] = self.version |
7c673cae FG |
914 | return self._metadata_set(self._volume_metadata_path(volume_path), data) |
915 | ||
916 | def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None): | |
917 | """ | |
918 | Get-or-create a Ceph auth identity for `auth_id` and grant them access | |
919 | to | |
920 | :param volume_path: | |
921 | :param auth_id: | |
922 | :param readonly: | |
923 | :param tenant_id: Optionally provide a stringizable object to | |
924 | restrict any created cephx IDs to other callers | |
925 | passing the same tenant ID. | |
926 | :return: | |
927 | """ | |
928 | ||
929 | with self._auth_lock(auth_id): | |
930 | # Existing meta, or None, to be updated | |
931 | auth_meta = self._auth_metadata_get(auth_id) | |
932 | ||
933 | # volume data to be inserted | |
934 | volume_path_str = str(volume_path) | |
935 | volume = { | |
936 | volume_path_str : { | |
937 | # The access level at which the auth_id is authorized to | |
938 | # access the volume. | |
939 | 'access_level': 'r' if readonly else 'rw', | |
940 | 'dirty': True, | |
941 | } | |
942 | } | |
943 | if auth_meta is None: | |
944 | sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format( | |
945 | auth_id, tenant_id | |
946 | )) | |
947 | log.debug("Authorize: no existing meta") | |
948 | auth_meta = { | |
949 | 'dirty': True, | |
950 | 'tenant_id': tenant_id.__str__() if tenant_id else None, | |
951 | 'volumes': volume | |
952 | } | |
953 | ||
954 | # Note: this is *not* guaranteeing that the key doesn't already | |
955 | # exist in Ceph: we are allowing VolumeClient tenants to | |
956 | # 'claim' existing Ceph keys. In order to prevent VolumeClient | |
957 | # tenants from reading e.g. client.admin keys, you need to | |
958 | # have configured your VolumeClient user (e.g. Manila) to | |
959 | # have mon auth caps that prevent it from accessing those keys | |
960 | # (e.g. limit it to only access keys with a manila.* prefix) | |
961 | else: | |
962 | # Disallow tenants to share auth IDs | |
963 | if auth_meta['tenant_id'].__str__() != tenant_id.__str__(): | |
964 | msg = "auth ID: {0} is already in use".format(auth_id) | |
965 | log.error(msg) | |
966 | raise CephFSVolumeClientError(msg) | |
967 | ||
968 | if auth_meta['dirty']: | |
969 | self._recover_auth_meta(auth_id, auth_meta) | |
970 | ||
971 | log.debug("Authorize: existing tenant {tenant}".format( | |
972 | tenant=auth_meta['tenant_id'] | |
973 | )) | |
974 | auth_meta['dirty'] = True | |
975 | auth_meta['volumes'].update(volume) | |
976 | ||
977 | self._auth_metadata_set(auth_id, auth_meta) | |
978 | ||
979 | with self._volume_lock(volume_path): | |
980 | key = self._authorize_volume(volume_path, auth_id, readonly) | |
981 | ||
982 | auth_meta['dirty'] = False | |
983 | auth_meta['volumes'][volume_path_str]['dirty'] = False | |
984 | self._auth_metadata_set(auth_id, auth_meta) | |
985 | ||
986 | if tenant_id: | |
987 | return { | |
988 | 'auth_key': key | |
989 | } | |
990 | else: | |
991 | # Caller wasn't multi-tenant aware: be safe and don't give | |
992 | # them a key | |
993 | return { | |
994 | 'auth_key': None | |
995 | } | |
996 | ||
997 | def _authorize_volume(self, volume_path, auth_id, readonly): | |
998 | vol_meta = self._volume_metadata_get(volume_path) | |
999 | ||
1000 | access_level = 'r' if readonly else 'rw' | |
1001 | auth = { | |
1002 | auth_id: { | |
1003 | 'access_level': access_level, | |
1004 | 'dirty': True, | |
1005 | } | |
1006 | } | |
1007 | ||
1008 | if vol_meta is None: | |
1009 | vol_meta = { | |
1010 | 'auths': auth | |
1011 | } | |
1012 | else: | |
1013 | vol_meta['auths'].update(auth) | |
1014 | self._volume_metadata_set(volume_path, vol_meta) | |
1015 | ||
1016 | key = self._authorize_ceph(volume_path, auth_id, readonly) | |
1017 | ||
1018 | vol_meta['auths'][auth_id]['dirty'] = False | |
1019 | self._volume_metadata_set(volume_path, vol_meta) | |
1020 | ||
1021 | return key | |
1022 | ||
1023 | def _authorize_ceph(self, volume_path, auth_id, readonly): | |
1024 | path = self._get_path(volume_path) | |
1025 | log.debug("Authorizing Ceph id '{0}' for path '{1}'".format( | |
1026 | auth_id, path | |
1027 | )) | |
1028 | ||
1029 | # First I need to work out what the data pool is for this share: | |
1030 | # read the layout | |
1031 | pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") | |
28e407b8 AA |
1032 | |
1033 | try: | |
1034 | namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace") | |
1035 | except cephfs.NoData: | |
1036 | namespace = None | |
7c673cae FG |
1037 | |
1038 | # Now construct auth capabilities that give the guest just enough | |
1039 | # permissions to access the share | |
1040 | client_entity = "client.{0}".format(auth_id) | |
1041 | want_access_level = 'r' if readonly else 'rw' | |
1042 | want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path) | |
28e407b8 AA |
1043 | if namespace: |
1044 | want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( | |
1045 | want_access_level, pool_name, namespace) | |
1046 | else: | |
1047 | want_osd_cap = 'allow {0} pool={1}'.format(want_access_level, | |
1048 | pool_name) | |
7c673cae FG |
1049 | |
1050 | try: | |
1051 | existing = self._rados_command( | |
1052 | 'auth get', | |
1053 | { | |
1054 | 'entity': client_entity | |
1055 | } | |
1056 | ) | |
1057 | # FIXME: rados raising Error instead of ObjectNotFound in auth get failure | |
1058 | except rados.Error: | |
1059 | caps = self._rados_command( | |
1060 | 'auth get-or-create', | |
1061 | { | |
1062 | 'entity': client_entity, | |
1063 | 'caps': [ | |
1064 | 'mds', want_mds_cap, | |
1065 | 'osd', want_osd_cap, | |
1066 | 'mon', 'allow r'] | |
1067 | }) | |
1068 | else: | |
1069 | # entity exists, update it | |
1070 | cap = existing[0] | |
1071 | ||
1072 | # Construct auth caps that if present might conflict with the desired | |
1073 | # auth caps. | |
1074 | unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw' | |
1075 | unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) | |
28e407b8 AA |
1076 | if namespace: |
1077 | unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( | |
1078 | unwanted_access_level, pool_name, namespace) | |
1079 | else: | |
1080 | unwanted_osd_cap = 'allow {0} pool={1}'.format( | |
1081 | unwanted_access_level, pool_name) | |
1082 | ||
1083 | def cap_update( | |
1084 | orig_mds_caps, orig_osd_caps, want_mds_cap, | |
1085 | want_osd_cap, unwanted_mds_cap, unwanted_osd_cap): | |
7c673cae | 1086 | |
28e407b8 AA |
1087 | if not orig_mds_caps: |
1088 | return want_mds_cap, want_osd_cap | |
7c673cae | 1089 | |
28e407b8 AA |
1090 | mds_cap_tokens = orig_mds_caps.split(",") |
1091 | osd_cap_tokens = orig_osd_caps.split(",") | |
3efd9988 | 1092 | |
28e407b8 AA |
1093 | if want_mds_cap in mds_cap_tokens: |
1094 | return orig_mds_caps, orig_osd_caps | |
7c673cae | 1095 | |
28e407b8 AA |
1096 | if unwanted_mds_cap in mds_cap_tokens: |
1097 | mds_cap_tokens.remove(unwanted_mds_cap) | |
1098 | osd_cap_tokens.remove(unwanted_osd_cap) | |
7c673cae | 1099 | |
28e407b8 AA |
1100 | mds_cap_tokens.append(want_mds_cap) |
1101 | osd_cap_tokens.append(want_osd_cap) | |
7c673cae | 1102 | |
28e407b8 AA |
1103 | return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens) |
1104 | ||
1105 | orig_mds_caps = cap['caps'].get('mds', "") | |
1106 | orig_osd_caps = cap['caps'].get('osd', "") | |
1107 | ||
1108 | mds_cap_str, osd_cap_str = cap_update( | |
1109 | orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap, | |
1110 | unwanted_mds_cap, unwanted_osd_cap) | |
7c673cae FG |
1111 | |
1112 | caps = self._rados_command( | |
1113 | 'auth caps', | |
1114 | { | |
1115 | 'entity': client_entity, | |
1116 | 'caps': [ | |
1117 | 'mds', mds_cap_str, | |
1118 | 'osd', osd_cap_str, | |
1119 | 'mon', cap['caps'].get('mon', 'allow r')] | |
1120 | }) | |
1121 | caps = self._rados_command( | |
1122 | 'auth get', | |
1123 | { | |
1124 | 'entity': client_entity | |
1125 | } | |
1126 | ) | |
1127 | ||
1128 | # Result expected like this: | |
1129 | # [ | |
1130 | # { | |
1131 | # "entity": "client.foobar", | |
1132 | # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", | |
1133 | # "caps": { | |
1134 | # "mds": "allow *", | |
1135 | # "mon": "allow *" | |
1136 | # } | |
1137 | # } | |
1138 | # ] | |
1139 | assert len(caps) == 1 | |
1140 | assert caps[0]['entity'] == client_entity | |
1141 | return caps[0]['key'] | |
1142 | ||
1143 | def deauthorize(self, volume_path, auth_id): | |
1144 | with self._auth_lock(auth_id): | |
1145 | # Existing meta, or None, to be updated | |
1146 | auth_meta = self._auth_metadata_get(auth_id) | |
1147 | ||
1148 | volume_path_str = str(volume_path) | |
1149 | if (auth_meta is None) or (not auth_meta['volumes']): | |
1150 | log.warn("deauthorized called for already-removed auth" | |
1151 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1152 | auth_id=auth_id, volume=volume_path.volume_id | |
1153 | )) | |
1154 | # Clean up the auth meta file of an auth ID | |
1155 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
1156 | return | |
1157 | ||
1158 | if volume_path_str not in auth_meta['volumes']: | |
1159 | log.warn("deauthorized called for already-removed auth" | |
1160 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1161 | auth_id=auth_id, volume=volume_path.volume_id | |
1162 | )) | |
1163 | return | |
1164 | ||
1165 | if auth_meta['dirty']: | |
1166 | self._recover_auth_meta(auth_id, auth_meta) | |
1167 | ||
1168 | auth_meta['dirty'] = True | |
1169 | auth_meta['volumes'][volume_path_str]['dirty'] = True | |
1170 | self._auth_metadata_set(auth_id, auth_meta) | |
1171 | ||
1172 | self._deauthorize_volume(volume_path, auth_id) | |
1173 | ||
1174 | # Filter out the volume we're deauthorizing | |
1175 | del auth_meta['volumes'][volume_path_str] | |
1176 | ||
1177 | # Clean up auth meta file | |
1178 | if not auth_meta['volumes']: | |
1179 | self.fs.unlink(self._auth_metadata_path(auth_id)) | |
1180 | return | |
1181 | ||
1182 | auth_meta['dirty'] = False | |
1183 | self._auth_metadata_set(auth_id, auth_meta) | |
1184 | ||
1185 | def _deauthorize_volume(self, volume_path, auth_id): | |
1186 | with self._volume_lock(volume_path): | |
1187 | vol_meta = self._volume_metadata_get(volume_path) | |
1188 | ||
1189 | if (vol_meta is None) or (auth_id not in vol_meta['auths']): | |
1190 | log.warn("deauthorized called for already-removed auth" | |
1191 | "ID '{auth_id}' for volume ID '{volume}'".format( | |
1192 | auth_id=auth_id, volume=volume_path.volume_id | |
1193 | )) | |
1194 | return | |
1195 | ||
1196 | vol_meta['auths'][auth_id]['dirty'] = True | |
1197 | self._volume_metadata_set(volume_path, vol_meta) | |
1198 | ||
1199 | self._deauthorize(volume_path, auth_id) | |
1200 | ||
1201 | # Remove the auth_id from the metadata *after* removing it | |
1202 | # from ceph, so that if we crashed here, we would actually | |
1203 | # recreate the auth ID during recovery (i.e. end up with | |
1204 | # a consistent state). | |
1205 | ||
1206 | # Filter out the auth we're removing | |
1207 | del vol_meta['auths'][auth_id] | |
1208 | self._volume_metadata_set(volume_path, vol_meta) | |
1209 | ||
    def _deauthorize(self, volume_path, auth_id):
        """
        Strip the mds/osd caps for ``volume_path`` from the cephx entity of
        ``auth_id``, deleting the entity entirely when no mds caps remain.

        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
        except cephfs.NoData:
            # No pool namespace set on this directory; caps omit namespace=.
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]


        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                # Remove at most one matching (mds, osd) cap pair from the
                # comma-separated cap strings.  want_mds_caps/want_osd_caps
                # are parallel lists (zip keeps the pairing), and the break
                # ensures only the first matching access level is stripped.
                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            if not mds_cap_str:
                # No mds caps left: the entity serves no purpose, delete it.
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return
1278 | ||
1279 | def get_authorized_ids(self, volume_path): | |
1280 | """ | |
1281 | Expose a list of auth IDs that have access to a volume. | |
1282 | ||
1283 | return: a list of (auth_id, access_level) tuples, where | |
1284 | the access_level can be 'r' , or 'rw'. | |
1285 | None if no auth ID is given access to the volume. | |
1286 | """ | |
1287 | with self._volume_lock(volume_path): | |
1288 | meta = self._volume_metadata_get(volume_path) | |
1289 | auths = [] | |
1290 | if not meta or not meta['auths']: | |
1291 | return None | |
1292 | ||
1293 | for auth, auth_data in meta['auths'].items(): | |
1294 | # Skip partial auth updates. | |
1295 | if not auth_data['dirty']: | |
1296 | auths.append((auth, auth_data['access_level'])) | |
1297 | ||
1298 | return auths | |
1299 | ||
1300 | def _rados_command(self, prefix, args=None, decode=True): | |
1301 | """ | |
1302 | Safer wrapper for ceph_argparse.json_command, which raises | |
1303 | Error exception instead of relying on caller to check return | |
1304 | codes. | |
1305 | ||
1306 | Error exception can result from: | |
1307 | * Timeout | |
1308 | * Actual legitimate errors | |
1309 | * Malformed JSON output | |
1310 | ||
1311 | return: Decoded object from ceph, or None if empty string returned. | |
1312 | If decode is False, return a string (the data returned by | |
1313 | ceph command) | |
1314 | """ | |
1315 | if args is None: | |
1316 | args = {} | |
1317 | ||
1318 | argdict = args.copy() | |
1319 | argdict['format'] = 'json' | |
1320 | ||
1321 | ret, outbuf, outs = json_command(self.rados, | |
1322 | prefix=prefix, | |
1323 | argdict=argdict, | |
1324 | timeout=RADOS_TIMEOUT) | |
1325 | if ret != 0: | |
1326 | raise rados.Error(outs) | |
1327 | else: | |
1328 | if decode: | |
1329 | if outbuf: | |
1330 | try: | |
1331 | return json.loads(outbuf) | |
1332 | except (ValueError, TypeError): | |
1333 | raise RadosError("Invalid JSON output for command {0}".format(argdict)) | |
1334 | else: | |
1335 | return None | |
1336 | else: | |
1337 | return outbuf | |
1338 | ||
1339 | def get_used_bytes(self, volume_path): | |
1340 | return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes")) | |
1341 | ||
1342 | def set_max_bytes(self, volume_path, max_bytes): | |
1343 | self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes', | |
1344 | max_bytes.__str__() if max_bytes is not None else "0", | |
1345 | 0) | |
1346 | ||
1347 | def _snapshot_path(self, dir_path, snapshot_name): | |
1348 | return os.path.join( | |
3efd9988 | 1349 | dir_path, self.rados.conf_get('client_snapdir'), snapshot_name |
7c673cae FG |
1350 | ) |
1351 | ||
1352 | def _snapshot_create(self, dir_path, snapshot_name): | |
1353 | # TODO: raise intelligible exception for clusters where snaps are disabled | |
1354 | self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755) | |
1355 | ||
1356 | def _snapshot_destroy(self, dir_path, snapshot_name): | |
1357 | """ | |
1358 | Remove a snapshot, or do nothing if it already doesn't exist. | |
1359 | """ | |
1360 | try: | |
1361 | self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name)) | |
1362 | except cephfs.ObjectNotFound: | |
1363 | log.warn("Snapshot was already gone: {0}".format(snapshot_name)) | |
1364 | ||
1365 | def create_snapshot_volume(self, volume_path, snapshot_name): | |
1366 | self._snapshot_create(self._get_path(volume_path), snapshot_name) | |
1367 | ||
1368 | def destroy_snapshot_volume(self, volume_path, snapshot_name): | |
1369 | self._snapshot_destroy(self._get_path(volume_path), snapshot_name) | |
1370 | ||
1371 | def create_snapshot_group(self, group_id, snapshot_name): | |
1372 | if group_id is None: | |
1373 | raise RuntimeError("Group ID may not be None") | |
1374 | ||
1375 | return self._snapshot_create(self._get_group_path(group_id), snapshot_name) | |
1376 | ||
1377 | def destroy_snapshot_group(self, group_id, snapshot_name): | |
1378 | if group_id is None: | |
1379 | raise RuntimeError("Group ID may not be None") | |
1380 | if snapshot_name is None: | |
1381 | raise RuntimeError("Snapshot name may not be None") | |
1382 | ||
1383 | return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name) | |
1384 | ||
    def _cp_r(self, src, dst):
        """Recursively copy ``src`` to ``dst``.  Not yet implemented."""
        # TODO
        raise NotImplementedError()
1388 | ||
1389 | def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name): | |
1390 | dest_fs_path = self._get_path(dest_volume_path) | |
1391 | src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name) | |
1392 | ||
1393 | self._cp_r(src_snapshot_path, dest_fs_path) | |
3efd9988 FG |
1394 | |
1395 | def put_object(self, pool_name, object_name, data): | |
1396 | """ | |
1397 | Synchronously write data to an object. | |
1398 | ||
1399 | :param pool_name: name of the pool | |
1400 | :type pool_name: str | |
1401 | :param object_name: name of the object | |
1402 | :type object_name: str | |
1403 | :param data: data to write | |
1404 | :type data: bytes | |
1405 | """ | |
1406 | ioctx = self.rados.open_ioctx(pool_name) | |
1407 | max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 | |
1408 | if len(data) > max_size: | |
1409 | msg = ("Data to be written to object '{0}' exceeds " | |
1410 | "{1} bytes".format(object_name, max_size)) | |
1411 | log.error(msg) | |
1412 | raise CephFSVolumeClientError(msg) | |
1413 | try: | |
1414 | ioctx.write_full(object_name, data) | |
1415 | finally: | |
1416 | ioctx.close() | |
1417 | ||
1418 | def get_object(self, pool_name, object_name): | |
1419 | """ | |
1420 | Synchronously read data from object. | |
1421 | ||
1422 | :param pool_name: name of the pool | |
1423 | :type pool_name: str | |
1424 | :param object_name: name of the object | |
1425 | :type object_name: str | |
1426 | ||
1427 | :returns: bytes - data read from object | |
1428 | """ | |
1429 | ioctx = self.rados.open_ioctx(pool_name) | |
1430 | max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024 | |
1431 | try: | |
1432 | bytes_read = ioctx.read(object_name, max_size) | |
1433 | if ((len(bytes_read) == max_size) and | |
1434 | (ioctx.read(object_name, 1, offset=max_size))): | |
1435 | log.warning("Size of object {0} exceeds '{1}' bytes " | |
1436 | "read".format(object_name, max_size)) | |
1437 | finally: | |
1438 | ioctx.close() | |
1439 | return bytes_read | |
1440 | ||
1441 | def delete_object(self, pool_name, object_name): | |
1442 | ioctx = self.rados.open_ioctx(pool_name) | |
1443 | try: | |
1444 | ioctx.remove_object(object_name) | |
1445 | except rados.ObjectNotFound: | |
1446 | log.warn("Object '{0}' was already removed".format(object_name)) | |
1447 | finally: | |
1448 | ioctx.close() |