1 # cython: embedsignature=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
54 cdef extern from "sys/time.h":
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
64 cdef extern from "rados/librados.h" nogil:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
105 cdef struct rados_cluster_stat_t:
111 cdef struct rados_pool_stat_t:
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
125 void rados_buffer_free(char *buf)
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
155 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
156 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
157 const char *inbuf, size_t inbuflen,
158 char **outbuf, size_t *outbuflen,
159 char **outs, size_t *outslen)
160 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
161 const char *inbuf, size_t inbuflen,
162 char **outbuf, size_t *outbuflen,
163 char **outs, size_t *outslen)
164 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
165 const char *inbuf, size_t inbuflen,
166 char **outbuf, size_t *outbuflen,
167 char **outs, size_t *outslen)
168 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
169 const char *inbuf, size_t inbuflen,
170 char **outbuf, size_t *outbuflen,
171 char **outs, size_t *outslen)
172 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
173 const char *inbuf, size_t inbuflen,
174 char **outbuf, size_t *outbuflen,
175 char **outs, size_t *outslen)
176 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
177 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
179 int rados_wait_for_latest_osdmap(rados_t cluster)
181 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
182 void rados_ioctx_destroy(rados_ioctx_t io)
183 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
184 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
185 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
187 uint64_t rados_get_last_version(rados_ioctx_t io)
188 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
189 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
190 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
191 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
192 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
193 int rados_remove(rados_ioctx_t io, const char *oid)
194 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
195 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
196 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
197 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
198 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
199 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
200 void rados_getxattrs_end(rados_xattrs_iter_t iter)
202 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
203 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
204 void rados_nobjects_list_close(rados_list_ctx_t ctx)
206 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
207 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
208 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
209 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
210 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
211 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
212 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
213 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
215 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
216 const char * cookie, const char * desc,
217 timeval * duration, uint8_t flags)
218 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
219 const char * cookie, const char * tag, const char * desc,
220 timeval * duration, uint8_t flags)
221 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
223 rados_write_op_t rados_create_write_op()
224 void rados_release_write_op(rados_write_op_t write_op)
226 rados_read_op_t rados_create_read_op()
227 void rados_release_read_op(rados_read_op_t read_op)
229 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
230 void rados_aio_release(rados_completion_t c)
231 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
232 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
233 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
234 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
235 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
236 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
237 int rados_aio_flush(rados_ioctx_t io)
239 int rados_aio_get_return_value(rados_completion_t c)
240 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
241 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
242 int rados_aio_wait_for_complete(rados_completion_t c)
243 int rados_aio_wait_for_safe(rados_completion_t c)
244 int rados_aio_is_complete(rados_completion_t c)
245 int rados_aio_is_safe(rados_completion_t c)
247 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
248 const char * in_buf, size_t in_len, char * buf, size_t out_len)
249 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
250 const char * in_buf, size_t in_len, char * buf, size_t out_len)
252 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
253 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
254 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
255 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
256 void rados_write_op_omap_clear(rados_write_op_t write_op)
257 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
259 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
260 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
261 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
262 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
263 void rados_write_op_remove(rados_write_op_t write_op)
264 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
265 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
267 void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
268 void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
269 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
270 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
271 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
272 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
273 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
274 void rados_omap_get_end(rados_omap_iter_t iter)
# Publish the C-level constants, declared with a leading underscore in the
# ``cdef extern`` blocks, under their public names at module level.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
277 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
278 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
279 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
280 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
281 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
282 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
283 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
# Snapshot id constant, declared as a uint64_t in the librados.h extern block.
285 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
# Flags for compound operations (LIBRADOS_OPERATION_*).
287 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
288 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
289 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
290 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
291 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
292 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
293 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
# _LIBRADOS_ALL_NSPACES is declared as a C ``char*``; decode it so the public
# constant is a text string rather than bytes.
295 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
# Object-creation mode constants (LIBRADOS_CREATE_*).
297 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
298 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
# All 64 bits set.
# NOTE(review): presumably the "no specific owner" auid sentinel -- confirm
# against the librados headers.
300 ANONYMOUS_AUID = 0xffffffffffffffff
304 class Error(Exception):
305 """ `Error` class, derived from `Exception` """
309 class InvalidArgumentError(Error):
313 class OSError(Error):
314 """ `OSError` class, derived from `Error` """
315 def __init__(self, message, errno=None):
316 super(OSError, self).__init__(message)
320 msg = super(OSError, self).__str__()
321 if self.errno is None:
323 return '[errno {0}] {1}'.format(self.errno, msg)
325 def __reduce__(self):
326 return (self.__class__, (self.message, self.errno))
328 class InterruptedOrTimeoutError(OSError):
329 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
333 class PermissionError(OSError):
334 """ `PermissionError` class, derived from `OSError` """
338 class PermissionDeniedError(OSError):
339 """ `PermissionDeniedError` class, derived from `OSError`; raised for EACCES """
343 class ObjectNotFound(OSError):
344 """ `ObjectNotFound` class, derived from `OSError` """
348 class NoData(OSError):
349 """ `NoData` class, derived from `OSError` """
353 class ObjectExists(OSError):
354 """ `ObjectExists` class, derived from `OSError` """
358 class ObjectBusy(OSError):
359 """ `ObjectBusy` class, derived from `OSError` """
363 class IOError(OSError):
364 """ `IOError` class, derived from `OSError` """
368 class NoSpace(OSError):
369 """ `NoSpace` class, derived from `OSError` """
373 class RadosStateError(Error):
374 """ `RadosStateError` class, derived from `Error` """
378 class IoctxStateError(Error):
379 """ `IoctxStateError` class, derived from `Error` """
383 class ObjectStateError(Error):
384 """ `ObjectStateError` class, derived from `Error` """
388 class LogicError(Error):
389 """ `LogicError` class, derived from `Error` """
393 class TimedOut(OSError):
394 """ `TimedOut` class, derived from `OSError` """
398 IF UNAME_SYSNAME == "FreeBSD":
399 cdef errno_to_exception = {
400 errno.EPERM : PermissionError,
401 errno.ENOENT : ObjectNotFound,
403 errno.ENOSPC : NoSpace,
404 errno.EEXIST : ObjectExists,
405 errno.EBUSY : ObjectBusy,
406 errno.ENOATTR : NoData,
407 errno.EINTR : InterruptedOrTimeoutError,
408 errno.ETIMEDOUT : TimedOut,
409 errno.EACCES : PermissionDeniedError,
410 errno.EINVAL : InvalidArgumentError,
413 cdef errno_to_exception = {
414 errno.EPERM : PermissionError,
415 errno.ENOENT : ObjectNotFound,
417 errno.ENOSPC : NoSpace,
418 errno.EEXIST : ObjectExists,
419 errno.EBUSY : ObjectBusy,
420 errno.ENODATA : NoData,
421 errno.EINTR : InterruptedOrTimeoutError,
422 errno.ETIMEDOUT : TimedOut,
423 errno.EACCES : PermissionDeniedError,
424 errno.EINVAL : InvalidArgumentError,
428 cdef make_ex(ret, msg):
430 Translate a librados return code into an exception.
432 :param ret: the return code
434 :param msg: the error message to use
436 :returns: a subclass of :class:`Error`
439 if ret in errno_to_exception:
440 return errno_to_exception[ret](msg, errno=ret)
442 return OSError(msg, errno=ret)
445 # helper to specify an optional argument, where in addition to `cls`, `None`
451 # validate argument types of an instance method
452 # kwargs is an un-ordered dict, so use args instead
453 def requires(*types):
454 def is_type_of(v, t):
458 return isinstance(v, t)
460 def check_type(val, arg_name, arg_type):
461 if isinstance(arg_type, tuple):
462 if any(is_type_of(val, t) for t in arg_type):
464 type_names = ' or '.join('None' if t is None else t.__name__
466 raise TypeError('%s must be %s' % (arg_name, type_names))
468 if is_type_of(val, arg_type):
470 assert(arg_type is not None)
471 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
474 # FIXME(sileht): this stops with
475 # AttributeError: 'method_descriptor' object has no attribute '__module__'
477 def validate_func(*args, **kwargs):
478 # ignore the `self` arg
479 pos_args = zip(args[1:], types)
480 named_args = ((kwargs[name], (name, spec)) for name, spec in types
482 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
483 check_type(arg_val, arg_name, arg_type)
484 return f(*args, **kwargs)
489 def cstr(val, name, encoding="utf-8", opt=False):
491 Create a byte string from a Python string
493 :param basestring val: Python string
494 :param str name: Name of the string parameter, for exceptions
495 :param str encoding: Encoding to use
496 :param bool opt: If True, None is allowed
498 :raises: :class:`TypeError`
500 if opt and val is None:
502 if isinstance(val, bytes):
504 elif isinstance(val, unicode):
505 return val.encode(encoding)
507 raise TypeError('%s must be a string' % name)
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Create a list of byte strings from an iterable of Python strings

    Each element is converted with :func:`cstr`.

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, used in error messages
    :param str encoding: Encoding applied to every text element

    :raises: :class:`TypeError` if an element is not a string
    :rtype: list
    """
    # Forward `encoding`: it used to be accepted but silently dropped, so
    # every element was encoded with cstr()'s default regardless of what the
    # caller asked for.
    return [cstr(s, name, encoding) for s in list_str]
514 def decode_cstr(val, encoding="utf-8"):
516 Decode a byte string into a Python string.
518 :param bytes val: byte string
519 :rtype: unicode or None
524 return val.decode(encoding)
527 cdef char* opt_str(s) except? NULL:
533 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
534 cdef void *ret = realloc(ptr, size)
536 raise MemoryError("realloc failed")
540 cdef size_t * to_csize_t_array(list_int):
541 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
543 raise MemoryError("malloc failed")
544 for i in xrange(len(list_int)):
545 ret[i] = <size_t>list_int[i]
549 cdef char ** to_bytes_array(list_bytes):
550 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
552 raise MemoryError("malloc failed")
553 for i in xrange(len(list_bytes)):
554 ret[i] = <char *>list_bytes[i]
559 cdef int __monitor_callback(void *arg, const char *line, const char *who,
560 uint64_t sec, uint64_t nsec, uint64_t seq,
561 const char *level, const char *msg) with gil:
562 cdef object cb_info = <object>arg
563 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
566 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
569 uint64_t sec, uint64_t nsec, uint64_t seq,
570 const char *level, const char *msg) with gil:
571 cdef object cb_info = <object>arg
572 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
576 class Version(object):
577 """ Version information """
578 def __init__(self, major, minor, extra):
584 return "%d.%d.%d" % (self.major, self.minor, self.extra)
587 cdef class Rados(object):
588 """This class wraps librados functions"""
589 # NOTE(sileht): attributes declared in .pxd
591 def __init__(self, *args, **kwargs):
593 self.__setup(*args, **kwargs)
595 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
596 ('conffile', opt(str_type)))
597 def __setup(self, rados_id=None, name=None, clustername=None,
598 conf_defaults=None, conffile=None, conf=None, flags=0,
600 self.monitor_callback = None
601 self.monitor_callback2 = None
602 self.parsed_args = []
603 self.conf_defaults = conf_defaults
604 self.conffile = conffile
605 self.rados_id = rados_id
607 if rados_id and name:
608 raise Error("Rados(): can't supply both rados_id and name")
610 name = 'client.' + rados_id
612 name = 'client.admin'
613 if clustername is None:
616 name = cstr(name, 'name')
617 clustername = cstr(clustername, 'clustername')
620 char *_clustername = clustername
625 # Unpack void* (aka rados_config_t) from capsule
626 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
628 ret = rados_create_with_context(&self.cluster, rados_config)
631 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
633 raise Error("rados_initialize failed with error code: %d" % ret)
635 self.state = "configuring"
636 # order is important: conf_defaults, then conffile, then conf
638 for key, value in conf_defaults.items():
639 self.conf_set(key, value)
640 if conffile is not None:
641 # read the default conf file when '' is given
644 self.conf_read_file(conffile)
646 for key, value in conf.items():
647 self.conf_set(key, value)
649 def require_state(self, *args):
651 Checks that the Rados object is in one of the given states
653 :raises: RadosStateError
655 if self.state in args:
657 raise RadosStateError("You cannot perform that operation on a \
658 Rados object in state %s." % self.state)
662 Disconnects from the cluster. Call this explicitly when a
663 Rados.connect()ed object is no longer used.
665 if self.state != "shutdown":
667 rados_shutdown(self.cluster)
668 self.state = "shutdown"
674 def __exit__(self, type_, value, traceback):
680 Get the version number of the ``librados`` C library.
682 :returns: a :class:`Version` object holding the ``(major, minor, extra)`` components of the
689 rados_version(&major, &minor, &extra)
690 return Version(major, minor, extra)
692 @requires(('path', opt(str_type)))
693 def conf_read_file(self, path=None):
695 Configure the cluster handle using a Ceph config file.
697 :param path: path to the config file
700 self.require_state("configuring", "connected")
701 path = cstr(path, 'path', opt=True)
703 char *_path = opt_str(path)
705 ret = rados_conf_read_file(self.cluster, _path)
707 raise make_ex(ret, "error calling conf_read_file")
709 def conf_parse_argv(self, args):
711 Parse known arguments from args and remove them; the returned
712 args contain only those unknown to ceph
714 self.require_state("configuring", "connected")
718 cargs = cstr_list(args, 'args')
720 int _argc = len(args)
721 char **_argv = to_bytes_array(cargs)
722 char **_remargv = NULL
725 _remargv = <char **>malloc(_argc * sizeof(char *))
727 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
729 <const char**>_remargv)
731 raise make_ex(ret, "error calling conf_parse_argv_remainder")
733 # _remargv was allocated with fixed argc; collapse return
734 # list to eliminate any missing args
735 retargs = [decode_cstr(a) for a in _remargv[:_argc]
737 self.parsed_args = args
743 def conf_parse_env(self, var='CEPH_ARGS'):
745 Parse known arguments from an environment variable, normally
748 self.require_state("configuring", "connected")
752 var = cstr(var, 'var')
756 ret = rados_conf_parse_env(self.cluster, _var)
758 raise make_ex(ret, "error calling conf_parse_env")
760 @requires(('option', str_type))
761 def conf_get(self, option):
763 Get the value of a configuration option
765 :param option: which option to read
768 :returns: str - value of the option or None
769 :raises: :class:`TypeError`
771 self.require_state("configuring", "connected")
772 option = cstr(option, 'option')
774 char *_option = option
780 ret_buf = <char *>realloc_chk(ret_buf, length)
782 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
784 return decode_cstr(ret_buf)
785 elif ret == -errno.ENAMETOOLONG:
787 elif ret == -errno.ENOENT:
790 raise make_ex(ret, "error calling conf_get")
794 @requires(('option', str_type), ('val', str_type))
795 def conf_set(self, option, val):
797 Set the value of a configuration option
799 :param option: which option to set
801 :param val: value of the option
804 :raises: :class:`TypeError`, :class:`ObjectNotFound`
806 self.require_state("configuring", "connected")
807 option = cstr(option, 'option')
808 val = cstr(val, 'val')
810 char *_option = option
814 ret = rados_conf_set(self.cluster, _option, _val)
816 raise make_ex(ret, "error calling conf_set")
818 def ping_monitor(self, mon_id):
820 Ping a monitor to assess liveness
822 May be used as a simple way to assess liveness, or to obtain
823 information about the monitor in a simple way even in the
826 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
828 :returns: the string reply from the monitor
831 self.require_state("configuring", "connected")
833 mon_id = cstr(mon_id, 'mon_id')
835 char *_mon_id = mon_id
840 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
843 raise make_ex(ret, "error calling ping_monitor")
846 my_outstr = outstr[:outstrlen]
847 rados_buffer_free(outstr)
848 return decode_cstr(my_outstr)
850 def connect(self, timeout=0):
852 Connect to the cluster. Use shutdown() to release resources.
854 self.require_state("configuring")
855 # NOTE(sileht): timeout was supported by old python API,
856 # but this is not something available in C API, so ignore
857 # for now and remove it later
859 ret = rados_connect(self.cluster)
861 raise make_ex(ret, "error connecting to the cluster")
862 self.state = "connected"
864 def get_cluster_stats(self):
866 Read usage info about the cluster
868 This tells you total space, space used, space available, and number
869 of objects. These are not updated immediately when data is written,
870 they are eventually consistent.
872 :returns: dict - contains the following keys:
874 - ``kb`` (int) - total space
876 - ``kb_used`` (int) - space used
878 - ``kb_avail`` (int) - free space available
880 - ``num_objects`` (int) - number of objects
884 rados_cluster_stat_t stats
887 ret = rados_cluster_stat(self.cluster, &stats)
891 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
892 return {'kb': stats.kb,
893 'kb_used': stats.kb_used,
894 'kb_avail': stats.kb_avail,
895 'num_objects': stats.num_objects}
897 @requires(('pool_name', str_type))
898 def pool_exists(self, pool_name):
900 Checks if a given pool exists.
902 :param pool_name: name of the pool to check
905 :raises: :class:`TypeError`, :class:`Error`
906 :returns: bool - True if the pool exists, False otherwise.
908 self.require_state("connected")
910 pool_name = cstr(pool_name, 'pool_name')
912 char *_pool_name = pool_name
915 ret = rados_pool_lookup(self.cluster, _pool_name)
918 elif ret == -errno.ENOENT:
921 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
923 @requires(('pool_name', str_type))
924 def pool_lookup(self, pool_name):
926 Returns a pool's ID based on its name.
928 :param pool_name: name of the pool to look up
931 :raises: :class:`TypeError`, :class:`Error`
932 :returns: int - pool ID, or None if it doesn't exist
934 self.require_state("connected")
935 pool_name = cstr(pool_name, 'pool_name')
937 char *_pool_name = pool_name
940 ret = rados_pool_lookup(self.cluster, _pool_name)
943 elif ret == -errno.ENOENT:
946 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
948 @requires(('pool_id', int))
949 def pool_reverse_lookup(self, pool_id):
951 Returns a pool's name based on its ID.
953 :param pool_id: ID of the pool to look up
956 :raises: :class:`TypeError`, :class:`Error`
957 :returns: string - pool name, or None if it doesn't exist
959 self.require_state("connected")
961 int64_t _pool_id = pool_id
967 name = <char *>realloc_chk(name, size)
969 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
972 elif ret != -errno.ERANGE and size <= 4096:
974 elif ret == -errno.ENOENT:
977 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
979 return decode_cstr(name)
984 @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
985 def create_pool(self, pool_name, auid=None, crush_rule=None):
988 - with default settings: if auid=None and crush_rule=None
989 - owned by a specific auid: auid given and crush_rule=None
990 - with a specific CRUSH rule: if auid=None and crush_rule given
991 - with a specific CRUSH rule and auid: if auid and crush_rule given
993 :param pool_name: name of the pool to create
995 :param auid: the id of the owner of the new pool
997 :param crush_rule: rule to use for placement in the new pool
998 :type crush_rule: int
1000 :raises: :class:`TypeError`, :class:`Error`
1002 self.require_state("connected")
1004 pool_name = cstr(pool_name, 'pool_name')
1006 char *_pool_name = pool_name
1010 if auid is None and crush_rule is None:
1012 ret = rados_pool_create(self.cluster, _pool_name)
1014 _crush_rule = crush_rule
1016 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1017 elif crush_rule is None:
1020 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1023 _crush_rule = crush_rule
1025 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1027 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1029 @requires(('pool_id', int))
1030 def get_pool_base_tier(self, pool_id):
1034 :returns: base pool, or pool_id if tiering is not configured for the pool
1036 self.require_state("connected")
1038 int64_t base_tier = 0
1039 int64_t _pool_id = pool_id
1042 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1044 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1045 return int(base_tier)
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Delete a pool and all data inside it.

    The pool is removed from the cluster immediately,
    but the actual data is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    # Convert once to the C-string form; the char* below borrows from it.
    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name

    with nogil:
        ret = rados_pool_delete(self.cluster, _pool_name)
    # A negative return code is turned into the matching exception.
    if ret < 0:
        raise make_ex(ret, "error deleting pool '%s'" % pool_name)
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List inconsistent placement groups in the given pool

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t pool = pool_id
        size_t size = 512
        char *pgs = NULL

    try:
        # The required buffer size is not known up front: retry with a
        # doubled buffer for as long as librados reports more data than
        # currently fits.
        while True:
            pgs = <char *>realloc_chk(pgs, size);
            with nogil:
                ret = rados_inconsistent_pg_list(self.cluster, pool,
                                                 pgs, size)
            if ret > <int>size:
                size *= 2
            elif ret >= 0:
                break
            else:
                raise make_ex(ret, "error calling inconsistent_pg_list")
        # The buffer holds NUL-separated entries; drop the empty pieces
        # produced by the split.
        return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
    finally:
        free(pgs)
def list_pools(self):
    """
    Gets a list of pool names.

    :raises: :class:`Error`
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        # The required buffer size is not known up front: retry with a
        # doubled buffer for as long as librados reports more data than
        # currently fits.
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                size *= 2
            elif ret >= 0:
                break
            else:
                # Without this branch a negative return code would make
                # the loop retry forever with the same buffer.
                raise make_ex(ret, "error calling rados_pool_list")
        # The buffer holds NUL-separated names; drop the empty pieces
        # produced by the split.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1129 Get the fsid of the cluster as a hexadecimal string.
1131 :raises: :class:`Error`
1132 :returns: str - cluster fsid
1134 self.require_state("connected")
1138 PyObject* ret_s = NULL
1140 ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1142 ret_buf = PyBytes_AsString(ret_s)
1144 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1146 raise make_ex(ret, "error getting cluster fsid")
1147 if ret != <int>buf_len:
1148 _PyBytes_Resize(&ret_s, ret)
1149 return <object>ret_s
1151 # We DECREF unconditionally: the cast to object above will have
1152 # INCREFed if necessary. This also takes care of exceptions,
1153 # including if _PyString_Resize fails (that will free the string
1154 # itself and set ret_s to NULL, hence XDECREF).
1155 ref.Py_XDECREF(ret_s)
1157 @requires(('ioctx_name', str_type))
1158 def open_ioctx(self, ioctx_name):
1160 Create an io context
1162 The io context allows you to perform operations within a particular
1165 :param ioctx_name: name of the pool
1166 :type ioctx_name: str
1168 :raises: :class:`TypeError`, :class:`Error`
1169 :returns: Ioctx - Rados Ioctx object
1171 self.require_state("connected")
1172 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1175 char *_ioctx_name = ioctx_name
1177 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1179 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1180 io = Ioctx(ioctx_name)
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
    returns (int ret, string outbuf, string outs)

    :param cmd: list of command strings
    :param inbuf: input buffer passed to the command
    :param timeout: ignored, kept for backward compatibility
    :param target: optional monitor to address; an int is converted to str
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # Label the argument 'cmd' (not 'c') so that conversion errors name the
    # real parameter, as osd_command/mgr_command/pg_command already do.
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        # A target selects the *_target variant of the call.
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy the results into Python objects before releasing the
        # librados-owned buffers.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The argv-style array built by to_bytes_array is ours to free.
        free(_cmd)
1241 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1243 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1244 returns (int ret, string outbuf, string outs)
1246 # NOTE(sileht): timeout is ignored because C API doesn't provide
1247 # timeout argument, but we keep it for backward compat with old python binding
1248 self.require_state("connected")
1250 cmd = cstr_list(cmd, 'cmd')
1251 inbuf = cstr(inbuf, 'inbuf')
1255 char **_cmd = to_bytes_array(cmd)
1256 size_t _cmdlen = len(cmd)
1258 char *_inbuf = inbuf
1259 size_t _inbuf_len = len(inbuf)
1268 ret = rados_osd_command(self.cluster, _osdid,
1269 <const char **>_cmd, _cmdlen,
1270 <const char*>_inbuf, _inbuf_len,
1271 &_outbuf, &_outbuf_len,
1274 my_outs = decode_cstr(_outs[:_outs_len])
1275 my_outbuf = _outbuf[:_outbuf_len]
1277 rados_buffer_free(_outs)
1279 rados_buffer_free(_outbuf)
1280 return (ret, my_outbuf, my_outs)
1284 def mgr_command(self, cmd, inbuf, timeout=0):
1286 returns (int ret, string outbuf, string outs)
1288 # NOTE(sileht): timeout is ignored because C API doesn't provide
1289 # timeout argument, but we keep it for backward compat with old python binding
1290 self.require_state("connected")
1292 cmd = cstr_list(cmd, 'cmd')
1293 inbuf = cstr(inbuf, 'inbuf')
1296 char **_cmd = to_bytes_array(cmd)
1297 size_t _cmdlen = len(cmd)
1299 char *_inbuf = inbuf
1300 size_t _inbuf_len = len(inbuf)
1309 ret = rados_mgr_command(self.cluster,
1310 <const char **>_cmd, _cmdlen,
1311 <const char*>_inbuf, _inbuf_len,
1312 &_outbuf, &_outbuf_len,
1315 my_outs = decode_cstr(_outs[:_outs_len])
1316 my_outbuf = _outbuf[:_outbuf_len]
1318 rados_buffer_free(_outs)
1320 rados_buffer_free(_outbuf)
1321 return (ret, my_outbuf, my_outs)
1325 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1327 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1328 returns (int ret, string outbuf, string outs)
1330 # NOTE(sileht): timeout is ignored because C API doesn't provide
1331 # timeout argument, but we keep it for backward compat with old python binding
1332 self.require_state("connected")
1334 pgid = cstr(pgid, 'pgid')
1335 cmd = cstr_list(cmd, 'cmd')
1336 inbuf = cstr(inbuf, 'inbuf')
1340 char **_cmd = to_bytes_array(cmd)
1341 size_t _cmdlen = len(cmd)
1343 char *_inbuf = inbuf
1344 size_t _inbuf_len = len(inbuf)
1353 ret = rados_pg_command(self.cluster, _pgid,
1354 <const char **>_cmd, _cmdlen,
1355 <const char *>_inbuf, _inbuf_len,
1356 &_outbuf, &_outbuf_len,
1359 my_outs = decode_cstr(_outs[:_outs_len])
1360 my_outbuf = _outbuf[:_outbuf_len]
1362 rados_buffer_free(_outs)
1364 rados_buffer_free(_outbuf)
1365 return (ret, my_outbuf, my_outs)
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap on this cluster handle.

    Requires the "connected" state. The librados return code is handed
    back to the caller as-is; no exception is raised here for a non-zero
    code.

    :returns: int - return code of rados_wait_for_latest_osdmap
    """
    self.require_state("connected")
    with nogil:
        ret = rados_wait_for_latest_osdmap(self.cluster)
    return ret
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Blacklist a client from the OSDs

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        # Copied into C locals because the call below runs without the GIL.
        uint32_t _expire_seconds = expire_seconds
        char *_client_address = client_address

    with nogil:
        ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
    # A negative return code is turned into the matching exception.
    if ret < 0:
        raise make_ex(ret, "error blacklisting client '%s'" % client_address)
def monitor_log(self, level, callback, arg):
    """
    Register, replace or clear the monitor log callback.

    :param level: log level; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the callback
    :param arg: extra value bundled with the callback and handed to
        librados as the callback argument

    :raises: :class:`LogicError`, :class:`Error`
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Passing NULL for both callback and argument clears the
        # registration; both stored callback slots are reset as well.
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    # librados only receives a raw pointer to this tuple, so the tuple has
    # to be kept alive on the Python side (see the NOTE below).
    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>_level,
                              <rados_log_callback_t>&__monitor_callback, _arg)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = cb
    self.monitor_callback2 = None
def monitor_log2(self, level, callback, arg):
    """
    Register, replace or clear the monitor log callback (log2 variant).

    Same contract as monitor_log, but registers through
    rados_monitor_log2 with __monitor_callback2.

    :param level: log level; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the callback
    :param arg: extra value bundled with the callback and handed to
        librados as the callback argument

    :raises: :class:`LogicError`, :class:`Error`
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Passing NULL for both callback and argument clears the
        # registration; both stored callback slots are reset as well.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    # librados only receives a raw pointer to this tuple, so the tuple has
    # to be kept alive on the Python side (see the NOTE below).
    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (was: rados_monitor_log).
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
1454 cdef class OmapIterator(object):
1457 cdef public Ioctx ioctx
1458 cdef rados_omap_iter_t ctx
1460 def __cinit__(self, Ioctx ioctx):
1468 Get the next key-value pair in the object
1469 :returns: next rados.OmapItem
1477 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1480 raise make_ex(ret, "error iterating over the omap")
1482 raise StopIteration()
1483 key = decode_cstr(key_)
1489 def __dealloc__(self):
1491 rados_omap_get_end(self.ctx)
1494 cdef class ObjectIterator(object):
1495 """rados.Ioctx Object iterator"""
1497 cdef rados_list_ctx_t ctx
1499 cdef public object ioctx
1501 def __cinit__(self, Ioctx ioctx):
1505 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1507 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1515 Get the next object name and locator in the pool
1517 :raises: StopIteration
1518 :returns: next rados.Ioctx Object
1521 const char *key_ = NULL
1522 const char *locator_ = NULL
1523 const char *nspace_ = NULL
1526 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1529 raise StopIteration()
1531 key = decode_cstr(key_)
1532 locator = decode_cstr(locator_) if locator_ != NULL else None
1533 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1534 return Object(self.ioctx, key, locator, nspace)
1536 def __dealloc__(self):
1538 rados_nobjects_list_close(self.ctx)
1541 cdef class XattrIterator(object):
1542 """Extended attribute iterator"""
1544 cdef rados_xattrs_iter_t it
1547 cdef public Ioctx ioctx
1548 cdef public object oid
1550 def __cinit__(self, Ioctx ioctx, oid):
1552 self.oid = cstr(oid, 'oid')
1553 self._oid = self.oid
1556 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1558 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1565 Get the next xattr on the object
1567 :raises: StopIteration
1568 :returns: pair - of name and value of the next Xattr
1571 const char *name_ = NULL
1572 const char *val_ = NULL
1576 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1578 raise make_ex(ret, "error iterating over the extended attributes \
1579 in '%s'" % self.oid)
1581 raise StopIteration()
1582 name = decode_cstr(name_)
1586 def __dealloc__(self):
1588 rados_getxattrs_end(self.it)
1591 cdef class SnapIterator(object):
1592 """Snapshot iterator"""
1594 cdef public Ioctx ioctx
1596 cdef rados_snap_t *snaps
1600 def __cinit__(self, Ioctx ioctx):
1602 # We don't know how big a buffer we need until we've called the
1603 # function. So use the exponential doubling strategy.
1604 cdef int num_snaps = 10
1606 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1608 sizeof(rados_snap_t))
1611 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1615 elif ret != -errno.ERANGE:
1616 raise make_ex(ret, "error calling rados_snap_list for \
1617 ioctx '%s'" % self.ioctx.name)
1618 num_snaps = num_snaps * 2
1626 Get the next Snapshot
1628 :raises: :class:`Error`, StopIteration
1629 :returns: Snap - next snapshot
1631 if self.cur_snap >= self.max_snap:
1635 rados_snap_t snap_id = self.snaps[self.cur_snap]
1641 name = <char *>realloc_chk(name, name_len)
1643 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1646 elif ret != -errno.ERANGE:
1647 raise make_ex(ret, "rados_snap_get_name error")
1649 name_len = name_len * 2
1651 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1652 self.cur_snap = self.cur_snap + 1
cdef class Snap(object):
    """Snapshot object

    Bundles the io context a snapshot belongs to with the snapshot's name
    and numeric id.
    """
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
            % (str(self.ioctx), self.name, self.snap_id)

    def get_timestamp(self):
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t snap_time

        # librados writes the timestamp through the snap_time out-parameter.
        with nogil:
            ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
        if ret != 0:
            raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(snap_time)
cdef class Completion(object):
    """completion object

    Wraps a librados completion handle together with the Python callbacks
    (oncomplete / onsafe) and the io context it was created for.
    """

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        # Optional result buffer; released in __dealloc__.
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            ret = rados_aio_is_safe(self.rados_comp)
        return ret == 1

    def is_complete(self):
        """
        Has an asynchronous operation completed?

        This does not imply that the complete callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            ret = rados_aio_is_complete(self.rados_comp)
        return ret == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return ret

    def get_return_value(self):
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            ret = rados_aio_get_return_value(self.rados_comp)
        return ret

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # XDECREF tolerates a NULL buffer (no buffer was ever attached).
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback, then drop this completion from the io
        # context's tracking list under the io context lock.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Same as _complete, for the onsafe callback and its tracking list.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Untrack the completion from both lists without invoking any
        # callback (used when submitting the operation failed).
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1819 class OpCtx(object):
1820 def __enter__(self):
1821 return self.create()
1823 def __exit__(self, type, msg, traceback):
cdef class WriteOp(object):
    """Wrapper around a librados write operation handle.

    Each method queues one step on the underlying write_op; create() and
    release() manage the handle's lifetime.
    """
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """
        Release the underlying write operation.
        """
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: value passed as the ``exclusive`` argument of
            rados_write_op_create; None is treated as 0
        :type exclusive: int
        """

        cdef:
            # None is the documented default but cannot be converted to a
            # C int, so map it to 0 explicitly.
            int _exclusive = 0 if exclusive is None else exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of zero to write
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        # The C API takes (write_op, offset, len); the arguments were
        # previously passed in the opposite order.
        with nogil:
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1953 class WriteOpCtx(WriteOp, OpCtx):
1954 """write operation context manager"""
cdef class ReadOp(object):
    """Wrapper around a librados read operation handle."""
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying read operation.

        :returns: self
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the underlying read operation.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, _flags)
1984 class ReadOpCtx(ReadOp, OpCtx):
1985 """read operation context manager"""
1988 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
1990 Callback to onsafe() for asynchronous operations
1992 cdef object cb = <object>args
1997 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
1999 Callback to oncomplete() for asynchronous operations
2001 cdef object cb = <object>args
2006 cdef class Ioctx(object):
2007 """rados.Ioctx object"""
2008 # NOTE(sileht): attributes declared in .pyd
2010 def __init__(self, name):
2014 self.locator_key = ""
2016 self.lock = threading.Lock()
2017 self.safe_completions = []
2018 self.complete_completions = []
2020 def __enter__(self):
2023 def __exit__(self, type_, value, traceback):
2027 def __dealloc__(self):
2030 def __track_completion(self, completion_obj):
2031 if completion_obj.oncomplete:
2033 self.complete_completions.append(completion_obj)
2034 if completion_obj.onsafe:
2036 self.safe_completions.append(completion_obj)
2038 def __get_completion(self, oncomplete, onsafe):
2040 Constructs a completion to use with asynchronous operations
2042 :param oncomplete: what to do when the write is safe and complete in memory
2044 :type oncomplete: completion
2045 :param onsafe: what to do when the write is safe and complete on storage
2047 :type onsafe: completion
2049 :raises: :class:`Error`
2050 :returns: completion object
2053 completion_obj = Completion(self, oncomplete, onsafe)
2056 rados_callback_t complete_cb = NULL
2057 rados_callback_t safe_cb = NULL
2058 rados_completion_t completion
2059 PyObject* p_completion_obj= <PyObject*>completion_obj
2062 complete_cb = <rados_callback_t>&__aio_complete_cb
2064 safe_cb = <rados_callback_t>&__aio_safe_cb
2067 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2070 raise make_ex(ret, "error getting a completion")
2072 completion_obj.rados_comp = completion
2073 return completion_obj
2075 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2076 def aio_stat(self, object_name, oncomplete):
2078 Asynchronously get object stats (size/mtime)
2080 oncomplete will be called with the returned size and mtime
2081 as well as the completion:
2083 oncomplete(completion, size, mtime)
2085 :param object_name: the name of the object to get stats from
2086 :type object_name: str
2087 :param oncomplete: what to do when the stat is complete
2088 :type oncomplete: completion
2090 :raises: :class:`Error`
2091 :returns: completion object
2094 object_name = cstr(object_name, 'object_name')
2097 Completion completion
2098 char *_object_name = object_name
2102 def oncomplete_(completion_v):
2103 cdef Completion _completion_v = completion_v
2104 return_value = _completion_v.get_return_value()
2105 if return_value >= 0:
2106 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2108 return oncomplete(_completion_v, None, None)
2110 completion = self.__get_completion(oncomplete_, None)
2111 self.__track_completion(completion)
2113 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2117 completion._cleanup()
2118 raise make_ex(ret, "error stating %s" % object_name)
2121 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2122 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2123 def aio_write(self, object_name, to_write, offset=0,
2124 oncomplete=None, onsafe=None):
2126 Write data to an object asynchronously
2128 Queues the write and returns.
2130 :param object_name: name of the object
2131 :type object_name: str
2132 :param to_write: data to write
2133 :type to_write: bytes
2134 :param offset: byte offset in the object to begin writing at
2136 :param oncomplete: what to do when the write is safe and complete in memory
2138 :type oncomplete: completion
2139 :param onsafe: what to do when the write is safe and complete on storage
2141 :type onsafe: completion
2143 :raises: :class:`Error`
2144 :returns: completion object
2147 object_name = cstr(object_name, 'object_name')
2150 Completion completion
2151 char* _object_name = object_name
2152 char* _to_write = to_write
2153 size_t size = len(to_write)
2154 uint64_t _offset = offset
2156 completion = self.__get_completion(oncomplete, onsafe)
2157 self.__track_completion(completion)
2159 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2160 _to_write, size, _offset)
2162 completion._cleanup()
2163 raise make_ex(ret, "error writing object %s" % object_name)
2166 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2167 ('onsafe', opt(Callable)))
2168 def aio_write_full(self, object_name, to_write,
2169 oncomplete=None, onsafe=None):
2171 Asychronously write an entire object
2173 The object is filled with the provided data. If the object exists,
2174 it is atomically truncated and then written.
2175 Queues the write and returns.
2177 :param object_name: name of the object
2178 :type object_name: str
2179 :param to_write: data to write
2181 :param oncomplete: what to do when the write is safe and complete in memory
2183 :type oncomplete: completion
2184 :param onsafe: what to do when the write is safe and complete on storage
2186 :type onsafe: completion
2188 :raises: :class:`Error`
2189 :returns: completion object
2192 object_name = cstr(object_name, 'object_name')
2195 Completion completion
2196 char* _object_name = object_name
2197 char* _to_write = to_write
2198 size_t size = len(to_write)
2200 completion = self.__get_completion(oncomplete, onsafe)
2201 self.__track_completion(completion)
2203 ret = rados_aio_write_full(self.io, _object_name,
2204 completion.rados_comp,
2207 completion._cleanup()
2208 raise make_ex(ret, "error writing object %s" % object_name)
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        char* _to_append = to_append
        size_t size = len(to_append)

    # Track the completion before submitting so that the callbacks find
    # it registered on the io context.
    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, _object_name,
                               completion.rados_comp,
                               _to_append, size)
    if ret < 0:
        # Submission failed: no callback will fire, so untrack it here.
        completion._cleanup()
        raise make_ex(ret, "error appending object %s" % object_name)
    return completion
2254 def aio_flush(self):
2256 Block until all pending writes in an io context are safe
2258 :raises: :class:`Error`
2261 ret = rados_aio_flush(self.io)
2263 raise make_ex(ret, "error flushing")
2265 @requires(('object_name', str_type), ('length', int), ('offset', int),
2266 ('oncomplete', opt(Callable)))
2267 def aio_read(self, object_name, length, offset, oncomplete):
2269 Asychronously read data from an object
2271 oncomplete will be called with the returned read value as
2272 well as the completion:
2274 oncomplete(completion, data_read)
2276 :param object_name: name of the object to read from
2277 :type object_name: str
2278 :param length: the number of bytes to read
2280 :param offset: byte offset in the object to begin reading from
2282 :param oncomplete: what to do when the read is complete
2283 :type oncomplete: completion
2285 :raises: :class:`Error`
2286 :returns: completion object
2289 object_name = cstr(object_name, 'object_name')
2292 Completion completion
2293 char* _object_name = object_name
2294 uint64_t _offset = offset
2297 size_t _length = length
2299 def oncomplete_(completion_v):
2300 cdef Completion _completion_v = completion_v
2301 return_value = _completion_v.get_return_value()
2302 if return_value > 0 and return_value != length:
2303 _PyBytes_Resize(&_completion_v.buf, return_value)
2304 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2306 completion = self.__get_completion(oncomplete_, None)
2307 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2308 ret_buf = PyBytes_AsString(completion.buf)
2309 self.__track_completion(completion)
2311 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2312 ret_buf, _length, _offset)
2314 completion._cleanup()
2315 raise make_ex(ret, "error reading %s" % object_name)
2318 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2319 ('data', bytes), ('length', int),
2320 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2321 def aio_execute(self, object_name, cls, method, data,
2322 length=8192, oncomplete=None, onsafe=None):
2324 Asynchronously execute an OSD class method on an object.
2326 oncomplete and onsafe will be called with the data returned from
2327 the plugin as well as the completion:
2329 oncomplete(completion, data)
2330 onsafe(completion, data)
2332 :param object_name: name of the object
2333 :type object_name: str
2334 :param cls: name of the object class
2336 :param method: name of the method
2338 :param data: input data
2340 :param length: size of output buffer in bytes (default=8192)
2342 :param oncomplete: what to do when the execution is complete
2343 :type oncomplete: completion
2344 :param onsafe: what to do when the execution is safe and complete
2345 :type onsafe: completion
2347 :raises: :class:`Error`
2348 :returns: completion object
2351 object_name = cstr(object_name, 'object_name')
2352 cls = cstr(cls, 'cls')
2353 method = cstr(method, 'method')
2355 Completion completion
2356 char *_object_name = object_name
2358 char *_method = method
2360 size_t _data_len = len(data)
2363 size_t _length = length
2365 def oncomplete_(completion_v):
2366 cdef Completion _completion_v = completion_v
2367 return_value = _completion_v.get_return_value()
2368 if return_value > 0 and return_value != length:
2369 _PyBytes_Resize(&_completion_v.buf, return_value)
2370 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2372 def onsafe_(completion_v):
2373 cdef Completion _completion_v = completion_v
2374 return_value = _completion_v.get_return_value()
2375 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2377 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2378 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2379 ret_buf = PyBytes_AsString(completion.buf)
2380 self.__track_completion(completion)
2382 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2383 _cls, _method, _data, _data_len, ret_buf, _length)
2385 completion._cleanup()
2386 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2389 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2390 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2392 Asychronously remove an object
2394 :param object_name: name of the object to remove
2395 :type object_name: str
2396 :param oncomplete: what to do when the remove is safe and complete in memory
2398 :type oncomplete: completion
2399 :param onsafe: what to do when the remove is safe and complete on storage
2401 :type onsafe: completion
2403 :raises: :class:`Error`
2404 :returns: completion object
2406 object_name = cstr(object_name, 'object_name')
2409 Completion completion
2410 char* _object_name = object_name
2412 completion = self.__get_completion(oncomplete, onsafe)
2413 self.__track_completion(completion)
2415 ret = rados_aio_remove(self.io, _object_name,
2416 completion.rados_comp)
2418 completion._cleanup()
2419 raise make_ex(ret, "error removing %s" % object_name)
2422 def require_ioctx_open(self):
2424 Checks if the rados.Ioctx object state is 'open'
2426 :raises: IoctxStateError
2428 if self.state != "open":
2429 raise IoctxStateError("The pool is %s" % self.state)
2431 def change_auid(self, auid):
2433 Attempt to change an io context's associated auid "owner."
2435 Requires that you have write permission on both the current and new
2438 :raises: :class:`Error`
2440 self.require_ioctx_open()
2443 uint64_t _auid = auid
2446 ret = rados_ioctx_pool_set_auid(self.io, _auid)
2448 raise make_ex(ret, "error changing auid of '%s' to %d"
2449 % (self.name, auid))
2451 @requires(('loc_key', str_type))
2452 def set_locator_key(self, loc_key):
2454 Set the key for mapping objects to pgs within an io context.
2456 The key is used instead of the object name to determine which
2457 placement groups an object is put in. This affects all subsequent
2458 operations of the io context - until a different locator key is
2459 set, all objects in this io context will be placed in the same pg.
2461 :param loc_key: the key to use as the object locator, or NULL to discard
2462 any previously set key
2465 :raises: :class:`TypeError`
2467 self.require_ioctx_open()
2468 cloc_key = cstr(loc_key, 'loc_key')
2469 cdef char *_loc_key = cloc_key
2471 rados_ioctx_locator_set_key(self.io, _loc_key)
2472 self.locator_key = loc_key
def get_locator_key(self):
    """
    Return the locator key currently recorded on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2482 @requires(('snap_id', long))
2483 def set_read(self, snap_id):
2485 Set the snapshot for reading objects.
2487 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2489 :param snap_id: the snapshot Id
2492 :raises: :class:`TypeError`
2494 self.require_ioctx_open()
2495 cdef rados_snap_t _snap_id = snap_id
2497 rados_ioctx_snap_set_read(self.io, _snap_id)
2499 @requires(('nspace', str_type))
2500 def set_namespace(self, nspace):
2502 Set the namespace for objects within an io context.
2504 The namespace in addition to the object name fully identifies
2505 an object. This affects all subsequent operations of the io context
2506 - until a different namespace is set, all objects in this io context
2507 will be placed in the same namespace.
2509 :param nspace: the namespace to use, or None/"" for the default namespace
2512 :raises: :class:`TypeError`
2514 self.require_ioctx_open()
2517 cnspace = cstr(nspace, 'nspace')
2518 cdef char *_nspace = cnspace
2520 rados_ioctx_set_namespace(self.io, _nspace)
2521 self.nspace = nspace
2523 def get_namespace(self):
2525 Get the namespace of context
2533 Close a rados.Ioctx object.
2535 This just tells librados that you no longer need to use the io context.
2536 It may not be freed immediately if there are pending asynchronous
2537 requests on it, but you should not use an io context again after
2538 calling this function on it.
2540 if self.state == "open":
2541 self.require_ioctx_open()
2543 rados_ioctx_destroy(self.io)
2544 self.state = "closed"
2547 @requires(('key', str_type), ('data', bytes))
2548 def write(self, key, data, offset=0):
2550 Write data to an object synchronously
2552 :param key: name of the object
2554 :param data: data to write
2556 :param offset: byte offset in the object to begin writing at
2559 :raises: :class:`TypeError`
2560 :raises: :class:`LogicError`
2561 :returns: int - 0 on success
2563 self.require_ioctx_open()
2565 key = cstr(key, 'key')
2569 size_t length = len(data)
2570 uint64_t _offset = offset
2573 ret = rados_write(self.io, _key, _data, length, _offset)
2577 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2580 raise LogicError("Ioctx.write(%s): rados_write \
2581 returned %d, but should return zero on success." % (self.name, ret))
2583 @requires(('key', str_type), ('data', bytes))
2584 def write_full(self, key, data):
2586 Write an entire object synchronously.
2588 The object is filled with the provided data. If the object exists,
2589 it is atomically truncated and then written.
2591 :param key: name of the object
2593 :param data: data to write
2596 :raises: :class:`TypeError`
2597 :raises: :class:`Error`
2598 :returns: int - 0 on success
2600 self.require_ioctx_open()
2601 key = cstr(key, 'key')
2605 size_t length = len(data)
2608 ret = rados_write_full(self.io, _key, _data, length)
2612 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2615 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2616 returned %d, but should return zero on success." % (self.name, ret))
2618 @requires(('key', str_type), ('data', bytes))
2619 def append(self, key, data):
2621 Append data to an object synchronously
2623 :param key: name of the object
2625 :param data: data to write
2628 :raises: :class:`TypeError`
2629 :raises: :class:`LogicError`
2630 :returns: int - 0 on success
2632 self.require_ioctx_open()
2633 key = cstr(key, 'key')
2637 size_t length = len(data)
2640 ret = rados_append(self.io, _key, _data, length)
2644 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2647 raise LogicError("Ioctx.append(%s): rados_append \
2648 returned %d, but should return zero on success." % (self.name, ret))
2650 @requires(('key', str_type))
2651 def read(self, key, length=8192, offset=0):
2653 Read data from an object synchronously
2655 :param key: name of the object
2657 :param length: the number of bytes to read (default=8192)
2659 :param offset: byte offset in the object to begin reading at
2662 :raises: :class:`TypeError`
2663 :raises: :class:`Error`
2664 :returns: str - data read from object
2666 self.require_ioctx_open()
2667 key = cstr(key, 'key')
2671 uint64_t _offset = offset
2672 size_t _length = length
2673 PyObject* ret_s = NULL
2675 ret_s = PyBytes_FromStringAndSize(NULL, length)
2677 ret_buf = PyBytes_AsString(ret_s)
2679 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2681 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2684 _PyBytes_Resize(&ret_s, ret)
2686 return <object>ret_s
2688 # We DECREF unconditionally: the cast to object above will have
2689 # INCREFed if necessary. This also takes care of exceptions,
2690 # including if _PyString_Resize fails (that will free the string
2691 # itself and set ret_s to NULL, hence XDECREF).
2692 ref.Py_XDECREF(ret_s)
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output bytes object up front so rados_exec can write
    # straight into it without an intermediate copy.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the operation that actually failed (this used to claim
            # that Ioctx.read had failed).
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s.%s on %s"
                          % (self.name, cls, method, key))

        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
def get_stats(self):
    """
    Get pool usage statistics

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t stats
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &stats)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    # Copy each field of the C struct into a plain Python dict.
    pool_stats = dict(
        num_bytes=stats.num_bytes,
        num_kb=stats.num_kb,
        num_objects=stats.num_objects,
        num_object_clones=stats.num_object_clones,
        num_object_copies=stats.num_object_copies,
        num_objects_missing_on_primary=stats.num_objects_missing_on_primary,
        num_objects_unfound=stats.num_objects_unfound,
        num_objects_degraded=stats.num_objects_degraded,
        num_rd=stats.num_rd,
        num_rd_kb=stats.num_rd_kb,
        num_wr=stats.num_wr,
        num_wr_kb=stats.num_wr_kb,
    )
    return pool_stats
2800 @requires(('key', str_type))
2801 def remove_object(self, key):
2805 This does not delete any snapshots of the object.
2807 :param key: the name of the object to delete
2810 :raises: :class:`TypeError`
2811 :raises: :class:`Error`
2812 :returns: bool - True on success
2814 self.require_ioctx_open()
2815 key = cstr(key, 'key')
2820 ret = rados_remove(self.io, _key)
2822 raise make_ex(ret, "Failed to remove '%s'" % key)
2825 @requires(('key', str_type))
2826 def trunc(self, key, size):
2830 If this enlarges the object, the new area is logically filled with
2831 zeroes. If this shrinks the object, the excess data is removed.
2833 :param key: the name of the object to resize
2835 :param size: the new size of the object in bytes
2838 :raises: :class:`TypeError`
2839 :raises: :class:`Error`
2840 :returns: int - 0 on success, otherwise raises error
2843 self.require_ioctx_open()
2844 key = cstr(key, 'key')
2847 uint64_t _size = size
2850 ret = rados_trunc(self.io, _key, _size)
2852 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2855 @requires(('key', str_type))
2856 def stat(self, key):
2858 Get object stats (size/mtime)
2860 :param key: the name of the object to get stats from
2863 :raises: :class:`TypeError`
2864 :raises: :class:`Error`
2865 :returns: (size,timestamp)
2867 self.require_ioctx_open()
2869 key = cstr(key, 'key')
2876 ret = rados_stat(self.io, _key, &psize, &pmtime)
2878 raise make_ex(ret, "Failed to stat %r" % key)
2879 return psize, time.localtime(pmtime)
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    enlarged whenever librados answers -ERANGE, for as long as the buffer
    size stays below 4096 * 1024 * 1024 bytes.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: str - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with a larger one.
                ret_length *= 1024
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        if ret < 0:
            # The loop ended on its size cap while librados was still
            # reporting -ERANGE; slicing ret_buf with that negative value
            # would be wrong, so surface the error instead.
            raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2920 @requires(('oid', str_type))
2921 def get_xattrs(self, oid):
2923 Start iterating over xattrs on an object.
2925 :param oid: the name of the object to get xattrs from
2928 :raises: :class:`TypeError`
2929 :raises: :class:`Error`
2930 :returns: XattrIterator
2932 self.require_ioctx_open()
2933 return XattrIterator(self, oid)
2935 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
2936 def set_xattr(self, key, xattr_name, xattr_value):
2938 Set an extended attribute on an object.
2940 :param key: the name of the object to set xattr to
2942 :param xattr_name: which extended attribute to set
2943 :type xattr_name: str
2944 :param xattr_value: the value of the extended attribute
2945 :type xattr_value: bytes
2947 :raises: :class:`TypeError`
2948 :raises: :class:`Error`
2949 :returns: bool - True on success, otherwise raise an error
2951 self.require_ioctx_open()
2953 key = cstr(key, 'key')
2954 xattr_name = cstr(xattr_name, 'xattr_name')
2957 char *_xattr_name = xattr_name
2958 char *_xattr_value = xattr_value
2959 size_t _xattr_value_len = len(xattr_value)
2962 ret = rados_setxattr(self.io, _key, _xattr_name,
2963 _xattr_value, _xattr_value_len)
2965 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2968 @requires(('key', str_type), ('xattr_name', str_type))
2969 def rm_xattr(self, key, xattr_name):
2971 Removes an extended attribute on from an object.
2973 :param key: the name of the object to remove xattr from
2975 :param xattr_name: which extended attribute to remove
2976 :type xattr_name: str
2978 :raises: :class:`TypeError`
2979 :raises: :class:`Error`
2980 :returns: bool - True on success, otherwise raise an error
2982 self.require_ioctx_open()
2984 key = cstr(key, 'key')
2985 xattr_name = cstr(xattr_name, 'xattr_name')
2988 char *_xattr_name = xattr_name
2991 ret = rados_rmxattr(self.io, _key, _xattr_name)
2993 raise make_ex(ret, "Failed to delete key %r xattr %r" %
def list_objects(self):
    """
    Build an ObjectIterator over this rados.Ioctx object.

    The io context must be open.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
def list_snaps(self):
    """
    Build a SnapIterator over this rados.Ioctx object.

    The io context must be open.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3015 @requires(('snap_name', str_type))
3016 def create_snap(self, snap_name):
3018 Create a pool-wide snapshot
3020 :param snap_name: the name of the snapshot
3021 :type snap_name: str
3023 :raises: :class:`TypeError`
3024 :raises: :class:`Error`
3026 self.require_ioctx_open()
3027 snap_name = cstr(snap_name, 'snap_name')
3028 cdef char *_snap_name = snap_name
3031 ret = rados_ioctx_snap_create(self.io, _snap_name)
3033 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3035 @requires(('snap_name', str_type))
3036 def remove_snap(self, snap_name):
3038 Removes a pool-wide snapshot
3040 :param snap_name: the name of the snapshot
3041 :type snap_name: str
3043 :raises: :class:`TypeError`
3044 :raises: :class:`Error`
3046 self.require_ioctx_open()
3047 snap_name = cstr(snap_name, 'snap_name')
3048 cdef char *_snap_name = snap_name
3051 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3053 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3055 @requires(('snap_name', str_type))
3056 def lookup_snap(self, snap_name):
3058 Get the id of a pool snapshot
3060 :param snap_name: the name of the snapshot to lookop
3061 :type snap_name: str
3063 :raises: :class:`TypeError`
3064 :raises: :class:`Error`
3065 :returns: Snap - on success
3067 self.require_ioctx_open()
3068 csnap_name = cstr(snap_name, 'snap_name')
3070 char *_snap_name = csnap_name
3071 rados_snap_t snap_id
3074 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3076 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3077 return Snap(self, snap_name, int(snap_id))
3079 @requires(('oid', str_type), ('snap_name', str_type))
3080 def snap_rollback(self, oid, snap_name):
3082 Rollback an object to a snapshot
3084 :param oid: the name of the object
3086 :param snap_name: the name of the snapshot
3087 :type snap_name: str
3089 :raises: :class:`TypeError`
3090 :raises: :class:`Error`
3092 self.require_ioctx_open()
3093 oid = cstr(oid, 'oid')
3094 snap_name = cstr(snap_name, 'snap_name')
3096 char *_snap_name = snap_name
3100 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3102 raise make_ex(ret, "Failed to rollback %s" % oid)
3104 def get_last_version(self):
3106 Return the version of the last object read or written to.
3108 This exposes the internal version number of the last object read or
3109 written via this io context
3111 :returns: version of the last object used
3113 self.require_ioctx_open()
3115 ret = rados_get_last_version(self.io)
3118 def create_write_op(self):
3120 create write operation object.
3121 need call release_write_op after use
3123 return WriteOp().create()
3125 def create_read_op(self):
3127 create read operation object.
3128 need call release_read_op after use
3130 return ReadOp().create()
3132 def release_write_op(self, write_op):
3134 release memory alloc by create_write_op
3138 def release_read_op(self, read_op):
3140 release memory alloc by create_read_op
3141 :para read_op: read_op object
3146 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3147 def set_omap(self, write_op, keys, values):
3149 set keys values to write_op
3150 :para write_op: write_operation object
3151 :type write_op: WriteOp
3152 :para keys: a tuple of keys
3154 :para values: a tuple of values
3158 if len(keys) != len(values):
3159 raise Error("Rados(): keys and values must have the same number of items")
3161 keys = cstr_list(keys, 'keys')
3163 WriteOp _write_op = write_op
3164 size_t key_num = len(keys)
3165 char **_keys = to_bytes_array(keys)
3166 char **_values = to_bytes_array(values)
3167 size_t *_lens = to_csize_t_array([len(v) for v in values])
3171 rados_write_op_omap_set(_write_op.write_op,
3172 <const char**>_keys,
3173 <const char**>_values,
3174 <const size_t*>_lens, key_num)
3180 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3181 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3183 excute the real write operation
3184 :para write_op: write operation object
3185 :type write_op: WriteOp
3186 :para oid: object name
3188 :para mtime: the time to set the mtime to, 0 for the current time
3190 :para flags: flags to apply to the entire operation
3194 oid = cstr(oid, 'oid')
3196 WriteOp _write_op = write_op
3198 time_t _mtime = mtime
3202 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3204 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3206 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3207 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3209 excute the real write operation asynchronously
3210 :para write_op: write operation object
3211 :type write_op: WriteOp
3212 :para oid: object name
3214 :param oncomplete: what to do when the remove is safe and complete in memory
3216 :type oncomplete: completion
3217 :param onsafe: what to do when the remove is safe and complete on storage
3219 :type onsafe: completion
3220 :para mtime: the time to set the mtime to, 0 for the current time
3222 :para flags: flags to apply to the entire operation
3225 :raises: :class:`Error`
3226 :returns: completion object
3229 oid = cstr(oid, 'oid')
3231 WriteOp _write_op = write_op
3233 Completion completion
3234 time_t _mtime = mtime
3237 completion = self.__get_completion(oncomplete, onsafe)
3238 self.__track_completion(completion)
3241 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3244 completion._cleanup()
3245 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3248 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3249 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3251 excute the real read operation
3252 :para read_op: read operation object
3253 :type read_op: ReadOp
3254 :para oid: object name
3256 :para flag: flags to apply to the entire operation
3259 oid = cstr(oid, 'oid')
3261 ReadOp _read_op = read_op
3266 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3268 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3270 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3271 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3273 excute the real read operation
3274 :para read_op: read operation object
3275 :type read_op: ReadOp
3276 :para oid: object name
3278 :param oncomplete: what to do when the remove is safe and complete in memory
3280 :type oncomplete: completion
3281 :param onsafe: what to do when the remove is safe and complete on storage
3283 :type onsafe: completion
3284 :para flag: flags to apply to the entire operation
3287 oid = cstr(oid, 'oid')
3289 ReadOp _read_op = read_op
3291 Completion completion
3294 completion = self.__get_completion(oncomplete, onsafe)
3295 self.__track_completion(completion)
3298 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3300 completion._cleanup()
3301 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3304 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3305 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3308 :para read_op: read operation object
3309 :type read_op: ReadOp
3310 :para start_after: list keys starting after start_after
3311 :type start_after: str
3312 :para filter_prefix: list only keys beginning with filter_prefix
3313 :type filter_prefix: str
3314 :para max_return: list no more than max_return key/value pairs
3315 :type max_return: int
3316 :returns: an iterator over the requested omap values, return value from this action
3319 start_after = cstr(start_after, 'start_after') if start_after else None
3320 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3322 char *_start_after = opt_str(start_after)
3323 char *_filter_prefix = opt_str(filter_prefix)
3324 ReadOp _read_op = read_op
3325 rados_omap_iter_t iter_addr = NULL
3326 int _max_return = max_return
3330 rados_read_op_omap_get_vals(_read_op.read_op, _start_after, _filter_prefix,
3331 _max_return, &iter_addr, &prval)
3332 it = OmapIterator(self)
3334 return it, int(prval)
3336 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3337 def get_omap_keys(self, read_op, start_after, max_return):
3340 :para read_op: read operation object
3341 :type read_op: ReadOp
3342 :para start_after: list keys starting after start_after
3343 :type start_after: str
3344 :para max_return: list no more than max_return key/value pairs
3345 :type max_return: int
3346 :returns: an iterator over the requested omap values, return value from this action
3348 start_after = cstr(start_after, 'start_after') if start_after else None
3350 char *_start_after = opt_str(start_after)
3351 ReadOp _read_op = read_op
3352 rados_omap_iter_t iter_addr = NULL
3353 int _max_return = max_return
3357 rados_read_op_omap_get_keys(_read_op.read_op, _start_after,
3358 _max_return, &iter_addr, &prval)
3359 it = OmapIterator(self)
3361 return it, int(prval)
3363 @requires(('read_op', ReadOp), ('keys', tuple))
3364 def get_omap_vals_by_keys(self, read_op, keys):
3366 get the omap values by keys
3367 :para read_op: read operation object
3368 :type read_op: ReadOp
3369 :para keys: input key tuple
3371 :returns: an iterator over the requested omap values, return value from this action
3373 keys = cstr_list(keys, 'keys')
3375 ReadOp _read_op = read_op
3376 rados_omap_iter_t iter_addr
3377 char **_keys = to_bytes_array(keys)
3378 size_t key_num = len(keys)
3383 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3384 <const char**>_keys,
3385 key_num, &iter_addr, &prval)
3386 it = OmapIterator(self)
3388 return it, int(prval)
3392 @requires(('write_op', WriteOp), ('keys', tuple))
3393 def remove_omap_keys(self, write_op, keys):
3395 remove omap keys specifiled
3396 :para write_op: write operation object
3397 :type write_op: WriteOp
3398 :para keys: input key tuple
3402 keys = cstr_list(keys, 'keys')
3404 WriteOp _write_op = write_op
3405 size_t key_num = len(keys)
3406 char **_keys = to_bytes_array(keys)
3410 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3414 @requires(('write_op', WriteOp))
3415 def clear_omap(self, write_op):
3417 Remove all key/value pairs from an object
3418 :para write_op: write operation object
3419 :type write_op: WriteOp
3423 WriteOp _write_op = write_op
3426 rados_write_op_omap_clear(_write_op.write_op)
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct lives on the C stack; clear the microseconds field so
        # librados does not read an uninitialised value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct lives on the C stack; clear the microseconds field so
        # librados does not read an uninitialised value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that failed (the message used to say
        # rados_lock_exclusive).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Describe the unlock failure (the message used to claim that
        # rados_lock_exclusive failed to set the lock).
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
def set_object_locator(func):
    """
    Decorator that runs a method with the object's locator key applied.

    When ``self.locator_key`` is not None, the key is set on ``self.ioctx``
    for the duration of the call and the io context's previous locator key
    is put back afterwards, even if the wrapped method raises. When it is
    None the method is called directly and the io context is not touched.

    :param func: method whose first argument is an object exposing
                 ``ioctx`` and ``locator_key`` attributes
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore unconditionally so a failing operation does not leave
            # the io context pointing at this object's locator key.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
def set_object_namespace(func):
    """
    Decorator that runs a method with the object's namespace applied.

    ``self.nspace`` is set on ``self.ioctx`` for the duration of the call
    and the io context's previous namespace is put back afterwards, even if
    the wrapped method raises.

    :param func: method whose first argument is an object exposing
                 ``ioctx`` and ``nspace`` attributes
    :raises: :class:`LogicError` if ``self.nspace`` is None
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore unconditionally so a failing operation does not leave
            # the io context switched to this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context used for every operation on the object
        :param key: name of the object
        :param locator_key: locator key applied around each operation, or
                            None to leave the io context's locator alone
        :param nspace: namespace of the object; None is stored as ""
        """
        self.key = key
        self.ioctx = ioctx
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal is not a
        # reliable emptiness test.
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, "--default--"
             if self.nspace == "" else self.nspace, self.locator_key)

    def require_object_exists(self):
        """
        :raises: ObjectStateError unless the object state is "exists"
        """
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to ``length`` bytes at the current offset and advance it."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """Write at the current offset; the offset advances only when the
        underlying write returns 0."""
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return whatever ``ioctx.stat`` reports for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by the next read or write."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the value of extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the io context's xattr iterator for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set extended attribute ``xattr_name`` to ``xattr_value``."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
class MonitorLog(object):
    # NOTE(sileht): Kept only for backward compatibility; the
    # implementation moved to Rados.monitor_log()
    """
    Watch cluster log messages. Create an instance and keep it alive
    while callback is periodically invoked. Pass 'level' to select
    which messages are monitored (one of MONITOR_LEVELS). arg is
    handed through to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        self.cluster = cluster
        self.level, self.callback, self.arg = level, callback, arg
        # Registration itself is delegated to the cluster handle.
        cluster.monitor_log(level, callback, arg)