1 # cython: embedsignature=True, binding=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
22 include "mock_rados.pxi"
24 from c_rados cimport *
29 from datetime import datetime, timedelta
30 from functools import partial, wraps
31 from itertools import chain
32 from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
34 cdef extern from "Python.h":
35 # These are in cpython/string.pxd, but use "object" types instead of
36 # PyObject*, which invokes assumptions in cpython that we need to
37 # legitimately break to implement zero-copy string buffers in Ioctx.read().
38 # This is valid use of the Python API and documented as a special case.
39 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
40 char* PyBytes_AsString(PyObject *string) except NULL
41 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
42 void PyEval_InitThreads()
44 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
45 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
46 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
47 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
48 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
49 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
50 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
52 LIBRADOS_CMPXATTR_OP_EQ = _LIBRADOS_CMPXATTR_OP_EQ
53 LIBRADOS_CMPXATTR_OP_NE = _LIBRADOS_CMPXATTR_OP_NE
54 LIBRADOS_CMPXATTR_OP_GT = _LIBRADOS_CMPXATTR_OP_GT
55 LIBRADOS_CMPXATTR_OP_GTE = _LIBRADOS_CMPXATTR_OP_GTE
56 LIBRADOS_CMPXATTR_OP_LT = _LIBRADOS_CMPXATTR_OP_LT
57 LIBRADOS_CMPXATTR_OP_LTE = _LIBRADOS_CMPXATTR_OP_LTE
59 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
61 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
62 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
63 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
64 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
65 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
66 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
67 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
69 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
71 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
72 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
74 MAX_ERRNO = _MAX_ERRNO
76 ANONYMOUS_AUID = 0xffffffffffffffff
79 OMAP_KEY_TYPE = Union[str,bytes]
81 class Error(Exception):
82 """ `Error` class, derived from `Exception` """
83 def __init__(self, message, errno=None):
84 super(Exception, self).__init__(message)
88 msg = super(Exception, self).__str__()
89 if self.errno is None:
91 return '[errno {0}] {1}'.format(self.errno, msg)
94 return (self.__class__, (self.message, self.errno))
class InvalidArgumentError(Error):
    """
    `InvalidArgumentError` class, derived from `Error`.

    ``errno_to_exception`` maps ``errno.EINVAL`` to this class.

    :param message: detail text wrapped into "RADOS invalid argument (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS invalid argument (%s)" % message
        super(InvalidArgumentError, self).__init__(text, errno)
101 class ExtendMismatch(Error):
102 def __init__(self, message, errno, offset):
104 "object content does not match (%s)" % message, errno)
107 class OSError(Error):
108 """ `OSError` class, derived from `Error` """
class InterruptedOrTimeoutError(OSError):
    """
    `InterruptedOrTimeoutError` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.EINTR`` to this class.

    :param message: detail text wrapped into
                    "RADOS interrupted or timeout (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS interrupted or timeout (%s)" % message
        super(InterruptedOrTimeoutError, self).__init__(text, errno)
class PermissionError(OSError):
    """
    `PermissionError` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.EPERM`` to this class.

    :param message: detail text wrapped into "RADOS permission error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS permission error (%s)" % message
        super(PermissionError, self).__init__(text, errno)
class PermissionDeniedError(OSError):
    """
    `PermissionDeniedError` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.EACCES`` to this class.

    :param message: detail text wrapped into "RADOS permission denied (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS permission denied (%s)" % message
        super(PermissionDeniedError, self).__init__(text, errno)
class ObjectNotFound(OSError):
    """
    `ObjectNotFound` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ENOENT`` to this class.

    :param message: detail text wrapped into "RADOS object not found (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS object not found (%s)" % message
        super(ObjectNotFound, self).__init__(text, errno)
class NoData(OSError):
    """
    `NoData` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ENODATA`` (``errno.ENOATTR`` in the
    FreeBSD table) to this class.

    :param message: detail text wrapped into "RADOS no data (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS no data (%s)" % message
        super(NoData, self).__init__(text, errno)
class ObjectExists(OSError):
    """
    `ObjectExists` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.EEXIST`` to this class.

    :param message: detail text wrapped into "RADOS object exists (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS object exists (%s)" % message
        super(ObjectExists, self).__init__(text, errno)
class ObjectBusy(OSError):
    """
    `ObjectBusy` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.EBUSY`` to this class.

    :param message: detail text wrapped into "RADOS object busy (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS object busy (%s)" % message
        super(ObjectBusy, self).__init__(text, errno)
class IOError(OSError):
    """
    `IOError` class, derived from this module's `OSError`.

    :param message: detail text wrapped into "RADOS I/O error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS I/O error (%s)" % message
        super(IOError, self).__init__(text, errno)
class NoSpace(OSError):
    """
    `NoSpace` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ENOSPC`` to this class.

    :param message: detail text wrapped into "RADOS no space (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS no space (%s)" % message
        super(NoSpace, self).__init__(text, errno)
class NotConnected(OSError):
    """
    `NotConnected` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ENOTCONN`` to this class.

    :param message: detail text wrapped into "RADOS not connected (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS not connected (%s)" % message
        super(NotConnected, self).__init__(text, errno)
class RadosStateError(Error):
    """
    `RadosStateError` class, derived from `Error`.

    Raised by ``Rados.require_state`` when the handle is not in one of the
    states the caller asked for.

    :param message: detail text wrapped into "RADOS rados state (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS rados state (%s)" % message
        super(RadosStateError, self).__init__(text, errno)
class IoctxStateError(Error):
    """
    `IoctxStateError` class, derived from `Error`.

    :param message: detail text wrapped into "RADOS Ioctx state error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS Ioctx state error (%s)" % message
        super(IoctxStateError, self).__init__(text, errno)
class ObjectStateError(Error):
    """
    `ObjectStateError` class, derived from `Error`.

    :param message: detail text wrapped into "RADOS object state error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS object state error (%s)" % message
        super(ObjectStateError, self).__init__(text, errno)
class LogicError(Error):
    """
    `LogicError` class, derived from `Error`.

    Raised by ``Rados.monitor_log`` and ``Rados.monitor_log2`` for an
    unknown log level or a non-callable callback.

    :param message: detail text wrapped into "RADOS logic error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS logic error (%s)" % message
        super(LogicError, self).__init__(text, errno)
class TimedOut(OSError):
    """
    `TimedOut` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ETIMEDOUT`` to this class.

    :param message: detail text wrapped into "RADOS timed out (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS timed out (%s)" % message
        super(TimedOut, self).__init__(text, errno)
class InProgress(Error):
    """
    `InProgress` class, derived from `Error`.

    ``errno_to_exception`` maps ``errno.EINPROGRESS`` to this class.

    :param message: detail text wrapped into "RADOS in progress error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS in progress error (%s)" % message
        super(InProgress, self).__init__(text, errno)
class IsConnected(Error):
    """
    `IsConnected` class, derived from `Error`.

    ``errno_to_exception`` maps ``errno.EISCONN`` to this class.

    :param message: detail text wrapped into "RADOS is connected error (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS is connected error (%s)" % message
        super(IsConnected, self).__init__(text, errno)
class ConnectionShutdown(OSError):
    """
    `ConnectionShutdown` class, derived from this module's `OSError`.

    ``errno_to_exception`` maps ``errno.ESHUTDOWN`` to this class.

    :param message: detail text wrapped into
                    "RADOS connection was shutdown (...)"
    :param errno: error number handed on to the parent constructor
    """
    def __init__(self, message, errno=None):
        text = "RADOS connection was shutdown (%s)" % message
        super(ConnectionShutdown, self).__init__(text, errno)
235 IF UNAME_SYSNAME == "FreeBSD":
236 cdef errno_to_exception = {
237 errno.EPERM : PermissionError,
238 errno.ENOENT : ObjectNotFound,
240 errno.ENOSPC : NoSpace,
241 errno.EEXIST : ObjectExists,
242 errno.EBUSY : ObjectBusy,
243 errno.ENOATTR : NoData,
244 errno.EINTR : InterruptedOrTimeoutError,
245 errno.ETIMEDOUT : TimedOut,
246 errno.EACCES : PermissionDeniedError,
247 errno.EINPROGRESS : InProgress,
248 errno.EISCONN : IsConnected,
249 errno.EINVAL : InvalidArgumentError,
250 errno.ENOTCONN : NotConnected,
251 errno.ESHUTDOWN : ConnectionShutdown,
254 cdef errno_to_exception = {
255 errno.EPERM : PermissionError,
256 errno.ENOENT : ObjectNotFound,
258 errno.ENOSPC : NoSpace,
259 errno.EEXIST : ObjectExists,
260 errno.EBUSY : ObjectBusy,
261 errno.ENODATA : NoData,
262 errno.EINTR : InterruptedOrTimeoutError,
263 errno.ETIMEDOUT : TimedOut,
264 errno.EACCES : PermissionDeniedError,
265 errno.EINPROGRESS : InProgress,
266 errno.EISCONN : IsConnected,
267 errno.EINVAL : InvalidArgumentError,
268 errno.ENOTCONN : NotConnected,
269 errno.ESHUTDOWN : ConnectionShutdown,
273 cdef make_ex(ret: int, msg: str):
275 Translate a librados return code into an exception.
277 :param ret: the return code
279 :param msg: the error message to use
281 :returns: a subclass of :class:`Error`
284 if ret in errno_to_exception:
285 return errno_to_exception[ret](msg, errno=ret)
286 elif ret > MAX_ERRNO:
287 offset = ret - MAX_ERRNO
288 return ExtendMismatch(msg, ret, offset)
290 return OSError(msg, errno=ret)
293 def cstr(val, name, encoding="utf-8", opt=False) -> Optional[bytes]:
295 Create a byte string from a Python string
297 :param basestring val: Python string
298 :param str name: Name of the string parameter, for exceptions
299 :param str encoding: Encoding to use
300 :param bool opt: If True, None is allowed
301 :raises: :class:`TypeError`
303 if opt and val is None:
305 if isinstance(val, bytes):
307 elif isinstance(val, str):
308 return val.encode(encoding)
310 raise TypeError('%s must be a string' % name)
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every element of a sequence with :func:`cstr`.

    :param list_str: iterable of ``str`` or ``bytes`` values
    :param str name: name of the parameter, used by :func:`cstr` in its
                     error message
    :param str encoding: encoding :func:`cstr` applies to ``str`` elements
    :raises: :class:`TypeError` if an element is neither ``str`` nor ``bytes``
    :returns: list with one :func:`cstr` result per element, in input order
    """
    # Forward the caller's encoding; it used to be accepted but dropped, so
    # every element was encoded with cstr's default regardless of the argument.
    return [cstr(s, name, encoding) for s in list_str]
317 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
319 Decode a byte string into a Python string.
321 :param bytes val: byte string
326 return val.decode(encoding)
def flatten_dict(d, name):
    """
    Flatten a mapping into a single NUL-delimited byte string.

    Keys and values are taken in the order ``d.items()`` yields them
    (key, value, key, value, ...). Each one gets a trailing NUL character
    appended and the concatenation is converted with :func:`cstr`.

    :param d: mapping whose keys and values support ``+`` with a ``str``
    :param str name: parameter name forwarded to :func:`cstr`
    :returns: whatever :func:`cstr` returns for the joined string
    """
    flat = chain.from_iterable(d.items())
    terminated = [part + '\0' for part in flat]
    return cstr(''.join(terminated), name)
334 cdef char* opt_str(s) except? NULL:
340 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
341 cdef void *ret = realloc(ptr, size)
343 raise MemoryError("realloc failed")
347 cdef size_t * to_csize_t_array(list_int):
348 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
350 raise MemoryError("malloc failed")
351 for i in range(len(list_int)):
352 ret[i] = <size_t>list_int[i]
356 cdef char ** to_bytes_array(list_bytes):
357 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
359 raise MemoryError("malloc failed")
360 for i in range(len(list_bytes)):
361 ret[i] = <char *>list_bytes[i]
364 cdef int __monitor_callback(void *arg, const char *line, const char *who,
365 uint64_t sec, uint64_t nsec, uint64_t seq,
366 const char *level, const char *msg) with gil:
367 cdef object cb_info = <object>arg
368 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
371 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
374 uint64_t sec, uint64_t nsec, uint64_t seq,
375 const char *level, const char *msg) with gil:
376 cdef object cb_info = <object>arg
377 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
381 class Version(object):
382 """ Version information """
383 def __init__(self, major, minor, extra):
389 return "%d.%d.%d" % (self.major, self.minor, self.extra)
392 cdef class Rados(object):
393 """This class wraps librados functions"""
394 # NOTE(sileht): attributes declared in .pxd
396 def __init__(self, *args, **kwargs):
398 self.__setup(*args, **kwargs)
401 "special value that indicates no conffile should be read when creating a mount handle"
402 DEFAULT_CONF_FILES = -2
403 "special value that indicates the default conffiles should be read when creating a mount handle"
406 rados_id: Optional[str] = None,
407 name: Optional[str] = None,
408 clustername: Optional[str] = None,
409 conf_defaults: Optional[Dict[str, str]] = None,
410 conffile: Union[str, int, None] = NO_CONF_FILE,
411 conf: Optional[Dict[str, str]] = None,
413 context: object = None):
414 self.monitor_callback = None
415 self.monitor_callback2 = None
416 self.parsed_args = []
417 self.conf_defaults = conf_defaults
418 self.conffile = conffile
419 self.rados_id = rados_id
421 if rados_id and name:
422 raise Error("Rados(): can't supply both rados_id and name")
424 name = 'client.' + rados_id
426 name = 'client.admin'
427 if clustername is None:
430 name_raw = cstr(name, 'name')
431 clustername_raw = cstr(clustername, 'clustername')
433 char *_name = name_raw
434 char *_clustername = clustername_raw
439 # Unpack void* (aka rados_config_t) from capsule
440 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
442 ret = rados_create_with_context(&self.cluster, rados_config)
445 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
447 raise Error("rados_initialize failed with error code: %d" % ret)
449 self.state = "configuring"
450 # order is important: conf_defaults, then conffile, then conf
452 for key, value in conf_defaults.items():
453 self.conf_set(key, value)
454 if conffile in (self.NO_CONF_FILE, None):
456 elif conffile in (self.DEFAULT_CONF_FILES, ''):
457 self.conf_read_file(None)
459 self.conf_read_file(conffile)
461 for key, value in conf.items():
462 self.conf_set(key, value)
466 Get associated client addresses with this RADOS session.
468 self.require_state("configuring", "connected")
476 ret = rados_getaddrs(self.cluster, &addrs)
478 raise make_ex(ret, "error calling getaddrs")
480 return decode_cstr(addrs)
484 def require_state(self, *args):
486 Checks if the Rados object is in a special state
488 :raises: :class:`RadosStateError`
490 if self.state in args:
492 raise RadosStateError("You cannot perform that operation on a \
493 Rados object in state %s." % self.state)
497 Disconnects from the cluster. Call this explicitly when a
498 Rados.connect()ed object is no longer used.
500 if self.state != "shutdown":
502 rados_shutdown(self.cluster)
503 self.state = "shutdown"
509 def __exit__(self, type_, value, traceback):
513 def version(self) -> Version:
515 Get the version number of the ``librados`` C library.
517 :returns: a tuple of ``(major, minor, extra)`` components of the
524 rados_version(&major, &minor, &extra)
525 return Version(major, minor, extra)
527 def conf_read_file(self, path: Optional[str] = None):
529 Configure the cluster handle using a Ceph config file.
531 :param path: path to the config file
533 self.require_state("configuring", "connected")
534 path_raw = cstr(path, 'path', opt=True)
536 char *_path = opt_str(path_raw)
538 ret = rados_conf_read_file(self.cluster, _path)
540 raise make_ex(ret, "error calling conf_read_file")
542 def conf_parse_argv(self, args: Sequence[str]):
544 Parse known arguments from args, and remove; returned
545 args contain only those unknown to ceph
547 self.require_state("configuring", "connected")
551 cargs = cstr_list(args, 'args')
553 int _argc = len(args)
554 char **_argv = to_bytes_array(cargs)
555 char **_remargv = NULL
558 _remargv = <char **>malloc(_argc * sizeof(char *))
560 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
562 <const char**>_remargv)
564 raise make_ex(ret, "error calling conf_parse_argv_remainder")
566 # _remargv was allocated with fixed argc; collapse return
567 # list to eliminate any missing args
568 retargs = [decode_cstr(a) for a in _remargv[:_argc]
570 self.parsed_args = args
576 def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
578 Parse known arguments from an environment variable, normally
581 self.require_state("configuring", "connected")
585 var_raw = cstr(var, 'var')
589 ret = rados_conf_parse_env(self.cluster, _var)
591 raise make_ex(ret, "error calling conf_parse_env")
593 def conf_get(self, option: str) -> Optional[str]:
595 Get the value of a configuration option
597 :param option: which option to read
599 :returns: value of the option or None
600 :raises: :class:`TypeError`
602 self.require_state("configuring", "connected")
603 option_raw = cstr(option, 'option')
605 char *_option = option_raw
611 ret_buf = <char *>realloc_chk(ret_buf, length)
613 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
615 return decode_cstr(ret_buf)
616 elif ret == -errno.ENAMETOOLONG:
618 elif ret == -errno.ENOENT:
621 raise make_ex(ret, "error calling conf_get")
625 def conf_set(self, option: str, val: str):
627 Set the value of a configuration option
629 :param option: which option to set
630 :param val: value of the option
632 :raises: :class:`TypeError`, :class:`ObjectNotFound`
634 self.require_state("configuring", "connected")
635 option_raw = cstr(option, 'option')
636 val_raw = cstr(val, 'val')
638 char *_option = option_raw
642 ret = rados_conf_set(self.cluster, _option, _val)
644 raise make_ex(ret, "error calling conf_set")
646 def ping_monitor(self, mon_id: str):
648 Ping a monitor to assess liveness
650 May be used as a simple way to assess liveness, or to obtain
651 information about the monitor in a simple way even in the
654 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
655 :returns: the string reply from the monitor
658 self.require_state("configuring", "connected")
660 mon_id_raw = cstr(mon_id, 'mon_id')
662 char *_mon_id = mon_id_raw
667 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
670 raise make_ex(ret, "error calling ping_monitor")
673 my_outstr = outstr[:outstrlen]
674 rados_buffer_free(outstr)
675 return decode_cstr(my_outstr)
677 def connect(self, timeout: int = 0):
679 Connect to the cluster. Use shutdown() to release resources.
681 :param timeout: Any supplied timeout value is currently ignored.
683 self.require_state("configuring")
684 # NOTE(sileht): timeout was supported by old python API,
685 # but this is not something available in C API, so ignore
686 # for now and remove it later
688 ret = rados_connect(self.cluster)
690 raise make_ex(ret, "error connecting to the cluster")
691 self.state = "connected"
693 def get_instance_id(self) -> int:
695 Get a global id for current instance
697 self.require_state("connected")
699 ret = rados_get_instance_id(self.cluster)
702 def get_cluster_stats(self) -> Dict[str, int]:
704 Read usage info about the cluster
706 This tells you total space, space used, space available, and number
707 of objects. These are not updated immediately when data is written,
708 they are eventually consistent.
709 :returns: contains the following keys:
711 - ``kb`` (int) - total space
713 - ``kb_used`` (int) - space used
715 - ``kb_avail`` (int) - free space available
717 - ``num_objects`` (int) - number of objects
721 rados_cluster_stat_t stats
724 ret = rados_cluster_stat(self.cluster, &stats)
728 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
729 return {'kb': stats.kb,
730 'kb_used': stats.kb_used,
731 'kb_avail': stats.kb_avail,
732 'num_objects': stats.num_objects}
734 def pool_exists(self, pool_name: str) -> bool:
736 Checks if a given pool exists.
738 :param pool_name: name of the pool to check
740 :raises: :class:`TypeError`, :class:`Error`
741 :returns: true if the pool exists, false otherwise.
743 self.require_state("connected")
745 pool_name_raw = cstr(pool_name, 'pool_name')
747 char *_pool_name = pool_name_raw
750 ret = rados_pool_lookup(self.cluster, _pool_name)
753 elif ret == -errno.ENOENT:
756 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
758 def pool_lookup(self, pool_name: str) -> int:
760 Returns a pool's ID based on its name.
762 :param pool_name: name of the pool to look up
764 :raises: :class:`TypeError`, :class:`Error`
765 :returns: pool ID, or None if it doesn't exist
767 self.require_state("connected")
768 pool_name_raw = cstr(pool_name, 'pool_name')
770 char *_pool_name = pool_name_raw
773 ret = rados_pool_lookup(self.cluster, _pool_name)
776 elif ret == -errno.ENOENT:
779 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
781 def pool_reverse_lookup(self, pool_id: int) -> Optional[str]:
783 Returns a pool's name based on its ID.
785 :param pool_id: ID of the pool to look up
787 :raises: :class:`TypeError`, :class:`Error`
788 :returns: pool name, or None if it doesn't exist
790 self.require_state("connected")
792 int64_t _pool_id = pool_id
798 name = <char *>realloc_chk(name, size)
800 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
803 elif ret != -errno.ERANGE and size <= 4096:
805 elif ret == -errno.ENOENT:
808 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
810 return decode_cstr(name)
815 def create_pool(self, pool_name: str,
816 crush_rule: Optional[int] = None,
817 auid: Optional[int] = None):
820 - with default settings: if crush_rule=None and auid=None
821 - with a specific CRUSH rule: crush_rule given
822 - with a specific auid: auid given
823 - with a specific CRUSH rule and auid: crush_rule and auid given
825 :param pool_name: name of the pool to create
826 :param crush_rule: rule to use for placement in the new pool
827 :param auid: id of the owner of the new pool
829 :raises: :class:`TypeError`, :class:`Error`
831 self.require_state("connected")
833 pool_name_raw = cstr(pool_name, 'pool_name')
835 char *_pool_name = pool_name_raw
839 if crush_rule is None and auid is None:
841 ret = rados_pool_create(self.cluster, _pool_name)
842 elif crush_rule is not None and auid is None:
843 _crush_rule = crush_rule
845 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
846 elif crush_rule is None and auid is not None:
849 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
851 _crush_rule = crush_rule
854 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
856 raise make_ex(ret, "error creating pool '%s'" % pool_name)
858 def get_pool_base_tier(self, pool_id: int) -> int:
862 :returns: base pool, or pool_id if tiering is not configured for the pool
864 self.require_state("connected")
866 int64_t base_tier = 0
867 int64_t _pool_id = pool_id
870 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
872 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
873 return int(base_tier)
875 def delete_pool(self, pool_name: str):
877 Delete a pool and all data inside it.
879 The pool is removed from the cluster immediately,
880 but the actual data is deleted in the background.
882 :param pool_name: name of the pool to delete
884 :raises: :class:`TypeError`, :class:`Error`
886 self.require_state("connected")
888 pool_name_raw = cstr(pool_name, 'pool_name')
890 char *_pool_name = pool_name_raw
893 ret = rados_pool_delete(self.cluster, _pool_name)
895 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
897 def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
899 List inconsistent placement groups in the given pool
901 :param pool_id: ID of the pool in which PGs are listed
902 :returns: inconsistent placement groups
904 self.require_state("connected")
906 int64_t pool = pool_id
912 pgs = <char *>realloc_chk(pgs, size);
914 ret = rados_inconsistent_pg_list(self.cluster, pool,
921 raise make_ex(ret, "error calling inconsistent_pg_list")
922 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
926 def list_pools(self) -> List[str]:
928 Gets a list of pool names.
930 :returns: list of pool names.
932 self.require_state("connected")
939 c_names = <char *>realloc_chk(c_names, size)
941 ret = rados_pool_list(self.cluster, c_names, size)
946 return [name for name in decode_cstr(c_names[:ret]).split('\0')
951 def get_fsid(self) -> str:
953 Get the fsid of the cluster as a hexadecimal string.
955 :raises: :class:`Error`
956 :returns: cluster fsid
958 self.require_state("connected")
965 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
967 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
968 if ret == -errno.ERANGE:
969 buf_len = buf_len * 2
971 raise make_ex(ret, "error getting cluster fsid")
974 return decode_cstr(ret_buf)
978 def open_ioctx(self, ioctx_name: str) -> Ioctx:
982 The io context allows you to perform operations within a particular
985 :param ioctx_name: name of the pool
987 :raises: :class:`TypeError`, :class:`Error`
988 :returns: Rados Ioctx object
990 self.require_state("connected")
991 ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
994 char *_ioctx_name = ioctx_name_raw
996 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
998 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
999 io = Ioctx(self, ioctx_name)
1003 def open_ioctx2(self, pool_id: int) -> Ioctx:
1005 Create an io context
1007 The io context allows you to perform operations within a particular
1010 :param pool_id: ID of the pool
1012 :raises: :class:`TypeError`, :class:`Error`
1013 :returns: Rados Ioctx object
1015 self.require_state("connected")
1018 int64_t _pool_id = pool_id
1020 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1022 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1023 io = Ioctx(self, str(pool_id))
1027 def mon_command(self,
1031 target: Optional[Union[str, int]] = None) -> Tuple[int, bytes, str]:
1033 Send a command to the mon.
1035 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1037 :param cmd: JSON formatted string.
1038 :param inbuf: optional string.
1039 :param timeout: This parameter is ignored.
1040 :param target: name or rank of a specific mon. Optional
1041 :return: (int ret, string outbuf, string outs)
1046 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1048 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1049 >>> c.mon_command(cmd, b'')
1051 # NOTE(sileht): timeout is ignored because C API doesn't provide
1052 # timeout argument, but we keep it for backward compat with old python binding
1053 self.require_state("connected")
1054 cmds = [cstr(cmd, 'cmd')]
1056 if isinstance(target, int):
1057 # NOTE(sileht): looks weird but test_monmap_dump pass int
1058 target = str(target)
1060 target = cstr(target, 'target', opt=True)
1063 char *_target = opt_str(target)
1064 char **_cmd = to_bytes_array(cmds)
1065 size_t _cmdlen = len(cmds)
1067 char *_inbuf = inbuf
1068 size_t _inbuf_len = len(inbuf)
1078 ret = rados_mon_command_target(self.cluster, _target,
1079 <const char **>_cmd, _cmdlen,
1080 <const char*>_inbuf, _inbuf_len,
1081 &_outbuf, &_outbuf_len,
1085 ret = rados_mon_command(self.cluster,
1086 <const char **>_cmd, _cmdlen,
1087 <const char*>_inbuf, _inbuf_len,
1088 &_outbuf, &_outbuf_len,
1091 my_outs = decode_cstr(_outs[:_outs_len])
1092 my_outbuf = _outbuf[:_outbuf_len]
1094 rados_buffer_free(_outs)
1096 rados_buffer_free(_outbuf)
1097 return (ret, my_outbuf, my_outs)
1101 def osd_command(self,
1105 timeout: int = 0) -> Tuple[int, bytes, str]:
1107 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1109 :return: (int ret, string outbuf, string outs)
1111 # NOTE(sileht): timeout is ignored because C API doesn't provide
1112 # timeout argument, but we keep it for backward compat with old python binding
1113 self.require_state("connected")
1115 cmds = [cstr(cmd, 'cmd')]
1119 char **_cmd = to_bytes_array(cmds)
1120 size_t _cmdlen = len(cmds)
1122 char *_inbuf = inbuf
1123 size_t _inbuf_len = len(inbuf)
1132 ret = rados_osd_command(self.cluster, _osdid,
1133 <const char **>_cmd, _cmdlen,
1134 <const char*>_inbuf, _inbuf_len,
1135 &_outbuf, &_outbuf_len,
1138 my_outs = decode_cstr(_outs[:_outs_len])
1139 my_outbuf = _outbuf[:_outbuf_len]
1141 rados_buffer_free(_outs)
1143 rados_buffer_free(_outbuf)
1144 return (ret, my_outbuf, my_outs)
1148 def mgr_command(self,
1152 target: Optional[str] = None) -> Tuple[int, str, bytes]:
1154 :return: (int ret, string outbuf, string outs)
1156 # NOTE(sileht): timeout is ignored because C API doesn't provide
1157 # timeout argument, but we keep it for backward compat with old python binding
1158 self.require_state("connected")
1160 cmds = [cstr(cmd, 'cmd')]
1161 target = cstr(target, 'target', opt=True)
1164 char *_target = opt_str(target)
1166 char **_cmd = to_bytes_array(cmds)
1167 size_t _cmdlen = len(cmds)
1169 char *_inbuf = inbuf
1170 size_t _inbuf_len = len(inbuf)
1178 if target is not None:
1180 ret = rados_mgr_command_target(self.cluster,
1181 <const char*>_target,
1182 <const char **>_cmd, _cmdlen,
1183 <const char*>_inbuf, _inbuf_len,
1184 &_outbuf, &_outbuf_len,
1188 ret = rados_mgr_command(self.cluster,
1189 <const char **>_cmd, _cmdlen,
1190 <const char*>_inbuf, _inbuf_len,
1191 &_outbuf, &_outbuf_len,
1194 my_outs = decode_cstr(_outs[:_outs_len])
1195 my_outbuf = _outbuf[:_outbuf_len]
1197 rados_buffer_free(_outs)
1199 rados_buffer_free(_outbuf)
1200 return (ret, my_outbuf, my_outs)
1204 def pg_command(self,
1208 timeout: int = 0) -> Tuple[int, bytes, str]:
1210 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1212 :return: (int ret, string outbuf, string outs)
1214 # NOTE(sileht): timeout is ignored because C API doesn't provide
1215 # timeout argument, but we keep it for backward compat with old python binding
1216 self.require_state("connected")
1218 pgid_raw = cstr(pgid, 'pgid')
1219 cmds = [cstr(cmd, 'cmd')]
1222 char *_pgid = pgid_raw
1223 char **_cmd = to_bytes_array(cmds)
1224 size_t _cmdlen = len(cmds)
1226 char *_inbuf = inbuf
1227 size_t _inbuf_len = len(inbuf)
1236 ret = rados_pg_command(self.cluster, _pgid,
1237 <const char **>_cmd, _cmdlen,
1238 <const char *>_inbuf, _inbuf_len,
1239 &_outbuf, &_outbuf_len,
1242 my_outs = decode_cstr(_outs[:_outs_len])
1243 my_outbuf = _outbuf[:_outbuf_len]
1245 rados_buffer_free(_outs)
1247 rados_buffer_free(_outbuf)
1248 return (ret, my_outbuf, my_outs)
1252 def wait_for_latest_osdmap(self) -> int:
1253 self.require_state("connected")
1255 ret = rados_wait_for_latest_osdmap(self.cluster)
1258 def blocklist_add(self, client_address: str, expire_seconds: int = 0):
1260 Blocklist a client from the OSDs
1262 :param client_address: client address
1263 :param expire_seconds: number of seconds to blocklist
1265 :raises: :class:`Error`
1267 self.require_state("connected")
1268 client_address_raw = cstr(client_address, 'client_address')
1270 uint32_t _expire_seconds = expire_seconds
1271 char *_client_address = client_address_raw
1274 ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
1276 raise make_ex(ret, "error blocklisting client '%s'" % client_address)
1278 def monitor_log(self, level: str,
1279 callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
1280 arg: Optional[object] = None):
1281 if level not in MONITOR_LEVELS:
1282 raise LogicError("invalid monitor level " + level)
1283 if callback is not None and not callable(callback):
1284 raise LogicError("callback must be a callable function or None")
1286 level_raw = cstr(level, 'level')
1287 cdef char *_level = level_raw
1289 if callback is None:
1291 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1292 self.monitor_callback = None
1293 self.monitor_callback2 = None
1296 cb = (callback, arg)
1297 cdef PyObject* _arg = <PyObject*>cb
1299 r = rados_monitor_log(self.cluster, <const char*>_level,
1300 <rados_log_callback_t>&__monitor_callback, _arg)
1303 raise make_ex(r, 'error calling rados_monitor_log')
1304 # NOTE(sileht): Prevents the callback method from being garbage collected
1305 self.monitor_callback = cb
1306 self.monitor_callback2 = None
def monitor_log2(self, level: str,
                 callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
                 arg: Optional[object] = None):
    """
    Install or clear the cluster log callback (rados_monitor_log2)

    Same contract as :meth:`monitor_log`, but the callback takes one
    additional string argument.

    :param level: log level to subscribe to; must be in MONITOR_LEVELS
    :param callback: callable to register, or None to unregister
    :param arg: object stored together with the callback

    :raises: :class:`LogicError`, :class:`Error`
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level_raw = cstr(level, 'level')
    cdef char *_level = level_raw

    if callback is None:
        # Unregister: no callback and no argument are handed to librados.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (was "rados_monitor_log").
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
    """
    Register this client as a service daemon (rados_service_register)

    :param str service: service name (e.g. "rgw")
    :param str daemon: daemon name (e.g. "gwfoo")
    :param dict metadata: static metadata about the register daemon
           (e.g., the version of Ceph, the kernel version.)

    :raises: :class:`Error`
    """
    service_bytes = cstr(service, 'service')
    daemon_bytes = cstr(daemon, 'daemon')
    flat_metadata = flatten_dict(metadata, 'metadata')
    cdef:
        char *_service = service_bytes
        char *_daemon = daemon_bytes
        char *_metadata = flat_metadata

    with nogil:
        status = rados_service_register(self.cluster, _service, _daemon, _metadata)
    if status != 0:
        raise make_ex(status, "error calling service_register()")
def service_daemon_update(self, status: Dict[str, str]):
    """
    Push a new status dictionary via rados_service_update_status()

    :param dict status: status key/value pairs, flattened before sending

    :raises: :class:`Error`
    """
    flat_status = flatten_dict(status, 'status')
    cdef:
        char *_status = flat_status

    with nogil:
        rc = rados_service_update_status(self.cluster, _status)
    if rc != 0:
        raise make_ex(rc, "error calling service_daemon_update()")
cdef class OmapIterator(object):
    """Iterator over the key/value pairs of an object's omap"""

    cdef public Ioctx ioctx
    cdef public object omap_key_type
    # Not assigned in this class; presumably filled in by the code that
    # creates the iterator.
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx, omap_key_type):
        self.ioctx = ioctx
        self.omap_key_type = omap_key_type

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next key-value pair in the object

        :raises: :class:`Error`, StopIteration
        :returns: next rados.OmapItem, a (key, value) tuple
        """
        cdef:
            char *raw_key = NULL
            char *raw_val = NULL
            size_t val_len

        with nogil:
            status = rados_omap_get_next(self.ctx, &raw_key, &raw_val, &val_len)

        if status != 0:
            raise make_ex(status, "error iterating over the omap")
        # A NULL key marks the end of the iteration.
        if raw_key == NULL:
            raise StopIteration()
        key = self.omap_key_type(raw_key)
        if raw_val == NULL:
            return (key, None)
        return (key, raw_val[:val_len])

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            status = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if status < 0:
            raise make_ex(status, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *raw_key = NULL
            const char *raw_locator = NULL
            const char *raw_nspace = NULL
            size_t key_len = 0
            size_t locator_len = 0
            size_t nspace_len = 0

        with nogil:
            status = rados_nobjects_list_next2(self.ctx, &raw_key, &raw_locator, &raw_nspace,
                                               &key_len, &locator_len, &nspace_len)

        # Any negative status ends the iteration.
        if status < 0:
            raise StopIteration()

        key = decode_cstr(raw_key[:key_len])
        # Locator and namespace come back as NULL when absent.
        locator = decode_cstr(raw_locator[:locator_len]) if raw_locator != NULL else None
        nspace = decode_cstr(raw_nspace[:nspace_len]) if raw_nspace != NULL else None
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    cdef rados_xattrs_iter_t it
    # Borrowed pointer into self.oid, which keeps the bytes alive.
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            status = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if status != 0:
            raise make_ex(status, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: :class:`Error`, StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *raw_name = NULL
            const char *raw_val = NULL
            size_t val_len = 0

        with nogil:
            status = rados_getxattrs_next(self.it, &raw_name, &raw_val, &val_len)
        if status != 0:
            raise make_ex(status, "error iterating over the extended attributes in '%s'" % self.oid)
        # A NULL name marks the end of the attribute list.
        if raw_name == NULL:
            raise StopIteration()
        name = decode_cstr(raw_name)
        value = raw_val[:val_len]
        return (name, value)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # Heap buffer of snapshot ids, owned by this object and released in
    # __dealloc__.
    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                # A non-negative result is the number of ids written.
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, "error calling rados_snap_list for ioctx '%s'" % self.ioctx.name)
            # -ERANGE: buffer too small, retry with twice the capacity.
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id buffer was previously never released, leaking one
        # allocation per iterator. free(NULL) is a no-op, so this is
        # also safe when __cinit__ failed before allocating.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self) -> 'SnapIterator':
        return self

    def __next__(self) -> 'Snap':
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Same doubling strategy as in __cinit__, for the name buffer.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            # The buffer is larger than the name; strip the NUL padding.
            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
cdef class Snap(object):
    """Snapshot object"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
            % (str(self.ioctx), self.name, self.snap_id)

    def get_timestamp(self) -> datetime:
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: the date and time the snapshot was created, as a
                  :class:`datetime.datetime` built with
                  ``datetime.fromtimestamp``
        """
        cdef time_t snap_time

        with nogil:
            ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
        if ret != 0:
            raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(snap_time)
1612 # https://github.com/cython/cython/issues/1370
cdef class Completion(object):
    """Handle for one asynchronous librados operation"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        # Optional result buffer; its reference is dropped in __dealloc__.
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self) -> bool:
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.
        Delegates to :meth:`is_complete`.

        :returns: True if the operation is safe
        """
        return self.is_complete()

    def is_complete(self) -> bool:
        """
        Has an asynchronous operation completed?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            flag = rados_aio_is_complete(self.rados_comp)
        return flag == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        wait_for_safe() is an alias of wait_for_complete() since Luminous
        """
        self.wait_for_complete()

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        return self.wait_for_complete_and_cb()

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            result = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return result

    def get_return_value(self) -> int:
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: return value of the operation
        """
        with nogil:
            result = rados_aio_get_return_value(self.rados_comp)
        return result

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
            self.rados_comp = NULL

    def _complete(self):
        # Run the user callbacks, then drop this completion from the
        # ioctx tracking lists.
        self.oncomplete(self)
        if self.onsafe:
            self.onsafe(self)
        self._cleanup()

    def _cleanup(self):
        # Undo the bookkeeping done when the completion was tracked.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
class OpCtx(object):
    """Context-manager mixin built on the host class's create()/release()"""

    def __enter__(self):
        # Whatever create() returns becomes the ``as`` target.
        entered = self.create()
        return entered

    def __exit__(self, type, msg, traceback):
        # Always release; exceptions from the with-body are not suppressed.
        self.release()
cdef class WriteOp(object):
    """Builder for a compound librados write operation

    Each method queues one step on the underlying ``rados_write_op_t``;
    nothing in this class submits the operation.
    """
    cdef rados_write_op_t write_op

    def create(self):
        """Allocate the underlying write operation and return self."""
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """Free the underlying write operation."""
        with nogil:
            rados_release_write_op(self.write_op)

    def new(self, exclusive: Optional[int] = None):
        """
        Create the object.

        :param exclusive: value passed as the ``exclusive`` argument of
                          rados_write_op_create(); None is treated as 0
        """
        # The declared default (None) cannot be converted to a C int, so
        # map it to 0 instead of failing with TypeError.
        if exclusive is None:
            exclusive = 0

        cdef:
            int _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.

        :param flags: flags to apply to the last operation
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    def set_xattr(self, xattr_name: str, xattr_value: bytes):
        """
        Set an extended attribute on an object.

        :param xattr_name: name of the xattr
        :param xattr_value: buffer to set xattr to
        """
        xattr_name_raw = cstr(xattr_name, 'xattr_name')
        cdef:
            char *_xattr_name = xattr_name_raw
            char *_xattr_value = xattr_value
            size_t _xattr_value_len = len(xattr_value)
        with nogil:
            rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)

    def rm_xattr(self, xattr_name: str):
        """
        Remove an extended attribute from an object.

        :param xattr_name: name of the xattr to remove
        """
        xattr_name_raw = cstr(xattr_name, 'xattr_name')
        cdef:
            char *_xattr_name = xattr_name_raw
        with nogil:
            rados_write_op_rmxattr(self.write_op, _xattr_name)

    def append(self, to_write: bytes):
        """
        Queue an append of data to the object.

        :param to_write: data to write
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    def write_full(self, to_write: bytes):
        """
        Write whole object, atomically replacing it.

        :param to_write: data to write
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    def write(self, to_write: bytes, offset: int = 0):
        """
        Write to offset.

        :param to_write: data to write
        :param offset: byte offset in the object to begin writing at
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    def assert_version(self, version: int):
        """
        Check if object's version is the expected one.

        :param version: expected version of the object
        """
        cdef:
            uint64_t _version = version

        with nogil:
            rados_write_op_assert_version(self.write_op, _version)

    def zero(self, offset: int, length: int):
        """
        Zero part of an object.

        :param offset: byte offset in the object to begin writing at
        :param length: number of zero bytes to write
        """

        cdef:
            uint64_t _offset = offset
            uint64_t _length = length

        with nogil:
            # The C API takes (op, offset, len); the two values used to
            # be passed in the opposite order.
            rados_write_op_zero(self.write_op, _offset, _length)

    def truncate(self, offset: int):
        """
        Truncate an object.

        :param offset: byte offset in the object to begin truncating at
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)

    def execute(self, cls: str, method: str, data: bytes):
        """
        Execute an OSD class method on an object

        :param cls: name of the object class
        :param method: name of the method
        :param data: input data
        """

        cls_raw = cstr(cls, 'cls')
        method_raw = cstr(method, 'method')
        cdef:
            char *_cls = cls_raw
            char *_method = method_raw
            char *_data = data
            size_t _data_len = len(data)

        with nogil:
            rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)

    def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
        """
        Write the same buffer multiple times

        :param to_write: data to write
        :param write_len: total number of bytes to write
        :param offset: byte offset in the object to begin writing at
        """
        cdef:
            char *_to_write = to_write
            size_t _data_len = len(to_write)
            size_t _write_len = write_len
            uint64_t _offset = offset
        with nogil:
            rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)

    def cmpext(self, cmp_buf: bytes, offset: int = 0):
        """
        Ensure that given object range (extent) satisfies comparison

        :param cmp_buf: buffer containing bytes to be compared with object contents
        :param offset: object byte offset at which to start the comparison
        """
        cdef:
            char *_cmp_buf = cmp_buf
            size_t _cmp_buf_len = len(cmp_buf)
            uint64_t _offset = offset
        with nogil:
            rados_write_op_cmpext(self.write_op, _cmp_buf, _cmp_buf_len, _offset, NULL)

    def omap_cmp(self, key: OMAP_KEY_TYPE, val: OMAP_KEY_TYPE, cmp_op: int = LIBRADOS_CMPXATTR_OP_EQ):
        """
        Ensure that an omap key value satisfies comparison

        :param key: omap key whose associated value is evaluated for comparison
        :param val: value to compare with
        :param cmp_op: comparison operator, one of LIBRADOS_CMPXATTR_OP_EQ (1),
            LIBRADOS_CMPXATTR_OP_GT (3), or LIBRADOS_CMPXATTR_OP_LT (5).
        """
        key_raw = cstr(key, 'key')
        val_raw = cstr(val, 'val')
        cdef:
            char *_key = key_raw
            char *_val = val_raw
            # Use the encoded byte length: len(val) counts characters
            # and is too short for non-ASCII str input.
            size_t _val_len = len(val_raw)
            uint8_t _comparison_operator = cmp_op
        with nogil:
            rados_write_op_omap_cmp(self.write_op, _key, _comparison_operator, _val, _val_len, NULL)
class WriteOpCtx(WriteOp, OpCtx):
    """Context manager that wraps a WriteOp"""
cdef class ReadOp(object):
    """Builder for a compound librados read operation"""
    cdef rados_read_op_t read_op

    def create(self):
        """Allocate the underlying read operation and return self."""
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """Free the underlying read operation."""
        with nogil:
            rados_release_read_op(self.read_op)

    def cmpext(self, cmp_buf: bytes, offset: int = 0):
        """
        Ensure that given object range (extent) satisfies comparison

        :param cmp_buf: buffer containing bytes to be compared with object contents
        :param offset: object byte offset at which to start the comparison
        """
        cdef:
            char *_expected = cmp_buf
            size_t _expected_len = len(cmp_buf)
            uint64_t _start = offset
        with nogil:
            rados_read_op_cmpext(self.read_op, _expected, _expected_len, _start, NULL)

    def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.

        :param flags: flags to apply to the last operation
        """

        cdef:
            int _op_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, _op_flags)
class ReadOpCtx(ReadOp, OpCtx):
    """Context manager that wraps a ReadOp"""
cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
                           uint64_t _notifier_id, void *_data,
                           size_t _data_len) with gil:
    """
    Watch callback: forward a notification to the Watch object in _arg
    """
    cdef object watch = <object>_arg
    # Copy the payload into a bytes object; no payload is passed as None.
    payload = None
    if _data != NULL:
        payload = (<char *>_data)[:_data_len]
    watch._callback(_notify_id, _notifier_id, _cookie, payload)
cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
                                 int _error) with gil:
    """
    Watch error callback: forward the error to the Watch object in _arg
    """
    cdef object target = <object>_arg
    target._error_callback(_cookie, _error)
cdef class Watch(object):
    """watch object"""

    cdef:
        # Watch cookie returned by librados; 0 means "not registered".
        object id
        Ioctx ioctx
        object oid
        object callback
        object error_callback

    def __cinit__(self, Ioctx ioctx, object oid, object callback,
                  object error_callback, object timeout):
        self.id = 0
        # Work on a private duplicate of the ioctx; close() closes it.
        self.ioctx = ioctx.dup()
        self.oid = cstr(oid, 'oid')
        self.callback = callback
        self.error_callback = error_callback

        if timeout is None:
            timeout = 0

        cdef:
            char *_oid = self.oid
            uint64_t _cookie
            uint32_t _timeout = timeout
            void *_args = <PyObject*>self

        with nogil:
            ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
                               <rados_watchcb2_t>&__watch_callback,
                               <rados_watcherrcb_t>&__watch_error_callback,
                               _timeout, _args)
        if ret < 0:
            raise make_ex(ret, "watch error")

        self.id = int(_cookie)

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.close()
        return False

    def __dealloc__(self):
        if self.id == 0:
            return
        self.ioctx.rados.require_state("connected")
        self.close()

    def _callback(self, notify_id, notifier_id, watch_id, data):
        """
        Run the user callback and acknowledge the notification

        The callback's return value, when not None, is sent back as the
        acknowledgement payload.
        """
        replay = self.callback(notify_id, notifier_id, watch_id, data)

        cdef:
            rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
            char *_obj = self.oid
            int64_t _notify_id = notify_id
            uint64_t _cookie = watch_id
            char *_replay = NULL
            # Declared and zeroed here so the length handed to
            # rados_notify_ack() is defined when there is no reply.
            int _replay_len = 0

        if replay is not None:
            replay = cstr(replay, 'replay')
            _replay = replay
            _replay_len = len(replay)

        with nogil:
            rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
                             _replay_len)

    def _error_callback(self, watch_id, error):
        """Forward a watch error to the user's error callback, if any."""
        if self.error_callback is None:
            return
        self.error_callback(watch_id, error)

    def get_id(self) -> int:
        """Return the watch cookie (0 once the watch has been closed)."""
        return self.id

    def check(self):
        """
        Check on watch validity.

        :raises: :class:`Error`
        :returns: timedelta since last confirmed valid
        """
        self.ioctx.require_ioctx_open()

        cdef:
            uint64_t _cookie = self.id

        with nogil:
            ret = rados_watch_check(self.ioctx.io, _cookie)
        if ret < 0:
            raise make_ex(ret, "check error")

        return timedelta(milliseconds=ret)

    def close(self):
        """
        Unregister an interest in an object.

        :raises: :class:`Error`
        """
        if self.id == 0:
            return

        self.ioctx.require_ioctx_open()

        cdef:
            uint64_t _cookie = self.id

        with nogil:
            ret = rados_unwatch2(self.ioctx.io, _cookie)
        # -ENOENT is tolerated here rather than raised.
        if ret < 0 and ret != -errno.ENOENT:
            raise make_ex(ret, "unwatch error")
        self.id = 0

        with nogil:
            cluster = rados_ioctx_get_cluster(self.ioctx.io)
            ret = rados_watch_flush(cluster)
        if ret < 0:
            raise make_ex(ret, "watch_flush error")

        self.ioctx.close()
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    ``args`` is the Completion object registered with the operation.
    """
    cdef object completion_obj = <object>args
    completion_obj._complete()
    return 0
2154 cdef class Ioctx(object):
2155 """rados.Ioctx object"""
2156 # NOTE(sileht): attributes declared in .pyd
def __init__(self, rados, name):
    """
    Set up the Python-level state of an io context

    :param rados: the cluster handle this io context belongs to
    :param name: name stored on the io context
    """
    self.rados = rados
    self.name = name
    self.state = "open"

    self.locator_key = ""
    self.nspace = ""

    # Completions still in flight; both lists are guarded by self.lock.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
2169 def __enter__(self):
2172 def __exit__(self, type_, value, traceback):
2176 def __dealloc__(self):
def __track_completion(self, completion_obj):
    """
    Record a completion in the list matching each callback it carries

    Completion._cleanup() removes the entries again.
    """
    if completion_obj.oncomplete:
        with self.lock:
            self.complete_completions.append(completion_obj)
    if completion_obj.onsafe:
        with self.lock:
            self.safe_completions.append(completion_obj)
def __get_completion(self,
                     oncomplete: Callable[[Completion], None],
                     onsafe: Callable[[Completion], None]):
    """
    Constructs a completion to use with asynchronous operations

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """

    wrapper = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t _callback = NULL
        rados_completion_t _handle
        PyObject* _wrapper_ptr = <PyObject*>wrapper

    # Only ask librados to call back when the caller supplied a handler.
    if oncomplete:
        _callback = <rados_callback_t>&__aio_complete_cb

    with nogil:
        status = rados_aio_create_completion2(_wrapper_ptr, _callback,
                                              &_handle)
    if status < 0:
        raise make_ex(status, "error getting a completion")

    wrapper.rados_comp = _handle
    return wrapper
2226 ioctx = self.rados.open_ioctx2(self.get_pool_id())
2227 ioctx.set_namespace(self.get_namespace())
def aio_stat(self,
             object_name: str,
             oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
    """
    Asynchronously get object stats (size/mtime)

    oncomplete will be called with the returned size and mtime
    as well as the completion:

    oncomplete(completion, size, mtime)

    size and mtime are None when the operation failed.

    :param object_name: the name of the object to get stats from
    :param oncomplete: what to do when the stat is complete

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name_raw = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *_object_name = object_name_raw
        # Filled in by librados; read by the wrapper below.
        uint64_t psize
        time_t pmtime

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        if _completion_v.get_return_value() < 0:
            return oncomplete(_completion_v, None, None)
        return oncomplete(_completion_v, psize, time.localtime(pmtime))

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
                             &psize, &pmtime)

    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error stating %s" % object_name)
def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
              oncomplete: Optional[Callable[[Completion], None]] = None,
              onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
    """
    Write data to an object asynchronously

    Queues the write and returns.

    :param object_name: name of the object
    :param to_write: data to write
    :param offset: byte offset in the object to begin writing at
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """

    name_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _payload = to_write
        size_t _payload_len = len(to_write)
        uint64_t _offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
                              _payload, _payload_len, _offset)
    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error writing object %s" % object_name)
def aio_write_full(self, object_name: str, to_write: bytes,
                   oncomplete: Optional[Callable] = None,
                   onsafe: Optional[Callable] = None) -> Completion:
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    Queues the write and returns.

    :param object_name: name of the object
    :param to_write: data to write
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """

    name_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _payload = to_write
        size_t _payload_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, _object_name,
                                   completion.rados_comp,
                                   _payload, _payload_len)
    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error writing object %s" % object_name)
def aio_writesame(self, object_name: str, to_write: bytes,
                  write_len: int, offset: int = 0,
                  oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously write the same buffer multiple times

    :param object_name: name of the object
    :param to_write: data to write
    :param write_len: total number of bytes to write
    :param offset: byte offset in the object to begin writing at
    :param oncomplete: what to do when the writesame is safe and
        complete in memory on all replicas
    :raises: :class:`Error`
    :returns: completion object
    """

    name_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _pattern = to_write
        size_t _pattern_len = len(to_write)
        size_t _total_len = write_len
        uint64_t _offset = offset

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
                                  _pattern, _pattern_len, _total_len, _offset)

    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error writing object %s" % object_name)
def aio_append(self, object_name: str, to_append: bytes,
               oncomplete: Optional[Callable] = None,
               onsafe: Optional[Callable] = None) -> Completion:
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :param to_append: data to append
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """
    name_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _payload = to_append
        size_t _payload_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, _object_name,
                               completion.rados_comp,
                               _payload, _payload_len)
    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error appending object %s" % object_name)
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    :raises: :class:`Error`
    """
    with nogil:
        status = rados_aio_flush(self.io)
    if status < 0:
        raise make_ex(status, "error flushing")
def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
               oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously compare an on-disk object range with a buffer

    The completion's return value is 0 on success, a negative error
    code on failure, and (-MAX_ERRNO - mismatch_off) on mismatch.

    :param object_name: the name of the object
    :param cmp_buf: buffer containing bytes to be compared with object contents
    :param offset: object byte offset at which to start the comparison
    :param oncomplete: what to do when the comparison completes

    :raises: :class:`TypeError`, :class:`Error`
    :returns: completion object
    """
    name_bytes = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _expected = cmp_buf
        size_t _expected_len = len(cmp_buf)
        uint64_t _offset = offset

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_cmpext(self.io, _object_name, completion.rados_comp,
                               _expected, _expected_len, _offset)

    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "failed to compare %s" % object_name)
def aio_rmxattr(self, object_name: str, xattr_name: str,
                oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously delete an extended attribute from an object

    :param object_name: the name of the object to remove xattr from
    :param xattr_name: which extended attribute to remove
    :param oncomplete: what to do when the rmxattr completes

    :raises: :class:`Error`
    :returns: completion object
    """
    name_bytes = cstr(object_name, 'object_name')
    attr_bytes = cstr(xattr_name, 'xattr_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _attr_name = attr_bytes

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_rmxattr(self.io, _object_name,
                                completion.rados_comp, _attr_name)

    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "Failed to remove xattr %r" % xattr_name)
def aio_read(self, object_name: str, length: int, offset: int,
             oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    data_read is None when the read failed. When oncomplete is None no
    callback is registered at all.

    :param object_name: name of the object to read from
    :param length: the number of bytes to read
    :param offset: byte offset in the object to begin reading from
    :param oncomplete: what to do when the read is complete

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name_raw = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name_raw
        uint64_t _offset = offset

        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Short read: shrink the preallocated bytes object to what was
        # actually returned.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only register the wrapper when there is a user callback to invoke;
    # otherwise it would call None from the completion path. This
    # mirrors what aio_execute already does.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, None)
    # librados writes straight into this bytes object (zero-copy read).
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
def aio_execute(self, object_name: str, cls: str, method: str,
                data: bytes, length: int = 8192,
                oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
                onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    data is None when the call failed.

    :param object_name: name of the object
    :param cls: name of the object class
    :param method: name of the method
    :param data: input data
    :param length: size of output buffer in bytes (default=8192)
    :param oncomplete: what to do when the execution is complete
    :param onsafe: what to do when the execution is safe and complete

    :raises: :class:`Error`
    :returns: completion object
    """

    name_bytes = cstr(object_name, 'object_name')
    cls_bytes = cstr(cls, 'cls')
    method_bytes = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = name_bytes
        char *_cls = cls_bytes
        char *_method = method_bytes
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim the preallocated output buffer to the bytes produced.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Wrappers are only registered for callbacks the caller supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    # librados writes the method's output straight into this bytes object.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
def aio_setxattr(self, object_name: str, xattr_name: str, xattr_value: bytes,
                 oncomplete: Optional[Callable] = None) -> Completion:
    """
    Asynchronously set an extended attribute on an object

    :param object_name: the name of the object to set xattr to
    :param xattr_name: which extended attribute to set
    :param xattr_value: the value of the extended attribute
    :param oncomplete: what to do when the setxattr completes

    :raises: :class:`Error`
    :returns: completion object
    """
    name_bytes = cstr(object_name, 'object_name')
    attr_bytes = cstr(xattr_name, 'xattr_name')

    cdef:
        Completion completion
        char* _object_name = name_bytes
        char* _attr_name = attr_bytes
        char* _attr_value = xattr_value
        size_t _attr_value_len = len(xattr_value)

    completion = self.__get_completion(oncomplete, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_setxattr(self.io, _object_name,
                                 completion.rados_comp,
                                 _attr_name, _attr_value, _attr_value_len)

    if ret >= 0:
        return completion
    # Submission failed: stop tracking before reporting the error.
    completion._cleanup()
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
def aio_remove(self, object_name: str,
               oncomplete: Optional[Callable] = None,
               onsafe: Optional[Callable] = None) -> Completion:
    """
    Asynchronously remove an object.

    :param object_name: name of the object to remove
    :param oncomplete: callback invoked when the remove is safe and
                       complete in memory on all replicas
    :param onsafe: callback invoked when the remove is safe and
                   complete on storage on all replicas

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(object_name, 'object_name')

    cdef Completion comp
    cdef char* c_oid = oid_bytes

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)
    with nogil:
        rc = rados_aio_remove(self.io, c_oid,
                              comp.rados_comp)

    if rc >= 0:
        return comp

    # Submission failed: drop the tracked completion before reporting.
    comp._cleanup()
    raise make_ex(rc, "error removing %s" % object_name)
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx object is in the 'open' state.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
def set_locator_key(self, loc_key: str):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    :param loc_key: the key to use as the object locator, or NULL to discard
                    any previously set key

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    # Remember the Python-level value so get_locator_key() can return it.
    self.locator_key = loc_key
def get_locator_key(self) -> str:
    """
    Return the locator key currently stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
def set_read(self, snap_id: int):
    """
    Select the snapshot that subsequent reads are served from.

    To stop reading from a snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap)
def set_namespace(self, nspace: str):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is accepted as an alias for the default (empty) namespace.
    if nspace is None:
        nspace = ""
    encoded_ns = cstr(nspace, 'nspace')
    cdef char *c_nspace = encoded_ns
    with nogil:
        rados_ioctx_set_namespace(self.io, c_nspace)
    self.nspace = nspace
def get_namespace(self) -> str:
    """
    Return the namespace currently stored on this io context.

    :returns: namespace
    """
    current_ns = self.nspace
    return current_ns
2768 Close a rados.Ioctx object.
2770 This just tells librados that you no longer need to use the io context.
2771 It may not be freed immediately if there are pending asynchronous
2772 requests on it, but you should not use an io context again after
2773 calling this function on it.
2775 if self.state == "open":
2776 self.require_ioctx_open()
2778 rados_ioctx_destroy(self.io)
2779 self.state = "closed"
def write(self, key: str, data: bytes, offset: int = 0):
    """
    Synchronously write a buffer into an object.

    :param key: name of the object
    :param data: data to write
    :param offset: byte offset in the object to begin writing at

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *c_buf = data
    cdef size_t c_len = len(data)
    cdef uint64_t c_off = offset

    with nogil:
        rc = rados_write(self.io, c_oid, c_buf, c_len, c_off)

    if rc < 0:
        raise make_ex(rc, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # rados_write is expected to return exactly zero on success.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
def write_full(self, key: str, data: bytes):
    """
    Synchronously replace the whole content of an object.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :param data: data to write

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *c_buf = data
    cdef size_t c_len = len(data)

    with nogil:
        rc = rados_write_full(self.io, c_oid, c_buf, c_len)

    if rc < 0:
        raise make_ex(rc, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # rados_write_full is expected to return exactly zero on success.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
    """
    Write the same buffer repeatedly into an object.

    :param key: name of the object
    :param data: data to write
    :param write_len: total number of bytes to write
    :param offset: byte offset in the object to begin writing at

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *c_buf = data
    cdef size_t c_buf_len = len(data)
    cdef size_t c_total_len = write_len
    cdef uint64_t c_off = offset

    with nogil:
        rc = rados_writesame(self.io, c_oid, c_buf, c_buf_len, c_total_len, c_off)
    if rc < 0:
        raise make_ex(rc, "Ioctx.writesame(%s): failed to write %s"
                      % (self.name, key))
    assert rc == 0
def append(self, key: str, data: bytes):
    """
    Synchronously append a buffer to the end of an object.

    :param key: name of the object
    :param data: data to write

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *c_buf = data
    cdef size_t c_len = len(data)

    with nogil:
        rc = rados_append(self.io, c_oid, c_buf, c_len)

    if rc < 0:
        raise make_ex(rc, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if rc > 0:
        # rados_append is expected to return exactly zero on success.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
    """
    Synchronously read a range of bytes from an object.

    :param key: name of the object
    :param length: the number of bytes to read (default=8192)
    :param offset: byte offset in the object to begin reading at

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: data read from object
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *out_ptr
    cdef uint64_t c_off = offset
    cdef size_t c_len = length
    cdef PyObject* out_bytes = NULL

    # Allocate the result bytes object up front so librados can fill its
    # storage directly.
    out_bytes = PyBytes_FromStringAndSize(NULL, length)
    try:
        out_ptr = PyBytes_AsString(out_bytes)
        with nogil:
            nread = rados_read(self.io, c_oid, out_ptr, c_len, c_off)
        if nread < 0:
            raise make_ex(nread, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Shrink the buffer when fewer bytes than requested came back.
        if nread != length:
            _PyBytes_Resize(&out_bytes, nread)

        return <object>out_bytes
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set out_bytes to NULL, hence XDECREF).
        ref.Py_XDECREF(out_bytes)
def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :param cls: name of the object class
    :param method: name of the method
    :param data: input data
    :param length: size of output buffer in bytes (default=8192)

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    cls_raw = cstr(cls, 'cls')
    method_raw = cstr(method, 'method')
    cdef:
        char *_key = key_raw
        char *_cls = cls_raw
        char *_method = method_raw
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output bytes object up front so librados can fill its
    # storage directly.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the class/method that failed; the previous message was
            # copied from read() and claimed a read had failed.
            raise make_ex(ret, "Ioctx.execute(%s): error executing %s::%s on %s"
                          % (self.name, cls, method, key))

        # Shrink the buffer when the method produced less than `length` bytes.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
def get_stats(self) -> Dict[str, int]:
    """
    Fetch usage statistics for the pool behind this io context.

    :returns: dict contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        rc = rados_ioctx_pool_stat(self.io, &pool_stat)
    if rc < 0:
        raise make_ex(rc, "Ioctx.get_stats(%s): get_stats failed" % self.name)
    # Copy the C struct fields into a plain dict, one key per field.
    return dict(
        num_bytes=pool_stat.num_bytes,
        num_kb=pool_stat.num_kb,
        num_objects=pool_stat.num_objects,
        num_object_clones=pool_stat.num_object_clones,
        num_object_copies=pool_stat.num_object_copies,
        num_objects_missing_on_primary=pool_stat.num_objects_missing_on_primary,
        num_objects_unfound=pool_stat.num_objects_unfound,
        num_objects_degraded=pool_stat.num_objects_degraded,
        num_rd=pool_stat.num_rd,
        num_rd_kb=pool_stat.num_rd_kb,
        num_wr=pool_stat.num_wr,
        num_wr_kb=pool_stat.num_wr_kb,
    )
def remove_object(self, key: str) -> bool:
    """
    Synchronously delete an object.

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes

    with nogil:
        rc = rados_remove(self.io, c_oid)
    if rc < 0:
        raise make_ex(rc, "Failed to remove '%s'" % key)
    return True
def trunc(self, key: str, size: int) -> int:
    """
    Change the size of an object.

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :param size: the new size of the object in bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: 0 on success, otherwise raises error
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef uint64_t c_size = size

    with nogil:
        rc = rados_trunc(self.io, c_oid, c_size)
    if rc < 0:
        raise make_ex(rc, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
    return rc
def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
    """
    Compare a byte range of an on-disk object against a buffer.

    :param key: the name of the object
    :param cmp_buf: buffer containing bytes to be compared with object contents
    :param offset: object byte offset at which to start the comparison

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: 0 - on success, negative error code on failure,
              (-MAX_ERRNO - mismatch_off) on mismatch
    """
    self.require_ioctx_open()
    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef char *c_cmp = cmp_buf
    cdef size_t c_cmp_len = len(cmp_buf)
    cdef uint64_t c_off = offset

    with nogil:
        rc = rados_cmpext(self.io, c_oid, c_cmp, c_cmp_len, c_off)
    # Only zero or a value below -MAX_ERRNO passes this check.
    assert rc < -MAX_ERRNO or rc == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
    return rc
def stat(self, key: str) -> Tuple[int, time.struct_time]:
    """
    Fetch the size and modification time of an object.

    :param key: the name of the object to get stats from

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp)
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    cdef char *c_oid = oid_bytes
    cdef uint64_t obj_size
    cdef time_t obj_mtime

    with nogil:
        rc = rados_stat(self.io, c_oid, &obj_size, &obj_mtime)
    if rc < 0:
        raise make_ex(rc, "Failed to stat %r" % key)
    # The mtime is converted with time.localtime().
    return obj_size, time.localtime(obj_mtime)
def get_xattr(self, key: str, xattr_name: str) -> bytes:
    """
    Read the value of one extended attribute of an object.

    :param key: the name of the object to get xattr from
    :param xattr_name: which extended attribute to read

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: value of the xattr
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    attr_bytes = cstr(xattr_name, 'xattr_name')
    cdef char *c_oid = oid_bytes
    cdef char *c_attr = attr_bytes
    cdef size_t buf_size = 4096
    cdef char *buf = NULL

    try:
        # Start with a 4 KiB buffer and double it each time librados
        # answers -ERANGE (buffer too small).
        while buf_size < 4096 * 1024 * 1024:
            buf = <char *>realloc_chk(buf, buf_size)
            with nogil:
                rc = rados_getxattr(self.io, c_oid, c_attr, buf, buf_size)
            if rc >= 0:
                break
            if rc != -errno.ERANGE:
                raise make_ex(rc, "Failed to get xattr %r" % xattr_name)
            buf_size *= 2
        return buf[:rc]
    finally:
        free(buf)
def get_xattrs(self, oid: str) -> XattrIterator:
    """
    Create an iterator over the extended attributes of an object.

    :param oid: the name of the object to get xattrs from

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
    """
    Synchronously set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :param xattr_name: which extended attribute to set
    :param xattr_value: the value of the extended attribute

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    attr_bytes = cstr(xattr_name, 'xattr_name')
    cdef char *c_oid = oid_bytes
    cdef char *c_attr = attr_bytes
    cdef char *c_value = xattr_value
    cdef size_t c_value_len = len(xattr_value)

    with nogil:
        rc = rados_setxattr(self.io, c_oid, c_attr,
                            c_value, c_value_len)
    if rc < 0:
        raise make_ex(rc, "Failed to set xattr %r" % xattr_name)
    return True
def rm_xattr(self, key: str, xattr_name: str) -> bool:
    """
    Synchronously remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :param xattr_name: which extended attribute to remove

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    oid_bytes = cstr(key, 'key')
    attr_bytes = cstr(xattr_name, 'xattr_name')
    cdef char *c_oid = oid_bytes
    cdef char *c_attr = attr_bytes

    with nogil:
        rc = rados_rmxattr(self.io, c_oid, c_attr)
    if rc < 0:
        raise make_ex(rc, "Failed to delete key %r xattr %r" %
                      (key, xattr_name))
    return True
def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
    """
    Send a rados notification to an object.

    :param obj: the name of the object to notify
    :param msg: optional message to send in the notification
    :param timeout_ms: notify timeout (in ms)

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    obj_raw = cstr(obj, 'obj')
    msg_raw = cstr(msg, 'msg')
    # The payload length must be measured on the encoded bytes that are
    # actually sent; len(msg) counts characters and is too short for
    # non-ASCII text, which truncated the notification.
    msglen = len(msg_raw)
    cdef:
        char *_obj = obj_raw
        char *_msg = msg_raw
        int _msglen = msglen
        uint64_t _timeout_ms = timeout_ms

    with nogil:
        # The reply buffer is not requested (NULL, NULL).
        ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
                            NULL, NULL)
    if ret < 0:
        raise make_ex(ret, "Failed to notify %r" % (obj))
    return True
def aio_notify(self, obj: str,
               oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
               msg: str = '', timeout_ms: int = 5000) -> Completion:
    """
    Asynchronously send a rados notification to an object

    :param obj: the name of the object to notify
    :param oncomplete: callback invoked as
        ``oncomplete(completion, ret, acks, timeouts)``; ``acks`` and
        ``timeouts`` are lists on success and ``None`` on failure
    :param msg: optional message to send in the notification
    :param timeout_ms: notify timeout (in ms)

    :raises: :class:`Error`
    :returns: completion object
    """
    self.require_ioctx_open()

    obj_raw = cstr(obj, 'obj')
    msg_raw = cstr(msg, 'msg')
    # The payload length must be measured on the encoded bytes that are
    # actually sent; len(msg) counts characters and is too short for
    # non-ASCII text, which truncated the notification.
    msglen = len(msg_raw)

    cdef:
        Completion completion
        char *_obj = obj_raw
        char *_msg = msg_raw
        int _msglen = msglen
        uint64_t _timeout_ms = timeout_ms
        char *reply
        size_t replylen = 0

    def oncomplete_(completion_v):
        cdef:
            Completion _completion_v = completion_v
            notify_ack_t *acks = NULL
            notify_timeout_t *timeouts = NULL
            size_t nr_acks
            size_t nr_timeouts
        return_value = _completion_v.get_return_value()
        if return_value == 0:
            # Decode the raw reply, then release the reply buffer.
            return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
            rados_buffer_free(reply)
        if return_value == 0:
            ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
                         else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
            timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
            rados_free_notify_response(acks, nr_acks, timeouts)
            return oncomplete(_completion_v, 0, ack_list, timeout_list)
        else:
            return oncomplete(_completion_v, return_value, None, None)

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
                               _msg, _msglen, _timeout_ms, &reply, &replylen)
    if ret < 0:
        # Submission failed: drop the tracked completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "aio_notify error: %s" % obj)
    return completion
def watch(self, obj: str,
          callback: Callable[[int, str, int, bytes], None],
          error_callback: Optional[Callable[[int], None]] = None,
          timeout: Optional[int] = None) -> Watch:
    """
    Register an interest in an object.

    :param obj: the name of the object to notify
    :param callback: what to do when a notify is received on this object
    :param error_callback: what to do when the watch session encounters an error
    :param timeout: how many seconds the connection will keep after disconnection

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: the :class:`Watch` object created for this registration
    """
    self.require_ioctx_open()

    new_watch = Watch(self, obj, callback, error_callback, timeout)
    return new_watch
def list_objects(self) -> ObjectIterator:
    """
    Create an ObjectIterator bound to this rados.Ioctx object.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
def list_snaps(self) -> SnapIterator:
    """
    Create a SnapIterator bound to this rados.Ioctx object.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
def get_pool_id(self) -> int:
    """
    Get pool id

    :returns: int - pool id
    """
    with nogil:
        pool_id = rados_ioctx_get_id(self.io)
    return pool_id
def get_pool_name(self) -> str:
    """
    Get pool name

    :returns: pool name
    """
    cdef int buf_len = 10
    cdef char *buf = NULL

    try:
        # Double the buffer until librados stops answering -ERANGE.
        while True:
            buf = <char *>realloc_chk(buf, buf_len)
            with nogil:
                rc = rados_ioctx_get_pool_name(self.io, buf, buf_len)
            if rc > 0:
                break
            if rc != -errno.ERANGE:
                raise make_ex(rc, "get pool name error")
            buf_len = buf_len * 2

        return decode_cstr(buf)
    finally:
        free(buf)
def create_snap(self, snap_name: str):
    """
    Create a pool-wide snapshot.

    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *c_name = name_bytes

    with nogil:
        rc = rados_ioctx_snap_create(self.io, c_name)
    if rc != 0:
        raise make_ex(rc, "Failed to create snap %s" % snap_name)
def remove_snap(self, snap_name: str):
    """
    Remove a pool-wide snapshot.

    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *c_name = name_bytes

    with nogil:
        rc = rados_ioctx_snap_remove(self.io, c_name)
    if rc != 0:
        raise make_ex(rc, "Failed to remove snap %s" % snap_name)
def lookup_snap(self, snap_name: str) -> Snap:
    """
    Resolve a pool snapshot name to a :class:`Snap`.

    :param snap_name: the name of the snapshot to look up

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *c_name = name_bytes
    cdef rados_snap_t found_id

    with nogil:
        rc = rados_ioctx_snap_lookup(self.io, c_name, &found_id)
    if rc != 0:
        raise make_ex(rc, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(found_id))
def snap_rollback(self, oid: str, snap_name: str):
    """
    Roll an object back to a pool snapshot.

    :param oid: the name of the object
    :param snap_name: the name of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_bytes = cstr(oid, 'oid')
    name_bytes = cstr(snap_name, 'snap_name')
    cdef char *c_oid = oid_bytes
    cdef char *c_name = name_bytes

    with nogil:
        rc = rados_ioctx_snap_rollback(self.io, c_oid, c_name)
    if rc != 0:
        raise make_ex(rc, "Failed to rollback %s" % oid)
def create_self_managed_snap(self):
    """
    Create a self-managed snapshot.

    :returns: snap id on success

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t new_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_create(self.io, &new_id)
    if rc != 0:
        raise make_ex(rc, "Failed to create self-managed snapshot")
    return int(new_id)
def remove_self_managed_snap(self, snap_id: int):
    """
    Remove a self-managed snapshot.

    :param snap_id: the id of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap = snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_remove(self.io, c_snap)
    if rc != 0:
        raise make_ex(rc, "Failed to remove self-managed snapshot")
def set_self_managed_snap_write(self, snaps: Sequence[Union[int, str]]):
    """
    Updates the write context to the specified self-managed
    snapshot contexts

    :param snaps: all associated self-managed snapshot ids

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :raises: :class:`MemoryError` if the snapshot id array cannot be allocated
    """
    self.require_ioctx_open()
    sorted_snaps = []
    snap_seq = 0
    if snaps:
        # Ids are passed in descending order; the largest becomes the
        # snapshot sequence number.
        sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
        snap_seq = sorted_snaps[0]

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)

    try:
        _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
        # malloc(0) may legitimately return NULL, so only treat NULL as a
        # failure when elements were actually requested; previously a failed
        # allocation was written through unchecked.
        if _snaps == NULL and _num_snaps > 0:
            raise MemoryError("failed to allocate snapshot id array")
        for i, snap in enumerate(sorted_snaps):
            _snaps[i] = snap
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        free(_snaps)
def rollback_self_managed_snap(self, oid: str, snap_id: int):
    """
    Roll a specific object back to a self-managed snapshot revision.

    :param oid: the name of the object
    :param snap_id: the id of the snapshot

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_bytes = cstr(oid, 'oid')
    cdef char *c_oid = oid_bytes
    cdef rados_snap_t c_snap = snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_rollback(self.io, c_oid, c_snap)
    if rc != 0:
        raise make_ex(rc, "Failed to rollback %s" % oid)
def get_last_version(self) -> int:
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
def create_write_op(self) -> WriteOp:
    """
    Create a write operation object.
    release_write_op must be called on it after use.
    """
    new_op = WriteOp()
    return new_op.create()
def create_read_op(self) -> ReadOp:
    """
    Create a read operation object.
    release_read_op must be called on it after use.
    """
    new_op = ReadOp()
    return new_op.create()
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op.

    :para write_op: write_op object
    """
    write_op.release()
def release_read_op(self, read_op: ReadOp):
    """
    Release the memory allocated by create_read_op.

    :para read_op: read_op object
    """
    read_op.release()
def set_omap(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE], values: Sequence[bytes]):
    """
    Queue omap key/value pairs on a write operation.

    :para write_op: write_operation object
    :para keys: a tuple of keys
    :para values: a tuple of values
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    values = cstr_list(values, 'values')
    value_lens = [len(v) for v in values]
    cdef WriteOp c_op = write_op
    cdef size_t n_keys = len(keys)
    cdef char **c_keys = to_bytes_array(keys)
    cdef char **c_values = to_bytes_array(values)
    cdef size_t *c_lens = to_csize_t_array(value_lens)

    try:
        with nogil:
            rados_write_op_omap_set(c_op.write_op,
                                    <const char**>c_keys,
                                    <const char**>c_values,
                                    <const size_t*>c_lens, n_keys)
    finally:
        # The temporary C arrays are only needed for the call above.
        free(c_keys)
        free(c_values)
        free(c_lens)
def operate_write_op(self,
                     write_op: WriteOp,
                     oid: str,
                     mtime: int = 0,
                     flags: int = LIBRADOS_OPERATION_NOFLAG):
    """
    Synchronously execute a prepared write operation on an object.

    :para write_op: write operation object
    :para oid: object name
    :para mtime: the time to set the mtime to, 0 for the current time
    :para flags: flags to apply to the entire operation
    """

    oid_bytes = cstr(oid, 'oid')
    cdef WriteOp c_op = write_op
    cdef char *c_oid = oid_bytes
    cdef time_t c_mtime = mtime
    cdef int c_flags = flags

    with nogil:
        rc = rados_write_op_operate(c_op.write_op, self.io, c_oid, &c_mtime, c_flags)
    if rc != 0:
        raise make_ex(rc, "Failed to operate write op for oid %s" % oid)
def operate_aio_write_op(self, write_op: WriteOp, oid: str,
                         oncomplete: Optional[Callable[[Completion], None]] = None,
                         onsafe: Optional[Callable[[Completion], None]] = None,
                         mtime: int = 0,
                         flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
    """
    Asynchronously execute a prepared write operation on an object.

    :para write_op: write operation object
    :para oid: object name
    :param oncomplete: callback invoked when the operation is safe and
                       complete in memory on all replicas
    :param onsafe: callback invoked when the operation is safe and
                   complete on storage on all replicas
    :para mtime: the time to set the mtime to, 0 for the current time
    :para flags: flags to apply to the entire operation

    :raises: :class:`Error`
    :returns: completion object
    """

    oid_bytes = cstr(oid, 'oid')
    cdef WriteOp c_op = write_op
    cdef char *c_oid = oid_bytes
    cdef Completion comp
    cdef time_t c_mtime = mtime
    cdef int c_flags = flags

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)

    with nogil:
        rc = rados_aio_write_op_operate(c_op.write_op, self.io, comp.rados_comp, c_oid,
                                        &c_mtime, c_flags)
    if rc >= 0:
        return comp

    # Submission failed: drop the tracked completion before reporting.
    comp._cleanup()
    raise make_ex(rc, "Failed to operate aio write op for oid %s" % oid)
def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
    """
    Synchronously execute a prepared read operation on an object.

    :para read_op: read operation object
    :para oid: object name
    :para flag: flags to apply to the entire operation
    """
    oid_bytes = cstr(oid, 'oid')
    cdef ReadOp c_op = read_op
    cdef char *c_oid = oid_bytes
    cdef int c_flag = flag

    with nogil:
        rc = rados_read_op_operate(c_op.read_op, self.io, c_oid, c_flag)
    if rc != 0:
        raise make_ex(rc, "Failed to operate read op for oid %s" % oid)
def operate_aio_read_op(self, read_op: ReadOp, oid: str,
                        oncomplete: Optional[Callable[[Completion], None]] = None,
                        onsafe: Optional[Callable[[Completion], None]] = None,
                        flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
    """
    Asynchronously execute a prepared read operation on an object.

    :para read_op: read operation object
    :para oid: object name
    :param oncomplete: callback invoked when the operation is safe and
                       complete in memory on all replicas
    :param onsafe: callback invoked when the operation is safe and
                   complete on storage on all replicas
    :para flag: flags to apply to the entire operation

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_bytes = cstr(oid, 'oid')
    cdef ReadOp c_op = read_op
    cdef char *c_oid = oid_bytes
    cdef Completion comp
    cdef int c_flag = flag

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)

    with nogil:
        rc = rados_aio_read_op_operate(c_op.read_op, self.io, comp.rados_comp, c_oid, c_flag)
    if rc >= 0:
        return comp

    # Submission failed: drop the tracked completion before reporting.
    comp._cleanup()
    raise make_ex(rc, "Failed to operate aio read op for oid %s" % oid)
def get_omap_vals(self,
                  read_op: ReadOp,
                  start_after: OMAP_KEY_TYPE,
                  filter_prefix: OMAP_KEY_TYPE,
                  max_return: int,
                  omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
    """
    Queue an omap key/value listing on a read operation.

    :para read_op: read operation object
    :para start_after: list keys starting after start_after
    :para filter_prefix: list only keys beginning with filter_prefix
    :para max_return: list no more than max_return key/value pairs
    :para omap_key_type: passed through to the returned OmapIterator
    :returns: an iterator over the requested omap values, return value from this action
    """

    # Falsy bounds are passed to librados as "not set".
    after_bytes = cstr(start_after, 'start_after') if start_after else None
    prefix_bytes = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
    cdef char *c_after = opt_str(after_bytes)
    cdef char *c_prefix = opt_str(prefix_bytes)
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t iter_handle = NULL
    cdef int c_max = max_return

    with nogil:
        rados_read_op_omap_get_vals2(c_op.read_op, c_after, c_prefix,
                                     c_max, &iter_handle, NULL, NULL)
    it = OmapIterator(self, omap_key_type)
    it.ctx = iter_handle
    return it, 0   # 0 is meaningless; there for backward-compat
def get_omap_keys(self,
                  read_op: ReadOp,
                  start_after: OMAP_KEY_TYPE,
                  max_return: int,
                  omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
    """
    Queue an omap key listing on a read operation.

    :para read_op: read operation object
    :para start_after: list keys starting after start_after
    :para max_return: list no more than max_return key/value pairs
    :para omap_key_type: passed through to the returned OmapIterator
    :returns: an iterator over the requested omap values, return value from this action
    """
    # A falsy start_after is passed to librados as "not set".
    start_after = cstr(start_after, 'start_after') if start_after else None
    cdef char *c_after = opt_str(start_after)
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t iter_handle = NULL
    cdef int c_max = max_return

    with nogil:
        rados_read_op_omap_get_keys2(c_op.read_op, c_after,
                                     c_max, &iter_handle, NULL, NULL)
    it = OmapIterator(self, omap_key_type)
    it.ctx = iter_handle
    return it, 0   # 0 is meaningless; there for backward-compat
def get_omap_vals_by_keys(self,
                          read_op: ReadOp,
                          keys: Sequence[OMAP_KEY_TYPE],
                          omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
    """
    Queue a lookup of specific omap keys on a read operation.

    :para read_op: read operation object
    :para keys: input key tuple
    :para omap_key_type: passed through to the returned OmapIterator
    :returns: an iterator over the requested omap values, return value from this action
    """
    keys = cstr_list(keys, 'keys')
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t iter_handle
    cdef char **c_keys = to_bytes_array(keys)
    cdef size_t n_keys = len(keys)

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(c_op.read_op,
                                                <const char**>c_keys,
                                                n_keys, &iter_handle, NULL)
        it = OmapIterator(self, omap_key_type)
        it.ctx = iter_handle
        return it, 0   # 0 is meaningless; there for backward-compat
    finally:
        # The temporary key array is only needed for the call above.
        free(c_keys)
def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE]):
    """
    Queue removal of the specified omap keys on a write operation.

    :para write_op: write operation object
    :para keys: input key tuple
    """

    keys = cstr_list(keys, 'keys')
    cdef WriteOp c_op = write_op
    cdef size_t n_keys = len(keys)
    cdef char **c_keys = to_bytes_array(keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(c_op.write_op, <const char**>c_keys, n_keys)
    finally:
        # The temporary key array is only needed for the call above.
        free(c_keys)
def clear_omap(self, write_op: WriteOp):
    """
    Queue removal of all key/value pairs from an object.

    :para write_op: write operation object
    """

    cdef WriteOp c_op = write_op

    with nogil:
        rados_write_op_omap_clear(c_op.write_op)
def remove_omap_range2(self, write_op: WriteOp, key_begin: OMAP_KEY_TYPE, key_end: OMAP_KEY_TYPE):
    """
    Remove key/value pairs from an object whose keys are in the range
    [key_begin, key_end)

    :param write_op: write operation object
    :param key_begin: the lower bound of the key range to remove
    :param key_end: the upper bound of the key range to remove
    """
    key_begin_raw = cstr(key_begin, 'key_begin')
    key_end_raw = cstr(key_end, 'key_end')
    cdef:
        WriteOp _write_op = write_op
        char* _key_begin = key_begin_raw
        # Lengths are taken from the encoded byte strings that are actually
        # passed to librados; measuring the un-encoded arguments gave a
        # too-short length for non-ASCII str keys.
        size_t key_begin_len = len(key_begin_raw)
        char* _key_end = key_end_raw
        size_t key_end_len = len(key_end_raw)
    with nogil:
        rados_write_op_omap_rm_range2(_write_op.write_op, _key_begin, key_begin_len,
                                      _key_end, key_end_len)
def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
                   duration: Optional[int] = None,
                   flags: int = 0):
    """
    Take an exclusive lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock
    :param desc: description of the lock
    :param duration: duration of the lock in seconds; when None, NULL is
        passed to librados instead of a timeval
    :param flags: flags

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef char* c_key = key_raw
    cdef char* c_name = name_raw
    cdef char* c_cookie = cookie_raw
    cdef char* c_desc = desc_raw
    cdef uint8_t c_flags = flags
    cdef timeval c_duration

    if duration is not None:
        # NOTE(review): only tv_sec is assigned here -- confirm the other
        # timeval field does not need to be initialised as well.
        c_duration.tv_sec = duration
        with nogil:
            rc = rados_lock_exclusive(self.io, c_key, c_name, c_cookie, c_desc,
                                      &c_duration, c_flags)
    else:
        with nogil:
            rc = rados_lock_exclusive(self.io, c_key, c_name, c_cookie, c_desc,
                                      NULL, c_flags)

    if rc < 0:
        raise make_ex(rc, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
                duration: Optional[int] = None,
                flags: int = 0):
    """
    Take a shared lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock
    :param tag: tag of the lock
    :param desc: description of the lock
    :param duration: duration of the lock in seconds; when None, NULL is
        passed to librados instead of a timeval
    :param flags: flags

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    tag_raw = cstr(tag, 'tag')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _tag = tag_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        # NOTE(review): only tv_sec is assigned here -- confirm the other
        # timeval field does not need to be initialised as well.
        _duration.tv_sec = duration
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that actually failed (the message used to claim
        # rados_lock_exclusive).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
def unlock(self, key: str, name: str, cookie: str):
    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :param name: name of the lock
    :param cookie: cookie of the lock

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Report the operation that failed; the message used to describe
        # a failed rados_lock_exclusive "set lock".
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
def set_osdmap_full_try(self):
    """
    Set global osdmap_full_try label to true

    Calls rados_set_pool_full_try() on this io context.
    """
    with nogil:
        rados_set_pool_full_try(self.io)
def unset_osdmap_full_try(self):
    """
    Unset the osdmap_full_try label

    Calls rados_unset_pool_full_try() on this io context.
    """
    with nogil:
        rados_unset_pool_full_try(self.io)
def application_enable(self, app_name: str, force: bool = False):
    """
    Enable an application on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param force: False if only a single app should exist per pool
    :type force: bool

    :raises: :class:`Error`
    """
    app_name_raw = cstr(app_name, 'app_name')
    cdef char *c_app_name = app_name_raw
    # librados takes the flag as an int: any truthy value becomes 1
    cdef int c_force = 1 if force else 0

    with nogil:
        rc = rados_application_enable(self.io, c_app_name, c_force)
    if rc < 0:
        raise make_ex(rc, "error enabling application")
def application_list(self) -> List[str]:
    """
    Returns a list of enabled applications

    The call is repeated with a re-allocated buffer for as long as
    librados answers -ERANGE.

    :returns: list of app name string; None when librados reports -ENOENT
    :raises: :class:`Error`
    """
    cdef size_t buf_len = 128
    cdef char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_len)
            with nogil:
                rc = rados_application_list(self.io, buf, &buf_len)
            if rc == 0:
                # names arrive NUL-separated; empty pieces are dropped
                names = buf[:buf_len].split(b'\0')
                return [decode_cstr(app) for app in names if app]
            if rc == -errno.ENOENT:
                return None
            if rc != -errno.ERANGE:
                raise make_ex(rc, "error listing applications")
    finally:
        free(buf)
def application_metadata_get(self, app_name: str, key: str) -> str:
    """
    Gets application metadata on an OSD pool for the given key

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :returns: str - metadata value

    :raises: :class:`KeyError` when librados reports -ENOENT
    :raises: :class:`Error`
    """

    app_name_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    cdef:
        char *_app_name = app_name_raw
        char *_key = key_raw
        size_t size = 129
        char *value = NULL
        int ret
    try:
        # retry with a re-allocated buffer for as long as librados
        # answers -ERANGE
        while True:
            value = <char *>realloc_chk(value, size)
            with nogil:
                ret = rados_application_metadata_get(self.io, _app_name,
                                                     _key, value, &size)
            if ret != -errno.ERANGE:
                break
        # Format the messages with the caller's app_name: the C pointer
        # _app_name would be rendered as a bytes repr (b'...').
        if ret == -errno.ENOENT:
            raise KeyError('no metadata %s for application %s' % (key, app_name))
        elif ret != 0:
            raise make_ex(ret, 'error getting metadata %s for application %s' %
                          (key, app_name))
        return decode_cstr(value)
    finally:
        free(value)
def application_metadata_set(self, app_name: str, key: str, value: str):
    """
    Sets application metadata on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :param value: metadata value
    :type value: str

    :raises: :class:`Error`
    """
    app_name_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    value_raw = cstr(value, 'value')
    cdef char *c_app_name = app_name_raw
    cdef char *c_key = key_raw
    cdef char *c_value = value_raw

    with nogil:
        rc = rados_application_metadata_set(self.io, c_app_name, c_key,
                                            c_value)
    if rc < 0:
        raise make_ex(rc, "error setting application metadata")
def application_metadata_remove(self, app_name: str, key: str):
    """
    Remove application metadata from an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str

    :raises: :class:`Error`
    """
    app_name_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    cdef char *c_app_name = app_name_raw
    cdef char *c_key = key_raw

    with nogil:
        rc = rados_application_metadata_remove(self.io, c_app_name, c_key)
    if rc < 0:
        raise make_ex(rc, "error removing application metadata")
def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
    """
    Returns the metadata key/value pairs of an application

    The call is repeated with re-allocated buffers for as long as
    librados answers -ERANGE.

    :param app_name: application name
    :type app_name: str
    :returns: list of key/value tuples
    :raises: :class:`Error`
    """
    app_name_raw = cstr(app_name, 'app_name')
    cdef char *c_app_name = app_name_raw
    cdef size_t key_length = 128
    cdef size_t val_length = 128
    cdef char *key_buf = NULL
    cdef char *val_buf = NULL

    try:
        while True:
            key_buf = <char *>realloc_chk(key_buf, key_length)
            val_buf = <char *>realloc_chk(val_buf, val_length)
            with nogil:
                rc = rados_application_metadata_list(self.io, c_app_name,
                                                     key_buf, &key_length,
                                                     val_buf, &val_length)
            if rc == 0:
                raw_keys = key_buf[:key_length].split(b'\0')
                raw_vals = val_buf[:val_length].split(b'\0')
                keys = [decode_cstr(key) for key in raw_keys]
                vals = [decode_cstr(val) for val in raw_vals]
                # the final pair is discarded, as the original code does
                return list(zip(keys, vals))[:-1]
            if rc != -errno.ERANGE:
                raise make_ex(rc, "error listing application metadata")
    finally:
        free(key_buf)
        free(val_buf)
def alignment(self) -> int:
    """
    Returns pool alignment

    :returns:
        Number of alignment bytes required by the current pool, or None if
        alignment is not required.
    :raises: :class:`Error`
    """
    cdef int needs_alignment = 0
    cdef uint64_t align_bytes

    with nogil:
        rc = rados_ioctx_pool_requires_alignment2(self.io, &needs_alignment)
    if rc != 0:
        raise make_ex(rc, "error checking alignment")

    if not needs_alignment:
        return None

    # only query the size when the pool reports that it needs alignment
    with nogil:
        rc = rados_ioctx_pool_required_alignment2(self.io, &align_bytes)
    if rc != 0:
        raise make_ex(rc, "error querying alignment")
    return align_bytes
def set_object_locator(func):
    """
    Decorator: run ``func`` with the object's locator key set on its ioctx.

    When ``self.locator_key`` is None the call goes straight through.
    Otherwise the ioctx locator key is switched to ``self.locator_key`` for
    the duration of the call and the previous key is put back afterwards,
    also when ``func`` raises.

    :param func: method taking ``self`` as first argument
    :returns: the wrapping function (metadata of ``func`` is preserved)
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # never leave the shared ioctx pointing at this object's key
            self.ioctx.set_locator_key(old_locator)
    return retfunc
def set_object_namespace(func):
    """
    Decorator: run ``func`` with the object's namespace set on its ioctx.

    The ioctx namespace is switched to ``self.nspace`` for the duration of
    the call and the previous namespace is put back afterwards, also when
    ``func`` raises.

    :param func: method taking ``self`` as first argument
    :returns: the wrapping function (metadata of ``func`` is preserved)
    :raises: :class:`LogicError` if ``self.nspace`` is None
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # never leave the shared ioctx in this object's namespace
            self.ioctx.set_namespace(old_nspace)
    return retfunc
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context every operation is forwarded to
        :param key: name of the object
        :param locator_key: locator key applied around each operation, if any
        :param nspace: namespace of the object; None is stored as ""
        """
        self.key = key
        self.ioctx = ioctx
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: an identity test against a string literal only
        # works when the interpreter happens to share the "" object.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise ObjectStateError unless the object state is "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to ``length`` bytes at the current offset and advance
        the offset by the amount of data returned."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """Write ``string_to_write`` at the current offset; the offset is
        advanced only when the ioctx write returns 0."""
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self) -> Tuple[int, time.struct_time]:
        """Return the result of ``ioctx.stat`` for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position: int):
        """Set the offset used by the next read or write."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name: str) -> bytes:
        """Return the value of the extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self) -> XattrIterator:
        """Return the ioctx iterator over this object's extended attributes."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
        """Set the extended attribute ``xattr_name`` to ``xattr_value``."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name: str) -> bool:
        """Remove the extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    For watching cluster log messages.  Instantiate an object and keep
    it around while callback is periodically called.  Construct with
    'level' to monitor 'level' messages (one of MONITOR_LEVELS).
    arg will be passed to the callback.

    callback will be called with:
        arg        (given to __init__)
        line       (the full line, including timestamp, who, level, msg)
        who        (which entity issued the log message)
        timestamp_sec  (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq        (sequence number)
        level      (string representing the level of the log message)
        msg        (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # keep what was passed in, then hand the registration to the cluster
        self.cluster = cluster
        self.level = level
        self.callback = callback
        self.arg = arg
        cluster.monitor_log(level, callback, arg)