2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc cimport errno
7 from libc.stdint cimport *
8 from libc.stdlib cimport malloc, realloc, free
12 from collections import namedtuple
13 from datetime import datetime
18 # Are we running Python 2.x
19 if sys.version_info[0] < 3:
24 cdef int AT_SYMLINK_NOFOLLOW = 0x100
26 cdef extern from "Python.h":
27 # These are in cpython/string.pxd, but use "object" types instead of
28 # PyObject*, which invokes assumptions in cpython that we need to
29 # legitimately break to implement zero-copy string buffers in Image.read().
30 # This is valid use of the Python API and documented as a special case.
31 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
32 char* PyBytes_AsString(PyObject *string) except NULL
33 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
34 void PyEval_InitThreads()
37 cdef extern from "sys/statvfs.h":
39 unsigned long int f_bsize
40 unsigned long int f_frsize
41 unsigned long int f_blocks
42 unsigned long int f_bfree
43 unsigned long int f_bavail
44 unsigned long int f_files
45 unsigned long int f_ffree
46 unsigned long int f_favail
47 unsigned long int f_fsid
48 unsigned long int f_flag
49 unsigned long int f_namemax
50 unsigned long int f_padding[32]
53 IF UNAME_SYSNAME == "FreeBSD":
54 cdef extern from "dirent.h":
57 unsigned short int d_reclen
61 cdef extern from "dirent.h":
64 unsigned long int d_off
65 unsigned short int d_reclen
70 cdef extern from "time.h":
71 ctypedef long int time_t
73 cdef extern from "time.h":
78 cdef extern from "sys/types.h":
79 ctypedef unsigned long mode_t
81 cdef extern from "cephfs/ceph_statx.h":
82 cdef struct statx "ceph_statx":
100 cdef extern from "cephfs/libcephfs.h" nogil:
101 cdef struct ceph_mount_info:
104 cdef struct ceph_dir_result:
107 ctypedef void* rados_t
109 const char *ceph_version(int *major, int *minor, int *patch)
111 int ceph_create(ceph_mount_info **cmount, const char * const id)
112 int ceph_create_from_rados(ceph_mount_info **cmount, rados_t cluster)
113 int ceph_init(ceph_mount_info *cmount)
114 void ceph_shutdown(ceph_mount_info *cmount)
116 int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
117 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
118 int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
119 int ceph_conf_set(ceph_mount_info *cmount, const char *option, const char *value)
121 int ceph_mount(ceph_mount_info *cmount, const char *root)
122 int ceph_select_filesystem(ceph_mount_info *cmount, const char *fs_name)
123 int ceph_unmount(ceph_mount_info *cmount)
124 int ceph_abort_conn(ceph_mount_info *cmount)
125 uint64_t ceph_get_instance_id(ceph_mount_info *cmount)
126 int ceph_fstatx(ceph_mount_info *cmount, int fd, statx *stx, unsigned want, unsigned flags)
127 int ceph_statx(ceph_mount_info *cmount, const char *path, statx *stx, unsigned want, unsigned flags)
128 int ceph_statfs(ceph_mount_info *cmount, const char *path, statvfs *stbuf)
130 int ceph_mds_command(ceph_mount_info *cmount, const char *mds_spec, const char **cmd, size_t cmdlen,
131 const char *inbuf, size_t inbuflen, char **outbuf, size_t *outbuflen,
132 char **outs, size_t *outslen)
133 int ceph_rename(ceph_mount_info *cmount, const char *from_, const char *to)
134 int ceph_link(ceph_mount_info *cmount, const char *existing, const char *newname)
135 int ceph_unlink(ceph_mount_info *cmount, const char *path)
136 int ceph_symlink(ceph_mount_info *cmount, const char *existing, const char *newname)
137 int ceph_readlink(ceph_mount_info *cmount, const char *path, char *buf, int64_t size)
138 int ceph_setxattr(ceph_mount_info *cmount, const char *path, const char *name,
139 const void *value, size_t size, int flags)
140 int ceph_getxattr(ceph_mount_info *cmount, const char *path, const char *name,
141 void *value, size_t size)
142 int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset)
143 int ceph_read(ceph_mount_info *cmount, int fd, char *buf, int64_t size, int64_t offset)
144 int ceph_flock(ceph_mount_info *cmount, int fd, int operation, uint64_t owner)
145 int ceph_close(ceph_mount_info *cmount, int fd)
146 int ceph_open(ceph_mount_info *cmount, const char *path, int flags, mode_t mode)
147 int ceph_mkdir(ceph_mount_info *cmount, const char *path, mode_t mode)
148 int ceph_mkdirs(ceph_mount_info *cmount, const char *path, mode_t mode)
149 int ceph_closedir(ceph_mount_info *cmount, ceph_dir_result *dirp)
150 int ceph_opendir(ceph_mount_info *cmount, const char *name, ceph_dir_result **dirpp)
151 void ceph_rewinddir(ceph_mount_info *cmount, ceph_dir_result *dirp)
152 int ceph_chdir(ceph_mount_info *cmount, const char *path)
153 dirent * ceph_readdir(ceph_mount_info *cmount, ceph_dir_result *dirp)
154 int ceph_rmdir(ceph_mount_info *cmount, const char *path)
155 const char* ceph_getcwd(ceph_mount_info *cmount)
156 int ceph_sync_fs(ceph_mount_info *cmount)
157 int ceph_fsync(ceph_mount_info *cmount, int fd, int syncdataonly)
158 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
159 int ceph_chmod(ceph_mount_info *cmount, const char *path, mode_t mode)
160 int ceph_chown(ceph_mount_info *cmount, const char *path, int uid, int gid)
161 int64_t ceph_lseek(ceph_mount_info *cmount, int fd, int64_t offset, int whence)
162 void ceph_buffer_free(char *buf)
163 mode_t ceph_umask(ceph_mount_info *cmount, mode_t mode)
166 class Error(Exception):
170 class OSError(Error):
171 def __init__(self, errno, strerror):
172 super(OSError, self).__init__(errno, strerror)
174 self.strerror = strerror
177 return '{0}: {1} [Errno {2}]'.format(self.strerror, os.strerror(self.errno), self.errno)
180 class PermissionError(OSError):
184 class ObjectNotFound(OSError):
188 class NoData(OSError):
192 class ObjectExists(OSError):
196 class IOError(OSError):
200 class NoSpace(OSError):
204 class InvalidValue(OSError):
208 class OperationNotSupported(OSError):
212 class LibCephFSStateError(Error):
216 class WouldBlock(OSError):
220 class OutOfRange(OSError):
224 class ObjectNotEmpty(OSError):
228 IF UNAME_SYSNAME == "FreeBSD":
229 cdef errno_to_exception = {
230 errno.EPERM : PermissionError,
231 errno.ENOENT : ObjectNotFound,
233 errno.ENOSPC : NoSpace,
234 errno.EEXIST : ObjectExists,
235 errno.ENOATTR : NoData,
236 errno.EINVAL : InvalidValue,
237 errno.EOPNOTSUPP : OperationNotSupported,
238 errno.ERANGE : OutOfRange,
239 errno.EWOULDBLOCK: WouldBlock,
240 errno.ENOTEMPTY : ObjectNotEmpty,
243 cdef errno_to_exception = {
244 errno.EPERM : PermissionError,
245 errno.ENOENT : ObjectNotFound,
247 errno.ENOSPC : NoSpace,
248 errno.EEXIST : ObjectExists,
249 errno.ENODATA : NoData,
250 errno.EINVAL : InvalidValue,
251 errno.EOPNOTSUPP : OperationNotSupported,
252 errno.ERANGE : OutOfRange,
253 errno.EWOULDBLOCK: WouldBlock,
254 errno.ENOTEMPTY : ObjectNotEmpty,
258 cdef make_ex(ret, msg):
260 Translate a librados return code into an exception.
262 :param ret: the return code
264 :param msg: the error message to use
266 :returns: a subclass of :class:`Error`
269 if ret in errno_to_exception:
270 return errno_to_exception[ret](ret, msg)
272 return Error(msg + ': {} [Errno {:d}]'.format(os.strerror(ret), ret))
275 class DirEntry(namedtuple('DirEntry',
276 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
281 return self.d_type == self.DT_DIR
283 def is_symbol_file(self):
284 return self.d_type == self.DT_LNK
287 return self.d_type == self.DT_REG
289 StatResult = namedtuple('StatResult',
290 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
291 "st_gid", "st_rdev", "st_size", "st_blksize",
292 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
294 cdef class DirResult(object):
296 cdef ceph_dir_result* handle
298 # Bug in older Cython instances prevents this from being a static method.
300 # cdef create(LibCephFS lib, ceph_dir_result* handle):
306 def __dealloc__(self):
311 raise make_ex(errno.EBADF, "dir is not open")
312 self.lib.require_state("mounted")
314 ceph_rewinddir(self.lib.cluster, self.handle)
317 def __exit__(self, type_, value, traceback):
322 self.lib.require_state("mounted")
325 dirent = ceph_readdir(self.lib.cluster, self.handle)
329 IF UNAME_SYSNAME == "FreeBSD":
330 return DirEntry(d_ino=dirent.d_ino,
332 d_reclen=dirent.d_reclen,
333 d_type=dirent.d_type,
334 d_name=dirent.d_name)
336 return DirEntry(d_ino=dirent.d_ino,
338 d_reclen=dirent.d_reclen,
339 d_type=dirent.d_type,
340 d_name=dirent.d_name)
344 self.lib.require_state("mounted")
346 ret = ceph_closedir(self.lib.cluster, self.handle)
348 raise make_ex(ret, "closedir failed")
351 def cstr(val, name, encoding="utf-8", opt=False):
353 Create a byte string from a Python string
355 :param basestring val: Python string
356 :param str name: Name of the string parameter, for exceptions
357 :param str encoding: Encoding to use
358 :param bool opt: If True, None is allowed
360 :raises: :class:`InvalidArgument`
362 if opt and val is None:
364 if isinstance(val, bytes):
368 v = val.encode(encoding)
370 raise TypeError('%s must be encodeable as a bytearray' % name)
371 assert isinstance(v, bytes)
374 def cstr_list(list_str, name, encoding="utf-8"):
375 return [cstr(s, name) for s in list_str]
378 def decode_cstr(val, encoding="utf-8"):
380 Decode a byte string into a Python string.
382 :param bytes val: byte string
383 :rtype: unicode or None
388 return val.decode(encoding)
391 cdef char* opt_str(s) except? NULL:
397 cdef char ** to_bytes_array(list_bytes):
398 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
400 raise MemoryError("malloc failed")
401 for i in xrange(len(list_bytes)):
402 ret[i] = <char *>list_bytes[i]
406 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
407 cdef void *ret = realloc(ptr, size)
409 raise MemoryError("realloc failed")
413 cdef class LibCephFS(object):
414 """libcephfs python wrapper"""
416 cdef public object state
417 cdef ceph_mount_info *cluster
419 def require_state(self, *args):
420 if self.state in args:
422 raise LibCephFSStateError("You cannot perform that operation on a "
423 "CephFS object in state %s." % (self.state))
425 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
426 """Create a libcephfs wrapper
428 :param conf dict opt: settings overriding the default ones and conffile
429 :param conffile str opt: the path to ceph.conf to override the default settings
430 :auth_id str opt: the id used to authenticate the client entity
431 :rados_inst Rados opt: a rados.Rados instance
434 self.state = "uninitialized"
435 if rados_inst is not None:
436 if auth_id is not None or conffile is not None or conf is not None:
437 raise make_ex(errno.EINVAL,
438 "May not pass RADOS instance as well as other configuration")
440 self.create_with_rados(rados_inst)
442 self.create(conf, conffile, auth_id)
444 def create_with_rados(self, rados.Rados rados_inst):
447 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
449 raise Error("libcephfs_initialize failed with error code: %d" % ret)
450 self.state = "configuring"
452 def create(self, conf=None, conffile=None, auth_id=None):
454 Create a mount handle for interacting with Ceph. All libcephfs
455 functions operate on a mount info handle.
457 :param conf dict opt: settings overriding the default ones and conffile
458 :param conffile str opt: the path to ceph.conf to override the default settings
459 :auth_id str opt: the id used to authenticate the client entity
461 if conf is not None and not isinstance(conf, dict):
462 raise TypeError("conf must be dict or None")
463 cstr(conffile, 'configfile', opt=True)
464 auth_id = cstr(auth_id, 'auth_id', opt=True)
467 char* _auth_id = opt_str(auth_id)
471 ret = ceph_create(&self.cluster, <const char*>_auth_id)
473 raise Error("libcephfs_initialize failed with error code: %d" % ret)
475 self.state = "configuring"
476 if conffile is not None:
477 # read the default conf file when '' is given
480 self.conf_read_file(conffile)
482 for key, value in conf.iteritems():
483 self.conf_set(key, value)
485 def conf_read_file(self, conffile=None):
487 Load the ceph configuration from the specified config file.
489 :param conffile str opt: the path to ceph.conf to override the default settings
491 conffile = cstr(conffile, 'conffile', opt=True)
493 char *_conffile = opt_str(conffile)
495 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
497 raise make_ex(ret, "error calling conf_read_file")
499 def conf_parse_argv(self, argv):
501 Parse the command line arguments and load the configuration parameters.
503 :param argv: the argument list
505 self.require_state("configuring")
506 cargv = cstr_list(argv, 'argv')
508 int _argc = len(argv)
509 char **_argv = to_bytes_array(cargv)
513 ret = ceph_conf_parse_argv(self.cluster, _argc,
514 <const char **>_argv)
516 raise make_ex(ret, "error calling conf_parse_argv")
522 Unmount and destroy the ceph mount handle.
524 if self.state in ["initialized", "mounted"]:
526 ceph_shutdown(self.cluster)
527 self.state = "shutdown"
533 def __exit__(self, type_, value, traceback):
537 def __dealloc__(self):
542 Get the version number of the ``libcephfs`` C library.
544 :returns: a tuple of ``(major, minor, extra)`` components of the
552 ceph_version(&major, &minor, &extra)
553 return (major, minor, extra)
555 def conf_get(self, option):
557 Gets the configuration value as a string.
559 :param option: the config option to get
561 self.require_state("configuring", "initialized", "mounted")
563 option = cstr(option, 'option')
565 char *_option = option
571 ret_buf = <char *>realloc_chk(ret_buf, length)
573 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
575 return decode_cstr(ret_buf)
576 elif ret == -errno.ENAMETOOLONG:
578 elif ret == -errno.ENOENT:
581 raise make_ex(ret, "error calling conf_get")
585 def conf_set(self, option, val):
587 Sets a configuration value from a string.
589 :param option: the configuration option to set
590 :param value: the value of the configuration option to set
592 self.require_state("configuring", "initialized", "mounted")
594 option = cstr(option, 'option')
595 val = cstr(val, 'val')
597 char *_option = option
601 ret = ceph_conf_set(self.cluster, _option, _val)
603 raise make_ex(ret, "error calling conf_set")
607 Initialize the filesystem client (but do not mount the filesystem yet)
609 self.require_state("configuring")
611 ret = ceph_init(self.cluster)
613 raise make_ex(ret, "error calling ceph_init")
614 self.state = "initialized"
616 def mount(self, mount_root=None, filesystem_name=None):
618 Perform a mount using the path for the root of the mount.
620 if self.state == "configuring":
622 self.require_state("initialized")
624 # Configure which filesystem to mount if one was specified
625 if filesystem_name is None:
626 filesystem_name = b""
628 char *_filesystem_name = filesystem_name
631 ret = ceph_select_filesystem(self.cluster,
634 raise make_ex(ret, "error calling ceph_select_filesystem")
636 # Prepare mount_root argument, default to "/"
637 root = b"/" if mount_root is None else mount_root
639 char *_mount_root = root
642 ret = ceph_mount(self.cluster, _mount_root)
644 raise make_ex(ret, "error calling ceph_mount")
645 self.state = "mounted"
649 Unmount a mount handle.
651 self.require_state("mounted")
653 ret = ceph_unmount(self.cluster)
655 raise make_ex(ret, "error calling ceph_unmount")
656 self.state = "initialized"
658 def abort_conn(self):
660 Abort mds connections.
662 self.require_state("mounted")
664 ret = ceph_abort_conn(self.cluster)
666 raise make_ex(ret, "error calling ceph_abort_conn")
667 self.state = "initialized"
669 def get_instance_id(self):
671 Get a global id for current instance
673 self.require_state("initialized", "mounted")
675 ret = ceph_get_instance_id(self.cluster)
678 def statfs(self, path):
680 Perform a statfs on the ceph file system. This call fills in file system wide statistics
681 into the passed in buffer.
683 :param path: any path within the mounted filesystem
685 self.require_state("mounted")
686 path = cstr(path, 'path')
692 ret = ceph_statfs(self.cluster, _path, &statbuf)
694 raise make_ex(ret, "statfs failed: %s" % path)
695 return {'f_bsize': statbuf.f_bsize,
696 'f_frsize': statbuf.f_frsize,
697 'f_blocks': statbuf.f_blocks,
698 'f_bfree': statbuf.f_bfree,
699 'f_bavail': statbuf.f_bavail,
700 'f_files': statbuf.f_files,
701 'f_ffree': statbuf.f_ffree,
702 'f_favail': statbuf.f_favail,
703 'f_fsid': statbuf.f_fsid,
704 'f_flag': statbuf.f_flag,
705 'f_namemax': statbuf.f_namemax}
709 Synchronize all filesystem data to persistent media
711 self.require_state("mounted")
713 ret = ceph_sync_fs(self.cluster)
715 raise make_ex(ret, "sync_fs failed")
717 def fsync(self, int fd, int syncdataonly):
719 Synchronize an open file to persistent media.
721 :param fd: the file descriptor of the file to sync.
722 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
725 self.require_state("mounted")
727 ret = ceph_fsync(self.cluster, fd, syncdataonly)
729 raise make_ex(ret, "fsync failed")
733 Get the current working directory.
735 :rtype the path to the current working directory
737 self.require_state("mounted")
739 ret = ceph_getcwd(self.cluster)
742 def chdir(self, path):
744 Change the current working directory.
746 :param path the path to the working directory to change into.
748 self.require_state("mounted")
750 path = cstr(path, 'path')
751 cdef char* _path = path
753 ret = ceph_chdir(self.cluster, _path)
755 raise make_ex(ret, "chdir failed")
757 def opendir(self, path):
759 Open the given directory.
761 :param path: the path name of the directory to open. Must be either an absolute path
762 or a path relative to the current working directory.
763 :rtype handle: the open directory stream handle
765 self.require_state("mounted")
767 path = cstr(path, 'path')
770 ceph_dir_result* handle
772 ret = ceph_opendir(self.cluster, _path, &handle);
774 raise make_ex(ret, "opendir failed")
780 def readdir(self, DirResult handle):
782 Get the next entry in an open directory.
784 :param handle: the open directory stream handle
785 :rtype dentry: the next directory entry or None if at the end of the
786 directory (or the directory is empty. This pointer
787 should not be freed by the caller, and is only safe to
788 access between return and the next call to readdir or
791 self.require_state("mounted")
793 return handle.readdir()
795 def closedir(self, DirResult handle):
797 Close the open directory.
799 :param handle: the open directory stream handle
801 self.require_state("mounted")
803 return handle.close()
805 def mkdir(self, path, mode):
809 :param path: the path of the directory to create. This must be either an
810 absolute path or a relative path off of the current working directory.
811 :param mode the permissions the directory should have once created.
814 self.require_state("mounted")
815 path = cstr(path, 'path')
816 if not isinstance(mode, int):
817 raise TypeError('mode must be an int')
822 ret = ceph_mkdir(self.cluster, _path, _mode)
824 raise make_ex(ret, "error in mkdir {}".format(path.decode('utf-8')))
826 def chmod(self, path, mode) :
828 Change directory mode.
829 :param path: the path of the directory to create. This must be either an
830 absolute path or a relative path off of the current working directory.
831 :param mode the permissions the directory should have once created.
833 self.require_state("mounted")
834 path = cstr(path, 'path')
835 if not isinstance(mode, int):
836 raise TypeError('mode must be an int')
841 ret = ceph_chmod(self.cluster, _path, _mode)
843 raise make_ex(ret, "error in chmod {}".format(path.decode('utf-8')))
845 def chown(self, path, uid, gid):
847 Change directory ownership
848 :param path: the path of the directory to change.
849 :param uid: the uid to set
850 :param gid: the gid to set
852 self.require_state("mounted")
853 path = cstr(path, 'path')
854 if not isinstance(uid, int):
855 raise TypeError('uid must be an int')
856 elif not isinstance(gid, int):
857 raise TypeError('gid must be an int')
864 ret = ceph_chown(self.cluster, _path, _uid, _gid)
866 raise make_ex(ret, "error in chown {}".format(path.decode('utf-8')))
868 def mkdirs(self, path, mode):
870 Create multiple directories at once.
872 :param path: the full path of directories and sub-directories that should
874 :param mode the permissions the directory should have once created
876 self.require_state("mounted")
877 path = cstr(path, 'path')
878 if not isinstance(mode, int):
879 raise TypeError('mode must be an int')
885 ret = ceph_mkdirs(self.cluster, _path, _mode)
887 raise make_ex(ret, "error in mkdirs {}".format(path.decode('utf-8')))
889 def rmdir(self, path):
893 :param path: the path of the directory to remove.
895 self.require_state("mounted")
896 path = cstr(path, 'path')
897 cdef char* _path = path
898 ret = ceph_rmdir(self.cluster, _path)
900 raise make_ex(ret, "error in rmdir {}".format(path.decode('utf-8')))
902 def open(self, path, flags, mode=0):
904 Create and/or open a file.
906 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
907 the file will first be created before opening.
908 :param flags: set of option masks that control how the file is created/opened.
909 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
910 is specified in the flags.
912 self.require_state("mounted")
913 path = cstr(path, 'path')
915 if not isinstance(mode, int):
916 raise TypeError('mode must be an int')
917 if isinstance(flags, str):
920 cephfs_flags = os.O_RDONLY
928 cephfs_flags |= os.O_TRUNC | os.O_CREAT
929 elif access_flags > 0 and c == '+':
932 raise make_ex(errno.EOPNOTSUPP,
933 "open flags doesn't support %s" % c)
935 if access_flags == 1:
936 cephfs_flags |= os.O_RDONLY;
937 elif access_flags == 2:
938 cephfs_flags |= os.O_WRONLY;
940 cephfs_flags |= os.O_RDWR;
942 elif isinstance(flags, int):
945 raise TypeError("flags must be a string or an integer")
949 int _flags = cephfs_flags
953 ret = ceph_open(self.cluster, _path, _flags, _mode)
955 raise make_ex(ret, "error in open {}".format(path.decode('utf-8')))
962 :param fd: the file descriptor referring to the open file.
965 self.require_state("mounted")
966 if not isinstance(fd, int):
967 raise TypeError('fd must be an int')
970 ret = ceph_close(self.cluster, _fd)
972 raise make_ex(ret, "error in close")
974 def read(self, fd, offset, l):
976 Read data from the file.
978 :param fd : the file descriptor of the open file to read from.
979 :param offset : the offset in the file to read from. If this value is negative, the
980 function reads from the current offset of the file descriptor.
981 :param l : the flag to indicate what type of seeking to perform
983 self.require_state("mounted")
984 if not isinstance(offset, int):
985 raise TypeError('offset must be an int')
986 if not isinstance(l, int):
987 raise TypeError('l must be an int')
988 if not isinstance(fd, int):
989 raise TypeError('fd must be an int')
992 int64_t _offset = offset
996 PyObject* ret_s = NULL
998 ret_s = PyBytes_FromStringAndSize(NULL, _length)
1000 ret_buf = PyBytes_AsString(ret_s)
1002 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
1004 raise make_ex(ret, "error in read")
1007 _PyBytes_Resize(&ret_s, ret)
1009 return <object>ret_s
1011 # We DECREF unconditionally: the cast to object above will have
1012 # INCREFed if necessary. This also takes care of exceptions,
1013 # including if _PyString_Resize fails (that will free the string
1014 # itself and set ret_s to NULL, hence XDECREF).
1015 ref.Py_XDECREF(ret_s)
1017 def write(self, fd, buf, offset):
1019 Write data to a file.
1021 :param fd : the file descriptor of the open file to write to
1022 :param buf : the bytes to write to the file
1023 :param offset : the offset of the file write into. If this value is negative, the
1024 function writes to the current offset of the file descriptor.
1026 self.require_state("mounted")
1027 if not isinstance(fd, int):
1028 raise TypeError('fd must be an int')
1029 if not isinstance(buf, bytes):
1030 raise TypeError('buf must be a bytes')
1031 if not isinstance(offset, int):
1032 raise TypeError('offset must be an int')
1037 int64_t _offset = offset
1039 size_t length = len(buf)
1042 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
1044 raise make_ex(ret, "error in write")
1047 def flock(self, fd, operation, owner):
1049 Apply or remove an advisory lock.
1051 :param fd: the open file descriptor to change advisory lock.
1052 :param operation: the advisory lock operation to be performed on the file
1053 :param owner: the user-supplied owner identifier (an arbitrary integer)
1055 self.require_state("mounted")
1056 if not isinstance(fd, int):
1057 raise TypeError('fd must be an int')
1058 if not isinstance(operation, int):
1059 raise TypeError('operation must be an int')
1064 uint64_t _owner = owner
1067 ret = ceph_flock(self.cluster, _fd, _op, _owner)
1069 raise make_ex(ret, "error in write")
1072 def getxattr(self, path, name, size=255):
1074 Get an extended attribute.
1076 :param path: the path to the file
1077 :param name: the name of the extended attribute to get
1078 :param size: the size of the pre-allocated buffer
1080 self.require_state("mounted")
1082 path = cstr(path, 'path')
1083 name = cstr(name, 'name')
1089 size_t ret_length = size
1090 char *ret_buf = NULL
1093 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1095 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1099 raise make_ex(ret, "error in getxattr")
1101 return ret_buf[:ret]
1105 def setxattr(self, path, name, value, flags):
1107 Set an extended attribute on a file.
1109 :param path: the path to the file.
1110 :param name: the name of the extended attribute to set.
1111 :param value: the bytes of the extended attribute value
1113 self.require_state("mounted")
1115 name = cstr(name, 'name')
1116 path = cstr(path, 'path')
1117 if not isinstance(flags, int):
1118 raise TypeError('flags must be a int')
1119 if not isinstance(value, bytes):
1120 raise TypeError('value must be a bytes')
1125 char *_value = value
1126 size_t _value_len = len(value)
1130 ret = ceph_setxattr(self.cluster, _path, _name,
1131 _value, _value_len, _flags)
1133 raise make_ex(ret, "error in setxattr")
1136 def stat(self, path, follow_symlink=True):
1138 Get a file's extended statistics and attributes.
1140 :param path: the file or directory to get the statistics of.
1142 self.require_state("mounted")
1143 path = cstr(path, 'path')
1151 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1152 ret = ceph_statx(self.cluster, _path, &stx, 0x7ffu, 0)
1155 ret = ceph_statx(self.cluster, _path, &stx, 0x7ffu,
1156 AT_SYMLINK_NOFOLLOW)
1159 raise make_ex(ret, "error in stat: {}".format(path.decode('utf-8')))
1160 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1161 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1162 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1163 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1164 st_blksize=stx.stx_blksize,
1165 st_blocks=stx.stx_blocks,
1166 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1167 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1168 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1170 def lstat(self, path):
1172 Get a file's extended statistics and attributes. When file's a
1173 symbolic link, return the informaion of the link itself rather
1174 than that of the file it points too.
1176 :param path: the file or directory to get the statistics of.
1178 return self.stat(path, follow_symlink=False)
1180 def fstat(self, fd):
1182 Get an open file's extended statistics and attributes.
1184 :param fd: the file descriptor of the file to get statistics of.
1186 self.require_state("mounted")
1187 if not isinstance(fd, int):
1188 raise TypeError('fd must be an int')
1195 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1196 ret = ceph_fstatx(self.cluster, _fd, &stx, 0x7ffu, 0)
1198 raise make_ex(ret, "error in fsat")
1199 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1200 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1201 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1202 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1203 st_blksize=stx.stx_blksize,
1204 st_blocks=stx.stx_blocks,
1205 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1206 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1207 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1209 def symlink(self, existing, newname):
1211 Creates a symbolic link.
1213 :param existing: the path to the existing file/directory to link to.
1214 :param newname: the path to the new file/directory to link from.
1216 self.require_state("mounted")
1217 existing = cstr(existing, 'existing')
1218 newname = cstr(newname, 'newname')
1220 char* _existing = existing
1221 char* _newname = newname
1224 ret = ceph_symlink(self.cluster, _existing, _newname)
1226 raise make_ex(ret, "error in symlink")
1228 def link(self, existing, newname):
1232 :param existing: the path to the existing file/directory to link to.
1233 :param newname: the path to the new file/directory to link from.
1236 self.require_state("mounted")
1237 existing = cstr(existing, 'existing')
1238 newname = cstr(newname, 'newname')
1240 char* _existing = existing
1241 char* _newname = newname
1244 ret = ceph_link(self.cluster, _existing, _newname)
1246 raise make_ex(ret, "error in link")
1248 def readlink(self, path, size):
1250 Read a symbolic link.
1252 :param path: the path to the symlink to read
1253 :param size: the length of the buffer
1254 :rtype buf: buffer to hold the path of the file that the symlink points to.
1256 self.require_state("mounted")
1257 path = cstr(path, 'path')
1261 int64_t _size = size
1265 buf = <char *>realloc_chk(buf, _size)
1267 ret = ceph_readlink(self.cluster, _path, buf, _size)
1269 raise make_ex(ret, "error in readlink")
1274 def unlink(self, path):
1276 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
1277 file will not disappear from the namespace until all references to it are removed.
1279 :param path: the path of the file or link to unlink.
1281 self.require_state("mounted")
1282 path = cstr(path, 'path')
1283 cdef char* _path = path
1285 ret = ceph_unlink(self.cluster, _path)
1287 raise make_ex(ret, "error in unlink: {}".format(path.decode('utf-8')))
1289 def rename(self, src, dst):
1291 Rename a file or directory.
1293 :param src: the path to the existing file or directory.
1294 :param dst: the new name of the file or directory.
1297 self.require_state("mounted")
1299 src = cstr(src, 'source')
1300 dst = cstr(dst, 'destination')
1307 ret = ceph_rename(self.cluster, _src, _dst)
1309 raise make_ex(ret, "error in rename {} to {}".format(src.decode(
1310 'utf-8'), dst.decode('utf-8')))
1312 def mds_command(self, mds_spec, args, input_data):
1314 :return 3-tuple of output status int, output status string, output data
1316 mds_spec = cstr(mds_spec, 'mds_spec')
1317 args = cstr_list(args, 'args')
1318 input_data = cstr(input_data, 'input_data')
1321 char *_mds_spec = opt_str(mds_spec)
1322 char **_cmd = to_bytes_array(args)
1323 size_t _cmdlen = len(args)
1325 char *_inbuf = input_data
1326 size_t _inbuf_len = len(input_data)
1328 char *_outbuf = NULL
1329 size_t _outbuf_len = 0
1331 size_t _outs_len = 0
1335 ret = ceph_mds_command(self.cluster, _mds_spec,
1336 <const char **>_cmd, _cmdlen,
1337 <const char*>_inbuf, _inbuf_len,
1338 &_outbuf, &_outbuf_len,
1340 my_outs = decode_cstr(_outs[:_outs_len])
1341 my_outbuf = _outbuf[:_outbuf_len]
1343 ceph_buffer_free(_outs)
1345 ceph_buffer_free(_outbuf)
1346 return (ret, my_outbuf, my_outs)
1350 def umask(self, mode) :
1351 self.require_state("mounted")
1355 ret = ceph_umask(self.cluster, _mode)
1357 raise make_ex(ret, "error in umask")
1360 def lseek(self, fd, offset, whence):
1362 Set the file's current position.
1364 :param fd : the file descriptor of the open file to read from.
1365 :param offset : the offset in the file to read from. If this value is negative, the
1366 function reads from the current offset of the file descriptor.
1367 :param whence : the flag to indicate what type of seeking to performs:SEEK_SET, SEEK_CUR, SEEK_END
1369 self.require_state("mounted")
1370 if not isinstance(fd, int):
1371 raise TypeError('fd must be an int')
1372 if not isinstance(offset, int):
1373 raise TypeError('offset must be an int')
1374 if not isinstance(whence, int):
1375 raise TypeError('whence must be an int')
1379 int64_t _offset = offset
1380 int64_t _whence = whence
1383 ret = ceph_lseek(self.cluster, _fd, _offset, _whence)
1386 raise make_ex(ret, "error in lseek")