2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc.stdint cimport *
7 from libc.stdlib cimport malloc, realloc, free
11 include "mock_cephfs.pxi"
16 from c_cephfs cimport *
17 from rados cimport Rados
19 from collections import namedtuple
20 from datetime import datetime
23 from typing import Any, Dict, Optional
25 AT_NO_ATTR_SYNC = 0x4000
26 AT_SYMLINK_NOFOLLOW = 0x100
27 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
28 CEPH_STATX_BASIC_STATS = 0x7ff
29 cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
31 CEPH_STATX_NLINK = 0x2
34 CEPH_STATX_RDEV = 0x10
35 CEPH_STATX_ATIME = 0x20
36 CEPH_STATX_MTIME = 0x40
37 CEPH_STATX_CTIME = 0x80
38 CEPH_STATX_INO = 0x100
39 CEPH_STATX_SIZE = 0x200
40 CEPH_STATX_BLOCKS = 0x400
41 CEPH_STATX_BTIME = 0x800
42 CEPH_STATX_VERSION = 0x1000
44 FALLOC_FL_KEEP_SIZE = 0x01
45 FALLOC_FL_PUNCH_HOLE = 0x02
46 FALLOC_FL_NO_HIDE_STALE = 0x04
48 CEPH_SETATTR_MODE = 0x1
49 CEPH_SETATTR_UID = 0x2
50 CEPH_SETATTR_GID = 0x4
51 CEPH_SETATTR_MTIME = 0x8
52 CEPH_SETATTR_ATIME = 0x10
53 CEPH_SETATTR_SIZE = 0x20
54 CEPH_SETATTR_CTIME = 0x40
55 CEPH_SETATTR_BTIME = 0x200
59 CEPHFS_EBLOCKLISTED = 108
63 CEPHFS_ETIMEDOUT = 110
77 CEPHFS_ENAMETOOLONG = 36
83 CEPHFS_ECANCELED = 125
85 CEPHFS_EOPNOTSUPP = 95
88 CEPHFS_ENOTRECOVERABLE = 131
90 CEPHFS_EWOULDBLOCK = CEPHFS_EAGAIN
93 CEPHFS_EDEADLOCK = CEPHFS_EDEADLK
99 cdef extern from "Python.h":
100 # These are in cpython/string.pxd, but use "object" types instead of
101 # PyObject*, which invokes assumptions in cpython that we need to
102 # legitimately break to implement zero-copy string buffers in Image.read().
103 # This is valid use of the Python API and documented as a special case.
104 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
105 char* PyBytes_AsString(PyObject *string) except NULL
106 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
107 void PyEval_InitThreads()
110 class Error(Exception):
111 def get_error_code(self):
115 class LibCephFSStateError(Error):
119 class OSError(Error):
120 def __init__(self, errno, strerror):
121 super(OSError, self).__init__(errno, strerror)
123 self.strerror = "%s: %s" % (strerror, os.strerror(errno))
126 return '{} [Errno {}]'.format(self.strerror, self.errno)
128 def get_error_code(self):
132 class PermissionError(OSError):
136 class ObjectNotFound(OSError):
140 class NoData(OSError):
144 class ObjectExists(OSError):
148 class IOError(OSError):
152 class NoSpace(OSError):
156 class InvalidValue(OSError):
160 class OperationNotSupported(OSError):
164 class WouldBlock(OSError):
168 class OutOfRange(OSError):
172 class ObjectNotEmpty(OSError):
175 class NotDirectory(OSError):
178 class DiskQuotaExceeded(OSError):
182 cdef errno_to_exception = {
183 CEPHFS_EPERM : PermissionError,
184 CEPHFS_ENOENT : ObjectNotFound,
185 CEPHFS_EIO : IOError,
186 CEPHFS_ENOSPC : NoSpace,
187 CEPHFS_EEXIST : ObjectExists,
188 CEPHFS_ENODATA : NoData,
189 CEPHFS_EINVAL : InvalidValue,
190 CEPHFS_EOPNOTSUPP : OperationNotSupported,
191 CEPHFS_ERANGE : OutOfRange,
192 CEPHFS_EWOULDBLOCK: WouldBlock,
193 CEPHFS_ENOTEMPTY : ObjectNotEmpty,
194 CEPHFS_ENOTDIR : NotDirectory,
195 CEPHFS_EDQUOT : DiskQuotaExceeded,
199 cdef make_ex(ret, msg):
201 Translate a libcephfs return code into an exception.
203 :param ret: the return code
205 :param msg: the error message to use
207 :returns: a subclass of :class:`Error`
210 if ret in errno_to_exception:
211 return errno_to_exception[ret](ret, msg)
213 return OSError(ret, msg)
216 class DirEntry(namedtuple('DirEntry',
217 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
222 return self.d_type == self.DT_DIR
224 def is_symbol_file(self):
225 return self.d_type == self.DT_LNK
228 return self.d_type == self.DT_REG
230 StatResult = namedtuple('StatResult',
231 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
232 "st_gid", "st_rdev", "st_size", "st_blksize",
233 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
235 cdef class DirResult(object):
237 cdef ceph_dir_result* handle
239 # Bug in older Cython instances prevents this from being a static method.
241 # cdef create(LibCephFS lib, ceph_dir_result* handle):
247 def __dealloc__(self):
252 raise make_ex(CEPHFS_EBADF, "dir is not open")
253 self.lib.require_state("mounted")
255 ceph_rewinddir(self.lib.cluster, self.handle)
258 def __exit__(self, type_, value, traceback):
263 self.lib.require_state("mounted")
266 dirent = ceph_readdir(self.lib.cluster, self.handle)
270 IF UNAME_SYSNAME == "FreeBSD" or UNAME_SYSNAME == "Darwin":
271 return DirEntry(d_ino=dirent.d_ino,
273 d_reclen=dirent.d_reclen,
274 d_type=dirent.d_type,
275 d_name=dirent.d_name)
277 return DirEntry(d_ino=dirent.d_ino,
279 d_reclen=dirent.d_reclen,
280 d_type=dirent.d_type,
281 d_name=dirent.d_name)
285 self.lib.require_state("mounted")
287 ret = ceph_closedir(self.lib.cluster, self.handle)
289 raise make_ex(ret, "closedir failed")
294 raise make_ex(CEPHFS_EBADF, "dir is not open")
295 self.lib.require_state("mounted")
297 ceph_rewinddir(self.lib.cluster, self.handle)
301 raise make_ex(CEPHFS_EBADF, "dir is not open")
302 self.lib.require_state("mounted")
304 ret = ceph_telldir(self.lib.cluster, self.handle)
306 raise make_ex(ret, "telldir failed")
309 def seekdir(self, offset):
311 raise make_ex(CEPHFS_EBADF, "dir is not open")
312 if not isinstance(offset, int):
313 raise TypeError('offset must be an int')
314 self.lib.require_state("mounted")
315 cdef int64_t _offset = offset
317 ceph_seekdir(self.lib.cluster, self.handle, _offset)
320 def cstr(val, name, encoding="utf-8", opt=False) -> bytes:
322 Create a byte string from a Python string
324 :param basestring val: Python string
325 :param str name: Name of the string parameter, for exceptions
326 :param str encoding: Encoding to use
327 :param bool opt: If True, None is allowed
328 :raises: :class:`InvalidArgument`
330 if opt and val is None:
332 if isinstance(val, bytes):
336 v = val.encode(encoding)
338 raise TypeError('%s must be encodeable as a bytearray' % name)
339 assert isinstance(v, bytes)
342 def cstr_list(list_str, name, encoding="utf-8"):
343 return [cstr(s, name) for s in list_str]
346 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
348 Decode a byte string into a Python string.
350 :param bytes val: byte string
355 return val.decode(encoding)
357 cdef timeval to_timeval(t):
359 return timeval equivalent from time
362 cdef timeval buf = timeval(tt, (t - tt) * 1000000)
365 cdef timespec to_timespec(t):
367 return timespec equivalent from time
370 cdef timespec buf = timespec(tt, (t - tt) * 1000000000)
373 cdef char* opt_str(s) except? NULL:
379 cdef char ** to_bytes_array(list_bytes):
380 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
382 raise MemoryError("malloc failed")
383 for i in range(len(list_bytes)):
384 ret[i] = <char *>list_bytes[i]
388 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
389 cdef void *ret = realloc(ptr, size)
391 raise MemoryError("realloc failed")
395 cdef iovec * to_iovec(buffers) except NULL:
396 cdef iovec *iov = <iovec *>malloc(len(buffers) * sizeof(iovec))
399 raise MemoryError("malloc failed")
400 for i in xrange(len(buffers)):
401 s = <char*>buffers[i]
402 iov[i] = [<void*>s, len(buffers[i])]
406 cdef class LibCephFS(object):
407 """libcephfs python wrapper"""
409 cdef public object state
410 cdef ceph_mount_info *cluster
412 def require_state(self, *args):
413 if self.state in args:
415 raise LibCephFSStateError("You cannot perform that operation on a "
416 "CephFS object in state %s." % (self.state))
418 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
419 """Create a libcephfs wrapper
421 :param conf dict opt: settings overriding the default ones and conffile
422 :param conffile str opt: the path to ceph.conf to override the default settings
423 :auth_id str opt: the id used to authenticate the client entity
424 :rados_inst Rados opt: a rados.Rados instance
427 self.state = "uninitialized"
428 if rados_inst is not None:
429 if auth_id is not None or conffile is not None or conf is not None:
430 raise make_ex(CEPHFS_EINVAL,
431 "May not pass RADOS instance as well as other configuration")
433 self.create_with_rados(rados_inst)
435 self.create(conf, conffile, auth_id)
437 def create_with_rados(self, Rados rados_inst):
440 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
442 raise Error("libcephfs_initialize failed with error code: %d" % ret)
443 self.state = "configuring"
446 "special value that indicates no conffile should be read when creating a mount handle"
447 DEFAULT_CONF_FILES = -2
448 "special value that indicates the default conffiles should be read when creating a mount handle"
450 def create(self, conf=None, conffile=NO_CONF_FILE, auth_id=None):
452 Create a mount handle for interacting with Ceph. All libcephfs
453 functions operate on a mount info handle.
455 :param conf dict opt: settings overriding the default ones and conffile
456 :param conffile Union[int,str], optional: the path to ceph.conf to override the default settings
457 :auth_id str opt: the id used to authenticate the client entity
459 if conf is not None and not isinstance(conf, dict):
460 raise TypeError("conf must be dict or None")
461 cstr(conffile, 'configfile', opt=True)
462 auth_id = cstr(auth_id, 'auth_id', opt=True)
465 char* _auth_id = opt_str(auth_id)
469 ret = ceph_create(&self.cluster, <const char*>_auth_id)
471 raise Error("libcephfs_initialize failed with error code: %d" % ret)
473 self.state = "configuring"
474 if conffile in (self.NO_CONF_FILE, None):
476 elif conffile in (self.DEFAULT_CONF_FILES, ''):
477 self.conf_read_file(None)
479 self.conf_read_file(conffile)
481 for key, value in conf.items():
482 self.conf_set(key, value)
486 Return the file system id for this fs client.
488 self.require_state("mounted")
490 ret = ceph_get_fs_cid(self.cluster)
492 raise make_ex(ret, "error fetching fscid")
497 Get associated client addresses with this RADOS session.
499 self.require_state("mounted")
507 ret = ceph_getaddrs(self.cluster, &addrs)
509 raise make_ex(ret, "error calling getaddrs")
511 return decode_cstr(addrs)
513 ceph_buffer_free(addrs)
516 def conf_read_file(self, conffile=None):
518 Load the ceph configuration from the specified config file.
520 :param conffile str opt: the path to ceph.conf to override the default settings
522 conffile = cstr(conffile, 'conffile', opt=True)
524 char *_conffile = opt_str(conffile)
526 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
528 raise make_ex(ret, "error calling conf_read_file")
530 def conf_parse_argv(self, argv):
532 Parse the command line arguments and load the configuration parameters.
534 :param argv: the argument list
536 self.require_state("configuring")
537 cargv = cstr_list(argv, 'argv')
539 int _argc = len(argv)
540 char **_argv = to_bytes_array(cargv)
544 ret = ceph_conf_parse_argv(self.cluster, _argc,
545 <const char **>_argv)
547 raise make_ex(ret, "error calling conf_parse_argv")
553 Unmount and destroy the ceph mount handle.
555 if self.state in ["initialized", "mounted"]:
557 ceph_shutdown(self.cluster)
558 self.state = "shutdown"
564 def __exit__(self, type_, value, traceback):
568 def __dealloc__(self):
573 Get the version number of the ``libcephfs`` C library.
575 :returns: a tuple of ``(major, minor, extra)`` components of the
583 ceph_version(&major, &minor, &extra)
584 return (major, minor, extra)
586 def conf_get(self, option):
588 Gets the configuration value as a string.
590 :param option: the config option to get
592 self.require_state("configuring", "initialized", "mounted")
594 option = cstr(option, 'option')
596 char *_option = option
602 ret_buf = <char *>realloc_chk(ret_buf, length)
604 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
606 return decode_cstr(ret_buf)
607 elif ret == -CEPHFS_ENAMETOOLONG:
609 elif ret == -CEPHFS_ENOENT:
612 raise make_ex(ret, "error calling conf_get")
616 def conf_set(self, option, val):
618 Sets a configuration value from a string.
620 :param option: the configuration option to set
621 :param value: the value of the configuration option to set
623 self.require_state("configuring", "initialized", "mounted")
625 option = cstr(option, 'option')
626 val = cstr(val, 'val')
628 char *_option = option
632 ret = ceph_conf_set(self.cluster, _option, _val)
634 raise make_ex(ret, "error calling conf_set")
636 def set_mount_timeout(self, timeout):
640 :param timeout: mount timeout
642 self.require_state("configuring", "initialized")
643 if not isinstance(timeout, int):
644 raise TypeError('timeout must be an integer')
646 raise make_ex(CEPHFS_EINVAL, 'timeout must be greater than or equal to 0')
648 uint32_t _timeout = timeout
650 ret = ceph_set_mount_timeout(self.cluster, _timeout)
652 raise make_ex(ret, "error setting mount timeout")
656 Initialize the filesystem client (but do not mount the filesystem yet)
658 self.require_state("configuring")
660 ret = ceph_init(self.cluster)
662 raise make_ex(ret, "error calling ceph_init")
663 self.state = "initialized"
665 def mount(self, mount_root=None, filesystem_name=None):
667 Perform a mount using the path for the root of the mount.
669 if self.state == "configuring":
671 self.require_state("initialized")
673 # Configure which filesystem to mount if one was specified
674 if filesystem_name is None:
675 filesystem_name = b""
677 filesystem_name = cstr(filesystem_name, 'filesystem_name')
679 char *_filesystem_name = filesystem_name
682 ret = ceph_select_filesystem(self.cluster,
685 raise make_ex(ret, "error calling ceph_select_filesystem")
687 # Prepare mount_root argument, default to "/"
688 root = b"/" if mount_root is None else mount_root
690 char *_mount_root = root
693 ret = ceph_mount(self.cluster, _mount_root)
695 raise make_ex(ret, "error calling ceph_mount")
696 self.state = "mounted"
700 Unmount a mount handle.
702 self.require_state("mounted")
704 ret = ceph_unmount(self.cluster)
706 raise make_ex(ret, "error calling ceph_unmount")
707 self.state = "initialized"
709 def abort_conn(self):
711 Abort mds connections.
713 self.require_state("mounted")
715 ret = ceph_abort_conn(self.cluster)
717 raise make_ex(ret, "error calling ceph_abort_conn")
718 self.state = "initialized"
720 def get_instance_id(self):
722 Get a global id for current instance
724 self.require_state("initialized", "mounted")
726 ret = ceph_get_instance_id(self.cluster)
729 def statfs(self, path):
731 Perform a statfs on the ceph file system. This call fills in file system wide statistics
732 into the passed in buffer.
734 :param path: any path within the mounted filesystem
736 self.require_state("mounted")
737 path = cstr(path, 'path')
743 ret = ceph_statfs(self.cluster, _path, &statbuf)
745 raise make_ex(ret, "statfs failed: %s" % path)
746 return {'f_bsize': statbuf.f_bsize,
747 'f_frsize': statbuf.f_frsize,
748 'f_blocks': statbuf.f_blocks,
749 'f_bfree': statbuf.f_bfree,
750 'f_bavail': statbuf.f_bavail,
751 'f_files': statbuf.f_files,
752 'f_ffree': statbuf.f_ffree,
753 'f_favail': statbuf.f_favail,
754 'f_fsid': statbuf.f_fsid,
755 'f_flag': statbuf.f_flag,
756 'f_namemax': statbuf.f_namemax}
760 Synchronize all filesystem data to persistent media
762 self.require_state("mounted")
764 ret = ceph_sync_fs(self.cluster)
766 raise make_ex(ret, "sync_fs failed")
768 def fsync(self, int fd, int syncdataonly):
770 Synchronize an open file to persistent media.
772 :param fd: the file descriptor of the file to sync.
773 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
776 self.require_state("mounted")
778 ret = ceph_fsync(self.cluster, fd, syncdataonly)
780 raise make_ex(ret, "fsync failed")
782 def lazyio(self, fd, enable):
784 Enable/disable lazyio for the file.
786 :param fd: the file descriptor of the file for which to enable lazio.
787 :param enable: a boolean to enable lazyio or disable lazyio.
790 self.require_state("mounted")
791 if not isinstance(fd, int):
792 raise TypeError('fd must be an int')
793 if not isinstance(enable, int):
794 raise TypeError('enable must be an int')
801 ret = ceph_lazyio(self.cluster, _fd, _enable)
803 raise make_ex(ret, "lazyio failed")
805 def lazyio_propagate(self, fd, offset, count):
807 Flushes the write buffer for the file thereby propogating the buffered write to the file.
809 :param fd: the file descriptor of the file to sync.
810 :param offset: the byte range starting.
811 :param count: the number of bytes starting from offset.
814 self.require_state("mounted")
815 if not isinstance(fd, int):
816 raise TypeError('fd must be an int')
817 if not isinstance(offset, int):
818 raise TypeError('offset must be an int')
819 if not isinstance(count, int):
820 raise TypeError('count must be an int')
824 int64_t _offset = offset
825 size_t _count = count
828 ret = ceph_lazyio_propagate(self.cluster, _fd, _offset, _count)
830 raise make_ex(ret, "lazyio_propagate failed")
832 def lazyio_synchronize(self, fd, offset, count):
834 Flushes the write buffer for the file and invalidate the read cache. This allows a
835 subsequent read operation to read and cache data directly from the file and hence
836 everyone's propagated writes would be visible.
838 :param fd: the file descriptor of the file to sync.
839 :param offset: the byte range starting.
840 :param count: the number of bytes starting from offset.
843 self.require_state("mounted")
844 if not isinstance(fd, int):
845 raise TypeError('fd must be an int')
846 if not isinstance(offset, int):
847 raise TypeError('offset must be an int')
848 if not isinstance(count, int):
849 raise TypeError('count must be an int')
853 int64_t _offset = offset
854 size_t _count = count
857 ret = ceph_lazyio_synchronize(self.cluster, _fd, _offset, _count)
859 raise make_ex(ret, "lazyio_synchronize failed")
861 def fallocate(self, fd, offset, length, mode=0):
863 Preallocate or release disk space for the file for the byte range.
865 :param fd: the file descriptor of the file to fallocate.
866 :param mode: the flags determines the operation to be performed on the given
867 range. default operation (0) allocate and initialize to zero
868 the file in the byte range, and the file size will be changed
869 if offset + length is greater than the file size. if the
870 FALLOC_FL_KEEP_SIZE flag is specified in the mode, the file size
871 will not be changed. if the FALLOC_FL_PUNCH_HOLE flag is specified
872 in the mode, the operation is deallocate space and zero the byte range.
873 :param offset: the byte range starting.
874 :param length: the length of the range.
877 self.require_state("mounted")
878 if not isinstance(fd, int):
879 raise TypeError('fd must be an int')
880 if not isinstance(mode, int):
881 raise TypeError('mode must be an int')
882 if not isinstance(offset, int):
883 raise TypeError('offset must be an int')
884 if not isinstance(length, int):
885 raise TypeError('length must be an int')
890 int64_t _offset = offset
891 int64_t _length = length
894 ret = ceph_fallocate(self.cluster, _fd, _mode, _offset, _length)
896 raise make_ex(ret, "fallocate failed")
898 def getcwd(self) -> bytes:
900 Get the current working directory.
902 :returns: the path to the current working directory
904 self.require_state("mounted")
906 ret = ceph_getcwd(self.cluster)
909 def chdir(self, path):
911 Change the current working directory.
913 :param path: the path to the working directory to change into.
915 self.require_state("mounted")
917 path = cstr(path, 'path')
918 cdef char* _path = path
920 ret = ceph_chdir(self.cluster, _path)
922 raise make_ex(ret, "chdir failed")
924 def opendir(self, path) -> DirResult:
926 Open the given directory.
928 :param path: the path name of the directory to open. Must be either an absolute path
929 or a path relative to the current working directory.
930 :returns: the open directory stream handle
932 self.require_state("mounted")
934 path = cstr(path, 'path')
937 ceph_dir_result* handle
939 ret = ceph_opendir(self.cluster, _path, &handle);
941 raise make_ex(ret, "opendir failed")
947 def readdir(self, DirResult handle) -> Optional[DirEntry]:
949 Get the next entry in an open directory.
951 :param handle: the open directory stream handle
952 :returns: the next directory entry or None if at the end of the
953 directory (or the directory is empty. This pointer
954 should not be freed by the caller, and is only safe to
955 access between return and the next call to readdir or
958 self.require_state("mounted")
960 return handle.readdir()
962 def closedir(self, DirResult handle):
964 Close the open directory.
966 :param handle: the open directory stream handle
968 self.require_state("mounted")
970 return handle.close()
972 def rewinddir(self, DirResult handle):
974 Rewind the directory stream to the beginning of the directory.
976 :param handle: the open directory stream handle
978 return handle.rewinddir()
980 def telldir(self, DirResult handle):
982 Get the current position of a directory stream.
984 :param handle: the open directory stream handle
985 :return value: The position of the directory stream. Note that the offsets
986 returned by ceph_telldir do not have a particular order (cannot
987 be compared with inequality).
989 return handle.telldir()
991 def seekdir(self, DirResult handle, offset):
993 Move the directory stream to a position specified by the given offset.
995 :param handle: the open directory stream handle
996 :param offset: the position to move the directory stream to. This offset should be
997 a value returned by telldir. Note that this value does not refer to
998 the nth entry in a directory, and can not be manipulated with plus
1001 return handle.seekdir(offset)
1003 def mkdir(self, path, mode):
1007 :param path: the path of the directory to create. This must be either an
1008 absolute path or a relative path off of the current working directory.
1009 :param mode: the permissions the directory should have once created.
1012 self.require_state("mounted")
1013 path = cstr(path, 'path')
1014 if not isinstance(mode, int):
1015 raise TypeError('mode must be an int')
1020 ret = ceph_mkdir(self.cluster, _path, _mode)
1022 raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
1024 def mksnap(self, path, name, mode, metadata={}) -> int:
1028 :param path: path of the directory to snapshot.
1029 :param name: snapshot name
1030 :param mode: permission of the snapshot
1031 :param metadata: metadata key/value to store with the snapshot
1033 :raises: :class: `TypeError`
1034 :raises: :class: `Error`
1035 :returns: 0 on success
1038 self.require_state("mounted")
1039 path = cstr(path, 'path')
1040 name = cstr(name, 'name')
1041 if not isinstance(mode, int):
1042 raise TypeError('mode must be an int')
1043 if not isinstance(metadata, dict):
1044 raise TypeError('metadata must be an dictionary')
1046 for key, value in metadata.items():
1047 if not isinstance(key, str) or not isinstance(value, str):
1048 raise TypeError('metadata key and values should be strings')
1049 md[key.encode('utf-8')] = value.encode('utf-8')
1055 snap_metadata *_snap_meta = <snap_metadata *>malloc(nr * sizeof(snap_metadata))
1056 if nr and _snap_meta == NULL:
1057 raise MemoryError("malloc failed")
1059 for key, value in md.items():
1060 _snap_meta[i] = snap_metadata(<char*>key, <char*>value)
1063 ret = ceph_mksnap(self.cluster, _path, _name, _mode, _snap_meta, nr)
1066 raise make_ex(ret, "mksnap error")
1069 def rmsnap(self, path, name) -> int:
1073 :param path: path of the directory for removing snapshot
1074 :param name: snapshot name
1076 :raises: :class: `Error`
1077 :returns: 0 on success
1079 self.require_state("mounted")
1080 path = cstr(path, 'path')
1081 name = cstr(name, 'name')
1085 ret = ceph_rmsnap(self.cluster, _path, _name)
1087 raise make_ex(ret, "rmsnap error")
1090 def snap_info(self, path) -> Dict[str, Any]:
1094 :param path: snapshot path
1096 :raises: :class: `Error`
1097 :returns: snapshot metadata
1099 self.require_state("mounted")
1100 path = cstr(path, 'path')
1104 ret = ceph_get_snap_info(self.cluster, _path, &info)
1106 raise make_ex(ret, "snap_info error")
1108 if info.nr_snap_metadata:
1109 md = {snap_meta.key.decode('utf-8'): snap_meta.value.decode('utf-8') for snap_meta in
1110 info.snap_metadata[:info.nr_snap_metadata]}
1111 ceph_free_snap_info_buffer(&info)
1112 return {'id': info.id, 'metadata': md}
1114 def chmod(self, path, mode) -> None:
1116 Change directory mode.
1118 :param path: the path of the directory to create. This must be either an
1119 absolute path or a relative path off of the current working directory.
1120 :param mode: the permissions the directory should have once created.
1122 self.require_state("mounted")
1123 path = cstr(path, 'path')
1124 if not isinstance(mode, int):
1125 raise TypeError('mode must be an int')
1130 ret = ceph_chmod(self.cluster, _path, _mode)
1132 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
1134 def lchmod(self, path, mode) -> None:
1136 Change file mode. If the path is a symbolic link, it won't be dereferenced.
1138 :param path: the path of the file. This must be either an absolute path or
1139 a relative path off of the current working directory.
1140 :param mode: the permissions to be set .
1142 self.require_state("mounted")
1143 path = cstr(path, 'path')
1144 if not isinstance(mode, int):
1145 raise TypeError('mode must be an int')
1150 ret = ceph_lchmod(self.cluster, _path, _mode)
1152 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
1154 def fchmod(self, fd, mode) :
1156 Change file mode based on fd.
1158 :param fd: the file descriptor of the file to change file mode
1159 :param mode: the permissions to be set.
1161 self.require_state("mounted")
1162 if not isinstance(fd, int):
1163 raise TypeError('fd must be an int')
1164 if not isinstance(mode, int):
1165 raise TypeError('mode must be an int')
1170 ret = ceph_fchmod(self.cluster, _fd, _mode)
1172 raise make_ex(ret, "error in fchmod")
1174 def chown(self, path, uid, gid, follow_symlink=True):
1176 Change directory ownership
1178 :param path: the path of the directory to change.
1179 :param uid: the uid to set
1180 :param gid: the gid to set
1181 :param follow_symlink: perform the operation on the target file if @path
1182 is a symbolic link (default)
1184 self.require_state("mounted")
1185 path = cstr(path, 'path')
1186 if not isinstance(uid, int):
1187 raise TypeError('uid must be an int')
1188 elif not isinstance(gid, int):
1189 raise TypeError('gid must be an int')
1197 ret = ceph_chown(self.cluster, _path, _uid, _gid)
1200 ret = ceph_lchown(self.cluster, _path, _uid, _gid)
1202 raise make_ex(ret, "error in chown {}".format(path.decode('utf-8')))
1204 def lchown(self, path, uid, gid):
1206 Change ownership of a symbolic link
1208 :param path: the path of the symbolic link to change.
1209 :param uid: the uid to set
1210 :param gid: the gid to set
1212 self.chown(path, uid, gid, follow_symlink=False)
1214 def fchown(self, fd, uid, gid):
1216 Change file ownership
1218 :param fd: the file descriptor of the file to change ownership
1219 :param uid: the uid to set
1220 :param gid: the gid to set
1222 self.require_state("mounted")
1223 if not isinstance(fd, int):
1224 raise TypeError('fd must be an int')
1225 if not isinstance(uid, int):
1226 raise TypeError('uid must be an int')
1227 elif not isinstance(gid, int):
1228 raise TypeError('gid must be an int')
1235 ret = ceph_fchown(self.cluster, _fd, _uid, _gid)
1237 raise make_ex(ret, "error in fchown")
1239 def mkdirs(self, path, mode):
1241 Create multiple directories at once.
1243 :param path: the full path of directories and sub-directories that should
1245 :param mode: the permissions the directory should have once created
1247 self.require_state("mounted")
1248 path = cstr(path, 'path')
1249 if not isinstance(mode, int):
1250 raise TypeError('mode must be an int')
1256 ret = ceph_mkdirs(self.cluster, _path, _mode)
1258 raise make_ex(ret, "error in mkdirs {}".format(path.decode('utf-8')))
1260 def rmdir(self, path):
1264 :param path: the path of the directory to remove.
1266 self.require_state("mounted")
1267 path = cstr(path, 'path')
1268 cdef char* _path = path
1269 ret = ceph_rmdir(self.cluster, _path)
1271 raise make_ex(ret, "error in rmdir {}".format(path.decode('utf-8')))
1273 def open(self, path, flags, mode=0):
1275 Create and/or open a file.
1277 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
1278 the file will first be created before opening.
1279 :param flags: set of option masks that control how the file is created/opened.
1280 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
1281 is specified in the flags.
1283 self.require_state("mounted")
1284 path = cstr(path, 'path')
1286 if not isinstance(mode, int):
1287 raise TypeError('mode must be an int')
1288 if isinstance(flags, str):
1291 cephfs_flags = os.O_RDONLY
1299 cephfs_flags |= os.O_TRUNC | os.O_CREAT
1300 elif access_flags > 0 and c == '+':
1303 raise make_ex(CEPHFS_EOPNOTSUPP,
1304 "open flags doesn't support %s" % c)
1306 if access_flags == 1:
1307 cephfs_flags |= os.O_RDONLY;
1308 elif access_flags == 2:
1309 cephfs_flags |= os.O_WRONLY;
1311 cephfs_flags |= os.O_RDWR;
1313 elif isinstance(flags, int):
1314 cephfs_flags = flags
1316 raise TypeError("flags must be a string or an integer")
1320 int _flags = cephfs_flags
1324 ret = ceph_open(self.cluster, _path, _flags, _mode)
1326 raise make_ex(ret, "error in open {}".format(path.decode('utf-8')))
1329 def close(self, fd):
1331 Close the open file.
1333 :param fd: the file descriptor referring to the open file.
1336 self.require_state("mounted")
1337 if not isinstance(fd, int):
1338 raise TypeError('fd must be an int')
1341 ret = ceph_close(self.cluster, _fd)
1343 raise make_ex(ret, "error in close")
1345 def read(self, fd, offset, l):
1347 Read data from the file.
1349 :param fd: the file descriptor of the open file to read from.
1350 :param offset: the offset in the file to read from. If this value is negative, the
1351 function reads from the current offset of the file descriptor.
1352 :param l: the flag to indicate what type of seeking to perform
1354 self.require_state("mounted")
1355 if not isinstance(offset, int):
1356 raise TypeError('offset must be an int')
1357 if not isinstance(l, int):
1358 raise TypeError('l must be an int')
1359 if not isinstance(fd, int):
1360 raise TypeError('fd must be an int')
1363 int64_t _offset = offset
1367 PyObject* ret_s = NULL
1369 ret_s = PyBytes_FromStringAndSize(NULL, _length)
1371 ret_buf = PyBytes_AsString(ret_s)
1373 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
1375 raise make_ex(ret, "error in read")
1378 _PyBytes_Resize(&ret_s, ret)
1380 return <object>ret_s
1382 # We DECREF unconditionally: the cast to object above will have
1383 # INCREFed if necessary. This also takes care of exceptions,
1384 # including if _PyString_Resize fails (that will free the string
1385 # itself and set ret_s to NULL, hence XDECREF).
1386 ref.Py_XDECREF(ret_s)
1388 def preadv(self, fd, buffers, offset):
1390 Write data to a file.
1392 :param fd: the file descriptor of the open file to read from
1393 :param buffers: the list of byte object to read from the file
1394 :param offset: the offset of the file read from. If this value is negative, the
1395 function reads from the current offset of the file descriptor.
1397 self.require_state("mounted")
1398 if not isinstance(fd, int):
1399 raise TypeError('fd must be an int')
1400 if not isinstance(buffers, list):
1401 raise TypeError('buffers must be a list')
1403 if not isinstance(buf, bytearray):
1404 raise TypeError('buffers must be a list of bytes')
1405 if not isinstance(offset, int):
1406 raise TypeError('offset must be an int')
1410 int _iovcnt = len(buffers)
1411 int64_t _offset = offset
1412 iovec *_iov = to_iovec(buffers)
1415 ret = ceph_preadv(self.cluster, _fd, _iov, _iovcnt, _offset)
1417 raise make_ex(ret, "error in preadv")
1422 def write(self, fd, buf, offset):
1424 Write data to a file.
1426 :param fd: the file descriptor of the open file to write to
1427 :param buf: the bytes to write to the file
1428 :param offset: the offset of the file write into. If this value is negative, the
1429 function writes to the current offset of the file descriptor.
1431 self.require_state("mounted")
1432 if not isinstance(fd, int):
1433 raise TypeError('fd must be an int')
1434 if not isinstance(buf, bytes):
1435 raise TypeError('buf must be a bytes')
1436 if not isinstance(offset, int):
1437 raise TypeError('offset must be an int')
1442 int64_t _offset = offset
1444 size_t length = len(buf)
1447 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
1449 raise make_ex(ret, "error in write")
1452 def pwritev(self, fd, buffers, offset):
1454 Write data to a file.
1456 :param fd: the file descriptor of the open file to write to
1457 :param buffers: the list of byte object to write to the file
1458 :param offset: the offset of the file write into. If this value is negative, the
1459 function writes to the current offset of the file descriptor.
1461 self.require_state("mounted")
1462 if not isinstance(fd, int):
1463 raise TypeError('fd must be an int')
1464 if not isinstance(buffers, list):
1465 raise TypeError('buffers must be a list')
1467 if not isinstance(buf, bytes):
1468 raise TypeError('buffers must be a list of bytes')
1469 if not isinstance(offset, int):
1470 raise TypeError('offset must be an int')
1474 int _iovcnt = len(buffers)
1475 int64_t _offset = offset
1476 iovec *_iov = to_iovec(buffers)
1479 ret = ceph_pwritev(self.cluster, _fd, _iov, _iovcnt, _offset)
1481 raise make_ex(ret, "error in pwritev")
1486 def flock(self, fd, operation, owner):
1488 Apply or remove an advisory lock.
1490 :param fd: the open file descriptor to change advisory lock.
1491 :param operation: the advisory lock operation to be performed on the file
1492 :param owner: the user-supplied owner identifier (an arbitrary integer)
1494 self.require_state("mounted")
1495 if not isinstance(fd, int):
1496 raise TypeError('fd must be an int')
1497 if not isinstance(operation, int):
1498 raise TypeError('operation must be an int')
1503 uint64_t _owner = owner
1506 ret = ceph_flock(self.cluster, _fd, _op, _owner)
1508 raise make_ex(ret, "error in write")
1511 def truncate(self, path, size):
1513 Truncate the file to the given size. If this operation causes the
1514 file to expand, the empty bytes will be filled in with zeros.
1516 :param path: the path to the file to truncate.
1517 :param size: the new size of the file.
1520 if not isinstance(size, int):
1521 raise TypeError('size must be a int')
1524 statx_dict["size"] = size
1525 self.setattrx(path, statx_dict, CEPH_SETATTR_SIZE, AT_SYMLINK_NOFOLLOW)
1527 def ftruncate(self, fd, size):
1529 Truncate the file to the given size. If this operation causes the
1530 file to expand, the empty bytes will be filled in with zeros.
1532 :param path: the path to the file to truncate.
1533 :param size: the new size of the file.
1536 if not isinstance(size, int):
1537 raise TypeError('size must be a int')
1540 statx_dict["size"] = size
1541 self.fsetattrx(fd, statx_dict, CEPH_SETATTR_SIZE)
1543 def mknod(self, path, mode, rdev=0):
1545 Make a block or character special file.
1547 :param path: the path to the special file.
1548 :param mode: the permissions to use and the type of special file. The type can be
1549 one of stat.S_IFREG, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO. Both
1550 should be combined using bitwise OR.
1551 :param rdev: If the file type is stat.S_IFCHR or stat.S_IFBLK then this parameter
1552 specifies the major and minor numbers of the newly created device
1553 special file. Otherwise, it is ignored.
1555 self.require_state("mounted")
1556 path = cstr(path, 'path')
1558 if not isinstance(mode, int):
1559 raise TypeError('mode must be an int')
1560 if not isinstance(rdev, int):
1561 raise TypeError('rdev must be an int')
1569 ret = ceph_mknod(self.cluster, _path, _mode, _rdev)
1571 raise make_ex(ret, "error in mknod {}".format(path.decode('utf-8')))
1573 def getxattr(self, path, name, size=255, follow_symlink=True):
1575 Get an extended attribute.
1577 :param path: the path to the file
1578 :param name: the name of the extended attribute to get
1579 :param size: the size of the pre-allocated buffer
1581 self.require_state("mounted")
1583 path = cstr(path, 'path')
1584 name = cstr(name, 'name')
1590 size_t ret_length = size
1591 char *ret_buf = NULL
1594 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1597 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1601 ret = ceph_lgetxattr(self.cluster, _path, _name, ret_buf,
1605 raise make_ex(ret, "error in getxattr")
1607 return ret_buf[:ret]
1611 def fgetxattr(self, fd, name, size=255):
1613 Get an extended attribute given the fd of a file.
1615 :param fd: the open file descriptor referring to the file
1616 :param name: the name of the extended attribute to get
1617 :param size: the size of the pre-allocated buffer
1619 self.require_state("mounted")
1621 if not isinstance(fd, int):
1622 raise TypeError('fd must be an int')
1623 name = cstr(name, 'name')
1629 size_t ret_length = size
1630 char *ret_buf = NULL
1633 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1635 ret = ceph_fgetxattr(self.cluster, _fd, _name, ret_buf,
1639 raise make_ex(ret, "error in fgetxattr")
1641 return ret_buf[:ret]
1645 def lgetxattr(self, path, name, size=255):
1647 Get an extended attribute without following symbolic links. This
1648 function is identical to ceph_getxattr, but if the path refers to
1649 a symbolic link, we get the extended attributes of the symlink
1650 rather than the attributes of the file it points to.
1652 :param path: the path to the file
1653 :param name: the name of the extended attribute to get
1654 :param size: the size of the pre-allocated buffer
1657 return self.getxattr(path, name, size=size, follow_symlink=False)
1659 def setxattr(self, path, name, value, flags, follow_symlink=True):
1661 Set an extended attribute on a file.
1663 :param path: the path to the file.
1664 :param name: the name of the extended attribute to set.
1665 :param value: the bytes of the extended attribute value
1667 self.require_state("mounted")
1669 name = cstr(name, 'name')
1670 path = cstr(path, 'path')
1671 if not isinstance(flags, int):
1672 raise TypeError('flags must be a int')
1673 if not isinstance(value, bytes):
1674 raise TypeError('value must be a bytes')
1679 char *_value = value
1680 size_t _value_len = len(value)
1685 ret = ceph_setxattr(self.cluster, _path, _name,
1686 _value, _value_len, _flags)
1689 ret = ceph_lsetxattr(self.cluster, _path, _name,
1690 _value, _value_len, _flags)
1693 raise make_ex(ret, "error in setxattr")
1695 def fsetxattr(self, fd, name, value, flags):
1697 Set an extended attribute on a file.
1699 :param fd: the open file descriptor referring to the file.
1700 :param name: the name of the extended attribute to set.
1701 :param value: the bytes of the extended attribute value
1703 self.require_state("mounted")
1705 name = cstr(name, 'name')
1706 if not isinstance(fd, int):
1707 raise TypeError('fd must be an int')
1708 if not isinstance(flags, int):
1709 raise TypeError('flags must be a int')
1710 if not isinstance(value, bytes):
1711 raise TypeError('value must be a bytes')
1716 char *_value = value
1717 size_t _value_len = len(value)
1721 ret = ceph_fsetxattr(self.cluster, _fd, _name,
1722 _value, _value_len, _flags)
1724 raise make_ex(ret, "error in fsetxattr")
1726 def lsetxattr(self, path, name, value, flags):
1728 Set an extended attribute on a file but do not follow symbolic link.
1730 :param path: the path to the file.
1731 :param name: the name of the extended attribute to set.
1732 :param value: the bytes of the extended attribute value
1735 self.setxattr(path, name, value, flags, follow_symlink=False)
1737 def removexattr(self, path, name, follow_symlink=True):
1739 Remove an extended attribute of a file.
1741 :param path: path of the file.
1742 :param name: name of the extended attribute to remove.
1744 self.require_state("mounted")
1746 name = cstr(name, 'name')
1747 path = cstr(path, 'path')
1755 ret = ceph_removexattr(self.cluster, _path, _name)
1758 ret = ceph_lremovexattr(self.cluster, _path, _name)
1761 raise make_ex(ret, "error in removexattr")
1763 def fremovexattr(self, fd, name):
1765 Remove an extended attribute of a file.
1767 :param fd: the open file descriptor referring to the file.
1768 :param name: name of the extended attribute to remove.
1770 self.require_state("mounted")
1772 if not isinstance(fd, int):
1773 raise TypeError('fd must be an int')
1774 name = cstr(name, 'name')
1781 ret = ceph_fremovexattr(self.cluster, _fd, _name)
1783 raise make_ex(ret, "error in fremovexattr")
1785 def lremovexattr(self, path, name):
1787 Remove an extended attribute of a file but do not follow symbolic link.
1789 :param path: path of the file.
1790 :param name: name of the extended attribute to remove.
1792 self.removexattr(path, name, follow_symlink=False)
1794 def listxattr(self, path, size=65536, follow_symlink=True):
1796 List the extended attribute keys set on a file.
1798 :param path: path of the file.
1799 :param size: the size of list buffer to be filled with extended attribute keys.
1801 self.require_state("mounted")
1803 path = cstr(path, 'path')
1807 char *ret_buf = NULL
1808 size_t ret_length = size
1811 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1814 ret = ceph_listxattr(self.cluster, _path, ret_buf, ret_length)
1817 ret = ceph_llistxattr(self.cluster, _path, ret_buf, ret_length)
1820 raise make_ex(ret, "error in listxattr")
1822 return ret, ret_buf[:ret]
1826 def flistxattr(self, fd, size=65536):
1828 List the extended attribute keys set on a file.
1830 :param fd: the open file descriptor referring to the file.
1831 :param size: the size of list buffer to be filled with extended attribute keys.
1833 self.require_state("mounted")
1835 if not isinstance(fd, int):
1836 raise TypeError('fd must be an int')
1840 char *ret_buf = NULL
1841 size_t ret_length = size
1844 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1846 ret = ceph_flistxattr(self.cluster, _fd, ret_buf, ret_length)
1849 raise make_ex(ret, "error in flistxattr")
1851 return ret, ret_buf[:ret]
1855 def llistxattr(self, path, size=65536):
1857 List the extended attribute keys set on a file but do not follow symbolic link.
1859 :param path: path of the file.
1860 :param size: the size of list buffer to be filled with extended attribute keys.
1863 return self.listxattr(path, size=size, follow_symlink=False)
1865 def stat(self, path, follow_symlink=True):
1867 Get a file's extended statistics and attributes.
1869 :param path: the file or directory to get the statistics of.
1871 self.require_state("mounted")
1872 path = cstr(path, 'path')
1880 ret = ceph_statx(self.cluster, _path, &stx,
1881 CEPH_STATX_BASIC_STATS_CDEF, 0)
1884 ret = ceph_statx(self.cluster, _path, &stx,
1885 CEPH_STATX_BASIC_STATS_CDEF, AT_SYMLINK_NOFOLLOW_CDEF)
1888 raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
1889 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1890 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1891 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1892 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1893 st_blksize=stx.stx_blksize,
1894 st_blocks=stx.stx_blocks,
1895 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1896 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1897 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1899 def lstat(self, path):
1901 Get a file's extended statistics and attributes. When file's a
1902 symbolic link, return the informaion of the link itself rather
1903 than that of the file it points too.
1905 :param path: the file or directory to get the statistics of.
1907 return self.stat(path, follow_symlink=False)
1909 def fstat(self, fd):
1911 Get an open file's extended statistics and attributes.
1913 :param fd: the file descriptor of the file to get statistics of.
1915 self.require_state("mounted")
1916 if not isinstance(fd, int):
1917 raise TypeError('fd must be an int')
1924 ret = ceph_fstatx(self.cluster, _fd, &stx,
1925 CEPH_STATX_BASIC_STATS_CDEF, 0)
1927 raise make_ex(ret, "error in fsat")
1928 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1929 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1930 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1931 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1932 st_blksize=stx.stx_blksize,
1933 st_blocks=stx.stx_blocks,
1934 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1935 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1936 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1938 def statx(self, path, mask, flag):
1940 Get a file's extended statistics and attributes.
1942 :param path: the file or directory to get the statistics of.
1943 :param mask: want bitfield of CEPH_STATX_* flags showing designed attributes.
1944 :param flag: bitfield that can be used to set AT_* modifier flags (only AT_NO_ATTR_SYNC and AT_SYMLINK_NOFOLLOW)
1947 self.require_state("mounted")
1948 path = cstr(path, 'path')
1949 if not isinstance(mask, int):
1950 raise TypeError('flag must be a int')
1951 if not isinstance(flag, int):
1952 raise TypeError('flag must be a int')
1959 dict_result = dict()
1962 ret = ceph_statx(self.cluster, _path, &stx, _mask, _flag)
1964 raise make_ex(ret, "error in stat: %s" % path)
1966 if (_mask & CEPH_STATX_MODE):
1967 dict_result["mode"] = stx.stx_mode
1968 if (_mask & CEPH_STATX_NLINK):
1969 dict_result["nlink"] = stx.stx_nlink
1970 if (_mask & CEPH_STATX_UID):
1971 dict_result["uid"] = stx.stx_uid
1972 if (_mask & CEPH_STATX_GID):
1973 dict_result["gid"] = stx.stx_gid
1974 if (_mask & CEPH_STATX_RDEV):
1975 dict_result["rdev"] = stx.stx_rdev
1976 if (_mask & CEPH_STATX_ATIME):
1977 dict_result["atime"] = datetime.fromtimestamp(stx.stx_atime.tv_sec)
1978 if (_mask & CEPH_STATX_MTIME):
1979 dict_result["mtime"] = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
1980 if (_mask & CEPH_STATX_CTIME):
1981 dict_result["ctime"] = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
1982 if (_mask & CEPH_STATX_INO):
1983 dict_result["ino"] = stx.stx_ino
1984 if (_mask & CEPH_STATX_SIZE):
1985 dict_result["size"] = stx.stx_size
1986 if (_mask & CEPH_STATX_BLOCKS):
1987 dict_result["blocks"] = stx.stx_blocks
1988 if (_mask & CEPH_STATX_BTIME):
1989 dict_result["btime"] = datetime.fromtimestamp(stx.stx_btime.tv_sec)
1990 if (_mask & CEPH_STATX_VERSION):
1991 dict_result["version"] = stx.stx_version
1995 def setattrx(self, path, dict_stx, mask, flags):
1997 Set a file's attributes.
1999 :param path: the path to the file/directory to set the attributes of.
2000 :param mask: a mask of all the CEPH_SETATTR_* values that have been set in the statx struct.
2001 :param stx: a dict of statx structure that must include attribute values to set on the file.
2002 :param flags: mask of AT_* flags (only AT_ATTR_NOFOLLOW is respected for now)
2005 self.require_state("mounted")
2006 path = cstr(path, 'path')
2007 if not isinstance(dict_stx, dict):
2008 raise TypeError('dict_stx must be a dict')
2009 if not isinstance(mask, int):
2010 raise TypeError('mask must be a int')
2011 if not isinstance(flags, int):
2012 raise TypeError('flags must be a int')
2016 if (mask & CEPH_SETATTR_MODE):
2017 stx.stx_mode = dict_stx["mode"]
2018 if (mask & CEPH_SETATTR_UID):
2019 stx.stx_uid = dict_stx["uid"]
2020 if (mask & CEPH_SETATTR_GID):
2021 stx.stx_gid = dict_stx["gid"]
2022 if (mask & CEPH_SETATTR_MTIME):
2023 stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
2024 if (mask & CEPH_SETATTR_ATIME):
2025 stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
2026 if (mask & CEPH_SETATTR_CTIME):
2027 stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
2028 if (mask & CEPH_SETATTR_SIZE):
2029 stx.stx_size = dict_stx["size"]
2030 if (mask & CEPH_SETATTR_BTIME):
2031 stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())
2037 dict_result = dict()
2040 ret = ceph_setattrx(self.cluster, _path, &stx, _mask, _flags)
2042 raise make_ex(ret, "error in setattrx: %s" % path)
2044 def fsetattrx(self, fd, dict_stx, mask):
2046 Set a file's attributes.
2048 :param path: the path to the file/directory to set the attributes of.
2049 :param mask: a mask of all the CEPH_SETATTR_* values that have been set in the statx struct.
2050 :param stx: a dict of statx structure that must include attribute values to set on the file.
2053 self.require_state("mounted")
2054 if not isinstance(fd, int):
2055 raise TypeError('fd must be a int')
2056 if not isinstance(dict_stx, dict):
2057 raise TypeError('dict_stx must be a dict')
2058 if not isinstance(mask, int):
2059 raise TypeError('mask must be a int')
2063 if (mask & CEPH_SETATTR_MODE):
2064 stx.stx_mode = dict_stx["mode"]
2065 if (mask & CEPH_SETATTR_UID):
2066 stx.stx_uid = dict_stx["uid"]
2067 if (mask & CEPH_SETATTR_GID):
2068 stx.stx_gid = dict_stx["gid"]
2069 if (mask & CEPH_SETATTR_MTIME):
2070 stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
2071 if (mask & CEPH_SETATTR_ATIME):
2072 stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
2073 if (mask & CEPH_SETATTR_CTIME):
2074 stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
2075 if (mask & CEPH_SETATTR_SIZE):
2076 stx.stx_size = dict_stx["size"]
2077 if (mask & CEPH_SETATTR_BTIME):
2078 stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())
2083 dict_result = dict()
2086 ret = ceph_fsetattrx(self.cluster, _fd, &stx, _mask)
2088 raise make_ex(ret, "error in fsetattrx")
2090 def symlink(self, existing, newname):
2092 Creates a symbolic link.
2094 :param existing: the path to the existing file/directory to link to.
2095 :param newname: the path to the new file/directory to link from.
2097 self.require_state("mounted")
2098 existing = cstr(existing, 'existing')
2099 newname = cstr(newname, 'newname')
2101 char* _existing = existing
2102 char* _newname = newname
2105 ret = ceph_symlink(self.cluster, _existing, _newname)
2107 raise make_ex(ret, "error in symlink")
2109 def link(self, existing, newname):
2113 :param existing: the path to the existing file/directory to link to.
2114 :param newname: the path to the new file/directory to link from.
2117 self.require_state("mounted")
2118 existing = cstr(existing, 'existing')
2119 newname = cstr(newname, 'newname')
2121 char* _existing = existing
2122 char* _newname = newname
2125 ret = ceph_link(self.cluster, _existing, _newname)
2127 raise make_ex(ret, "error in link")
2129 def readlink(self, path, size) -> bytes:
2131 Read a symbolic link.
2133 :param path: the path to the symlink to read
2134 :param size: the length of the buffer
2135 :returns: buffer to hold the path of the file that the symlink points to.
2137 self.require_state("mounted")
2138 path = cstr(path, 'path')
2142 int64_t _size = size
2146 buf = <char *>realloc_chk(buf, _size)
2148 ret = ceph_readlink(self.cluster, _path, buf, _size)
2150 raise make_ex(ret, "error in readlink")
2155 def unlink(self, path):
2157 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
2158 file will not disappear from the namespace until all references to it are removed.
2160 :param path: the path of the file or link to unlink.
2162 self.require_state("mounted")
2163 path = cstr(path, 'path')
2164 cdef char* _path = path
2166 ret = ceph_unlink(self.cluster, _path)
2168 raise make_ex(ret, "error in unlink: {}".format(path.decode('utf-8')))
2170 def rename(self, src, dst):
2172 Rename a file or directory.
2174 :param src: the path to the existing file or directory.
2175 :param dst: the new name of the file or directory.
2178 self.require_state("mounted")
2180 src = cstr(src, 'source')
2181 dst = cstr(dst, 'destination')
2188 ret = ceph_rename(self.cluster, _src, _dst)
2190 raise make_ex(ret, "error in rename {} to {}".format(src.decode(
2191 'utf-8'), dst.decode('utf-8')))
2193 def mds_command(self, mds_spec, args, input_data):
2195 :returns: 3-tuple of output status int, output status string, output data
2197 mds_spec = cstr(mds_spec, 'mds_spec')
2198 args = cstr(args, 'args')
2199 input_data = cstr(input_data, 'input_data')
2202 char *_mds_spec = opt_str(mds_spec)
2203 char **_cmd = to_bytes_array([args])
2206 char *_inbuf = input_data
2207 size_t _inbuf_len = len(input_data)
2209 char *_outbuf = NULL
2210 size_t _outbuf_len = 0
2212 size_t _outs_len = 0
2216 ret = ceph_mds_command(self.cluster, _mds_spec,
2217 <const char **>_cmd, _cmdlen,
2218 <const char*>_inbuf, _inbuf_len,
2219 &_outbuf, &_outbuf_len,
2221 my_outs = decode_cstr(_outs[:_outs_len])
2222 my_outbuf = _outbuf[:_outbuf_len]
2224 ceph_buffer_free(_outs)
2226 ceph_buffer_free(_outbuf)
2227 return (ret, my_outbuf, my_outs)
2231 def umask(self, mode) :
2232 self.require_state("mounted")
2236 ret = ceph_umask(self.cluster, _mode)
2238 raise make_ex(ret, "error in umask")
2241 def lseek(self, fd, offset, whence):
2243 Set the file's current position.
2245 :param fd: the file descriptor of the open file to read from.
2246 :param offset: the offset in the file to read from. If this value is negative, the
2247 function reads from the current offset of the file descriptor.
2248 :param whence: the flag to indicate what type of seeking to performs:SEEK_SET, SEEK_CUR, SEEK_END
2250 self.require_state("mounted")
2251 if not isinstance(fd, int):
2252 raise TypeError('fd must be an int')
2253 if not isinstance(offset, int):
2254 raise TypeError('offset must be an int')
2255 if not isinstance(whence, int):
2256 raise TypeError('whence must be an int')
2260 int64_t _offset = offset
2261 int64_t _whence = whence
2264 ret = ceph_lseek(self.cluster, _fd, _offset, _whence)
2267 raise make_ex(ret, "error in lseek")
2271 def utime(self, path, times=None):
2273 Set access and modification time for path
2275 :param path: file path for which timestamps have to be changed
2276 :param times: if times is not None, it must be a tuple (atime, mtime)
2279 self.require_state("mounted")
2280 path = cstr(path, 'path')
2282 if not isinstance(times, tuple):
2283 raise TypeError('times must be a tuple')
2284 if not isinstance(times[0], int):
2285 raise TypeError('atime must be an int')
2286 if not isinstance(times[1], int):
2287 raise TypeError('mtime must be an int')
2288 actime = modtime = int(time.time())
2295 utimbuf buf = utimbuf(actime, modtime)
2297 ret = ceph_utime(self.cluster, pth, &buf)
2299 raise make_ex(ret, "error in utime {}".format(path.decode('utf-8')))
2301 def futime(self, fd, times=None):
2303 Set access and modification time for a file pointed by descriptor
2305 :param fd: file descriptor of the open file
2306 :param times: if times is not None, it must be a tuple (atime, mtime)
2309 self.require_state("mounted")
2310 if not isinstance(fd, int):
2311 raise TypeError('fd must be an int')
2313 if not isinstance(times, tuple):
2314 raise TypeError('times must be a tuple')
2315 if not isinstance(times[0], int):
2316 raise TypeError('atime must be an int')
2317 if not isinstance(times[1], int):
2318 raise TypeError('mtime must be an int')
2319 actime = modtime = int(time.time())
2326 utimbuf buf = utimbuf(actime, modtime)
2328 ret = ceph_futime(self.cluster, _fd, &buf)
2330 raise make_ex(ret, "error in futime")
2332 def utimes(self, path, times=None, follow_symlink=True):
2334 Set access and modification time for path
2336 :param path: file path for which timestamps have to be changed
2337 :param times: if times is not None, it must be a tuple (atime, mtime)
2338 :param follow_symlink: perform the operation on the target file if @path
2339 is a symbolic link (default)
2342 self.require_state("mounted")
2343 path = cstr(path, 'path')
2345 if not isinstance(times, tuple):
2346 raise TypeError('times must be a tuple')
2347 if not isinstance(times[0], (int, float)):
2348 raise TypeError('atime must be an int or a float')
2349 if not isinstance(times[1], (int, float)):
2350 raise TypeError('mtime must be an int or a float')
2351 actime = modtime = time.time()
2353 actime = float(times[0])
2354 modtime = float(times[1])
2358 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
2361 ret = ceph_utimes(self.cluster, pth, buf)
2364 ret = ceph_lutimes(self.cluster, pth, buf)
2366 raise make_ex(ret, "error in utimes {}".format(path.decode('utf-8')))
2368 def lutimes(self, path, times=None):
2370 Set access and modification time for a file. If the file is a symbolic
2371 link do not follow to the target.
2373 :param path: file path for which timestamps have to be changed
2374 :param times: if times is not None, it must be a tuple (atime, mtime)
2376 self.utimes(path, times=times, follow_symlink=False)
2378 def futimes(self, fd, times=None):
2380 Set access and modification time for a file pointer by descriptor
2382 :param fd: file descriptor of the open file
2383 :param times: if times is not None, it must be a tuple (atime, mtime)
2386 self.require_state("mounted")
2387 if not isinstance(fd, int):
2388 raise TypeError('fd must be an int')
2390 if not isinstance(times, tuple):
2391 raise TypeError('times must be a tuple')
2392 if not isinstance(times[0], (int, float)):
2393 raise TypeError('atime must be an int or a float')
2394 if not isinstance(times[1], (int, float)):
2395 raise TypeError('mtime must be an int or a float')
2396 actime = modtime = time.time()
2398 actime = float(times[0])
2399 modtime = float(times[1])
2403 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
2405 ret = ceph_futimes(self.cluster, _fd, buf)
2407 raise make_ex(ret, "error in futimes")
2409 def futimens(self, fd, times=None):
2411 Set access and modification time for a file pointer by descriptor
2413 :param fd: file descriptor of the open file
2414 :param times: if times is not None, it must be a tuple (atime, mtime)
2417 self.require_state("mounted")
2418 if not isinstance(fd, int):
2419 raise TypeError('fd must be an int')
2421 if not isinstance(times, tuple):
2422 raise TypeError('times must be a tuple')
2423 if not isinstance(times[0], (int, float)):
2424 raise TypeError('atime must be an int or a float')
2425 if not isinstance(times[1], (int, float)):
2426 raise TypeError('mtime must be an int or a float')
2427 actime = modtime = time.time()
2429 actime = float(times[0])
2430 modtime = float(times[1])
2434 timespec *buf = [to_timespec(actime), to_timespec(modtime)]
2436 ret = ceph_futimens(self.cluster, _fd, buf)
2438 raise make_ex(ret, "error in futimens")
2440 def get_file_replication(self, fd):
2442 Get the file replication information from an open file descriptor.
2444 :param fd: the open file descriptor referring to the file to get
2445 the replication information of.
2447 self.require_state("mounted")
2448 if not isinstance(fd, int):
2449 raise TypeError('fd must be an int')
2455 ret = ceph_get_file_replication(self.cluster, _fd)
2457 raise make_ex(ret, "error in get_file_replication")
2461 def get_path_replication(self, path):
2463 Get the file replication information given the path.
2465 :param path: the path of the file/directory to get the replication information of.
2467 self.require_state("mounted")
2468 path = cstr(path, 'path')
2474 ret = ceph_get_path_replication(self.cluster, _path)
2476 raise make_ex(ret, "error in get_path_replication")
2480 def get_pool_id(self, pool_name):
2482 Get the id of the named pool.
2484 :param pool_name: the name of the pool.
2487 self.require_state("mounted")
2488 pool_name = cstr(pool_name, 'pool_name')
2491 char* _pool_name = pool_name
2494 ret = ceph_get_pool_id(self.cluster, _pool_name)
2496 raise make_ex(ret, "error in get_pool_id")
2500 def get_pool_replication(self, pool_id):
2502 Get the pool replication factor.
2504 :param pool_id: the pool id to look up
2507 self.require_state("mounted")
2508 if not isinstance(pool_id, int):
2509 raise TypeError('pool_id must be an int')
2512 int _pool_id = pool_id
2515 ret = ceph_get_pool_replication(self.cluster, _pool_id)
2517 raise make_ex(ret, "error in get_pool_replication")
2521 def debug_get_fd_caps(self, fd):
2523 Get the capabilities currently issued to the client given the fd.
2525 :param fd: the file descriptor to get issued
2528 self.require_state("mounted")
2529 if not isinstance(fd, int):
2530 raise TypeError('fd must be an int')
2536 ret = ceph_debug_get_fd_caps(self.cluster, _fd)
2538 raise make_ex(ret, "error in debug_get_fd_caps")
2542 def debug_get_file_caps(self, path):
2544 Get the capabilities currently issued to the client given the path.
2546 :param path: the path of the file/directory to get the capabilities of.
2549 self.require_state("mounted")
2550 path = cstr(path, 'path')
2556 ret = ceph_debug_get_file_caps(self.cluster, _path)
2558 raise make_ex(ret, "error in debug_get_file_caps")
2562 def get_cap_return_timeout(self):
2564 Get the amount of time that the client has to return caps
2566 In the event that a client does not return its caps, the MDS may blocklist
2567 it after this timeout. Applications should check this value and ensure
2568 that they set the delegation timeout to a value lower than this.
2571 self.require_state("mounted")
2574 ret = ceph_get_cap_return_timeout(self.cluster)
2576 raise make_ex(ret, "error in get_cap_return_timeout")
2580 def set_uuid(self, uuid):
2582 Set ceph client uuid. Must be called before mount.
2584 :param uuid: the uuid to set
2587 uuid = cstr(uuid, 'uuid')
2593 ceph_set_uuid(self.cluster, _uuid)
2595 def set_session_timeout(self, timeout):
2597 Set ceph client session timeout. Must be called before mount.
2599 :param timeout: the timeout to set
2602 if not isinstance(timeout, int):
2603 raise TypeError('timeout must be an int')
2606 int _timeout = timeout
2609 ceph_set_session_timeout(self.cluster, _timeout)
2611 def get_layout(self, fd):
2613 Get the file layout from an open file descriptor.
2615 :param fd: the open file descriptor referring to the file to get the layout of.
2618 if not isinstance(fd, int):
2619 raise TypeError('fd must be an int')
2629 dict_result = dict()
2632 ret = ceph_get_file_layout(self.cluster, _fd, &stripe_unit, &stripe_count, &object_size, &pool_id)
2634 raise make_ex(stripe_unit, "error in get_file_layout")
2635 dict_result["stripe_unit"] = stripe_unit
2636 dict_result["stripe_count"] = stripe_count
2637 dict_result["object_size"] = object_size
2638 dict_result["pool_id"] = pool_id
2642 buf = <char *>realloc_chk(buf, buflen)
2644 ret = ceph_get_file_pool_name(self.cluster, _fd, buf, buflen)
2646 dict_result["pool_name"] = decode_cstr(buf)
2648 elif ret == -CEPHFS_ERANGE:
2651 raise make_ex(ret, "error in get_file_pool_name")
2656 def get_default_pool(self):
2658 Get the default pool name and id of cephfs. This returns dict{pool_name, pool_id}.
2664 dict_result = dict()
2668 buf = <char *>realloc_chk(buf, buflen)
2670 ret = ceph_get_default_data_pool_name(self.cluster, buf, buflen)
2672 dict_result["pool_name"] = decode_cstr(buf)
2674 elif ret == -CEPHFS_ERANGE:
2677 raise make_ex(ret, "error in get_default_data_pool_name")
2680 ret = ceph_get_pool_id(self.cluster, buf)
2682 raise make_ex(ret, "error in get_pool_id")
2683 dict_result["pool_id"] = ret