2 This module is a thin wrapper around libcephfs.
5 from cpython cimport PyObject, ref, exc
6 from libc cimport errno
7 from libc.stdint cimport *
8 from libc.stdlib cimport malloc, realloc, free
12 from collections import namedtuple
13 from datetime import datetime
18 # Are we running Python 2.x
19 if sys.version_info[0] < 3:
25 cdef extern from "Python.h":
26 # These are in cpython/string.pxd, but use "object" types instead of
27 # PyObject*, which invokes assumptions in cpython that we need to
28 # legitimately break to implement zero-copy string buffers in LibCephFS.read().
29 # This is valid use of the Python API and documented as a special case.
30 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
31 char* PyBytes_AsString(PyObject *string) except NULL
32 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
33 void PyEval_InitThreads()
36 cdef extern from "sys/statvfs.h":
38 unsigned long int f_bsize
39 unsigned long int f_frsize
40 unsigned long int f_blocks
41 unsigned long int f_bfree
42 unsigned long int f_bavail
43 unsigned long int f_files
44 unsigned long int f_ffree
45 unsigned long int f_favail
46 unsigned long int f_fsid
47 unsigned long int f_flag
48 unsigned long int f_namemax
49 unsigned long int f_padding[32]
52 cdef extern from "dirent.h":
55 unsigned long int d_off
56 unsigned short int d_reclen
# Declare time_t to Cython; it is described here as a long int.
61 cdef extern from "time.h":
62 ctypedef long int time_t
64 cdef extern from "time.h":
# mode_t is the mode/permission argument type used by the ceph_open,
# ceph_mkdir, ceph_mkdirs, ceph_chmod and ceph_umask declarations in this file.
69 cdef extern from "sys/types.h":
70 ctypedef unsigned long mode_t
72 cdef extern from "cephfs/ceph_statx.h":
73 cdef struct statx "ceph_statx":
91 cdef extern from "cephfs/libcephfs.h" nogil:
92 cdef struct ceph_mount_info:
95 cdef struct ceph_dir_result:
98 ctypedef void* rados_t
100 const char *ceph_version(int *major, int *minor, int *patch)
102 int ceph_create(ceph_mount_info **cmount, const char * const id)
103 int ceph_create_from_rados(ceph_mount_info **cmount, rados_t cluster)
104 int ceph_init(ceph_mount_info *cmount)
105 void ceph_shutdown(ceph_mount_info *cmount)
107 int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
108 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
109 int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
110 int ceph_conf_set(ceph_mount_info *cmount, const char *option, const char *value)
112 int ceph_mount(ceph_mount_info *cmount, const char *root)
113 int ceph_select_filesystem(ceph_mount_info *cmount, const char *fs_name)
114 int ceph_unmount(ceph_mount_info *cmount)
115 int ceph_abort_conn(ceph_mount_info *cmount)
116 uint64_t ceph_get_instance_id(ceph_mount_info *cmount)
117 int ceph_fstatx(ceph_mount_info *cmount, int fd, statx *stx, unsigned want, unsigned flags)
118 int ceph_statx(ceph_mount_info *cmount, const char *path, statx *stx, unsigned want, unsigned flags)
119 int ceph_statfs(ceph_mount_info *cmount, const char *path, statvfs *stbuf)
121 int ceph_mds_command(ceph_mount_info *cmount, const char *mds_spec, const char **cmd, size_t cmdlen,
122 const char *inbuf, size_t inbuflen, char **outbuf, size_t *outbuflen,
123 char **outs, size_t *outslen)
124 int ceph_rename(ceph_mount_info *cmount, const char *from_, const char *to)
125 int ceph_link(ceph_mount_info *cmount, const char *existing, const char *newname)
126 int ceph_unlink(ceph_mount_info *cmount, const char *path)
127 int ceph_symlink(ceph_mount_info *cmount, const char *existing, const char *newname)
128 int ceph_readlink(ceph_mount_info *cmount, const char *path, char *buf, int64_t size)
129 int ceph_setxattr(ceph_mount_info *cmount, const char *path, const char *name,
130 const void *value, size_t size, int flags)
131 int ceph_getxattr(ceph_mount_info *cmount, const char *path, const char *name,
132 void *value, size_t size)
133 int ceph_write(ceph_mount_info *cmount, int fd, const char *buf, int64_t size, int64_t offset)
134 int ceph_read(ceph_mount_info *cmount, int fd, char *buf, int64_t size, int64_t offset)
135 int ceph_flock(ceph_mount_info *cmount, int fd, int operation, uint64_t owner)
136 int ceph_close(ceph_mount_info *cmount, int fd)
137 int ceph_open(ceph_mount_info *cmount, const char *path, int flags, mode_t mode)
138 int ceph_mkdir(ceph_mount_info *cmount, const char *path, mode_t mode)
139 int ceph_mkdirs(ceph_mount_info *cmount, const char *path, mode_t mode)
140 int ceph_closedir(ceph_mount_info *cmount, ceph_dir_result *dirp)
141 int ceph_opendir(ceph_mount_info *cmount, const char *name, ceph_dir_result **dirpp)
142 int ceph_chdir(ceph_mount_info *cmount, const char *path)
143 dirent * ceph_readdir(ceph_mount_info *cmount, ceph_dir_result *dirp)
144 int ceph_rmdir(ceph_mount_info *cmount, const char *path)
145 const char* ceph_getcwd(ceph_mount_info *cmount)
146 int ceph_sync_fs(ceph_mount_info *cmount)
147 int ceph_fsync(ceph_mount_info *cmount, int fd, int syncdataonly)
148 int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
149 int ceph_chmod(ceph_mount_info *cmount, const char *path, mode_t mode)
150 void ceph_buffer_free(char *buf)
151 mode_t ceph_umask(ceph_mount_info *cmount, mode_t mode)
154 class Error(Exception):
158 class OSError(Error):
159 def __init__(self, errno, strerror):
161 self.strerror = strerror
164 return '[Errno {0}] {1}'.format(self.errno, self.strerror)
167 class PermissionError(OSError):
171 class ObjectNotFound(OSError):
175 class NoData(OSError):
179 class ObjectExists(OSError):
183 class IOError(OSError):
187 class NoSpace(OSError):
191 class InvalidValue(OSError):
195 class OperationNotSupported(OSError):
199 class LibCephFSStateError(Error):
203 class WouldBlock(OSError):
207 class OutOfRange(OSError):
211 IF UNAME_SYSNAME == "FreeBSD":
212 cdef errno_to_exception = {
213 errno.EPERM : PermissionError,
214 errno.ENOENT : ObjectNotFound,
216 errno.ENOSPC : NoSpace,
217 errno.EEXIST : ObjectExists,
218 errno.ENOATTR : NoData,
219 errno.EINVAL : InvalidValue,
220 errno.EOPNOTSUPP : OperationNotSupported,
221 errno.ERANGE : OutOfRange,
222 errno.EWOULDBLOCK: WouldBlock,
225 cdef errno_to_exception = {
226 errno.EPERM : PermissionError,
227 errno.ENOENT : ObjectNotFound,
229 errno.ENOSPC : NoSpace,
230 errno.EEXIST : ObjectExists,
231 errno.ENODATA : NoData,
232 errno.EINVAL : InvalidValue,
233 errno.EOPNOTSUPP : OperationNotSupported,
234 errno.ERANGE : OutOfRange,
235 errno.EWOULDBLOCK: WouldBlock,
239 cdef make_ex(ret, msg):
241 Translate a libcephfs return code into an exception.
243 :param ret: the return code
245 :param msg: the error message to use
247 :returns: a subclass of :class:`Error`
250 if ret in errno_to_exception:
251 return errno_to_exception[ret](ret, msg)
253 return Error(ret, msg + (": error code %d" % ret))
256 class DirEntry(namedtuple('DirEntry',
257 ['d_ino', 'd_off', 'd_reclen', 'd_type', 'd_name'])):
262 return self.d_type == self.DT_DIR
264 def is_symbol_file(self):
265 return self.d_type == self.DT_LNK
268 return self.d_type == self.DT_REG
# Record type built by LibCephFS.stat() and LibCephFS.fstat(). Those methods
# fill st_atime/st_mtime/st_ctime with datetime objects derived from the
# whole-second (tv_sec) part of the statx timestamps.
270 StatResult = namedtuple('StatResult',
271 ["st_dev", "st_ino", "st_mode", "st_nlink", "st_uid",
272 "st_gid", "st_rdev", "st_size", "st_blksize",
273 "st_blocks", "st_atime", "st_mtime", "st_ctime"])
# Holder for the libcephfs directory handle that LibCephFS.readdir() and
# LibCephFS.closedir() read from their DirResult argument.
275 cdef class DirResult(object):
# NOTE(review): opendir() stores its ceph_opendir() result in a `.handler`
# attribute, presumably on an instance of this class - confirm.
276 cdef ceph_dir_result *handler
279 def cstr(val, name, encoding="utf-8", opt=False):
281 Create a byte string from a Python string
283 :param basestring val: Python string
284 :param str name: Name of the string parameter, for exceptions
285 :param str encoding: Encoding to use
286 :param bool opt: If True, None is allowed
288 :raises: :class:`TypeError`
290 if opt and val is None:
292 if isinstance(val, bytes):
294 elif isinstance(val, unicode):
295 return val.encode(encoding)
297 raise TypeError('%s must be a string' % name)
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Create a list of byte strings from an iterable of Python strings.

    Every element is converted with :func:`cstr`.

    :param list_str: iterable of strings (bytes or unicode) to convert
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for unicode elements
    :rtype: list
    :raises: :class:`TypeError` if an element is not a string
    """
    # Forward the caller's encoding: it used to be accepted but ignored, so
    # every element was encoded with cstr's default regardless of the argument.
    return [cstr(s, name, encoding=encoding) for s in list_str]
304 def decode_cstr(val, encoding="utf-8"):
306 Decode a byte string into a Python string.
308 :param bytes val: byte string
309 :rtype: unicode or None
314 return val.decode(encoding)
317 cdef char* opt_str(s) except? NULL:
323 cdef char ** to_bytes_array(list_bytes):
324 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
326 raise MemoryError("malloc failed")
327 for i in xrange(len(list_bytes)):
328 ret[i] = <char *>list_bytes[i]
332 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
333 cdef void *ret = realloc(ptr, size)
335 raise MemoryError("realloc failed")
339 cdef class LibCephFS(object):
340 """libcephfs python wrapper"""
342 cdef public object state
343 cdef ceph_mount_info *cluster
345 def require_state(self, *args):
346 if self.state in args:
348 raise LibCephFSStateError("You cannot perform that operation on a "
349 "CephFS object in state %s." % (self.state))
351 def __cinit__(self, conf=None, conffile=None, auth_id=None, rados_inst=None):
352 """Create a libcephfs wrapper
354 :param conf dict opt: settings overriding the default ones and conffile
355 :param conffile str opt: the path to ceph.conf to override the default settings
356 :param auth_id str opt: the id used to authenticate the client entity
357 :param rados_inst Rados opt: a rados.Rados instance
360 self.state = "uninitialized"
361 if rados_inst is not None:
362 if auth_id is not None or conffile is not None or conf is not None:
363 raise make_ex(errno.EINVAL,
364 "May not pass RADOS instance as well as other configuration")
366 self.create_with_rados(rados_inst)
368 self.create(conf, conffile, auth_id)
370 def create_with_rados(self, rados.Rados rados_inst):
373 ret = ceph_create_from_rados(&self.cluster, rados_inst.cluster)
375 raise Error("libcephfs_initialize failed with error code: %d" % ret)
376 self.state = "configuring"
378 def create(self, conf=None, conffile=None, auth_id=None):
380 Create a mount handle for interacting with Ceph. All libcephfs
381 functions operate on a mount info handle.
383 :param conf dict opt: settings overriding the default ones and conffile
384 :param conffile str opt: the path to ceph.conf to override the default settings
385 :param auth_id str opt: the id used to authenticate the client entity
387 if conf is not None and not isinstance(conf, dict):
388 raise TypeError("conf must be dict or None")
389 cstr(conffile, 'configfile', opt=True)
390 auth_id = cstr(auth_id, 'auth_id', opt=True)
393 char* _auth_id = opt_str(auth_id)
397 ret = ceph_create(&self.cluster, <const char*>_auth_id)
399 raise Error("libcephfs_initialize failed with error code: %d" % ret)
401 self.state = "configuring"
402 if conffile is not None:
403 # read the default conf file when '' is given
406 self.conf_read_file(conffile)
408 for key, value in conf.iteritems():
409 self.conf_set(key, value)
411 def conf_read_file(self, conffile=None):
413 Load the ceph configuration from the specified config file.
415 :param conffile str opt: the path to ceph.conf to override the default settings
417 conffile = cstr(conffile, 'conffile', opt=True)
419 char *_conffile = opt_str(conffile)
421 ret = ceph_conf_read_file(self.cluster, <const char*>_conffile)
423 raise make_ex(ret, "error calling conf_read_file")
425 def conf_parse_argv(self, argv):
427 Parse the command line arguments and load the configuration parameters.
429 :param argv: the argument list
431 self.require_state("configuring")
432 cargv = cstr_list(argv, 'argv')
434 int _argc = len(argv)
435 char **_argv = to_bytes_array(cargv)
439 ret = ceph_conf_parse_argv(self.cluster, _argc,
440 <const char **>_argv)
442 raise make_ex(ret, "error calling conf_parse_argv")
448 Unmount and destroy the ceph mount handle.
450 if self.state in ["initialized", "mounted"]:
452 ceph_shutdown(self.cluster)
453 self.state = "shutdown"
459 def __exit__(self, type_, value, traceback):
463 def __dealloc__(self):
468 Get the version number of the ``libcephfs`` C library.
470 :returns: a tuple of ``(major, minor, extra)`` components of the
478 ceph_version(&major, &minor, &extra)
479 return (major, minor, extra)
481 def conf_get(self, option):
483 Gets the configuration value as a string.
485 :param option: the config option to get
487 self.require_state("configuring", "initialized", "mounted")
489 option = cstr(option, 'option')
491 char *_option = option
497 ret_buf = <char *>realloc_chk(ret_buf, length)
499 ret = ceph_conf_get(self.cluster, _option, ret_buf, length)
501 return decode_cstr(ret_buf)
502 elif ret == -errno.ENAMETOOLONG:
504 elif ret == -errno.ENOENT:
507 raise make_ex(ret, "error calling conf_get")
511 def conf_set(self, option, val):
513 Sets a configuration value from a string.
515 :param option: the configuration option to set
516 :param value: the value of the configuration option to set
518 self.require_state("configuring", "initialized", "mounted")
520 option = cstr(option, 'option')
521 val = cstr(val, 'val')
523 char *_option = option
527 ret = ceph_conf_set(self.cluster, _option, _val)
529 raise make_ex(ret, "error calling conf_set")
533 Initialize the filesystem client (but do not mount the filesystem yet)
535 self.require_state("configuring")
537 ret = ceph_init(self.cluster)
539 raise make_ex(ret, "error calling ceph_init")
540 self.state = "initialized"
542 def mount(self, mount_root=None, filesystem_name=None):
544 Perform a mount using the path for the root of the mount.
546 if self.state == "configuring":
548 self.require_state("initialized")
550 # Configure which filesystem to mount if one was specified
551 if filesystem_name is None:
552 filesystem_name = b""
554 char *_filesystem_name = filesystem_name
557 ret = ceph_select_filesystem(self.cluster,
560 raise make_ex(ret, "error calling ceph_select_filesystem")
562 # Prepare mount_root argument, default to "/"
563 root = b"/" if mount_root is None else mount_root
565 char *_mount_root = root
568 ret = ceph_mount(self.cluster, _mount_root)
570 raise make_ex(ret, "error calling ceph_mount")
571 self.state = "mounted"
575 Unmount a mount handle.
577 self.require_state("mounted")
579 ret = ceph_unmount(self.cluster)
581 raise make_ex(ret, "error calling ceph_unmount")
582 self.state = "initialized"
584 def abort_conn(self):
586 Abort mds connections.
588 self.require_state("mounted")
590 ret = ceph_abort_conn(self.cluster)
592 raise make_ex(ret, "error calling ceph_abort_conn")
593 self.state = "initialized"
595 def get_instance_id(self):
597 Get a global id for current instance
599 self.require_state("initialized", "mounted")
601 ret = ceph_get_instance_id(self.cluster)
604 def statfs(self, path):
606 Perform a statfs on the ceph file system. This call returns file system wide statistics
607 as a dictionary keyed by the statvfs field names.
609 :param path: any path within the mounted filesystem
611 self.require_state("mounted")
612 path = cstr(path, 'path')
618 ret = ceph_statfs(self.cluster, _path, &statbuf)
620 raise make_ex(ret, "statfs failed: %s" % path)
621 return {'f_bsize': statbuf.f_bsize,
622 'f_frsize': statbuf.f_frsize,
623 'f_blocks': statbuf.f_blocks,
624 'f_bfree': statbuf.f_bfree,
625 'f_bavail': statbuf.f_bavail,
626 'f_files': statbuf.f_files,
627 'f_ffree': statbuf.f_ffree,
628 'f_favail': statbuf.f_favail,
629 'f_fsid': statbuf.f_fsid,
630 'f_flag': statbuf.f_flag,
631 'f_namemax': statbuf.f_namemax}
635 Synchronize all filesystem data to persistent media
637 self.require_state("mounted")
639 ret = ceph_sync_fs(self.cluster)
641 raise make_ex(ret, "sync_fs failed")
643 def fsync(self, int fd, int syncdataonly):
645 Synchronize an open file to persistent media.
647 :param fd: the file descriptor of the file to sync.
648 :param syncdataonly: a boolean whether to synchronize metadata and data (0)
651 self.require_state("mounted")
653 ret = ceph_fsync(self.cluster, fd, syncdataonly)
655 raise make_ex(ret, "fsync failed")
659 Get the current working directory.
661 :rtype the path to the current working directory
663 self.require_state("mounted")
665 ret = ceph_getcwd(self.cluster)
668 def chdir(self, path):
670 Change the current working directory.
672 :param path: the path to the working directory to change into.
674 self.require_state("mounted")
676 path = cstr(path, 'path')
677 cdef char* _path = path
679 ret = ceph_chdir(self.cluster, _path)
681 raise make_ex(ret, "chdir failed")
683 def opendir(self, path):
685 Open the given directory.
687 :param path: the path name of the directory to open. Must be either an absolute path
688 or a path relative to the current working directory.
689 :param dir_handler: the directory result pointer structure to fill in.
691 self.require_state("mounted")
693 path = cstr(path, 'path')
696 ceph_dir_result *dir_handler
698 ret = ceph_opendir(self.cluster, _path, &dir_handler);
700 raise make_ex(ret, "opendir failed")
702 d.handler = dir_handler
705 def readdir(self, DirResult dir_handler):
707 Get the next entry in an open directory.
709 :param dir_handler: the directory stream pointer from an opendir holding the state of the
710 next entry to return.
711 :rtype dir_handler: the next directory entry or NULL if at the end of the directory (or the directory is empty).
712 This pointer should not be freed by the caller, and is only safe to access between return and
713 the next call to readdir or closedir.
715 self.require_state("mounted")
717 cdef ceph_dir_result *_dir_handler = dir_handler.handler
719 dirent = ceph_readdir(self.cluster, _dir_handler)
723 d_name = dirent.d_name if sys.version[0:2] == '2.' else dirent.d_name.\
725 return DirEntry(d_ino=dirent.d_ino,
727 d_reclen=dirent.d_reclen,
728 d_type=dirent.d_type,
731 def closedir(self, DirResult dir_handler):
733 Close the open directory.
735 :param dir_handler: the directory result pointer (set by ceph_opendir) to close
737 self.require_state("mounted")
739 ceph_dir_result *_dir_handler = dir_handler.handler
742 ret = ceph_closedir(self.cluster, _dir_handler)
744 raise make_ex(ret, "closedir failed")
746 def mkdir(self, path, mode):
750 :param path: the path of the directory to create. This must be either an
751 absolute path or a relative path off of the current working directory.
752 :param mode: the permissions the directory should have once created.
755 self.require_state("mounted")
756 path = cstr(path, 'path')
757 if not isinstance(mode, int):
758 raise TypeError('mode must be an int')
763 ret = ceph_mkdir(self.cluster, _path, _mode)
765 raise make_ex(ret, "error in mkdir '%s'" % path)
767 def chmod(self, path, mode) :
769 Change directory mode.
770 :param path: the path of the directory whose mode is to be changed. This must be either an
771 absolute path or a relative path off of the current working directory.
772 :param mode: the permissions to set on the directory.
774 self.require_state("mounted")
775 path = cstr(path, 'path')
776 if not isinstance(mode, int):
777 raise TypeError('mode must be an int')
782 ret = ceph_chmod(self.cluster, _path, _mode)
784 raise make_ex(ret, "error in chmod '%s'" % path)
786 def mkdirs(self, path, mode):
788 Create multiple directories at once.
790 :param path: the full path of directories and sub-directories that should
792 :param mode: the permissions the directory should have once created
794 self.require_state("mounted")
795 path = cstr(path, 'path')
796 if not isinstance(mode, int):
797 raise TypeError('mode must be an int')
803 ret = ceph_mkdirs(self.cluster, _path, _mode)
805 raise make_ex(ret, "error in mkdirs '%s'" % path)
807 def rmdir(self, path):
811 :param path: the path of the directory to remove.
813 self.require_state("mounted")
814 path = cstr(path, 'path')
815 cdef char* _path = path
816 ret = ceph_rmdir(self.cluster, _path)
818 raise make_ex(ret, "error in rmdir '%s'" % path)
820 def open(self, path, flags, mode=0):
822 Create and/or open a file.
824 :param path: the path of the file to open. If the flags parameter includes O_CREAT,
825 the file will first be created before opening.
826 :param flags: set of option masks that control how the file is created/opened.
827 :param mode: the permissions to place on the file if the file does not exist and O_CREAT
828 is specified in the flags.
830 self.require_state("mounted")
831 path = cstr(path, 'path')
833 if not isinstance(mode, int):
834 raise TypeError('mode must be an int')
835 if isinstance(flags, str):
838 cephfs_flags = os.O_RDONLY
846 cephfs_flags |= os.O_TRUNC | os.O_CREAT
847 elif access_flags > 0 and c == '+':
850 raise make_ex(errno.EOPNOTSUPP,
851 "open flags doesn't support %s" % c)
853 if access_flags == 1:
854 cephfs_flags |= os.O_RDONLY;
855 elif access_flags == 2:
856 cephfs_flags |= os.O_WRONLY;
858 cephfs_flags |= os.O_RDWR;
860 elif isinstance(flags, int):
863 raise TypeError("flags must be a string or an integer")
867 int _flags = cephfs_flags
871 ret = ceph_open(self.cluster, _path, _flags, _mode)
873 raise make_ex(ret, "error in open '%s'" % path)
880 :param fd: the file descriptor referring to the open file.
883 self.require_state("mounted")
884 if not isinstance(fd, int):
885 raise TypeError('fd must be an int')
888 ret = ceph_close(self.cluster, _fd)
890 raise make_ex(ret, "error in close")
892 def read(self, fd, offset, l):
894 Read data from the file.
896 :param fd : the file descriptor of the open file to read from.
897 :param offset : the offset in the file to read from. If this value is negative, the
898 function reads from the current offset of the file descriptor.
899 :param l : the number of bytes to read
901 self.require_state("mounted")
902 if not isinstance(offset, int):
903 raise TypeError('offset must be an int')
904 if not isinstance(l, int):
905 raise TypeError('l must be an int')
906 if not isinstance(fd, int):
907 raise TypeError('fd must be an int')
910 int64_t _offset = offset
914 PyObject* ret_s = NULL
916 ret_s = PyBytes_FromStringAndSize(NULL, _length)
918 ret_buf = PyBytes_AsString(ret_s)
920 ret = ceph_read(self.cluster, _fd, ret_buf, _length, _offset)
922 raise make_ex(ret, "error in read")
925 _PyBytes_Resize(&ret_s, ret)
929 # We DECREF unconditionally: the cast to object above will have
930 # INCREFed if necessary. This also takes care of exceptions,
931 # including if _PyBytes_Resize fails (that will free the string
932 # itself and set ret_s to NULL, hence XDECREF).
933 ref.Py_XDECREF(ret_s)
935 def write(self, fd, buf, offset):
937 Write data to a file.
939 :param fd : the file descriptor of the open file to write to
940 :param buf : the bytes to write to the file
941 :param offset : the offset of the file write into. If this value is negative, the
942 function writes to the current offset of the file descriptor.
944 self.require_state("mounted")
945 if not isinstance(fd, int):
946 raise TypeError('fd must be an int')
947 if not isinstance(buf, bytes):
948 raise TypeError('buf must be a bytes')
949 if not isinstance(offset, int):
950 raise TypeError('offset must be an int')
955 int64_t _offset = offset
957 size_t length = len(buf)
960 ret = ceph_write(self.cluster, _fd, _data, length, _offset)
962 raise make_ex(ret, "error in write")
965 def flock(self, fd, operation, owner):
967 Apply or remove an advisory lock.
969 :param fd: the open file descriptor to change advisory lock.
970 :param operation: the advisory lock operation to be performed on the file
971 :param owner: the user-supplied owner identifier (an arbitrary integer)
973 self.require_state("mounted")
974 if not isinstance(fd, int):
975 raise TypeError('fd must be an int')
976 if not isinstance(operation, int):
977 raise TypeError('operation must be an int')
982 uint64_t _owner = owner
985 ret = ceph_flock(self.cluster, _fd, _op, _owner)
987 raise make_ex(ret, "error in write")
990 def getxattr(self, path, name, size=255):
992 Get an extended attribute.
994 :param path: the path to the file
995 :param name: the name of the extended attribute to get
996 :param size: the size of the pre-allocated buffer
998 self.require_state("mounted")
1000 path = cstr(path, 'path')
1001 name = cstr(name, 'name')
1007 size_t ret_length = size
1008 char *ret_buf = NULL
1011 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
1013 ret = ceph_getxattr(self.cluster, _path, _name, ret_buf,
1017 raise make_ex(ret, "error in getxattr")
1019 return ret_buf[:ret]
1023 def setxattr(self, path, name, value, flags):
1025 Set an extended attribute on a file.
1027 :param path: the path to the file.
1028 :param name: the name of the extended attribute to set.
1029 :param value: the bytes of the extended attribute value
1031 self.require_state("mounted")
1033 name = cstr(name, 'name')
1034 path = cstr(path, 'path')
1035 if not isinstance(flags, int):
1036 raise TypeError('flags must be a int')
1037 if not isinstance(value, bytes):
1038 raise TypeError('value must be a bytes')
1043 char *_value = value
1044 size_t _value_len = len(value)
1048 ret = ceph_setxattr(self.cluster, _path, _name,
1049 _value, _value_len, _flags)
1051 raise make_ex(ret, "error in setxattr")
1054 def stat(self, path):
1056 Get a file's extended statistics and attributes.
1058 :param path: the file or directory to get the statistics of.
1060 self.require_state("mounted")
1061 path = cstr(path, 'path')
1068 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1069 ret = ceph_statx(self.cluster, _path, &stx, 0x7ffu, 0)
1071 raise make_ex(ret, "error in stat: %s" % path)
1072 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1073 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1074 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1075 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1076 st_blksize=stx.stx_blksize,
1077 st_blocks=stx.stx_blocks,
1078 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1079 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1080 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1082 def fstat(self, fd):
1084 Get an open file's extended statistics and attributes.
1086 :param fd: the file descriptor of the file to get statistics of.
1088 self.require_state("mounted")
1089 if not isinstance(fd, int):
1090 raise TypeError('fd must be an int')
1097 # FIXME: replace magic number with CEPH_STATX_BASIC_STATS
1098 ret = ceph_fstatx(self.cluster, _fd, &stx, 0x7ffu, 0)
1100 raise make_ex(ret, "error in fsat")
1101 return StatResult(st_dev=stx.stx_dev, st_ino=stx.stx_ino,
1102 st_mode=stx.stx_mode, st_nlink=stx.stx_nlink,
1103 st_uid=stx.stx_uid, st_gid=stx.stx_gid,
1104 st_rdev=stx.stx_rdev, st_size=stx.stx_size,
1105 st_blksize=stx.stx_blksize,
1106 st_blocks=stx.stx_blocks,
1107 st_atime=datetime.fromtimestamp(stx.stx_atime.tv_sec),
1108 st_mtime=datetime.fromtimestamp(stx.stx_mtime.tv_sec),
1109 st_ctime=datetime.fromtimestamp(stx.stx_ctime.tv_sec))
1111 def symlink(self, existing, newname):
1113 Creates a symbolic link.
1115 :param existing: the path to the existing file/directory to link to.
1116 :param newname: the path to the new file/directory to link from.
1118 self.require_state("mounted")
1119 existing = cstr(existing, 'existing')
1120 newname = cstr(newname, 'newname')
1122 char* _existing = existing
1123 char* _newname = newname
1126 ret = ceph_symlink(self.cluster, _existing, _newname)
1128 raise make_ex(ret, "error in symlink")
1130 def link(self, existing, newname):
1134 :param existing: the path to the existing file/directory to link to.
1135 :param newname: the path to the new file/directory to link from.
1138 self.require_state("mounted")
1139 existing = cstr(existing, 'existing')
1140 newname = cstr(newname, 'newname')
1142 char* _existing = existing
1143 char* _newname = newname
1146 ret = ceph_link(self.cluster, _existing, _newname)
1148 raise make_ex(ret, "error in link")
1150 def readlink(self, path, size):
1152 Read a symbolic link.
1154 :param path: the path to the symlink to read
1155 :param size: the length of the buffer
1156 :rtype buf: buffer to hold the path of the file that the symlink points to.
1158 self.require_state("mounted")
1159 path = cstr(path, 'path')
1163 int64_t _size = size
1167 buf = <char *>realloc_chk(buf, _size)
1169 ret = ceph_readlink(self.cluster, _path, buf, _size)
1171 raise make_ex(ret, "error in readlink")
1176 def unlink(self, path):
1178 Removes a file, link, or symbolic link. If the file/link has multiple links to it, the
1179 file will not disappear from the namespace until all references to it are removed.
1181 :param path: the path of the file or link to unlink.
1183 self.require_state("mounted")
1184 path = cstr(path, 'path')
1185 cdef char* _path = path
1187 ret = ceph_unlink(self.cluster, _path)
1189 raise make_ex(ret, "error in unlink: %s" % path)
1191 def rename(self, src, dst):
1193 Rename a file or directory.
1195 :param src: the path to the existing file or directory.
1196 :param dst: the new name of the file or directory.
1199 self.require_state("mounted")
1201 src = cstr(src, 'source')
1202 dst = cstr(dst, 'destination')
1209 ret = ceph_rename(self.cluster, _src, _dst)
1211 raise make_ex(ret, "error in rename '%s' to '%s'" % (src, dst))
1213 def mds_command(self, mds_spec, args, input_data):
1215 :return: 3-tuple of output status int, output data, output status string
1217 mds_spec = cstr(mds_spec, 'mds_spec')
1218 args = cstr_list(args, 'args')
1219 input_data = cstr(input_data, 'input_data')
1222 char *_mds_spec = opt_str(mds_spec)
1223 char **_cmd = to_bytes_array(args)
1224 size_t _cmdlen = len(args)
1226 char *_inbuf = input_data
1227 size_t _inbuf_len = len(input_data)
1229 char *_outbuf = NULL
1230 size_t _outbuf_len = 0
1232 size_t _outs_len = 0
1236 ret = ceph_mds_command(self.cluster, _mds_spec,
1237 <const char **>_cmd, _cmdlen,
1238 <const char*>_inbuf, _inbuf_len,
1239 &_outbuf, &_outbuf_len,
1241 my_outs = decode_cstr(_outs[:_outs_len])
1242 my_outbuf = _outbuf[:_outbuf_len]
1244 ceph_buffer_free(_outs)
1246 ceph_buffer_free(_outbuf)
1247 return (ret, my_outbuf, my_outs)
1251 def umask(self, mode) :
1252 self.require_state("mounted")
1256 ret = ceph_umask(self.cluster, _mode)
1258 raise make_ex(ret, "error in umask")