2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc cimport errno
7 from libc.stdint cimport *
8 from libc.stdlib cimport malloc, realloc, free
12 from collections import namedtuple
13 from datetime import datetime
18 # Are we running Python 2.x
# NOTE(review): the statements executed on this branch are not visible in
# this excerpt; only the interpreter major-version test is shown.
19 if sys.version_info[0] < 3:
# Raw CPython bytes-object API, re-declared here with PyObject* signatures
# (instead of Cython's `object`) so that reference counts are handled by hand.
# NOTE(review): the mention of Image.read() below looks inherited from another
# binding; in this file these functions are used by LibCephFS.read() - confirm.
25 cdef extern from "Python.h":
26 # These are in cpython/string.pxd, but use "object" types instead of
27 # PyObject*, which invokes assumptions in cpython that we need to
28 # legitimately break to implement zero-copy string buffers in Image.read().
29 # This is valid use of the Python API and documented as a special case.
30 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
31 char* PyBytes_AsString(PyObject *string) except NULL
32 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
33 void PyEval_InitThreads()
# Members of the C statvfs record that ceph_statfs() fills in; the struct
# header line itself is not visible in this excerpt. LibCephFS.statfs() copies
# every member except f_padding into its result dict.
36 cdef extern from "sys/statvfs.h":
38 unsigned long int f_bsize
39 unsigned long int f_frsize
40 unsigned long int f_blocks
41 unsigned long int f_bfree
42 unsigned long int f_bavail
43 unsigned long int f_files
44 unsigned long int f_ffree
45 unsigned long int f_favail
46 unsigned long int f_fsid
47 unsigned long int f_flag
48 unsigned long int f_namemax
49 unsigned long int f_padding[32]
# Members of the C directory-entry record returned by ceph_readdir().
# NOTE(review): only d_off and d_reclen are visible in this excerpt; the
# struct header and the remaining members (d_ino, d_type, d_name are read by
# DirResult) are not shown.
52 cdef extern from "dirent.h":
55 unsigned long int d_off
56 unsigned short int d_reclen
# C scalar typedefs needed by the declarations that follow.
61 cdef extern from "time.h":
62 ctypedef long int time_t
# NOTE(review): the body of this second time.h block is not visible here.
64 cdef extern from "time.h":
69 cdef extern from "sys/types.h":
# mode_t is the type of the permission arguments of ceph_open / ceph_mkdir /
# ceph_mkdirs / ceph_chmod / ceph_umask declared further down.
70 ctypedef unsigned long mode_t
# Exposes the C struct `ceph_statx` under the Cython name `statx`; it is the
# output argument of ceph_statx() / ceph_fstatx().
# NOTE(review): the struct members are not visible in this excerpt.
72 cdef extern from "cephfs/ceph_statx.h":
73 cdef struct statx "ceph_statx":
# C entry points of libcephfs used by this wrapper. The whole block is
# declared `nogil`, so the functions may be called with the GIL released.
91 cdef extern from "cephfs/libcephfs.h" nogil:
# Opaque handle types: the mount handle and an open-directory stream.
92 cdef struct ceph_mount_info:
95 cdef struct ceph_dir_result:
98 ctypedef void* rados_t
100 const char *ceph_version(int *major, int *minor, int *patch)
# Handle creation / teardown.
102 int ceph_create(ceph_mount_info **cmount, const char * const id)
103 int ceph_create_from_rados(ceph_mount_info **cmount, rados_t cluster)
104 int ceph_init(ceph_mount_info *cmount)
105 void ceph_shutdown(ceph_mount_info *cmount)
# Configuration.
107 int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
108 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
109 int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
110 int ceph_conf_set(ceph_mount_info *cmount, const char *option, const char *value)
# Mount management and file-system level queries.
112 int ceph_mount(ceph_mount_info *cmount, const char *root)
113 int ceph_select_filesystem(ceph_mount_info *cmount, const char *fs_name)
114 int ceph_unmount(ceph_mount_info *cmount)
115 int ceph_abort_conn(ceph_mount_info *cmount)
116 uint64_t ceph_get_instance_id(ceph_mount_info *cmount)
117 int ceph_fstatx(ceph_mount_info *cmount, int fd, statx *stx, unsigned want, unsigned flags)
118 int ceph_statx(ceph_mount_info *cmount, const char *path, statx *stx, unsigned want, unsigned flags)
119 int ceph_statfs(ceph_mount_info *cmount, const char *path, statvfs *stbuf)
# outbuf / outs are allocated by the library; LibCephFS.mds_command() releases
# them with ceph_buffer_free().
121 int ceph_mds_command(ceph_mount_info *cmount, const char *mds_spec, const char **cmd, size_t cmdlen,
122 const char *inbuf, size_t inbuflen, char **outbuf, size_t *outbuflen,
123 char **outs, size_t *outslen)
# Namespace, file and directory operations.
124 int ceph_rename(ceph_mount_info *cmount, const char *from_, const char *to)
125 int ceph_link(ceph_mount_info *cmount, const char *existing, const char *newname)
126 int ceph_unlink(ceph_mount_info *cmount, const char *path)
127 int ceph_symlink(ceph_mount_info *cmount, const char *existing, const char *newname)
128 int ceph_readlink(ceph_mount_info *cmount, const char *path, char *buf, int64_t size)
129 int ceph_setxattr(ceph_mount_info *cmount, const char *path, const char *name,
130 const void *value, size_t size, int flags)
131 int ceph_getxattr(ceph_mount_info *cmount, const char *path, const char *name,
132 void *value, size_t size)
133 int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset)
134 int ceph_read(ceph_mount_info *cmount, int fd, char *buf, int64_t size, int64_t offset)
135 int ceph_flock(ceph_mount_info *cmount, int fd, int operation, uint64_t owner)
136 int ceph_close(ceph_mount_info *cmount, int fd)
137 int ceph_open(ceph_mount_info *cmount, const char *path, int flags, mode_t mode)
138 int ceph_mkdir(ceph_mount_info *cmount, const char *path, mode_t mode)
139 int ceph_mkdirs(ceph_mount_info *cmount, const char *path, mode_t mode)
140 int ceph_closedir(ceph_mount_info *cmount, ceph_dir_result *dirp)
141 int ceph_opendir(ceph_mount_info *cmount, const char *name, ceph_dir_result **dirpp)
142 void ceph_rewinddir(ceph_mount_info *cmount, ceph_dir_result *dirp)
143 int ceph_chdir(ceph_mount_info *cmount, const char *path)
144 dirent * ceph_readdir(ceph_mount_info *cmount, ceph_dir_result *dirp)
145 int ceph_rmdir(ceph_mount_info *cmount, const char *path)
146 const char* ceph_getcwd(ceph_mount_info *cmount)
147 int ceph_sync_fs(ceph_mount_info *cmount)
148 int ceph_fsync(ceph_mount_info *cmount, int fd, int syncdataonly)
# NOTE(review): ceph_conf_parse_argv is already declared above with the same
# signature; this second declaration looks redundant.
149 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
150 int ceph_chmod(ceph_mount_info *cmount, const char *path, mode_t mode)
151 int64_t ceph_lseek(ceph_mount_info *cmount, int fd, int64_t offset, int whence)
152 void ceph_buffer_free(char *buf)
153 mode_t ceph_umask(ceph_mount_info *cmount, mode_t mode)
class Error(Exception):
    """Base class for the exceptions defined in this module."""


class OSError(Error):
    """
    A failure described by an errno value and a message.

    Inside this module the name shadows the builtin ``OSError``.

    :param errno: the errno value describing the failure
    :param strerror: the message describing the failed operation
    """

    def __init__(self, errno, strerror):
        super(OSError, self).__init__(errno, strerror)
        # Keep both values as attributes; __str__ formats them.
        self.errno = errno
        self.strerror = strerror

    def __str__(self):
        return '[Errno {0}] {1}'.format(self.errno, self.strerror)


class PermissionError(OSError):
    """Reported for errno EPERM."""


class ObjectNotFound(OSError):
    """Reported for errno ENOENT."""


class NoData(OSError):
    """Reported for errno ENODATA (ENOATTR on FreeBSD)."""


class ObjectExists(OSError):
    """Reported for errno EEXIST."""


class IOError(OSError):
    """An input/output failure."""


class NoSpace(OSError):
    """Reported for errno ENOSPC."""


class InvalidValue(OSError):
    """Reported for errno EINVAL."""


class OperationNotSupported(OSError):
    """Reported for errno EOPNOTSUPP."""


class LibCephFSStateError(Error):
    """Raised when an operation is attempted in a state that forbids it."""


class WouldBlock(OSError):
    """Reported for errno EWOULDBLOCK."""


class OutOfRange(OSError):
    """Reported for errno ERANGE."""
# Maps an errno value to the exception class raised for it (see make_ex()).
# The two compile-time branches differ only in the "no such attribute" code:
# FreeBSD spells it ENOATTR, the other platforms ENODATA.
# EIO is mapped so that the IOError class declared above is actually reachable.
IF UNAME_SYSNAME == "FreeBSD":
    cdef errno_to_exception = {
        errno.EPERM      : PermissionError,
        errno.ENOENT     : ObjectNotFound,
        errno.EIO        : IOError,
        errno.ENOSPC     : NoSpace,
        errno.EEXIST     : ObjectExists,
        errno.ENOATTR    : NoData,
        errno.EINVAL     : InvalidValue,
        errno.EOPNOTSUPP : OperationNotSupported,
        errno.ERANGE     : OutOfRange,
        errno.EWOULDBLOCK: WouldBlock,
    }
ELSE:
    cdef errno_to_exception = {
        errno.EPERM      : PermissionError,
        errno.ENOENT     : ObjectNotFound,
        errno.EIO        : IOError,
        errno.ENOSPC     : NoSpace,
        errno.EEXIST     : ObjectExists,
        errno.ENODATA    : NoData,
        errno.EINVAL     : InvalidValue,
        errno.EOPNOTSUPP : OperationNotSupported,
        errno.ERANGE     : OutOfRange,
        errno.EWOULDBLOCK: WouldBlock,
    }
cdef make_ex(ret, msg):
    """
    Translate a libcephfs return code into an exception.

    Callers in this module pass both negated errno values (the raw return
    code of a failed libcephfs call) and plain errno constants, so the sign
    of ``ret`` is dropped before the lookup.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: a subclass of :class:`Error`
    """
    ret = abs(ret)
    if ret in errno_to_exception:
        return errno_to_exception[ret](ret, msg)
    # No dedicated class for this code: fall back to the generic base error.
    return Error(ret, msg + (": error code %d" % ret))
# One directory entry as produced by DirResult.readdir(): a namedtuple with
# the dirent fields plus predicates that test d_type.
259 class DirEntry(namedtuple('DirEntry',
260 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
# NOTE(review): the DT_DIR / DT_LNK / DT_REG class constants compared against
# below, and two of the three method headers, are not visible in this excerpt.
265 return self.d_type == self.DT_DIR
267 def is_symbol_file(self):
268 return self.d_type == self.DT_LNK
271 return self.d_type == self.DT_REG
# Immutable record returned by LibCephFS.stat() and LibCephFS.fstat().
StatResult = namedtuple(
    'StatResult',
    (
        "st_dev",
        "st_ino",
        "st_mode",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_rdev",
        "st_size",
        "st_blksize",
        "st_blocks",
        "st_atime",
        "st_mtime",
        "st_ctime",
    ),
)
# Open directory stream: wraps a ceph_dir_result* handle and drives it through
# the mount handle of the owning LibCephFS object (self.lib.cluster).
# NOTE(review): several method headers and guard lines of this class are not
# visible in this excerpt; the comments below cover only the visible statements.
278 cdef class DirResult(object):
280 cdef ceph_dir_result* handle
282 # Bug in older Cython instances prevents this from being a static method.
284 # cdef create(LibCephFS lib, ceph_dir_result* handle):
290 def __dealloc__(self):
# Using a stream without an open handle is reported as EBADF.
295 raise make_ex(errno.EBADF, "dir is not open")
296 self.lib.require_state("mounted")
298 ceph_rewinddir(self.lib.cluster, self.handle)
301 def __exit__(self, type_, value, traceback):
306 self.lib.require_state("mounted")
# Fetch the next C dirent and copy its fields into a DirEntry namedtuple.
309 dirent = ceph_readdir(self.lib.cluster, self.handle)
313 return DirEntry(d_ino=dirent.d_ino,
315 d_reclen=dirent.d_reclen,
316 d_type=dirent.d_type,
317 d_name=dirent.d_name)
321 self.lib.require_state("mounted")
# Close the stream; a failing ceph_closedir is turned into an exception.
323 ret = ceph_closedir(self.lib.cluster, self.handle)
325 raise make_ex(ret, "closedir failed")
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Create a byte string from a Python string

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed
    :rtype: bytes (or None when ``opt`` is set and ``val`` is None)
    :raises: :class:`TypeError` if ``val`` cannot be turned into bytes
    """
    if opt and val is None:
        return None
    if isinstance(val, bytes):
        # Already a byte string: hand it back untouched.
        return val
    try:
        encoded = val.encode(encoding)
    except Exception:
        # Covers objects without .encode() as well as encoding failures,
        # without swallowing KeyboardInterrupt / SystemExit.
        raise TypeError('%s must be encodeable as a bytearray' % name)
    if not isinstance(encoded, bytes):
        # Explicit check instead of an assert, which is stripped under -O.
        raise TypeError('%s must be encodeable as a bytearray' % name)
    return encoded
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Create a list of byte strings from an iterable of Python strings.

    :param list_str: the strings to convert
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for every element
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element cannot be converted
    """
    # Forward the requested encoding; it used to be silently ignored.
    return [cstr(s, name, encoding) for s in list_str]
def decode_cstr(val, encoding="utf-8"):
    """
    Decode a byte string into a Python string.

    :param bytes val: byte string
    :param str encoding: Encoding to use
    :rtype: unicode or None
    """
    if val is None:
        # Nothing to decode; keep None as documented by the return type.
        return None
    return val.decode(encoding)
# Converts an optional Python value to a char*; `except? NULL` means NULL is
# a legal result and Cython checks for a pending exception when it is returned.
# NOTE(review): the body is not visible in this excerpt.
368 cdef char* opt_str(s) except? NULL:
# Builds a malloc'ed C array with one char* per element of list_bytes.
# The pointers are taken from the elements themselves, so the array is only
# valid while the Python objects in list_bytes stay alive.
# NOTE(review): the NULL check guarding the raise and the final return are not
# visible in this excerpt.
374 cdef char ** to_bytes_array(list_bytes):
375 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
377 raise MemoryError("malloc failed")
378 for i in xrange(len(list_bytes)):
379 ret[i] = <char *>list_bytes[i]
# realloc() wrapper that reports failure as MemoryError; `except NULL` lets
# Cython propagate that exception to callers.
# NOTE(review): the NULL test and the return statement are not visible here.
383 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
384 cdef void *ret = realloc(ptr, size)
386 raise MemoryError("realloc failed")
390 cdef class LibCephFS(object):
391 """libcephfs python wrapper"""
# Lifecycle state as a string; the values assigned in this file are
# "uninitialized", "configuring", "initialized", "mounted" and "shutdown".
393 cdef public object state
# The libcephfs mount handle passed to every ceph_* call.
394 cdef ceph_mount_info *cluster
def require_state(self, *args):
    """
    Check that the handle is in one of the given states.

    :param args: the state names that are acceptable for the operation
    :raises: :class:`LibCephFSStateError` if the current state is not listed
    """
    if self.state in args:
        return
    raise LibCephFSStateError("You cannot perform that operation on a "
                              "CephFS object in state %s." % (self.state))
# Constructor: starts in state "uninitialized", then either adopts an existing
# RADOS connection (create_with_rados) or builds a fresh handle (create).
402 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
403 """Create a libcephfs wrapper
405 :param conf dict opt: settings overriding the default ones and conffile
406 :param conffile str opt: the path to ceph.conf to override the default settings
407 :auth_id str opt: the id used to authenticate the client entity
408 :rados_inst Rados opt: a rados.Rados instance
411 self.state = "uninitialized"
412 if rados_inst is not None:
# A RADOS instance brings its own configuration, so combining it with any of
# the other arguments is rejected as EINVAL.
413 if auth_id is not None or conffile is not None or conf is not None:
414 raise make_ex(errno.EINVAL,
415 "May not pass RADOS instance as well as other configuration")
417 self.create_with_rados(rados_inst)
419 self.create(conf, conffile, auth_id)
# Creates the mount handle from the cluster handle of an existing rados.Rados
# object and moves to state "configuring".
# NOTE(review): failure raises a plain Error with the code in the text rather
# than going through make_ex() like most other methods.
421 def create_with_rados(self, rados.Rados rados_inst):
424 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
426 raise Error("libcephfs_initialize failed with error code: %d" % ret)
427 self.state = "configuring"
429 def create(self, conf=None, conffile=None, auth_id=None):
431 Create a mount handle for interacting with Ceph. All libcephfs
432 functions operate on a mount info handle.
434 :param conf dict opt: settings overriding the default ones and conffile
435 :param conffile str opt: the path to ceph.conf to override the default settings
436 :auth_id str opt: the id used to authenticate the client entity
438 if conf is not None and not isinstance(conf, dict):
439 raise TypeError("conf must be dict or None")
# NOTE(review): the result of this cstr() call is discarded, so it only acts
# as a type check; conffile itself is converted later in conf_read_file().
440 cstr(conffile, 'configfile', opt=True)
441 auth_id = cstr(auth_id, 'auth_id', opt=True)
444 char* _auth_id = opt_str(auth_id)
448 ret = ceph_create(&self.cluster, <const char*>_auth_id)
450 raise Error("libcephfs_initialize failed with error code: %d" % ret)
# The handle exists from here on; configuration is applied on top of it.
452 self.state = "configuring"
453 if conffile is not None:
454 # read the default conf file when '' is given
457 self.conf_read_file(conffile)
# NOTE(review): dict.iteritems() exists only on Python 2; on Python 3 this
# would need items() - confirm which interpreters are supported.
459 for key, value in conf.iteritems():
460 self.conf_set(key, value)
# Loads a configuration file into the handle; None is passed to the library
# as a NULL path (via opt_str).
462 def conf_read_file(self, conffile=None):
464 Load the ceph configuration from the specified config file.
466 :param conffile str opt: the path to ceph.conf to override the default settings
468 conffile = cstr(conffile, 'conffile', opt=True)
470 char *_conffile = opt_str(conffile)
472 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
474 raise make_ex(ret, "error calling conf_read_file")
476 def conf_parse_argv(self, argv):
478 Parse the command line arguments and load the configuration parameters.
480 :param argv: the argument list
482 self.require_state("configuring")
# cargv keeps the encoded byte strings alive while _argv points into them.
483 cargv = cstr_list(argv, 'argv')
485 int _argc = len(argv)
# NOTE(review): _argv is malloc'ed by to_bytes_array(); its release is not
# visible in this excerpt - confirm it is freed on every path.
486 char **_argv = to_bytes_array(cargv)
490 ret = ceph_conf_parse_argv(self.cluster, _argc,
491 <const char **>_argv)
493 raise make_ex(ret, "error calling conf_parse_argv")
499 Unmount and destroy the ceph mount handle.
# NOTE(review): the `def` line of this method is not visible in this excerpt.
# The C handle is shut down only when it was initialized or mounted; the
# state then becomes "shutdown".
501 if self.state in ["initialized", "mounted"]:
503 ceph_shutdown(self.cluster)
504 self.state = "shutdown"
# Context-manager exit hook and deallocator.
# NOTE(review): the bodies of both methods are not visible in this excerpt.
510 def __exit__(self, type_, value, traceback):
514 def __dealloc__(self):
519 Get the version number of the ``libcephfs`` C library.
521 :returns: a tuple of ``(major, minor, extra)`` components of the
# NOTE(review): the `def` line and the declarations of major / minor / extra
# are not visible in this excerpt.
# ceph_version() writes the three components through the pointers.
529 ceph_version(&major, &minor, &extra)
530 return (major, minor, extra)
532 def conf_get(self, option):
534 Gets the configuration value as a string.
536 :param option: the config option to get
538 self.require_state("configuring", "initialized", "mounted")
540 option = cstr(option, 'option')
542 char *_option = option
# NOTE(review): the loop / try-finally structure and the initial value of
# `length` are not visible here. The visible lines (re)allocate a buffer of
# `length` bytes, query the option, and branch on the result code.
548 ret_buf = <char *>realloc_chk(ret_buf, length)
550 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
552 return decode_cstr(ret_buf)
# ENAMETOOLONG and ENOENT are handled separately from other failures; what
# each branch does is not visible in this excerpt.
553 elif ret == -errno.ENAMETOOLONG:
555 elif ret == -errno.ENOENT:
558 raise make_ex(ret, "error calling conf_get")
# Sets one configuration option; both the option name and the value are
# converted to byte strings before being handed to the library.
562 def conf_set(self, option, val):
564 Sets a configuration value from a string.
566 :param option: the configuration option to set
567 :param value: the value of the configuration option to set
569 self.require_state("configuring", "initialized", "mounted")
571 option = cstr(option, 'option')
572 val = cstr(val, 'val')
574 char *_option = option
578 ret = ceph_conf_set(self.cluster, _option, _val)
580 raise make_ex(ret, "error calling conf_set")
584 Initialize the filesystem client (but do not mount the filesystem yet)
# NOTE(review): the `def` line of this method is not visible in this excerpt.
# Allowed only from "configuring"; on success the state becomes "initialized".
586 self.require_state("configuring")
588 ret = ceph_init(self.cluster)
590 raise make_ex(ret, "error calling ceph_init")
591 self.state = "initialized"
593 def mount(self, mount_root=None, filesystem_name=None):
595 Perform a mount using the path for the root of the mount.
# NOTE(review): the statement executed when the state is still "configuring"
# is not visible in this excerpt.
597 if self.state == "configuring":
599 self.require_state("initialized")
601 # Configure which filesystem to mount if one was specified
602 if filesystem_name is None:
603 filesystem_name = b""
# NOTE(review): unlike other string arguments in this class, filesystem_name
# and mount_root are assigned to char* without going through cstr(); confirm
# that callers pass bytes.
605 char *_filesystem_name = filesystem_name
608 ret = ceph_select_filesystem(self.cluster,
611 raise make_ex(ret, "error calling ceph_select_filesystem")
613 # Prepare mount_root argument, default to "/"
614 root = b"/" if mount_root is None else mount_root
616 char *_mount_root = root
619 ret = ceph_mount(self.cluster, _mount_root)
621 raise make_ex(ret, "error calling ceph_mount")
622 self.state = "mounted"
626 Unmount a mount handle.
# NOTE(review): the `def` line of this method is not visible in this excerpt.
# Allowed only from "mounted"; on success the state returns to "initialized".
628 self.require_state("mounted")
630 ret = ceph_unmount(self.cluster)
632 raise make_ex(ret, "error calling ceph_unmount")
633 self.state = "initialized"
# Like unmount(), this leaves the handle in state "initialized" on success.
635 def abort_conn(self):
637 Abort mds connections.
639 self.require_state("mounted")
641 ret = ceph_abort_conn(self.cluster)
643 raise make_ex(ret, "error calling ceph_abort_conn")
644 self.state = "initialized"
# ceph_get_instance_id() returns a uint64_t id, not a status code.
# NOTE(review): the return statement is not visible in this excerpt.
646 def get_instance_id(self):
648 Get a global id for current instance
650 self.require_state("initialized", "mounted")
652 ret = ceph_get_instance_id(self.cluster)
655 def statfs(self, path):
657 Perform a statfs on the ceph file system. This call fills in file system wide statistics
658 into the passed in buffer.
660 :param path: any path within the mounted filesystem
662 self.require_state("mounted")
663 path = cstr(path, 'path')
669 ret = ceph_statfs(self.cluster, _path, &statbuf)
671 raise make_ex(ret, "statfs failed: %s" % path)
# The C struct is returned as a plain dict keyed by the statvfs member names;
# f_padding is deliberately left out.
672 return {'f_bsize': statbuf.f_bsize,
673 'f_frsize': statbuf.f_frsize,
674 'f_blocks': statbuf.f_blocks,
675 'f_bfree': statbuf.f_bfree,
676 'f_bavail': statbuf.f_bavail,
677 'f_files': statbuf.f_files,
678 'f_ffree': statbuf.f_ffree,
679 'f_favail': statbuf.f_favail,
680 'f_fsid': statbuf.f_fsid,
681 'f_flag': statbuf.f_flag,
682 'f_namemax': statbuf.f_namemax}
686 Synchronize all filesystem data to persistent media
# NOTE(review): the `def` line of this method is not visible in this excerpt.
688 self.require_state("mounted")
690 ret = ceph_sync_fs(self.cluster)
692 raise make_ex(ret, "sync_fs failed")
# Both arguments are typed as C ints in the signature, so Cython converts
# them on entry and they are passed to the library unchanged.
694 def fsync(self, int fd, int syncdataonly):
696 Synchronize an open file to persistent media.
698 :param fd: the file descriptor of the file to sync.
699 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
702 self.require_state("mounted")
704 ret = ceph_fsync(self.cluster, fd, syncdataonly)
706 raise make_ex(ret, "fsync failed")
710 Get the current working directory.
712 :rtype the path to the current working directory
# NOTE(review): the `def` line and the return statement are not visible here.
# ceph_getcwd() returns a const char*, not a status code.
714 self.require_state("mounted")
716 ret = ceph_getcwd(self.cluster)
719 def chdir(self, path):
721 Change the current working directory.
723 :param path the path to the working directory to change into.
725 self.require_state("mounted")
# `path` is rebound to the encoded bytes so the char* below stays valid.
727 path = cstr(path, 'path')
728 cdef char* _path = path
730 ret = ceph_chdir(self.cluster, _path)
732 raise make_ex(ret, "chdir failed")
734 def opendir(self, path):
736 Open the given directory.
738 :param path: the path name of the directory to open. Must be either an absolute path
739 or a path relative to the current working directory.
740 :rtype handle: the open directory stream handle
742 self.require_state("mounted")
744 path = cstr(path, 'path')
747 ceph_dir_result* handle
# ceph_opendir() stores the new stream in `handle` on success.
# NOTE(review): the wrapping of `handle` into the returned object is not
# visible in this excerpt.
749 ret = ceph_opendir(self.cluster, _path, &handle);
751 raise make_ex(ret, "opendir failed")
# Thin forwarder: the work is done by DirResult.readdir().
757 def readdir(self, DirResult handle):
759 Get the next entry in an open directory.
761 :param handle: the open directory stream handle
762 :rtype dentry: the next directory entry or None if at the end of the
763 directory (or the directory is empty. This pointer
764 should not be freed by the caller, and is only safe to
765 access between return and the next call to readdir or
768 self.require_state("mounted")
770 return handle.readdir()
# Thin forwarder: the work is done by DirResult.close().
772 def closedir(self, DirResult handle):
774 Close the open directory.
776 :param handle: the open directory stream handle
778 self.require_state("mounted")
780 return handle.close()
782 def mkdir(self, path, mode):
786 :param path: the path of the directory to create. This must be either an
787 absolute path or a relative path off of the current working directory.
788 :param mode the permissions the directory should have once created.
791 self.require_state("mounted")
792 path = cstr(path, 'path')
# mode is validated in Python before it is converted to the C type.
793 if not isinstance(mode, int):
794 raise TypeError('mode must be an int')
799 ret = ceph_mkdir(self.cluster, _path, _mode)
801 raise make_ex(ret, "error in mkdir '%s'" % path)
# NOTE(review): the parameter descriptions below talk about creating a
# directory; they appear to be copied from mkdir(). The visible code changes
# the mode of `path` through ceph_chmod().
803 def chmod(self, path, mode) :
805 Change directory mode.
806 :param path: the path of the directory to create. This must be either an
807 absolute path or a relative path off of the current working directory.
808 :param mode the permissions the directory should have once created.
810 self.require_state("mounted")
811 path = cstr(path, 'path')
812 if not isinstance(mode, int):
813 raise TypeError('mode must be an int')
818 ret = ceph_chmod(self.cluster, _path, _mode)
820 raise make_ex(ret, "error in chmod '%s'" % path)
822 def mkdirs(self, path, mode):
824 Create multiple directories at once.
826 :param path: the full path of directories and sub-directories that should
828 :param mode the permissions the directory should have once created
830 self.require_state("mounted")
831 path = cstr(path, 'path')
# mode is validated in Python before it is converted to the C type.
832 if not isinstance(mode, int):
833 raise TypeError('mode must be an int')
839 ret = ceph_mkdirs(self.cluster, _path, _mode)
841 raise make_ex(ret, "error in mkdirs '%s'" % path)
843 def rmdir(self, path):
847 :param path: the path of the directory to remove.
849 self.require_state("mounted")
850 path = cstr(path, 'path')
851 cdef char* _path = path
# NOTE(review): no `with nogil:` line is visible around this call, unlike most
# other wrappers in this class; the line may simply be missing from the excerpt.
852 ret = ceph_rmdir(self.cluster, _path)
854 raise make_ex(ret, "error in rmdir '%s'" % path)
856 def open(self, path, flags, mode=0):
858 Create and/or open a file.
860 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
861 the file will first be created before opening.
862 :param flags: set of option masks that control how the file is created/opened.
863 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
864 is specified in the flags.
866 self.require_state("mounted")
867 path = cstr(path, 'path')
869 if not isinstance(mode, int):
870 raise TypeError('mode must be an int')
# `flags` may be a string that is translated character by character into
# os.O_* bits, or an integer.
# NOTE(review): several branches of the string parser, and the handling of the
# integer case, are not visible in this excerpt.
871 if isinstance(flags, str):
874 cephfs_flags = os.O_RDONLY
882 cephfs_flags |= os.O_TRUNC | os.O_CREAT
883 elif access_flags > 0 and c == '+':
# Any character the parser does not recognise is rejected as EOPNOTSUPP.
886 raise make_ex(errno.EOPNOTSUPP,
887 "open flags doesn't support %s" % c)
# access_flags selects the access mode: 1 -> read-only, 2 -> write-only,
# otherwise read-write.
889 if access_flags == 1:
890 cephfs_flags |= os.O_RDONLY;
891 elif access_flags == 2:
892 cephfs_flags |= os.O_WRONLY;
894 cephfs_flags |= os.O_RDWR;
896 elif isinstance(flags, int):
899 raise TypeError("flags must be a string or an integer")
903 int _flags = cephfs_flags
907 ret = ceph_open(self.cluster, _path, _flags, _mode)
909 raise make_ex(ret, "error in open '%s'" % path)
916 :param fd: the file descriptor referring to the open file.
# NOTE(review): the `def` line of this method is not visible in this excerpt.
919 self.require_state("mounted")
920 if not isinstance(fd, int):
921 raise TypeError('fd must be an int')
924 ret = ceph_close(self.cluster, _fd)
926 raise make_ex(ret, "error in close")
# NOTE(review): the description of `l` below talks about seeking; in the code
# `l` feeds the requested length of the read buffer.
928 def read(self, fd, offset, l):
930 Read data from the file.
932 :param fd : the file descriptor of the open file to read from.
933 :param offset : the offset in the file to read from. If this value is negative, the
934 function reads from the current offset of the file descriptor.
935 :param l : the flag to indicate what type of seeking to perform
937 self.require_state("mounted")
938 if not isinstance(offset, int):
939 raise TypeError('offset must be an int')
940 if not isinstance(l, int):
941 raise TypeError('l must be an int')
942 if not isinstance(fd, int):
943 raise TypeError('fd must be an int')
946 int64_t _offset = offset
950 PyObject* ret_s = NULL
# Allocate an uninitialised bytes object of the requested length and let
# ceph_read() write straight into its storage, avoiding an extra copy.
952 ret_s = PyBytes_FromStringAndSize(NULL, _length)
954 ret_buf = PyBytes_AsString(ret_s)
956 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
958 raise make_ex(ret, "error in read")
# Shrink the bytes object to the number of bytes actually read.
961 _PyBytes_Resize(&ret_s, ret)
965 # We DECREF unconditionally: the cast to object above will have
966 # INCREFed if necessary. This also takes care of exceptions,
967 # including if _PyString_Resize fails (that will free the string
968 # itself and set ret_s to NULL, hence XDECREF).
969 ref.Py_XDECREF(ret_s)
971 def write(self, fd, buf, offset):
973 Write data to a file.
975 :param fd : the file descriptor of the open file to write to
976 :param buf : the bytes to write to the file
977 :param offset : the offset of the file write into. If this value is negative, the
978 function writes to the current offset of the file descriptor.
980 self.require_state("mounted")
981 if not isinstance(fd, int):
982 raise TypeError('fd must be an int')
# Only bytes are accepted; text is not encoded implicitly here.
983 if not isinstance(buf, bytes):
984 raise TypeError('buf must be a bytes')
985 if not isinstance(offset, int):
986 raise TypeError('offset must be an int')
991 int64_t _offset = offset
# The whole buffer is submitted in a single ceph_write() call.
993 size_t length = len(buf)
996 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
998 raise make_ex(ret, "error in write")
1001 def flock(self, fd, operation, owner):
1003 Apply or remove an advisory lock.
1005 :param fd: the open file descriptor to change advisory lock.
1006 :param operation: the advisory lock operation to be performed on the file
1007 :param owner: the user-supplied owner identifier (an arbitrary integer)
1009 self.require_state("mounted")
1010 if not isinstance(fd, int):
1011 raise TypeError('fd must be an int')
1012 if not isinstance(operation, int):
1013 raise TypeError('operation must be an int')
# owner is not type-checked in Python; the uint64_t conversion below is the
# only validation it gets.
1018 uint64_t _owner = owner
1021 ret = ceph_flock(self.cluster, _fd, _op, _owner)
# NOTE(review): the message says "write" although the failing call is
# ceph_flock(); looks like a copy-paste slip.
1023 raise make_ex(ret, "error in write")
1026 def getxattr(self, path, name, size=255):
1028 Get an extended attribute.
1030 :param path: the path to the file
1031 :param name: the name of the extended attribute to get
1032 :param size: the size of the pre-allocated buffer
1034 self.require_state("mounted")
1036 path = cstr(path, 'path')
1037 name = cstr(name, 'name')
1043 size_t ret_length = size
1044 char *ret_buf = NULL
# The value is read into a heap buffer of `size` bytes.
# NOTE(review): the release of ret_buf is not visible in this excerpt.
1047 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1049 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1053 raise make_ex(ret, "error in getxattr")
# `ret` is used as the number of valid bytes: the slice copies exactly that
# many bytes into a new bytes object.
1055 return ret_buf[:ret]
# NOTE(review): the `flags` parameter is not described in the docstring; it is
# passed through unchanged to ceph_setxattr().
1059 def setxattr(self, path, name, value, flags):
1061 Set an extended attribute on a file.
1063 :param path: the path to the file.
1064 :param name: the name of the extended attribute to set.
1065 :param value: the bytes of the extended attribute value
1067 self.require_state("mounted")
1069 name = cstr(name, 'name')
1070 path = cstr(path, 'path')
1071 if not isinstance(flags, int):
1072 raise TypeError('flags must be a int')
1073 if not isinstance(value, bytes):
1074 raise TypeError('value must be a bytes')
# The length is taken from the bytes object, so values containing NUL bytes
# are passed in full.
1079 char *_value = value
1080 size_t _value_len = len(value)
1084 ret = ceph_setxattr(self.cluster, _path, _name,
1085 _value, _value_len, _flags)
1087 raise make_ex(ret, "error in setxattr")
1090 def stat(self, path):
1092 Get a file's extended statistics and attributes.
1094 :param path: the file or directory to get the statistics of.
1096 self.require_state("mounted")
1097 path = cstr(path, 'path')
1104 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1105 ret = ceph_statx(self.cluster, _path, &stx, 0x7ffu, 0)
1107 raise make_ex(ret, "error in stat: %s" % path)
# The statx record is repackaged as a StatResult namedtuple. Timestamps are
# built from tv_sec only (sub-second precision is dropped);
# datetime.fromtimestamp() yields naive local-time values.
1108 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1109 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1110 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1111 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1112 st_blksize=stx.stx_blksize,
1113 st_blocks=stx.stx_blocks,
1114 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1115 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1116 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1118 def fstat(self, fd):
1120 Get an open file's extended statistics and attributes.
1122 :param fd: the file descriptor of the file to get statistics of.
1124 self.require_state("mounted")
1125 if not isinstance(fd, int):
1126 raise TypeError('fd must be an int')
1133 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1134 ret = ceph_fstatx(self.cluster, _fd, &stx, 0x7ffu, 0)
# NOTE(review): "fsat" in the message below looks like a typo for "fstat".
1136 raise make_ex(ret, "error in fsat")
# Same repackaging as stat(): timestamps are built from tv_sec only and are
# naive local-time datetimes.
1137 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1138 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1139 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1140 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1141 st_blksize=stx.stx_blksize,
1142 st_blocks=stx.stx_blocks,
1143 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1144 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1145 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1147 def symlink(self, existing, newname):
1149 Creates a symbolic link.
1151 :param existing: the path to the existing file/directory to link to.
1152 :param newname: the path to the new file/directory to link from.
1154 self.require_state("mounted")
# Both names are rebound to encoded bytes so the char* views stay valid.
1155 existing = cstr(existing, 'existing')
1156 newname = cstr(newname, 'newname')
1158 char* _existing = existing
1159 char* _newname = newname
1162 ret = ceph_symlink(self.cluster, _existing, _newname)
1164 raise make_ex(ret, "error in symlink")
# Same shape as symlink(), but calls ceph_link().
1166 def link(self, existing, newname):
1170 :param existing: the path to the existing file/directory to link to.
1171 :param newname: the path to the new file/directory to link from.
1174 self.require_state("mounted")
1175 existing = cstr(existing, 'existing')
1176 newname = cstr(newname, 'newname')
1178 char* _existing = existing
1179 char* _newname = newname
1182 ret = ceph_link(self.cluster, _existing, _newname)
1184 raise make_ex(ret, "error in link")
1186 def readlink(self, path, size):
1188 Read a symbolic link.
1190 :param path: the path to the symlink to read
1191 :param size: the length of the buffer
1192 :rtype buf: buffer to hold the path of the file that the symlink points to.
1194 self.require_state("mounted")
1195 path = cstr(path, 'path')
1199 int64_t _size = size
# The link target is read into a heap buffer of `size` bytes.
# NOTE(review): the return statement and the release of `buf` are not visible
# in this excerpt.
1203 buf = <char *>realloc_chk(buf, _size)
1205 ret = ceph_readlink(self.cluster, _path, buf, _size)
1207 raise make_ex(ret, "error in readlink")
1212 def unlink(self, path):
1214 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
1215 file will not disappear from the namespace until all references to it are removed.
1217 :param path: the path of the file or link to unlink.
1219 self.require_state("mounted")
1220 path = cstr(path, 'path')
1221 cdef char* _path = path
1223 ret = ceph_unlink(self.cluster, _path)
# `path` is the encoded bytes object at this point, so that is what gets
# interpolated into the message.
1225 raise make_ex(ret, "error in unlink: %s" % path)
1227 def rename(self, src, dst):
1229 Rename a file or directory.
1231 :param src: the path to the existing file or directory.
1232 :param dst: the new name of the file or directory.
1235 self.require_state("mounted")
# The names used in conversion errors are 'source' / 'destination', not the
# parameter names.
1237 src = cstr(src, 'source')
1238 dst = cstr(dst, 'destination')
1245 ret = ceph_rename(self.cluster, _src, _dst)
1247 raise make_ex(ret, "error in rename '%s' to '%s'" % (src, dst))
# NOTE(review): no require_state() call is visible in this method, and the
# tuple returned at the end is (status, output data, status string), which
# does not match the order given in the docstring - confirm which is intended.
1249 def mds_command(self, mds_spec, args, input_data):
1251 :return 3-tuple of output status int, output status string, output data
1253 mds_spec = cstr(mds_spec, 'mds_spec')
1254 args = cstr_list(args, 'args')
1255 input_data = cstr(input_data, 'input_data')
1258 char *_mds_spec = opt_str(mds_spec)
# _cmd is a malloc'ed pointer array into the byte strings held by `args`.
# NOTE(review): its release is not visible in this excerpt.
1259 char **_cmd = to_bytes_array(args)
1260 size_t _cmdlen = len(args)
1262 char *_inbuf = input_data
1263 size_t _inbuf_len = len(input_data)
1265 char *_outbuf = NULL
1266 size_t _outbuf_len = 0
1268 size_t _outs_len = 0
1272 ret = ceph_mds_command(self.cluster, _mds_spec,
1273 <const char **>_cmd, _cmdlen,
1274 <const char*>_inbuf, _inbuf_len,
1275 &_outbuf, &_outbuf_len,
# Copy the library-owned buffers into Python objects (outs decoded to text,
# outbuf kept as bytes) before handing the C memory back to libcephfs.
1277 my_outs = decode_cstr(_outs[:_outs_len])
1278 my_outbuf = _outbuf[:_outbuf_len]
1280 ceph_buffer_free(_outs)
1282 ceph_buffer_free(_outbuf)
1283 return (ret, my_outbuf, my_outs)
# Applies a umask through ceph_umask(), which returns a mode_t.
# NOTE(review): the conversion of `mode`, the condition guarding the raise and
# the return statement are not visible in this excerpt.
1287 def umask(self, mode) :
1288 self.require_state("mounted")
1292 ret = ceph_umask(self.cluster, _mode)
1294 raise make_ex(ret, "error in umask")
1297 def lseek(self, fd, offset, whence):
1299 Set the file's current position.
1301 :param fd : the file descriptor of the open file to read from.
1302 :param offset : the offset in the file to read from. If this value is negative, the
1303 function reads from the current offset of the file descriptor.
1304 :param whence : the flag to indicate what type of seeking to performs:SEEK_SET, SEEK_CUR, SEEK_END
1306 self.require_state("mounted")
1307 if not isinstance(fd, int):
1308 raise TypeError('fd must be an int')
1309 if not isinstance(offset, int):
1310 raise TypeError('offset must be an int')
1311 if not isinstance(whence, int):
1312 raise TypeError('whence must be an int')
1316 int64_t _offset = offset
# NOTE(review): ceph_lseek() is declared with `int whence`; holding it in an
# int64_t here relies on an implicit narrowing at the call.
1317 int64_t _whence = whence
1320 ret = ceph_lseek(self.cluster, _fd, _offset, _whence)
# NOTE(review): the return statement that presumably follows is not visible
# in this excerpt.
1323 raise make_ex(ret, "error in lseek")