1"""
2Copyright (C) 2015 Red Hat, Inc.
3
4LGPL-2.1 or LGPL-3.0. See file COPYING.
5"""
6
7from contextlib import contextmanager
8import errno
9import fcntl
10import json
11import logging
12import os
13import re
14import struct
15import sys
16import threading
17import time
18import uuid
19
20from ceph_argparse import json_command
21
22import cephfs
23import rados
24
25def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode('utf-8')
31 elif param is None:
32 return param
33 else:
34 return str(param).encode('utf-8')
35
36class RadosError(Exception):
37 """
38 Something went wrong talking to Ceph with librados
39 """
40 pass
41
42
43RADOS_TIMEOUT = 10
44
45log = logging.getLogger(__name__)
46
47# Reserved volume group name which we use in paths for volumes
48# that are not assigned to a group (i.e. created with group=None)
49NO_GROUP_NAME = "_nogroup"
50
51# Filename extensions for meta files.
52META_FILE_EXT = ".meta"
53
54class VolumePath(object):
55 """
56 Identify a volume's path as group->volume
57 The Volume ID is a unique identifier, but this is a much more
58 helpful thing to pass around.
59 """
60 def __init__(self, group_id, volume_id):
61 self.group_id = group_id
62 self.volume_id = volume_id
63 assert self.group_id != NO_GROUP_NAME
64 assert self.volume_id != "" and self.volume_id is not None
65
66 def __str__(self):
67 return "{0}/{1}".format(self.group_id, self.volume_id)
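# Illustrative sketch (annotation, not part of the module): how VolumePath
# values render as strings. A volume created with group=None is keyed under
# NO_GROUP_NAME only when filesystem paths are built; VolumePath itself just
# refuses the reserved group name.
#
#   vp = VolumePath("grp1", "vol1")
#   str(vp)                        # "grp1/vol1"
#   vp = VolumePath(None, "vol1")
#   str(vp)                        # "None/vol1" (group encoded as the string 'None')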
68
69
70class ClusterTimeout(Exception):
71 """
72 Exception indicating that we timed out trying to talk to the Ceph cluster,
73 either to the mons, or to any individual daemon that the mons indicate ought
74 to be up but isn't responding to us.
75 """
76 pass
77
78
79class ClusterError(Exception):
80 """
81 Exception indicating that the cluster returned an error to a command that
82 we thought should be successful based on our last knowledge of the cluster
83 state.
84 """
85 def __init__(self, action, result_code, result_str):
86 self._action = action
87 self._result_code = result_code
88 self._result_str = result_str
89
90 def __str__(self):
91 return "Error {0} (\"{1}\") while {2}".format(
92 self._result_code, self._result_str, self._action)
93
94
95class RankEvicter(threading.Thread):
96 """
97 Thread for evicting client(s) from a particular MDS daemon instance.
98
99 This is more complex than simply sending a command, because we have to
100 handle cases where MDS daemons might not be fully up yet, and/or might
101 be transiently unresponsive to commands.
102 """
103 class GidGone(Exception):
104 pass
105
106 POLL_PERIOD = 5
107
108 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
109 """
110 :param client_spec: list of strings, used as filter arguments to "session evict"
111 pass ["id=123"] to evict a single client with session id 123.
112 """
113 self.rank = rank
114 self.gid = gid
115 self._mds_map = mds_map
116 self._client_spec = client_spec
117 self._volume_client = volume_client
118 self._ready_timeout = ready_timeout
119 self._ready_waited = 0
120
121 self.success = False
122 self.exception = None
123
124 super(RankEvicter, self).__init__()
125
126 def _ready_to_evict(self):
127 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
128 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
129 self._client_spec, self.rank, self.gid
130 ))
131 raise RankEvicter.GidGone()
132
133 info = self._mds_map['info']["gid_{0}".format(self.gid)]
134 log.debug("_ready_to_evict: state={0}".format(info['state']))
135 return info['state'] in ["up:active", "up:clientreplay"]
136
137 def _wait_for_ready(self):
138 """
139 Wait for that MDS rank to reach an active or clientreplay state, and
140 not be laggy.
141 """
142 while not self._ready_to_evict():
143 if self._ready_waited > self._ready_timeout:
144 raise ClusterTimeout()
145
146 time.sleep(self.POLL_PERIOD)
147 self._ready_waited += self.POLL_PERIOD
148
149 self._mds_map = self._volume_client.get_mds_map()
150
151 def _evict(self):
152 """
153 Run the eviction procedure. Return true on success, false on errors.
154 """
155
156 # Wait until the MDS is believed by the mon to be available for commands
157 try:
158 self._wait_for_ready()
159 except self.GidGone:
160 return True
161
162 # Then send it an evict
163 ret = errno.ETIMEDOUT
164 while ret == errno.ETIMEDOUT:
165 log.debug("mds_command: {0}, {1}".format(
166 "%s" % self.gid, ["session", "evict"] + self._client_spec
167 ))
168 ret, outb, outs = self._volume_client.fs.mds_command(
169 "%s" % self.gid,
170 json.dumps({
171 "prefix": "session evict",
172 "filters": self._client_spec
173 }), "")
174 log.debug("mds_command: complete {0} {1}".format(ret, outs))
175
176 # If we get a clean response, great, it's gone from that rank.
177 if ret == 0:
178 return True
179 elif ret == errno.ETIMEDOUT:
180 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
181 self._mds_map = self._volume_client.get_mds_map()
182 try:
183 self._wait_for_ready()
184 except self.GidGone:
185 return True
186 else:
187 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
188
189 def run(self):
190 try:
191 self._evict()
192 except Exception as e:
193 self.success = False
194 self.exception = e
195 else:
196 self.success = True
197
198
199class EvictionError(Exception):
200 pass
201
202
203class CephFSVolumeClientError(Exception):
204 """
205 Something went wrong talking to Ceph using CephFSVolumeClient.
206 """
207 pass
208
209
210CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
211
212 CephFSVolumeClient Version History:
213
214 * 1 - Initial version
215 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
216 * 3 - Allow volumes to be created without RADOS namespace isolation
217 * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
218 * 5 - Disallow authorize API for users not created by CephFSVolumeClient
219 * 6 - The 'volumes' key in auth-metadata-file is changed to 'subvolumes'.
220"""
221
222
223class CephFSVolumeClient(object):
224 """
225 Combine libcephfs and librados interfaces to implement a
226 'Volume' concept implemented as a cephfs directory and
227 client capabilities which restrict mount access to this
228 directory.
229
230 Additionally, volumes may be in a 'Group'. Conveniently,
231 volumes are a lot like manila shares, and groups are a lot
232 like manila consistency groups.
233
234 Refer to volumes with VolumePath, which specifies the
235 volume and group IDs (both strings). The group ID may
236 be None.
237
238 In general, functions in this class are allowed to raise rados.Error
239 or cephfs.Error exceptions in unexpected situations.
240 """
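# Illustrative usage sketch (annotation, not part of the module; assumes a
# reachable cluster, a pre-created cephx ID "manila" and a path to its
# ceph.conf):
#
#   vc = CephFSVolumeClient(auth_id="manila",
#                           conf_path="/etc/ceph/ceph.conf",
#                           cluster_name="ceph")
#   vc.connect()
#   try:
#       vc.create_volume(VolumePath("grp1", "vol1"), size=10 * 2**30)
#   finally:
#       vc.disconnect()
#
# The class is also a context manager, so `with CephFSVolumeClient(...) as vc:`
# performs the same connect()/disconnect() pairing.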
241
242 # Current version
243 version = 6
244
245 # Where shall we create our volumes?
246 POOL_PREFIX = "fsvolume_"
247 DEFAULT_VOL_PREFIX = "/volumes"
248 DEFAULT_NS_PREFIX = "fsvolumens_"
249
250 def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
251 volume_prefix=None, pool_ns_prefix=None, rados=None,
252 fs_name=None):
253 """
254 Either set all three of ``auth_id``, ``conf_path`` and
255 ``cluster_name`` (rados constructed on connect), or
256 set ``rados`` (existing rados instance).
257 """
258 self.fs = None
259 self.fs_name = fs_name
260 self.connected = False
261
262 self.conf_path = conf_path
263 self.cluster_name = cluster_name
264 self.auth_id = auth_id
265
266 self.rados = rados
267 if self.rados:
268 # Using an externally owned rados, so we won't tear it down
269 # on disconnect
270 self.own_rados = False
271 else:
272 # self.rados will be constructed in connect
273 self.own_rados = True
274
275 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
276 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
277 # For flock'ing in cephfs, I want a unique ID to distinguish me
278 # from any other manila-share services that are loading this module.
279 # We could use pid, but that's unnecessarily weak: generate a
280 # UUID
281 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
282
283 # TODO: version the on-disk structures
284
285 def recover(self):
286 # Scan all auth keys to see if they're dirty: if they are, they have
287 # state that might not have propagated to Ceph or to the related
288 # volumes yet.
289
290 # Important: we *always* acquire locks in the order auth->volume
291 # That means a volume can never be dirty without the auth key
292 # we're updating it with being dirty at the same time.
293
294 # First list the auth IDs that have potentially dirty on-disk metadata
295 log.debug("Recovering from partial auth updates (if any)...")
296
297 try:
298 dir_handle = self.fs.opendir(self.volume_prefix)
299 except cephfs.ObjectNotFound:
300 log.debug("Nothing to recover. No auth meta files.")
301 return
302
303 d = self.fs.readdir(dir_handle)
304 auth_ids = []
305
306 if not d:
307 log.debug("Nothing to recover. No auth meta files.")
308
309 while d:
310 # Identify auth IDs from auth meta filenames. The auth meta files
311 # are named as, "$<auth_id><meta filename extension>"
312 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
313 match = re.search(regex, d.d_name.decode(encoding='utf-8'))
314 if match:
315 auth_ids.append(match.group(1))
316
317 d = self.fs.readdir(dir_handle)
318
319 self.fs.closedir(dir_handle)
320
321 # Key points based on ordering:
322 # * Anything added in VMeta is already added in AMeta
323 # * Anything added in Ceph is already added in VMeta
324 # * Anything removed in VMeta is already removed in Ceph
325 # * Anything removed in AMeta is already removed in VMeta
326
327 # Deauthorization: because I only update metadata AFTER the
328 # update of the next level down, I have the same ordering of
329 # -> things which exist in the AMeta should also exist
330 # in the VMeta, should also exist in Ceph, and the same
331 # recovery procedure that gets me consistent after crashes
332 # during authorization will also work during deauthorization
333
334 # Now for each auth ID, check for dirty flag and apply updates
335 # if dirty flag is found
336 for auth_id in auth_ids:
337 with self._auth_lock(auth_id):
338 auth_meta = self._auth_metadata_get(auth_id)
339 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
340 if auth_meta and 'volumes' in auth_meta:
341 auth_meta['subvolumes'] = auth_meta.pop('volumes')
342 if not auth_meta or not auth_meta['subvolumes']:
343 # Clean up auth meta file
344 self.fs.unlink(self._auth_metadata_path(auth_id))
345 continue
346 if not auth_meta['dirty']:
347 continue
348 self._recover_auth_meta(auth_id, auth_meta)
349
350 log.debug("Recovered from partial auth updates (if any).")
351
352 def _recover_auth_meta(self, auth_id, auth_meta):
353 """
354 Call me after locking the auth meta file.
355 """
356 remove_volumes = []
357
358 for volume, volume_data in auth_meta['subvolumes'].items():
359 if not volume_data['dirty']:
360 continue
361
362 (group_id, volume_id) = volume.split('/')
363 group_id = group_id if group_id != 'None' else None
364 volume_path = VolumePath(group_id, volume_id)
365 access_level = volume_data['access_level']
366
367 with self._volume_lock(volume_path):
368 vol_meta = self._volume_metadata_get(volume_path)
369
370 # No VMeta update indicates that there was no auth update
371 # in Ceph either. So it's safe to remove corresponding
372 # partial update in AMeta.
373 if not vol_meta or auth_id not in vol_meta['auths']:
374 remove_volumes.append(volume)
375 continue
376
377 want_auth = {
378 'access_level': access_level,
379 'dirty': False,
380 }
381 # VMeta update looks clean. Ceph auth update must have been
382 # clean.
383 if vol_meta['auths'][auth_id] == want_auth:
384 auth_meta['subvolumes'][volume]['dirty'] = False
385 self._auth_metadata_set(auth_id, auth_meta)
386 continue
387
388 readonly = access_level == 'r'
389 client_entity = "client.{0}".format(auth_id)
390 try:
391 existing_caps = self._rados_command(
392 'auth get',
393 {
394 'entity': client_entity
395 }
396 )
397 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
398 except rados.Error:
399 existing_caps = None
400 self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
401
402 # Recovered from partial auth updates for the auth ID's access
403 # to a volume.
404 auth_meta['subvolumes'][volume]['dirty'] = False
405 self._auth_metadata_set(auth_id, auth_meta)
406
407 for volume in remove_volumes:
408 del auth_meta['subvolumes'][volume]
409
410 if not auth_meta['subvolumes']:
411 # Clean up auth meta file
412 self.fs.unlink(self._auth_metadata_path(auth_id))
413 return
414
415 # Recovered from all partial auth updates for the auth ID.
416 auth_meta['dirty'] = False
417 self._auth_metadata_set(auth_id, auth_meta)
418
419 def get_mds_map(self):
420 fs_map = self._rados_command("fs dump", {})
421 return fs_map['filesystems'][0]['mdsmap']
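# Note (illustrative annotation, derived from how the map is used below): the
# returned mdsmap dict is expected to carry at least these keys; the values
# shown here are made-up examples.
#
#   {
#       "fs_name": "cephfs",
#       "data_pools": [3, 4],
#       "up": {"mds_0": 4107},                           # rank -> gid
#       "info": {"gid_4107": {"state": "up:active"}},    # per-gid daemon state
#   }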
422
423 def evict(self, auth_id, timeout=30, volume_path=None):
424 """
425 Evict all clients based on the authorization ID and optionally based on
426 the volume path mounted. Assumes that the authorization key has been
427 revoked prior to calling this function.
428
429 This operation can throw an exception if the mon cluster is unresponsive, or
430 any individual MDS daemon is unresponsive for longer than the timeout passed in.
431 """
432
433 client_spec = ["auth_name={0}".format(auth_id), ]
434 if volume_path:
435 client_spec.append("client_metadata.root={0}".
436 format(self._get_path(volume_path)))
437
438 log.info("evict clients with {0}".format(', '.join(client_spec)))
439
440 mds_map = self.get_mds_map()
441 up = {}
442 for name, gid in mds_map['up'].items():
443 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
444 assert name.startswith("mds_")
445 up[int(name[4:])] = gid
446
447 # For all MDS ranks held by a daemon
448 # Do the parallelism in python instead of using "tell mds.*", because
449 # the latter doesn't give us per-mds output
450 threads = []
451 for rank, gid in up.items():
452 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
453 timeout)
454 thread.start()
455 threads.append(thread)
456
457 for t in threads:
458 t.join()
459
460 log.info("evict: joined all")
461
462 for t in threads:
463 if not t.success:
464 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
465 format(', '.join(client_spec), t.rank, t.gid, t.exception)
466 )
467 log.error(msg)
468 raise EvictionError(msg)
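# Illustrative sketch (annotation, not part of the module): given a connected
# client `vc` and a VolumePath `vp`, revoke the guest's cephx key first, then
# drop any mounts it still holds. The volume_path filter is optional.
#
#   vc.deauthorize(vp, "guest1")
#   vc.evict("guest1", timeout=30, volume_path=vp)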
469
470 def _get_path(self, volume_path):
471 """
472 Determine the path within CephFS where this volume will live
473 :return: absolute path (string)
474 """
475 return os.path.join(
476 self.volume_prefix,
477 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
478 volume_path.volume_id)
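# Illustrative path mapping (assuming the default "/volumes" prefix):
#
#   _get_path(VolumePath("grp1", "vol1"))  -> "/volumes/grp1/vol1"
#   _get_path(VolumePath(None, "vol1"))    -> "/volumes/_nogroup/vol1"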
479
480 def _get_group_path(self, group_id):
481 if group_id is None:
482 raise ValueError("group_id may not be None")
483
484 return os.path.join(
485 self.volume_prefix,
486 group_id
487 )
488
489 def _connect(self, premount_evict):
490 log.debug("Connecting to cephfs...")
491 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
492 log.debug("CephFS initializing...")
493 self.fs.init()
494 if premount_evict is not None:
495 log.debug("Premount eviction of {0} starting".format(premount_evict))
496 self.evict(premount_evict)
497 log.debug("Premount eviction of {0} completes".format(premount_evict))
498 log.debug("CephFS mounting...")
499 self.fs.mount(filesystem_name=to_bytes(self.fs_name))
500 log.debug("Connection to cephfs complete")
501
502 # Recover from partial auth updates due to a previous
503 # crash.
504 self.recover()
505
506 def connect(self, premount_evict = None):
507 """
508
509 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
510 may want to use this to specify their own auth ID if they expect
511 to be a unique instance and don't want to wait for caps to time
512 out after failure of another instance of themselves.
513 """
514 if self.own_rados:
515 log.debug("Configuring RADOS with config {0}...".format(self.conf_path))
516 self.rados = rados.Rados(
517 name="client.{0}".format(self.auth_id),
518 clustername=self.cluster_name,
519 conffile=self.conf_path,
520 conf={}
521 )
522 if self.rados.state != "connected":
523 log.debug("Connecting to RADOS...")
524 self.rados.connect()
525 log.debug("Connection to RADOS complete")
526 self._connect(premount_evict)
527
528 def get_mon_addrs(self):
529 log.info("get_mon_addrs")
530 result = []
531 mon_map = self._rados_command("mon dump")
532 for mon in mon_map['mons']:
533 ip_port = mon['addr'].split("/")[0]
534 result.append(ip_port)
535
536 return result
537
538 def disconnect(self):
539 log.info("disconnect")
540 if self.fs:
541 log.debug("Disconnecting cephfs...")
542 self.fs.shutdown()
543 self.fs = None
544 log.debug("Disconnecting cephfs complete")
545
546 if self.rados and self.own_rados:
547 log.debug("Disconnecting rados...")
548 self.rados.shutdown()
549 self.rados = None
550 log.debug("Disconnecting rados complete")
551
552 def __enter__(self):
553 self.connect()
554 return self
555
556 def __exit__(self, exc_type, exc_val, exc_tb):
557 self.disconnect()
558
559 def __del__(self):
560 self.disconnect()
561
562 def _get_pool_id(self, osd_map, pool_name):
563 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
564 # like this are needed.
565 for pool in osd_map['pools']:
566 if pool['pool_name'] == pool_name:
567 return pool['pool']
568
569 return None
570
571 def _create_volume_pool(self, pool_name):
572 """
573 Idempotently create a pool for use as a CephFS data pool, with the given name
574
575 :return The ID of the created pool
576 """
577 osd_map = self._rados_command('osd dump', {})
578
579 existing_id = self._get_pool_id(osd_map, pool_name)
580 if existing_id is not None:
581 log.info("Pool {0} already exists".format(pool_name))
582 return existing_id
583
584 self._rados_command(
585 'osd pool create',
586 {
587 'pool': pool_name,
588 }
589 )
590
591 osd_map = self._rados_command('osd dump', {})
592 pool_id = self._get_pool_id(osd_map, pool_name)
593
594 if pool_id is None:
595 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
596 # removing it right after we created it.
597 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
598 pool_name, json.dumps(osd_map, indent=2)
599 ))
600 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
601 else:
602 return pool_id
603
604 def create_group(self, group_id, mode=0o755):
605 # Prevent craftily-named volume groups from colliding with the meta
606 # files.
607 if group_id.endswith(META_FILE_EXT):
608 raise ValueError("group ID cannot end with '{0}'.".format(
609 META_FILE_EXT))
610 path = self._get_group_path(group_id)
611 self.fs.mkdirs(path, mode)
612
613 def destroy_group(self, group_id):
614 path = self._get_group_path(group_id)
615 try:
616 self.fs.stat(self.volume_prefix)
617 except cephfs.ObjectNotFound:
618 pass
619 else:
620 self.fs.rmdir(path)
621
622 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
623 mode=0o755):
624 """
625 Set up metadata, pools and auth for a volume.
626
627 This function is idempotent. It is safe to call this again
628 for an already-created volume, even if it is in use.
629
630 :param volume_path: VolumePath instance
631 :param size: In bytes, or None for no size limit
632 :param data_isolated: If true, create a separate OSD pool for this volume
633 :param namespace_isolated: If true, use separate RADOS namespace for this volume
634 :return:
635 """
636 path = self._get_path(volume_path)
637 log.info("create_volume: {0}".format(path))
638
639 self.fs.mkdirs(path, mode)
640
641 if size is not None:
642 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
643
644 # data_isolated means create a separate pool for this volume
645 if data_isolated:
646 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
647 log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
648 pool_id = self._create_volume_pool(pool_name)
649 mds_map = self.get_mds_map()
650 if pool_id not in mds_map['data_pools']:
651 self._rados_command("fs add_data_pool", {
652 'fs_name': mds_map['fs_name'],
653 'pool': pool_name
654 })
655 time.sleep(5) # time for MDSMap to be distributed
656 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
657
658 # enforce security isolation, use separate namespace for this volume
659 if namespace_isolated:
660 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
661 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
662 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
663 to_bytes(namespace), 0)
664 else:
665 # If volume's namespace layout is not set, then the volume's pool
666 # layout remains unset and will undesirably change with ancestor's
667 # pool layout changes.
668 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
669 self.fs.setxattr(path, 'ceph.dir.layout.pool',
670 to_bytes(pool_name), 0)
671
672 # Create a volume meta file, if it does not already exist, to store
673 # data about auth ids having access to the volume
674 fd = self.fs.open(self._volume_metadata_path(volume_path),
675 os.O_CREAT, 0o755)
676 self.fs.close(fd)
677
678 return {
679 'mount_path': path
680 }
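# Illustrative sketch (annotation, not part of the module): create a 1 GiB
# volume in its own RADOS namespace (the default) and hand the returned
# mount_path to the guest-facing layer.
#
#   result = vc.create_volume(VolumePath("grp1", "vol1"), size=1 * 2**30)
#   result["mount_path"]        # e.g. "/volumes/grp1/vol1"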
681
682 def delete_volume(self, volume_path, data_isolated=False):
683 """
684 Make a volume inaccessible to guests. This function is
685 idempotent. This is the fast part of tearing down a volume: you must
686 also later call purge_volume, which is the slow part.
687
688 :param volume_path: Same identifier used in create_volume
689 :return:
690 """
691
692 path = self._get_path(volume_path)
693 log.info("delete_volume: {0}".format(path))
694
695 # Create the trash folder if it doesn't already exist
696 trash = os.path.join(self.volume_prefix, "_deleting")
697 self.fs.mkdirs(trash, 0o755)
698
699 # We'll move it to here
700 trashed_volume = os.path.join(trash, volume_path.volume_id)
701
702 # Move the volume's data to the trash folder
703 try:
704 self.fs.stat(path)
705 except cephfs.ObjectNotFound:
706 log.warning("Trying to delete volume '{0}' but it's already gone".format(
707 path))
708 else:
709 self.fs.rename(path, trashed_volume)
710
711 # Delete the volume meta file, if it's not already deleted
712 vol_meta_path = self._volume_metadata_path(volume_path)
713 try:
714 self.fs.unlink(vol_meta_path)
715 except cephfs.ObjectNotFound:
716 pass
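# Illustrative sketch (annotation, not part of the module): deletion is
# two-phase. The fast step renames the volume into "<volume_prefix>/_deleting";
# the slow step removes the trashed tree (and, for data_isolated volumes, the
# dedicated pool).
#
#   vc.delete_volume(vp)        # quick: rename into the trash directory
#   vc.purge_volume(vp)         # slow: recursive unlink of the trashed tree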
717
718 def purge_volume(self, volume_path, data_isolated=False):
719 """
720 Finish clearing up a volume that was previously passed to delete_volume. This
721 function is idempotent.
722 """
723
724 trash = os.path.join(self.volume_prefix, "_deleting")
725 trashed_volume = os.path.join(trash, volume_path.volume_id)
726
727 try:
728 self.fs.stat(trashed_volume)
729 except cephfs.ObjectNotFound:
730 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
731 trashed_volume))
732 return
733
734 def rmtree(root_path):
735 log.debug("rmtree {0}".format(root_path))
736 dir_handle = self.fs.opendir(root_path)
737 d = self.fs.readdir(dir_handle)
738 while d:
739 d_name = d.d_name.decode(encoding='utf-8')
740 if d_name not in [".", ".."]:
741 # Do not use os.path.join because it is sensitive
742 # to string encoding, we just pass through dnames
743 # as byte arrays
744 d_full = u"{0}/{1}".format(root_path, d_name)
745 if d.is_dir():
746 rmtree(d_full)
747 else:
748 self.fs.unlink(d_full)
749
750 d = self.fs.readdir(dir_handle)
751 self.fs.closedir(dir_handle)
752
753 self.fs.rmdir(root_path)
754
755 rmtree(trashed_volume)
756
757 if data_isolated:
758 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
759 osd_map = self._rados_command("osd dump", {})
760 pool_id = self._get_pool_id(osd_map, pool_name)
761 mds_map = self.get_mds_map()
762 if pool_id in mds_map['data_pools']:
763 self._rados_command("fs rm_data_pool", {
764 'fs_name': mds_map['fs_name'],
765 'pool': pool_name
766 })
767 self._rados_command("osd pool delete",
768 {
769 "pool": pool_name,
770 "pool2": pool_name,
771 "yes_i_really_really_mean_it": True
772 })
773
774 def _get_ancestor_xattr(self, path, attr):
775 """
776 Helper for reading layout information: if this xattr is missing
777 on the requested path, keep checking parents until we find it.
778 """
779 try:
780 result = self.fs.getxattr(path, attr).decode()
781 if result == "":
782 # Annoying! cephfs gives us empty instead of an error when attr not found
783 raise cephfs.NoData()
784 else:
785 return result
786 except cephfs.NoData:
787 if path == "/":
788 raise
789 else:
790 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
791
792 def _check_compat_version(self, compat_version):
793 if self.version < compat_version:
794 msg = ("The current version of CephFSVolumeClient, version {0} "
795 "does not support the required feature. Need version {1} "
796 "or greater".format(self.version, compat_version)
797 )
798 log.error(msg)
799 raise CephFSVolumeClientError(msg)
800
801 def _metadata_get(self, path):
802 """
803 Return a deserialized JSON object, or None
804 """
805 fd = self.fs.open(path, "r")
806 # TODO iterate instead of assuming file < 4MB
807 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
808 self.fs.close(fd)
809 if read_bytes:
810 return json.loads(read_bytes.decode())
811 else:
812 return None
813
814 def _metadata_set(self, path, data):
815 serialized = json.dumps(data)
816 fd = self.fs.open(path, "w")
817 try:
818 self.fs.write(fd, to_bytes(serialized), 0)
819 self.fs.fsync(fd, 0)
820 finally:
821 self.fs.close(fd)
822
823 def _lock(self, path):
824 @contextmanager
825 def fn():
826 while(1):
827 fd = self.fs.open(path, os.O_CREAT, 0o755)
828 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
829
830 # The locked file will be cleaned up sometime. It could be
831 # unlinked e.g., by another manila share instance, before the
832 # lock was applied on it. Perform checks to ensure that this
833 # does not happen.
834 try:
835 statbuf = self.fs.stat(path)
836 except cephfs.ObjectNotFound:
837 self.fs.close(fd)
838 continue
839
840 fstatbuf = self.fs.fstat(fd)
841 if statbuf.st_ino == fstatbuf.st_ino:
842 break
843
844 try:
845 yield
846 finally:
847 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
848 self.fs.close(fd)
849
850 return fn()
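# Illustrative sketch (annotation, not part of the module): _lock() is consumed
# through the _auth_lock()/_volume_lock() wrappers, always acquired in
# auth -> volume order, e.g.:
#
#   with self._auth_lock(auth_id):
#       with self._volume_lock(volume_path):
#           ...  # read-modify-write of both metadata files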
851
852 def _auth_metadata_path(self, auth_id):
853 return os.path.join(self.volume_prefix, "${0}{1}".format(
854 auth_id, META_FILE_EXT))
855
856 def _auth_lock(self, auth_id):
857 return self._lock(self._auth_metadata_path(auth_id))
858
859 def _auth_metadata_get(self, auth_id):
860 """
861 Call me with the metadata locked!
862
863 Check whether an auth metadata structure can be decoded by the current
864 version of CephFSVolumeClient.
865
866 Return auth metadata that the current version of CephFSVolumeClient
867 can decode.
868 """
869 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
870
871 if auth_metadata:
872 self._check_compat_version(auth_metadata['compat_version'])
873
874 return auth_metadata
875
876 def _auth_metadata_set(self, auth_id, data):
877 """
878 Call me with the metadata locked!
879
880 Fsync the auth metadata.
881
882 Add two version attributes to the auth metadata,
883 'compat_version', the minimum CephFSVolumeClient version that can
884 decode the metadata, and 'version', the CephFSVolumeClient version
885 that encoded the metadata.
886 """
887 data['compat_version'] = 6
888 data['version'] = self.version
889 return self._metadata_set(self._auth_metadata_path(auth_id), data)
890
891 def _volume_metadata_path(self, volume_path):
892 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
893 volume_path.group_id if volume_path.group_id else "",
894 volume_path.volume_id,
895 META_FILE_EXT
896 ))
897
898 def _volume_lock(self, volume_path):
899 """
900 Return a ContextManager which locks the authorization metadata for
901 a particular volume, and persists a flag to the metadata indicating
902 that it is currently locked, so that we can detect dirty situations
903 during recovery.
904
905 This lock isn't just to make access to the metadata safe: it's also
906 designed to be used over the two-step process of checking the
907 metadata and then responding to an authorization request, to
908 ensure that at the point we respond the metadata hasn't changed
909 in the background. It's key to how we avoid security holes
910 resulting from races during that two-step process.
911 """
912 return self._lock(self._volume_metadata_path(volume_path))
913
914 def _volume_metadata_get(self, volume_path):
915 """
916 Call me with the metadata locked!
917
918 Check whether a volume metadata structure can be decoded by the current
919 version of CephFSVolumeClient.
920
921 Return a volume_metadata structure that the current version of
922 CephFSVolumeClient can decode.
923 """
924 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
925
926 if volume_metadata:
927 self._check_compat_version(volume_metadata['compat_version'])
928
929 return volume_metadata
930
931 def _volume_metadata_set(self, volume_path, data):
932 """
933 Call me with the metadata locked!
934
935 Add two version attributes to the volume metadata,
936 'compat_version', the minimum CephFSVolumeClient version that can
937 decode the metadata and 'version', the CephFSVolumeClient version
938 that encoded the metadata.
939 """
940 data['compat_version'] = 1
941 data['version'] = self.version
942 return self._metadata_set(self._volume_metadata_path(volume_path), data)
943
944 def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
945 caps_list = []
946 for k, v in existing_caps['caps'].items():
947 if k == 'mds' or k == 'osd':
948 continue
949 elif k == 'mon':
950 if not authorize and v == 'allow r':
951 continue
952 caps_list.extend((k,v))
953
954 if mds_cap_str:
955 caps_list.extend(('mds', mds_cap_str))
956 if osd_cap_str:
957 caps_list.extend(('osd', osd_cap_str))
958
959 if authorize and 'mon' not in caps_list:
960 caps_list.extend(('mon', 'allow r'))
961
962 return caps_list
963
964 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
965 """
966 Get-or-create a Ceph auth identity for `auth_id` and grant them access
967 to the volume at the given volume_path.
968 :param volume_path:
969 :param auth_id:
970 :param readonly:
971 :param tenant_id: Optionally provide a stringizable object to
972 restrict any created cephx IDs to other callers
973 passing the same tenant ID.
974 :allow_existing_id: Optionally authorize existing auth-ids not
975 created by ceph_volume_client
976 :return:
977 """
978
979 with self._auth_lock(auth_id):
980 client_entity = "client.{0}".format(auth_id)
981 try:
982 existing_caps = self._rados_command(
983 'auth get',
984 {
985 'entity': client_entity
986 }
987 )
988 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
989 except rados.Error:
990 existing_caps = None
991
992 # Existing meta, or None, to be updated
993 auth_meta = self._auth_metadata_get(auth_id)
994
995 # subvolume data to be inserted
996 volume_path_str = str(volume_path)
997 subvolume = {
998 volume_path_str : {
999 # The access level at which the auth_id is authorized to
1000 # access the volume.
1001 'access_level': 'r' if readonly else 'rw',
1002 'dirty': True,
1003 }
1004 }
1005
1006 if auth_meta is None:
1007 if not allow_existing_id and existing_caps is not None:
1008 msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id)
1009 log.error(msg)
1010 raise CephFSVolumeClientError(msg)
1011
1012 # non-existent auth IDs
1013 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
1014 auth_id, tenant_id
1015 ))
1016 log.debug("Authorize: no existing meta")
1017 auth_meta = {
1018 'dirty': True,
1019 'tenant_id': tenant_id.__str__() if tenant_id else None,
1020 'subvolumes': subvolume
1021 }
1022 else:
1023 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
1024 if 'volumes' in auth_meta:
1025 auth_meta['subvolumes'] = auth_meta.pop('volumes')
1026
1027 # Disallow tenants to share auth IDs
1028 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
1029 msg = "auth ID: {0} is already in use".format(auth_id)
1030 log.error(msg)
1031 raise CephFSVolumeClientError(msg)
1032
1033 if auth_meta['dirty']:
1034 self._recover_auth_meta(auth_id, auth_meta)
1035
1036 log.debug("Authorize: existing tenant {tenant}".format(
1037 tenant=auth_meta['tenant_id']
1038 ))
1039 auth_meta['dirty'] = True
1040 auth_meta['subvolumes'].update(subvolume)
1041
1042 self._auth_metadata_set(auth_id, auth_meta)
1043
1044 with self._volume_lock(volume_path):
1045 key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
1046
1047 auth_meta['dirty'] = False
1048 auth_meta['subvolumes'][volume_path_str]['dirty'] = False
1049 self._auth_metadata_set(auth_id, auth_meta)
1050
1051 if tenant_id:
1052 return {
1053 'auth_key': key
1054 }
1055 else:
1056 # Caller wasn't multi-tenant aware: be safe and don't give
1057 # them a key
1058 return {
1059 'auth_key': None
1060 }
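# Illustrative sketch (annotation, not part of the module): grant a
# tenant-scoped guest read/write access to a volume and fetch the cephx key to
# hand back to it.
#
#   out = vc.authorize(vp, "guest1", readonly=False, tenant_id="tenant-a")
#   out["auth_key"]             # cephx secret, or None when no tenant_id is given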
1061
1062 def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
1063 vol_meta = self._volume_metadata_get(volume_path)
1064
1065 access_level = 'r' if readonly else 'rw'
1066 auth = {
1067 auth_id: {
1068 'access_level': access_level,
1069 'dirty': True,
1070 }
1071 }
1072
1073 if vol_meta is None:
1074 vol_meta = {
1075 'auths': auth
1076 }
1077 else:
1078 vol_meta['auths'].update(auth)
1079 self._volume_metadata_set(volume_path, vol_meta)
1080
1081 key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)
1082
1083 vol_meta['auths'][auth_id]['dirty'] = False
1084 self._volume_metadata_set(volume_path, vol_meta)
1085
1086 return key
1087
1088 def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
1089 path = self._get_path(volume_path)
1090 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1091 auth_id, path
1092 ))
1093
1094 # First I need to work out what the data pool is for this share:
1095 # read the layout
1096 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1097
1098 try:
1099 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1100 "namespace").decode()
1101 except cephfs.NoData:
1102 namespace = None
1103
1104 # Now construct auth capabilities that give the guest just enough
1105 # permissions to access the share
1106 client_entity = "client.{0}".format(auth_id)
1107 want_access_level = 'r' if readonly else 'rw'
1108 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1109 if namespace:
1110 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1111 want_access_level, pool_name, namespace)
1112 else:
1113 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1114 pool_name)
1115
1116 if existing_caps is None:
1117 caps = self._rados_command(
1118 'auth get-or-create',
1119 {
1120 'entity': client_entity,
1121 'caps': [
1122 'mds', want_mds_cap,
1123 'osd', want_osd_cap,
1124 'mon', 'allow r']
1125 })
1126 else:
1127 # entity exists, update it
1128 cap = existing_caps[0]
1129
1130 # Construct auth caps that if present might conflict with the desired
1131 # auth caps.
1132 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1133 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1134 if namespace:
1135 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1136 unwanted_access_level, pool_name, namespace)
1137 else:
1138 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1139 unwanted_access_level, pool_name)
1140
1141 def cap_update(
1142 orig_mds_caps, orig_osd_caps, want_mds_cap,
1143 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1144
1145 if not orig_mds_caps:
1146 return want_mds_cap, want_osd_cap
1147
1148 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1149 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1150
1151 if want_mds_cap in mds_cap_tokens:
1152 return orig_mds_caps, orig_osd_caps
1153
1154 if unwanted_mds_cap in mds_cap_tokens:
1155 mds_cap_tokens.remove(unwanted_mds_cap)
1156 osd_cap_tokens.remove(unwanted_osd_cap)
1157
1158 mds_cap_tokens.append(want_mds_cap)
1159 osd_cap_tokens.append(want_osd_cap)
1160
1161 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1162
1163 orig_mds_caps = cap['caps'].get('mds', "")
1164 orig_osd_caps = cap['caps'].get('osd', "")
1165
1166 mds_cap_str, osd_cap_str = cap_update(
1167 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1168 unwanted_mds_cap, unwanted_osd_cap)
1169
1170 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
1171 caps = self._rados_command(
1172 'auth caps',
1173 {
1174 'entity': client_entity,
1175 'caps': caps_list
1176 })
1177
1178 caps = self._rados_command(
1179 'auth get',
1180 {
1181 'entity': client_entity
1182 }
1183 )
1184
1185 # Result expected like this:
1186 # [
1187 # {
1188 # "entity": "client.foobar",
1189 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1190 # "caps": {
1191 # "mds": "allow *",
1192 # "mon": "allow *"
1193 # }
1194 # }
1195 # ]
1196 assert len(caps) == 1
1197 assert caps[0]['entity'] == client_entity
1198 return caps[0]['key']
1199
1200 def deauthorize(self, volume_path, auth_id):
1201 with self._auth_lock(auth_id):
1202 # Existing meta, or None, to be updated
1203 auth_meta = self._auth_metadata_get(auth_id)
1204
1205 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
1206 if auth_meta and 'volumes' in auth_meta:
1207 auth_meta['subvolumes'] = auth_meta.pop('volumes')
1208
1209 volume_path_str = str(volume_path)
1210 if (auth_meta is None) or (not auth_meta['subvolumes']):
1211 log.warning("deauthorized called for already-removed auth "
1212 "ID '{auth_id}' for volume ID '{volume}'".format(
1213 auth_id=auth_id, volume=volume_path.volume_id
1214 ))
1215 # Clean up the auth meta file of an auth ID
1216 self.fs.unlink(self._auth_metadata_path(auth_id))
1217 return
1218
1219 if volume_path_str not in auth_meta['subvolumes']:
1220 log.warning("deauthorized called for already-removed auth "
1221 "ID '{auth_id}' for volume ID '{volume}'".format(
1222 auth_id=auth_id, volume=volume_path.volume_id
1223 ))
1224 return
1225
1226 if auth_meta['dirty']:
1227 self._recover_auth_meta(auth_id, auth_meta)
1228
1229 auth_meta['dirty'] = True
1230 auth_meta['subvolumes'][volume_path_str]['dirty'] = True
1231 self._auth_metadata_set(auth_id, auth_meta)
1232
1233 self._deauthorize_volume(volume_path, auth_id)
1234
1235 # Filter out the volume we're deauthorizing
1236 del auth_meta['subvolumes'][volume_path_str]
1237
1238 # Clean up auth meta file
1239 if not auth_meta['subvolumes']:
1240 self.fs.unlink(self._auth_metadata_path(auth_id))
1241 return
1242
1243 auth_meta['dirty'] = False
1244 self._auth_metadata_set(auth_id, auth_meta)
1245
1246 def _deauthorize_volume(self, volume_path, auth_id):
1247 with self._volume_lock(volume_path):
1248 vol_meta = self._volume_metadata_get(volume_path)
1249
1250 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1251 log.warning("deauthorized called for already-removed auth "
1252 "ID '{auth_id}' for volume ID '{volume}'".format(
1253 auth_id=auth_id, volume=volume_path.volume_id
1254 ))
1255 return
1256
1257 vol_meta['auths'][auth_id]['dirty'] = True
1258 self._volume_metadata_set(volume_path, vol_meta)
1259
1260 self._deauthorize(volume_path, auth_id)
1261
1262 # Remove the auth_id from the metadata *after* removing it
1263 # from ceph, so that if we crashed here, we would actually
1264 # recreate the auth ID during recovery (i.e. end up with
1265 # a consistent state).
1266
1267 # Filter out the auth we're removing
1268 del vol_meta['auths'][auth_id]
1269 self._volume_metadata_set(volume_path, vol_meta)
1270
1271 def _deauthorize(self, volume_path, auth_id):
1272 """
1273 The volume must still exist.
1274 """
1275 client_entity = "client.{0}".format(auth_id)
1276 path = self._get_path(volume_path)
1277 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1278 try:
1279 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1280 "namespace").decode()
1281 except cephfs.NoData:
1282 namespace = None
1283
1284 # The auth_id might have read-only or read-write mount access for the
1285 # volume path.
1286 access_levels = ('r', 'rw')
1287 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1288 for access_level in access_levels]
1289 if namespace:
1290 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1291 for access_level in access_levels]
1292 else:
1293 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1294 for access_level in access_levels]
1295
1296
1297 try:
1298 existing = self._rados_command(
1299 'auth get',
1300 {
1301 'entity': client_entity
1302 }
1303 )
1304
1305 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1306 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1307 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1308
1309 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1310 if want_mds_cap in mds_cap_tokens:
1311 mds_cap_tokens.remove(want_mds_cap)
1312 osd_cap_tokens.remove(want_osd_cap)
1313 break
1314
1315 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1316
1317 cap = existing[0]
1318 orig_mds_caps = cap['caps'].get('mds', "")
1319 orig_osd_caps = cap['caps'].get('osd', "")
1320 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1321 want_mds_caps, want_osd_caps)
1322
1323 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
1324 if not caps_list:
1325 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1326 else:
1327 self._rados_command(
1328 'auth caps',
1329 {
1330 'entity': client_entity,
1331 'caps': caps_list
1332 })
1333
1334 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1335 except rados.Error:
1336 # Already gone, great.
1337 return
1338
1339 def get_authorized_ids(self, volume_path):
1340 """
1341 Expose a list of auth IDs that have access to a volume.
1342
1343 return: a list of (auth_id, access_level) tuples, where
1344 the access_level can be 'r', or 'rw'.
1345 None if no auth ID is given access to the volume.
1346 """
1347 with self._volume_lock(volume_path):
1348 meta = self._volume_metadata_get(volume_path)
1349 auths = []
1350 if not meta or not meta['auths']:
1351 return None
1352
1353 for auth, auth_data in meta['auths'].items():
1354 # Skip partial auth updates.
1355 if not auth_data['dirty']:
1356 auths.append((auth, auth_data['access_level']))
1357
1358 return auths
1359
1360 def _rados_command(self, prefix, args=None, decode=True):
1361 """
1362 Safer wrapper for ceph_argparse.json_command, which raises
1363 Error exception instead of relying on caller to check return
1364 codes.
1365
1366 Error exception can result from:
1367 * Timeout
1368 * Actual legitimate errors
1369 * Malformed JSON output
1370
1371 return: Decoded object from ceph, or None if empty string returned.
1372 If decode is False, return a string (the data returned by
1373 ceph command)
1374 """
1375 if args is None:
1376 args = {}
1377
1378 argdict = args.copy()
1379 argdict['format'] = 'json'
1380
1381 ret, outbuf, outs = json_command(self.rados,
1382 prefix=prefix,
1383 argdict=argdict,
1384 timeout=RADOS_TIMEOUT)
1385 if ret != 0:
1386 raise rados.Error(outs)
1387 else:
1388 if decode:
1389 if outbuf:
1390 try:
1391 return json.loads(outbuf.decode())
1392 except (ValueError, TypeError):
1393 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1394 else:
1395 return None
1396 else:
1397 return outbuf
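# Illustrative sketch (annotation, not part of the module): the same JSON-mode
# plumbing is used for every mon command in this class, e.g.:
#
#   osd_map = self._rados_command("osd dump", {})
#   self._rados_command("auth del", {"entity": "client.guest1"}, decode=False)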
1398
1399 def get_used_bytes(self, volume_path):
1400 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1401 "rbytes").decode())
1402
1403 def set_max_bytes(self, volume_path, max_bytes):
1404 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1405 to_bytes(max_bytes if max_bytes else 0), 0)
1406
1407 def _snapshot_path(self, dir_path, snapshot_name):
1408 return os.path.join(
1409 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1410 )
1411
1412 def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
1413 # TODO: raise intelligible exception for clusters where snaps are disabled
1414 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
1415
1416 def _snapshot_destroy(self, dir_path, snapshot_name):
1417 """
1418 Remove a snapshot, or do nothing if it already doesn't exist.
1419 """
1420 try:
1421 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1422 except cephfs.ObjectNotFound:
1423 log.warning("Snapshot was already gone: {0}".format(snapshot_name))
1424
1425 def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
1426 self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
1427
1428 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1429 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1430
1431 def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
1432 if group_id is None:
1433 raise RuntimeError("Group ID may not be None")
1434
1435 return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
1436 mode)
1437
1438 def destroy_snapshot_group(self, group_id, snapshot_name):
1439 if group_id is None:
1440 raise RuntimeError("Group ID may not be None")
1441 if snapshot_name is None:
1442 raise RuntimeError("Snapshot name may not be None")
1443
1444 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1445
1446 def _cp_r(self, src, dst):
1447 # TODO
1448 raise NotImplementedError()
1449
1450 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1451 dest_fs_path = self._get_path(dest_volume_path)
1452 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1453
1454 self._cp_r(src_snapshot_path, dest_fs_path)
1455
1456 def put_object(self, pool_name, object_name, data):
1457 """
1458 Synchronously write data to an object.
1459
1460 :param pool_name: name of the pool
1461 :type pool_name: str
1462 :param object_name: name of the object
1463 :type object_name: str
1464 :param data: data to write
1465 :type data: bytes
1466 """
1467 return self.put_object_versioned(pool_name, object_name, data)
1468
1469 def put_object_versioned(self, pool_name, object_name, data, version=None):
1470 """
1471 Synchronously write data to an object, but only if the version of the
1472 object matches the expected version.
1473
1474 :param pool_name: name of the pool
1475 :type pool_name: str
1476 :param object_name: name of the object
1477 :type object_name: str
1478 :param data: data to write
1479 :type data: bytes
1480 :param version: expected version of the object to write
1481 :type version: int
1482 """
1483 ioctx = self.rados.open_ioctx(pool_name)
1484
1485 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1486 if len(data) > max_size:
1487 msg = ("Data to be written to object '{0}' exceeds "
1488 "{1} bytes".format(object_name, max_size))
1489 log.error(msg)
1490 raise CephFSVolumeClientError(msg)
1491
1492 try:
1493 with rados.WriteOpCtx() as wop:
1494 if version is not None:
1495 wop.assert_version(version)
1496 wop.write_full(data)
1497 ioctx.operate_write_op(wop, object_name)
1498 except rados.OSError as e:
1499 log.error(e)
1500 raise e
1501 finally:
1502 ioctx.close()
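# Illustrative sketch (annotation, not part of the module; pool and object
# names are hypothetical): optimistic read-modify-write of a small JSON object
# using the versioned helpers defined in this class.
#
#   data, version = vc.get_object_and_version("manila_data", "meta")
#   obj = json.loads(data.decode()) if data else {}
#   obj["updated"] = True
#   vc.put_object_versioned("manila_data", "meta",
#                           json.dumps(obj).encode(), version=version)
#
# A concurrent writer bumps the object version, so the assert_version() check
# fails with rados.OSError and the caller can re-read and retry.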
1503
1504 def get_object(self, pool_name, object_name):
1505 """
1506 Synchronously read data from object.
1507
1508 :param pool_name: name of the pool
1509 :type pool_name: str
1510 :param object_name: name of the object
1511 :type object_name: str
1512
1513 :returns: bytes - data read from object
1514 """
1515 return self.get_object_and_version(pool_name, object_name)[0]
1516
1517 def get_object_and_version(self, pool_name, object_name):
1518 """
1519 Synchronously read data from object and get its version.
1520
1521 :param pool_name: name of the pool
1522 :type pool_name: str
1523 :param object_name: name of the object
1524 :type object_name: str
1525
1526 :returns: tuple of object data and version
1527 """
1528 ioctx = self.rados.open_ioctx(pool_name)
1529 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1530 try:
1531 bytes_read = ioctx.read(object_name, max_size)
1532 if ((len(bytes_read) == max_size) and
1533 (ioctx.read(object_name, 1, offset=max_size))):
1534 log.warning("Size of object {0} exceeds '{1}' bytes "
1535 "read".format(object_name, max_size))
1536 obj_version = ioctx.get_last_version()
1537 finally:
1538 ioctx.close()
1539 return (bytes_read, obj_version)
1540
1541 def delete_object(self, pool_name, object_name):
1542 ioctx = self.rados.open_ioctx(pool_name)
1543 try:
1544 ioctx.remove_object(object_name)
1545 except rados.ObjectNotFound:
1546 log.warning("Object '{0}' was already removed".format(object_name))
1547 finally:
1548 ioctx.close()