"""
Copyright (C) 2015 Red Hat, Inc.

LGPL2.1. See file COPYING.
"""

from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

from ceph_argparse import json_command

import cephfs
import rados

def to_bytes(param):
    '''
    Helper method that returns byte representation of the given parameter.
    '''
    if isinstance(param, str):
        return param.encode()
    else:
        return str(param).encode()

class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


RADOS_TIMEOUT = 10

log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"

class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)

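# Illustrative VolumePath usage (a hedged sketch; "grp" and "vol1" are
# hypothetical IDs, not part of this module):
#
#     vp = VolumePath("grp", "vol1")
#     str(vp)   # -> "grp/vol1"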

class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict";
            pass ["id=123"] to evict a single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for this MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client.get_mds_map()

    def _evict(self):
        """
        Run the eviction procedure.  Returns True on success; raises
        ClusterTimeout or ClusterError on failure.
        """

        # Wait until the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client.get_mds_map()
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True


class EvictionError(Exception):
    pass


class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass


CEPHFSVOLUMECLIENT_VERSION_HISTORY = """

CephFSVolumeClient Version History:

* 1 - Initial version
* 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
* 3 - Allow volumes to be created without RADOS namespace isolation
* 4 - Added get_object_and_version, put_object_versioned methods to CephFSVolumeClient
"""


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept: a cephfs directory, plus client capabilities
    which restrict mount access to this directory.

    Additionally, volumes may be in a 'Group'.  Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings).  The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

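    # Hedged usage sketch (auth IDs, paths and sizes here are hypothetical,
    # not part of this module):
    #
    #     with CephFSVolumeClient(auth_id="manila",
    #                             conf_path="/etc/ceph/ceph.conf",
    #                             cluster_name="ceph") as vc:
    #         vp = VolumePath(None, "vol1")
    #         vc.create_volume(vp, size=10 * 2**30)
    #         key = vc.authorize(vp, "alice", tenant_id="t1")['auth_key']
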
    # Current version
    version = 4

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
                 volume_prefix=None, pool_ns_prefix=None, rados=None,
                 fs_name=None):
        """
        Either set all three of ``auth_id``, ``conf_path`` and
        ``cluster_name`` (rados constructed on connect), or
        set ``rados`` (an existing rados instance).
        """
        self.fs = None
        self.fs_name = fs_name
        self.connected = False

        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id

        self.rados = rados
        if self.rados:
            # Using an externally owned rados, so we won't tear it down
            # on disconnect
            self.own_rados = False
        else:
            # self.rados will be constructed in connect
            self.own_rados = True

        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID
        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]

        # TODO: version the on-disk structures

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume.
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, the same ordering holds:
        # things which exist in the AMeta should also exist in the
        # VMeta and in Ceph, and the same recovery procedure that gets
        # me consistent after crashes during authorization will also
        # work during deauthorization.

        # Now for each auth ID, check for the dirty flag and apply updates
        # if the dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove the corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = access_level == 'r'
                self._authorize_volume(volume_path, auth_id, readonly)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def get_mds_map(self):
        fs_map = self._rados_command("fs dump", {})
        return fs_map['filesystems'][0]['mdsmap']

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted.  Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

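        # Hedged usage sketch ("alice" and vp are hypothetical): after
        # revoking alice's key, evict("alice", volume_path=vp) walks every
        # up MDS rank and evicts the matching client sessions.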
        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self.get_mds_map()
        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)

    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )

    def _connect(self, premount_evict):
        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount(filesystem_name=self.fs_name)
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()

    def connect(self, premount_evict=None):
        """
        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        if self.own_rados:
            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
            self.rados = rados.Rados(
                name="client.{0}".format(self.auth_id),
                clustername=self.cluster_name,
                conffile=self.conf_path,
                conf={}
            )
            if self.rados.state != "connected":
                log.debug("Connecting to RADOS...")
                self.rados.connect()
                log.debug("Connection to RADOS complete")
        self._connect(premount_evict)

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados and self.own_rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def __del__(self):
        self.disconnect()

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return: The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        osd_count = len(osd_map['osds'])

        # We can't query the actual cluster config remotely, but since this is
        # just a heuristic we'll assume that the ceph.conf we have locally reflects
        # the one in use in the rest of the cluster.
        pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))

        other_pgs = 0
        for pool in osd_map['pools']:
            if not pool['pool_name'].startswith(self.POOL_PREFIX):
                other_pgs += pool['pg_num']

        # A basic heuristic for picking pg_num: work out the max number of
        # PGs we can have without tripping a warning, then subtract the number
        # of PGs already created by non-manila pools, then divide by ten. That'll
        # give you a reasonable result on a system where you have "a few" manila
        # shares.
        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10
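        # Illustrative arithmetic (hypothetical numbers): with
        # mon_max_pg_per_osd=250, 10 OSDs and 2000 PGs in non-manila pools,
        # pg_num = ((250 * 10) - 2000) // 10 = 50.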
        # TODO Alternatively, respect an override set by the user.

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
                'pg_num': int(pg_num),
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id

    def create_group(self, group_id, mode=0o755):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path, mode)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def _mkdir_p(self, path, mode=0o755):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, mode)

    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
                      mode=0o755):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent.  It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use a separate RADOS namespace for this volume
        :return:
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path, mode)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self.get_mds_map()
            if pool_id not in mds_map['data_pools']:
                self._rados_command("fs add_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            time.sleep(5)  # time for MDSMap to be distributed
            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)

        # enforce security isolation, use a separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
                             to_bytes(namespace), 0)
        else:
            # If the volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with the ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool',
                             to_bytes(pool_name), 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

        return {
            'mount_path': path
        }

    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests.  This function is
        idempotent.  This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :return:
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume.  This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                d_name = d.d_name.decode(encoding='utf-8')
                if d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding: we just pass through dnames
                    # as byte arrays
                    d_full = u"{0}/{1}".format(root_path, d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self.get_mds_map()
            if pool_id in mds_map['data_pools']:
                self._rados_command("fs rm_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "yes_i_really_really_mean_it": True
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr).decode()
            if result == "":
                # Annoying! cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0}, "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            log.error(msg)
            raise CephFSVolumeClientError(msg)

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes.decode())
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, to_bytes(serialized), 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while True:
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked, e.g. by another manila share instance, before the
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

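                # If the inode we opened (and locked) is still the one at
                # `path`, our flock is held on the live lock file rather
                # than on an unlinked orphan, so the lock is valid.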
                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata:
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))
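    # Illustrative results (assuming the default volume_prefix "/volumes"):
    # VolumePath("grp", "vol1") maps to "/volumes/_grp:vol1.meta", and a
    # group_id of None maps to "/volumes/_:vol1.meta".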

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background.  It's key to how we avoid security holes
        resulting from races during that process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata:
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
        """
        Get-or-create a Ceph auth identity for ``auth_id`` and grant it access
        to the volume at ``volume_path``.

        :param volume_path:
        :param auth_id:
        :param readonly:
        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :return:
        """

        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }
            if auth_meta is None:
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'volumes': volume
                }

                # Note: this is *not* guaranteeing that the key doesn't already
                # exist in Ceph: we are allowing VolumeClient tenants to
                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
                # tenants from reading e.g. client.admin keys, you need to
                # have configured your VolumeClient user (e.g. Manila) to
                # have mon auth caps that prevent it from accessing those keys
                # (e.g. limit it to only access keys with a manila.* prefix)
            else:
                # Disallow tenants from sharing auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key
                return {
                    'auth_key': None
                }

    def _authorize_volume(self, volume_path, auth_id, readonly):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
        self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )
        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing[0]

            # Construct auth caps that, if present, might conflict with the
            # desired auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
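
            # Illustrative transformation (hedged, hypothetical caps): with
            # orig_mds_caps = "allow rw path=/volumes/_nogroup/vol1" and
            # want_mds_cap = "allow r path=/volumes/_nogroup/vol1", the rw
            # token is removed as unwanted and the r token appended, giving
            # "allow r path=/volumes/_nogroup/vol1".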

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
                unwanted_mds_cap, unwanted_osd_cap)

            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', mds_cap_str,
                        'osd', osd_cap_str,
                        'mon', cap['caps'].get('mon', 'allow r')]
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         "caps": {
        #             "mds": "allow *",
        #             "mon": "allow *"
        #         }
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            if not mds_cap_str:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r' or 'rw', or
                None if no auth ID has been given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []
            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on the caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                the ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf.decode())
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf

    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
                                    "rbytes").decode())

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         to_bytes(max_bytes if max_bytes else 0), 0)

    def _snapshot_path(self, dir_path, snapshot_name):
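        # With the default client_snapdir of ".snap" (the snapdir name is
        # read from the client configuration, so this is illustrative),
        # this yields "<dir_path>/.snap/<snapshot_name>".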
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )

    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
        # TODO: raise an intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warn("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
                                     mode)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)

    def _cp_r(self, src, dst):
        # TODO
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        return self.put_object_versioned(pool_name, object_name, data)

    def put_object_versioned(self, pool_name, object_name, data, version=None):
        """
        Synchronously write data to an object, but only if the version of the
        object matches the expected version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        :param version: expected version of the object to write
        :type version: int
        """
        ioctx = self.rados.open_ioctx(pool_name)

        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)

        try:
            with rados.WriteOpCtx() as wop:
                if version is not None:
                    wop.assert_version(version)
                wop.write_full(data)
                ioctx.operate_write_op(wop, object_name)
        except rados.OSError as e:
            log.error(e)
            raise e
        finally:
            ioctx.close()

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        return self.get_object_and_version(pool_name, object_name)[0]

    def get_object_and_version(self, pool_name, object_name):
        """
        Synchronously read data from an object and get its version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: tuple of object data and version
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
            obj_version = ioctx.get_last_version()
        finally:
            ioctx.close()
        return (bytes_read, obj_version)

    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warn("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()