2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc.stdint cimport *
7 from libc.stdlib cimport malloc, realloc, free
11 include "mock_cephfs.pxi"
16 from c_cephfs cimport *
17 from rados cimport Rados
19 from collections import namedtuple
20 from datetime import datetime
23 from typing import Any, Dict, Optional
25 AT_SYMLINK_NOFOLLOW = 0x0100
26 AT_STATX_SYNC_TYPE = 0x6000
27 AT_STATX_SYNC_AS_STAT = 0x0000
28 AT_STATX_FORCE_SYNC = 0x2000
29 AT_STATX_DONT_SYNC = 0x4000
30 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
31 CEPH_STATX_BASIC_STATS = 0x7ff
32 cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
34 CEPH_STATX_NLINK = 0x2
37 CEPH_STATX_RDEV = 0x10
38 CEPH_STATX_ATIME = 0x20
39 CEPH_STATX_MTIME = 0x40
40 CEPH_STATX_CTIME = 0x80
41 CEPH_STATX_INO = 0x100
42 CEPH_STATX_SIZE = 0x200
43 CEPH_STATX_BLOCKS = 0x400
44 CEPH_STATX_BTIME = 0x800
45 CEPH_STATX_VERSION = 0x1000
47 FALLOC_FL_KEEP_SIZE = 0x01
48 FALLOC_FL_PUNCH_HOLE = 0x02
49 FALLOC_FL_NO_HIDE_STALE = 0x04
51 CEPH_SETATTR_MODE = 0x1
52 CEPH_SETATTR_UID = 0x2
53 CEPH_SETATTR_GID = 0x4
54 CEPH_SETATTR_MTIME = 0x8
55 CEPH_SETATTR_ATIME = 0x10
56 CEPH_SETATTR_SIZE = 0x20
57 CEPH_SETATTR_CTIME = 0x40
58 CEPH_SETATTR_BTIME = 0x200
62 CEPHFS_EBLOCKLISTED = 108
66 CEPHFS_ETIMEDOUT = 110
80 CEPHFS_ENAMETOOLONG = 36
86 CEPHFS_ECANCELED = 125
88 CEPHFS_EOPNOTSUPP = 95
91 CEPHFS_ENOTRECOVERABLE = 131
93 CEPHFS_EWOULDBLOCK = CEPHFS_EAGAIN
96 CEPHFS_EDEADLOCK = CEPHFS_EDEADLK
100 CEPHFS_EOLDSNAPC = 85
102 cdef extern from "Python.h":
103 # These are in cpython/string.pxd, but use "object" types instead of
104 # PyObject*, which invokes assumptions in cpython that we need to
105 # legitimately break to implement zero-copy string buffers in LibCephFS.read().
106 # This is valid use of the Python API and documented as a special case.
107 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
108 char* PyBytes_AsString(PyObject *string) except NULL
109 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
110 void PyEval_InitThreads()
113 class Error(Exception):
114 def get_error_code(self):
118 class LibCephFSStateError(Error):
122 class OSError(Error):
123 def __init__(self, errno, strerror):
124 super(OSError, self).__init__(errno, strerror)
126 self.strerror = "%s: %s" % (strerror, os.strerror(errno))
129 return '{} [Errno {}]'.format(self.strerror, self.errno)
131 def get_error_code(self):
135 class PermissionError(OSError):
139 class ObjectNotFound(OSError):
143 class NoData(OSError):
147 class ObjectExists(OSError):
151 class IOError(OSError):
155 class NoSpace(OSError):
159 class InvalidValue(OSError):
163 class OperationNotSupported(OSError):
167 class WouldBlock(OSError):
171 class OutOfRange(OSError):
175 class ObjectNotEmpty(OSError):
178 class NotDirectory(OSError):
181 class DiskQuotaExceeded(OSError):
183 class PermissionDenied(OSError):
186 cdef errno_to_exception = {
187 CEPHFS_EPERM : PermissionError,
188 CEPHFS_ENOENT : ObjectNotFound,
189 CEPHFS_EIO : IOError,
190 CEPHFS_ENOSPC : NoSpace,
191 CEPHFS_EEXIST : ObjectExists,
192 CEPHFS_ENODATA : NoData,
193 CEPHFS_EINVAL : InvalidValue,
194 CEPHFS_EOPNOTSUPP : OperationNotSupported,
195 CEPHFS_ERANGE : OutOfRange,
196 CEPHFS_EWOULDBLOCK: WouldBlock,
197 CEPHFS_ENOTEMPTY : ObjectNotEmpty,
198 CEPHFS_ENOTDIR : NotDirectory,
199 CEPHFS_EDQUOT : DiskQuotaExceeded,
200 CEPHFS_EACCES : PermissionDenied,
204 cdef make_ex(ret, msg):
206 Translate a libcephfs return code into an exception.
208 :param ret: the return code
210 :param msg: the error message to use
212 :returns: a subclass of :class:`Error`
215 if ret in errno_to_exception:
216 return errno_to_exception[ret](ret, msg)
218 return OSError(ret, msg)
221 class DirEntry(namedtuple('DirEntry',
222 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
227 return self.d_type == self.DT_DIR
229 def is_symbol_file(self):
230 return self.d_type == self.DT_LNK
233 return self.d_type == self.DT_REG
235 StatResult = namedtuple('StatResult',
236 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
237 "st_gid", "st_rdev", "st_size", "st_blksize",
238 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
240 cdef class DirResult(object):
242 cdef ceph_dir_result* handle
244 # Bug in older Cython instances prevents this from being a static method.
246 # cdef create(LibCephFS lib, ceph_dir_result* handle):
252 def __dealloc__(self):
257 raise make_ex(CEPHFS_EBADF, "dir is not open")
258 self.lib.require_state("mounted")
260 ceph_rewinddir(self.lib.cluster, self.handle)
263 def __exit__(self, type_, value, traceback):
268 self.lib.require_state("mounted")
271 dirent = ceph_readdir(self.lib.cluster, self.handle)
275 IF UNAME_SYSNAME == "FreeBSD" or UNAME_SYSNAME == "Darwin":
276 return DirEntry(d_ino=dirent.d_ino,
278 d_reclen=dirent.d_reclen,
279 d_type=dirent.d_type,
280 d_name=dirent.d_name)
282 return DirEntry(d_ino=dirent.d_ino,
284 d_reclen=dirent.d_reclen,
285 d_type=dirent.d_type,
286 d_name=dirent.d_name)
290 self.lib.require_state("mounted")
292 ret = ceph_closedir(self.lib.cluster, self.handle)
294 raise make_ex(ret, "closedir failed")
299 raise make_ex(CEPHFS_EBADF, "dir is not open")
300 self.lib.require_state("mounted")
302 ceph_rewinddir(self.lib.cluster, self.handle)
306 raise make_ex(CEPHFS_EBADF, "dir is not open")
307 self.lib.require_state("mounted")
309 ret = ceph_telldir(self.lib.cluster, self.handle)
311 raise make_ex(ret, "telldir failed")
314 def seekdir(self, offset):
316 raise make_ex(CEPHFS_EBADF, "dir is not open")
317 if not isinstance(offset, int):
318 raise TypeError('offset must be an int')
319 self.lib.require_state("mounted")
320 cdef int64_t _offset = offset
322 ceph_seekdir(self.lib.cluster, self.handle, _offset)
325 def cstr(val, name, encoding="utf-8", opt=False) -> bytes:
327 Create a byte string from a Python string
329 :param basestring val: Python string
330 :param str name: Name of the string parameter, for exceptions
331 :param str encoding: Encoding to use
332 :param bool opt: If True, None is allowed
333 :raises: :class:`TypeError`
335 if opt and val is None:
337 if isinstance(val, bytes):
341 v = val.encode(encoding)
343 raise TypeError('%s must be encodeable as a bytearray' % name)
344 assert isinstance(v, bytes)
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Create a list of byte strings from an iterable of Python strings.

    Each element is converted with :func:`cstr`, so elements that are
    already ``bytes`` are passed through and ``str`` elements are encoded.

    :param list_str: iterable of Python strings (or bytes) to convert
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for every element
    :raises: :class:`TypeError` if an element cannot be converted
    :returns: list of byte strings, in the same order as ``list_str``
    """
    # Forward ``encoding`` to cstr(); it was previously accepted but ignored,
    # so every element was encoded with cstr()'s default regardless of the
    # value the caller passed.
    return [cstr(s, name, encoding) for s in list_str]
351 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
353 Decode a byte string into a Python string.
355 :param bytes val: byte string
360 return val.decode(encoding)
362 cdef timeval to_timeval(t):
364 return timeval equivalent from time
367 cdef timeval buf = timeval(tt, (t - tt) * 1000000)
370 cdef timespec to_timespec(t):
372 return timespec equivalent from time
375 cdef timespec buf = timespec(tt, (t - tt) * 1000000000)
378 cdef char* opt_str(s) except? NULL:
384 cdef char ** to_bytes_array(list_bytes):
385 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
387 raise MemoryError("malloc failed")
388 for i in range(len(list_bytes)):
389 ret[i] = <char *>list_bytes[i]
393 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
394 cdef void *ret = realloc(ptr, size)
396 raise MemoryError("realloc failed")
400 cdef iovec * to_iovec(buffers) except NULL:
401 cdef iovec *iov = <iovec *>malloc(len(buffers) * sizeof(iovec))
404 raise MemoryError("malloc failed")
405 for i in xrange(len(buffers)):
406 s = <char*>buffers[i]
407 iov[i] = [<void*>s, len(buffers[i])]
411 cdef class LibCephFS(object):
412 """libcephfs python wrapper"""
414 cdef public object state
415 cdef ceph_mount_info *cluster
417 def require_state(self, *args):
418 if self.state in args:
420 raise LibCephFSStateError("You cannot perform that operation on a "
421 "CephFS object in state %s." % (self.state))
423 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
424 """Create a libcephfs wrapper
426 :param conf dict opt: settings overriding the default ones and conffile
427 :param conffile str opt: the path to ceph.conf to override the default settings
428 :param auth_id str opt: the id used to authenticate the client entity
429 :param rados_inst Rados opt: a rados.Rados instance
432 self.state = "uninitialized"
433 if rados_inst is not None:
434 if auth_id is not None or conffile is not None or conf is not None:
435 raise make_ex(CEPHFS_EINVAL,
436 "May not pass RADOS instance as well as other configuration")
438 self.create_with_rados(rados_inst)
440 self.create(conf, conffile, auth_id)
442 def create_with_rados(self, Rados rados_inst):
445 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
447 raise Error("libcephfs_initialize failed with error code: %d" % ret)
448 self.state = "configuring"
451 "special value that indicates no conffile should be read when creating a mount handle"
452 DEFAULT_CONF_FILES = -2
453 "special value that indicates the default conffiles should be read when creating a mount handle"
455 def create(self, conf=None, conffile=NO_CONF_FILE, auth_id=None):
457 Create a mount handle for interacting with Ceph. All libcephfs
458 functions operate on a mount info handle.
460 :param conf dict opt: settings overriding the default ones and conffile
461 :param conffile Union[int,str], optional: the path to ceph.conf to override the default settings
462 :param auth_id str opt: the id used to authenticate the client entity
464 if conf is not None and not isinstance(conf, dict):
465 raise TypeError("conf must be dict or None")
466 cstr(conffile, 'configfile', opt=True)
467 auth_id = cstr(auth_id, 'auth_id', opt=True)
470 char* _auth_id = opt_str(auth_id)
474 ret = ceph_create(&self.cluster, <const char*>_auth_id)
476 raise Error("libcephfs_initialize failed with error code: %d" % ret)
478 self.state = "configuring"
479 if conffile in (self.NO_CONF_FILE, None):
481 elif conffile in (self.DEFAULT_CONF_FILES, ''):
482 self.conf_read_file(None)
484 self.conf_read_file(conffile)
486 for key, value in conf.items():
487 self.conf_set(key, value)
491 Return the file system id for this fs client.
493 self.require_state("mounted")
495 ret = ceph_get_fs_cid(self.cluster)
497 raise make_ex(ret, "error fetching fscid")
502 Get associated client addresses with this RADOS session.
504 self.require_state("mounted")
512 ret = ceph_getaddrs(self.cluster, &addrs)
514 raise make_ex(ret, "error calling getaddrs")
516 return decode_cstr(addrs)
518 ceph_buffer_free(addrs)
521 def conf_read_file(self, conffile=None):
523 Load the ceph configuration from the specified config file.
525 :param conffile str opt: the path to ceph.conf to override the default settings
527 conffile = cstr(conffile, 'conffile', opt=True)
529 char *_conffile = opt_str(conffile)
531 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
533 raise make_ex(ret, "error calling conf_read_file")
535 def conf_parse_argv(self, argv):
537 Parse the command line arguments and load the configuration parameters.
539 :param argv: the argument list
541 self.require_state("configuring")
542 cargv = cstr_list(argv, 'argv')
544 int _argc = len(argv)
545 char **_argv = to_bytes_array(cargv)
549 ret = ceph_conf_parse_argv(self.cluster, _argc,
550 <const char **>_argv)
552 raise make_ex(ret, "error calling conf_parse_argv")
558 Unmount and destroy the ceph mount handle.
560 if self.state in ["initialized", "mounted"]:
562 ceph_shutdown(self.cluster)
563 self.state = "shutdown"
569 def __exit__(self, type_, value, traceback):
573 def __dealloc__(self):
578 Get the version number of the ``libcephfs`` C library.
580 :returns: a tuple of ``(major, minor, extra)`` components of the
588 ceph_version(&major, &minor, &extra)
589 return (major, minor, extra)
591 def conf_get(self, option):
593 Gets the configuration value as a string.
595 :param option: the config option to get
597 self.require_state("configuring", "initialized", "mounted")
599 option = cstr(option, 'option')
601 char *_option = option
607 ret_buf = <char *>realloc_chk(ret_buf, length)
609 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
611 return decode_cstr(ret_buf)
612 elif ret == -CEPHFS_ENAMETOOLONG:
614 elif ret == -CEPHFS_ENOENT:
617 raise make_ex(ret, "error calling conf_get")
621 def conf_set(self, option, val):
623 Sets a configuration value from a string.
625 :param option: the configuration option to set
626 :param val: the value of the configuration option to set
628 self.require_state("configuring", "initialized", "mounted")
630 option = cstr(option, 'option')
631 val = cstr(val, 'val')
633 char *_option = option
637 ret = ceph_conf_set(self.cluster, _option, _val)
639 raise make_ex(ret, "error calling conf_set")
641 def set_mount_timeout(self, timeout):
645 :param timeout: mount timeout
647 self.require_state("configuring", "initialized")
648 if not isinstance(timeout, int):
649 raise TypeError('timeout must be an integer')
651 raise make_ex(CEPHFS_EINVAL, 'timeout must be greater than or equal to 0')
653 uint32_t _timeout = timeout
655 ret = ceph_set_mount_timeout(self.cluster, _timeout)
657 raise make_ex(ret, "error setting mount timeout")
661 Initialize the filesystem client (but do not mount the filesystem yet)
663 self.require_state("configuring")
665 ret = ceph_init(self.cluster)
667 raise make_ex(ret, "error calling ceph_init")
668 self.state = "initialized"
670 def mount(self, mount_root=None, filesystem_name=None):
672 Perform a mount using the path for the root of the mount.
674 if self.state == "configuring":
676 self.require_state("initialized")
678 # Configure which filesystem to mount if one was specified
679 if filesystem_name is None:
680 filesystem_name = b""
682 filesystem_name = cstr(filesystem_name, 'filesystem_name')
684 char *_filesystem_name = filesystem_name
687 ret = ceph_select_filesystem(self.cluster,
690 raise make_ex(ret, "error calling ceph_select_filesystem")
692 # Prepare mount_root argument, default to "/"
693 root = b"/" if mount_root is None else mount_root
695 char *_mount_root = root
698 ret = ceph_mount(self.cluster, _mount_root)
700 raise make_ex(ret, "error calling ceph_mount")
701 self.state = "mounted"
705 Unmount a mount handle.
707 self.require_state("mounted")
709 ret = ceph_unmount(self.cluster)
711 raise make_ex(ret, "error calling ceph_unmount")
712 self.state = "initialized"
714 def abort_conn(self):
716 Abort mds connections.
718 self.require_state("mounted")
720 ret = ceph_abort_conn(self.cluster)
722 raise make_ex(ret, "error calling ceph_abort_conn")
723 self.state = "initialized"
725 def get_instance_id(self):
727 Get a global id for current instance
729 self.require_state("initialized", "mounted")
731 ret = ceph_get_instance_id(self.cluster)
734 def statfs(self, path):
736 Perform a statfs on the ceph file system. This call collects file system wide statistics
737 and returns them as a dict keyed by field name (``f_bsize``, ``f_blocks``, ...).
739 :param path: any path within the mounted filesystem
741 self.require_state("mounted")
742 path = cstr(path, 'path')
748 ret = ceph_statfs(self.cluster, _path, &statbuf)
750 raise make_ex(ret, "statfs failed: %s" % path)
751 return {'f_bsize': statbuf.f_bsize,
752 'f_frsize': statbuf.f_frsize,
753 'f_blocks': statbuf.f_blocks,
754 'f_bfree': statbuf.f_bfree,
755 'f_bavail': statbuf.f_bavail,
756 'f_files': statbuf.f_files,
757 'f_ffree': statbuf.f_ffree,
758 'f_favail': statbuf.f_favail,
759 'f_fsid': statbuf.f_fsid,
760 'f_flag': statbuf.f_flag,
761 'f_namemax': statbuf.f_namemax}
765 Synchronize all filesystem data to persistent media
767 self.require_state("mounted")
769 ret = ceph_sync_fs(self.cluster)
771 raise make_ex(ret, "sync_fs failed")
773 def fsync(self, int fd, int syncdataonly):
775 Synchronize an open file to persistent media.
777 :param fd: the file descriptor of the file to sync.
778 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
781 self.require_state("mounted")
783 ret = ceph_fsync(self.cluster, fd, syncdataonly)
785 raise make_ex(ret, "fsync failed")
787 def lazyio(self, fd, enable):
789 Enable/disable lazyio for the file.
791 :param fd: the file descriptor of the file for which to enable or disable lazyio.
792 :param enable: a boolean to enable lazyio or disable lazyio.
795 self.require_state("mounted")
796 if not isinstance(fd, int):
797 raise TypeError('fd must be an int')
798 if not isinstance(enable, int):
799 raise TypeError('enable must be an int')
806 ret = ceph_lazyio(self.cluster, _fd, _enable)
808 raise make_ex(ret, "lazyio failed")
810 def lazyio_propagate(self, fd, offset, count):
812 Flushes the write buffer for the file, thereby propagating the buffered write to the file.
814 :param fd: the file descriptor of the file to sync.
815 :param offset: the byte range starting.
816 :param count: the number of bytes starting from offset.
819 self.require_state("mounted")
820 if not isinstance(fd, int):
821 raise TypeError('fd must be an int')
822 if not isinstance(offset, int):
823 raise TypeError('offset must be an int')
824 if not isinstance(count, int):
825 raise TypeError('count must be an int')
829 int64_t _offset = offset
830 size_t _count = count
833 ret = ceph_lazyio_propagate(self.cluster, _fd, _offset, _count)
835 raise make_ex(ret, "lazyio_propagate failed")
837 def lazyio_synchronize(self, fd, offset, count):
839 Flushes the write buffer for the file and invalidate the read cache. This allows a
840 subsequent read operation to read and cache data directly from the file and hence
841 everyone's propagated writes would be visible.
843 :param fd: the file descriptor of the file to sync.
844 :param offset: the byte range starting.
845 :param count: the number of bytes starting from offset.
848 self.require_state("mounted")
849 if not isinstance(fd, int):
850 raise TypeError('fd must be an int')
851 if not isinstance(offset, int):
852 raise TypeError('offset must be an int')
853 if not isinstance(count, int):
854 raise TypeError('count must be an int')
858 int64_t _offset = offset
859 size_t _count = count
862 ret = ceph_lazyio_synchronize(self.cluster, _fd, _offset, _count)
864 raise make_ex(ret, "lazyio_synchronize failed")
866 def fallocate(self, fd, offset, length, mode=0):
868 Preallocate or release disk space for the file for the byte range.
870 :param fd: the file descriptor of the file to fallocate.
871 :param mode: the flags determines the operation to be performed on the given
872 range. default operation (0) allocate and initialize to zero
873 the file in the byte range, and the file size will be changed
874 if offset + length is greater than the file size. if the
875 FALLOC_FL_KEEP_SIZE flag is specified in the mode, the file size
876 will not be changed. if the FALLOC_FL_PUNCH_HOLE flag is specified
877 in the mode, the operation is deallocate space and zero the byte range.
878 :param offset: the byte range starting.
879 :param length: the length of the range.
882 self.require_state("mounted")
883 if not isinstance(fd, int):
884 raise TypeError('fd must be an int')
885 if not isinstance(mode, int):
886 raise TypeError('mode must be an int')
887 if not isinstance(offset, int):
888 raise TypeError('offset must be an int')
889 if not isinstance(length, int):
890 raise TypeError('length must be an int')
895 int64_t _offset = offset
896 int64_t _length = length
899 ret = ceph_fallocate(self.cluster, _fd, _mode, _offset, _length)
901 raise make_ex(ret, "fallocate failed")
903 def getcwd(self) -> bytes:
905 Get the current working directory.
907 :returns: the path to the current working directory
909 self.require_state("mounted")
911 ret = ceph_getcwd(self.cluster)
914 def chdir(self, path):
916 Change the current working directory.
918 :param path: the path to the working directory to change into.
920 self.require_state("mounted")
922 path = cstr(path, 'path')
923 cdef char* _path = path
925 ret = ceph_chdir(self.cluster, _path)
927 raise make_ex(ret, "chdir failed")
929 def opendir(self, path) -> DirResult:
931 Open the given directory.
933 :param path: the path name of the directory to open. Must be either an absolute path
934 or a path relative to the current working directory.
935 :returns: the open directory stream handle
937 self.require_state("mounted")
939 path = cstr(path, 'path')
942 ceph_dir_result* handle
944 ret = ceph_opendir(self.cluster, _path, &handle);
946 raise make_ex(ret, "opendir failed at {}".format(path.decode('utf-8')))
952 def readdir(self, DirResult handle) -> Optional[DirEntry]:
954 Get the next entry in an open directory.
956 :param handle: the open directory stream handle
957 :returns: the next directory entry or None if at the end of the
958 directory (or the directory is empty). This pointer
959 should not be freed by the caller, and is only safe to
960 access between return and the next call to readdir or
963 self.require_state("mounted")
965 return handle.readdir()
967 def closedir(self, DirResult handle):
969 Close the open directory.
971 :param handle: the open directory stream handle
973 self.require_state("mounted")
975 return handle.close()
977 def rewinddir(self, DirResult handle):
979 Rewind the directory stream to the beginning of the directory.
981 :param handle: the open directory stream handle
983 return handle.rewinddir()
985 def telldir(self, DirResult handle):
987 Get the current position of a directory stream.
989 :param handle: the open directory stream handle
990 :return value: The position of the directory stream. Note that the offsets
991 returned by ceph_telldir do not have a particular order (cannot
992 be compared with inequality).
994 return handle.telldir()
996 def seekdir(self, DirResult handle, offset):
998 Move the directory stream to a position specified by the given offset.
1000 :param handle: the open directory stream handle
1001 :param offset: the position to move the directory stream to. This offset should be
1002 a value returned by telldir. Note that this value does not refer to
1003 the nth entry in a directory, and can not be manipulated with plus
1006 return handle.seekdir(offset)
1008 def mkdir(self, path, mode):
1012 :param path: the path of the directory to create. This must be either an
1013 absolute path or a relative path off of the current working directory.
1014 :param mode: the permissions the directory should have once created.
1017 self.require_state("mounted")
1018 path = cstr(path, 'path')
1019 if not isinstance(mode, int):
1020 raise TypeError('mode must be an int')
1025 ret = ceph_mkdir(self.cluster, _path, _mode)
1027 raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
1029 def mksnap(self, path, name, mode, metadata={}) -> int:
1033 :param path: path of the directory to snapshot.
1034 :param name: snapshot name
1035 :param mode: permission of the snapshot
1036 :param metadata: metadata key/value to store with the snapshot
1038 :raises: :class:`TypeError`
1039 :raises: :class:`Error`
1040 :returns: 0 on success
1043 self.require_state("mounted")
1044 path = cstr(path, 'path')
1045 name = cstr(name, 'name')
1046 if not isinstance(mode, int):
1047 raise TypeError('mode must be an int')
1048 if not isinstance(metadata, dict):
1049 raise TypeError('metadata must be an dictionary')
1051 for key, value in metadata.items():
1052 if not isinstance(key, str) or not isinstance(value, str):
1053 raise TypeError('metadata key and values should be strings')
1054 md[key.encode('utf-8')] = value.encode('utf-8')
1060 snap_metadata *_snap_meta = <snap_metadata *>malloc(nr * sizeof(snap_metadata))
1061 if nr and _snap_meta == NULL:
1062 raise MemoryError("malloc failed")
1064 for key, value in md.items():
1065 _snap_meta[i] = snap_metadata(<char*>key, <char*>value)
1068 ret = ceph_mksnap(self.cluster, _path, _name, _mode, _snap_meta, nr)
1071 raise make_ex(ret, "mksnap error")
1074 def rmsnap(self, path, name) -> int:
1078 :param path: path of the directory for removing snapshot
1079 :param name: snapshot name
1081 :raises: :class:`Error`
1082 :returns: 0 on success
1084 self.require_state("mounted")
1085 path = cstr(path, 'path')
1086 name = cstr(name, 'name')
1090 ret = ceph_rmsnap(self.cluster, _path, _name)
1092 raise make_ex(ret, "rmsnap error")
1095 def snap_info(self, path) -> Dict[str, Any]:
1099 :param path: snapshot path
1101 :raises: :class:`Error`
1102 :returns: snapshot metadata
1104 self.require_state("mounted")
1105 path = cstr(path, 'path')
1109 ret = ceph_get_snap_info(self.cluster, _path, &info)
1111 raise make_ex(ret, "snap_info error")
1113 if info.nr_snap_metadata:
1114 md = {snap_meta.key.decode('utf-8'): snap_meta.value.decode('utf-8') for snap_meta in
1115 info.snap_metadata[:info.nr_snap_metadata]}
1116 ceph_free_snap_info_buffer(&info)
1117 return {'id': info.id, 'metadata': md}
1119 def chmod(self, path, mode) -> None:
1121 Change the mode of the given path.
1123 :param path: the path whose mode should be changed. This must be either an
1124 absolute path or a relative path off of the current working directory.
1125 :param mode: the permissions to set on the path.
1127 self.require_state("mounted")
1128 path = cstr(path, 'path')
1129 if not isinstance(mode, int):
1130 raise TypeError('mode must be an int')
1135 ret = ceph_chmod(self.cluster, _path, _mode)
1137 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
1139 def lchmod(self, path, mode) -> None:
1141 Change file mode. If the path is a symbolic link, it won't be dereferenced.
1143 :param path: the path of the file. This must be either an absolute path or
1144 a relative path off of the current working directory.
1145 :param mode: the permissions to be set.
1147 self.require_state("mounted")
1148 path = cstr(path, 'path')
1149 if not isinstance(mode, int):
1150 raise TypeError('mode must be an int')
1155 ret = ceph_lchmod(self.cluster, _path, _mode)
1157 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
1159 def fchmod(self, fd, mode) :
1161 Change file mode based on fd.
1163 :param fd: the file descriptor of the file to change file mode
1164 :param mode: the permissions to be set.
1166 self.require_state("mounted")
1167 if not isinstance(fd, int):
1168 raise TypeError('fd must be an int')
1169 if not isinstance(mode, int):
1170 raise TypeError('mode must be an int')
1175 ret = ceph_fchmod(self.cluster, _fd, _mode)
1177 raise make_ex(ret, "error in fchmod")
1179 def chown(self, path, uid, gid, follow_symlink=True):
1181 Change directory ownership
1183 :param path: the path of the directory to change.
1184 :param uid: the uid to set
1185 :param gid: the gid to set
1186 :param follow_symlink: perform the operation on the target file if @path
1187 is a symbolic link (default)
1189 self.require_state("mounted")
1190 path = cstr(path, 'path')
1191 if not isinstance(uid, int):
1192 raise TypeError('uid must be an int')
1193 elif not isinstance(gid, int):
1194 raise TypeError('gid must be an int')
1202 ret = ceph_chown(self.cluster, _path, _uid, _gid)
1205 ret = ceph_lchown(self.cluster, _path, _uid, _gid)
1207 raise make_ex(ret, "error in chown {}".format(path.decode('utf-8')))
1209 def lchown(self, path, uid, gid):
1211 Change ownership of a symbolic link
1213 :param path: the path of the symbolic link to change.
1214 :param uid: the uid to set
1215 :param gid: the gid to set
1217 self.chown(path, uid, gid, follow_symlink=False)
1219 def fchown(self, fd, uid, gid):
1221 Change file ownership
1223 :param fd: the file descriptor of the file to change ownership
1224 :param uid: the uid to set
1225 :param gid: the gid to set
1227 self.require_state("mounted")
1228 if not isinstance(fd, int):
1229 raise TypeError('fd must be an int')
1230 if not isinstance(uid, int):
1231 raise TypeError('uid must be an int')
1232 elif not isinstance(gid, int):
1233 raise TypeError('gid must be an int')
1240 ret = ceph_fchown(self.cluster, _fd, _uid, _gid)
1242 raise make_ex(ret, "error in fchown")
1244 def mkdirs(self, path, mode):
1246 Create multiple directories at once.
1248 :param path: the full path of directories and sub-directories that should
1250 :param mode: the permissions the directory should have once created
1252 self.require_state("mounted")
1253 path = cstr(path, 'path')
1254 if not isinstance(mode, int):
1255 raise TypeError('mode must be an int')
1261 ret = ceph_mkdirs(self.cluster, _path, _mode)
1263 raise make_ex(ret, "error in mkdirs {}".format(path.decode('utf-8')))
1265 def rmdir(self, path):
1269 :param path: the path of the directory to remove.
1271 self.require_state("mounted")
1272 path = cstr(path, 'path')
1273 cdef char* _path = path
1274 ret = ceph_rmdir(self.cluster, _path)
1276 raise make_ex(ret, "error in rmdir {}".format(path.decode('utf-8')))
1278 def open(self, path, flags, mode=0):
1280 Create and/or open a file.
1282 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
1283 the file will first be created before opening.
1284 :param flags: set of option masks that control how the file is created/opened.
1285 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
1286 is specified in the flags.
1288 self.require_state("mounted")
1289 path = cstr(path, 'path')
1291 if not isinstance(mode, int):
1292 raise TypeError('mode must be an int')
1293 if isinstance(flags, str):
1296 cephfs_flags = os.O_RDONLY
1304 cephfs_flags |= os.O_TRUNC | os.O_CREAT
1305 elif access_flags > 0 and c == '+':
1308 raise make_ex(CEPHFS_EOPNOTSUPP,
1309 "open flags doesn't support %s" % c)
1311 if access_flags == 1:
1312 cephfs_flags |= os.O_RDONLY;
1313 elif access_flags == 2:
1314 cephfs_flags |= os.O_WRONLY;
1316 cephfs_flags |= os.O_RDWR;
1318 elif isinstance(flags, int):
1319 cephfs_flags = flags
1321 raise TypeError("flags must be a string or an integer")
1325 int _flags = cephfs_flags
1329 ret = ceph_open(self.cluster, _path, _flags, _mode)
1331 raise make_ex(ret, "error in open {}".format(path.decode('utf-8')))
1334 def close(self, fd):
1336 Close the open file.
1338 :param fd: the file descriptor referring to the open file.
1341 self.require_state("mounted")
1342 if not isinstance(fd, int):
1343 raise TypeError('fd must be an int')
1346 ret = ceph_close(self.cluster, _fd)
1348 raise make_ex(ret, "error in close")
1350 def read(self, fd, offset, l):
1352 Read data from the file.
1354 :param fd: the file descriptor of the open file to read from.
1355 :param offset: the offset in the file to read from. If this value is negative, the
1356 function reads from the current offset of the file descriptor.
1357 :param l: the number of bytes to read
1359 self.require_state("mounted")
1360 if not isinstance(offset, int):
1361 raise TypeError('offset must be an int')
1362 if not isinstance(l, int):
1363 raise TypeError('l must be an int')
1364 if not isinstance(fd, int):
1365 raise TypeError('fd must be an int')
1368 int64_t _offset = offset
1372 PyObject* ret_s = NULL
1374 ret_s = PyBytes_FromStringAndSize(NULL, _length)
1376 ret_buf = PyBytes_AsString(ret_s)
1378 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
1380 raise make_ex(ret, "error in read")
1383 _PyBytes_Resize(&ret_s, ret)
1385 return <object>ret_s
1387 # We DECREF unconditionally: the cast to object above will have
1388 # INCREFed if necessary. This also takes care of exceptions,
1389 # including if _PyBytes_Resize fails (that will free the string
1390 # itself and set ret_s to NULL, hence XDECREF).
1391 ref.Py_XDECREF(ret_s)
1393 def preadv(self, fd, buffers, offset):
1395 Read data from a file into a list of buffers.
1397 :param fd: the file descriptor of the open file to read from
1398 :param buffers: the list of bytearray objects to read the file data into
1399 :param offset: the offset in the file to read from. If this value is negative, the
1400 function reads from the current offset of the file descriptor.
1402 self.require_state("mounted")
1403 if not isinstance(fd, int):
1404 raise TypeError('fd must be an int')
1405 if not isinstance(buffers, list):
1406 raise TypeError('buffers must be a list')
1408 if not isinstance(buf, bytearray):
1409 raise TypeError('buffers must be a list of bytes')
1410 if not isinstance(offset, int):
1411 raise TypeError('offset must be an int')
1415 int _iovcnt = len(buffers)
1416 int64_t _offset = offset
1417 iovec *_iov = to_iovec(buffers)
1420 ret = ceph_preadv(self.cluster, _fd, _iov, _iovcnt, _offset)
1422 raise make_ex(ret, "error in preadv")
def write(self, fd, buf, offset):
    """
    Write the contents of a bytes object to an open file.

    :param fd: the file descriptor of the open file to write to
    :param buf: the bytes to write to the file
    :param offset: the offset of the file write into. If this value is negative, the
                   function writes to the current offset of the file descriptor.
    :returns: the non-negative value returned by ceph_write()
    :raises TypeError: if an argument has the wrong type
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    if not isinstance(buf, bytes):
        raise TypeError('buf must be a bytes')
    if not isinstance(offset, int):
        raise TypeError('offset must be an int')

    cdef:
        int c_fd = fd
        char *c_data = buf
        size_t c_len = len(buf)
        int64_t c_offset = offset

    with nogil:
        ret = ceph_write(self.cluster, c_fd, c_data, c_len, c_offset)
    if ret < 0:
        raise make_ex(ret, "error in write")
    return ret
def pwritev(self, fd, buffers, offset):
    """
    Write a list of bytes objects to an open file.

    :param fd: the file descriptor of the open file to write to
    :param buffers: the list of byte object to write to the file
    :param offset: the offset of the file write into. If this value is negative, the
                   function writes to the current offset of the file descriptor.
    :returns: the non-negative value returned by ceph_pwritev()
    :raises TypeError: if an argument has the wrong type
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    if not isinstance(buffers, list):
        raise TypeError('buffers must be a list')
    for item in buffers:
        if not isinstance(item, bytes):
            raise TypeError('buffers must be a list of bytes')
    if not isinstance(offset, int):
        raise TypeError('offset must be an int')

    cdef:
        int c_fd = fd
        int c_iovcnt = len(buffers)
        int64_t c_offset = offset
        iovec *c_iov = to_iovec(buffers)
    try:
        with nogil:
            ret = ceph_pwritev(self.cluster, c_fd, c_iov, c_iovcnt, c_offset)
        if ret < 0:
            raise make_ex(ret, "error in pwritev")
        return ret
    finally:
        # Release the iovec array whether or not the write succeeded.
        free(c_iov)
def flock(self, fd, operation, owner):
    """
    Apply or remove an advisory lock.

    :param fd: the open file descriptor to change advisory lock.
    :param operation: the advisory lock operation to be performed on the file
    :param owner: the user-supplied owner identifier (an arbitrary integer)
    :returns: the non-negative value returned by ceph_flock()
    :raises TypeError: if ``fd`` or ``operation`` is not an int
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    if not isinstance(operation, int):
        raise TypeError('operation must be an int')

    cdef:
        int _fd = fd
        int _op = operation
        uint64_t _owner = owner

    with nogil:
        ret = ceph_flock(self.cluster, _fd, _op, _owner)
    if ret < 0:
        # Name the failing call correctly (previously reported as "write").
        raise make_ex(ret, "error in flock")
    return ret
def truncate(self, path, size):
    """
    Truncate the file to the given size. If this operation causes the
    file to expand, the empty bytes will be filled in with zeros.

    Implemented as a size-only setattrx() call made with
    AT_SYMLINK_NOFOLLOW.

    :param path: the path to the file to truncate.
    :param size: the new size of the file.
    :raises TypeError: if ``size`` is not an int
    """
    if not isinstance(size, int):
        raise TypeError('size must be a int')

    self.setattrx(path, {"size": size}, CEPH_SETATTR_SIZE, AT_SYMLINK_NOFOLLOW)
def ftruncate(self, fd, size):
    """
    Truncate the file to the given size. If this operation causes the
    file to expand, the empty bytes will be filled in with zeros.

    Implemented as a size-only fsetattrx() call.

    :param fd: the open file descriptor referring to the file to truncate.
    :param size: the new size of the file.
    :raises TypeError: if ``size`` is not an int
    """

    if not isinstance(size, int):
        raise TypeError('size must be a int')

    statx_dict = dict()
    statx_dict["size"] = size
    self.fsetattrx(fd, statx_dict, CEPH_SETATTR_SIZE)
def mknod(self, path, mode, rdev=0):
    """
    Make a block or character special file.

    :param path: the path to the special file.
    :param mode: the permissions to use and the type of special file. The type can be
                 one of stat.S_IFREG, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO. Both
                 should be combined using bitwise OR.
    :param rdev: If the file type is stat.S_IFCHR or stat.S_IFBLK then this parameter
                 specifies the major and minor numbers of the newly created device
                 special file. Otherwise, it is ignored.
    :raises TypeError: if ``mode`` or ``rdev`` is not an int
    """
    self.require_state("mounted")
    path = cstr(path, 'path')

    if not isinstance(mode, int):
        raise TypeError('mode must be an int')
    if not isinstance(rdev, int):
        raise TypeError('rdev must be an int')

    cdef:
        char* c_path = path
        mode_t c_mode = mode
        dev_t c_rdev = rdev

    with nogil:
        ret = ceph_mknod(self.cluster, c_path, c_mode, c_rdev)
    if ret < 0:
        raise make_ex(ret, "error in mknod {}".format(path.decode('utf-8')))
def getxattr(self, path, name, size=255, follow_symlink=True):
    """
    Get an extended attribute.

    :param path: the path to the file
    :param name: the name of the extended attribute to get
    :param size: the size of the pre-allocated buffer
    :param follow_symlink: when true ceph_getxattr() is called, otherwise
                           ceph_lgetxattr()
    :returns: the bytes of the attribute value written into the buffer
    """
    self.require_state("mounted")

    path = cstr(path, 'path')
    name = cstr(name, 'name')

    cdef:
        char* c_path = path
        char* c_name = name

        size_t out_len = size
        char *out_buf = NULL

    try:
        out_buf = <char *>realloc_chk(out_buf, out_len)
        if not follow_symlink:
            with nogil:
                ret = ceph_lgetxattr(self.cluster, c_path, c_name, out_buf,
                                     out_len)
        else:
            with nogil:
                ret = ceph_getxattr(self.cluster, c_path, c_name, out_buf,
                                    out_len)

        if ret < 0:
            raise make_ex(ret, "error in getxattr")

        # Copy only the bytes the call reported before the buffer is freed.
        return out_buf[:ret]
    finally:
        free(out_buf)
def fgetxattr(self, fd, name, size=255):
    """
    Get an extended attribute given the fd of a file.

    :param fd: the open file descriptor referring to the file
    :param name: the name of the extended attribute to get
    :param size: the size of the pre-allocated buffer
    :returns: the bytes of the attribute value written into the buffer
    :raises TypeError: if ``fd`` is not an int
    """
    self.require_state("mounted")

    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    name = cstr(name, 'name')

    cdef:
        int c_fd = fd
        char* c_name = name

        size_t out_len = size
        char *out_buf = NULL

    try:
        out_buf = <char *>realloc_chk(out_buf, out_len)
        with nogil:
            ret = ceph_fgetxattr(self.cluster, c_fd, c_name, out_buf,
                                 out_len)

        if ret < 0:
            raise make_ex(ret, "error in fgetxattr")

        # Copy only the bytes the call reported before the buffer is freed.
        return out_buf[:ret]
    finally:
        free(out_buf)
def lgetxattr(self, path, name, size=255):
    """
    Get an extended attribute without following symbolic links.

    Delegates to getxattr() with ``follow_symlink=False``, so when the
    path refers to a symbolic link the attributes of the link itself are
    read rather than those of the file it points to.

    :param path: the path to the file
    :param name: the name of the extended attribute to get
    :param size: the size of the pre-allocated buffer
    :returns: whatever getxattr() returns
    """
    return self.getxattr(path, name, follow_symlink=False, size=size)
def setxattr(self, path, name, value, flags, follow_symlink=True):
    """
    Set an extended attribute on a file.

    :param path: the path to the file.
    :param name: the name of the extended attribute to set.
    :param value: the bytes of the extended attribute value
    :param flags: integer flags passed through to the libcephfs call
    :param follow_symlink: when true ceph_setxattr() is called, otherwise
                           ceph_lsetxattr()
    :raises TypeError: if ``flags`` is not an int or ``value`` is not bytes
    """
    self.require_state("mounted")

    name = cstr(name, 'name')
    path = cstr(path, 'path')
    if not isinstance(flags, int):
        raise TypeError('flags must be a int')
    if not isinstance(value, bytes):
        raise TypeError('value must be a bytes')

    cdef:
        char *c_path = path
        char *c_name = name
        char *c_value = value
        size_t c_value_len = len(value)
        int c_flags = flags

    if not follow_symlink:
        with nogil:
            ret = ceph_lsetxattr(self.cluster, c_path, c_name,
                                 c_value, c_value_len, c_flags)
    else:
        with nogil:
            ret = ceph_setxattr(self.cluster, c_path, c_name,
                                c_value, c_value_len, c_flags)

    if ret < 0:
        raise make_ex(ret, "error in setxattr")
def fsetxattr(self, fd, name, value, flags):
    """
    Set an extended attribute on the file behind an open descriptor.

    :param fd: the open file descriptor referring to the file.
    :param name: the name of the extended attribute to set.
    :param value: the bytes of the extended attribute value
    :param flags: integer flags passed through to ceph_fsetxattr()
    :raises TypeError: if ``fd`` or ``flags`` is not an int, or ``value``
                       is not bytes
    """
    self.require_state("mounted")

    name = cstr(name, 'name')
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    if not isinstance(flags, int):
        raise TypeError('flags must be a int')
    if not isinstance(value, bytes):
        raise TypeError('value must be a bytes')

    cdef:
        int c_fd = fd
        char *c_name = name
        char *c_value = value
        size_t c_value_len = len(value)
        int c_flags = flags

    with nogil:
        ret = ceph_fsetxattr(self.cluster, c_fd, c_name,
                             c_value, c_value_len, c_flags)
    if ret < 0:
        raise make_ex(ret, "error in fsetxattr")
def lsetxattr(self, path, name, value, flags):
    """
    Set an extended attribute on a file but do not follow symbolic link.

    Delegates to setxattr() with ``follow_symlink=False``.

    :param path: the path to the file.
    :param name: the name of the extended attribute to set.
    :param value: the bytes of the extended attribute value
    :param flags: integer flags, forwarded unchanged to setxattr()
    """
    self.setxattr(path, name, value, flags, follow_symlink=False)
def removexattr(self, path, name, follow_symlink=True):
    """
    Remove an extended attribute of a file.

    :param path: path of the file.
    :param name: name of the extended attribute to remove.
    :param follow_symlink: when true ceph_removexattr() is called,
                           otherwise ceph_lremovexattr()
    """
    self.require_state("mounted")

    name = cstr(name, 'name')
    path = cstr(path, 'path')

    cdef:
        char *c_path = path
        char *c_name = name

    if not follow_symlink:
        with nogil:
            ret = ceph_lremovexattr(self.cluster, c_path, c_name)
    else:
        with nogil:
            ret = ceph_removexattr(self.cluster, c_path, c_name)

    if ret < 0:
        raise make_ex(ret, "error in removexattr")
def fremovexattr(self, fd, name):
    """
    Remove an extended attribute of the file behind an open descriptor.

    :param fd: the open file descriptor referring to the file.
    :param name: name of the extended attribute to remove.
    :raises TypeError: if ``fd`` is not an int
    """
    self.require_state("mounted")

    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    name = cstr(name, 'name')

    cdef:
        int c_fd = fd
        char *c_name = name

    with nogil:
        ret = ceph_fremovexattr(self.cluster, c_fd, c_name)
    if ret < 0:
        raise make_ex(ret, "error in fremovexattr")
def lremovexattr(self, path, name):
    """
    Remove an extended attribute of a file but do not follow symbolic link.

    Delegates to removexattr() with ``follow_symlink=False``.

    :param path: path of the file.
    :param name: name of the extended attribute to remove.
    """
    self.removexattr(path, name, follow_symlink=False)
def listxattr(self, path, size=65536, follow_symlink=True):
    """
    List the extended attribute keys set on a file.

    :param path: path of the file.
    :param size: the size of list buffer to be filled with extended attribute keys.
    :param follow_symlink: when true ceph_listxattr() is called, otherwise
                           ceph_llistxattr()
    :returns: 2-tuple of the value returned by the libcephfs call and the
              bytes it wrote into the buffer
    """
    self.require_state("mounted")

    path = cstr(path, 'path')

    cdef:
        char *c_path = path
        char *out_buf = NULL
        size_t out_len = size

    try:
        out_buf = <char *>realloc_chk(out_buf, out_len)
        if not follow_symlink:
            with nogil:
                ret = ceph_llistxattr(self.cluster, c_path, out_buf, out_len)
        else:
            with nogil:
                ret = ceph_listxattr(self.cluster, c_path, out_buf, out_len)

        if ret < 0:
            raise make_ex(ret, "error in listxattr")

        return ret, out_buf[:ret]
    finally:
        free(out_buf)
def flistxattr(self, fd, size=65536):
    """
    List the extended attribute keys set on the file behind an open descriptor.

    :param fd: the open file descriptor referring to the file.
    :param size: the size of list buffer to be filled with extended attribute keys.
    :returns: 2-tuple of the value returned by ceph_flistxattr() and the
              bytes it wrote into the buffer
    :raises TypeError: if ``fd`` is not an int
    """
    self.require_state("mounted")

    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    cdef:
        int c_fd = fd
        char *out_buf = NULL
        size_t out_len = size

    try:
        out_buf = <char *>realloc_chk(out_buf, out_len)
        with nogil:
            ret = ceph_flistxattr(self.cluster, c_fd, out_buf, out_len)

        if ret < 0:
            raise make_ex(ret, "error in flistxattr")

        return ret, out_buf[:ret]
    finally:
        free(out_buf)
def llistxattr(self, path, size=65536):
    """
    List the extended attribute keys set on a file but do not follow symbolic link.

    Delegates to listxattr() with ``follow_symlink=False``.

    :param path: path of the file.
    :param size: the size of list buffer to be filled with extended attribute keys.
    :returns: whatever listxattr() returns
    """
    return self.listxattr(path, follow_symlink=False, size=size)
def stat(self, path, follow_symlink=True):
    """
    Get a file's extended statistics and attributes.

    :param path: the file or directory to get the statistics of.
    :param follow_symlink: when false, ceph_statx() is called with
                           AT_SYMLINK_NOFOLLOW instead of no flags
    :returns: a StatResult built from the basic statx fields; the
              timestamps are datetime objects made from the whole seconds
    """
    self.require_state("mounted")
    path = cstr(path, 'path')

    cdef:
        char* c_path = path
        statx stx

    if not follow_symlink:
        with nogil:
            ret = ceph_statx(self.cluster, c_path, &stx,
                             CEPH_STATX_BASIC_STATS_CDEF, AT_SYMLINK_NOFOLLOW_CDEF)
    else:
        with nogil:
            ret = ceph_statx(self.cluster, c_path, &stx,
                             CEPH_STATX_BASIC_STATS_CDEF, 0)

    if ret < 0:
        raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))

    atime = datetime.fromtimestamp(stx.stx_atime.tv_sec)
    mtime = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
    ctime = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
    return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
                      st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
                      st_uid=stx.stx_uid, st_gid=stx.stx_gid,
                      st_rdev=stx.stx_rdev, st_size=stx.stx_size,
                      st_blksize=stx.stx_blksize,
                      st_blocks=stx.stx_blocks,
                      st_atime=atime, st_mtime=mtime, st_ctime=ctime)
def lstat(self, path):
    """
    Get a file's extended statistics and attributes. When the file is a
    symbolic link, return the information of the link itself rather
    than that of the file it points to.

    Delegates to stat() with ``follow_symlink=False``.

    :param path: the file or directory to get the statistics of.
    :returns: whatever stat() returns
    """
    return self.stat(path, follow_symlink=False)
def fstat(self, fd):
    """
    Get an open file's extended statistics and attributes.

    :param fd: the file descriptor of the file to get statistics of.
    :returns: a StatResult built from the basic statx fields; the
              timestamps are datetime objects made from the whole seconds
    :raises TypeError: if ``fd`` is not an int
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    cdef:
        int _fd = fd
        statx stx

    with nogil:
        ret = ceph_fstatx(self.cluster, _fd, &stx,
                          CEPH_STATX_BASIC_STATS_CDEF, 0)
    if ret < 0:
        # Message previously misspelled the call name as "fsat".
        raise make_ex(ret, "error in fstat")
    return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
                      st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
                      st_uid=stx.stx_uid, st_gid=stx.stx_gid,
                      st_rdev=stx.stx_rdev, st_size=stx.stx_size,
                      st_blksize=stx.stx_blksize,
                      st_blocks=stx.stx_blocks,
                      st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
                      st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
                      st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
def statx(self, path, mask, flag):
    """
    Get a file's extended statistics and attributes.

    :param path: the file or directory to get the statistics of.
    :param mask: want bitfield of CEPH_STATX_* flags showing desired attributes.
    :param flag: bitfield that can be used to set AT_* modifier flags (AT_STATX_SYNC_AS_STAT, AT_STATX_FORCE_SYNC, AT_STATX_DONT_SYNC and AT_SYMLINK_NOFOLLOW)
    :returns: a dict holding one entry per CEPH_STATX_* bit set in ``mask``
              (keys: mode, nlink, uid, gid, rdev, atime, mtime, ctime, ino,
              size, blocks, btime, version); time values are datetime
              objects made from the whole seconds
    :raises TypeError: if ``mask`` or ``flag`` is not an int
    """

    self.require_state("mounted")
    path = cstr(path, 'path')
    if not isinstance(mask, int):
        # Report the argument that is actually wrong (was 'flag').
        raise TypeError('mask must be a int')
    if not isinstance(flag, int):
        raise TypeError('flag must be a int')

    cdef:
        char* _path = path
        statx stx
        int _mask = mask
        int _flag = flag

    dict_result = dict()

    with nogil:
        ret = ceph_statx(self.cluster, _path, &stx, _mask, _flag)
    if ret < 0:
        # path is bytes at this point; decode it so the message does not
        # contain a b'...' repr.
        raise make_ex(ret, "error in stat: %s" % path.decode('utf-8'))

    # Only the fields the caller asked for are copied into the result.
    if (_mask & CEPH_STATX_MODE):
        dict_result["mode"] = stx.stx_mode
    if (_mask & CEPH_STATX_NLINK):
        dict_result["nlink"] = stx.stx_nlink
    if (_mask & CEPH_STATX_UID):
        dict_result["uid"] = stx.stx_uid
    if (_mask & CEPH_STATX_GID):
        dict_result["gid"] = stx.stx_gid
    if (_mask & CEPH_STATX_RDEV):
        dict_result["rdev"] = stx.stx_rdev
    if (_mask & CEPH_STATX_ATIME):
        dict_result["atime"] = datetime.fromtimestamp(stx.stx_atime.tv_sec)
    if (_mask & CEPH_STATX_MTIME):
        dict_result["mtime"] = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
    if (_mask & CEPH_STATX_CTIME):
        dict_result["ctime"] = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
    if (_mask & CEPH_STATX_INO):
        dict_result["ino"] = stx.stx_ino
    if (_mask & CEPH_STATX_SIZE):
        dict_result["size"] = stx.stx_size
    if (_mask & CEPH_STATX_BLOCKS):
        dict_result["blocks"] = stx.stx_blocks
    if (_mask & CEPH_STATX_BTIME):
        dict_result["btime"] = datetime.fromtimestamp(stx.stx_btime.tv_sec)
    if (_mask & CEPH_STATX_VERSION):
        dict_result["version"] = stx.stx_version

    return dict_result
def setattrx(self, path, dict_stx, mask, flags):
    """
    Set a file's attributes.

    :param path: the path to the file/directory to set the attributes of.
    :param dict_stx: a dict that must contain a value for every attribute
                     selected in ``mask`` (keys: mode, uid, gid, mtime,
                     atime, ctime, size, btime); time values must provide
                     a ``timestamp()`` method.
    :param mask: a mask of all the CEPH_SETATTR_* values that have been set in ``dict_stx``.
    :param flags: mask of AT_* flags (only AT_SYMLINK_NOFOLLOW is respected for now)
    :raises TypeError: if ``dict_stx`` is not a dict or ``mask``/``flags``
                       is not an int
    :raises KeyError: if ``mask`` selects an attribute missing from ``dict_stx``
    """

    self.require_state("mounted")
    path = cstr(path, 'path')
    if not isinstance(dict_stx, dict):
        raise TypeError('dict_stx must be a dict')
    if not isinstance(mask, int):
        raise TypeError('mask must be a int')
    if not isinstance(flags, int):
        raise TypeError('flags must be a int')

    cdef statx stx

    # Copy only the attributes selected by the mask into the C struct.
    if (mask & CEPH_SETATTR_MODE):
        stx.stx_mode = dict_stx["mode"]
    if (mask & CEPH_SETATTR_UID):
        stx.stx_uid = dict_stx["uid"]
    if (mask & CEPH_SETATTR_GID):
        stx.stx_gid = dict_stx["gid"]
    if (mask & CEPH_SETATTR_MTIME):
        stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
    if (mask & CEPH_SETATTR_ATIME):
        stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
    if (mask & CEPH_SETATTR_CTIME):
        stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
    if (mask & CEPH_SETATTR_SIZE):
        stx.stx_size = dict_stx["size"]
    if (mask & CEPH_SETATTR_BTIME):
        stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())

    cdef:
        char* _path = path
        int _mask = mask
        int _flags = flags

    with nogil:
        ret = ceph_setattrx(self.cluster, _path, &stx, _mask, _flags)
    if ret < 0:
        # path is bytes at this point; decode it so the message does not
        # contain a b'...' repr.
        raise make_ex(ret, "error in setattrx: %s" % path.decode('utf-8'))
def fsetattrx(self, fd, dict_stx, mask):
    """
    Set the attributes of the file behind an open descriptor.

    :param fd: the open file descriptor referring to the file to set the attributes of.
    :param dict_stx: a dict that must contain a value for every attribute
                     selected in ``mask`` (keys: mode, uid, gid, mtime,
                     atime, ctime, size, btime); time values must provide
                     a ``timestamp()`` method.
    :param mask: a mask of all the CEPH_SETATTR_* values that have been set in ``dict_stx``.
    :raises TypeError: if ``fd``/``mask`` is not an int or ``dict_stx`` is
                       not a dict
    :raises KeyError: if ``mask`` selects an attribute missing from ``dict_stx``
    """

    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be a int')
    if not isinstance(dict_stx, dict):
        raise TypeError('dict_stx must be a dict')
    if not isinstance(mask, int):
        raise TypeError('mask must be a int')

    cdef statx stx

    # Copy only the attributes selected by the mask into the C struct.
    if (mask & CEPH_SETATTR_MODE):
        stx.stx_mode = dict_stx["mode"]
    if (mask & CEPH_SETATTR_UID):
        stx.stx_uid = dict_stx["uid"]
    if (mask & CEPH_SETATTR_GID):
        stx.stx_gid = dict_stx["gid"]
    if (mask & CEPH_SETATTR_MTIME):
        stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
    if (mask & CEPH_SETATTR_ATIME):
        stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
    if (mask & CEPH_SETATTR_CTIME):
        stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
    if (mask & CEPH_SETATTR_SIZE):
        stx.stx_size = dict_stx["size"]
    if (mask & CEPH_SETATTR_BTIME):
        stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())

    cdef:
        int _fd = fd
        int _mask = mask

    with nogil:
        ret = ceph_fsetattrx(self.cluster, _fd, &stx, _mask)
    if ret < 0:
        raise make_ex(ret, "error in fsetattrx")
def symlink(self, existing, newname):
    """
    Create a symbolic link via ceph_symlink().

    :param existing: the path to the existing file/directory to link to.
    :param newname: the path to the new file/directory to link from.
    """
    self.require_state("mounted")
    existing = cstr(existing, 'existing')
    newname = cstr(newname, 'newname')
    cdef:
        char* c_target = existing
        char* c_linkname = newname

    with nogil:
        ret = ceph_symlink(self.cluster, c_target, c_linkname)
    if ret < 0:
        raise make_ex(ret, "error in symlink")
def link(self, existing, newname):
    """
    Create a link from ``newname`` to ``existing`` via ceph_link().

    :param existing: the path to the existing file/directory to link to.
    :param newname: the path to the new file/directory to link from.
    """

    self.require_state("mounted")
    existing = cstr(existing, 'existing')
    newname = cstr(newname, 'newname')
    cdef:
        char* c_target = existing
        char* c_linkname = newname

    with nogil:
        ret = ceph_link(self.cluster, c_target, c_linkname)
    if ret < 0:
        raise make_ex(ret, "error in link")
def readlink(self, path, size) -> bytes:
    """
    Read a symbolic link.

    :param path: the path to the symlink to read
    :param size: the length of the buffer
    :returns: the bytes ceph_readlink() wrote into the buffer, i.e. the
              path of the file that the symlink points to.
    """
    self.require_state("mounted")
    path = cstr(path, 'path')

    cdef:
        char* c_path = path
        int64_t c_size = size
        char *out_buf = NULL

    try:
        out_buf = <char *>realloc_chk(out_buf, c_size)
        with nogil:
            ret = ceph_readlink(self.cluster, c_path, out_buf, c_size)
        if ret < 0:
            raise make_ex(ret, "error in readlink")
        # Copy the reported number of bytes before the buffer is freed.
        return out_buf[:ret]
    finally:
        free(out_buf)
def unlink(self, path):
    """
    Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
    file will not disappear from the namespace until all references to it are removed.

    :param path: the path of the file or link to unlink.
    """
    self.require_state("mounted")
    path = cstr(path, 'path')
    cdef char* c_path = path
    with nogil:
        ret = ceph_unlink(self.cluster, c_path)
    if ret < 0:
        raise make_ex(ret, "error in unlink: {}".format(path.decode('utf-8')))
def rename(self, src, dst):
    """
    Rename a file or directory.

    :param src: the path to the existing file or directory.
    :param dst: the new name of the file or directory.
    """

    self.require_state("mounted")

    src = cstr(src, 'source')
    dst = cstr(dst, 'destination')

    cdef:
        char* c_src = src
        char* c_dst = dst

    with nogil:
        ret = ceph_rename(self.cluster, c_src, c_dst)
    if ret < 0:
        old_name = src.decode('utf-8')
        new_name = dst.decode('utf-8')
        raise make_ex(ret, "error in rename {} to {}".format(old_name, new_name))
def mds_command(self, mds_spec, args, input_data):
    """
    Send a command to an MDS via ceph_mds_command().

    :param mds_spec: the MDS to address, converted with cstr()
    :param args: the command, passed to the MDS as a single-element
                 command array
    :param input_data: the input buffer handed to the command
    :returns: 3-tuple of output status int, output data (bytes),
              output status string
    """
    mds_spec = cstr(mds_spec, 'mds_spec')
    args = cstr(args, 'args')
    input_data = cstr(input_data, 'input_data')

    cdef:
        char *_mds_spec = opt_str(mds_spec)
        char **_cmd = to_bytes_array([args])
        size_t _cmdlen = 1

        char *_inbuf = input_data
        size_t _inbuf_len = len(input_data)

        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = ceph_mds_command(self.cluster, _mds_spec,
                                   <const char **>_cmd, _cmdlen,
                                   <const char*>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)
        # Copy both output buffers into Python objects before handing the
        # library-owned memory back.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            ceph_buffer_free(_outs)
        if _outbuf_len:
            ceph_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        free(_cmd)
def umask(self, mode) :
    """
    Set the umask through ceph_umask().

    :param mode: the new mask value
    :returns: the non-negative value returned by ceph_umask()
              (presumably the previous mask -- confirm against libcephfs)
    """
    self.require_state("mounted")
    cdef mode_t c_mode = mode
    with nogil:
        ret = ceph_umask(self.cluster, c_mode)
    if ret < 0:
        raise make_ex(ret, "error in umask")
    return ret
def lseek(self, fd, offset, whence):
    """
    Set the file's current position.

    :param fd: the file descriptor of the open file to reposition.
    :param offset: the offset to move to, interpreted according to ``whence``.
    :param whence: the flag to indicate what type of seeking to perform: SEEK_SET, SEEK_CUR, SEEK_END
    :returns: the non-negative value returned by ceph_lseek()
              (presumably the resulting position -- confirm against libcephfs)
    :raises TypeError: if ``fd``, ``offset`` or ``whence`` is not an int
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')
    if not isinstance(offset, int):
        raise TypeError('offset must be an int')
    if not isinstance(whence, int):
        raise TypeError('whence must be an int')

    cdef:
        int _fd = fd
        int64_t _offset = offset
        int64_t _whence = whence

    with nogil:
        ret = ceph_lseek(self.cluster, _fd, _offset, _whence)

    if ret < 0:
        raise make_ex(ret, "error in lseek")

    return ret
def utime(self, path, times=None):
    """
    Set access and modification time for path

    :param path: file path for which timestamps have to be changed
    :param times: if times is not None, it must be a tuple (atime, mtime)
                  of ints; when it is falsy the current time is used for both
    :raises TypeError: if ``times`` is not a tuple or holds non-int values
    """

    self.require_state("mounted")
    path = cstr(path, 'path')

    # Default both timestamps to "now" and override them when given.
    actime = modtime = int(time.time())
    if times:
        if not isinstance(times, tuple):
            raise TypeError('times must be a tuple')
        if not isinstance(times[0], int):
            raise TypeError('atime must be an int')
        if not isinstance(times[1], int):
            raise TypeError('mtime must be an int')
        actime = times[0]
        modtime = times[1]

    cdef:
        char *pth = path
        utimbuf buf = utimbuf(actime, modtime)
    with nogil:
        ret = ceph_utime(self.cluster, pth, &buf)
    if ret < 0:
        raise make_ex(ret, "error in utime {}".format(path.decode('utf-8')))
def futime(self, fd, times=None):
    """
    Set access and modification time for a file pointed by descriptor

    :param fd: file descriptor of the open file
    :param times: if times is not None, it must be a tuple (atime, mtime)
                  of ints; when it is falsy the current time is used for both
    :raises TypeError: if ``fd`` is not an int, or ``times`` is not a tuple
                       or holds non-int values
    """

    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    # Default both timestamps to "now" and override them when given.
    actime = modtime = int(time.time())
    if times:
        if not isinstance(times, tuple):
            raise TypeError('times must be a tuple')
        if not isinstance(times[0], int):
            raise TypeError('atime must be an int')
        if not isinstance(times[1], int):
            raise TypeError('mtime must be an int')
        actime = times[0]
        modtime = times[1]

    cdef:
        int c_fd = fd
        utimbuf buf = utimbuf(actime, modtime)
    with nogil:
        ret = ceph_futime(self.cluster, c_fd, &buf)
    if ret < 0:
        raise make_ex(ret, "error in futime")
def utimes(self, path, times=None, follow_symlink=True):
    """
    Set access and modification time for path

    :param path: file path for which timestamps have to be changed
    :param times: if times is not None, it must be a tuple (atime, mtime)
                  of ints or floats; when it is falsy the current time is
                  used for both
    :param follow_symlink: perform the operation on the target file if @path
                           is a symbolic link (default)
    :raises TypeError: if ``times`` is not a tuple or holds values that are
                       neither int nor float
    """

    self.require_state("mounted")
    path = cstr(path, 'path')

    # Default both timestamps to "now" and override them when given.
    actime = modtime = time.time()
    if times:
        if not isinstance(times, tuple):
            raise TypeError('times must be a tuple')
        if not isinstance(times[0], (int, float)):
            raise TypeError('atime must be an int or a float')
        if not isinstance(times[1], (int, float)):
            raise TypeError('mtime must be an int or a float')
        actime = float(times[0])
        modtime = float(times[1])

    cdef:
        char *pth = path
        timeval *buf = [to_timeval(actime), to_timeval(modtime)]
    if not follow_symlink:
        with nogil:
            ret = ceph_lutimes(self.cluster, pth, buf)
    else:
        with nogil:
            ret = ceph_utimes(self.cluster, pth, buf)
    if ret < 0:
        raise make_ex(ret, "error in utimes {}".format(path.decode('utf-8')))
def lutimes(self, path, times=None):
    """
    Set access and modification time for a file. If the file is a symbolic
    link do not follow to the target.

    Delegates to utimes() with ``follow_symlink=False``.

    :param path: file path for which timestamps have to be changed
    :param times: if times is not None, it must be a tuple (atime, mtime)
    """
    self.utimes(path, follow_symlink=False, times=times)
def futimes(self, fd, times=None):
    """
    Set access and modification time for a file pointed to by descriptor

    :param fd: file descriptor of the open file
    :param times: if times is not None, it must be a tuple (atime, mtime)
                  of ints or floats; when it is falsy the current time is
                  used for both
    :raises TypeError: if ``fd`` is not an int, or ``times`` is not a tuple
                       or holds values that are neither int nor float
    """

    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    # Default both timestamps to "now" and override them when given.
    actime = modtime = time.time()
    if times:
        if not isinstance(times, tuple):
            raise TypeError('times must be a tuple')
        if not isinstance(times[0], (int, float)):
            raise TypeError('atime must be an int or a float')
        if not isinstance(times[1], (int, float)):
            raise TypeError('mtime must be an int or a float')
        actime = float(times[0])
        modtime = float(times[1])

    cdef:
        int c_fd = fd
        timeval *buf = [to_timeval(actime), to_timeval(modtime)]
    with nogil:
        ret = ceph_futimes(self.cluster, c_fd, buf)
    if ret < 0:
        raise make_ex(ret, "error in futimes")
def futimens(self, fd, times=None):
    """
    Set access and modification time for a file pointed to by descriptor,
    passing the values to ceph_futimens() as timespec structures.

    :param fd: file descriptor of the open file
    :param times: if times is not None, it must be a tuple (atime, mtime)
                  of ints or floats; when it is falsy the current time is
                  used for both
    :raises TypeError: if ``fd`` is not an int, or ``times`` is not a tuple
                       or holds values that are neither int nor float
    """

    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    # Default both timestamps to "now" and override them when given.
    actime = modtime = time.time()
    if times:
        if not isinstance(times, tuple):
            raise TypeError('times must be a tuple')
        if not isinstance(times[0], (int, float)):
            raise TypeError('atime must be an int or a float')
        if not isinstance(times[1], (int, float)):
            raise TypeError('mtime must be an int or a float')
        actime = float(times[0])
        modtime = float(times[1])

    cdef:
        int c_fd = fd
        timespec *buf = [to_timespec(actime), to_timespec(modtime)]
    with nogil:
        ret = ceph_futimens(self.cluster, c_fd, buf)
    if ret < 0:
        raise make_ex(ret, "error in futimens")
def get_file_replication(self, fd):
    """
    Get the file replication information from an open file descriptor.

    :param fd: the open file descriptor referring to the file to get
               the replication information of.
    :returns: the non-negative value returned by ceph_get_file_replication()
    :raises TypeError: if ``fd`` is not an int
    """
    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    cdef int c_fd = fd

    with nogil:
        ret = ceph_get_file_replication(self.cluster, c_fd)
    if ret < 0:
        raise make_ex(ret, "error in get_file_replication")

    return ret
def get_path_replication(self, path):
    """
    Get the file replication information given the path.

    :param path: the path of the file/directory to get the replication information of.
    :returns: the non-negative value returned by ceph_get_path_replication()
    """
    self.require_state("mounted")
    path = cstr(path, 'path')

    cdef char* c_path = path

    with nogil:
        ret = ceph_get_path_replication(self.cluster, c_path)
    if ret < 0:
        raise make_ex(ret, "error in get_path_replication")

    return ret
def get_pool_id(self, pool_name):
    """
    Get the id of the named pool.

    :param pool_name: the name of the pool.
    :returns: the non-negative value returned by ceph_get_pool_id()
    """

    self.require_state("mounted")
    pool_name = cstr(pool_name, 'pool_name')

    cdef char* c_pool_name = pool_name

    with nogil:
        ret = ceph_get_pool_id(self.cluster, c_pool_name)
    if ret < 0:
        raise make_ex(ret, "error in get_pool_id")

    return ret
def get_pool_replication(self, pool_id):
    """
    Get the pool replication factor.

    :param pool_id: the pool id to look up
    :returns: the non-negative value returned by ceph_get_pool_replication()
    :raises TypeError: if ``pool_id`` is not an int
    """

    self.require_state("mounted")
    if not isinstance(pool_id, int):
        raise TypeError('pool_id must be an int')

    cdef int c_pool_id = pool_id

    with nogil:
        ret = ceph_get_pool_replication(self.cluster, c_pool_id)
    if ret < 0:
        raise make_ex(ret, "error in get_pool_replication")

    return ret
def debug_get_fd_caps(self, fd):
    """
    Get the capabilities currently issued to the client given the fd.

    :param fd: the file descriptor to get issued
    :returns: the non-negative value returned by ceph_debug_get_fd_caps()
    :raises TypeError: if ``fd`` is not an int
    """

    self.require_state("mounted")
    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    cdef int c_fd = fd

    with nogil:
        ret = ceph_debug_get_fd_caps(self.cluster, c_fd)
    if ret < 0:
        raise make_ex(ret, "error in debug_get_fd_caps")

    return ret
def debug_get_file_caps(self, path):
    """
    Get the capabilities currently issued to the client given the path.

    :param path: the path of the file/directory to get the capabilities of.
    :returns: the non-negative value returned by ceph_debug_get_file_caps()
    """

    self.require_state("mounted")
    path = cstr(path, 'path')

    cdef char* c_path = path

    with nogil:
        ret = ceph_debug_get_file_caps(self.cluster, c_path)
    if ret < 0:
        raise make_ex(ret, "error in debug_get_file_caps")

    return ret
def get_cap_return_timeout(self):
    """
    Get the amount of time that the client has to return caps

    In the event that a client does not return its caps, the MDS may blocklist
    it after this timeout. Applications should check this value and ensure
    that they set the delegation timeout to a value lower than this.

    :returns: the non-negative value returned by ceph_get_cap_return_timeout()
    """

    self.require_state("mounted")

    with nogil:
        ret = ceph_get_cap_return_timeout(self.cluster)
    if ret < 0:
        raise make_ex(ret, "error in get_cap_return_timeout")

    return ret
def set_uuid(self, uuid):
    """
    Set ceph client uuid. Must be called before mount.

    The result of ceph_set_uuid() is not checked, so no error is raised.

    :param uuid: the uuid to set
    """

    uuid = cstr(uuid, 'uuid')

    cdef char* c_uuid = uuid

    with nogil:
        ceph_set_uuid(self.cluster, c_uuid)
def set_session_timeout(self, timeout):
    """
    Set ceph client session timeout. Must be called before mount.

    The result of ceph_set_session_timeout() is not checked.

    :param timeout: the timeout to set
    :raises TypeError: if ``timeout`` is not an int
    """

    if not isinstance(timeout, int):
        raise TypeError('timeout must be an int')

    cdef int c_timeout = timeout

    with nogil:
        ceph_set_session_timeout(self.cluster, c_timeout)
def get_layout(self, fd):
    """
    Get the file layout from an open file descriptor.

    :param fd: the open file descriptor referring to the file to get the layout of.
    :returns: a dict with the keys stripe_unit, stripe_count, object_size,
              pool_id and pool_name
    :raises TypeError: if ``fd`` is not an int
    """

    if not isinstance(fd, int):
        raise TypeError('fd must be an int')

    cdef:
        int _fd = fd
        int stripe_unit
        int stripe_count
        int object_size
        int pool_id
        char *buf = NULL
        int buflen = 256

    dict_result = dict()

    with nogil:
        ret = ceph_get_file_layout(self.cluster, _fd, &stripe_unit, &stripe_count, &object_size, &pool_id)
    if ret < 0:
        # Report the call's own return code; the out-parameters are not
        # meaningful when the call failed.
        raise make_ex(ret, "error in get_file_layout")
    dict_result["stripe_unit"] = stripe_unit
    dict_result["stripe_count"] = stripe_count
    dict_result["object_size"] = object_size
    dict_result["pool_id"] = pool_id

    try:
        # Retry with a doubled buffer for as long as libcephfs reports
        # that the pool name does not fit.
        while True:
            buf = <char *>realloc_chk(buf, buflen)
            with nogil:
                ret = ceph_get_file_pool_name(self.cluster, _fd, buf, buflen)
            if ret > 0:
                dict_result["pool_name"] = decode_cstr(buf)
                return dict_result
            elif ret == -CEPHFS_ERANGE:
                buflen = buflen * 2
            else:
                raise make_ex(ret, "error in get_file_pool_name")
    finally:
        free(buf)
2661 def get_default_pool(self):
2663 Get the default pool name and id of cephfs. This returns dict{pool_name, pool_id}.
2669 dict_result = dict()
2673 buf = <char *>realloc_chk(buf, buflen)
2675 ret = ceph_get_default_data_pool_name(self.cluster, buf, buflen)
2677 dict_result["pool_name"] = decode_cstr(buf)
2679 elif ret == -CEPHFS_ERANGE:
2682 raise make_ex(ret, "error in get_default_data_pool_name")
2685 ret = ceph_get_pool_id(self.cluster, buf)
2687 raise make_ex(ret, "error in get_pool_id")
2688 dict_result["pool_id"] = ret