1 | """ |
2 | Copyright (C) 2015 Red Hat, Inc. | |
3 | ||
9f95a23c | 4 | LGPL-2.1 or LGPL-3.0. See file COPYING. |
7c673cae FG |
5 | """ |
6 | ||
7 | from contextlib import contextmanager | |
8 | import errno | |
9 | import fcntl | |
10 | import json | |
11 | import logging | |
12 | import os | |
13 | import re | |
14 | import struct | |
15 | import sys | |
16 | import threading | |
17 | import time | |
18 | import uuid | |
19 | ||
20 | from ceph_argparse import json_command | |
21 | ||
22 | import cephfs | |
23 | import rados | |
24 | ||
def to_bytes(param):
    '''
    Helper method that returns byte representation of the given parameter.
    '''
    if isinstance(param, str):
        return param.encode('utf-8')
    elif param is None:
        return param
    else:
        return str(param).encode('utf-8')

class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


RADOS_TIMEOUT = 10

log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"

class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)

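# A minimal usage sketch (identifiers illustrative, not taken from this file):
# VolumePath pairs a group ID with a volume ID, and stringifies as "group/volume".
#
#   vp = VolumePath("grp1", "vol1")   # str(vp) == "grp1/vol1"
#   vp = VolumePath(None, "vol1")     # a volume that belongs to no group
#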

class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)

class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict";
                            pass ["id=123"] to evict a single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client.get_mds_map()

    def _evict(self):
        """
        Run the eviction procedure. Return true on success, false on errors.
        """

        # Wait until the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client.get_mds_map()
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True


class EvictionError(Exception):
    pass


class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass


CEPHFSVOLUMECLIENT_VERSION_HISTORY = """

    CephFSVolumeClient Version History:

    * 1 - Initial version
    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
    * 3 - Allow volumes to be created without RADOS namespace isolation
    * 4 - Added get_object_and_version, put_object_versioned methods to CephFSVolumeClient
"""


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept: a cephfs directory and client capabilities
    which restrict mount access to this directory.

    Additionally, volumes may be in a 'Group'. Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings). The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Current version
    version = 4

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
                 volume_prefix=None, pool_ns_prefix=None, rados=None,
                 fs_name=None):
        """
        Either set all three of ``auth_id``, ``conf_path`` and
        ``cluster_name`` (rados constructed on connect), or
        set ``rados`` (existing rados instance).
        """
        self.fs = None
        self.fs_name = fs_name
        self.connected = False

        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id

        self.rados = rados
        if self.rados:
            # Using an externally owned rados, so we won't tear it down
            # on disconnect
            self.own_rados = False
        else:
            # self.rados will be constructed in connect
            self.own_rados = True

        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID
        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]

        # TODO: version the on-disk structures

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, I have the same ordering of
        # -> things which exist in the AMeta should also exist
        # in the VMeta, should also exist in Ceph, and the same
        # recovery procedure that gets me consistent after crashes
        # during authorization will also work during deauthorization

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = True if access_level == 'r' else False
                self._authorize_volume(volume_path, auth_id, readonly)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def get_mds_map(self):
        fs_map = self._rados_command("fs dump", {})
        return fs_map['filesystems'][0]['mdsmap']

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted. Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self.get_mds_map()
        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)

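    # Sketch of the intended calling order (auth ID illustrative): the caller
    # revokes the key first, then evicts any still-mounted clients:
    #
    #   vc.deauthorize(vp, "guest-client")
    #   vc.evict("guest-client", timeout=30, volume_path=vp)
    #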
    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )

    def _connect(self, premount_evict):
        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount(filesystem_name=to_bytes(self.fs_name))
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()

    def connect(self, premount_evict=None):
        """

        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        if self.own_rados:
            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
            self.rados = rados.Rados(
                name="client.{0}".format(self.auth_id),
                clustername=self.cluster_name,
                conffile=self.conf_path,
                conf={}
            )
        if self.rados.state != "connected":
            log.debug("Connecting to RADOS...")
            self.rados.connect()
            log.debug("Connection to RADOS complete")
        self._connect(premount_evict)

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados and self.own_rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

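    # Sketch: __enter__/__exit__ make the client usable as a context manager,
    # so connect/disconnect is handled automatically (names illustrative):
    #
    #   with CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph") as vc:
    #       used = vc.get_used_bytes(VolumePath(None, "vol1"))
    #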
    def __del__(self):
        self.disconnect()

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return: The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id

    def create_group(self, group_id, mode=0o755):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path, mode)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def _mkdir_p(self, path, mode=0o755):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, mode)

    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
                      mode=0o755):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent. It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use separate RADOS namespace for this volume
        :return:
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path, mode)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self.get_mds_map()
            if pool_id not in mds_map['data_pools']:
                self._rados_command("fs add_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            time.sleep(5)  # time for MDSMap to be distributed
            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)

        # enforce security isolation, use separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
                             to_bytes(namespace), 0)
        else:
            # If volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool',
                             to_bytes(pool_name), 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

        return {
            'mount_path': path
        }

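    # Sketch of typical calls (sizes and IDs illustrative): a plain volume, and
    # one that gets its own OSD pool instead of RADOS namespace isolation:
    #
    #   vc.create_volume(VolumePath("grp1", "vol1"), size=1024 * 1024 * 1024)
    #   vc.create_volume(VolumePath("grp1", "vol2"), data_isolated=True,
    #                    namespace_isolated=False)
    #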
    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests. This function is
        idempotent. This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :return:
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

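    # Sketch of the two-step teardown described above (identifiers illustrative):
    # delete_volume() quickly moves the data into the trash folder, and
    # purge_volume() later removes it for good:
    #
    #   vc.delete_volume(vp)
    #   vc.purge_volume(vp)
    #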
    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume. This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                d_name = d.d_name.decode(encoding='utf-8')
                if d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte arrays
                    d_full = u"{0}/{1}".format(root_path, d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self.get_mds_map()
            if pool_id in mds_map['data_pools']:
                self._rados_command("fs rm_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "yes_i_really_really_mean_it": True
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr).decode()
            if result == "":
                # Annoying! cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0}, "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            log.error(msg)
            raise CephFSVolumeClientError(msg)

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes.decode())
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, to_bytes(serialized), 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while(1):
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked, e.g. by another manila share instance, before
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background. It's key to how we avoid security holes
        resulting from races during that process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to the volume at `volume_path`.

        :param volume_path:
        :param auth_id:
        :param readonly:
        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :return:
        """

        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str : {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }
            if auth_meta is None:
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'volumes': volume
                }

                # Note: this is *not* guaranteeing that the key doesn't already
                # exist in Ceph: we are allowing VolumeClient tenants to
                # 'claim' existing Ceph keys. In order to prevent VolumeClient
                # tenants from reading e.g. client.admin keys, you need to
                # have configured your VolumeClient user (e.g. Manila) to
                # have mon auth caps that prevent it from accessing those keys
                # (e.g. limit it to only access keys with a manila.* prefix)
            else:
                # Disallow tenants to share auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key
                return {
                    'auth_key': None
                }

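    # Sketch (auth ID and tenant illustrative): the returned key is what a guest
    # would place in its keyring to mount the volume's path:
    #
    #   out = vc.authorize(vp, "guest-client", readonly=False, tenant_id="tenant1")
    #   key = out['auth_key']   # None when no tenant_id was supplied
    #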
    def _authorize_volume(self, volume_path, auth_id, readonly):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
            self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )
        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
                unwanted_mds_cap, unwanted_osd_cap)

            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', mds_cap_str,
                        'osd', osd_cap_str,
                        'mon', cap['caps'].get('mon', 'allow r')]
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         "caps": {
        #             "mds": "allow *",
        #             "mon": "allow *"
        #         }
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warn("deauthorize called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warn("deauthorize called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warn("deauthorize called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            if not mds_cap_str:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r' or 'rw'.
                None if no auth ID is given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []
            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf.decode())
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf

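    # Sketch: callers pass a mon command prefix plus an argument dict, e.g.
    #
    #   osd_map = self._rados_command('osd dump', {})
    #   mon_map = self._rados_command("mon dump")
    #
    # Both return the JSON-decoded command output (or raise rados.Error).
    #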
    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
                                    "rbytes").decode())

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         to_bytes(max_bytes if max_bytes else 0), 0)

    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )

    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
        # TODO: raise intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warn("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
                                     mode)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)

    def _cp_r(self, src, dst):
        # TODO
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        return self.put_object_versioned(pool_name, object_name, data)

    def put_object_versioned(self, pool_name, object_name, data, version=None):
        """
        Synchronously write data to an object only if the version of the
        object matches the expected version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        :param version: expected version of the object to write
        :type version: int
        """
        ioctx = self.rados.open_ioctx(pool_name)

        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)

        try:
            with rados.WriteOpCtx() as wop:
                if version is not None:
                    wop.assert_version(version)
                wop.write_full(data)
                ioctx.operate_write_op(wop, object_name)
        except rados.OSError as e:
            log.error(e)
            raise e
        finally:
            ioctx.close()

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        return self.get_object_and_version(pool_name, object_name)[0]

    def get_object_and_version(self, pool_name, object_name):
        """
        Synchronously read data from object and get its version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: tuple of object data and version
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
            obj_version = ioctx.get_last_version()
        finally:
            ioctx.close()
        return (bytes_read, obj_version)

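    # Sketch of the read-modify-write pattern these two calls enable (pool and
    # object names illustrative): the re-write succeeds only if nobody else has
    # updated the object since it was read:
    #
    #   data, version = vc.get_object_and_version("manila_data", "obj1")
    #   new_data = data + b" more"
    #   vc.put_object_versioned("manila_data", "obj1", new_data, version=version)
    #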
    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warn("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()