1 # cython: embedsignature=True, binding=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
22 include "mock_rados.pxi"
24 from c_rados cimport *
29 from datetime import datetime, timedelta
30 from functools import partial, wraps
31 from itertools import chain
32 from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
34 cdef extern from "Python.h":
35 # These are in cpython/string.pxd, but use "object" types instead of
36 # PyObject*, which invokes assumptions in cpython that we need to
37 # legitimately break to implement zero-copy string buffers in Ioctx.read().
38 # This is valid use of the Python API and documented as a special case.
39 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
40 char* PyBytes_AsString(PyObject *string) except NULL
41 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
42 void PyEval_InitThreads()
# Public module-level constants. Each one re-exports the underscore-prefixed
# name of the same spelling so that Python callers can refer to it.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
44 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
45 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
46 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
47 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
48 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
49 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
50 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
# Comparison operators for xattr comparisons (EQ, NE, GT, GTE, LT, LTE).
52 LIBRADOS_CMPXATTR_OP_EQ = _LIBRADOS_CMPXATTR_OP_EQ
53 LIBRADOS_CMPXATTR_OP_NE = _LIBRADOS_CMPXATTR_OP_NE
54 LIBRADOS_CMPXATTR_OP_GT = _LIBRADOS_CMPXATTR_OP_GT
55 LIBRADOS_CMPXATTR_OP_GTE = _LIBRADOS_CMPXATTR_OP_GTE
56 LIBRADOS_CMPXATTR_OP_LT = _LIBRADOS_CMPXATTR_OP_LT
57 LIBRADOS_CMPXATTR_OP_LTE = _LIBRADOS_CMPXATTR_OP_LTE
59 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
# Whole-operation flags (LIBRADOS_OPERATION_*).
61 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
62 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
63 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
64 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
65 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
66 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
67 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
# The underscore-prefixed value is bytes-like here; it is decoded so the
# public constant is a Python str.
69 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
71 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
72 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
# make_ex() treats return codes above MAX_ERRNO as an ExtendMismatch offset.
74 MAX_ERRNO = _MAX_ERRNO
# All 64 bits set.
76 ANONYMOUS_AUID = 0xffffffffffffffff
class Error(Exception):
    """ `Error` class, derived from `Exception`

    Base class of all rados exceptions.

    :param message: human-readable description of the failure
    :param errno: optional error number; when set it is prefixed to the
                  text returned by ``str()``
    """
    def __init__(self, message, errno=None):
        super(Error, self).__init__(message)
        # Python 3 exceptions carry no implicit ``message`` attribute, so it
        # is stored explicitly; __reduce__ below reads it back.
        self.message = message
        self.errno = errno

    def __str__(self):
        msg = super(Error, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # Rebuild through the two-argument constructor so that ``errno``
        # survives pickling.
        return (self.__class__, (self.message, self.errno))
class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error`

    This is the exception the errno table selects for ``EINVAL``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS invalid argument (%s)" % message
        super(InvalidArgumentError, self).__init__(description, errno)
100 class ExtendMismatch(Error):
101 def __init__(self, message, errno, offset):
103 "object content does not match (%s)" % message, errno)
106 class OSError(Error):
107 """ `OSError` class, derived from `Error`; within this module the name shadows the builtin `OSError` """
class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError`

    This is the exception the errno table selects for ``EINTR``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS interrupted or timeout (%s)" % message
        super(InterruptedOrTimeoutError, self).__init__(description, errno)
class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError`

    This is the exception the errno table selects for ``EPERM``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS permission error (%s)" % message
        super(PermissionError, self).__init__(description, errno)
class PermissionDeniedError(OSError):
    """ `PermissionDeniedError` class, derived from `OSError`

    This is the exception the errno table selects for ``EACCES``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS permission denied (%s)" % message
        super(PermissionDeniedError, self).__init__(description, errno)
class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError`

    This is the exception the errno table selects for ``ENOENT``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS object not found (%s)" % message
        super(ObjectNotFound, self).__init__(description, errno)
class NoData(OSError):
    """ `NoData` class, derived from `OSError`

    This is the exception the errno table selects for ``ENODATA``
    (``ENOATTR`` in the FreeBSD variant of the table).
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS no data (%s)" % message
        super(NoData, self).__init__(description, errno)
class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError`

    This is the exception the errno table selects for ``EEXIST``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS object exists (%s)" % message
        super(ObjectExists, self).__init__(description, errno)
class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError`

    This is the exception the errno table selects for ``EBUSY``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS object busy (%s)" % message
        super(ObjectBusy, self).__init__(description, errno)
class IOError(OSError):
    """ `IOError` class, derived from `OSError` """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS I/O error (%s)" % message
        super(IOError, self).__init__(description, errno)
class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError`

    This is the exception the errno table selects for ``ENOSPC``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS no space (%s)" % message
        super(NoSpace, self).__init__(description, errno)
class NotConnected(OSError):
    """ `NotConnected` class, derived from `OSError`

    This is the exception the errno table selects for ``ENOTCONN``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS not connected (%s)" % message
        super(NotConnected, self).__init__(description, errno)
class RadosStateError(Error):
    """ `RadosStateError` class, derived from `Error`

    Raised by ``Rados.require_state`` when the handle is not in one of the
    states an operation accepts.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS rados state (%s)" % message
        super(RadosStateError, self).__init__(description, errno)
class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS Ioctx state error (%s)" % message
        super(IoctxStateError, self).__init__(description, errno)
class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS object state error (%s)" % message
        super(ObjectStateError, self).__init__(description, errno)
class LogicError(Error):
    """ `LogicError` class, derived from `Error`

    Raised by ``Rados.monitor_log`` / ``Rados.monitor_log2`` for an invalid
    level or a non-callable callback.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS logic error (%s)" % message
        super(LogicError, self).__init__(description, errno)
class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError`

    This is the exception the errno table selects for ``ETIMEDOUT``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS timed out (%s)" % message
        super(TimedOut, self).__init__(description, errno)
class InProgress(Error):
    """ `InProgress` class, derived from `Error`

    This is the exception the errno table selects for ``EINPROGRESS``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS in progress error (%s)" % message
        super(InProgress, self).__init__(description, errno)
class IsConnected(Error):
    """ `IsConnected` class, derived from `Error`

    This is the exception the errno table selects for ``EISCONN``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS is connected error (%s)" % message
        super(IsConnected, self).__init__(description, errno)
class ConnectionShutdown(OSError):
    """ `ConnectionShutdown` class, derived from `OSError`

    This is the exception the errno table selects for ``ESHUTDOWN``.
    """
    def __init__(self, message, errno=None):
        # Wrap the caller's text in the fixed prefix before delegating.
        description = "RADOS connection was shutdown (%s)" % message
        super(ConnectionShutdown, self).__init__(description, errno)
234 IF UNAME_SYSNAME == "FreeBSD":
235 cdef errno_to_exception = {
236 errno.EPERM : PermissionError,
237 errno.ENOENT : ObjectNotFound,
239 errno.ENOSPC : NoSpace,
240 errno.EEXIST : ObjectExists,
241 errno.EBUSY : ObjectBusy,
242 errno.ENOATTR : NoData,
243 errno.EINTR : InterruptedOrTimeoutError,
244 errno.ETIMEDOUT : TimedOut,
245 errno.EACCES : PermissionDeniedError,
246 errno.EINPROGRESS : InProgress,
247 errno.EISCONN : IsConnected,
248 errno.EINVAL : InvalidArgumentError,
249 errno.ENOTCONN : NotConnected,
250 errno.ESHUTDOWN : ConnectionShutdown,
253 cdef errno_to_exception = {
254 errno.EPERM : PermissionError,
255 errno.ENOENT : ObjectNotFound,
257 errno.ENOSPC : NoSpace,
258 errno.EEXIST : ObjectExists,
259 errno.EBUSY : ObjectBusy,
260 errno.ENODATA : NoData,
261 errno.EINTR : InterruptedOrTimeoutError,
262 errno.ETIMEDOUT : TimedOut,
263 errno.EACCES : PermissionDeniedError,
264 errno.EINPROGRESS : InProgress,
265 errno.EISCONN : IsConnected,
266 errno.EINVAL : InvalidArgumentError,
267 errno.ENOTCONN : NotConnected,
268 errno.ESHUTDOWN : ConnectionShutdown,
272 cdef make_ex(ret: int, msg: str):
274 Translate a librados return code into an exception.
276 :param ret: the return code
278 :param msg: the error message to use
280 :returns: a subclass of :class:`Error`
283 if ret in errno_to_exception:
284 return errno_to_exception[ret](msg, errno=ret)
285 elif ret > MAX_ERRNO:
286 offset = ret - MAX_ERRNO
287 return ExtendMismatch(msg, ret, offset)
289 return OSError(msg, errno=ret)
def cstr(val, name, encoding="utf-8", opt=False) -> Optional[bytes]:
    """
    Create a byte string from a Python string

    :param basestring val: Python string (``bytes`` is passed through as is)
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed
    :raises: :class:`TypeError`
    """
    if opt and val is None:
        return None
    if isinstance(val, bytes):
        return val
    elif isinstance(val, str):
        return val.encode(encoding)
    else:
        raise TypeError('%s must be a string' % name)


def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every element of a sequence with :func:`cstr`.

    :param list_str: iterable of ``str`` / ``bytes`` values
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for ``str`` elements
    :raises: :class:`TypeError`
    :returns: list of byte strings, in input order
    """
    # Forward ``encoding``: it was previously accepted but silently ignored,
    # so non-default encodings had no effect.
    return [cstr(s, name, encoding) for s in list_str]
316 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
318 Decode a byte string into a Python string.
320 :param bytes val: byte string
325 return val.decode(encoding)
def flatten_dict(d, name):
    """
    Serialise a dict as ``key\\0value\\0key\\0value\\0...`` and hand the
    result to :func:`cstr`.

    :param dict d: mapping whose keys and values support ``+ '\\0'``
    :param str name: Name of the parameter, for exceptions raised by cstr
    :returns: whatever :func:`cstr` returns for the joined string
    """
    pieces = []
    # Keys and values are interleaved in the dict's iteration order, each
    # followed by a NUL.
    for field in chain.from_iterable(d.items()):
        pieces.append(field + '\0')
    return cstr(''.join(pieces), name)
333 cdef char* opt_str(s) except? NULL:
339 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
340 cdef void *ret = realloc(ptr, size)
342 raise MemoryError("realloc failed")
346 cdef size_t * to_csize_t_array(list_int):
347 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
349 raise MemoryError("malloc failed")
350 for i in range(len(list_int)):
351 ret[i] = <size_t>list_int[i]
355 cdef char ** to_bytes_array(list_bytes):
356 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
358 raise MemoryError("malloc failed")
359 for i in range(len(list_bytes)):
360 ret[i] = <char *>list_bytes[i]
363 cdef int __monitor_callback(void *arg, const char *line, const char *who,
364 uint64_t sec, uint64_t nsec, uint64_t seq,
365 const char *level, const char *msg) with gil:
366 cdef object cb_info = <object>arg
367 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
370 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
373 uint64_t sec, uint64_t nsec, uint64_t seq,
374 const char *level, const char *msg) with gil:
375 cdef object cb_info = <object>arg
376 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
380 class Version(object):
381 """ Version information """
382 def __init__(self, major, minor, extra):
388 return "%d.%d.%d" % (self.major, self.minor, self.extra)
391 cdef class Rados(object):
392 """This class wraps librados functions"""
393 # NOTE(sileht): attributes declared in .pyd
395 def __init__(self, *args, **kwargs):
397 self.__setup(*args, **kwargs)
400 "special value that indicates no conffile should be read when creating a mount handle"
401 DEFAULT_CONF_FILES = -2
402 "special value that indicates the default conffiles should be read when creating a mount handle"
405 rados_id: Optional[str] = None,
406 name: Optional[str] = None,
407 clustername: Optional[str] = None,
408 conf_defaults: Optional[Dict[str, str]] = None,
409 conffile: Union[str, int, None] = NO_CONF_FILE,
410 conf: Optional[Dict[str, str]] = None,
412 context: object = None):
413 self.monitor_callback = None
414 self.monitor_callback2 = None
415 self.parsed_args = []
416 self.conf_defaults = conf_defaults
417 self.conffile = conffile
418 self.rados_id = rados_id
420 if rados_id and name:
421 raise Error("Rados(): can't supply both rados_id and name")
423 name = 'client.' + rados_id
425 name = 'client.admin'
426 if clustername is None:
429 name_raw = cstr(name, 'name')
430 clustername_raw = cstr(clustername, 'clustername')
432 char *_name = name_raw
433 char *_clustername = clustername_raw
438 # Unpack void* (aka rados_config_t) from capsule
439 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
441 ret = rados_create_with_context(&self.cluster, rados_config)
444 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
446 raise Error("rados_initialize failed with error code: %d" % ret)
448 self.state = "configuring"
449 # order is important: conf_defaults, then conffile, then conf
451 for key, value in conf_defaults.items():
452 self.conf_set(key, value)
453 if conffile in (self.NO_CONF_FILE, None):
455 elif conffile in (self.DEFAULT_CONF_FILES, ''):
456 self.conf_read_file(None)
458 self.conf_read_file(conffile)
460 for key, value in conf.items():
461 self.conf_set(key, value)
465 Get associated client addresses with this RADOS session.
467 self.require_state("configuring", "connected")
475 ret = rados_getaddrs(self.cluster, &addrs)
477 raise make_ex(ret, "error calling getaddrs")
479 return decode_cstr(addrs)
483 def require_state(self, *args):
485 Checks if the Rados object is in a special state
487 :raises: :class:`RadosStateError`
489 if self.state in args:
491 raise RadosStateError("You cannot perform that operation on a \
492 Rados object in state %s." % self.state)
496 Disconnects from the cluster. Call this explicitly when a
497 Rados.connect()ed object is no longer used.
499 if self.state != "shutdown":
501 rados_shutdown(self.cluster)
502 self.state = "shutdown"
508 def __exit__(self, type_, value, traceback):
512 def version(self) -> Version:
514 Get the version number of the ``librados`` C library.
516 :returns: a tuple of ``(major, minor, extra)`` components of the
523 rados_version(&major, &minor, &extra)
524 return Version(major, minor, extra)
526 def conf_read_file(self, path: Optional[str] = None):
528 Configure the cluster handle using a Ceph config file.
530 :param path: path to the config file
532 self.require_state("configuring", "connected")
533 path_raw = cstr(path, 'path', opt=True)
535 char *_path = opt_str(path_raw)
537 ret = rados_conf_read_file(self.cluster, _path)
539 raise make_ex(ret, "error calling conf_read_file")
541 def conf_parse_argv(self, args: Sequence[str]):
543 Parse known arguments from args, and remove; returned
544 args contain only those unknown to ceph
546 self.require_state("configuring", "connected")
550 cargs = cstr_list(args, 'args')
552 int _argc = len(args)
553 char **_argv = to_bytes_array(cargs)
554 char **_remargv = NULL
557 _remargv = <char **>malloc(_argc * sizeof(char *))
559 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
561 <const char**>_remargv)
563 raise make_ex(ret, "error calling conf_parse_argv_remainder")
565 # _remargv was allocated with fixed argc; collapse return
566 # list to eliminate any missing args
567 retargs = [decode_cstr(a) for a in _remargv[:_argc]
569 self.parsed_args = args
575 def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
577 Parse known arguments from an environment variable, normally
580 self.require_state("configuring", "connected")
584 var_raw = cstr(var, 'var')
588 ret = rados_conf_parse_env(self.cluster, _var)
590 raise make_ex(ret, "error calling conf_parse_env")
592 def conf_get(self, option: str) -> Optional[str]:
594 Get the value of a configuration option
596 :param option: which option to read
598 :returns: value of the option or None
599 :raises: :class:`TypeError`
601 self.require_state("configuring", "connected")
602 option_raw = cstr(option, 'option')
604 char *_option = option_raw
610 ret_buf = <char *>realloc_chk(ret_buf, length)
612 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
614 return decode_cstr(ret_buf)
615 elif ret == -errno.ENAMETOOLONG:
617 elif ret == -errno.ENOENT:
620 raise make_ex(ret, "error calling conf_get")
624 def conf_set(self, option: str, val: str):
626 Set the value of a configuration option
628 :param option: which option to set
629 :param val: value of the option
631 :raises: :class:`TypeError`, :class:`ObjectNotFound`
633 self.require_state("configuring", "connected")
634 option_raw = cstr(option, 'option')
635 val_raw = cstr(val, 'val')
637 char *_option = option_raw
641 ret = rados_conf_set(self.cluster, _option, _val)
643 raise make_ex(ret, "error calling conf_set")
645 def ping_monitor(self, mon_id: str):
647 Ping a monitor to assess liveness
649 May be used as a simply way to assess liveness, or to obtain
650 information about the monitor in a simple way even in the
653 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
654 :returns: the string reply from the monitor
657 self.require_state("configuring", "connected")
659 mon_id_raw = cstr(mon_id, 'mon_id')
661 char *_mon_id = mon_id_raw
666 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
669 raise make_ex(ret, "error calling ping_monitor")
672 my_outstr = outstr[:outstrlen]
673 rados_buffer_free(outstr)
674 return decode_cstr(my_outstr)
676 def connect(self, timeout: int = 0):
678 Connect to the cluster. Use shutdown() to release resources.
680 :param timeout: Any supplied timeout value is currently ignored.
682 self.require_state("configuring")
683 # NOTE(sileht): timeout was supported by old python API,
684 # but this is not something available in C API, so ignore
685 # for now and remove it later
687 ret = rados_connect(self.cluster)
689 raise make_ex(ret, "error connecting to the cluster")
690 self.state = "connected"
692 def get_instance_id(self) -> int:
694 Get a global id for current instance
696 self.require_state("connected")
698 ret = rados_get_instance_id(self.cluster)
701 def get_cluster_stats(self) -> Dict[str, int]:
703 Read usage info about the cluster
705 This tells you total space, space used, space available, and number
706 of objects. These are not updated immediately when data is written,
707 they are eventually consistent.
708 :returns: contains the following keys:
710 - ``kb`` (int) - total space
712 - ``kb_used`` (int) - space used
714 - ``kb_avail`` (int) - free space available
716 - ``num_objects`` (int) - number of objects
720 rados_cluster_stat_t stats
723 ret = rados_cluster_stat(self.cluster, &stats)
727 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
728 return {'kb': stats.kb,
729 'kb_used': stats.kb_used,
730 'kb_avail': stats.kb_avail,
731 'num_objects': stats.num_objects}
733 def pool_exists(self, pool_name: str) -> bool:
735 Checks if a given pool exists.
737 :param pool_name: name of the pool to check
739 :raises: :class:`TypeError`, :class:`Error`
740 :returns: whether the pool exists, false otherwise.
742 self.require_state("connected")
744 pool_name_raw = cstr(pool_name, 'pool_name')
746 char *_pool_name = pool_name_raw
749 ret = rados_pool_lookup(self.cluster, _pool_name)
752 elif ret == -errno.ENOENT:
755 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
757 def pool_lookup(self, pool_name: str) -> int:
759 Returns a pool's ID based on its name.
761 :param pool_name: name of the pool to look up
763 :raises: :class:`TypeError`, :class:`Error`
764 :returns: pool ID, or None if it doesn't exist
766 self.require_state("connected")
767 pool_name_raw = cstr(pool_name, 'pool_name')
769 char *_pool_name = pool_name_raw
772 ret = rados_pool_lookup(self.cluster, _pool_name)
775 elif ret == -errno.ENOENT:
778 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
780 def pool_reverse_lookup(self, pool_id: int) -> Optional[str]:
782 Returns a pool's name based on its ID.
784 :param pool_id: ID of the pool to look up
786 :raises: :class:`TypeError`, :class:`Error`
787 :returns: pool name, or None if it doesn't exist
789 self.require_state("connected")
791 int64_t _pool_id = pool_id
797 name = <char *>realloc_chk(name, size)
799 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
802 elif ret != -errno.ERANGE and size <= 4096:
804 elif ret == -errno.ENOENT:
807 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
809 return decode_cstr(name)
814 def create_pool(self, pool_name: str,
815 crush_rule: Optional[int] = None,
816 auid: Optional[int] = None):
819 - with default settings: if crush_rule=None and auid=None
820 - with a specific CRUSH rule: crush_rule given
821 - with a specific auid: auid given
822 - with a specific CRUSH rule and auid: crush_rule and auid given
824 :param pool_name: name of the pool to create
825 :param crush_rule: rule to use for placement in the new pool
826 :param auid: id of the owner of the new pool
828 :raises: :class:`TypeError`, :class:`Error`
830 self.require_state("connected")
832 pool_name_raw = cstr(pool_name, 'pool_name')
834 char *_pool_name = pool_name_raw
838 if crush_rule is None and auid is None:
840 ret = rados_pool_create(self.cluster, _pool_name)
841 elif crush_rule is not None and auid is None:
842 _crush_rule = crush_rule
844 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
845 elif crush_rule is None and auid is not None:
848 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
850 _crush_rule = crush_rule
853 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
855 raise make_ex(ret, "error creating pool '%s'" % pool_name)
857 def get_pool_base_tier(self, pool_id: int) -> int:
861 :returns: base pool, or pool_id if tiering is not configured for the pool
863 self.require_state("connected")
865 int64_t base_tier = 0
866 int64_t _pool_id = pool_id
869 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
871 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
872 return int(base_tier)
874 def delete_pool(self, pool_name: str):
876 Delete a pool and all data inside it.
878 The pool is removed from the cluster immediately,
879 but the actual data is deleted in the background.
881 :param pool_name: name of the pool to delete
883 :raises: :class:`TypeError`, :class:`Error`
885 self.require_state("connected")
887 pool_name_raw = cstr(pool_name, 'pool_name')
889 char *_pool_name = pool_name_raw
892 ret = rados_pool_delete(self.cluster, _pool_name)
894 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
896 def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
898 List inconsistent placement groups in the given pool
900 :param pool_id: ID of the pool in which PGs are listed
901 :returns: inconsistent placement groups
903 self.require_state("connected")
905 int64_t pool = pool_id
911 pgs = <char *>realloc_chk(pgs, size);
913 ret = rados_inconsistent_pg_list(self.cluster, pool,
920 raise make_ex(ret, "error calling inconsistent_pg_list")
921 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
925 def list_pools(self) -> List[str]:
927 Gets a list of pool names.
929 :returns: list of pool names.
931 self.require_state("connected")
938 c_names = <char *>realloc_chk(c_names, size)
940 ret = rados_pool_list(self.cluster, c_names, size)
945 return [name for name in decode_cstr(c_names[:ret]).split('\0')
950 def get_fsid(self) -> str:
952 Get the fsid of the cluster as a hexadecimal string.
954 :raises: :class:`Error`
955 :returns: cluster fsid
957 self.require_state("connected")
964 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
966 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
967 if ret == -errno.ERANGE:
968 buf_len = buf_len * 2
970 raise make_ex(ret, "error getting cluster fsid")
973 return decode_cstr(ret_buf)
977 def open_ioctx(self, ioctx_name: str) -> Ioctx:
981 The io context allows you to perform operations within a particular
984 :param ioctx_name: name of the pool
986 :raises: :class:`TypeError`, :class:`Error`
987 :returns: Rados Ioctx object
989 self.require_state("connected")
990 ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
993 char *_ioctx_name = ioctx_name_raw
995 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
997 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
998 io = Ioctx(self, ioctx_name)
1002 def open_ioctx2(self, pool_id: int) -> Ioctx:
1004 Create an io context
1006 The io context allows you to perform operations within a particular
1009 :param pool_id: ID of the pool
1011 :raises: :class:`TypeError`, :class:`Error`
1012 :returns: Rados Ioctx object
1014 self.require_state("connected")
1017 int64_t _pool_id = pool_id
1019 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1021 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1022 io = Ioctx(self, str(pool_id))
1026 def mon_command(self,
1030 target: Optional[Union[str, int]] = None) -> Tuple[int, bytes, str]:
1032 Send a command to the mon.
1034 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1036 :param cmd: JSON formatted string.
1037 :param inbuf: optional string.
1038 :param timeout: This parameter is ignored.
1039 :param target: name or rank of a specific mon. Optional
1040 :return: (int ret, string outbuf, string outs)
1045 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1047 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1048 >>> c.mon_command(cmd, b'')
1050 # NOTE(sileht): timeout is ignored because C API doesn't provide
1051 # timeout argument, but we keep it for backward compat with old python binding
1052 self.require_state("connected")
1053 cmds = [cstr(cmd, 'cmd')]
1055 if isinstance(target, int):
1056 # NOTE(sileht): looks weird but test_monmap_dump pass int
1057 target = str(target)
1059 target = cstr(target, 'target', opt=True)
1062 char *_target = opt_str(target)
1063 char **_cmd = to_bytes_array(cmds)
1064 size_t _cmdlen = len(cmds)
1066 char *_inbuf = inbuf
1067 size_t _inbuf_len = len(inbuf)
1077 ret = rados_mon_command_target(self.cluster, _target,
1078 <const char **>_cmd, _cmdlen,
1079 <const char*>_inbuf, _inbuf_len,
1080 &_outbuf, &_outbuf_len,
1084 ret = rados_mon_command(self.cluster,
1085 <const char **>_cmd, _cmdlen,
1086 <const char*>_inbuf, _inbuf_len,
1087 &_outbuf, &_outbuf_len,
1090 my_outs = decode_cstr(_outs[:_outs_len])
1091 my_outbuf = _outbuf[:_outbuf_len]
1093 rados_buffer_free(_outs)
1095 rados_buffer_free(_outbuf)
1096 return (ret, my_outbuf, my_outs)
1100 def osd_command(self,
1104 timeout: int = 0) -> Tuple[int, bytes, str]:
1106 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1108 :return: (int ret, string outbuf, string outs)
1110 # NOTE(sileht): timeout is ignored because C API doesn't provide
1111 # timeout argument, but we keep it for backward compat with old python binding
1112 self.require_state("connected")
1114 cmds = [cstr(cmd, 'cmd')]
1118 char **_cmd = to_bytes_array(cmds)
1119 size_t _cmdlen = len(cmds)
1121 char *_inbuf = inbuf
1122 size_t _inbuf_len = len(inbuf)
1131 ret = rados_osd_command(self.cluster, _osdid,
1132 <const char **>_cmd, _cmdlen,
1133 <const char*>_inbuf, _inbuf_len,
1134 &_outbuf, &_outbuf_len,
1137 my_outs = decode_cstr(_outs[:_outs_len])
1138 my_outbuf = _outbuf[:_outbuf_len]
1140 rados_buffer_free(_outs)
1142 rados_buffer_free(_outbuf)
1143 return (ret, my_outbuf, my_outs)
1147 def mgr_command(self,
1151 target: Optional[str] = None) -> Tuple[int, str, bytes]:
1153 :return: (int ret, string outbuf, string outs)
1155 # NOTE(sileht): timeout is ignored because C API doesn't provide
1156 # timeout argument, but we keep it for backward compat with old python binding
1157 self.require_state("connected")
1159 cmds = [cstr(cmd, 'cmd')]
1160 target = cstr(target, 'target', opt=True)
1163 char *_target = opt_str(target)
1165 char **_cmd = to_bytes_array(cmds)
1166 size_t _cmdlen = len(cmds)
1168 char *_inbuf = inbuf
1169 size_t _inbuf_len = len(inbuf)
1177 if target is not None:
1179 ret = rados_mgr_command_target(self.cluster,
1180 <const char*>_target,
1181 <const char **>_cmd, _cmdlen,
1182 <const char*>_inbuf, _inbuf_len,
1183 &_outbuf, &_outbuf_len,
1187 ret = rados_mgr_command(self.cluster,
1188 <const char **>_cmd, _cmdlen,
1189 <const char*>_inbuf, _inbuf_len,
1190 &_outbuf, &_outbuf_len,
1193 my_outs = decode_cstr(_outs[:_outs_len])
1194 my_outbuf = _outbuf[:_outbuf_len]
1196 rados_buffer_free(_outs)
1198 rados_buffer_free(_outbuf)
1199 return (ret, my_outbuf, my_outs)
1203 def pg_command(self,
1207 timeout: int = 0) -> Tuple[int, bytes, str]:
1209 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1211 :return: (int ret, string outbuf, string outs)
1213 # NOTE(sileht): timeout is ignored because C API doesn't provide
1214 # timeout argument, but we keep it for backward compat with old python binding
1215 self.require_state("connected")
1217 pgid_raw = cstr(pgid, 'pgid')
1218 cmds = [cstr(cmd, 'cmd')]
1221 char *_pgid = pgid_raw
1222 char **_cmd = to_bytes_array(cmds)
1223 size_t _cmdlen = len(cmds)
1225 char *_inbuf = inbuf
1226 size_t _inbuf_len = len(inbuf)
1235 ret = rados_pg_command(self.cluster, _pgid,
1236 <const char **>_cmd, _cmdlen,
1237 <const char *>_inbuf, _inbuf_len,
1238 &_outbuf, &_outbuf_len,
1241 my_outs = decode_cstr(_outs[:_outs_len])
1242 my_outbuf = _outbuf[:_outbuf_len]
1244 rados_buffer_free(_outs)
1246 rados_buffer_free(_outbuf)
1247 return (ret, my_outbuf, my_outs)
1251 def wait_for_latest_osdmap(self) -> int:
1252 self.require_state("connected")
1254 ret = rados_wait_for_latest_osdmap(self.cluster)
1257 def blocklist_add(self, client_address: str, expire_seconds: int = 0):
1259 Blocklist a client from the OSDs
1261 :param client_address: client address
1262 :param expire_seconds: number of seconds to blocklist
1264 :raises: :class:`Error`
1266 self.require_state("connected")
1267 client_address_raw = cstr(client_address, 'client_address')
1269 uint32_t _expire_seconds = expire_seconds
1270 char *_client_address = client_address_raw
1273 ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
1275 raise make_ex(ret, "error blocklisting client '%s'" % client_address)
1277 def monitor_log(self, level: str,
1278 callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
1279 arg: Optional[object] = None):
1280 if level not in MONITOR_LEVELS:
1281 raise LogicError("invalid monitor level " + level)
1282 if callback is not None and not callable(callback):
1283 raise LogicError("callback must be a callable function or None")
1285 level_raw = cstr(level, 'level')
1286 cdef char *_level = level_raw
1288 if callback is None:
1290 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1291 self.monitor_callback = None
1292 self.monitor_callback2 = None
1295 cb = (callback, arg)
1296 cdef PyObject* _arg = <PyObject*>cb
1298 r = rados_monitor_log(self.cluster, <const char*>_level,
1299 <rados_log_callback_t>&__monitor_callback, _arg)
1302 raise make_ex(r, 'error calling rados_monitor_log')
1303 # NOTE(sileht): Prevents the callback method from being garbage collected
1304 self.monitor_callback = cb
1305 self.monitor_callback2 = None
def monitor_log2(self, level: str,
                 callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
                 arg: Optional[object] = None):
    """
    Install or clear the cluster-log callback through ``rados_monitor_log2``.

    :param level: log level; must be a member of ``MONITOR_LEVELS``
    :param callback: callable to register, or None to pass NULL handlers
    :param arg: object bundled with ``callback`` and handed to librados
                as the opaque callback argument

    :raises: :class:`LogicError` for an unknown level or a non-callable
             callback, :class:`Error` if the registration call fails
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level_raw = cstr(level, 'level')
    cdef char *_level = level_raw

    if callback is None:
        # Nothing to register: hand librados NULL handlers and drop the
        # references kept for any previously installed callback.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (was 'rados_monitor_log').
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
    """
    Register this client as a service daemon via ``rados_service_register``.

    :param str service: service name (e.g. "rgw")
    :param str daemon: daemon name (e.g. "gwfoo")
    :param dict metadata: static metadata about the registered daemon
                          (e.g., the version of Ceph, the kernel version.)

    :raises: :class:`Error` if librados reports a failure
    """
    service_bytes = cstr(service, 'service')
    daemon_bytes = cstr(daemon, 'daemon')
    flat_metadata = flatten_dict(metadata, 'metadata')
    cdef:
        char *c_service = service_bytes
        char *c_daemon = daemon_bytes
        char *c_metadata = flat_metadata

    with nogil:
        rc = rados_service_register(self.cluster, c_service, c_daemon, c_metadata)
    if rc != 0:
        raise make_ex(rc, "error calling service_register()")
def service_daemon_update(self, status: Dict[str, str]):
    """
    Send a status dictionary through ``rados_service_update_status``.

    :param status: key/value pairs, flattened with ``flatten_dict`` before
                   being handed to librados

    :raises: :class:`Error` if librados reports a failure
    """
    flat_status = flatten_dict(status, 'status')
    cdef char *c_status = flat_status

    with nogil:
        rc = rados_service_update_status(self.cluster, c_status)
    if rc != 0:
        raise make_ex(rc, "error calling service_daemon_update()")
cdef class OmapIterator(object):
    """Omap iterator"""

    cdef public Ioctx ioctx
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next entry with ``rados_omap_get_next``.

        :raises: :class:`Error` on a librados error, StopIteration when
                 no key is returned
        :returns: next rados.OmapItem
        """
        cdef:
            char *c_key = NULL
            char *c_val = NULL
            size_t c_val_len

        with nogil:
            rc = rados_omap_get_next(self.ctx, &c_key, &c_val, &c_val_len)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        if c_key == NULL:
            raise StopIteration()
        key = decode_cstr(c_key)
        if c_val == NULL:
            return (key, None)
        return (key, c_val[:c_val_len])

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            rc = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if rc < 0:
            raise make_ex(rc, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *c_key = NULL
            const char *c_locator = NULL
            const char *c_nspace = NULL
            size_t c_key_len = 0
            size_t c_locator_len = 0
            size_t c_nspace_len = 0

        with nogil:
            rc = rados_nobjects_list_next2(self.ctx, &c_key, &c_locator, &c_nspace,
                                           &c_key_len, &c_locator_len, &c_nspace_len)

        # Any negative result ends the iteration.
        if rc < 0:
            raise StopIteration()

        key = decode_cstr(c_key[:c_key_len])
        locator = None
        if c_locator != NULL:
            locator = decode_cstr(c_locator[:c_locator_len])
        nspace = None
        if c_nspace != NULL:
            nspace = decode_cstr(c_nspace[:c_nspace_len])
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    cdef rados_xattrs_iter_t it
    # Borrowed pointer into self.oid; valid while self.oid is alive.
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if ret != 0:
            raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: :class:`Error` on a librados error, StopIteration when
                 no name is returned
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *name_ = NULL
            const char *val_ = NULL
            size_t len_ = 0

        with nogil:
            ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
        if ret != 0:
            # Adjacent literals instead of a backslash continuation, so the
            # source indentation no longer leaks into the message.
            raise make_ex(ret, "error iterating over the extended attributes "
                               "in '%s'" % self.oid)
        if name_ == NULL:
            raise StopIteration()
        name = decode_cstr(name_)
        val = val_[:len_]
        return (name, val)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # Heap buffer of snapshot ids, owned by this object (freed in __dealloc__).
    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                # Adjacent literals instead of a backslash continuation, so
                # the source indentation no longer leaks into the message.
                raise make_ex(ret, "error calling rados_snap_list for "
                                   "ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id buffer was allocated with realloc and was never released;
        # free(NULL) is a no-op, so this is safe if __cinit__ failed early.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self) -> 'SnapIterator':
        return self

    def __next__(self) -> 'Snap':
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Grow the name buffer until librados stops reporting ERANGE.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
cdef class Snap(object):
    """Snapshot object"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
            % (str(self.ioctx), self.name, self.snap_id)

    def get_timestamp(self) -> datetime:
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: the date and time the snapshot was created, as a
                  ``datetime`` built with ``datetime.fromtimestamp``
        """
        cdef time_t snap_time

        with nogil:
            ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
        if ret != 0:
            raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(snap_time)
1609 # https://github.com/cython/cython/issues/1370
cdef class Completion(object):
    """completion object"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self) -> bool:
        """
        Is an asynchronous operation safe?

        Delegates to :meth:`is_complete`. This does not imply that the
        safe callback has finished.

        :returns: True if the operation is safe
        """
        return self.is_complete()

    def is_complete(self) -> bool:
        """
        Has an asynchronous operation completed?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            status = rados_aio_is_complete(self.rados_comp)
        return status == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        wait_for_safe() is an alias of wait_for_complete() since Luminous
        """
        self.wait_for_complete()

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        return self.wait_for_complete_and_cb()

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            rc = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return rc

    def get_return_value(self) -> int:
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: return value of the operation
        """
        with nogil:
            rc = rados_aio_get_return_value(self.rados_comp)
        return rc

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # Drop the reference held on the read buffer, if any.
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        self.oncomplete(self)
        if self.onsafe:
            self.onsafe(self)
        self._cleanup()

    def _cleanup(self):
        # Remove this completion from the Ioctx tracking lists it was added to.
        tracker = self.ioctx
        with tracker.lock:
            if self.oncomplete:
                tracker.complete_completions.remove(self)
            if self.onsafe:
                tracker.safe_completions.remove(self)
class OpCtx(object):
    """Context-manager mixin: ``create()`` on entry, ``release()`` on exit."""

    def __enter__(self):
        return self.create()

    def __exit__(self, type, msg, traceback):
        self.release()
cdef class WriteOp(object):
    """Builder for a compound librados write operation."""
    cdef rados_write_op_t write_op

    def create(self):
        """Allocate the underlying write op and return ``self``."""
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """Release the underlying write op."""
        with nogil:
            rados_release_write_op(self.write_op)

    def new(self, exclusive: Optional[int] = None):
        """
        Create the object.

        :param exclusive: value passed as the ``exclusive`` argument of
                          ``rados_write_op_create``

        NOTE(review): the default ``None`` cannot be coerced to a C int,
        so callers have to pass an integer explicitly.
        """

        cdef:
            int _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.

        :param flags: flags to apply to the last operation
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    def set_xattr(self, xattr_name: str, xattr_value: bytes):
        """
        Set an extended attribute on an object.

        :param xattr_name: name of the xattr
        :param xattr_value: buffer to set xattr to
        """
        xattr_name_raw = cstr(xattr_name, 'xattr_name')
        cdef:
            char *_xattr_name = xattr_name_raw
            char *_xattr_value = xattr_value
            size_t _xattr_value_len = len(xattr_value)
        with nogil:
            rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)

    def rm_xattr(self, xattr_name: str):
        """
        Remove an extended attribute from an object.

        :param xattr_name: name of the xattr to remove
        """
        xattr_name_raw = cstr(xattr_name, 'xattr_name')
        cdef:
            char *_xattr_name = xattr_name_raw
        with nogil:
            rados_write_op_rmxattr(self.write_op, _xattr_name)

    def append(self, to_write: bytes):
        """
        Append data to an object synchronously

        :param to_write: data to write
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    def write_full(self, to_write: bytes):
        """
        Write whole object, atomically replacing it.

        :param to_write: data to write
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    def write(self, to_write: bytes, offset: int = 0):
        """
        Write to offset.

        :param to_write: data to write
        :param offset: byte offset in the object to begin writing at
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    def assert_version(self, version: int):
        """
        Check if object's version is the expected one.

        :param version: expected version of the object
        """

        cdef:
            uint64_t _version = version

        with nogil:
            rados_write_op_assert_version(self.write_op, _version)

    def zero(self, offset: int, length: int):
        """
        Zero part of an object.

        :param offset: byte offset in the object to begin writing at
        :param length: number of zero bytes to write
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        with nogil:
            # librados takes (write_op, offset, len); the two values used
            # to be passed in the opposite order.
            rados_write_op_zero(self.write_op, _offset, _length)

    def truncate(self, offset: int):
        """
        Truncate an object.

        :param offset: byte offset in the object to begin truncating at
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)

    def execute(self, cls: str, method: str, data: bytes):
        """
        Execute an OSD class method on an object

        :param cls: name of the object class
        :param method: name of the method
        :param data: input data
        """

        cls_raw = cstr(cls, 'cls')
        method_raw = cstr(method, 'method')
        cdef:
            char *_cls = cls_raw
            char *_method = method_raw
            char *_data = data
            size_t _data_len = len(data)

        with nogil:
            rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)

    def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
        """
        Write the same buffer multiple times

        :param to_write: data to write
        :param write_len: total number of bytes to write
        :param offset: byte offset in the object to begin writing at
        """
        cdef:
            char *_to_write = to_write
            size_t _data_len = len(to_write)
            size_t _write_len = write_len
            uint64_t _offset = offset
        with nogil:
            rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)

    def cmpext(self, cmp_buf: bytes, offset: int = 0):
        """
        Ensure that given object range (extent) satisfies comparison

        :param cmp_buf: buffer containing bytes to be compared with object contents
        :param offset: object byte offset at which to start the comparison
        """
        cdef:
            char *_cmp_buf = cmp_buf
            size_t _cmp_buf_len = len(cmp_buf)
            uint64_t _offset = offset
        with nogil:
            rados_write_op_cmpext(self.write_op, _cmp_buf, _cmp_buf_len, _offset, NULL)

    def omap_cmp(self, key: str, val: str, cmp_op: int = LIBRADOS_CMPXATTR_OP_EQ):
        """
        Ensure that an omap key value satisfies comparison

        :param key: omap key whose associated value is evaluated for comparison
        :param val: value to compare with
        :param cmp_op: comparison operator, one of LIBRADOS_CMPXATTR_OP_EQ (1),
            LIBRADOS_CMPXATTR_OP_GT (3), or LIBRADOS_CMPXATTR_OP_LT (5).
        """
        key_raw = cstr(key, 'key')
        val_raw = cstr(val, 'val')
        cdef:
            char *_key = key_raw
            char *_val = val_raw
            # Length of the encoded buffer _val points at, not the number
            # of characters in the original str.
            size_t _val_len = len(val_raw)
            uint8_t _comparison_operator = cmp_op
        with nogil:
            rados_write_op_omap_cmp(self.write_op, _key, _comparison_operator, _val, _val_len, NULL)
class WriteOpCtx(WriteOp, OpCtx):
    """Write operation context manager: combines WriteOp with OpCtx."""
cdef class ReadOp(object):
    """Builder for a compound librados read operation."""
    cdef rados_read_op_t read_op

    def create(self):
        """Allocate the underlying read op and return ``self``."""
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """Release the underlying read op."""
        with nogil:
            rados_release_read_op(self.read_op)

    def cmpext(self, cmp_buf: bytes, offset: int = 0):
        """
        Ensure that given object range (extent) satisfies comparison

        :param cmp_buf: buffer containing bytes to be compared with object contents
        :param offset: object byte offset at which to start the comparison
        """
        cdef:
            char *c_buf = cmp_buf
            size_t c_buf_len = len(cmp_buf)
            uint64_t c_offset = offset
        with nogil:
            rados_read_op_cmpext(self.read_op, c_buf, c_buf_len, c_offset, NULL)

    def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.

        :param flags: flags to apply to the last operation
        """

        cdef:
            int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
class ReadOpCtx(ReadOp, OpCtx):
    """Read operation context manager: combines ReadOp with OpCtx."""
cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
                           uint64_t _notifier_id, void *_data,
                           size_t _data_len) with gil:
    """
    Watch callback

    Forwards a notification to the ``_callback`` method of the object
    passed as ``_arg``.
    """
    cdef object watch = <object>_arg
    # Copy the payload into a Python bytes object when there is one.
    payload = None
    if _data != NULL:
        payload = (<char *>_data)[:_data_len]
    watch._callback(_notify_id, _notifier_id, _cookie, payload)
cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
                                 int _error) with gil:
    """
    Watch error callback

    Forwards the cookie and error code to the ``_error_callback`` method
    of the object passed as ``_arg``.
    """
    cdef object target = <object>_arg
    target._error_callback(_cookie, _error)
cdef class Watch(object):
    """watch object"""

    cdef:
        object id
        Ioctx ioctx
        object oid
        object callback
        object error_callback

    def __cinit__(self, Ioctx ioctx, object oid, object callback,
                  object error_callback, object timeout):
        self.id = 0
        # Work on a private duplicate of the caller's io context.
        self.ioctx = ioctx.dup()
        self.oid = cstr(oid, 'oid')
        self.callback = callback
        self.error_callback = error_callback

        if timeout is None:
            timeout = 0

        cdef:
            char *c_oid = self.oid
            uint64_t c_cookie
            uint32_t c_timeout = timeout
            void *c_self = <PyObject*>self

        with nogil:
            rc = rados_watch3(self.ioctx.io, c_oid, &c_cookie,
                              <rados_watchcb2_t>&__watch_callback,
                              <rados_watcherrcb_t>&__watch_error_callback,
                              c_timeout, c_self)
        if rc < 0:
            raise make_ex(rc, "watch error")

        self.id = int(c_cookie)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.close()
        return False

    def __dealloc__(self):
        if self.id == 0:
            return
        self.ioctx.rados.require_state("connected")
        self.close()

    def _callback(self, notify_id, notifier_id, watch_id, data):
        # Run the user callback, then acknowledge the notification with
        # whatever reply it produced (or with no payload).
        replay = self.callback(notify_id, notifier_id, watch_id, data)

        cdef:
            rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
            char *_obj = self.oid
            int64_t _notify_id = notify_id
            uint64_t _cookie = watch_id
            char *_replay = NULL
            int _replaylen = 0

        if replay is not None:
            replay = cstr(replay, 'replay')
            _replay = replay
            _replaylen = len(replay)

        with nogil:
            rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
                             _replaylen)

    def _error_callback(self, watch_id, error):
        if self.error_callback is None:
            return
        self.error_callback(watch_id, error)

    def get_id(self) -> int:
        return self.id

    def check(self):
        """
        Check on watch validity.

        :raises: :class:`Error`
        :returns: timedelta since last confirmed valid
        """
        self.ioctx.require_ioctx_open()

        cdef:
            uint64_t c_cookie = self.id

        with nogil:
            rc = rados_watch_check(self.ioctx.io, c_cookie)
        if rc < 0:
            raise make_ex(rc, "check error")

        return timedelta(milliseconds=rc)

    def close(self):
        """
        Unregister an interest in an object.

        :raises: :class:`Error`
        """
        if self.id == 0:
            return

        self.ioctx.require_ioctx_open()

        cdef:
            uint64_t c_cookie = self.id

        with nogil:
            rc = rados_unwatch2(self.ioctx.io, c_cookie)
        # An already-missing watch (ENOENT) is not treated as an error.
        if rc < 0 and rc != -errno.ENOENT:
            raise make_ex(rc, "unwatch error")
        self.id = 0

        with nogil:
            cluster = rados_ioctx_get_cluster(self.ioctx.io)
            rc = rados_watch_flush(cluster)
        if rc < 0:
            raise make_ex(rc, "watch_flush error")
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    ``args`` is the Python object registered with the completion; its
    ``_complete`` method is invoked.
    """
    cdef object completion_obj = <object>args
    completion_obj._complete()
    return 0
2151 cdef class Ioctx(object):
2152 """rados.Ioctx object"""
2153 # NOTE(sileht): attributes declared in .pyd
def __init__(self, rados, name):
    """
    Set up the Python-level state of an io context.

    :param rados: the owning cluster handle object
    :param name: name stored on the context
    """
    self.rados = rados
    self.name = name
    self.state = "open"

    self.locator_key = ""
    self.nspace = ""
    # Guards the two completion-tracking lists below.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
2166 def __enter__(self):
2169 def __exit__(self, type_, value, traceback):
2173 def __dealloc__(self):
def __track_completion(self, completion_obj):
    """
    Record a completion in the tracking list for each callback it carries.

    :param completion_obj: completion whose ``oncomplete`` / ``onsafe``
                           attributes decide which lists it is added to
    """
    pending = []
    if completion_obj.oncomplete:
        pending.append(self.complete_completions)
    if completion_obj.onsafe:
        pending.append(self.safe_completions)
    for tracked in pending:
        with self.lock:
            tracked.append(completion_obj)
def __get_completion(self,
                     oncomplete: Callable[[Completion], None],
                     onsafe: Callable[[Completion], None]):
    """
    Constructs a completion to use with asynchronous operations

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """
    wrapper = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t c_callback = NULL
        rados_completion_t c_completion
        PyObject* c_wrapper = <PyObject*>wrapper

    # Only install the C trampoline when there is something to call.
    if oncomplete:
        c_callback = <rados_callback_t>&__aio_complete_cb

    with nogil:
        rc = rados_aio_create_completion2(c_wrapper, c_callback,
                                          &c_completion)
    if rc < 0:
        raise make_ex(rc, "error getting a completion")

    wrapper.rados_comp = c_completion
    return wrapper
2223 ioctx = self.rados.open_ioctx2(self.get_pool_id())
2224 ioctx.set_namespace(self.get_namespace())
2229 oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
2231 Asynchronously get object stats (size/mtime)
2233 oncomplete will be called with the returned size and mtime
2234 as well as the completion:
2236 oncomplete(completion, size, mtime)
2238 :param object_name: the name of the object to get stats from
2239 :param oncomplete: what to do when the stat is complete
2241 :raises: :class:`Error`
2242 :returns: completion object
2245 object_name_raw = cstr(object_name, 'object_name')
2248 Completion completion
2249 char *_object_name = object_name_raw
2253 def oncomplete_(completion_v):
2254 cdef Completion _completion_v = completion_v
2255 return_value = _completion_v.get_return_value()
2256 if return_value >= 0:
2257 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2259 return oncomplete(_completion_v, None, None)
2261 completion = self.__get_completion(oncomplete_, None)
2262 self.__track_completion(completion)
2264 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2268 completion._cleanup()
2269 raise make_ex(ret, "error stating %s" % object_name)
def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
              oncomplete: Optional[Callable[[Completion], None]] = None,
              onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
    """
    Write data to an object asynchronously

    Queues the write and returns.

    :param object_name: name of the object
    :param to_write: data to write
    :param offset: byte offset in the object to begin writing at
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """

    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_data = to_write
        size_t c_len = len(to_write)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_write(self.io, c_oid, completion.rados_comp,
                             c_data, c_len, c_offset)
    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "error writing object %s" % object_name)
def aio_write_full(self, object_name: str, to_write: bytes,
                   oncomplete: Optional[Callable] = None,
                   onsafe: Optional[Callable] = None) -> Completion:
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    Queues the write and returns.

    :param object_name: name of the object
    :param to_write: data to write
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """

    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_data = to_write
        size_t c_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_write_full(self.io, c_oid,
                                  completion.rados_comp,
                                  c_data, c_len)
    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "error writing object %s" % object_name)
def aio_writesame(self, object_name: str, to_write: bytes,
                  write_len: int, offset: int = 0,
                  oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously write the same buffer multiple times

    :param object_name: name of the object
    :param to_write: data to write
    :param write_len: total number of bytes to write
    :param offset: byte offset in the object to begin writing at
    :param oncomplete: what to do when the writesame is safe and
        complete in memory on all replicas
    :raises: :class:`Error`
    :returns: completion object
    """

    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_data = to_write
        size_t c_data_len = len(to_write)
        size_t c_write_len = write_len
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_writesame(self.io, c_oid, completion.rados_comp,
                                 c_data, c_data_len, c_write_len, c_offset)

    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "error writing object %s" % object_name)
def aio_append(self, object_name: str, to_append: bytes,
               oncomplete: Optional[Callable] = None,
               onsafe: Optional[Callable] = None) -> Completion:
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :param to_append: data to append
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_data = to_append
        size_t c_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_append(self.io, c_oid,
                              completion.rados_comp,
                              c_data, c_len)
    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "error appending object %s" % object_name)
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    :raises: :class:`Error`
    """
    with nogil:
        rc = rados_aio_flush(self.io)
    if rc < 0:
        raise make_ex(rc, "error flushing")
def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
               oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously compare an on-disk object range with a buffer

    :param object_name: the name of the object
    :param cmp_buf: buffer containing bytes to be compared with object contents
    :param offset: object byte offset at which to start the comparison
    :param oncomplete: what to do when the comparison has completed

    :raises: :class:`TypeError`
    :returns: completion object; per the original notes its return value
              is 0 on success, a negative error code on failure, or
              (-MAX_ERRNO - mismatch_off) on mismatch
    """
    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_buf = cmp_buf
        size_t c_buf_len = len(cmp_buf)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)

    with nogil:
        rc = rados_aio_cmpext(self.io, c_oid, completion.rados_comp,
                              c_buf, c_buf_len, c_offset)

    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "failed to compare %s" % object_name)
def aio_rmxattr(self, object_name: str, xattr_name: str,
                oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously delete an extended attribute from an object

    :param object_name: the name of the object to remove xattr from
    :param xattr_name: which extended attribute to remove
    :param oncomplete: what to do when the rmxattr completes

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(object_name, 'object_name')
    xattr_bytes = cstr(xattr_name, 'xattr_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_xattr = xattr_bytes

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_rmxattr(self.io, c_oid,
                               completion.rados_comp, c_xattr)

    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "Failed to remove xattr %r" % xattr_name)
def aio_read(self, object_name: str, length: int, offset: int,
             oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    ``data_read`` is the bytes actually read (possibly empty), or None
    when the operation returned a negative code.

    NOTE(review): ``oncomplete`` defaults to None but the internal wrapper
    calls it unconditionally, so callers should always supply it.

    :param object_name: name of the object to read from
    :param length: the number of bytes to read
    :param offset: byte offset in the object to begin reading from
    :param oncomplete: what to do when the read is complete

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name_raw = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name_raw
        uint64_t _offset = offset
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to what was actually read. A
        # zero-byte read must be trimmed too, otherwise the caller would
        # receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # librados writes straight into this bytes object's storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
def aio_execute(self, object_name: str, cls: str, method: str,
                data: bytes, length: int = 8192,
                oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
                onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    ``data`` is None when the operation returned a negative code.

    :param object_name: name of the object
    :param cls: name of the object class
    :param method: name of the method
    :param data: input data
    :param length: size of output buffer in bytes (default=8192)
    :param oncomplete: what to do when the execution is complete
    :param onsafe: what to do when the execution is safe and complete

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name_raw = cstr(object_name, 'object_name')
    cls_raw = cstr(cls, 'cls')
    method_raw = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name_raw
        char *_cls = cls_raw
        char *_method = method_raw
        char *_data = data
        size_t _data_len = len(data)
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to the size of the reply. An
        # empty reply must be trimmed too, otherwise the caller would
        # receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only wrap the callbacks the caller actually supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    # librados writes the reply straight into this bytes object's storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
def aio_setxattr(self, object_name: str, xattr_name: str, xattr_value: bytes,
                 oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously set an extended attribute on an object

    :param object_name: the name of the object to set xattr to
    :param xattr_name: which extended attribute to set
    :param xattr_value: the value of the extended attribute
    :param oncomplete: what to do when the setxattr completes

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(object_name, 'object_name')
    xattr_bytes = cstr(xattr_name, 'xattr_name')

    cdef:
        Completion completion
        char* c_oid = oid_bytes
        char* c_xattr = xattr_bytes
        char* c_value = xattr_value
        size_t c_value_len = len(xattr_value)

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_setxattr(self.io, c_oid,
                                completion.rados_comp,
                                c_xattr, c_value, c_value_len)

    if rc >= 0:
        return completion
    # Queueing failed: stop tracking the completion before raising.
    completion._cleanup()
    raise make_ex(rc, "Failed to set xattr %r" % xattr_name)
def aio_remove(self, object_name: str,
               oncomplete: Optional[Callable] = None,
               onsafe: Optional[Callable] = None) -> Completion:
    """
    Queue an asynchronous removal of an object.

    :param object_name: name of the object to remove
    :param oncomplete: what to do when the remove is safe and complete in
        memory
    :param onsafe: what to do when the remove is safe and complete on
        storage

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _oid = oid_bytes

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, _oid, completion.rados_comp)

    if ret >= 0:
        return completion
    # Submission failed: clean up the completion before raising.
    completion._cleanup()
    raise make_ex(ret, "error removing %s" % object_name)
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx object is in the 'open' state.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
def set_locator_key(self, loc_key: str):
    """
    Set the key used to map objects to placement groups in this io context.

    The locator key replaces the object name when deciding which placement
    group an object lands in. It applies to every later operation on this
    io context: until another key is set, all objects are placed in the
    same pg.

    :param loc_key: the key to use as the object locator, or NULL to discard
        any previously set key

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    key_bytes = cstr(loc_key, 'loc_key')
    cdef char *_key = key_bytes
    with nogil:
        rados_ioctx_locator_set_key(self.io, _key)
    # Keep the Python-level value that get_locator_key() returns.
    self.locator_key = loc_key
def get_locator_key(self) -> str:
    """
    Return the locator key stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
def set_read(self, snap_id: int):
    """
    Select the snapshot that subsequent reads on this io context use.

    To stop reading from a snapshot, call set_read(LIBRADOS_SNAP_HEAD).

    :param snap_id: the snapshot Id

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t _snap = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, _snap)
def set_namespace(self, nspace: str):
    """
    Set the namespace for objects within this io context.

    Namespace plus object name fully identify an object. The setting
    applies to every later operation on this io context: until another
    namespace is set, all objects are placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is shorthand for the default (empty) namespace.
    effective = "" if nspace is None else nspace
    ns_bytes = cstr(effective, 'nspace')
    cdef char *_ns = ns_bytes
    with nogil:
        rados_ioctx_set_namespace(self.io, _ns)
    self.nspace = effective
def get_namespace(self) -> str:
    """
    Return the namespace stored on this io context.

    :returns: namespace
    """
    current_namespace = self.nspace
    return current_namespace

def close(self):
    """
    Close a rados.Ioctx object.

    This only tells librados that the io context is no longer needed. It
    may not be freed immediately if asynchronous requests are still
    pending on it, but the io context must not be used again after this
    call. Calling close() on a context that is not open does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
def write(self, key: str, data: bytes, offset: int = 0):
    """
    Synchronously write data to an object.

    :param key: name of the object
    :param data: data to write
    :param offset: byte offset in the object to begin writing at

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *_payload = data
        size_t _payload_len = len(data)
        uint64_t _off = offset

    with nogil:
        ret = rados_write(self.io, _oid, _payload, _payload_len, _off)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # rados_write reports success as exactly zero.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
def write_full(self, key: str, data: bytes):
    """
    Synchronously write an entire object.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :param data: data to write

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *_payload = data
        size_t _payload_len = len(data)

    with nogil:
        ret = rados_write_full(self.io, _oid, _payload, _payload_len)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # rados_write_full reports success as exactly zero.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
    """
    Write the same buffer multiple times.

    :param key: name of the object
    :param data: data to write
    :param write_len: total number of bytes to write
    :param offset: byte offset in the object to begin writing at

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *_pattern = data
        size_t _pattern_len = len(data)
        size_t _total_len = write_len
        uint64_t _off = offset

    with nogil:
        ret = rados_writesame(self.io, _oid, _pattern, _pattern_len, _total_len, _off)
    if ret < 0:
        raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
                      % (self.name, key))
    assert ret == 0
def append(self, key: str, data: bytes):
    """
    Synchronously append data to an object.

    :param key: name of the object
    :param data: data to write

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *_payload = data
        size_t _payload_len = len(data)

    with nogil:
        ret = rados_append(self.io, _oid, _payload, _payload_len)

    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # rados_append reports success as exactly zero.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
    """
    Synchronously read data from an object.

    :param key: name of the object
    :param length: the number of bytes to read (default=8192)
    :param offset: byte offset in the object to begin reading at

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: data read from object
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *ret_buf
        uint64_t _off = offset
        size_t _want = length
        PyObject* ret_s = NULL

    # Allocate the result bytes object up front so librados can fill it
    # in place without an extra copy.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_read(self.io, _oid, ret_buf, _want, _off)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: shrink the bytes object to what was actually read.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :param cls: name of the object class
    :param method: name of the method
    :param data: input data
    :param length: size of output buffer in bytes (default=8192)

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    cls_raw = cstr(cls, 'cls')
    method_raw = cstr(method, 'method')
    cdef:
        char *_key = key_raw
        char *_cls = cls_raw
        char *_method = method_raw
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output bytes object up front so librados can fill it
    # in place without an extra copy.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the failure as an execute error (the message used to
            # be a copy of the one in read()).
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s::%s on %s"
                          % (self.name, cls, method, key))

        # Shrink the bytes object to the number of bytes produced.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
def get_stats(self) -> Dict[str, int]:
    """
    Get pool usage statistics.

    :returns: dict containing the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stats
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &pool_stats)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    # Copy the C struct fields into a plain Python dict.
    result = {}
    result['num_bytes'] = pool_stats.num_bytes
    result['num_kb'] = pool_stats.num_kb
    result['num_objects'] = pool_stats.num_objects
    result['num_object_clones'] = pool_stats.num_object_clones
    result['num_object_copies'] = pool_stats.num_object_copies
    result["num_objects_missing_on_primary"] = pool_stats.num_objects_missing_on_primary
    result["num_objects_unfound"] = pool_stats.num_objects_unfound
    result["num_objects_degraded"] = pool_stats.num_objects_degraded
    result["num_rd"] = pool_stats.num_rd
    result["num_rd_kb"] = pool_stats.num_rd_kb
    result["num_wr"] = pool_stats.num_wr
    result["num_wr_kb"] = pool_stats.num_wr_kb
    return result
def remove_object(self, key: str) -> bool:
    """
    Delete an object.

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes

    with nogil:
        ret = rados_remove(self.io, _oid)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to remove '%s'" % key)
def trunc(self, key: str, size: int) -> int:
    """
    Resize an object.

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :param size: the new size of the object in bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: 0 on success, otherwise raises error
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        uint64_t _new_size = size

    with nogil:
        ret = rados_trunc(self.io, _oid, _new_size)
    if ret >= 0:
        return ret
    raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
    """
    Compare an on-disk object range with a buffer.

    :param key: the name of the object
    :param cmp_buf: buffer containing bytes to be compared with object contents
    :param offset: object byte offset at which to start the comparison

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: 0 - on success, negative error code on failure,
              (-MAX_ERRNO - mismatch_off) on mismatch
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        char *_expected = cmp_buf
        size_t _expected_len = len(cmp_buf)
        uint64_t _off = offset
    with nogil:
        ret = rados_cmpext(self.io, _oid, _expected, _expected_len, _off)
    # Only a match (0) or a mismatch offset (below -MAX_ERRNO) passes; any
    # other value trips the assertion.
    assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
    return ret
def stat(self, key: str) -> Tuple[int, time.struct_time]:
    """
    Get object stats (size/mtime).

    :param key: the name of the object to get stats from

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp)
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef:
        char *_oid = oid_bytes
        uint64_t psize
        time_t pmtime

    with nogil:
        ret = rados_stat(self.io, _oid, &psize, &pmtime)
    if ret < 0:
        raise make_ex(ret, "Failed to stat %r" % key)
    # The modification time is converted to local time.
    mtime_local = time.localtime(pmtime)
    return psize, mtime_local
def get_xattr(self, key: str, xattr_name: str) -> bytes:
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    enlarged by a factor of 1024 each time librados reports -ERANGE. If
    the value still does not fit when the size cap is reached, an error
    is raised instead of returning a bogus slice.

    :param key: the name of the object to get xattr from
    :param xattr_name: which extended attribute to read

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: value of the xattr
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    xattr_name_raw = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key_raw
        char *_xattr_name = xattr_name_raw
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with a larger one.
                ret_length *= 1024
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                return ret_buf[:ret]
        # The size cap was reached while librados still reported -ERANGE;
        # slicing the buffer with that negative value would be invalid.
        raise make_ex(-errno.ERANGE, "Failed to get xattr %r" % xattr_name)
    finally:
        free(ret_buf)
def get_xattrs(self, oid: str) -> XattrIterator:
    """
    Start iterating over the xattrs of an object.

    :param oid: the name of the object to get xattrs from

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :param xattr_name: which extended attribute to set
    :param xattr_value: the value of the extended attribute

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    attr_bytes = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_oid = oid_bytes
        char *_attr = attr_bytes
        char *_value = xattr_value
        size_t _value_len = len(xattr_value)

    with nogil:
        ret = rados_setxattr(self.io, _oid, _attr, _value, _value_len)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
def rm_xattr(self, key: str, xattr_name: str) -> bool:
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :param xattr_name: which extended attribute to remove

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    attr_bytes = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_oid = oid_bytes
        char *_attr = attr_bytes

    with nogil:
        ret = rados_rmxattr(self.io, _oid, _attr)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to delete key %r xattr %r" %
                  (key, xattr_name))
def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
    """
    Send a rados notification to an object.

    :param obj: the name of the object to notify
    :param msg: optional message to send in the notification
    :param timeout_ms: notify timeout (in ms)

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    obj_raw = cstr(obj, 'obj')
    msg_raw = cstr(msg, 'msg')
    cdef:
        char *_obj = obj_raw
        char *_msg = msg_raw
        # Length of the encoded payload handed to librados. Measuring the
        # str instead would under-count multi-byte characters and truncate
        # the message.
        int _msglen = len(msg_raw)
        uint64_t _timeout_ms = timeout_ms

    with nogil:
        ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
                            NULL, NULL)
    if ret < 0:
        raise make_ex(ret, "Failed to notify %r" % (obj))
    return True
def aio_notify(self, obj: str,
               oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
               msg: str = '', timeout_ms: int = 5000) -> Completion:
    """
    Asynchronously send a rados notification to an object.

    :param obj: the name of the object to notify
    :param oncomplete: callback invoked as
        ``oncomplete(completion, return_value, acks, timeouts)``; on
        success ``return_value`` is 0 and the two lists hold the decoded
        acks and timeouts, otherwise both lists are None
    :param msg: optional message to send in the notification
    :param timeout_ms: notify timeout (in ms)

    :raises: :class:`Error`
    :returns: completion object
    """
    self.require_ioctx_open()

    obj_raw = cstr(obj, 'obj')
    msg_raw = cstr(msg, 'msg')

    cdef:
        Completion completion
        char *_obj = obj_raw
        char *_msg = msg_raw
        # Length of the encoded payload handed to librados. Measuring the
        # str instead would under-count multi-byte characters and truncate
        # the message.
        int _msglen = len(msg_raw)
        uint64_t _timeout_ms = timeout_ms
        char *reply
        size_t replylen

    def oncomplete_(completion_v):
        cdef:
            Completion _completion_v = completion_v
            notify_ack_t *acks = NULL
            notify_timeout_t *timeouts = NULL
            size_t nr_acks
            size_t nr_timeouts
        return_value = _completion_v.get_return_value()
        if return_value == 0:
            # Decode the reply buffer filled in by rados_aio_notify, then
            # release it.
            return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
            rados_buffer_free(reply)
        if return_value == 0:
            ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
                         else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
            timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
            rados_free_notify_response(acks, nr_acks, timeouts)
            return oncomplete(_completion_v, 0, ack_list, timeout_list)
        else:
            return oncomplete(_completion_v, return_value, None, None)

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
                               _msg, _msglen, _timeout_ms, &reply, &replylen)
    if ret < 0:
        # Submission failed: clean up the completion before raising.
        completion._cleanup()
        raise make_ex(ret, "aio_notify error: %s" % obj)
    return completion
def watch(self, obj: str,
          callback: Callable[[int, str, int, bytes], None],
          error_callback: Optional[Callable[[int], None]] = None,
          timeout: Optional[int] = None) -> Watch:
    """
    Register an interest in an object.

    :param obj: the name of the object to notify
    :param callback: what to do when a notify is received on this object
    :param error_callback: what to do when the watch session encounters an error
    :param timeout: how many seconds the connection will keep after disconnection

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: the :class:`Watch` object created for this registration
    """
    self.require_ioctx_open()

    watcher = Watch(self, obj, callback, error_callback, timeout)
    return watcher
def list_objects(self) -> ObjectIterator:
    """
    Create an ObjectIterator for this rados.Ioctx object.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
def list_snaps(self) -> SnapIterator:
    """
    Create a SnapIterator for this rados.Ioctx object.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
def get_pool_id(self) -> int:
    """
    Get pool id.

    :returns: int - pool id
    """
    with nogil:
        pool_id = rados_ioctx_get_id(self.io)
    return pool_id

def get_pool_name(self) -> str:
    """
    Get pool name.

    The name is fetched into a buffer that is doubled in size each time
    librados reports -ERANGE.

    :raises: :class:`Error`
    :returns: pool name
    """
    cdef:
        int buf_len = 10
        char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_len)
            with nogil:
                ret = rados_ioctx_get_pool_name(self.io, buf, buf_len)
            if ret > 0:
                break
            if ret != -errno.ERANGE:
                raise make_ex(ret, "get pool name error")
            # Buffer too small: double it and try again.
            buf_len = buf_len * 2

        return decode_cstr(buf)
    finally:
        free(buf)

def create_snap(self, snap_name: str):
    """
    Create a pool-wide snapshot.

    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *_name = name_bytes

    with nogil:
        ret = rados_ioctx_snap_create(self.io, _name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to create snap %s" % snap_name)
def remove_snap(self, snap_name: str):
    """
    Remove a pool-wide snapshot.

    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *_name = name_bytes

    with nogil:
        ret = rados_ioctx_snap_remove(self.io, _name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove snap %s" % snap_name)
def lookup_snap(self, snap_name: str) -> Snap:
    """
    Get the id of a pool snapshot.

    :param snap_name: the name of the snapshot to look up

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef:
        char *_name = name_bytes
        rados_snap_t found_id

    with nogil:
        ret = rados_ioctx_snap_lookup(self.io, _name, &found_id)
    if ret != 0:
        raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(found_id))
def snap_rollback(self, oid: str, snap_name: str):
    """
    Roll an object back to a pool snapshot.

    :param oid: the name of the object
    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_bytes = cstr(oid, 'oid')
    name_bytes = cstr(snap_name, 'snap_name')
    cdef:
        char *_obj = oid_bytes
        char *_name = name_bytes

    with nogil:
        ret = rados_ioctx_snap_rollback(self.io, _obj, _name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid)
def create_self_managed_snap(self):
    """
    Create a self-managed snapshot.

    :returns: snap id on success

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef:
        rados_snap_t new_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_create(self.io, &new_id)
    if ret != 0:
        raise make_ex(ret, "Failed to create self-managed snapshot")
    return int(new_id)
def remove_self_managed_snap(self, snap_id: int):
    """
    Remove a self-managed snapshot.

    :param snap_id: the id of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef:
        rados_snap_t _target = snap_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_remove(self.io, _target)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove self-managed snapshot")
def set_self_managed_snap_write(self, snaps: Sequence[Union[int, str]]):
    """
    Update the write context to the specified self-managed snapshots.

    The ids are converted to int and sorted in descending order; the
    largest id is used as the snapshot sequence. An empty ``snaps``
    yields a sequence of 0 and an empty snapshot list.

    :param snaps: all associated self-managed snapshot ids

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :raises: :class:`MemoryError` if the id array cannot be allocated
    """
    self.require_ioctx_open()
    sorted_snaps = sorted((int(x) for x in snaps), reverse=True) if snaps else []
    snap_seq = sorted_snaps[0] if sorted_snaps else 0

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)

    try:
        _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
        # malloc(0) may legitimately return NULL; only a failed non-empty
        # allocation is an error. Without this check the loop below would
        # write through a NULL pointer.
        if _snaps == NULL and _num_snaps > 0:
            raise MemoryError("failed to allocate snapshot id array")
        for i, snap in enumerate(sorted_snaps):
            _snaps[i] = snap
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        free(_snaps)

def rollback_self_managed_snap(self, oid: str, snap_id: int):
    """
    Roll a specific object back to a self-managed snapshot revision.

    :param oid: the name of the object
    :param snap_id: the id of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_bytes = cstr(oid, 'oid')
    cdef:
        char *_obj = oid_bytes
        rados_snap_t _target = snap_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _obj, _target)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid)
def get_last_version(self) -> int:
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context.

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        version = rados_get_last_version(self.io)
    return int(version)

def create_write_op(self) -> WriteOp:
    """
    Create a write operation object.

    The caller must call release_write_op after use.

    :returns: the value returned by ``WriteOp().create()``
    """
    new_op = WriteOp()
    return new_op.create()

def create_read_op(self) -> ReadOp:
    """
    Create a read operation object.

    The caller must call release_read_op after use.

    :returns: the value returned by ``ReadOp().create()``
    """
    new_op = ReadOp()
    return new_op.create()

def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op.

    :param write_op: write operation object to release
    """
    release = write_op.release
    release()

def release_read_op(self, read_op: ReadOp):
    """
    Release the memory allocated by create_read_op.

    :param read_op: read_op object
    """
    release = read_op.release
    release()

def set_omap(self, write_op: WriteOp, keys: Sequence[str], values: Sequence[bytes]):
    """
    Add omap key/value pairs to a write operation.

    :param write_op: write_operation object
    :param keys: a tuple of keys
    :param values: a tuple of values

    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    values = cstr_list(values, 'values')
    lens = [len(v) for v in values]
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try block so that a failure in a later
        # allocation still frees the earlier arrays (free(NULL) is a no-op).
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array(lens)
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        free(_keys)
        free(_values)
        free(_lens)

def operate_write_op(self,
                     write_op: WriteOp,
                     oid: str,
                     mtime: int = 0,
                     flags: int = LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation.

    :param write_op: write operation object
    :param oid: object name
    :param mtime: the time to set the mtime to, 0 for the current time
    :param flags: flags to apply to the entire operation

    :raises: :class:`Error`
    """

    oid_bytes = cstr(oid, 'oid')
    cdef:
        WriteOp _op = write_op
        char *_obj = oid_bytes
        time_t _mtime = mtime
        int _flags = flags

    with nogil:
        ret = rados_write_op_operate(_op.write_op, self.io, _obj, &_mtime, _flags)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to operate write op for oid %s" % oid)

def operate_aio_write_op(self, write_op: WriteOp, oid: str,
                         oncomplete: Optional[Callable[[Completion], None]] = None,
                         onsafe: Optional[Callable[[Completion], None]] = None,
                         mtime: int = 0,
                         flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
    """
    Execute the real write operation asynchronously.

    :param write_op: write operation object
    :param oid: object name
    :param oncomplete: what to do when the operation is safe and complete
        in memory
    :param onsafe: what to do when the operation is safe and complete on
        storage
    :param mtime: the time to set the mtime to, 0 for the current time
    :param flags: flags to apply to the entire operation

    :raises: :class:`Error`
    :returns: completion object
    """

    oid_bytes = cstr(oid, 'oid')
    cdef:
        WriteOp _op = write_op
        char *_obj = oid_bytes
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_op.write_op, self.io, completion.rados_comp, _obj,
                                         &_mtime, _flags)
    if ret == 0:
        return completion
    # Submission failed: clean up the completion before raising.
    completion._cleanup()
    raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)

def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation.

    :param read_op: read operation object
    :param oid: object name
    :param flag: flags to apply to the entire operation

    :raises: :class:`Error`
    """
    oid_bytes = cstr(oid, 'oid')
    cdef:
        ReadOp _op = read_op
        char *_obj = oid_bytes
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_op.read_op, self.io, _obj, _flag)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to operate read op for oid %s" % oid)

def operate_aio_read_op(self, read_op: ReadOp, oid: str,
                        oncomplete: Optional[Callable[[Completion], None]] = None,
                        onsafe: Optional[Callable[[Completion], None]] = None,
                        flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
    """
    Execute the real read operation asynchronously.

    :param read_op: read operation object
    :param oid: object name
    :param oncomplete: what to do when the operation is safe and complete
        in memory
    :param onsafe: what to do when the operation is safe and complete on
        storage
    :param flag: flags to apply to the entire operation

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(oid, 'oid')
    cdef:
        ReadOp _op = read_op
        char *_obj = oid_bytes
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_op.read_op, self.io, completion.rados_comp, _obj, _flag)
    if ret == 0:
        return completion
    # Submission failed: clean up the completion before raising.
    completion._cleanup()
    raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)

def get_omap_vals(self,
                  read_op: ReadOp,
                  start_after: str,
                  filter_prefix: str,
                  max_return: int) -> Tuple[OmapIterator, int]:
    """
    Get the omap values.

    :param read_op: read operation object
    :param start_after: list keys starting after start_after
    :param filter_prefix: list only keys beginning with filter_prefix
    :param max_return: list no more than max_return key/value pairs
    :returns: an iterator over the requested omap values, return value from this action
    """

    # Empty or None arguments are passed on as None.
    after_bytes = cstr(start_after, 'start_after') if start_after else None
    prefix_bytes = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
    cdef:
        char *_after = opt_str(after_bytes)
        char *_prefix = opt_str(prefix_bytes)
        ReadOp _op = read_op
        rados_omap_iter_t iter_addr = NULL
        int _limit = max_return

    with nogil:
        rados_read_op_omap_get_vals2(_op.read_op, _after, _prefix,
                                     _limit, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat

def get_omap_keys(self, read_op: ReadOp, start_after: str, max_return: int) -> Tuple[OmapIterator, int]:
    """
    Get the omap keys.

    :param read_op: read operation object
    :param start_after: list keys starting after start_after
    :param max_return: list no more than max_return key/value pairs
    :returns: an iterator over the requested omap values, return value from this action
    """
    # An empty or None start_after is passed on as None.
    after_bytes = cstr(start_after, 'start_after') if start_after else None
    cdef:
        char *_after = opt_str(after_bytes)
        ReadOp _op = read_op
        rados_omap_iter_t iter_addr = NULL
        int _limit = max_return

    with nogil:
        rados_read_op_omap_get_keys2(_op.read_op, _after,
                                     _limit, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat

def get_omap_vals_by_keys(self, read_op: ReadOp, keys: Sequence[str]) -> Tuple[OmapIterator, int]:
    """
    Get the omap values by keys.

    :param read_op: read operation object
    :param keys: input key tuple
    :returns: an iterator over the requested omap values, return value from this action
    """
    key_list = cstr_list(keys, 'keys')
    cdef:
        ReadOp _op = read_op
        rados_omap_iter_t iter_addr
        char **_key_array = to_bytes_array(key_list)
        size_t _key_count = len(key_list)

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_op.read_op,
                                                <const char**>_key_array,
                                                _key_count, &iter_addr, NULL)
        omap_iter = OmapIterator(self)
        omap_iter.ctx = iter_addr
        return omap_iter, 0   # 0 is meaningless; there for backward-compat
    finally:
        # Free the temporary C array of key pointers.
        free(_key_array)

def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[str]):
    """
    Remove the specified omap keys.

    :param write_op: write operation object
    :param keys: input key tuple
    """

    key_list = cstr_list(keys, 'keys')
    cdef:
        WriteOp _op = write_op
        size_t _key_count = len(key_list)
        char **_key_array = to_bytes_array(key_list)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(_op.write_op, <const char**>_key_array, _key_count)
    finally:
        # Free the temporary C array of key pointers.
        free(_key_array)

def clear_omap(self, write_op: WriteOp):
    """
    Remove all key/value pairs from an object.

    :param write_op: write operation object
    """

    cdef:
        WriteOp _op = write_op

    with nogil:
        rados_write_op_omap_clear(_op.write_op)

def remove_omap_range2(self, write_op: WriteOp, key_begin: str, key_end: str):
    """
    Remove key/value pairs from an object whose keys are in the range
    [key_begin, key_end).

    :param write_op: write operation object
    :param key_begin: the lower bound of the key range to remove
    :param key_end: the upper bound of the key range to remove
    """
    key_begin_raw = cstr(key_begin, 'key_begin')
    key_end_raw = cstr(key_end, 'key_end')
    cdef:
        WriteOp _write_op = write_op
        char* _key_begin = key_begin_raw
        # Lengths must describe the encoded buffers passed to librados;
        # len() of the str arguments under-counts multi-byte characters.
        size_t key_begin_len = len(key_begin_raw)
        char* _key_end = key_end_raw
        size_t key_end_len = len(key_end_raw)
    with nogil:
        rados_write_op_omap_rm_range2(_write_op.write_op, _key_begin, key_begin_len,
                                      _key_end, key_end_len)

def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
                   duration: Optional[int] = None,
                   flags: int = 0):
    """
    Take an exclusive lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock
    :param desc: description of the lock
    :param duration: duration of the lock in seconds; when None, no
                     duration (NULL) is passed to librados
    :param flags: flags

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration
        timeval* _duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct lives on the C stack and is not zero-initialised, so
        # the sub-second part has to be set explicitly.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                   _duration_ptr, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
                duration: Optional[int] = None,
                flags: int = 0):
    """
    Take a shared lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock
    :param tag: tag of the lock
    :param desc: description of the lock
    :param duration: duration of the lock in seconds; when None, no
                     duration (NULL) is passed to librados
    :param flags: flags

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    tag_raw = cstr(tag, 'tag')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _tag = tag_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration
        timeval* _duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct lives on the C stack and is not zero-initialised, so
        # the sub-second part has to be set explicitly.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                _duration_ptr, _flags)
    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
def unlock(self, key: str, name: str, cookie: str):
    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Name the call that actually failed instead of the lock call.
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
def set_osdmap_full_try(self):
    """
    Turn on the osdmap_full_try label for this I/O context by calling
    ``rados_set_pool_full_try``.
    """
    with nogil:
        rados_set_pool_full_try(self.io)
def unset_osdmap_full_try(self):
    """
    Turn off the osdmap_full_try label for this I/O context by calling
    ``rados_unset_pool_full_try``.
    """
    with nogil:
        rados_unset_pool_full_try(self.io)
def application_enable(self, app_name: str, force: bool = False):
    """
    Enable an application on an OSD pool

    :param app_name: application name
    :param force: False if only a single app should exist per pool

    :raises: :class:`Error`
    """
    encoded_name = cstr(app_name, 'app_name')
    cdef:
        char *c_app_name = encoded_name
        int c_force = (1 if force else 0)

    with nogil:
        ret = rados_application_enable(self.io, c_app_name, c_force)
    if ret < 0:
        raise make_ex(ret, "error enabling application")
def application_list(self) -> Optional[List[str]]:
    """
    Returns a list of enabled applications

    :returns: list of app name strings, or None when librados reports
              ENOENT
    :raises: :class:`Error`
    """
    cdef:
        size_t length = 128
        char *apps = NULL

    try:
        while True:
            apps = <char *>realloc_chk(apps, length)
            with nogil:
                ret = rados_application_list(self.io, apps, &length)
            if ret == 0:
                # Names come back NUL-separated; skip the empty pieces.
                return [decode_cstr(app) for app in
                        apps[:length].split(b'\0') if app]
            elif ret == -errno.ENOENT:
                return None
            elif ret == -errno.ERANGE:
                # Buffer too small: loop again with the updated length.
                pass
            else:
                raise make_ex(ret, "error listing applications")
    finally:
        free(apps)
def application_metadata_get(self, app_name: str, key: str) -> str:
    """
    Gets application metadata on an OSD pool for the given key

    :param app_name: application name
    :param key: metadata key
    :returns: str - metadata value

    :raises: :class:`KeyError` when librados reports ENOENT for the lookup
    :raises: :class:`Error`
    """

    app_name_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    cdef:
        char *_app_name = app_name_raw
        char *_key = key_raw
        size_t size = 129
        char *value = NULL
        int ret
    try:
        # Grow the buffer and retry for as long as librados reports ERANGE.
        while True:
            value = <char *>realloc_chk(value, size)
            with nogil:
                ret = rados_application_metadata_get(self.io, _app_name,
                                                     _key, value, &size)
            if ret != -errno.ERANGE:
                break
        # Format messages with the Python str arguments; the char* would be
        # rendered as a bytes repr (b'...') on Python 3.
        if ret == -errno.ENOENT:
            raise KeyError('no metadata %s for application %s' % (key, app_name))
        elif ret != 0:
            raise make_ex(ret, 'error getting metadata %s for application %s' %
                          (key, app_name))
        return decode_cstr(value)
    finally:
        free(value)
def application_metadata_set(self, app_name: str, key: str, value: str):
    """
    Store one application metadata key/value pair on an OSD pool

    :param app_name: application name
    :param key: metadata key
    :param value: metadata value

    :raises: :class:`Error`
    """
    encoded_name = cstr(app_name, 'app_name')
    encoded_key = cstr(key, 'key')
    encoded_value = cstr(value, 'value')
    cdef:
        char *c_app_name = encoded_name
        char *c_key = encoded_key
        char *c_value = encoded_value

    with nogil:
        ret = rados_application_metadata_set(self.io, c_app_name, c_key,
                                             c_value)
    if ret < 0:
        raise make_ex(ret, "error setting application metadata")
def application_metadata_remove(self, app_name: str, key: str):
    """
    Delete one application metadata key from an OSD pool

    :param app_name: application name
    :param key: metadata key

    :raises: :class:`Error`
    """
    encoded_name = cstr(app_name, 'app_name')
    encoded_key = cstr(key, 'key')
    cdef:
        char *c_app_name = encoded_name
        char *c_key = encoded_key

    with nogil:
        ret = rados_application_metadata_remove(self.io, c_app_name, c_key)
    if ret < 0:
        raise make_ex(ret, "error removing application metadata")
def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
    """
    Returns the metadata key/value pairs stored for an application

    :param app_name: application name
    :returns: list of key/value tuples
    :raises: :class:`Error`
    """
    encoded_name = cstr(app_name, 'app_name')
    cdef:
        char *c_app_name = encoded_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, c_app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == -errno.ERANGE:
                # Buffers too small: loop again with the updated lengths.
                continue
            if ret != 0:
                raise make_ex(ret, "error listing application metadata")

            key_parts = c_keys[:key_length].split(b'\0')
            val_parts = c_vals[:val_length].split(b'\0')
            pairs = list(zip([decode_cstr(part) for part in key_parts],
                             [decode_cstr(part) for part in val_parts]))
            # The final pair is dropped, as in the original; presumably it is
            # the empty piece after the last NUL separator.
            return pairs[:-1]
    finally:
        free(c_keys)
        free(c_vals)
def alignment(self) -> Optional[int]:
    """
    Returns pool alignment

    :returns:
        Number of alignment bytes required by the current pool, or None if
        alignment is not required.
    :raises: :class:`Error`
    """
    cdef:
        int requires = 0
        uint64_t _alignment

    with nogil:
        ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
    if ret != 0:
        raise make_ex(ret, "error checking alignment")

    if not requires:
        return None

    with nogil:
        ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
    if ret != 0:
        raise make_ex(ret, "error querying alignment")
    return _alignment
def set_object_locator(func):
    """
    Decorator that runs a method with the object's locator key applied.

    When ``self.locator_key`` is not None, the ioctx locator key is switched
    to it for the duration of the call and the previous key is put back
    afterwards, even if the wrapped method raises. When it is None the
    method is called directly.

    :param func: method taking ``self`` as first argument; ``self`` must
                 provide ``locator_key`` and ``ioctx``
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on the error path too, so a failed call does not leave
            # the shared ioctx pointing at this object's locator.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
def set_object_namespace(func):
    """
    Decorator that runs a method with the object's namespace applied.

    The ioctx namespace is switched to ``self.nspace`` for the duration of
    the call and the previous namespace is put back afterwards, even if the
    wrapped method raises.

    :param func: method taking ``self`` as first argument; ``self`` must
                 provide ``nspace`` and ``ioctx``
    :returns: the wrapping function
    :raises: :class:`LogicError` (at call time) if ``self.nspace`` is None
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on the error path too, so a failed call does not leave
            # the shared ioctx pointing at this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: I/O context every operation is forwarded to
        :param key: name of the object
        :param locator_key: locator key applied around operations, if any
        :param nspace: namespace of the object; None is stored as ""
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read() and write(); moved by seek().
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal is not
        # guaranteed to hold and triggers a SyntaxWarning on Python 3.8+.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise :class:`ObjectStateError` unless the state is "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to ``length`` bytes at the current offset and advance
        the offset by the amount returned."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """Write data at the current offset; the offset only advances when
        the underlying write returns 0."""
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self) -> Tuple[int, time.struct_time]:
        """Return the result of ``ioctx.stat`` for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position: int):
        """Set the offset used by subsequent read() and write() calls."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name: str) -> bytes:
        """Return the value of one extended attribute of this object."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self) -> XattrIterator:
        """Return an iterator over the extended attributes of this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
        """Set one extended attribute on this object."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name: str) -> bool:
        """Remove one extended attribute from this object."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    Watch cluster log messages. Create an instance and keep it alive
    while the callback is periodically invoked. 'level' selects which
    messages are monitored (one of MONITOR_LEVELS); 'arg' is handed
    back to the callback unchanged.

    The callback receives:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    The callback's return value is ignored.
    """
    def __init__(self, cluster, level, callback, arg):
        self.cluster = cluster
        self.level, self.callback, self.arg = level, callback, arg
        # The actual subscription is done by the cluster handle.
        cluster.monitor_log(level, callback, arg)