2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc cimport errno
7 from libc.stdint cimport *
8 from libc.stdlib cimport malloc, realloc, free
12 from collections import namedtuple
13 from datetime import datetime
19 # Are we running Python 2.x
20 if sys.version_info[0] < 3:
25 AT_NO_ATTR_SYNC = 0x4000
26 AT_SYMLINK_NOFOLLOW = 0x100
27 cdef int AT_SYMLINK_NOFOLLOW_CDEF = AT_SYMLINK_NOFOLLOW
28 CEPH_STATX_BASIC_STATS = 0x7ff
29 cdef int CEPH_STATX_BASIC_STATS_CDEF = CEPH_STATX_BASIC_STATS
31 CEPH_STATX_NLINK = 0x2
34 CEPH_STATX_RDEV = 0x10
35 CEPH_STATX_ATIME = 0x20
36 CEPH_STATX_MTIME = 0x40
37 CEPH_STATX_CTIME = 0x80
38 CEPH_STATX_INO = 0x100
39 CEPH_STATX_SIZE = 0x200
40 CEPH_STATX_BLOCKS = 0x400
41 CEPH_STATX_BTIME = 0x800
42 CEPH_STATX_VERSION = 0x1000
44 cdef extern from "Python.h":
45 # These are in cpython/string.pxd, but use "object" types instead of
46 # PyObject*, which invokes assumptions in cpython that we need to
47 # legitimately break to implement zero-copy string buffers in Image.read().
48 # This is valid use of the Python API and documented as a special case.
49 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
50 char* PyBytes_AsString(PyObject *string) except NULL
51 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
52 void PyEval_InitThreads()
55 cdef extern from "sys/statvfs.h":
57 unsigned long int f_bsize
58 unsigned long int f_frsize
59 unsigned long int f_blocks
60 unsigned long int f_bfree
61 unsigned long int f_bavail
62 unsigned long int f_files
63 unsigned long int f_ffree
64 unsigned long int f_favail
65 unsigned long int f_fsid
66 unsigned long int f_flag
67 unsigned long int f_namemax
68 unsigned long int f_padding[32]
71 IF UNAME_SYSNAME == "FreeBSD":
72 cdef extern from "dirent.h":
75 unsigned short int d_reclen
79 cdef extern from "dirent.h":
82 unsigned long int d_off
83 unsigned short int d_reclen
88 cdef extern from "time.h":
89 ctypedef long int time_t
91 cdef extern from "time.h":
96 cdef extern from "sys/types.h":
97 ctypedef unsigned long mode_t
99 cdef extern from "<utime.h>":
104 cdef extern from "sys/time.h":
109 cdef extern from "cephfs/ceph_ll_client.h":
110 cdef struct statx "ceph_statx":
128 cdef extern from "cephfs/libcephfs.h" nogil:
129 cdef struct ceph_mount_info:
132 cdef struct ceph_dir_result:
135 ctypedef void* rados_t
137 const char *ceph_version(int *major, int *minor, int *patch)
139 int ceph_create(ceph_mount_info **cmount, const char * const id)
140 int ceph_create_from_rados(ceph_mount_info **cmount, rados_t cluster)
141 int ceph_init(ceph_mount_info *cmount)
142 void ceph_shutdown(ceph_mount_info *cmount)
144 int ceph_getaddrs(ceph_mount_info* cmount, char** addrs)
145 int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
146 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
147 int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
148 int ceph_conf_set(ceph_mount_info *cmount, const char *option, const char *value)
150 int ceph_mount(ceph_mount_info *cmount, const char *root)
151 int ceph_select_filesystem(ceph_mount_info *cmount, const char *fs_name)
152 int ceph_unmount(ceph_mount_info *cmount)
153 int ceph_abort_conn(ceph_mount_info *cmount)
154 uint64_t ceph_get_instance_id(ceph_mount_info *cmount)
155 int ceph_fstatx(ceph_mount_info *cmount, int fd, statx *stx, unsigned want, unsigned flags)
156 int ceph_statx(ceph_mount_info *cmount, const char *path, statx *stx, unsigned want, unsigned flags)
157 int ceph_statfs(ceph_mount_info *cmount, const char *path, statvfs *stbuf)
159 int ceph_mds_command(ceph_mount_info *cmount, const char *mds_spec, const char **cmd, size_t cmdlen,
160 const char *inbuf, size_t inbuflen, char **outbuf, size_t *outbuflen,
161 char **outs, size_t *outslen)
162 int ceph_rename(ceph_mount_info *cmount, const char *from_, const char *to)
163 int ceph_link(ceph_mount_info *cmount, const char *existing, const char *newname)
164 int ceph_unlink(ceph_mount_info *cmount, const char *path)
165 int ceph_symlink(ceph_mount_info *cmount, const char *existing, const char *newname)
166 int ceph_readlink(ceph_mount_info *cmount, const char *path, char *buf, int64_t size)
167 int ceph_setxattr(ceph_mount_info *cmount, const char *path, const char *name,
168 const void *value, size_t size, int flags)
169 int ceph_getxattr(ceph_mount_info *cmount, const char *path, const char *name,
170 void *value, size_t size)
171 int ceph_removexattr(ceph_mount_info *cmount, const char *path, const char *name)
172 int ceph_listxattr(ceph_mount_info *cmount, const char *path, char *list, size_t size)
173 int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset)
174 int ceph_read(ceph_mount_info *cmount, int fd, char *buf, int64_t size, int64_t offset)
175 int ceph_flock(ceph_mount_info *cmount, int fd, int operation, uint64_t owner)
176 int ceph_close(ceph_mount_info *cmount, int fd)
177 int ceph_open(ceph_mount_info *cmount, const char *path, int flags, mode_t mode)
178 int ceph_mkdir(ceph_mount_info *cmount, const char *path, mode_t mode)
179 int ceph_mkdirs(ceph_mount_info *cmount, const char *path, mode_t mode)
180 int ceph_closedir(ceph_mount_info *cmount, ceph_dir_result *dirp)
181 int ceph_opendir(ceph_mount_info *cmount, const char *name, ceph_dir_result **dirpp)
182 void ceph_rewinddir(ceph_mount_info *cmount, ceph_dir_result *dirp)
183 int ceph_chdir(ceph_mount_info *cmount, const char *path)
184 dirent * ceph_readdir(ceph_mount_info *cmount, ceph_dir_result *dirp)
185 int ceph_rmdir(ceph_mount_info *cmount, const char *path)
186 const char* ceph_getcwd(ceph_mount_info *cmount)
187 int ceph_sync_fs(ceph_mount_info *cmount)
188 int ceph_fsync(ceph_mount_info *cmount, int fd, int syncdataonly)
189 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
190 int ceph_chmod(ceph_mount_info *cmount, const char *path, mode_t mode)
191 int ceph_chown(ceph_mount_info *cmount, const char *path, int uid, int gid)
192 int ceph_lchown(ceph_mount_info *cmount, const char *path, int uid, int gid)
193 int64_t ceph_lseek(ceph_mount_info *cmount, int fd, int64_t offset, int whence)
194 void ceph_buffer_free(char *buf)
195 mode_t ceph_umask(ceph_mount_info *cmount, mode_t mode)
196 int ceph_utime(ceph_mount_info *cmount, const char *path, utimbuf *buf)
197 int ceph_futime(ceph_mount_info *cmount, int fd, utimbuf *buf)
198 int ceph_utimes(ceph_mount_info *cmount, const char *path, timeval times[2])
199 int ceph_lutimes(ceph_mount_info *cmount, const char *path, timeval times[2])
200 int ceph_futimes(ceph_mount_info *cmount, int fd, timeval times[2])
201 int ceph_futimens(ceph_mount_info *cmount, int fd, timespec times[2])
204 class Error(Exception):
205 def get_error_code(self):
209 class LibCephFSStateError(Error):
213 class OSError(Error):
214 def __init__(self, errno, strerror):
215 super(OSError, self).__init__(errno, strerror)
217 self.strerror = "%s: %s" % (strerror, os.strerror(errno))
220 return '{} [Errno {}]'.format(self.strerror, self.errno)
222 def get_error_code(self):
226 class PermissionError(OSError):
230 class ObjectNotFound(OSError):
234 class NoData(OSError):
238 class ObjectExists(OSError):
242 class IOError(OSError):
246 class NoSpace(OSError):
250 class InvalidValue(OSError):
254 class OperationNotSupported(OSError):
258 class WouldBlock(OSError):
262 class OutOfRange(OSError):
266 class ObjectNotEmpty(OSError):
269 class NotDirectory(OSError):
272 IF UNAME_SYSNAME == "FreeBSD":
273 cdef errno_to_exception = {
274 errno.EPERM : PermissionError,
275 errno.ENOENT : ObjectNotFound,
277 errno.ENOSPC : NoSpace,
278 errno.EEXIST : ObjectExists,
279 errno.ENOATTR : NoData,
280 errno.EINVAL : InvalidValue,
281 errno.EOPNOTSUPP : OperationNotSupported,
282 errno.ERANGE : OutOfRange,
283 errno.EWOULDBLOCK: WouldBlock,
284 errno.ENOTEMPTY : ObjectNotEmpty,
287 cdef errno_to_exception = {
288 errno.EPERM : PermissionError,
289 errno.ENOENT : ObjectNotFound,
291 errno.ENOSPC : NoSpace,
292 errno.EEXIST : ObjectExists,
293 errno.ENODATA : NoData,
294 errno.EINVAL : InvalidValue,
295 errno.EOPNOTSUPP : OperationNotSupported,
296 errno.ERANGE : OutOfRange,
297 errno.EWOULDBLOCK: WouldBlock,
298 errno.ENOTEMPTY : ObjectNotEmpty,
299 errno.ENOTDIR : NotDirectory
303 cdef make_ex(ret, msg):
305 Translate a librados return code into an exception.
307 :param ret: the return code
309 :param msg: the error message to use
311 :returns: a subclass of :class:`Error`
314 if ret in errno_to_exception:
315 return errno_to_exception[ret](ret, msg)
317 return Error(msg + ': {} [Errno {:d}]'.format(os.strerror(ret), ret))
320 class DirEntry(namedtuple('DirEntry',
321 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
326 return self.d_type == self.DT_DIR
328 def is_symbol_file(self):
329 return self.d_type == self.DT_LNK
332 return self.d_type == self.DT_REG
334 StatResult = namedtuple('StatResult',
335 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
336 "st_gid", "st_rdev", "st_size", "st_blksize",
337 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
339 cdef class DirResult(object):
341 cdef ceph_dir_result* handle
343 # Bug in older Cython instances prevents this from being a static method.
345 # cdef create(LibCephFS lib, ceph_dir_result* handle):
351 def __dealloc__(self):
356 raise make_ex(errno.EBADF, "dir is not open")
357 self.lib.require_state("mounted")
359 ceph_rewinddir(self.lib.cluster, self.handle)
362 def __exit__(self, type_, value, traceback):
367 self.lib.require_state("mounted")
370 dirent = ceph_readdir(self.lib.cluster, self.handle)
374 IF UNAME_SYSNAME == "FreeBSD":
375 return DirEntry(d_ino=dirent.d_ino,
377 d_reclen=dirent.d_reclen,
378 d_type=dirent.d_type,
379 d_name=dirent.d_name)
381 return DirEntry(d_ino=dirent.d_ino,
383 d_reclen=dirent.d_reclen,
384 d_type=dirent.d_type,
385 d_name=dirent.d_name)
389 self.lib.require_state("mounted")
391 ret = ceph_closedir(self.lib.cluster, self.handle)
393 raise make_ex(ret, "closedir failed")
396 def cstr(val, name, encoding="utf-8", opt=False):
398 Create a byte string from a Python string
400 :param basestring val: Python string
401 :param str name: Name of the string parameter, for exceptions
402 :param str encoding: Encoding to use
403 :param bool opt: If True, None is allowed
405 :raises: :class:`InvalidArgument`
407 if opt and val is None:
409 if isinstance(val, bytes):
413 v = val.encode(encoding)
415 raise TypeError('%s must be encodeable as a bytearray' % name)
416 assert isinstance(v, bytes)
419 def cstr_list(list_str, name, encoding="utf-8"):
420 return [cstr(s, name) for s in list_str]
423 def decode_cstr(val, encoding="utf-8"):
425 Decode a byte string into a Python string.
427 :param bytes val: byte string
428 :rtype: unicode or None
433 return val.decode(encoding)
435 cdef timeval to_timeval(t):
437 return timeval equivalent from time
440 cdef timeval buf = timeval(tt, (t - tt) * 1000000)
443 cdef timespec to_timespec(t):
445 return timespec equivalent from time
448 cdef timespec buf = timespec(tt, (t - tt) * 1000000000)
451 cdef char* opt_str(s) except? NULL:
457 cdef char ** to_bytes_array(list_bytes):
458 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
460 raise MemoryError("malloc failed")
461 for i in range(len(list_bytes)):
462 ret[i] = <char *>list_bytes[i]
466 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
467 cdef void *ret = realloc(ptr, size)
469 raise MemoryError("realloc failed")
473 cdef class LibCephFS(object):
474 """libcephfs python wrapper"""
476 cdef public object state
477 cdef ceph_mount_info *cluster
479 def require_state(self, *args):
480 if self.state in args:
482 raise LibCephFSStateError("You cannot perform that operation on a "
483 "CephFS object in state %s." % (self.state))
485 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
486 """Create a libcephfs wrapper
488 :param conf dict opt: settings overriding the default ones and conffile
489 :param conffile str opt: the path to ceph.conf to override the default settings
490 :auth_id str opt: the id used to authenticate the client entity
491 :rados_inst Rados opt: a rados.Rados instance
494 self.state = "uninitialized"
495 if rados_inst is not None:
496 if auth_id is not None or conffile is not None or conf is not None:
497 raise make_ex(errno.EINVAL,
498 "May not pass RADOS instance as well as other configuration")
500 self.create_with_rados(rados_inst)
502 self.create(conf, conffile, auth_id)
504 def create_with_rados(self, rados.Rados rados_inst):
507 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
509 raise Error("libcephfs_initialize failed with error code: %d" % ret)
510 self.state = "configuring"
512 def create(self, conf=None, conffile=None, auth_id=None):
514 Create a mount handle for interacting with Ceph. All libcephfs
515 functions operate on a mount info handle.
517 :param conf dict opt: settings overriding the default ones and conffile
518 :param conffile str opt: the path to ceph.conf to override the default settings
519 :auth_id str opt: the id used to authenticate the client entity
521 if conf is not None and not isinstance(conf, dict):
522 raise TypeError("conf must be dict or None")
523 cstr(conffile, 'configfile', opt=True)
524 auth_id = cstr(auth_id, 'auth_id', opt=True)
527 char* _auth_id = opt_str(auth_id)
531 ret = ceph_create(&self.cluster, <const char*>_auth_id)
533 raise Error("libcephfs_initialize failed with error code: %d" % ret)
535 self.state = "configuring"
536 if conffile is not None:
537 # read the default conf file when '' is given
540 self.conf_read_file(conffile)
542 for key, value in conf.iteritems():
543 self.conf_set(key, value)
547 Get associated client addresses with this RADOS session.
549 self.require_state("mounted")
557 ret = ceph_getaddrs(self.cluster, &addrs)
559 raise make_ex(ret, "error calling getaddrs")
561 return decode_cstr(addrs)
563 ceph_buffer_free(addrs)
566 def conf_read_file(self, conffile=None):
568 Load the ceph configuration from the specified config file.
570 :param conffile str opt: the path to ceph.conf to override the default settings
572 conffile = cstr(conffile, 'conffile', opt=True)
574 char *_conffile = opt_str(conffile)
576 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
578 raise make_ex(ret, "error calling conf_read_file")
580 def conf_parse_argv(self, argv):
582 Parse the command line arguments and load the configuration parameters.
584 :param argv: the argument list
586 self.require_state("configuring")
587 cargv = cstr_list(argv, 'argv')
589 int _argc = len(argv)
590 char **_argv = to_bytes_array(cargv)
594 ret = ceph_conf_parse_argv(self.cluster, _argc,
595 <const char **>_argv)
597 raise make_ex(ret, "error calling conf_parse_argv")
603 Unmount and destroy the ceph mount handle.
605 if self.state in ["initialized", "mounted"]:
607 ceph_shutdown(self.cluster)
608 self.state = "shutdown"
614 def __exit__(self, type_, value, traceback):
618 def __dealloc__(self):
623 Get the version number of the ``libcephfs`` C library.
625 :returns: a tuple of ``(major, minor, extra)`` components of the
633 ceph_version(&major, &minor, &extra)
634 return (major, minor, extra)
636 def conf_get(self, option):
638 Gets the configuration value as a string.
640 :param option: the config option to get
642 self.require_state("configuring", "initialized", "mounted")
644 option = cstr(option, 'option')
646 char *_option = option
652 ret_buf = <char *>realloc_chk(ret_buf, length)
654 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
656 return decode_cstr(ret_buf)
657 elif ret == -errno.ENAMETOOLONG:
659 elif ret == -errno.ENOENT:
662 raise make_ex(ret, "error calling conf_get")
666 def conf_set(self, option, val):
668 Sets a configuration value from a string.
670 :param option: the configuration option to set
671 :param value: the value of the configuration option to set
673 self.require_state("configuring", "initialized", "mounted")
675 option = cstr(option, 'option')
676 val = cstr(val, 'val')
678 char *_option = option
682 ret = ceph_conf_set(self.cluster, _option, _val)
684 raise make_ex(ret, "error calling conf_set")
688 Initialize the filesystem client (but do not mount the filesystem yet)
690 self.require_state("configuring")
692 ret = ceph_init(self.cluster)
694 raise make_ex(ret, "error calling ceph_init")
695 self.state = "initialized"
697 def mount(self, mount_root=None, filesystem_name=None):
699 Perform a mount using the path for the root of the mount.
701 if self.state == "configuring":
703 self.require_state("initialized")
705 # Configure which filesystem to mount if one was specified
706 if filesystem_name is None:
707 filesystem_name = b""
709 filesystem_name = cstr(filesystem_name, 'filesystem_name')
711 char *_filesystem_name = filesystem_name
714 ret = ceph_select_filesystem(self.cluster,
717 raise make_ex(ret, "error calling ceph_select_filesystem")
719 # Prepare mount_root argument, default to "/"
720 root = b"/" if mount_root is None else mount_root
722 char *_mount_root = root
725 ret = ceph_mount(self.cluster, _mount_root)
727 raise make_ex(ret, "error calling ceph_mount")
728 self.state = "mounted"
732 Unmount a mount handle.
734 self.require_state("mounted")
736 ret = ceph_unmount(self.cluster)
738 raise make_ex(ret, "error calling ceph_unmount")
739 self.state = "initialized"
741 def abort_conn(self):
743 Abort mds connections.
745 self.require_state("mounted")
747 ret = ceph_abort_conn(self.cluster)
749 raise make_ex(ret, "error calling ceph_abort_conn")
750 self.state = "initialized"
752 def get_instance_id(self):
754 Get a global id for current instance
756 self.require_state("initialized", "mounted")
758 ret = ceph_get_instance_id(self.cluster)
761 def statfs(self, path):
763 Perform a statfs on the ceph file system. This call fills in file system wide statistics
764 into the passed in buffer.
766 :param path: any path within the mounted filesystem
768 self.require_state("mounted")
769 path = cstr(path, 'path')
775 ret = ceph_statfs(self.cluster, _path, &statbuf)
777 raise make_ex(ret, "statfs failed: %s" % path)
778 return {'f_bsize': statbuf.f_bsize,
779 'f_frsize': statbuf.f_frsize,
780 'f_blocks': statbuf.f_blocks,
781 'f_bfree': statbuf.f_bfree,
782 'f_bavail': statbuf.f_bavail,
783 'f_files': statbuf.f_files,
784 'f_ffree': statbuf.f_ffree,
785 'f_favail': statbuf.f_favail,
786 'f_fsid': statbuf.f_fsid,
787 'f_flag': statbuf.f_flag,
788 'f_namemax': statbuf.f_namemax}
792 Synchronize all filesystem data to persistent media
794 self.require_state("mounted")
796 ret = ceph_sync_fs(self.cluster)
798 raise make_ex(ret, "sync_fs failed")
800 def fsync(self, int fd, int syncdataonly):
802 Synchronize an open file to persistent media.
804 :param fd: the file descriptor of the file to sync.
805 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
808 self.require_state("mounted")
810 ret = ceph_fsync(self.cluster, fd, syncdataonly)
812 raise make_ex(ret, "fsync failed")
816 Get the current working directory.
818 :rtype the path to the current working directory
820 self.require_state("mounted")
822 ret = ceph_getcwd(self.cluster)
825 def chdir(self, path):
827 Change the current working directory.
829 :param path the path to the working directory to change into.
831 self.require_state("mounted")
833 path = cstr(path, 'path')
834 cdef char* _path = path
836 ret = ceph_chdir(self.cluster, _path)
838 raise make_ex(ret, "chdir failed")
840 def opendir(self, path):
842 Open the given directory.
844 :param path: the path name of the directory to open. Must be either an absolute path
845 or a path relative to the current working directory.
846 :rtype handle: the open directory stream handle
848 self.require_state("mounted")
850 path = cstr(path, 'path')
853 ceph_dir_result* handle
855 ret = ceph_opendir(self.cluster, _path, &handle);
857 raise make_ex(ret, "opendir failed")
863 def readdir(self, DirResult handle):
865 Get the next entry in an open directory.
867 :param handle: the open directory stream handle
868 :rtype dentry: the next directory entry or None if at the end of the
869 directory (or the directory is empty. This pointer
870 should not be freed by the caller, and is only safe to
871 access between return and the next call to readdir or
874 self.require_state("mounted")
876 return handle.readdir()
878 def closedir(self, DirResult handle):
880 Close the open directory.
882 :param handle: the open directory stream handle
884 self.require_state("mounted")
886 return handle.close()
888 def mkdir(self, path, mode):
892 :param path: the path of the directory to create. This must be either an
893 absolute path or a relative path off of the current working directory.
894 :param mode the permissions the directory should have once created.
897 self.require_state("mounted")
898 path = cstr(path, 'path')
899 if not isinstance(mode, int):
900 raise TypeError('mode must be an int')
905 ret = ceph_mkdir(self.cluster, _path, _mode)
907 raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
909 def chmod(self, path, mode) :
911 Change directory mode.
912 :param path: the path of the directory to create. This must be either an
913 absolute path or a relative path off of the current working directory.
914 :param mode the permissions the directory should have once created.
916 self.require_state("mounted")
917 path = cstr(path, 'path')
918 if not isinstance(mode, int):
919 raise TypeError('mode must be an int')
924 ret = ceph_chmod(self.cluster, _path, _mode)
926 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
928 def chown(self, path, uid, gid, follow_symlink=True):
930 Change directory ownership
931 :param path: the path of the directory to change.
932 :param uid: the uid to set
933 :param gid: the gid to set
934 :param follow_symlink: perform the operation on the target file if @path
935 is a symbolic link (default)
937 self.require_state("mounted")
938 path = cstr(path, 'path')
939 if not isinstance(uid, int):
940 raise TypeError('uid must be an int')
941 elif not isinstance(gid, int):
942 raise TypeError('gid must be an int')
950 ret = ceph_chown(self.cluster, _path, _uid, _gid)
953 ret = ceph_lchown(self.cluster, _path, _uid, _gid)
955 raise make_ex(ret, "error in chown {}".format(path.decode('utf-8')))
957 def lchown(self, path, uid, gid):
959 Change ownership of a symbolic link
960 :param path: the path of the symbolic link to change.
961 :param uid: the uid to set
962 :param gid: the gid to set
964 self.chown(path, uid, gid, follow_symlink=False)
966 def mkdirs(self, path, mode):
968 Create multiple directories at once.
970 :param path: the full path of directories and sub-directories that should
972 :param mode the permissions the directory should have once created
974 self.require_state("mounted")
975 path = cstr(path, 'path')
976 if not isinstance(mode, int):
977 raise TypeError('mode must be an int')
983 ret = ceph_mkdirs(self.cluster, _path, _mode)
985 raise make_ex(ret, "error in mkdirs {}".format(path.decode('utf-8')))
987 def rmdir(self, path):
991 :param path: the path of the directory to remove.
993 self.require_state("mounted")
994 path = cstr(path, 'path')
995 cdef char* _path = path
996 ret = ceph_rmdir(self.cluster, _path)
998 raise make_ex(ret, "error in rmdir {}".format(path.decode('utf-8')))
1000 def open(self, path, flags, mode=0):
1002 Create and/or open a file.
1004 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
1005 the file will first be created before opening.
1006 :param flags: set of option masks that control how the file is created/opened.
1007 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
1008 is specified in the flags.
1010 self.require_state("mounted")
1011 path = cstr(path, 'path')
1013 if not isinstance(mode, int):
1014 raise TypeError('mode must be an int')
1015 if isinstance(flags, str):
1018 cephfs_flags = os.O_RDONLY
1026 cephfs_flags |= os.O_TRUNC | os.O_CREAT
1027 elif access_flags > 0 and c == '+':
1030 raise make_ex(errno.EOPNOTSUPP,
1031 "open flags doesn't support %s" % c)
1033 if access_flags == 1:
1034 cephfs_flags |= os.O_RDONLY;
1035 elif access_flags == 2:
1036 cephfs_flags |= os.O_WRONLY;
1038 cephfs_flags |= os.O_RDWR;
1040 elif isinstance(flags, int):
1041 cephfs_flags = flags
1043 raise TypeError("flags must be a string or an integer")
1047 int _flags = cephfs_flags
1051 ret = ceph_open(self.cluster, _path, _flags, _mode)
1053 raise make_ex(ret, "error in open {}".format(path.decode('utf-8')))
1056 def close(self, fd):
1058 Close the open file.
1060 :param fd: the file descriptor referring to the open file.
1063 self.require_state("mounted")
1064 if not isinstance(fd, int):
1065 raise TypeError('fd must be an int')
1068 ret = ceph_close(self.cluster, _fd)
1070 raise make_ex(ret, "error in close")
1072 def read(self, fd, offset, l):
1074 Read data from the file.
1076 :param fd : the file descriptor of the open file to read from.
1077 :param offset : the offset in the file to read from. If this value is negative, the
1078 function reads from the current offset of the file descriptor.
1079 :param l : the flag to indicate what type of seeking to perform
1081 self.require_state("mounted")
1082 if not isinstance(offset, int):
1083 raise TypeError('offset must be an int')
1084 if not isinstance(l, int):
1085 raise TypeError('l must be an int')
1086 if not isinstance(fd, int):
1087 raise TypeError('fd must be an int')
1090 int64_t _offset = offset
1094 PyObject* ret_s = NULL
1096 ret_s = PyBytes_FromStringAndSize(NULL, _length)
1098 ret_buf = PyBytes_AsString(ret_s)
1100 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
1102 raise make_ex(ret, "error in read")
1105 _PyBytes_Resize(&ret_s, ret)
1107 return <object>ret_s
1109 # We DECREF unconditionally: the cast to object above will have
1110 # INCREFed if necessary. This also takes care of exceptions,
1111 # including if _PyString_Resize fails (that will free the string
1112 # itself and set ret_s to NULL, hence XDECREF).
1113 ref.Py_XDECREF(ret_s)
1115 def write(self, fd, buf, offset):
1117 Write data to a file.
1119 :param fd : the file descriptor of the open file to write to
1120 :param buf : the bytes to write to the file
1121 :param offset : the offset of the file write into. If this value is negative, the
1122 function writes to the current offset of the file descriptor.
1124 self.require_state("mounted")
1125 if not isinstance(fd, int):
1126 raise TypeError('fd must be an int')
1127 if not isinstance(buf, bytes):
1128 raise TypeError('buf must be a bytes')
1129 if not isinstance(offset, int):
1130 raise TypeError('offset must be an int')
1135 int64_t _offset = offset
1137 size_t length = len(buf)
1140 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
1142 raise make_ex(ret, "error in write")
1145 def flock(self, fd, operation, owner):
1147 Apply or remove an advisory lock.
1149 :param fd: the open file descriptor to change advisory lock.
1150 :param operation: the advisory lock operation to be performed on the file
1151 :param owner: the user-supplied owner identifier (an arbitrary integer)
1153 self.require_state("mounted")
1154 if not isinstance(fd, int):
1155 raise TypeError('fd must be an int')
1156 if not isinstance(operation, int):
1157 raise TypeError('operation must be an int')
1162 uint64_t _owner = owner
1165 ret = ceph_flock(self.cluster, _fd, _op, _owner)
1167 raise make_ex(ret, "error in write")
1170 def getxattr(self, path, name, size=255):
1172 Get an extended attribute.
1174 :param path: the path to the file
1175 :param name: the name of the extended attribute to get
1176 :param size: the size of the pre-allocated buffer
1178 self.require_state("mounted")
1180 path = cstr(path, 'path')
1181 name = cstr(name, 'name')
1187 size_t ret_length = size
1188 char *ret_buf = NULL
1191 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1193 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1197 raise make_ex(ret, "error in getxattr")
1199 return ret_buf[:ret]
1203 def setxattr(self, path, name, value, flags):
1205 Set an extended attribute on a file.
1207 :param path: the path to the file.
1208 :param name: the name of the extended attribute to set.
1209 :param value: the bytes of the extended attribute value
1211 self.require_state("mounted")
1213 name = cstr(name, 'name')
1214 path = cstr(path, 'path')
1215 if not isinstance(flags, int):
1216 raise TypeError('flags must be a int')
1217 if not isinstance(value, bytes):
1218 raise TypeError('value must be a bytes')
1223 char *_value = value
1224 size_t _value_len = len(value)
1228 ret = ceph_setxattr(self.cluster, _path, _name,
1229 _value, _value_len, _flags)
1231 raise make_ex(ret, "error in setxattr")
1233 def removexattr(self, path, name):
1235 Remove an extended attribute of a file.
1237 :param path: path of the file.
1238 :param name: name of the extended attribute to remove.
1240 self.require_state("mounted")
1242 name = cstr(name, 'name')
1243 path = cstr(path, 'path')
1250 ret = ceph_removexattr(self.cluster, _path, _name)
1252 raise make_ex(ret, "error in removexattr")
1254 def listxattr(self, path, size=65536):
1256 List the extended attribute keys set on a file.
1258 :param path: path of the file.
1259 :param size: the size of list buffer to be filled with extended attribute keys.
1261 self.require_state("mounted")
1263 path = cstr(path, 'path')
1267 char *ret_buf = NULL
1268 size_t ret_length = size
1271 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1273 ret = ceph_listxattr(self.cluster, _path, ret_buf, ret_length)
1276 raise make_ex(ret, "error in listxattr")
1278 return ret, ret_buf[:ret]
1282 def stat(self, path, follow_symlink=True):
1284 Get a file's extended statistics and attributes.
1286 :param path: the file or directory to get the statistics of.
1288 self.require_state("mounted")
1289 path = cstr(path, 'path')
1297 ret = ceph_statx(self.cluster, _path, &stx,
1298 CEPH_STATX_BASIC_STATS_CDEF, 0)
1301 ret = ceph_statx(self.cluster, _path, &stx,
1302 CEPH_STATX_BASIC_STATS_CDEF, AT_SYMLINK_NOFOLLOW_CDEF)
1305 raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
1306 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1307 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1308 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1309 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1310 st_blksize=stx.stx_blksize,
1311 st_blocks=stx.stx_blocks,
1312 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1313 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1314 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1316 def lstat(self, path):
1318 Get a file's extended statistics and attributes. When file's a
1319 symbolic link, return the informaion of the link itself rather
1320 than that of the file it points too.
1322 :param path: the file or directory to get the statistics of.
1324 return self.stat(path, follow_symlink=False)
1326 def fstat(self, fd):
1328 Get an open file's extended statistics and attributes.
1330 :param fd: the file descriptor of the file to get statistics of.
1332 self.require_state("mounted")
1333 if not isinstance(fd, int):
1334 raise TypeError('fd must be an int')
1341 ret = ceph_fstatx(self.cluster, _fd, &stx,
1342 CEPH_STATX_BASIC_STATS_CDEF, 0)
1344 raise make_ex(ret, "error in fsat")
1345 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1346 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1347 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1348 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1349 st_blksize=stx.stx_blksize,
1350 st_blocks=stx.stx_blocks,
1351 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1352 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1353 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1355 def statx(self, path, mask, flag):
1357 Get a file's extended statistics and attributes.
1359 :param path: the file or directory to get the statistics of.
1360 :param mask: want bitfield of CEPH_STATX_* flags showing designed attributes.
1361 :param flag: bitfield that can be used to set AT_* modifier flags (only AT_NO_ATTR_SYNC and AT_SYMLINK_NOFOLLOW)
1364 self.require_state("mounted")
1365 path = cstr(path, 'path')
1366 if not isinstance(mask, int):
1367 raise TypeError('flag must be a int')
1368 if not isinstance(flag, int):
1369 raise TypeError('flag must be a int')
1376 dict_result = dict()
1379 ret = ceph_statx(self.cluster, _path, &stx, _mask, _flag)
1381 raise make_ex(ret, "error in stat: %s" % path)
1383 if (_mask & CEPH_STATX_MODE):
1384 dict_result["mode"] = stx.stx_mode
1385 if (_mask & CEPH_STATX_NLINK):
1386 dict_result["nlink"] = stx.stx_nlink
1387 if (_mask & CEPH_STATX_UID):
1388 dict_result["uid"] = stx.stx_uid
1389 if (_mask & CEPH_STATX_GID):
1390 dict_result["gid"] = stx.stx_gid
1391 if (_mask & CEPH_STATX_RDEV):
1392 dict_result["rdev"] = stx.stx_rdev
1393 if (_mask & CEPH_STATX_ATIME):
1394 dict_result["atime"] = datetime.fromtimestamp(stx.stx_atime.tv_sec)
1395 if (_mask & CEPH_STATX_MTIME):
1396 dict_result["mtime"] = datetime.fromtimestamp(stx.stx_mtime.tv_sec)
1397 if (_mask & CEPH_STATX_CTIME):
1398 dict_result["ctime"] = datetime.fromtimestamp(stx.stx_ctime.tv_sec)
1399 if (_mask & CEPH_STATX_INO):
1400 dict_result["ino"] = stx.stx_ino
1401 if (_mask & CEPH_STATX_SIZE):
1402 dict_result["size"] = stx.stx_size
1403 if (_mask & CEPH_STATX_BLOCKS):
1404 dict_result["blocks"] = stx.stx_blocks
1405 if (_mask & CEPH_STATX_BTIME):
1406 dict_result["btime"] = datetime.fromtimestamp(stx.stx_btime.tv_sec)
1407 if (_mask & CEPH_STATX_VERSION):
1408 dict_result["version"] = stx.stx_version
1412 def symlink(self, existing, newname):
1414 Creates a symbolic link.
1416 :param existing: the path to the existing file/directory to link to.
1417 :param newname: the path to the new file/directory to link from.
1419 self.require_state("mounted")
1420 existing = cstr(existing, 'existing')
1421 newname = cstr(newname, 'newname')
1423 char* _existing = existing
1424 char* _newname = newname
1427 ret = ceph_symlink(self.cluster, _existing, _newname)
1429 raise make_ex(ret, "error in symlink")
1431 def link(self, existing, newname):
1435 :param existing: the path to the existing file/directory to link to.
1436 :param newname: the path to the new file/directory to link from.
1439 self.require_state("mounted")
1440 existing = cstr(existing, 'existing')
1441 newname = cstr(newname, 'newname')
1443 char* _existing = existing
1444 char* _newname = newname
1447 ret = ceph_link(self.cluster, _existing, _newname)
1449 raise make_ex(ret, "error in link")
1451 def readlink(self, path, size):
1453 Read a symbolic link.
1455 :param path: the path to the symlink to read
1456 :param size: the length of the buffer
1457 :rtype buf: buffer to hold the path of the file that the symlink points to.
1459 self.require_state("mounted")
1460 path = cstr(path, 'path')
1464 int64_t _size = size
1468 buf = <char *>realloc_chk(buf, _size)
1470 ret = ceph_readlink(self.cluster, _path, buf, _size)
1472 raise make_ex(ret, "error in readlink")
1477 def unlink(self, path):
1479 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
1480 file will not disappear from the namespace until all references to it are removed.
1482 :param path: the path of the file or link to unlink.
1484 self.require_state("mounted")
1485 path = cstr(path, 'path')
1486 cdef char* _path = path
1488 ret = ceph_unlink(self.cluster, _path)
1490 raise make_ex(ret, "error in unlink: {}".format(path.decode('utf-8')))
1492 def rename(self, src, dst):
1494 Rename a file or directory.
1496 :param src: the path to the existing file or directory.
1497 :param dst: the new name of the file or directory.
1500 self.require_state("mounted")
1502 src = cstr(src, 'source')
1503 dst = cstr(dst, 'destination')
1510 ret = ceph_rename(self.cluster, _src, _dst)
1512 raise make_ex(ret, "error in rename {} to {}".format(src.decode(
1513 'utf-8'), dst.decode('utf-8')))
1515 def mds_command(self, mds_spec, args, input_data):
1517 :return 3-tuple of output status int, output status string, output data
1519 mds_spec = cstr(mds_spec, 'mds_spec')
1520 args = cstr_list(args, 'args')
1521 input_data = cstr(input_data, 'input_data')
1524 char *_mds_spec = opt_str(mds_spec)
1525 char **_cmd = to_bytes_array(args)
1526 size_t _cmdlen = len(args)
1528 char *_inbuf = input_data
1529 size_t _inbuf_len = len(input_data)
1531 char *_outbuf = NULL
1532 size_t _outbuf_len = 0
1534 size_t _outs_len = 0
1538 ret = ceph_mds_command(self.cluster, _mds_spec,
1539 <const char **>_cmd, _cmdlen,
1540 <const char*>_inbuf, _inbuf_len,
1541 &_outbuf, &_outbuf_len,
1543 my_outs = decode_cstr(_outs[:_outs_len])
1544 my_outbuf = _outbuf[:_outbuf_len]
1546 ceph_buffer_free(_outs)
1548 ceph_buffer_free(_outbuf)
1549 return (ret, my_outbuf, my_outs)
1553 def umask(self, mode) :
1554 self.require_state("mounted")
1558 ret = ceph_umask(self.cluster, _mode)
1560 raise make_ex(ret, "error in umask")
1563 def lseek(self, fd, offset, whence):
1565 Set the file's current position.
1567 :param fd : the file descriptor of the open file to read from.
1568 :param offset : the offset in the file to read from. If this value is negative, the
1569 function reads from the current offset of the file descriptor.
1570 :param whence : the flag to indicate what type of seeking to performs:SEEK_SET, SEEK_CUR, SEEK_END
1572 self.require_state("mounted")
1573 if not isinstance(fd, int):
1574 raise TypeError('fd must be an int')
1575 if not isinstance(offset, int):
1576 raise TypeError('offset must be an int')
1577 if not isinstance(whence, int):
1578 raise TypeError('whence must be an int')
1582 int64_t _offset = offset
1583 int64_t _whence = whence
1586 ret = ceph_lseek(self.cluster, _fd, _offset, _whence)
1589 raise make_ex(ret, "error in lseek")
1593 def utime(self, path, times=None):
1595 Set access and modification time for path
1597 :param path: file path for which timestamps have to be changed
1598 :param times: if times is not None, it must be a tuple (atime, mtime)
1601 self.require_state("mounted")
1602 path = cstr(path, 'path')
1604 if not isinstance(times, tuple):
1605 raise TypeError('times must be a tuple')
1606 if not isinstance(times[0], int):
1607 raise TypeError('atime must be an int')
1608 if not isinstance(times[1], int):
1609 raise TypeError('mtime must be an int')
1610 actime = modtime = int(time.time())
1617 utimbuf buf = utimbuf(actime, modtime)
1619 ret = ceph_utime(self.cluster, pth, &buf)
1621 raise make_ex(ret, "error in utime {}".format(path.decode('utf-8')))
1623 def futime(self, fd, times=None):
1625 Set access and modification time for a file pointed by descriptor
1627 :param fd: file descriptor of the open file
1628 :param times: if times is not None, it must be a tuple (atime, mtime)
1631 self.require_state("mounted")
1632 if not isinstance(fd, int):
1633 raise TypeError('fd must be an int')
1635 if not isinstance(times, tuple):
1636 raise TypeError('times must be a tuple')
1637 if not isinstance(times[0], int):
1638 raise TypeError('atime must be an int')
1639 if not isinstance(times[1], int):
1640 raise TypeError('mtime must be an int')
1641 actime = modtime = int(time.time())
1648 utimbuf buf = utimbuf(actime, modtime)
1650 ret = ceph_futime(self.cluster, _fd, &buf)
1652 raise make_ex(ret, "error in futime")
1654 def utimes(self, path, times=None, follow_symlink=True):
1656 Set access and modification time for path
1658 :param path: file path for which timestamps have to be changed
1659 :param times: if times is not None, it must be a tuple (atime, mtime)
1660 :param follow_symlink: perform the operation on the target file if @path
1661 is a symbolic link (default)
1664 self.require_state("mounted")
1665 path = cstr(path, 'path')
1667 if not isinstance(times, tuple):
1668 raise TypeError('times must be a tuple')
1669 if not isinstance(times[0], (int, float)):
1670 raise TypeError('atime must be an int or a float')
1671 if not isinstance(times[1], (int, float)):
1672 raise TypeError('mtime must be an int or a float')
1673 actime = modtime = time.time()
1675 actime = float(times[0])
1676 modtime = float(times[1])
1680 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
1683 ret = ceph_utimes(self.cluster, pth, buf)
1686 ret = ceph_lutimes(self.cluster, pth, buf)
1688 raise make_ex(ret, "error in utimes {}".format(path.decode('utf-8')))
1690 def lutimes(self, path, times=None):
1692 Set access and modification time for a file. If the file is a symbolic
1693 link do not follow to the target.
1695 :param path: file path for which timestamps have to be changed
1696 :param times: if times is not None, it must be a tuple (atime, mtime)
1698 self.utimes(path, times=times, follow_symlink=False)
1700 def futimes(self, fd, times=None):
1702 Set access and modification time for a file pointer by descriptor
1704 :param fd: file descriptor of the open file
1705 :param times: if times is not None, it must be a tuple (atime, mtime)
1708 self.require_state("mounted")
1709 if not isinstance(fd, int):
1710 raise TypeError('fd must be an int')
1712 if not isinstance(times, tuple):
1713 raise TypeError('times must be a tuple')
1714 if not isinstance(times[0], (int, float)):
1715 raise TypeError('atime must be an int or a float')
1716 if not isinstance(times[1], (int, float)):
1717 raise TypeError('mtime must be an int or a float')
1718 actime = modtime = time.time()
1720 actime = float(times[0])
1721 modtime = float(times[1])
1725 timeval *buf = [to_timeval(actime), to_timeval(modtime)]
1727 ret = ceph_futimes(self.cluster, _fd, buf)
1729 raise make_ex(ret, "error in futimes")
1731 def futimens(self, fd, times=None):
1733 Set access and modification time for a file pointer by descriptor
1735 :param fd: file descriptor of the open file
1736 :param times: if times is not None, it must be a tuple (atime, mtime)
1739 self.require_state("mounted")
1740 if not isinstance(fd, int):
1741 raise TypeError('fd must be an int')
1743 if not isinstance(times, tuple):
1744 raise TypeError('times must be a tuple')
1745 if not isinstance(times[0], (int, float)):
1746 raise TypeError('atime must be an int or a float')
1747 if not isinstance(times[1], (int, float)):
1748 raise TypeError('mtime must be an int or a float')
1749 actime = modtime = time.time()
1751 actime = float(times[0])
1752 modtime = float(times[1])
1756 timespec *buf = [to_timespec(actime), to_timespec(modtime)]
1758 ret = ceph_futimens(self.cluster, _fd, buf)
1760 raise make_ex(ret, "error in futimens")