2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc.stdint cimport *
7 from libc.stdlib cimport malloc, realloc, free
11 include "mock_cephfs.pxi"
16 from c_cephfs cimport *
17 from rados cimport Rados
19 from collections import namedtuple
20 from datetime import datetime
24 AT_NO_ATTR_SYNC = 0x4000
25 AT_SYMLINK_NOFOLLOW = 0x100
26 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
27 CEPH_STATX_BASIC_STATS = 0x7ff
28 cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
30 CEPH_STATX_NLINK = 0x2
33 CEPH_STATX_RDEV = 0x10
34 CEPH_STATX_ATIME = 0x20
35 CEPH_STATX_MTIME = 0x40
36 CEPH_STATX_CTIME = 0x80
37 CEPH_STATX_INO = 0x100
38 CEPH_STATX_SIZE = 0x200
39 CEPH_STATX_BLOCKS = 0x400
40 CEPH_STATX_BTIME = 0x800
41 CEPH_STATX_VERSION = 0x1000
43 FALLOC_FL_KEEP_SIZE = 0x01
44 FALLOC_FL_PUNCH_HOLE = 0x02
45 FALLOC_FL_NO_HIDE_STALE = 0x04
47 CEPH_SETATTR_MODE = 0x1
48 CEPH_SETATTR_UID = 0x2
49 CEPH_SETATTR_GID = 0x4
50 CEPH_SETATTR_MTIME = 0x8
51 CEPH_SETATTR_ATIME = 0x10
52 CEPH_SETATTR_SIZE = 0x20
53 CEPH_SETATTR_CTIME = 0x40
54 CEPH_SETATTR_BTIME = 0x200
58 CEPHFS_EBLOCKLISTED = 108
62 CEPHFS_ETIMEDOUT = 110
76 CEPHFS_ENAMETOOLONG = 36
82 CEPHFS_ECANCELED = 125
84 CEPHFS_EOPNOTSUPP = 95
87 CEPHFS_ENOTRECOVERABLE = 131
89 CEPHFS_EWOULDBLOCK = CEPHFS_EAGAIN
92 CEPHFS_EDEADLOCK = CEPHFS_EDEADLK
98 cdef extern from "Python.h":
99 # These are in cpython/string.pxd, but use "object" types instead of
100 # PyObject*, which invokes assumptions in cpython that we need to
101 # legitimately break to implement zero-copy string buffers in LibCephFS.read().
102 # This is valid use of the Python API and documented as a special case.
103 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
104 char* PyBytes_AsString(PyObject *string) except NULL
105 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
106 void PyEval_InitThreads()
109 class Error(Exception):
110 def get_error_code(self):
114 class LibCephFSStateError(Error):
118 class OSError(Error):
119 def __init__(self, errno, strerror):
120 super(OSError, self).__init__(errno, strerror)
122 self.strerror = "%s: %s" % (strerror, os.strerror(errno))
125 return '{} [Errno {}]'.format(self.strerror, self.errno)
127 def get_error_code(self):
131 class PermissionError(OSError):
135 class ObjectNotFound(OSError):
139 class NoData(OSError):
143 class ObjectExists(OSError):
147 class IOError(OSError):
151 class NoSpace(OSError):
155 class InvalidValue(OSError):
159 class OperationNotSupported(OSError):
163 class WouldBlock(OSError):
167 class OutOfRange(OSError):
171 class ObjectNotEmpty(OSError):
174 class NotDirectory(OSError):
177 class DiskQuotaExceeded(OSError):
181 cdef errno_to_exception = {
182 CEPHFS_EPERM : PermissionError,
183 CEPHFS_ENOENT : ObjectNotFound,
184 CEPHFS_EIO : IOError,
185 CEPHFS_ENOSPC : NoSpace,
186 CEPHFS_EEXIST : ObjectExists,
187 CEPHFS_ENODATA : NoData,
188 CEPHFS_EINVAL : InvalidValue,
189 CEPHFS_EOPNOTSUPP : OperationNotSupported,
190 CEPHFS_ERANGE : OutOfRange,
191 CEPHFS_EWOULDBLOCK: WouldBlock,
192 CEPHFS_ENOTEMPTY : ObjectNotEmpty,
193 CEPHFS_ENOTDIR : NotDirectory,
194 CEPHFS_EDQUOT : DiskQuotaExceeded,
cdef make_ex(ret, msg):
    """
    Translate a libcephfs return code into an exception.

    :param ret: the return code; may be given either as the negative value
                returned by a libcephfs call or as a positive errno constant
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: a subclass of :class:`Error`
    """
    # libcephfs calls report failures as negative errno values, whereas
    # errno_to_exception is keyed by the positive constants (and callers
    # sometimes pass a positive constant directly), so normalise the sign
    # before the lookup.
    ret = abs(ret)
    # Fall back to the module's generic OSError for codes without a
    # dedicated exception class.
    exc_type = errno_to_exception.get(ret, OSError)
    return exc_type(ret, msg)
215 class DirEntry(namedtuple('DirEntry',
216 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
221 return self.d_type == self.DT_DIR
223 def is_symbol_file(self):
224 return self.d_type == self.DT_LNK
227 return self.d_type == self.DT_REG
229 StatResult = namedtuple('StatResult',
230 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
231 "st_gid", "st_rdev", "st_size", "st_blksize",
232 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
234 cdef class DirResult(object):
236 cdef ceph_dir_result* handle
238 # Bug in older Cython instances prevents this from being a static method.
240 # cdef create(LibCephFS lib, ceph_dir_result* handle):
246 def __dealloc__(self):
251 raise make_ex(CEPHFS_EBADF, "dir is not open")
252 self.lib.require_state("mounted")
254 ceph_rewinddir(self.lib.cluster, self.handle)
257 def __exit__(self, type_, value, traceback):
262 self.lib.require_state("mounted")
265 dirent = ceph_readdir(self.lib.cluster, self.handle)
269 IF UNAME_SYSNAME == "FreeBSD" or UNAME_SYSNAME == "Darwin":
270 return DirEntry(d_ino=dirent.d_ino,
272 d_reclen=dirent.d_reclen,
273 d_type=dirent.d_type,
274 d_name=dirent.d_name)
276 return DirEntry(d_ino=dirent.d_ino,
278 d_reclen=dirent.d_reclen,
279 d_type=dirent.d_type,
280 d_name=dirent.d_name)
284 self.lib.require_state("mounted")
286 ret = ceph_closedir(self.lib.cluster, self.handle)
288 raise make_ex(ret, "closedir failed")
293 raise make_ex(CEPHFS_EBADF, "dir is not open")
294 self.lib.require_state("mounted")
296 ceph_rewinddir(self.lib.cluster, self.handle)
300 raise make_ex(CEPHFS_EBADF, "dir is not open")
301 self.lib.require_state("mounted")
303 ret = ceph_telldir(self.lib.cluster, self.handle)
305 raise make_ex(ret, "telldir failed")
308 def seekdir(self, offset):
310 raise make_ex(CEPHFS_EBADF, "dir is not open")
311 if not isinstance(offset, int):
312 raise TypeError('offset must be an int')
313 self.lib.require_state("mounted")
314 cdef int64_t _offset = offset
316 ceph_seekdir(self.lib.cluster, self.handle, _offset)
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Create a byte string from a Python string.

    :param val: Python string to encode, or an already-encoded ``bytes``
                object which is returned unchanged
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed
    :rtype: bytes (or None when ``opt`` is True and ``val`` is None)
    :raises: :class:`TypeError` if ``val`` cannot be encoded
    """
    if opt and val is None:
        return None
    if isinstance(val, bytes):
        # Already a byte string: nothing to encode.
        return val
    try:
        v = val.encode(encoding)
    except Exception as err:
        # Covers objects without an encode() method as well as codec
        # failures; report both uniformly as a bad argument type.
        raise TypeError('%s must be encodeable as a bytearray' % name) from err
    assert isinstance(v, bytes)
    return v
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every element of a list into a byte string using :func:`cstr`.

    :param list_str: iterable of Python strings (or bytes objects)
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for each element
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element cannot be encoded
    """
    # Forward the requested encoding; previously it was accepted but ignored,
    # so every element was always encoded with cstr()'s default.
    return [cstr(s, name, encoding) for s in list_str]
def decode_cstr(val, encoding="utf-8"):
    """
    Decode a byte string into a Python string.

    :param bytes val: byte string, or None
    :param str encoding: Encoding to use
    :rtype: str, or None when ``val`` is None
    """
    # Mirror cstr(..., opt=True): an absent value stays absent instead of
    # failing with AttributeError on None.decode().
    if val is None:
        return None
    return val.decode(encoding)
358 cdef timeval to_timeval(t):
360 return timeval equivalent from time
363 cdef timeval buf = timeval(tt, (t - tt) * 1000000)
366 cdef timespec to_timespec(t):
368 return timespec equivalent from time
371 cdef timespec buf = timespec(tt, (t - tt) * 1000000000)
374 cdef char* opt_str(s) except? NULL:
380 cdef char ** to_bytes_array(list_bytes):
381 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
383 raise MemoryError("malloc failed")
384 for i in range(len(list_bytes)):
385 ret[i] = <char *>list_bytes[i]
389 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
390 cdef void *ret = realloc(ptr, size)
392 raise MemoryError("realloc failed")
396 cdef iovec * to_iovec(buffers) except NULL:
397 cdef iovec *iov = <iovec *>malloc(len(buffers) * sizeof(iovec))
400 raise MemoryError("malloc failed")
401 for i in xrange(len(buffers)):
402 s = <char*>buffers[i]
403 iov[i] = [<void*>s, len(buffers[i])]
407 cdef class LibCephFS(object):
408 """libcephfs python wrapper"""
410 cdef public object state
411 cdef ceph_mount_info *cluster
413 def require_state(self, *args):
414 if self.state in args:
416 raise LibCephFSStateError("You cannot perform that operation on a "
417 "CephFS object in state %s." % (self.state))
419 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
420 """Create a libcephfs wrapper
422 :param conf dict opt: settings overriding the default ones and conffile
423 :param conffile str opt: the path to ceph.conf to override the default settings
424 :auth_id str opt: the id used to authenticate the client entity
425 :rados_inst Rados opt: a rados.Rados instance
428 self.state = "uninitialized"
429 if rados_inst is not None:
430 if auth_id is not None or conffile is not None or conf is not None:
431 raise make_ex(CEPHFS_EINVAL,
432 "May not pass RADOS instance as well as other configuration")
434 self.create_with_rados(rados_inst)
436 self.create(conf, conffile, auth_id)
438 def create_with_rados(self, Rados rados_inst):
441 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
443 raise Error("libcephfs_initialize failed with error code: %d" % ret)
444 self.state = "configuring"
447 "special value that indicates no conffile should be read when creating a mount handle"
448 DEFAULT_CONF_FILES = -2
449 "special value that indicates the default conffiles should be read when creating a mount handle"
451 def create(self, conf=None, conffile=NO_CONF_FILE, auth_id=None):
453 Create a mount handle for interacting with Ceph. All libcephfs
454 functions operate on a mount info handle.
456 :param conf dict opt: settings overriding the default ones and conffile
457 :param conffile Union[int,str], optional: the path to ceph.conf to override the default settings
458 :auth_id str opt: the id used to authenticate the client entity
460 if conf is not None and not isinstance(conf, dict):
461 raise TypeError("conf must be dict or None")
462 cstr(conffile, 'configfile', opt=True)
463 auth_id = cstr(auth_id, 'auth_id', opt=True)
466 char* _auth_id = opt_str(auth_id)
470 ret = ceph_create(&self.cluster, <const char*>_auth_id)
472 raise Error("libcephfs_initialize failed with error code: %d" % ret)
474 self.state = "configuring"
475 if conffile in (self.NO_CONF_FILE, None):
477 elif conffile in (self.DEFAULT_CONF_FILES, ''):
478 self.conf_read_file(None)
480 self.conf_read_file(conffile)
482 for key, value in conf.items():
483 self.conf_set(key, value)
487 Return the file system id for this fs client.
489 self.require_state("mounted")
491 ret = ceph_get_fs_cid(self.cluster)
493 raise make_ex(ret, "error fetching fscid")
498 Get associated client addresses with this RADOS session.
500 self.require_state("mounted")
508 ret = ceph_getaddrs(self.cluster, &addrs)
510 raise make_ex(ret, "error calling getaddrs")
512 return decode_cstr(addrs)
514 ceph_buffer_free(addrs)
517 def conf_read_file(self, conffile=None):
519 Load the ceph configuration from the specified config file.
521 :param conffile str opt: the path to ceph.conf to override the default settings
523 conffile = cstr(conffile, 'conffile', opt=True)
525 char *_conffile = opt_str(conffile)
527 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
529 raise make_ex(ret, "error calling conf_read_file")
531 def conf_parse_argv(self, argv):
533 Parse the command line arguments and load the configuration parameters.
535 :param argv: the argument list
537 self.require_state("configuring")
538 cargv = cstr_list(argv, 'argv')
540 int _argc = len(argv)
541 char **_argv = to_bytes_array(cargv)
545 ret = ceph_conf_parse_argv(self.cluster, _argc,
546 <const char **>_argv)
548 raise make_ex(ret, "error calling conf_parse_argv")
554 Unmount and destroy the ceph mount handle.
556 if self.state in ["initialized", "mounted"]:
558 ceph_shutdown(self.cluster)
559 self.state = "shutdown"
565 def __exit__(self, type_, value, traceback):
569 def __dealloc__(self):
574 Get the version number of the ``libcephfs`` C library.
576 :returns: a tuple of ``(major, minor, extra)`` components of the
584 ceph_version(&major, &minor, &extra)
585 return (major, minor, extra)
587 def conf_get(self, option):
589 Gets the configuration value as a string.
591 :param option: the config option to get
593 self.require_state("configuring", "initialized", "mounted")
595 option = cstr(option, 'option')
597 char *_option = option
603 ret_buf = <char *>realloc_chk(ret_buf, length)
605 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
607 return decode_cstr(ret_buf)
608 elif ret == -CEPHFS_ENAMETOOLONG:
610 elif ret == -CEPHFS_ENOENT:
613 raise make_ex(ret, "error calling conf_get")
617 def conf_set(self, option, val):
619 Sets a configuration value from a string.
621 :param option: the configuration option to set
622 :param value: the value of the configuration option to set
624 self.require_state("configuring", "initialized", "mounted")
626 option = cstr(option, 'option')
627 val = cstr(val, 'val')
629 char *_option = option
633 ret = ceph_conf_set(self.cluster, _option, _val)
635 raise make_ex(ret, "error calling conf_set")
637 def set_mount_timeout(self, timeout):
641 :param timeout: mount timeout
643 self.require_state("configuring", "initialized")
644 if not isinstance(timeout, int):
645 raise TypeError('timeout must be an integer')
647 raise make_ex(CEPHFS_EINVAL, 'timeout must be greater than or equal to 0')
649 uint32_t _timeout = timeout
651 ret = ceph_set_mount_timeout(self.cluster, _timeout)
653 raise make_ex(ret, "error setting mount timeout")
657 Initialize the filesystem client (but do not mount the filesystem yet)
659 self.require_state("configuring")
661 ret = ceph_init(self.cluster)
663 raise make_ex(ret, "error calling ceph_init")
664 self.state = "initialized"
666 def mount(self, mount_root=None, filesystem_name=None):
668 Perform a mount using the path for the root of the mount.
670 if self.state == "configuring":
672 self.require_state("initialized")
674 # Configure which filesystem to mount if one was specified
675 if filesystem_name is None:
676 filesystem_name = b""
678 filesystem_name = cstr(filesystem_name, 'filesystem_name')
680 char *_filesystem_name = filesystem_name
683 ret = ceph_select_filesystem(self.cluster,
686 raise make_ex(ret, "error calling ceph_select_filesystem")
688 # Prepare mount_root argument, default to "/"
689 root = b"/" if mount_root is None else mount_root
691 char *_mount_root = root
694 ret = ceph_mount(self.cluster, _mount_root)
696 raise make_ex(ret, "error calling ceph_mount")
697 self.state = "mounted"
701 Unmount a mount handle.
703 self.require_state("mounted")
705 ret = ceph_unmount(self.cluster)
707 raise make_ex(ret, "error calling ceph_unmount")
708 self.state = "initialized"
710 def abort_conn(self):
712 Abort mds connections.
714 self.require_state("mounted")
716 ret = ceph_abort_conn(self.cluster)
718 raise make_ex(ret, "error calling ceph_abort_conn")
719 self.state = "initialized"
721 def get_instance_id(self):
723 Get a global id for current instance
725 self.require_state("initialized", "mounted")
727 ret = ceph_get_instance_id(self.cluster)
730 def statfs(self, path):
732 Perform a statfs on the ceph file system. This call fills in file system wide statistics
733 into the passed in buffer.
735 :param path: any path within the mounted filesystem
737 self.require_state("mounted")
738 path = cstr(path, 'path')
744 ret = ceph_statfs(self.cluster, _path, &statbuf)
746 raise make_ex(ret, "statfs failed: %s" % path)
747 return {'f_bsize': statbuf.f_bsize,
748 'f_frsize': statbuf.f_frsize,
749 'f_blocks': statbuf.f_blocks,
750 'f_bfree': statbuf.f_bfree,
751 'f_bavail': statbuf.f_bavail,
752 'f_files': statbuf.f_files,
753 'f_ffree': statbuf.f_ffree,
754 'f_favail': statbuf.f_favail,
755 'f_fsid': statbuf.f_fsid,
756 'f_flag': statbuf.f_flag,
757 'f_namemax': statbuf.f_namemax}
761 Synchronize all filesystem data to persistent media
763 self.require_state("mounted")
765 ret = ceph_sync_fs(self.cluster)
767 raise make_ex(ret, "sync_fs failed")
769 def fsync(self, int fd, int syncdataonly):
771 Synchronize an open file to persistent media.
773 :param fd: the file descriptor of the file to sync.
774 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
777 self.require_state("mounted")
779 ret = ceph_fsync(self.cluster, fd, syncdataonly)
781 raise make_ex(ret, "fsync failed")
783 def lazyio(self, fd, enable):
785 Enable/disable lazyio for the file.
787 :param fd: the file descriptor of the file for which to enable lazyio.
788 :param enable: a boolean to enable lazyio or disable lazyio.
791 self.require_state("mounted")
792 if not isinstance(fd, int):
793 raise TypeError('fd must be an int')
794 if not isinstance(enable, int):
795 raise TypeError('enable must be an int')
802 ret = ceph_lazyio(self.cluster, _fd, _enable)
804 raise make_ex(ret, "lazyio failed")
806 def lazyio_propagate(self, fd, offset, count):
808 Flushes the write buffer for the file, thereby propagating the buffered write to the file.
810 :param fd: the file descriptor of the file to sync.
811 :param offset: the byte range starting.
812 :param count: the number of bytes starting from offset.
815 self.require_state("mounted")
816 if not isinstance(fd, int):
817 raise TypeError('fd must be an int')
818 if not isinstance(offset, int):
819 raise TypeError('offset must be an int')
820 if not isinstance(count, int):
821 raise TypeError('count must be an int')
825 int64_t _offset = offset
826 size_t _count = count
829 ret = ceph_lazyio_propagate(self.cluster, _fd, _offset, _count)
831 raise make_ex(ret, "lazyio_propagate failed")
833 def lazyio_synchronize(self, fd, offset, count):
835 Flushes the write buffer for the file and invalidate the read cache. This allows a
836 subsequent read operation to read and cache data directly from the file and hence
837 everyone's propagated writes would be visible.
839 :param fd: the file descriptor of the file to sync.
840 :param offset: the byte range starting.
841 :param count: the number of bytes starting from offset.
844 self.require_state("mounted")
845 if not isinstance(fd, int):
846 raise TypeError('fd must be an int')
847 if not isinstance(offset, int):
848 raise TypeError('offset must be an int')
849 if not isinstance(count, int):
850 raise TypeError('count must be an int')
854 int64_t _offset = offset
855 size_t _count = count
858 ret = ceph_lazyio_synchronize(self.cluster, _fd, _offset, _count)
860 raise make_ex(ret, "lazyio_synchronize failed")
862 def fallocate(self, fd, offset, length, mode=0):
864 Preallocate or release disk space for the file for the byte range.
866 :param fd: the file descriptor of the file to fallocate.
867 :param mode: the flags determines the operation to be performed on the given
868 range. default operation (0) allocate and initialize to zero
869 the file in the byte range, and the file size will be changed
870 if offset + length is greater than the file size. if the
871 FALLOC_FL_KEEP_SIZE flag is specified in the mode, the file size
872 will not be changed. if the FALLOC_FL_PUNCH_HOLE flag is specified
873 in the mode, the operation is deallocate space and zero the byte range.
874 :param offset: the byte range starting.
875 :param length: the length of the range.
878 self.require_state("mounted")
879 if not isinstance(fd, int):
880 raise TypeError('fd must be an int')
881 if not isinstance(mode, int):
882 raise TypeError('mode must be an int')
883 if not isinstance(offset, int):
884 raise TypeError('offset must be an int')
885 if not isinstance(length, int):
886 raise TypeError('length must be an int')
891 int64_t _offset = offset
892 int64_t _length = length
895 ret = ceph_fallocate(self.cluster, _fd, _mode, _offset, _length)
897 raise make_ex(ret, "fallocate failed")
901 Get the current working directory.
903 :returns: the path to the current working directory
905 self.require_state("mounted")
907 ret = ceph_getcwd(self.cluster)
910 def chdir(self, path):
912 Change the current working directory.
914 :param path: the path of the working directory to change into.
916 self.require_state("mounted")
918 path = cstr(path, 'path')
919 cdef char* _path = path
921 ret = ceph_chdir(self.cluster, _path)
923 raise make_ex(ret, "chdir failed")
925 def opendir(self, path):
927 Open the given directory.
929 :param path: the path name of the directory to open. Must be either an absolute path
930 or a path relative to the current working directory.
931 :rtype handle: the open directory stream handle
933 self.require_state("mounted")
935 path = cstr(path, 'path')
938 ceph_dir_result* handle
940 ret = ceph_opendir(self.cluster, _path, &handle);
942 raise make_ex(ret, "opendir failed")
948 def readdir(self, DirResult handle):
950 Get the next entry in an open directory.
952 :param handle: the open directory stream handle
953 :rtype dentry: the next directory entry or None if at the end of the
954 directory (or the directory is empty). This pointer
955 should not be freed by the caller, and is only safe to
956 access between return and the next call to readdir or
959 self.require_state("mounted")
961 return handle.readdir()
963 def closedir(self, DirResult handle):
965 Close the open directory.
967 :param handle: the open directory stream handle
969 self.require_state("mounted")
971 return handle.close()
973 def rewinddir(self, DirResult handle):
975 Rewind the directory stream to the beginning of the directory.
977 :param handle: the open directory stream handle
979 return handle.rewinddir()
981 def telldir(self, DirResult handle):
983 Get the current position of a directory stream.
985 :param handle: the open directory stream handle
986 :return value: The position of the directory stream. Note that the offsets
987 returned by ceph_telldir do not have a particular order (cannot
988 be compared with inequality).
990 return handle.telldir()
992 def seekdir(self, DirResult handle, offset):
994 Move the directory stream to a position specified by the given offset.
996 :param handle: the open directory stream handle
997 :param offset: the position to move the directory stream to. This offset should be
998 a value returned by telldir. Note that this value does not refer to
999 the nth entry in a directory, and can not be manipulated with plus
1002 return handle.seekdir(offset)
1004 def mkdir(self, path, mode):
1008 :param path: the path of the directory to create. This must be either an
1009 absolute path or a relative path off of the current working directory.
1010 :param mode: the permissions the directory should have once created.
1013 self.require_state("mounted")
1014 path = cstr(path, 'path')
1015 if not isinstance(mode, int):
1016 raise TypeError('mode must be an int')
1021 ret = ceph_mkdir(self.cluster, _path, _mode)
1023 raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
1025 def mksnap(self, path, name, mode, metadata={}):
1029 :param path: path of the directory to snapshot.
1030 :param name: snapshot name
1031 :param mode: permission of the snapshot
1032 :param metadata: metadata key/value to store with the snapshot
1034 :raises: :class: `TypeError`
1035 :raises: :class: `Error`
1036 :returns: int: 0 on success
1039 self.require_state("mounted")
1040 path = cstr(path, 'path')
1041 name = cstr(name, 'name')
1042 if not isinstance(mode, int):
1043 raise TypeError('mode must be an int')
1044 if not isinstance(metadata, dict):
1045 raise TypeError('metadata must be an dictionary')
1047 for key, value in metadata.items():
1048 if not isinstance(key, str) or not isinstance(value, str):
1049 raise TypeError('metadata key and values should be strings')
1050 md[key.encode('utf-8')] = value.encode('utf-8')
1056 snap_metadata *_snap_meta = <snap_metadata *>malloc(nr * sizeof(snap_metadata))
1057 if nr and _snap_meta == NULL:
1058 raise MemoryError("malloc failed")
1060 for key, value in md.items():
1061 _snap_meta[i] = snap_metadata(<char*>key, <char*>value)
1064 ret = ceph_mksnap(self.cluster, _path, _name, _mode, _snap_meta, nr)
1067 raise make_ex(ret, "mksnap error")
1070 def rmsnap(self, path, name):
1074 :param path: path of the directory for removing snapshot
1075 :param name: snapshot name
1077 :raises: :class: `Error`
1078 :returns: int: 0 on success
1080 self.require_state("mounted")
1081 path = cstr(path, 'path')
1082 name = cstr(name, 'name')
1086 ret = ceph_rmsnap(self.cluster, _path, _name)
1088 raise make_ex(ret, "rmsnap error")
1091 def snap_info(self, path):
1095 :param path: snapshot path
1097 :raises: :class: `Error`
1098 :returns: dict: snapshot metadata
1100 self.require_state("mounted")
1101 path = cstr(path, 'path')
1105 ret = ceph_get_snap_info(self.cluster, _path, &info)
1107 raise make_ex(ret, "snap_info error")
1109 if info.nr_snap_metadata:
1110 md = {snap_meta.key.decode('utf-8'): snap_meta.value.decode('utf-8') for snap_meta in
1111 info.snap_metadata[:info.nr_snap_metadata]}
1112 ceph_free_snap_info_buffer(&info)
1113 return {'id': info.id, 'metadata': md}
1115 def chmod(self, path, mode) :
1117 Change the mode of a file or directory.
1119 :param path: the path of the file or directory whose mode is changed. This must be either an
1120 absolute path or a relative path off of the current working directory.
1121 :param mode: the permissions to be set.
1123 self.require_state("mounted")
1124 path = cstr(path, 'path')
1125 if not isinstance(mode, int):
1126 raise TypeError('mode must be an int')
1131 ret = ceph_chmod(self.cluster, _path, _mode)
1133 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
def lchmod(self, path, mode) -> None:
    """
    Change file mode. If the path is a symbolic link, it won't be dereferenced.

    :param path: the path of the file. This must be either an absolute path or
                 a relative path off of the current working directory.
    :param mode: the permissions to be set.
    :raises: :class:`TypeError` if ``mode`` is not an int
    :raises: :class:`Error` if the underlying ``ceph_lchmod`` call fails
    """
    self.require_state("mounted")
    path = cstr(path, 'path')
    if not isinstance(mode, int):
        raise TypeError('mode must be an int')
    # NOTE(review): the C declarations, the nogil section and the error
    # check below follow the names used by the ceph_lchmod call; confirm
    # them against the full source file.
    cdef:
        char* _path = path
        int _mode = mode
    with nogil:
        ret = ceph_lchmod(self.cluster, _path, _mode)
    if ret < 0:
        # Name the failing operation correctly; the message previously said
        # "chmod", which misattributed lchmod failures.
        raise make_ex(ret, "error in lchmod {}".format(path.decode('utf-8')))
1155 def fchmod(self, fd, mode) :
1157 Change file mode based on fd.
1158 :param fd: the file descriptor of the file to change file mode
1159 :param mode: the permissions to be set.
1161 self.require_state("mounted")
1162 if not isinstance(fd, int):
1163 raise TypeError('fd must be an int')
1164 if not isinstance(mode, int):
1165 raise TypeError('mode must be an int')
1170 ret = ceph_fchmod(self.cluster, _fd, _mode)
1172 raise make_ex(ret, "error in fchmod")
1174 def chown(self, path, uid, gid, follow_symlink=True):
1176 Change directory ownership
1178 :param path: the path of the directory to change.
1179 :param uid: the uid to set
1180 :param gid: the gid to set
1181 :param follow_symlink: perform the operation on the target file if @path
1182 is a symbolic link (default)
1184 self.require_state("mounted")
1185 path = cstr(path, 'path')
1186 if not isinstance(uid, int):
1187 raise TypeError('uid must be an int')
1188 elif not isinstance(gid, int):
1189 raise TypeError('gid must be an int')
1197 ret = ceph_chown(self.cluster, _path, _uid, _gid)
1200 ret = ceph_lchown(self.cluster, _path, _uid, _gid)
1202 raise make_ex(ret, "error in chown {}".format(path.decode('utf-8')))
1204 def lchown(self, path, uid, gid):
1206 Change ownership of a symbolic link
1207 :param path: the path of the symbolic link to change.
1208 :param uid: the uid to set
1209 :param gid: the gid to set
1211 self.chown(path, uid, gid, follow_symlink=False)
1213 def fchown(self, fd, uid, gid):
1215 Change file ownership
1216 :param fd: the file descriptor of the file to change ownership
1217 :param uid: the uid to set
1218 :param gid: the gid to set
1220 self.require_state("mounted")
1221 if not isinstance(fd, int):
1222 raise TypeError('fd must be an int')
1223 if not isinstance(uid, int):
1224 raise TypeError('uid must be an int')
1225 elif not isinstance(gid, int):
1226 raise TypeError('gid must be an int')
1233 ret = ceph_fchown(self.cluster, _fd, _uid, _gid)
1235 raise make_ex(ret, "error in fchown")
1237 def mkdirs(self, path, mode):
1239 Create multiple directories at once.
1241 :param path: the full path of directories and sub-directories that should
1243 :param mode: the permissions the directory should have once created
1245 self.require_state("mounted")
1246 path = cstr(path, 'path')
1247 if not isinstance(mode, int):
1248 raise TypeError('mode must be an int')
1254 ret = ceph_mkdirs(self.cluster, _path, _mode)
1256 raise make_ex(ret, "error in mkdirs {}".format(path.decode('utf-8')))
1258 def rmdir(self, path):
1262 :param path: the path of the directory to remove.
1264 self.require_state("mounted")
1265 path = cstr(path, 'path')
1266 cdef char* _path = path
1267 ret = ceph_rmdir(self.cluster, _path)
1269 raise make_ex(ret, "error in rmdir {}".format(path.decode('utf-8')))
1271 def open(self, path, flags, mode=0):
1273 Create and/or open a file.
1275 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
1276 the file will first be created before opening.
1277 :param flags: set of option masks that control how the file is created/opened.
1278 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
1279 is specified in the flags.
1281 self.require_state("mounted")
1282 path = cstr(path, 'path')
1284 if not isinstance(mode, int):
1285 raise TypeError('mode must be an int')
1286 if isinstance(flags, str):
1289 cephfs_flags = os.O_RDONLY
1297 cephfs_flags |= os.O_TRUNC | os.O_CREAT
1298 elif access_flags > 0 and c == '+':
1301 raise make_ex(CEPHFS_EOPNOTSUPP,
1302 "open flags doesn't support %s" % c)
1304 if access_flags == 1:
1305 cephfs_flags |= os.O_RDONLY;
1306 elif access_flags == 2:
1307 cephfs_flags |= os.O_WRONLY;
1309 cephfs_flags |= os.O_RDWR;
1311 elif isinstance(flags, int):
1312 cephfs_flags = flags
1314 raise TypeError("flags must be a string or an integer")
1318 int _flags = cephfs_flags
1322 ret = ceph_open(self.cluster, _path, _flags, _mode)
1324 raise make_ex(ret, "error in open {}".format(path.decode('utf-8')))
1327 def close(self, fd):
1329 Close the open file.
1331 :param fd: the file descriptor referring to the open file.
1334 self.require_state("mounted")
1335 if not isinstance(fd, int):
1336 raise TypeError('fd must be an int')
1339 ret = ceph_close(self.cluster, _fd)
1341 raise make_ex(ret, "error in close")
1343 def read(self, fd, offset, l):
1345 Read data from the file.
1347 :param fd: the file descriptor of the open file to read from.
1348 :param offset: the offset in the file to read from. If this value is negative, the
1349 function reads from the current offset of the file descriptor.
1350 :param l: the number of bytes to read
1352 self.require_state("mounted")
1353 if not isinstance(offset, int):
1354 raise TypeError('offset must be an int')
1355 if not isinstance(l, int):
1356 raise TypeError('l must be an int')
1357 if not isinstance(fd, int):
1358 raise TypeError('fd must be an int')
1361 int64_t _offset = offset
1365 PyObject* ret_s = NULL
1367 ret_s = PyBytes_FromStringAndSize(NULL, _length)
1369 ret_buf = PyBytes_AsString(ret_s)
1371 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
1373 raise make_ex(ret, "error in read")
1376 _PyBytes_Resize(&ret_s, ret)
1378 return <object>ret_s
1380 # We DECREF unconditionally: the cast to object above will have
1381 # INCREFed if necessary. This also takes care of exceptions,
1382 # including if _PyBytes_Resize fails (that will free the string
1383 # itself and set ret_s to NULL, hence XDECREF).
1384 ref.Py_XDECREF(ret_s)
1386 def preadv(self, fd, buffers, offset):
1388 Read data from a file into a list of buffers.
1390 :param fd: the file descriptor of the open file to read from
1391 :param buffers: the list of bytearray objects to read the file data into
1392 :param offset: the offset of the file read from. If this value is negative, the
1393 function reads from the current offset of the file descriptor.
1395 self.require_state("mounted")
1396 if not isinstance(fd, int):
1397 raise TypeError('fd must be an int')
1398 if not isinstance(buffers, list):
1399 raise TypeError('buffers must be a list')
1401 if not isinstance(buf, bytearray):
1402 raise TypeError('buffers must be a list of bytes')
1403 if not isinstance(offset, int):
1404 raise TypeError('offset must be an int')
1408 int _iovcnt = len(buffers)
1409 int64_t _offset = offset
1410 iovec *_iov = to_iovec(buffers)
1413 ret = ceph_preadv(self.cluster, _fd, _iov, _iovcnt, _offset)
1415 raise make_ex(ret, "error in preadv")
1420 def write(self, fd, buf, offset):
1422 Write data to a file.
1424 :param fd: the file descriptor of the open file to write to
1425 :param buf: the bytes to write to the file
1426 :param offset: the offset of the file write into. If this value is negative, the
1427 function writes to the current offset of the file descriptor.
# Requires a mounted filesystem; argument types are validated first so a bad
# call raises TypeError before ceph_write() is reached.
1429 self.require_state("mounted")
1430 if not isinstance(fd, int):
1431 raise TypeError('fd must be an int')
1432 if not isinstance(buf, bytes):
1433 raise TypeError('buf must be a bytes')
1434 if not isinstance(offset, int):
1435 raise TypeError('offset must be an int')
# C-typed copies of the arguments; the byte count passed down is len(buf).
1440 int64_t _offset = offset
1442 size_t length = len(buf)
1445 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
# Surface the libcephfs return code as an exception via make_ex().
1447 raise make_ex(ret, "error in write")
1450 def pwritev(self, fd, buffers, offset):
1452 Write data to a file.
1454 :param fd: the file descriptor of the open file to write to
1455 :param buffers: the list of byte object to write to the file
1456 :param offset: the offset of the file write into. If this value is negative, the
1457 function writes to the current offset of the file descriptor.
# Requires a mounted filesystem; all arguments are type-checked up front.
1459 self.require_state("mounted")
1460 if not isinstance(fd, int):
1461 raise TypeError('fd must be an int')
1462 if not isinstance(buffers, list):
1463 raise TypeError('buffers must be a list')
# Each element of buffers must itself be a bytes object.
1465 if not isinstance(buf, bytes):
1466 raise TypeError('buffers must be a list of bytes')
1467 if not isinstance(offset, int):
1468 raise TypeError('offset must be an int')
# _iovcnt is the number of buffers; to_iovec() produces the iovec array
# that is handed to ceph_pwritev().
1472 int _iovcnt = len(buffers)
1473 int64_t _offset = offset
1474 iovec *_iov = to_iovec(buffers)
1477 ret = ceph_pwritev(self.cluster, _fd, _iov, _iovcnt, _offset)
# Surface the libcephfs return code as an exception via make_ex().
1479 raise make_ex(ret, "error in pwritev")
1484 def flock(self, fd, operation, owner):
1486 Apply or remove an advisory lock.
1488 :param fd: the open file descriptor to change advisory lock.
1489 :param operation: the advisory lock operation to be performed on the file
1490 :param owner: the user-supplied owner identifier (an arbitrary integer)
# Requires a mounted filesystem; fd and operation are type-checked here.
1492 self.require_state("mounted")
1493 if not isinstance(fd, int):
1494 raise TypeError('fd must be an int')
1495 if not isinstance(operation, int):
1496 raise TypeError('operation must be an int')
# owner is not checked with isinstance; it is converted directly to a
# uint64_t for the ceph_flock() call.
1501 uint64_t _owner = owner
1504 ret = ceph_flock(self.cluster, _fd, _op, _owner)
# The message previously read "error in write" (copy-paste); it now names
# the operation that actually failed.
1506 raise make_ex(ret, "error in flock")
1509 def truncate(self, path, size):
1511 Truncate the file to the given size. If this operation causes the
1512 file to expand, the empty bytes will be filled in with zeros.
1514 :param path: the path to the file to truncate.
1515 :param size: the new size of the file.
1518 if not isinstance(size, int):
1519 raise TypeError('size must be a int')
# Implemented as a size-only setattrx(): the mask is CEPH_SETATTR_SIZE alone
# and AT_SYMLINK_NOFOLLOW is passed as the flags argument. Path validation
# and the mounted-state check are left to setattrx().
1522 statx_dict["size"] = size
1523 self.setattrx(path, statx_dict, CEPH_SETATTR_SIZE, AT_SYMLINK_NOFOLLOW)
1525 def ftruncate(self, fd, size):
1527 Truncate the file to the given size. If this operation causes the
1528 file to expand, the empty bytes will be filled in with zeros.
1530 :param fd: the open file descriptor of the file to truncate.
1531 :param size: the new size of the file.
1534 if not isinstance(size, int):
1535 raise TypeError('size must be a int')
# Implemented as a size-only fsetattrx(): the mask is CEPH_SETATTR_SIZE
# alone. fd validation and the mounted-state check are left to fsetattrx().
1538 statx_dict["size"] = size
1539 self.fsetattrx(fd, statx_dict, CEPH_SETATTR_SIZE)
1541 def mknod(self, path, mode, rdev=0):
1543 Make a block or character special file.
1545 :param path: the path to the special file.
1546 :param mode: the permissions to use and the type of special file. The type can be
1547 one of stat.S_IFREG, stat.S_IFCHR, stat.S_IFBLK, stat.S_IFIFO. Both
1548 should be combined using bitwise OR.
1549 :param rdev: If the file type is stat.S_IFCHR or stat.S_IFBLK then this parameter
1550 specifies the major and minor numbers of the newly created device
1551 special file. Otherwise, it is ignored.
1553 self.require_state("mounted")
# cstr() converts path for the C call; it is decoded as UTF-8 again below
# when building the error message.
1554 path = cstr(path, 'path')
1556 if not isinstance(mode, int):
1557 raise TypeError('mode must be an int')
1558 if not isinstance(rdev, int):
1559 raise TypeError('rdev must be an int')
1567 ret = ceph_mknod(self.cluster, _path, _mode, _rdev)
# Surface the libcephfs return code as an exception that names the path.
1569 raise make_ex(ret, "error in mknod {}".format(path.decode('utf-8')))
1571 def getxattr(self, path, name, size=255, follow_symlink=True):
1573 Get an extended attribute.
1575 :param path: the path to the file
1576 :param name: the name of the extended attribute to get
1577 :param size: the size of the pre-allocated buffer
1579 self.require_state("mounted")
1581 path = cstr(path, 'path')
1582 name = cstr(name, 'name')
# The result buffer is allocated with realloc_chk() to the caller-supplied
# size (default 255 bytes).
1588 size_t ret_length = size
1589 char *ret_buf = NULL
1592 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
# follow_symlink selects between ceph_getxattr() and ceph_lgetxattr();
# presumably the l-variant is used when it is False, since lgetxattr()
# calls this method with follow_symlink=False.
1595 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1599 ret = ceph_lgetxattr(self.cluster, _path, _name, ret_buf,
1603 raise make_ex(ret, "error in getxattr")
# Only the first ret bytes of the buffer are returned to the caller.
1605 return ret_buf[:ret]
1609 def fgetxattr(self, fd, name, size=255):
1611 Get an extended attribute given the fd of a file.
1613 :param fd: the open file descriptor referring to the file
1614 :param name: the name of the extended attribute to get
1615 :param size: the size of the pre-allocated buffer
1617 self.require_state("mounted")
1619 if not isinstance(fd, int):
1620 raise TypeError('fd must be an int')
1621 name = cstr(name, 'name')
# The result buffer is allocated with realloc_chk() to the caller-supplied
# size (default 255 bytes).
1627 size_t ret_length = size
1628 char *ret_buf = NULL
1631 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1633 ret = ceph_fgetxattr(self.cluster, _fd, _name, ret_buf,
1637 raise make_ex(ret, "error in fgetxattr")
# Only the first ret bytes of the buffer are returned to the caller.
1639 return ret_buf[:ret]
1643 def lgetxattr(self, path, name, size=255):
1645 Get an extended attribute without following symbolic links. This
1646 function is identical to ceph_getxattr, but if the path refers to
1647 a symbolic link, we get the extended attributes of the symlink
1648 rather than the attributes of the file it points to.
1650 :param path: the path to the file
1651 :param name: the name of the extended attribute to get
1652 :param size: the size of the pre-allocated buffer
# Thin delegator: all validation and the C call live in getxattr().
1655 return self.getxattr(path, name, size=size, follow_symlink=False)
1657 def setxattr(self, path, name, value, flags, follow_symlink=True):
1659 Set an extended attribute on a file.
1661 :param path: the path to the file.
1662 :param name: the name of the extended attribute to set.
1663 :param value: the bytes of the extended attribute value
1665 self.require_state("mounted")
1667 name = cstr(name, 'name')
1668 path = cstr(path, 'path')
# flags is passed through to libcephfs unchanged; value must already be
# bytes (no implicit encoding is performed here).
1669 if not isinstance(flags, int):
1670 raise TypeError('flags must be a int')
1671 if not isinstance(value, bytes):
1672 raise TypeError('value must be a bytes')
# The value is handed over as pointer + explicit length (len(value)).
1677 char *_value = value
1678 size_t _value_len = len(value)
# follow_symlink selects between ceph_setxattr() and ceph_lsetxattr();
# presumably the l-variant is used when it is False, since lsetxattr()
# calls this method with follow_symlink=False.
1683 ret = ceph_setxattr(self.cluster, _path, _name,
1684 _value, _value_len, _flags)
1687 ret = ceph_lsetxattr(self.cluster, _path, _name,
1688 _value, _value_len, _flags)
1691 raise make_ex(ret, "error in setxattr")
1693 def fsetxattr(self, fd, name, value, flags):
1695 Set an extended attribute on a file.
1697 :param fd: the open file descriptor referring to the file.
1698 :param name: the name of the extended attribute to set.
1699 :param value: the bytes of the extended attribute value
1701 self.require_state("mounted")
1703 name = cstr(name, 'name')
# flags is passed through to libcephfs unchanged; value must already be
# bytes (no implicit encoding is performed here).
1704 if not isinstance(fd, int):
1705 raise TypeError('fd must be an int')
1706 if not isinstance(flags, int):
1707 raise TypeError('flags must be a int')
1708 if not isinstance(value, bytes):
1709 raise TypeError('value must be a bytes')
# The value is handed over as pointer + explicit length (len(value)).
1714 char *_value = value
1715 size_t _value_len = len(value)
1719 ret = ceph_fsetxattr(self.cluster, _fd, _name,
1720 _value, _value_len, _flags)
# Surface the libcephfs return code as an exception via make_ex().
1722 raise make_ex(ret, "error in fsetxattr")
1724 def lsetxattr(self, path, name, value, flags):
1726 Set an extended attribute on a file but do not follow symbolic link.
1728 :param path: the path to the file.
1729 :param name: the name of the extended attribute to set.
1730 :param value: the bytes of the extended attribute value
# Thin delegator: all validation and the C call live in setxattr().
1733 self.setxattr(path, name, value, flags, follow_symlink=False)
1735 def removexattr(self, path, name, follow_symlink=True):
1737 Remove an extended attribute of a file.
1739 :param path: path of the file.
1740 :param name: name of the extended attribute to remove.
1742 self.require_state("mounted")
1744 name = cstr(name, 'name')
1745 path = cstr(path, 'path')
# follow_symlink selects between ceph_removexattr() and ceph_lremovexattr();
# presumably the l-variant is used when it is False, since lremovexattr()
# calls this method with follow_symlink=False.
1753 ret = ceph_removexattr(self.cluster, _path, _name)
1756 ret = ceph_lremovexattr(self.cluster, _path, _name)
1759 raise make_ex(ret, "error in removexattr")
1761 def fremovexattr(self, fd, name):
1763 Remove an extended attribute of a file.
1765 :param fd: the open file descriptor referring to the file.
1766 :param name: name of the extended attribute to remove.
# Requires a mounted filesystem; fd is type-checked and name converted with
# cstr() before the C call.
1768 self.require_state("mounted")
1770 if not isinstance(fd, int):
1771 raise TypeError('fd must be an int')
1772 name = cstr(name, 'name')
1779 ret = ceph_fremovexattr(self.cluster, _fd, _name)
# Surface the libcephfs return code as an exception via make_ex().
1781 raise make_ex(ret, "error in fremovexattr")
1783 def lremovexattr(self, path, name):
1785 Remove an extended attribute of a file but do not follow symbolic link.
1787 :param path: path of the file.
1788 :param name: name of the extended attribute to remove.
# Thin delegator: all validation and the C call live in removexattr().
1790 self.removexattr(path, name, follow_symlink=False)
1792 def listxattr(self, path, size=65536, follow_symlink=True):
1794 List the extended attribute keys set on a file.
1796 :param path: path of the file.
1797 :param size: the size of list buffer to be filled with extended attribute keys.
1799 self.require_state("mounted")
1801 path = cstr(path, 'path')
# The list buffer is allocated with realloc_chk() to the caller-supplied
# size (default 65536 bytes).
1805 char *ret_buf = NULL
1806 size_t ret_length = size
1809 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
# follow_symlink selects between ceph_listxattr() and ceph_llistxattr();
# presumably the l-variant is used when it is False, since llistxattr()
# calls this method with follow_symlink=False.
1812 ret = ceph_listxattr(self.cluster, _path, ret_buf, ret_length)
1815 ret = ceph_llistxattr(self.cluster, _path, ret_buf, ret_length)
1818 raise make_ex(ret, "error in listxattr")
# Returns a 2-tuple: the C call's return value and the first ret bytes of
# the buffer.
1820 return ret, ret_buf[:ret]
1824 def flistxattr(self, fd, size=65536):
1826 List the extended attribute keys set on a file.
1828 :param fd: the open file descriptor referring to the file.
1829 :param size: the size of list buffer to be filled with extended attribute keys.
1831 self.require_state("mounted")
1833 if not isinstance(fd, int):
1834 raise TypeError('fd must be an int')
# The list buffer is allocated with realloc_chk() to the caller-supplied
# size (default 65536 bytes).
1838 char *ret_buf = NULL
1839 size_t ret_length = size
1842 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1844 ret = ceph_flistxattr(self.cluster, _fd, ret_buf, ret_length)
1847 raise make_ex(ret, "error in flistxattr")
# Returns a 2-tuple: the C call's return value and the first ret bytes of
# the buffer.
1849 return ret, ret_buf[:ret]
1853 def llistxattr(self, path, size=65536):
1855 List the extended attribute keys set on a file but do not follow symbolic link.
1857 :param path: path of the file.
1858 :param size: the size of list buffer to be filled with extended attribute keys.
# Thin delegator: all validation and the C call live in listxattr().
1861 return self.listxattr(path, size=size, follow_symlink=False)
1863 def stat(self, path, follow_symlink=True):
1865 Get a file's extended statistics and attributes.
1867 :param path: the file or directory to get the statistics of.
1869 self.require_state("mounted")
1870 path = cstr(path, 'path')
# Both variants request CEPH_STATX_BASIC_STATS; they differ only in the
# flags argument (0 versus AT_SYMLINK_NOFOLLOW). lstat() calls this method
# with follow_symlink=False, presumably selecting the NOFOLLOW variant.
1878 ret = ceph_statx(self.cluster, _path, &stx,
1879 CEPH_STATX_BASIC_STATS_CDEF, 0)
1882 ret = ceph_statx(self.cluster, _path, &stx,
1883 CEPH_STATX_BASIC_STATS_CDEF, AT_SYMLINK_NOFOLLOW_CDEF)
1886 raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
# Timestamps are built from tv_sec only, so sub-second precision is dropped;
# datetime.fromtimestamp() without a tz argument yields naive local-time
# datetimes.
1887 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1888 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1889 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1890 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1891 st_blksize=stx.stx_blksize,
1892 st_blocks=stx.stx_blocks,
1893 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1894 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1895 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1897 def lstat(self, path):
1899 Get a file's extended statistics and attributes. When the file is a
1900 symbolic link, return the information of the link itself rather
1901 than that of the file it points to.
1903 :param path: the file or directory to get the statistics of.
# Thin delegator: all validation and the C call live in stat().
1905 return self.stat(path, follow_symlink=False)
1907 def fstat(self, fd):
1909 Get an open file's extended statistics and attributes.
1911 :param fd: the file descriptor of the file to get statistics of.
1913 self.require_state("mounted")
1914 if not isinstance(fd, int):
1915 raise TypeError('fd must be an int')
# Requests CEPH_STATX_BASIC_STATS with no extra flags.
1922 ret = ceph_fstatx(self.cluster, _fd, &stx,
1923 CEPH_STATX_BASIC_STATS_CDEF, 0)
# The message previously read "error in fsat" (typo).
1925 raise make_ex(ret, "error in fstat")
# Timestamps are built from tv_sec only, so sub-second precision is dropped;
# datetime.fromtimestamp() without a tz argument yields naive local-time
# datetimes.
1926 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1927 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1928 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1929 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1930 st_blksize=stx.stx_blksize,
1931 st_blocks=stx.stx_blocks,
1932 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1933 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1934 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1936 def statx(self, path, mask, flag):
1938 Get a file's extended statistics and attributes.
1940 :param path: the file or directory to get the statistics of.
1941 :param mask: bitfield of CEPH_STATX_* flags selecting the desired attributes.
1942 :param flag: bitfield that can be used to set AT_* modifier flags (only AT_NO_ATTR_SYNC and AT_SYMLINK_NOFOLLOW)
1945 self.require_state("mounted")
1946 path = cstr(path, 'path')
# The mask check used to report 'flag must be a int', blaming the wrong
# argument.
1947 if not isinstance(mask, int):
1948 raise TypeError('mask must be a int')
1949 if not isinstance(flag, int):
1950 raise TypeError('flag must be a int')
1957 dict_result = dict()
1960 ret = ceph_statx(self.cluster, _path, &stx, _mask, _flag)
# path is bytes after cstr(); decode it so the message shows the plain path
# instead of a b'...' repr (same formatting as stat()).
1962 raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
# Only the fields whose CEPH_STATX_* bit is set in the mask are copied into
# dict_result. Time fields are built from tv_sec only (sub-second precision
# is dropped) as naive local-time datetimes.
1964 if (_mask & CEPH_STATX_MODE):
1965 dict_result["mode"] = stx.stx_mode
1966 if (_mask & CEPH_STATX_NLINK):
1967 dict_result["nlink"] = stx.stx_nlink
1968 if (_mask & CEPH_STATX_UID):
1969 dict_result["uid"] = stx.stx_uid
1970 if (_mask & CEPH_STATX_GID):
1971 dict_result["gid"] = stx.stx_gid
1972 if (_mask & CEPH_STATX_RDEV):
1973 dict_result["rdev"] = stx.stx_rdev
1974 if (_mask & CEPH_STATX_ATIME):
1975 dict_result["atime"] = datetime.fromtimestamp(stx.stx_atime.tv_sec)
1976 if (_mask & CEPH_STATX_MTIME):
1977 dict_result["mtime"] = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
1978 if (_mask & CEPH_STATX_CTIME):
1979 dict_result["ctime"] = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
1980 if (_mask & CEPH_STATX_INO):
1981 dict_result["ino"] = stx.stx_ino
1982 if (_mask & CEPH_STATX_SIZE):
1983 dict_result["size"] = stx.stx_size
1984 if (_mask & CEPH_STATX_BLOCKS):
1985 dict_result["blocks"] = stx.stx_blocks
1986 if (_mask & CEPH_STATX_BTIME):
1987 dict_result["btime"] = datetime.fromtimestamp(stx.stx_btime.tv_sec)
1988 if (_mask & CEPH_STATX_VERSION):
1989 dict_result["version"] = stx.stx_version
1993 def setattrx(self, path, dict_stx, mask, flags):
1995 Set a file's attributes.
1997 :param path: the path to the file/directory to set the attributes of.
1998 :param mask: a mask of all the CEPH_SETATTR_* values that have been set in the statx struct.
1999 :param dict_stx: a dict of statx structure that must include attribute values to set on the file.
2000 :param flags: mask of AT_* flags (only AT_SYMLINK_NOFOLLOW is respected for now)
2003 self.require_state("mounted")
2004 path = cstr(path, 'path')
2005 if not isinstance(dict_stx, dict):
2006 raise TypeError('dict_stx must be a dict')
2007 if not isinstance(mask, int):
2008 raise TypeError('mask must be a int')
2009 if not isinstance(flags, int):
2010 raise TypeError('flags must be a int')
# Only the fields selected by mask are read from dict_stx, so a selected
# key that is missing raises KeyError. Time values must provide a
# .timestamp() method (e.g. datetime objects).
2014 if (mask & CEPH_SETATTR_MODE):
2015 stx.stx_mode = dict_stx["mode"]
2016 if (mask & CEPH_SETATTR_UID):
2017 stx.stx_uid = dict_stx["uid"]
2018 if (mask & CEPH_SETATTR_GID):
2019 stx.stx_gid = dict_stx["gid"]
2020 if (mask & CEPH_SETATTR_MTIME):
2021 stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
2022 if (mask & CEPH_SETATTR_ATIME):
2023 stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
2024 if (mask & CEPH_SETATTR_CTIME):
2025 stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
2026 if (mask & CEPH_SETATTR_SIZE):
2027 stx.stx_size = dict_stx["size"]
2028 if (mask & CEPH_SETATTR_BTIME):
2029 stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())
# NOTE(review): dict_result looks unused in this method - confirm and drop.
2035 dict_result = dict()
2038 ret = ceph_setattrx(self.cluster, _path, &stx, _mask, _flags)
# path is bytes after cstr(); decode it so the message shows the plain path
# instead of a b'...' repr (same formatting as stat()).
2040 raise make_ex(ret, "error in setattrx: {}".format(path.decode('utf-8')))
2042 def fsetattrx(self, fd, dict_stx, mask):
2044 Set a file's attributes.
2046 :param fd: the open file descriptor of the file/directory to set the attributes of.
2047 :param mask: a mask of all the CEPH_SETATTR_* values that have been set in the statx struct.
2048 :param dict_stx: a dict of statx structure that must include attribute values to set on the file.
2051 self.require_state("mounted")
2052 if not isinstance(fd, int):
2053 raise TypeError('fd must be a int')
2054 if not isinstance(dict_stx, dict):
2055 raise TypeError('dict_stx must be a dict')
2056 if not isinstance(mask, int):
2057 raise TypeError('mask must be a int')
# Only the fields selected by mask are read from dict_stx, so a selected
# key that is missing raises KeyError. Time values must provide a
# .timestamp() method (e.g. datetime objects).
2061 if (mask & CEPH_SETATTR_MODE):
2062 stx.stx_mode = dict_stx["mode"]
2063 if (mask & CEPH_SETATTR_UID):
2064 stx.stx_uid = dict_stx["uid"]
2065 if (mask & CEPH_SETATTR_GID):
2066 stx.stx_gid = dict_stx["gid"]
2067 if (mask & CEPH_SETATTR_MTIME):
2068 stx.stx_mtime = to_timespec(dict_stx["mtime"].timestamp())
2069 if (mask & CEPH_SETATTR_ATIME):
2070 stx.stx_atime = to_timespec(dict_stx["atime"].timestamp())
2071 if (mask & CEPH_SETATTR_CTIME):
2072 stx.stx_ctime = to_timespec(dict_stx["ctime"].timestamp())
2073 if (mask & CEPH_SETATTR_SIZE):
2074 stx.stx_size = dict_stx["size"]
2075 if (mask & CEPH_SETATTR_BTIME):
2076 stx.stx_btime = to_timespec(dict_stx["btime"].timestamp())
# NOTE(review): dict_result looks unused in this method - confirm and drop.
2081 dict_result = dict()
2084 ret = ceph_fsetattrx(self.cluster, _fd, &stx, _mask)
# Surface the libcephfs return code as an exception via make_ex().
2086 raise make_ex(ret, "error in fsetattrx")
2088 def symlink(self, existing, newname):
2090 Creates a symbolic link.
2092 :param existing: the path to the existing file/directory to link to.
2093 :param newname: the path to the new file/directory to link from.
# Requires a mounted filesystem; both paths are converted with cstr() and
# then bound to C strings for the ceph_symlink() call.
2095 self.require_state("mounted")
2096 existing = cstr(existing, 'existing')
2097 newname = cstr(newname, 'newname')
2099 char* _existing = existing
2100 char* _newname = newname
2103 ret = ceph_symlink(self.cluster, _existing, _newname)
# Surface the libcephfs return code as an exception via make_ex().
2105 raise make_ex(ret, "error in symlink")
2107 def link(self, existing, newname):
2111 :param existing: the path to the existing file/directory to link to.
2112 :param newname: the path to the new file/directory to link from.
# Requires a mounted filesystem; both paths are converted with cstr() and
# then bound to C strings for the ceph_link() call.
2115 self.require_state("mounted")
2116 existing = cstr(existing, 'existing')
2117 newname = cstr(newname, 'newname')
2119 char* _existing = existing
2120 char* _newname = newname
2123 ret = ceph_link(self.cluster, _existing, _newname)
# Surface the libcephfs return code as an exception via make_ex().
2125 raise make_ex(ret, "error in link")
2127 def readlink(self, path, size):
2129 Read a symbolic link.
2131 :param path: the path to the symlink to read
2132 :param size: the length of the buffer
2133 :rtype buf: buffer to hold the path of the file that the symlink points to.
2135 self.require_state("mounted")
2136 path = cstr(path, 'path')
# The target buffer is allocated with realloc_chk() to exactly the
# caller-supplied size, and that same size is passed to ceph_readlink().
2140 int64_t _size = size
2144 buf = <char *>realloc_chk(buf, _size)
2146 ret = ceph_readlink(self.cluster, _path, buf, _size)
# Surface the libcephfs return code as an exception via make_ex().
2148 raise make_ex(ret, "error in readlink")
2153 def unlink(self, path):
2155 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
2156 file will not disappear from the namespace until all references to it are removed.
2158 :param path: the path of the file or link to unlink.
2160 self.require_state("mounted")
# cstr() converts path for the C call; it is decoded as UTF-8 again below
# when building the error message.
2161 path = cstr(path, 'path')
2162 cdef char* _path = path
2164 ret = ceph_unlink(self.cluster, _path)
# Surface the libcephfs return code as an exception that names the path.
2166 raise make_ex(ret, "error in unlink: {}".format(path.decode('utf-8')))
2168 def rename(self, src, dst):
2170 Rename a file or directory.
2172 :param src: the path to the existing file or directory.
2173 :param dst: the new name of the file or directory.
2176 self.require_state("mounted")
# The labels passed to cstr() are 'source' and 'destination', not the
# parameter names src/dst.
2178 src = cstr(src, 'source')
2179 dst = cstr(dst, 'destination')
2186 ret = ceph_rename(self.cluster, _src, _dst)
# Both paths are decoded as UTF-8 for the error message.
2188 raise make_ex(ret, "error in rename {} to {}".format(src.decode(
2189 'utf-8'), dst.decode('utf-8')))
2191 def mds_command(self, mds_spec, args, input_data):
2193 :return 3-tuple of return code int, output data, output status string
# All three arguments are converted with cstr(); args is sent as a
# single-element command array.
2195 mds_spec = cstr(mds_spec, 'mds_spec')
2196 args = cstr(args, 'args')
2197 input_data = cstr(input_data, 'input_data')
2200 char *_mds_spec = opt_str(mds_spec)
2201 char **_cmd = to_bytes_array([args])
2204 char *_inbuf = input_data
2205 size_t _inbuf_len = len(input_data)
# Output buffers are filled in by ceph_mds_command().
2207 char *_outbuf = NULL
2208 size_t _outbuf_len = 0
2210 size_t _outs_len = 0
2214 ret = ceph_mds_command(self.cluster, _mds_spec,
2215 <const char **>_cmd, _cmdlen,
2216 <const char*>_inbuf, _inbuf_len,
2217 &_outbuf, &_outbuf_len,
# Slice the C buffers into Python objects before they are released with
# ceph_buffer_free(): the status string is decoded, the data stays raw.
2219 my_outs = decode_cstr(_outs[:_outs_len])
2220 my_outbuf = _outbuf[:_outbuf_len]
2222 ceph_buffer_free(_outs)
2224 ceph_buffer_free(_outbuf)
# ret is returned to the caller as the first tuple element.
2225 return (ret, my_outbuf, my_outs)
2229 def umask(self, mode) :
# Passes the mode to ceph_umask() for this mount; requires a mounted
# filesystem. NOTE(review): mode gets no isinstance check here, unlike the
# sibling methods - confirm that is intended.
2230 self.require_state("mounted")
2234 ret = ceph_umask(self.cluster, _mode)
# Surface the libcephfs return code as an exception via make_ex().
2236 raise make_ex(ret, "error in umask")
2239 def lseek(self, fd, offset, whence):
2241 Set the file's current position.
2243 :param fd: the file descriptor of the open file to reposition.
2244 :param offset: the offset to seek to, interpreted relative to the position
2245 selected by whence.
2246 :param whence: the flag to indicate what type of seeking to perform: SEEK_SET, SEEK_CUR, SEEK_END
2248 self.require_state("mounted")
2249 if not isinstance(fd, int):
2250 raise TypeError('fd must be an int')
2251 if not isinstance(offset, int):
2252 raise TypeError('offset must be an int')
2253 if not isinstance(whence, int):
2254 raise TypeError('whence must be an int')
# C-typed copies of the arguments for the ceph_lseek() call.
2258 int64_t _offset = offset
2259 int64_t _whence = whence
2262 ret = ceph_lseek(self.cluster, _fd, _offset, _whence)
# Surface the libcephfs return code as an exception via make_ex().
2265 raise make_ex(ret, "error in lseek")
2269 def utime(self, path, times=None):
2271 Set access and modification time for path
2273 :param path: file path for which timestamps have to be changed
2274 :param times: if times is not None, it must be a tuple (atime, mtime)
2277 self.require_state("mounted")
2278 path = cstr(path, 'path')
# This variant accepts whole-second (int) timestamps only.
2280 if not isinstance(times, tuple):
2281 raise TypeError('times must be a tuple')
2282 if not isinstance(times[0], int):
2283 raise TypeError('atime must be an int')
2284 if not isinstance(times[1], int):
2285 raise TypeError('mtime must be an int')
# Both timestamps start out as the current time truncated to whole seconds.
2286 actime = modtime = int(time.time())
2293 utimbuf buf = utimbuf(actime, modtime)
2295 ret = ceph_utime(self.cluster, pth, &buf)
# Surface the libcephfs return code as an exception that names the path.
2297 raise make_ex(ret, "error in utime {}".format(path.decode('utf-8')))
2299 def futime(self, fd, times=None):
2301 Set access and modification time for a file pointed by descriptor
2303 :param fd: file descriptor of the open file
2304 :param times: if times is not None, it must be a tuple (atime, mtime)
2307 self.require_state("mounted")
2308 if not isinstance(fd, int):
2309 raise TypeError('fd must be an int')
# This variant accepts whole-second (int) timestamps only.
2311 if not isinstance(times, tuple):
2312 raise TypeError('times must be a tuple')
2313 if not isinstance(times[0], int):
2314 raise TypeError('atime must be an int')
2315 if not isinstance(times[1], int):
2316 raise TypeError('mtime must be an int')
# Both timestamps start out as the current time truncated to whole seconds.
2317 actime = modtime = int(time.time())
2324 utimbuf buf = utimbuf(actime, modtime)
2326 ret = ceph_futime(self.cluster, _fd, &buf)
# Surface the libcephfs return code as an exception via make_ex().
2328 raise make_ex(ret, "error in futime")
2330 def utimes(self, path, times=None, follow_symlink=True):
2332 Set access and modification time for path
2334 :param path: file path for which timestamps have to be changed
2335 :param times: if times is not None, it must be a tuple (atime, mtime)
2336 :param follow_symlink: perform the operation on the target file if @path
2337 is a symbolic link (default)
2340 self.require_state("mounted")
2341 path = cstr(path, 'path')
# Unlike utime(), int or float timestamps are accepted here.
2343 if not isinstance(times, tuple):
2344 raise TypeError('times must be a tuple')
2345 if not isinstance(times[0], (int, float)):
2346 raise TypeError('atime must be an int or a float')
2347 if not isinstance(times[1], (int, float)):
2348 raise TypeError('mtime must be an int or a float')
# Both timestamps start out as the current time; caller-supplied values are
# converted to float.
2349 actime = modtime = time.time()
2351 actime = float(times[0])
2352 modtime = float(times[1])
# A two-element timeval array (atime, mtime) is passed to the C call.
2356 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
# follow_symlink selects between ceph_utimes() and ceph_lutimes();
# presumably the l-variant is used when it is False, since lutimes() calls
# this method with follow_symlink=False.
2359 ret = ceph_utimes(self.cluster, pth, buf)
2362 ret = ceph_lutimes(self.cluster, pth, buf)
2364 raise make_ex(ret, "error in utimes {}".format(path.decode('utf-8')))
2366 def lutimes(self, path, times=None):
2368 Set access and modification time for a file. If the file is a symbolic
2369 link do not follow to the target.
2371 :param path: file path for which timestamps have to be changed
2372 :param times: if times is not None, it must be a tuple (atime, mtime)
# Thin delegator: all validation and the C call live in utimes().
2374 self.utimes(path, times=times, follow_symlink=False)
2376 def futimes(self, fd, times=None):
2378 Set access and modification time for a file pointed to by descriptor
2380 :param fd: file descriptor of the open file
2381 :param times: if times is not None, it must be a tuple (atime, mtime)
2384 self.require_state("mounted")
2385 if not isinstance(fd, int):
2386 raise TypeError('fd must be an int')
# Unlike futime(), int or float timestamps are accepted here.
2388 if not isinstance(times, tuple):
2389 raise TypeError('times must be a tuple')
2390 if not isinstance(times[0], (int, float)):
2391 raise TypeError('atime must be an int or a float')
2392 if not isinstance(times[1], (int, float)):
2393 raise TypeError('mtime must be an int or a float')
# Both timestamps start out as the current time; caller-supplied values are
# converted to float.
2394 actime = modtime = time.time()
2396 actime = float(times[0])
2397 modtime = float(times[1])
# A two-element timeval array (atime, mtime) is passed to the C call.
2401 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
2403 ret = ceph_futimes(self.cluster, _fd, buf)
# Surface the libcephfs return code as an exception via make_ex().
2405 raise make_ex(ret, "error in futimes")
2407 def futimens(self, fd, times=None):
2409 Set access and modification time for a file pointed to by descriptor
2411 :param fd: file descriptor of the open file
2412 :param times: if times is not None, it must be a tuple (atime, mtime)
2415 self.require_state("mounted")
2416 if not isinstance(fd, int):
2417 raise TypeError('fd must be an int')
# int or float timestamps are accepted.
2419 if not isinstance(times, tuple):
2420 raise TypeError('times must be a tuple')
2421 if not isinstance(times[0], (int, float)):
2422 raise TypeError('atime must be an int or a float')
2423 if not isinstance(times[1], (int, float)):
2424 raise TypeError('mtime must be an int or a float')
# Both timestamps start out as the current time; caller-supplied values are
# converted to float.
2425 actime = modtime = time.time()
2427 actime = float(times[0])
2428 modtime = float(times[1])
# Same shape as futimes(), but the array holds timespec values built with
# to_timespec() rather than timeval values.
2432 timespec *buf = [to_timespec(actime), to_timespec(modtime)]
2434 ret = ceph_futimens(self.cluster, _fd, buf)
# Surface the libcephfs return code as an exception via make_ex().
2436 raise make_ex(ret, "error in futimens")
2438 def get_file_replication(self, fd):
2440 Get the file replication information from an open file descriptor.
2442 :param fd : the open file descriptor referring to the file to get
2443 the replication information of.
# Requires a mounted filesystem; fd is type-checked before the C call.
2445 self.require_state("mounted")
2446 if not isinstance(fd, int):
2447 raise TypeError('fd must be an int')
2453 ret = ceph_get_file_replication(self.cluster, _fd)
# Surface the libcephfs return code as an exception via make_ex().
2455 raise make_ex(ret, "error in get_file_replication")
2459 def get_path_replication(self, path):
2461 Get the file replication information given the path.
2463 :param path: the path of the file/directory to get the replication information of.
# Requires a mounted filesystem; path is converted with cstr() before the
# C call.
2465 self.require_state("mounted")
2466 path = cstr(path, 'path')
2472 ret = ceph_get_path_replication(self.cluster, _path)
# Surface the libcephfs return code as an exception via make_ex().
2474 raise make_ex(ret, "error in get_path_replication")
2478 def get_pool_id(self, pool_name):
2480 Get the id of the named pool.
2482 :param pool_name: the name of the pool.
# Requires a mounted filesystem; the name is converted with cstr() and bound
# to a C string for the ceph_get_pool_id() call.
2485 self.require_state("mounted")
2486 pool_name = cstr(pool_name, 'pool_name')
2489 char* _pool_name = pool_name
2492 ret = ceph_get_pool_id(self.cluster, _pool_name)
# Surface the libcephfs return code as an exception via make_ex().
2494 raise make_ex(ret, "error in get_pool_id")
2498 def get_pool_replication(self, pool_id):
2500 Get the pool replication factor.
2502 :param pool_id: the pool id to look up
# Requires a mounted filesystem; pool_id is type-checked and copied into a
# C int before the call.
2505 self.require_state("mounted")
2506 if not isinstance(pool_id, int):
2507 raise TypeError('pool_id must be an int')
2510 int _pool_id = pool_id
2513 ret = ceph_get_pool_replication(self.cluster, _pool_id)
# Surface the libcephfs return code as an exception via make_ex().
2515 raise make_ex(ret, "error in get_pool_replication")
2519 def debug_get_fd_caps(self, fd):
2521 Get the capabilities currently issued to the client given the fd.
2523 :param fd: the file descriptor to get issued
# Requires a mounted filesystem; fd is type-checked before the C call.
2526 self.require_state("mounted")
2527 if not isinstance(fd, int):
2528 raise TypeError('fd must be an int')
2534 ret = ceph_debug_get_fd_caps(self.cluster, _fd)
# Surface the libcephfs return code as an exception via make_ex().
2536 raise make_ex(ret, "error in debug_get_fd_caps")
2540 def debug_get_file_caps(self, path):
2542 Get the capabilities currently issued to the client given the path.
2544 :param path: the path of the file/directory to get the capabilities of.
# Requires a mounted filesystem; path is converted with cstr() before the
# C call.
2547 self.require_state("mounted")
2548 path = cstr(path, 'path')
2554 ret = ceph_debug_get_file_caps(self.cluster, _path)
# Surface the libcephfs return code as an exception via make_ex().
2556 raise make_ex(ret, "error in debug_get_file_caps")
2560 def get_cap_return_timeout(self):
2562 Get the amount of time that the client has to return caps
2564 In the event that a client does not return its caps, the MDS may blocklist
2565 it after this timeout. Applications should check this value and ensure
2566 that they set the delegation timeout to a value lower than this.
# Requires a mounted filesystem; takes no arguments beyond the mount handle.
2569 self.require_state("mounted")
2572 ret = ceph_get_cap_return_timeout(self.cluster)
# Surface the libcephfs return code as an exception via make_ex().
2574 raise make_ex(ret, "error in get_cap_return_timeout")
2578 def set_uuid(self, uuid):
2580 Set ceph client uuid. Must be called before mount.
2582 :param uuid: the uuid to set
# No require_state("mounted") check here, consistent with the docstring's
# "before mount" requirement. The result of ceph_set_uuid() is not captured.
2585 uuid = cstr(uuid, 'uuid')
2591 ceph_set_uuid(self.cluster, _uuid)
2593 def set_session_timeout(self, timeout):
2595 Set ceph client session timeout. Must be called before mount.
2597 :param timeout: the timeout to set
# No require_state("mounted") check here, consistent with the docstring's
# "before mount" requirement. The result of ceph_set_session_timeout() is
# not captured.
2600 if not isinstance(timeout, int):
2601 raise TypeError('timeout must be an int')
2604 int _timeout = timeout
2607 ceph_set_session_timeout(self.cluster, _timeout)
2609 def get_layout(self, fd):
2611 Get the layout of an open file: stripe unit, stripe count, object size,
pool id and pool name.
2613 :param fd: file descriptor of the file/directory for which to get the layout
2616 if not isinstance(fd, int):
2617 raise TypeError('fd must be an int')
2627 dict_result = dict()
2630 ret = ceph_get_file_layout(self.cluster, _fd, &stripe_unit, &stripe_count, &object_size, &pool_id)
# The error code comes from ret. The previous code passed stripe_unit, an
# output value of the call, to make_ex(), so the wrong errno was reported.
2632 raise make_ex(ret, "error in get_file_layout")
2633 dict_result["stripe_unit"] = stripe_unit
2634 dict_result["stripe_count"] = stripe_count
2635 dict_result["object_size"] = object_size
2636 dict_result["pool_id"] = pool_id
# The pool name is fetched separately into a realloc_chk()-managed buffer;
# -CEPHFS_ERANGE is handled as its own case - presumably the buffer is grown
# and the call retried (TODO confirm).
2640 buf = <char *>realloc_chk(buf, buflen)
2642 ret = ceph_get_file_pool_name(self.cluster, _fd, buf, buflen)
2644 dict_result["pool_name"] = decode_cstr(buf)
2646 elif ret == -CEPHFS_ERANGE:
2649 raise make_ex(ret, "error in get_file_pool_name")
2654 def get_default_pool(self):
2656 Get the default pool name and id of cephfs. This returns dict{pool_name, pool_id}.
2662 dict_result = dict()
2666 buf = <char *>realloc_chk(buf, buflen)
2668 ret = ceph_get_default_data_pool_name(self.cluster, buf, buflen)
2670 dict_result["pool_name"] = decode_cstr(buf)
2672 elif ret == -CEPHFS_ERANGE:
2675 raise make_ex(ret, "error in get_default_data_pool_name")
2678 ret = ceph_get_pool_id(self.cluster, buf)
2680 raise make_ex(ret, "error in get_pool_id")
2681 dict_result["pool_id"] = ret