"""
Copyright (C) 2015 Red Hat, Inc.

LGPL-2.1 or LGPL-3.0. See file COPYING.
"""

from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

from ceph_argparse import json_command

import cephfs
import rados


def to_bytes(param):
    '''
    Helper method that returns byte representation of the given parameter.
    '''
    if isinstance(param, str):
        return param.encode('utf-8')
    elif param is None:
        return param
    else:
        return str(param).encode('utf-8')
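
# Illustrative note (not part of the original module): to_bytes() maps
#   to_bytes("abc")  -> b'abc'
#   to_bytes(None)   -> None
#   to_bytes(1024)   -> b'1024'
# which is why it is used below for xattr values and serialized metadata that
# libcephfs/librados expect as bytes.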


class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


RADOS_TIMEOUT = 10

log = logging.getLogger(__name__)


# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extension for meta files.
META_FILE_EXT = ".meta"


class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)


class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict";
                            pass ["id=123"] to evict a single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client.get_mds_map()

    def _evict(self):
        """
        Run the eviction procedure. Return true on success, false on errors.
        """

        # Wait until the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                }), "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client.get_mds_map()
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True
198 | ||
199 | class EvictionError(Exception): | |
200 | pass | |
201 | ||
202 | ||
203 | class CephFSVolumeClientError(Exception): | |
204 | """ | |
205 | Something went wrong talking to Ceph using CephFSVolumeClient. | |
206 | """ | |
207 | pass | |
208 | ||
209 | ||
210 | CEPHFSVOLUMECLIENT_VERSION_HISTORY = """ | |
211 | ||
212 | CephFSVolumeClient Version History: | |
213 | ||
214 | * 1 - Initial version | |
3efd9988 | 215 | * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient |
28e407b8 | 216 | * 3 - Allow volumes to be created without RADOS namespace isolation |
91327a77 | 217 | * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient |
f91f0fd5 | 218 | * 5 - Disallow authorize API for users not created by CephFSVolumeClient |
cd265ab1 | 219 | * 6 - The 'volumes' key in auth-metadata-file is changed to 'subvolumes'. |
7c673cae FG |
220 | """ |
221 | ||
222 | ||


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept implemented as a cephfs directory and
    client capabilities which restrict mount access to this
    directory.

    Additionally, volumes may be in a 'Group'. Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings). The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Current version
    version = 6

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
                 volume_prefix=None, pool_ns_prefix=None, rados=None,
                 fs_name=None):
        """
        Either set all three of ``auth_id``, ``conf_path`` and
        ``cluster_name`` (rados constructed on connect), or
        set ``rados`` (existing rados instance).
        """
        self.fs = None
        self.fs_name = fs_name
        self.connected = False

        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id

        self.rados = rados
        if self.rados:
            # Using an externally owned rados, so we won't tear it down
            # on disconnect
            self.own_rados = False
        else:
            # self.rados will be constructed in connect
            self.own_rados = True

        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID
        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]

        # TODO: version the on-disk structures

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as, "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, I have the same ordering of
        # -> things which exist in the AMeta should also exist
        #    in the VMeta, should also exist in Ceph, and the same
        #    recovery procedure that gets me consistent after crashes
        #    during authorization will also work during deauthorization

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
                if auth_meta and 'volumes' in auth_meta:
                    auth_meta['subvolumes'] = auth_meta.pop('volumes')
                if not auth_meta or not auth_meta['subvolumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['subvolumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean.
                if vol_meta['auths'][auth_id] == want_auth:
                    auth_meta['subvolumes'][volume]['dirty'] = False
                    self._auth_metadata_set(auth_id, auth_meta)
                    continue

                readonly = access_level == 'r'
                client_entity = "client.{0}".format(auth_id)
                try:
                    existing_caps = self._rados_command(
                        'auth get',
                        {
                            'entity': client_entity
                        }
                    )
                # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
                except rados.Error:
                    existing_caps = None
                self._authorize_volume(volume_path, auth_id, readonly, existing_caps)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['subvolumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['subvolumes'][volume]

        if not auth_meta['subvolumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def get_mds_map(self):
        fs_map = self._rados_command("fs dump", {})
        return fs_map['filesystems'][0]['mdsmap']

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted. Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self.get_mds_map()
        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)

    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )

    def _connect(self, premount_evict):
        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount(filesystem_name=to_bytes(self.fs_name))
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()

    def connect(self, premount_evict=None):
        """

        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        if self.own_rados:
            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
            self.rados = rados.Rados(
                name="client.{0}".format(self.auth_id),
                clustername=self.cluster_name,
                conffile=self.conf_path,
                conf={}
            )
            if self.rados.state != "connected":
                log.debug("Connecting to RADOS...")
                self.rados.connect()
                log.debug("Connection to RADOS complete")
        self._connect(premount_evict)

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados and self.own_rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def __del__(self):
        self.disconnect()

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id

    def create_group(self, group_id, mode=0o755):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self.fs.mkdirs(path, mode)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
                      mode=0o755):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent. It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use separate RADOS namespace for this volume
        :return:
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self.fs.mkdirs(path, mode)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self.get_mds_map()
            if pool_id not in mds_map['data_pools']:
                self._rados_command("fs add_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            time.sleep(5)  # time for MDSMap to be distributed
            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)

        # enforce security isolation, use separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
                             to_bytes(namespace), 0)
        else:
            # If volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool',
                             to_bytes(pool_name), 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

        return {
            'mount_path': path
        }

    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests. This function is
        idempotent. This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :return:
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self.fs.mkdirs(trash, 0o755)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume. This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                d_name = d.d_name.decode(encoding='utf-8')
                if d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte arrays
                    d_full = u"{0}/{1}".format(root_path, d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self.get_mds_map()
            if pool_id in mds_map['data_pools']:
                self._rados_command("fs rm_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "yes_i_really_really_mean_it": True
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr).decode()
            if result == "":
                # Annoying! cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0} "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            log.error(msg)
            raise CephFSVolumeClientError(msg)

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes.decode())
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, to_bytes(serialized), 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while True:
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked, e.g. by another manila share instance, before the
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 6
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background. It's key to how we avoid security holes
        resulting from races during that process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
        caps_list = []
        for k, v in existing_caps['caps'].items():
            if k == 'mds' or k == 'osd':
                continue
            elif k == 'mon':
                if not authorize and v == 'allow r':
                    continue
            caps_list.extend((k, v))

        if mds_cap_str:
            caps_list.extend(('mds', mds_cap_str))
        if osd_cap_str:
            caps_list.extend(('osd', osd_cap_str))

        if authorize and 'mon' not in caps_list:
            caps_list.extend(('mon', 'allow r'))

        return caps_list

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant it access
        to the volume at `volume_path`.

        :param volume_path:
        :param auth_id:
        :param readonly:
        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :param allow_existing_id: Optionally authorize existing auth-ids not
                                  created by ceph_volume_client
        :return:
        """

        with self._auth_lock(auth_id):
            client_entity = "client.{0}".format(auth_id)
            try:
                existing_caps = self._rados_command(
                    'auth get',
                    {
                        'entity': client_entity
                    }
                )
            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
            except rados.Error:
                existing_caps = None

            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # subvolume data to be inserted
            volume_path_str = str(volume_path)
            subvolume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }

            if auth_meta is None:
                if not allow_existing_id and existing_caps is not None:
                    msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                # non-existent auth IDs
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'subvolumes': subvolume
                }
            else:
                # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
                if 'volumes' in auth_meta:
                    auth_meta['subvolumes'] = auth_meta.pop('volumes')

                # Disallow tenants to share auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['subvolumes'].update(subvolume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)

            auth_meta['dirty'] = False
            auth_meta['subvolumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key
                return {
                    'auth_key': None
                }

    def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
        self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        if existing_caps is None:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing_caps[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
                unwanted_mds_cap, unwanted_osd_cap)

            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': caps_list
                })

        caps = self._rados_command(
            'auth get',
            {
                'entity': client_entity
            }
        )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         "caps": {
        #             "mds": "allow *",
        #             "mon": "allow *"
        #         }
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
            if auth_meta and 'volumes' in auth_meta:
                auth_meta['subvolumes'] = auth_meta.pop('volumes')

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['subvolumes']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['subvolumes']:
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['subvolumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['subvolumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['subvolumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
            if not caps_list:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': caps_list
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r' or 'rw'.
                None if no auth ID is given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []
            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf.decode())
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf

    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
                                    "rbytes").decode())

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         to_bytes(max_bytes if max_bytes else 0), 0)

    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )

    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
        # TODO: raise intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warning("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
                                     mode)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)

    def _cp_r(self, src, dst):
        # TODO
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        return self.put_object_versioned(pool_name, object_name, data)

    def put_object_versioned(self, pool_name, object_name, data, version=None):
        """
        Synchronously write data to an object only if the version of the
        object matches the expected version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        :param version: expected version of the object to write
        :type version: int
        """
        ioctx = self.rados.open_ioctx(pool_name)

        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)

        try:
            with rados.WriteOpCtx() as wop:
                if version is not None:
                    wop.assert_version(version)
                wop.write_full(data)
                ioctx.operate_write_op(wop, object_name)
        except rados.OSError as e:
            log.error(e)
            raise e
        finally:
            ioctx.close()

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        return self.get_object_and_version(pool_name, object_name)[0]

    def get_object_and_version(self, pool_name, object_name):
        """
        Synchronously read data from object and get its version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: tuple of object data and version
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
            obj_version = ioctx.get_last_version()
        finally:
            ioctx.close()
        return (bytes_read, obj_version)

    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warning("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()