1 # cython: embedsignature=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
54 cdef extern from "sys/time.h":
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
64 cdef extern from "rados/librados.h" nogil:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
105 cdef struct rados_cluster_stat_t:
111 cdef struct rados_pool_stat_t:
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
125 void rados_buffer_free(char *buf)
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154 int rados_application_enable(rados_ioctx_t io, const char *app_name,
156 int rados_application_list(rados_ioctx_t io, char *values,
158 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159 const char *key, char *value,
161 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162 const char *key, const char *value)
163 int rados_application_metadata_remove(rados_ioctx_t io,
164 const char *app_name, const char *key)
165 int rados_application_metadata_list(rados_ioctx_t io,
166 const char *app_name, char *keys,
167 size_t *key_len, char *values,
169 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183 const char *inbuf, size_t inbuflen,
184 char **outbuf, size_t *outbuflen,
185 char **outs, size_t *outslen)
186 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187 const char *inbuf, size_t inbuflen,
188 char **outbuf, size_t *outbuflen,
189 char **outs, size_t *outslen)
190 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
191 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
193 int rados_wait_for_latest_osdmap(rados_t cluster)
195 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196 void rados_ioctx_destroy(rados_ioctx_t io)
197 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
201 uint64_t rados_get_last_version(rados_ioctx_t io)
202 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207 int rados_remove(rados_ioctx_t io, const char *oid)
208 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214 void rados_getxattrs_end(rados_xattrs_iter_t iter)
216 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218 void rados_nobjects_list_close(rados_list_ctx_t ctx)
220 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
229 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
230 rados_snap_t *snapid)
231 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
233 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
234 rados_snap_t snap_seq,
237 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
240 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
241 const char * cookie, const char * desc,
242 timeval * duration, uint8_t flags)
243 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
244 const char * cookie, const char * tag, const char * desc,
245 timeval * duration, uint8_t flags)
246 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
248 rados_write_op_t rados_create_write_op()
249 void rados_release_write_op(rados_write_op_t write_op)
251 rados_read_op_t rados_create_read_op()
252 void rados_release_read_op(rados_read_op_t read_op)
254 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
255 void rados_aio_release(rados_completion_t c)
256 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
257 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
258 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
259 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
260 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
261 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
262 int rados_aio_flush(rados_ioctx_t io)
264 int rados_aio_get_return_value(rados_completion_t c)
265 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
266 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
267 int rados_aio_wait_for_complete(rados_completion_t c)
268 int rados_aio_wait_for_safe(rados_completion_t c)
269 int rados_aio_is_complete(rados_completion_t c)
270 int rados_aio_is_safe(rados_completion_t c)
272 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
273 const char * in_buf, size_t in_len, char * buf, size_t out_len)
274 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
275 const char * in_buf, size_t in_len, char * buf, size_t out_len)
277 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
278 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
279 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
280 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
281 void rados_write_op_omap_clear(rados_write_op_t write_op)
282 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
284 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
285 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
286 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
287 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
288 void rados_write_op_remove(rados_write_op_t write_op)
289 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
290 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
292 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
293 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
294 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
295 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
296 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
297 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
298 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
299 void rados_omap_get_end(rados_omap_iter_t iter)
302 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
303 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
304 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
305 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
306 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
307 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
308 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
310 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
312 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
313 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
314 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
315 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
316 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
317 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
318 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
320 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
322 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
323 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
325 ANONYMOUS_AUID = 0xffffffffffffffff
class Error(Exception):
    """
    `Error` class, derived from `Exception`; the base class of all rados
    exceptions.

    :param message: human-readable description of the failure
    :param errno: error number attached to the failure, or None when no
                  error code applies
    """
    def __init__(self, message, errno=None):
        super(Error, self).__init__(message)
        # Python 3 exceptions carry no implicit ``message`` attribute, but
        # __reduce__ below needs it, so keep an explicit copy.
        self.message = message
        self.errno = errno

    def __str__(self):
        msg = super(Error, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # Rebuild from the constructor arguments so that errno survives
        # pickling and copying.
        return (self.__class__, (self.message, self.errno))
344 class InvalidArgumentError(Error):
347 class OSError(Error):
348 """ `OSError` class, derived from `Error` """
351 class InterruptedOrTimeoutError(OSError):
352 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
356 class PermissionError(OSError):
357 """ `PermissionError` class, derived from `OSError` """
361 class PermissionDeniedError(OSError):
362 """ `PermissionDeniedError` class, derived from `OSError`; raised for EACCES errors """
366 class ObjectNotFound(OSError):
367 """ `ObjectNotFound` class, derived from `OSError` """
371 class NoData(OSError):
372 """ `NoData` class, derived from `OSError` """
376 class ObjectExists(OSError):
377 """ `ObjectExists` class, derived from `OSError` """
381 class ObjectBusy(OSError):
382 """ `ObjectBusy` class, derived from `OSError` """
386 class IOError(OSError):
387 """ `IOError` class, derived from `OSError` """
391 class NoSpace(OSError):
392 """ `NoSpace` class, derived from `OSError` """
396 class RadosStateError(Error):
397 """ `RadosStateError` class, derived from `Error` """
401 class IoctxStateError(Error):
402 """ `IoctxStateError` class, derived from `Error` """
406 class ObjectStateError(Error):
407 """ `ObjectStateError` class, derived from `Error` """
411 class LogicError(Error):
412 """ `LogicError` class, derived from `Error` """
416 class TimedOut(OSError):
417 """ `TimedOut` class, derived from `OSError` """
421 IF UNAME_SYSNAME == "FreeBSD":
422 cdef errno_to_exception = {
423 errno.EPERM : PermissionError,
424 errno.ENOENT : ObjectNotFound,
426 errno.ENOSPC : NoSpace,
427 errno.EEXIST : ObjectExists,
428 errno.EBUSY : ObjectBusy,
429 errno.ENOATTR : NoData,
430 errno.EINTR : InterruptedOrTimeoutError,
431 errno.ETIMEDOUT : TimedOut,
432 errno.EACCES : PermissionDeniedError,
433 errno.EINVAL : InvalidArgumentError,
436 cdef errno_to_exception = {
437 errno.EPERM : PermissionError,
438 errno.ENOENT : ObjectNotFound,
440 errno.ENOSPC : NoSpace,
441 errno.EEXIST : ObjectExists,
442 errno.EBUSY : ObjectBusy,
443 errno.ENODATA : NoData,
444 errno.EINTR : InterruptedOrTimeoutError,
445 errno.ETIMEDOUT : TimedOut,
446 errno.EACCES : PermissionDeniedError,
447 errno.EINVAL : InvalidArgumentError,
cdef make_ex(ret, msg):
    """
    Build the exception that corresponds to a librados return code.

    :param ret: the return code
    :param msg: the error message to use
    :returns: a subclass of :class:`Error`
    """
    # Callers hand over the raw (negative) return value, while the lookup
    # table is keyed by positive errno constants.
    code = abs(ret)
    # Codes without a dedicated class fall back to the generic OSError.
    exc_class = errno_to_exception.get(code, OSError)
    return exc_class(msg, errno=code)
468 # helper to specify an optional argument, where in addition to `cls`, `None`
474 # validate argument types of an instance method
475 # kwargs is an un-ordered dict, so use args instead
def requires(*types):
    """
    Build a decorator that type-checks the arguments of an instance method.

    Every entry of ``types`` is an ``(arg_name, arg_type)`` pair, given in
    the positional order of the method's arguments (``self`` excluded).
    ``arg_type`` is a single type or a tuple of alternatives; ``None`` used
    as a type means that the value ``None`` is accepted.

    The decorated method raises :class:`TypeError` when an argument does not
    match its declared type.
    """
    def accepts(value, expected):
        # None is not a type, so it is matched by identity
        if expected is None:
            return value is None
        return isinstance(value, expected)

    def verify(value, arg_name, arg_type):
        if not isinstance(arg_type, tuple):
            if accepts(value, arg_type):
                return
            assert(arg_type is not None)
            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
        for candidate in arg_type:
            if accepts(value, candidate):
                return
        type_names = ' or '.join('None' if t is None else t.__name__
                                 for t in arg_type)
        raise TypeError('%s must be %s' % (arg_name, type_names))

    def wrapper(f):
        # FIXME(sileht): functools.wraps cannot be applied here, it stops with
        # AttributeError: 'method_descriptor' object has no attribute '__module__'
        def validate_func(*args, **kwargs):
            # positional values (minus `self`) line up with the specs in order
            positional = zip(args[1:], types)
            # keyword values are looked up by the name given in the spec
            keyword = ((kwargs[name], (name, spec)) for name, spec in types
                       if name in kwargs)
            for value, (arg_name, arg_type) in chain(positional, keyword):
                verify(value, arg_name, arg_type)
            return f(*args, **kwargs)
        return validate_func
    return wrapper
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Turn a Python string into a byte string.

    :param basestring val: Python string; bytes are returned unchanged
    :param str name: Name of the string parameter, used in the error message
    :param str encoding: Encoding applied to unicode input
    :param bool opt: If True, None is allowed and returned as None
    :rtype: bytes or None
    :raises: :class:`TypeError` if ``val`` is neither bytes nor unicode
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if isinstance(val, unicode):
        return val.encode(encoding)
    raise TypeError('%s must be a string' % name)
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every string of an iterable into a byte string.

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, used in error messages
    :param str encoding: Encoding to use for each element
    :returns: list of the values returned by :func:`cstr`, in input order
    """
    # Forward the caller's encoding; it used to be dropped, so every element
    # was encoded with cstr()'s default.
    return [cstr(s, name, encoding) for s in list_str]
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string into a Python string.

    :param bytes val: byte string, or None
    :param str encoding: Encoding used to decode ``val``
    :rtype: unicode or None
    """
    # None is passed through untouched
    return None if val is None else val.decode(encoding)
550 cdef char* opt_str(s) except? NULL:
556 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
557 cdef void *ret = realloc(ptr, size)
559 raise MemoryError("realloc failed")
563 cdef size_t * to_csize_t_array(list_int):
564 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
566 raise MemoryError("malloc failed")
567 for i in xrange(len(list_int)):
568 ret[i] = <size_t>list_int[i]
572 cdef char ** to_bytes_array(list_bytes):
573 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
575 raise MemoryError("malloc failed")
576 for i in xrange(len(list_bytes)):
577 ret[i] = <char *>list_bytes[i]
582 cdef int __monitor_callback(void *arg, const char *line, const char *who,
583 uint64_t sec, uint64_t nsec, uint64_t seq,
584 const char *level, const char *msg) with gil:
585 cdef object cb_info = <object>arg
586 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
589 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
592 uint64_t sec, uint64_t nsec, uint64_t seq,
593 const char *level, const char *msg) with gil:
594 cdef object cb_info = <object>arg
595 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
class Version(object):
    """ Version information, rendered as ``major.minor.extra`` by str() """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        components = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % components
610 cdef class Rados(object):
611 """This class wraps librados functions"""
612 # NOTE(sileht): attributes declared in .pxd
614 def __init__(self, *args, **kwargs):
616 self.__setup(*args, **kwargs)
618 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
619 ('conffile', opt(str_type)))
620 def __setup(self, rados_id=None, name=None, clustername=None,
621 conf_defaults=None, conffile=None, conf=None, flags=0,
623 self.monitor_callback = None
624 self.monitor_callback2 = None
625 self.parsed_args = []
626 self.conf_defaults = conf_defaults
627 self.conffile = conffile
628 self.rados_id = rados_id
630 if rados_id and name:
631 raise Error("Rados(): can't supply both rados_id and name")
633 name = 'client.' + rados_id
635 name = 'client.admin'
636 if clustername is None:
639 name = cstr(name, 'name')
640 clustername = cstr(clustername, 'clustername')
643 char *_clustername = clustername
648 # Unpack void* (aka rados_config_t) from capsule
649 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
651 ret = rados_create_with_context(&self.cluster, rados_config)
654 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
656 raise Error("rados_initialize failed with error code: %d" % ret)
658 self.state = "configuring"
659 # order is important: conf_defaults, then conffile, then conf
661 for key, value in conf_defaults.items():
662 self.conf_set(key, value)
663 if conffile is not None:
664 # read the default conf file when '' is given
667 self.conf_read_file(conffile)
669 for key, value in conf.items():
670 self.conf_set(key, value)
672 def require_state(self, *args):
674 Checks if the Rados object is in a special state
676 :raises: RadosStateError
678 if self.state in args:
680 raise RadosStateError("You cannot perform that operation on a \
681 Rados object in state %s." % self.state)
685 Disconnects from the cluster. Call this explicitly when a
686 Rados.connect()ed object is no longer used.
688 if self.state != "shutdown":
690 rados_shutdown(self.cluster)
691 self.state = "shutdown"
697 def __exit__(self, type_, value, traceback):
703 Get the version number of the ``librados`` C library.
705 :returns: a tuple of ``(major, minor, extra)`` components of the
712 rados_version(&major, &minor, &extra)
713 return Version(major, minor, extra)
715 @requires(('path', opt(str_type)))
716 def conf_read_file(self, path=None):
718 Configure the cluster handle using a Ceph config file.
720 :param path: path to the config file
723 self.require_state("configuring", "connected")
724 path = cstr(path, 'path', opt=True)
726 char *_path = opt_str(path)
728 ret = rados_conf_read_file(self.cluster, _path)
730 raise make_ex(ret, "error calling conf_read_file")
732 def conf_parse_argv(self, args):
734 Parse known arguments from args, and remove; returned
735 args contain only those unknown to ceph
737 self.require_state("configuring", "connected")
741 cargs = cstr_list(args, 'args')
743 int _argc = len(args)
744 char **_argv = to_bytes_array(cargs)
745 char **_remargv = NULL
748 _remargv = <char **>malloc(_argc * sizeof(char *))
750 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
752 <const char**>_remargv)
754 raise make_ex(ret, "error calling conf_parse_argv_remainder")
756 # _remargv was allocated with fixed argc; collapse return
757 # list to eliminate any missing args
758 retargs = [decode_cstr(a) for a in _remargv[:_argc]
760 self.parsed_args = args
766 def conf_parse_env(self, var='CEPH_ARGS'):
768 Parse known arguments from an environment variable, normally
771 self.require_state("configuring", "connected")
775 var = cstr(var, 'var')
779 ret = rados_conf_parse_env(self.cluster, _var)
781 raise make_ex(ret, "error calling conf_parse_env")
783 @requires(('option', str_type))
784 def conf_get(self, option):
786 Get the value of a configuration option
788 :param option: which option to read
791 :returns: str - value of the option or None
792 :raises: :class:`TypeError`
794 self.require_state("configuring", "connected")
795 option = cstr(option, 'option')
797 char *_option = option
803 ret_buf = <char *>realloc_chk(ret_buf, length)
805 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
807 return decode_cstr(ret_buf)
808 elif ret == -errno.ENAMETOOLONG:
810 elif ret == -errno.ENOENT:
813 raise make_ex(ret, "error calling conf_get")
817 @requires(('option', str_type), ('val', str_type))
818 def conf_set(self, option, val):
820 Set the value of a configuration option
822 :param option: which option to set
824 :param val: value of the option
827 :raises: :class:`TypeError`, :class:`ObjectNotFound`
829 self.require_state("configuring", "connected")
830 option = cstr(option, 'option')
831 val = cstr(val, 'val')
833 char *_option = option
837 ret = rados_conf_set(self.cluster, _option, _val)
839 raise make_ex(ret, "error calling conf_set")
841 def ping_monitor(self, mon_id):
843 Ping a monitor to assess liveness
845 May be used as a simple way to assess liveness, or to obtain
846 information about the monitor in a simple way even in the
849 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
851 :returns: the string reply from the monitor
854 self.require_state("configuring", "connected")
856 mon_id = cstr(mon_id, 'mon_id')
858 char *_mon_id = mon_id
863 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
866 raise make_ex(ret, "error calling ping_monitor")
869 my_outstr = outstr[:outstrlen]
870 rados_buffer_free(outstr)
871 return decode_cstr(my_outstr)
873 def connect(self, timeout=0):
875 Connect to the cluster. Use shutdown() to release resources.
877 self.require_state("configuring")
878 # NOTE(sileht): timeout was supported by old python API,
879 # but this is not something available in C API, so ignore
880 # for now and remove it later
882 ret = rados_connect(self.cluster)
884 raise make_ex(ret, "error connecting to the cluster")
885 self.state = "connected"
887 def get_cluster_stats(self):
889 Read usage info about the cluster
891 This tells you total space, space used, space available, and number
892 of objects. These are not updated immediately when data is written,
893 they are eventually consistent.
895 :returns: dict - contains the following keys:
897 - ``kb`` (int) - total space
899 - ``kb_used`` (int) - space used
901 - ``kb_avail`` (int) - free space available
903 - ``num_objects`` (int) - number of objects
907 rados_cluster_stat_t stats
910 ret = rados_cluster_stat(self.cluster, &stats)
914 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
915 return {'kb': stats.kb,
916 'kb_used': stats.kb_used,
917 'kb_avail': stats.kb_avail,
918 'num_objects': stats.num_objects}
920 @requires(('pool_name', str_type))
921 def pool_exists(self, pool_name):
923 Checks if a given pool exists.
925 :param pool_name: name of the pool to check
928 :raises: :class:`TypeError`, :class:`Error`
929 :returns: bool - whether the pool exists, false otherwise.
931 self.require_state("connected")
933 pool_name = cstr(pool_name, 'pool_name')
935 char *_pool_name = pool_name
938 ret = rados_pool_lookup(self.cluster, _pool_name)
941 elif ret == -errno.ENOENT:
944 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
946 @requires(('pool_name', str_type))
947 def pool_lookup(self, pool_name):
949 Returns a pool's ID based on its name.
951 :param pool_name: name of the pool to look up
954 :raises: :class:`TypeError`, :class:`Error`
955 :returns: int - pool ID, or None if it doesn't exist
957 self.require_state("connected")
958 pool_name = cstr(pool_name, 'pool_name')
960 char *_pool_name = pool_name
963 ret = rados_pool_lookup(self.cluster, _pool_name)
966 elif ret == -errno.ENOENT:
969 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
971 @requires(('pool_id', int))
972 def pool_reverse_lookup(self, pool_id):
974 Returns a pool's name based on its ID.
976 :param pool_id: ID of the pool to look up
979 :raises: :class:`TypeError`, :class:`Error`
980 :returns: string - pool name, or None if it doesn't exist
982 self.require_state("connected")
984 int64_t _pool_id = pool_id
990 name = <char *>realloc_chk(name, size)
992 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
995 elif ret != -errno.ERANGE and size <= 4096:
997 elif ret == -errno.ENOENT:
1000 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1002 return decode_cstr(name)
1007 @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
1008 def create_pool(self, pool_name, auid=None, crush_rule=None):
1011 - with default settings: if auid=None and crush_rule=None
1012 - owned by a specific auid: auid given and crush_rule=None
1013 - with a specific CRUSH rule: if auid=None and crush_rule given
1014 - with a specific CRUSH rule and auid: if auid and crush_rule given
1016 :param pool_name: name of the pool to create
1017 :type pool_name: str
1018 :param auid: the id of the owner of the new pool
1020 :param crush_rule: rule to use for placement in the new pool
1021 :type crush_rule: int
1023 :raises: :class:`TypeError`, :class:`Error`
1025 self.require_state("connected")
1027 pool_name = cstr(pool_name, 'pool_name')
1029 char *_pool_name = pool_name
1033 if auid is None and crush_rule is None:
1035 ret = rados_pool_create(self.cluster, _pool_name)
1037 _crush_rule = crush_rule
1039 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1040 elif crush_rule is None:
1043 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1046 _crush_rule = crush_rule
1048 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1050 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1052 @requires(('pool_id', int))
1053 def get_pool_base_tier(self, pool_id):
1057 :returns: base pool, or pool_id if tiering is not configured for the pool
1059 self.require_state("connected")
1061 int64_t base_tier = 0
1062 int64_t _pool_id = pool_id
1065 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1067 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1068 return int(base_tier)
1070 @requires(('pool_name', str_type))
1071 def delete_pool(self, pool_name):
1073 Delete a pool and all data inside it.
1075 The pool is removed from the cluster immediately,
1076 but the actual data is deleted in the background.
1078 :param pool_name: name of the pool to delete
1079 :type pool_name: str
1081 :raises: :class:`TypeError`, :class:`Error`
1083 self.require_state("connected")
1085 pool_name = cstr(pool_name, 'pool_name')
1087 char *_pool_name = pool_name
1090 ret = rados_pool_delete(self.cluster, _pool_name)
1092 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1094 @requires(('pool_id', int))
1095 def get_inconsistent_pgs(self, pool_id):
1097 List inconsistent placement groups in the given pool
1099 :param pool_id: ID of the pool in which PGs are listed
1101 :returns: list - inconsistent placement groups
1103 self.require_state("connected")
1105 int64_t pool = pool_id
1111 pgs = <char *>realloc_chk(pgs, size);
1113 ret = rados_inconsistent_pg_list(self.cluster, pool,
1120 raise make_ex(ret, "error calling inconsistent_pg_list")
1121 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1125 def list_pools(self):
1127 Gets a list of pool names.
1129 :returns: list - of pool names.
1131 self.require_state("connected")
1134 char *c_names = NULL
1138 c_names = <char *>realloc_chk(c_names, size)
1140 ret = rados_pool_list(self.cluster, c_names, size)
1145 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1152 Get the fsid of the cluster as a hexadecimal string.
1154 :raises: :class:`Error`
1155 :returns: str - cluster fsid
1157 self.require_state("connected")
1161 PyObject* ret_s = NULL
1163 ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1165 ret_buf = PyBytes_AsString(ret_s)
1167 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1169 raise make_ex(ret, "error getting cluster fsid")
1170 if ret != <int>buf_len:
1171 _PyBytes_Resize(&ret_s, ret)
1172 return <object>ret_s
1174 # We DECREF unconditionally: the cast to object above will have
1175 # INCREFed if necessary. This also takes care of exceptions,
1176 including if _PyBytes_Resize fails (that will free the string
1177 # itself and set ret_s to NULL, hence XDECREF).
1178 ref.Py_XDECREF(ret_s)
1180 @requires(('ioctx_name', str_type))
1181 def open_ioctx(self, ioctx_name):
1183 Create an io context
1185 The io context allows you to perform operations within a particular
1188 :param ioctx_name: name of the pool
1189 :type ioctx_name: str
1191 :raises: :class:`TypeError`, :class:`Error`
1192 :returns: Ioctx - Rados Ioctx object
1194 self.require_state("connected")
1195 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1198 char *_ioctx_name = ioctx_name
1200 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1202 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1203 io = Ioctx(ioctx_name)
1207 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1209 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1210 returns (int ret, string outbuf, string outs)
1212 # NOTE(sileht): timeout is ignored because C API doesn't provide
1213 # timeout argument, but we keep it for backward compat with old python binding
1215 self.require_state("connected")
1216 cmd = cstr_list(cmd, 'c')
1218 if isinstance(target, int):
1219 # NOTE(sileht): looks weird but test_monmap_dump pass int
1220 target = str(target)
1222 target = cstr(target, 'target', opt=True)
1223 inbuf = cstr(inbuf, 'inbuf')
1226 char *_target = opt_str(target)
1227 char **_cmd = to_bytes_array(cmd)
1228 size_t _cmdlen = len(cmd)
1230 char *_inbuf = inbuf
1231 size_t _inbuf_len = len(inbuf)
1241 ret = rados_mon_command_target(self.cluster, _target,
1242 <const char **>_cmd, _cmdlen,
1243 <const char*>_inbuf, _inbuf_len,
1244 &_outbuf, &_outbuf_len,
1248 ret = rados_mon_command(self.cluster,
1249 <const char **>_cmd, _cmdlen,
1250 <const char*>_inbuf, _inbuf_len,
1251 &_outbuf, &_outbuf_len,
1254 my_outs = decode_cstr(_outs[:_outs_len])
1255 my_outbuf = _outbuf[:_outbuf_len]
1257 rados_buffer_free(_outs)
1259 rados_buffer_free(_outbuf)
1260 return (ret, my_outbuf, my_outs)
1264 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1266 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1267 returns (int ret, string outbuf, string outs)
1269 # NOTE(sileht): timeout is ignored because C API doesn't provide
1270 # timeout argument, but we keep it for backward compat with old python binding
1271 self.require_state("connected")
1273 cmd = cstr_list(cmd, 'cmd')
1274 inbuf = cstr(inbuf, 'inbuf')
1278 char **_cmd = to_bytes_array(cmd)
1279 size_t _cmdlen = len(cmd)
1281 char *_inbuf = inbuf
1282 size_t _inbuf_len = len(inbuf)
1291 ret = rados_osd_command(self.cluster, _osdid,
1292 <const char **>_cmd, _cmdlen,
1293 <const char*>_inbuf, _inbuf_len,
1294 &_outbuf, &_outbuf_len,
1297 my_outs = decode_cstr(_outs[:_outs_len])
1298 my_outbuf = _outbuf[:_outbuf_len]
1300 rados_buffer_free(_outs)
1302 rados_buffer_free(_outbuf)
1303 return (ret, my_outbuf, my_outs)
1307 def mgr_command(self, cmd, inbuf, timeout=0):
1309 returns (int ret, string outbuf, string outs)
1311 # NOTE(sileht): timeout is ignored because C API doesn't provide
1312 # timeout argument, but we keep it for backward compat with old python binding
1313 self.require_state("connected")
1315 cmd = cstr_list(cmd, 'cmd')
1316 inbuf = cstr(inbuf, 'inbuf')
1319 char **_cmd = to_bytes_array(cmd)
1320 size_t _cmdlen = len(cmd)
1322 char *_inbuf = inbuf
1323 size_t _inbuf_len = len(inbuf)
1332 ret = rados_mgr_command(self.cluster,
1333 <const char **>_cmd, _cmdlen,
1334 <const char*>_inbuf, _inbuf_len,
1335 &_outbuf, &_outbuf_len,
1338 my_outs = decode_cstr(_outs[:_outs_len])
1339 my_outbuf = _outbuf[:_outbuf_len]
1341 rados_buffer_free(_outs)
1343 rados_buffer_free(_outbuf)
1344 return (ret, my_outbuf, my_outs)
1348 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1350 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1351 returns (int ret, string outbuf, string outs)
1353 # NOTE(sileht): timeout is ignored because C API doesn't provide
1354 # timeout argument, but we keep it for backward compat with old python binding
1355 self.require_state("connected")
1357 pgid = cstr(pgid, 'pgid')
1358 cmd = cstr_list(cmd, 'cmd')
1359 inbuf = cstr(inbuf, 'inbuf')
1363 char **_cmd = to_bytes_array(cmd)
1364 size_t _cmdlen = len(cmd)
1366 char *_inbuf = inbuf
1367 size_t _inbuf_len = len(inbuf)
1376 ret = rados_pg_command(self.cluster, _pgid,
1377 <const char **>_cmd, _cmdlen,
1378 <const char *>_inbuf, _inbuf_len,
1379 &_outbuf, &_outbuf_len,
1382 my_outs = decode_cstr(_outs[:_outs_len])
1383 my_outbuf = _outbuf[:_outbuf_len]
1385 rados_buffer_free(_outs)
1387 rados_buffer_free(_outbuf)
1388 return (ret, my_outbuf, my_outs)
1392 def wait_for_latest_osdmap(self):
1393 self.require_state("connected")
1395 ret = rados_wait_for_latest_osdmap(self.cluster)
1398 def blacklist_add(self, client_address, expire_seconds=0):
1400 Blacklist a client from the OSDs
1402 :param client_address: client address
1403 :type client_address: str
1404 :param expire_seconds: number of seconds to blacklist
1405 :type expire_seconds: int
1407 :raises: :class:`Error`
1409 self.require_state("connected")
1410 client_address = cstr(client_address, 'client_address')
1412 uint32_t _expire_seconds = expire_seconds
1413 char *_client_address = client_address
1416 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1418 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1420 def monitor_log(self, level, callback, arg):
1421 if level not in MONITOR_LEVELS:
1422 raise LogicError("invalid monitor level " + level)
1423 if callback is not None and not callable(callback):
1424 raise LogicError("callback must be a callable function or None")
1426 level = cstr(level, 'level')
1427 cdef char *_level = level
1429 if callback is None:
1431 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1432 self.monitor_callback = None
1433 self.monitor_callback2 = None
1436 cb = (callback, arg)
1437 cdef PyObject* _arg = <PyObject*>cb
1439 r = rados_monitor_log(self.cluster, <const char*>_level,
1440 <rados_log_callback_t>&__monitor_callback, _arg)
1443 raise make_ex(r, 'error calling rados_monitor_log')
1444 # NOTE(sileht): Prevents the callback method from being garbage collected
1445 self.monitor_callback = cb
1446 self.monitor_callback2 = None
def monitor_log2(self, level, callback, arg):
    """
    Install or clear the monitor log callback through rados_monitor_log2.

    :param level: log level to watch; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the
                     registered callback
    :param arg: value bundled with the callback in the tuple handed to
                the C callback

    :raises: :class:`LogicError` for an invalid level or a non-callable
             callback, :class:`Error` if registration fails
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (this is the *2 variant).
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
1477 cdef class OmapIterator(object):
1480 cdef public Ioctx ioctx
1481 cdef rados_omap_iter_t ctx
1483 def __cinit__(self, Ioctx ioctx):
1491 Get the next key-value pair in the object
1492 :returns: next rados.OmapItem
1500 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1503 raise make_ex(ret, "error iterating over the omap")
1505 raise StopIteration()
1506 key = decode_cstr(key_)
1512 def __dealloc__(self):
1514 rados_omap_get_end(self.ctx)
1517 cdef class ObjectIterator(object):
1518 """rados.Ioctx Object iterator"""
1520 cdef rados_list_ctx_t ctx
1522 cdef public object ioctx
1524 def __cinit__(self, Ioctx ioctx):
1528 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1530 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1538 Get the next object name and locator in the pool
1540 :raises: StopIteration
1541 :returns: next rados.Ioctx Object
1544 const char *key_ = NULL
1545 const char *locator_ = NULL
1546 const char *nspace_ = NULL
1549 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1552 raise StopIteration()
1554 key = decode_cstr(key_)
1555 locator = decode_cstr(locator_) if locator_ != NULL else None
1556 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1557 return Object(self.ioctx, key, locator, nspace)
1559 def __dealloc__(self):
1561 rados_nobjects_list_close(self.ctx)
1564 cdef class XattrIterator(object):
1565 """Extended attribute iterator"""
1567 cdef rados_xattrs_iter_t it
1570 cdef public Ioctx ioctx
1571 cdef public object oid
1573 def __cinit__(self, Ioctx ioctx, oid):
1575 self.oid = cstr(oid, 'oid')
1576 self._oid = self.oid
1579 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1581 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1588 Get the next xattr on the object
1590 :raises: StopIteration
1591 :returns: pair - of name and value of the next Xattr
1594 const char *name_ = NULL
1595 const char *val_ = NULL
1599 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1601 raise make_ex(ret, "error iterating over the extended attributes \
1602 in '%s'" % self.oid)
1604 raise StopIteration()
1605 name = decode_cstr(name_)
1609 def __dealloc__(self):
1611 rados_getxattrs_end(self.it)
1614 cdef class SnapIterator(object):
1615 """Snapshot iterator"""
1617 cdef public Ioctx ioctx
1619 cdef rados_snap_t *snaps
1623 def __cinit__(self, Ioctx ioctx):
1625 # We don't know how big a buffer we need until we've called the
1626 # function. So use the exponential doubling strategy.
1627 cdef int num_snaps = 10
1629 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1631 sizeof(rados_snap_t))
1634 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1638 elif ret != -errno.ERANGE:
1639 raise make_ex(ret, "error calling rados_snap_list for \
1640 ioctx '%s'" % self.ioctx.name)
1641 num_snaps = num_snaps * 2
1649 Get the next Snapshot
1651 :raises: :class:`Error`, StopIteration
1652 :returns: Snap - next snapshot
1654 if self.cur_snap >= self.max_snap:
1658 rados_snap_t snap_id = self.snaps[self.cur_snap]
1664 name = <char *>realloc_chk(name, name_len)
1666 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1669 elif ret != -errno.ERANGE:
1670 raise make_ex(ret, "rados_snap_get_name error")
1672 name_len = name_len * 2
1674 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1675 self.cur_snap = self.cur_snap + 1
1681 cdef class Snap(object):
1682 """Snapshot object"""
1683 cdef public Ioctx ioctx
1684 cdef public object name
1686 # NOTE(sileht): old API was storing the ctypes object
1687 # instead of the value ....
1688 cdef public rados_snap_t snap_id
1690 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1693 self.snap_id = snap_id
1696 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1697 % (str(self.ioctx), self.name, self.snap_id)
1699 def get_timestamp(self):
1701 Find when a snapshot in the current pool occurred
1703 :raises: :class:`Error`
1704 :returns: datetime - the date and time the snapshot was created
1706 cdef time_t snap_time
1709 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1711 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1712 return datetime.fromtimestamp(snap_time)
1715 cdef class Completion(object):
1716 """completion object"""
1724 rados_callback_t complete_cb
1725 rados_callback_t safe_cb
1726 rados_completion_t rados_comp
1729 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1730 self.oncomplete = oncomplete
1731 self.onsafe = onsafe
1736 Is an asynchronous operation safe?
1738 This does not imply that the safe callback has finished.
1740 :returns: True if the operation is safe
1743 ret = rados_aio_is_safe(self.rados_comp)
1746 def is_complete(self):
1748 Has an asynchronous operation completed?
1750 This does not imply that the safe callback has finished.
1752 :returns: True if the operation is completed
1755 ret = rados_aio_is_complete(self.rados_comp)
1758 def wait_for_safe(self):
1760 Wait for an asynchronous operation to be marked safe
1762 This does not imply that the safe callback has finished.
1765 rados_aio_wait_for_safe(self.rados_comp)
1767 def wait_for_complete(self):
1769 Wait for an asynchronous operation to complete
1771 This does not imply that the complete callback has finished.
1774 rados_aio_wait_for_complete(self.rados_comp)
1776 def wait_for_safe_and_cb(self):
1778 Wait for an asynchronous operation to be marked safe and for
1779 the safe callback to have returned
1782 rados_aio_wait_for_safe_and_cb(self.rados_comp)
1784 def wait_for_complete_and_cb(self):
1786 Wait for an asynchronous operation to complete and for the
1787 complete callback to have returned
1789 :returns: whether the operation is completed
1792 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1795 def get_return_value(self):
1797 Get the return value of an asynchronous operation
1799 The return value is set when the operation is complete or safe,
1800 whichever comes first.
1802 :returns: int - return value of the operation
1805 ret = rados_aio_get_return_value(self.rados_comp)
1808 def __dealloc__(self):
1810 Release a completion
1812 Call this when you no longer need the completion. It may not be
1813 freed immediately if the operation is not acked and committed.
1815 ref.Py_XDECREF(self.buf)
1817 if self.rados_comp != NULL:
1819 rados_aio_release(self.rados_comp)
1820 self.rados_comp = NULL
1822 def _complete(self):
1823 self.oncomplete(self)
1824 with self.ioctx.lock:
1826 self.ioctx.complete_completions.remove(self)
1830 with self.ioctx.lock:
1832 self.ioctx.safe_completions.remove(self)
1835 with self.ioctx.lock:
1837 self.ioctx.complete_completions.remove(self)
1839 self.ioctx.safe_completions.remove(self)
1842 class OpCtx(object):
1843 def __enter__(self):
1844 return self.create()
1846 def __exit__(self, type, msg, traceback):
1850 cdef class WriteOp(object):
1851 cdef rados_write_op_t write_op
1855 self.write_op = rados_create_write_op()
1860 rados_release_write_op(self.write_op)
1862 @requires(('exclusive', opt(int)))
1863 def new(self, exclusive=None):
1869 int _exclusive = exclusive
1872 rados_write_op_create(self.write_op, _exclusive, NULL)
1880 rados_write_op_remove(self.write_op)
1882 @requires(('flags', int))
1883 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1885 Set flags for the last operation added to this write_op.
1886 :param flags: flags to apply to the last operation
1894 rados_write_op_set_flags(self.write_op, _flags)
1896 @requires(('to_write', bytes))
1897 def append(self, to_write):
1899 Append data to an object synchronously
1900 :param to_write: data to write
1901 :type to_write: bytes
1905 char *_to_write = to_write
1906 size_t length = len(to_write)
1909 rados_write_op_append(self.write_op, _to_write, length)
1911 @requires(('to_write', bytes))
1912 def write_full(self, to_write):
1914 Write whole object, atomically replacing it.
1915 :param to_write: data to write
1916 :type to_write: bytes
1920 char *_to_write = to_write
1921 size_t length = len(to_write)
1924 rados_write_op_write_full(self.write_op, _to_write, length)
1926 @requires(('to_write', bytes), ('offset', int))
1927 def write(self, to_write, offset=0):
1930 :param to_write: data to write
1931 :type to_write: bytes
1932 :param offset: byte offset in the object to begin writing at
1937 char *_to_write = to_write
1938 size_t length = len(to_write)
1939 uint64_t _offset = offset
1942 rados_write_op_write(self.write_op, _to_write, length, _offset)
@requires(('offset', int), ('length', int))
def zero(self, offset, length):
    """
    Zero part of an object.

    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param length: number of bytes to zero
    :type length: int
    """
    cdef:
        size_t _length = length
        uint64_t _offset = offset

    # librados declares rados_write_op_zero(write_op, offset, len):
    # the offset is passed first, then the length.
    with nogil:
        rados_write_op_zero(self.write_op, _offset, _length)
1961 @requires(('offset', int))
1962 def truncate(self, offset):
1965 :param offset: byte offset in the object to begin truncating at
1970 uint64_t _offset = offset
1973 rados_write_op_truncate(self.write_op, _offset)
1976 class WriteOpCtx(WriteOp, OpCtx):
1977 """write operation context manager"""
1980 cdef class ReadOp(object):
1981 cdef rados_read_op_t read_op
1985 self.read_op = rados_create_read_op()
1990 rados_release_read_op(self.read_op)
1992 @requires(('flags', int))
1993 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1995 Set flags for the last operation added to this read_op.
1996 :param flags: flags to apply to the last operation
2004 rados_read_op_set_flags(self.read_op, _flags)
2007 class ReadOpCtx(ReadOp, OpCtx):
2008 """read operation context manager"""
2011 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2013 Callback to onsafe() for asynchronous operations
2015 cdef object cb = <object>args
2020 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2022 Callback to oncomplete() for asynchronous operations
2024 cdef object cb = <object>args
2029 cdef class Ioctx(object):
2030 """rados.Ioctx object"""
2031 # NOTE(sileht): attributes declared in .pyd
2033 def __init__(self, name):
2037 self.locator_key = ""
2039 self.lock = threading.Lock()
2040 self.safe_completions = []
2041 self.complete_completions = []
2043 def __enter__(self):
2046 def __exit__(self, type_, value, traceback):
2050 def __dealloc__(self):
2053 def __track_completion(self, completion_obj):
2054 if completion_obj.oncomplete:
2056 self.complete_completions.append(completion_obj)
2057 if completion_obj.onsafe:
2059 self.safe_completions.append(completion_obj)
2061 def __get_completion(self, oncomplete, onsafe):
2063 Constructs a completion to use with asynchronous operations
2065 :param oncomplete: what to do when the write is safe and complete in memory
2067 :type oncomplete: completion
2068 :param onsafe: what to do when the write is safe and complete on storage
2070 :type onsafe: completion
2072 :raises: :class:`Error`
2073 :returns: completion object
2076 completion_obj = Completion(self, oncomplete, onsafe)
2079 rados_callback_t complete_cb = NULL
2080 rados_callback_t safe_cb = NULL
2081 rados_completion_t completion
2082 PyObject* p_completion_obj= <PyObject*>completion_obj
2085 complete_cb = <rados_callback_t>&__aio_complete_cb
2087 safe_cb = <rados_callback_t>&__aio_safe_cb
2090 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2093 raise make_ex(ret, "error getting a completion")
2095 completion_obj.rados_comp = completion
2096 return completion_obj
2098 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2099 def aio_stat(self, object_name, oncomplete):
2101 Asynchronously get object stats (size/mtime)
2103 oncomplete will be called with the returned size and mtime
2104 as well as the completion:
2106 oncomplete(completion, size, mtime)
2108 :param object_name: the name of the object to get stats from
2109 :type object_name: str
2110 :param oncomplete: what to do when the stat is complete
2111 :type oncomplete: completion
2113 :raises: :class:`Error`
2114 :returns: completion object
2117 object_name = cstr(object_name, 'object_name')
2120 Completion completion
2121 char *_object_name = object_name
2125 def oncomplete_(completion_v):
2126 cdef Completion _completion_v = completion_v
2127 return_value = _completion_v.get_return_value()
2128 if return_value >= 0:
2129 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2131 return oncomplete(_completion_v, None, None)
2133 completion = self.__get_completion(oncomplete_, None)
2134 self.__track_completion(completion)
2136 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2140 completion._cleanup()
2141 raise make_ex(ret, "error stating %s" % object_name)
2144 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2145 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2146 def aio_write(self, object_name, to_write, offset=0,
2147 oncomplete=None, onsafe=None):
2149 Write data to an object asynchronously
2151 Queues the write and returns.
2153 :param object_name: name of the object
2154 :type object_name: str
2155 :param to_write: data to write
2156 :type to_write: bytes
2157 :param offset: byte offset in the object to begin writing at
2159 :param oncomplete: what to do when the write is safe and complete in memory
2161 :type oncomplete: completion
2162 :param onsafe: what to do when the write is safe and complete on storage
2164 :type onsafe: completion
2166 :raises: :class:`Error`
2167 :returns: completion object
2170 object_name = cstr(object_name, 'object_name')
2173 Completion completion
2174 char* _object_name = object_name
2175 char* _to_write = to_write
2176 size_t size = len(to_write)
2177 uint64_t _offset = offset
2179 completion = self.__get_completion(oncomplete, onsafe)
2180 self.__track_completion(completion)
2182 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2183 _to_write, size, _offset)
2185 completion._cleanup()
2186 raise make_ex(ret, "error writing object %s" % object_name)
2189 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2190 ('onsafe', opt(Callable)))
2191 def aio_write_full(self, object_name, to_write,
2192 oncomplete=None, onsafe=None):
2194 Asynchronously write an entire object
2196 The object is filled with the provided data. If the object exists,
2197 it is atomically truncated and then written.
2198 Queues the write and returns.
2200 :param object_name: name of the object
2201 :type object_name: str
2202 :param to_write: data to write
2204 :param oncomplete: what to do when the write is safe and complete in memory
2206 :type oncomplete: completion
2207 :param onsafe: what to do when the write is safe and complete on storage
2209 :type onsafe: completion
2211 :raises: :class:`Error`
2212 :returns: completion object
2215 object_name = cstr(object_name, 'object_name')
2218 Completion completion
2219 char* _object_name = object_name
2220 char* _to_write = to_write
2221 size_t size = len(to_write)
2223 completion = self.__get_completion(oncomplete, onsafe)
2224 self.__track_completion(completion)
2226 ret = rados_aio_write_full(self.io, _object_name,
2227 completion.rados_comp,
2230 completion._cleanup()
2231 raise make_ex(ret, "error writing object %s" % object_name)
2234 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2235 ('onsafe', opt(Callable)))
2236 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2238 Asynchronously append data to an object
2240 Queues the write and returns.
2242 :param object_name: name of the object
2243 :type object_name: str
2244 :param to_append: data to append
2245 :type to_append: bytes
2246 :param offset: byte offset in the object to begin writing at
2248 :param oncomplete: what to do when the write is safe and complete in memory
2250 :type oncomplete: completion
2251 :param onsafe: what to do when the write is safe and complete on storage
2253 :type onsafe: completion
2255 :raises: :class:`Error`
2256 :returns: completion object
2258 object_name = cstr(object_name, 'object_name')
2261 Completion completion
2262 char* _object_name = object_name
2263 char* _to_append = to_append
2264 size_t size = len(to_append)
2266 completion = self.__get_completion(oncomplete, onsafe)
2267 self.__track_completion(completion)
2269 ret = rados_aio_append(self.io, _object_name,
2270 completion.rados_comp,
2273 completion._cleanup()
2274 raise make_ex(ret, "error appending object %s" % object_name)
2277 def aio_flush(self):
2279 Block until all pending writes in an io context are safe
2281 :raises: :class:`Error`
2284 ret = rados_aio_flush(self.io)
2286 raise make_ex(ret, "error flushing")
2288 @requires(('object_name', str_type), ('length', int), ('offset', int),
2289 ('oncomplete', opt(Callable)))
2290 def aio_read(self, object_name, length, offset, oncomplete):
2292 Asynchronously read data from an object
2294 oncomplete will be called with the returned read value as
2295 well as the completion:
2297 oncomplete(completion, data_read)
2299 :param object_name: name of the object to read from
2300 :type object_name: str
2301 :param length: the number of bytes to read
2303 :param offset: byte offset in the object to begin reading from
2305 :param oncomplete: what to do when the read is complete
2306 :type oncomplete: completion
2308 :raises: :class:`Error`
2309 :returns: completion object
2312 object_name = cstr(object_name, 'object_name')
2315 Completion completion
2316 char* _object_name = object_name
2317 uint64_t _offset = offset
2320 size_t _length = length
2322 def oncomplete_(completion_v):
2323 cdef Completion _completion_v = completion_v
2324 return_value = _completion_v.get_return_value()
2325 if return_value > 0 and return_value != length:
2326 _PyBytes_Resize(&_completion_v.buf, return_value)
2327 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2329 completion = self.__get_completion(oncomplete_, None)
2330 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2331 ret_buf = PyBytes_AsString(completion.buf)
2332 self.__track_completion(completion)
2334 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2335 ret_buf, _length, _offset)
2337 completion._cleanup()
2338 raise make_ex(ret, "error reading %s" % object_name)
2341 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2342 ('data', bytes), ('length', int),
2343 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2344 def aio_execute(self, object_name, cls, method, data,
2345 length=8192, oncomplete=None, onsafe=None):
2347 Asynchronously execute an OSD class method on an object.
2349 oncomplete and onsafe will be called with the data returned from
2350 the plugin as well as the completion:
2352 oncomplete(completion, data)
2353 onsafe(completion, data)
2355 :param object_name: name of the object
2356 :type object_name: str
2357 :param cls: name of the object class
2359 :param method: name of the method
2361 :param data: input data
2363 :param length: size of output buffer in bytes (default=8192)
2365 :param oncomplete: what to do when the execution is complete
2366 :type oncomplete: completion
2367 :param onsafe: what to do when the execution is safe and complete
2368 :type onsafe: completion
2370 :raises: :class:`Error`
2371 :returns: completion object
2374 object_name = cstr(object_name, 'object_name')
2375 cls = cstr(cls, 'cls')
2376 method = cstr(method, 'method')
2378 Completion completion
2379 char *_object_name = object_name
2381 char *_method = method
2383 size_t _data_len = len(data)
2386 size_t _length = length
2388 def oncomplete_(completion_v):
2389 cdef Completion _completion_v = completion_v
2390 return_value = _completion_v.get_return_value()
2391 if return_value > 0 and return_value != length:
2392 _PyBytes_Resize(&_completion_v.buf, return_value)
2393 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2395 def onsafe_(completion_v):
2396 cdef Completion _completion_v = completion_v
2397 return_value = _completion_v.get_return_value()
2398 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2400 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2401 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2402 ret_buf = PyBytes_AsString(completion.buf)
2403 self.__track_completion(completion)
2405 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2406 _cls, _method, _data, _data_len, ret_buf, _length)
2408 completion._cleanup()
2409 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2412 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2413 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2415 Asynchronously remove an object
2417 :param object_name: name of the object to remove
2418 :type object_name: str
2419 :param oncomplete: what to do when the remove is safe and complete in memory
2421 :type oncomplete: completion
2422 :param onsafe: what to do when the remove is safe and complete on storage
2424 :type onsafe: completion
2426 :raises: :class:`Error`
2427 :returns: completion object
2429 object_name = cstr(object_name, 'object_name')
2432 Completion completion
2433 char* _object_name = object_name
2435 completion = self.__get_completion(oncomplete, onsafe)
2436 self.__track_completion(completion)
2438 ret = rados_aio_remove(self.io, _object_name,
2439 completion.rados_comp)
2441 completion._cleanup()
2442 raise make_ex(ret, "error removing %s" % object_name)
2445 def require_ioctx_open(self):
2447 Checks if the rados.Ioctx object state is 'open'
2449 :raises: IoctxStateError
2451 if self.state != "open":
2452 raise IoctxStateError("The pool is %s" % self.state)
2454 def change_auid(self, auid):
2456 Attempt to change an io context's associated auid "owner."
2458 Requires that you have write permission on both the current and new
2461 :raises: :class:`Error`
2463 self.require_ioctx_open()
2466 uint64_t _auid = auid
2469 ret = rados_ioctx_pool_set_auid(self.io, _auid)
2471 raise make_ex(ret, "error changing auid of '%s' to %d"
2472 % (self.name, auid))
2474 @requires(('loc_key', str_type))
2475 def set_locator_key(self, loc_key):
2477 Set the key for mapping objects to pgs within an io context.
2479 The key is used instead of the object name to determine which
2480 placement groups an object is put in. This affects all subsequent
2481 operations of the io context - until a different locator key is
2482 set, all objects in this io context will be placed in the same pg.
2484 :param loc_key: the key to use as the object locator, or NULL to discard
2485 any previously set key
2488 :raises: :class:`TypeError`
2490 self.require_ioctx_open()
2491 cloc_key = cstr(loc_key, 'loc_key')
2492 cdef char *_loc_key = cloc_key
2494 rados_ioctx_locator_set_key(self.io, _loc_key)
2495 self.locator_key = loc_key
def get_locator_key(self):
    """
    Return the locator key recorded on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2505 @requires(('snap_id', long))
2506 def set_read(self, snap_id):
2508 Set the snapshot for reading objects.
2510 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2512 :param snap_id: the snapshot Id
2515 :raises: :class:`TypeError`
2517 self.require_ioctx_open()
2518 cdef rados_snap_t _snap_id = snap_id
2520 rados_ioctx_snap_set_read(self.io, _snap_id)
2522 @requires(('nspace', str_type))
2523 def set_namespace(self, nspace):
2525 Set the namespace for objects within an io context.
2527 The namespace in addition to the object name fully identifies
2528 an object. This affects all subsequent operations of the io context
2529 - until a different namespace is set, all objects in this io context
2530 will be placed in the same namespace.
2532 :param nspace: the namespace to use, or None/"" for the default namespace
2535 :raises: :class:`TypeError`
2537 self.require_ioctx_open()
2540 cnspace = cstr(nspace, 'nspace')
2541 cdef char *_nspace = cnspace
2543 rados_ioctx_set_namespace(self.io, _nspace)
2544 self.nspace = nspace
2546 def get_namespace(self):
2548 Get the namespace of context
2556 Close a rados.Ioctx object.
2558 This just tells librados that you no longer need to use the io context.
2559 It may not be freed immediately if there are pending asynchronous
2560 requests on it, but you should not use an io context again after
2561 calling this function on it.
2563 if self.state == "open":
2564 self.require_ioctx_open()
2566 rados_ioctx_destroy(self.io)
2567 self.state = "closed"
2570 @requires(('key', str_type), ('data', bytes))
2571 def write(self, key, data, offset=0):
2573 Write data to an object synchronously
2575 :param key: name of the object
2577 :param data: data to write
2579 :param offset: byte offset in the object to begin writing at
2582 :raises: :class:`TypeError`
2583 :raises: :class:`LogicError`
2584 :returns: int - 0 on success
2586 self.require_ioctx_open()
2588 key = cstr(key, 'key')
2592 size_t length = len(data)
2593 uint64_t _offset = offset
2596 ret = rados_write(self.io, _key, _data, length, _offset)
2600 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2603 raise LogicError("Ioctx.write(%s): rados_write \
2604 returned %d, but should return zero on success." % (self.name, ret))
2606 @requires(('key', str_type), ('data', bytes))
2607 def write_full(self, key, data):
2609 Write an entire object synchronously.
2611 The object is filled with the provided data. If the object exists,
2612 it is atomically truncated and then written.
2614 :param key: name of the object
2616 :param data: data to write
2619 :raises: :class:`TypeError`
2620 :raises: :class:`Error`
2621 :returns: int - 0 on success
2623 self.require_ioctx_open()
2624 key = cstr(key, 'key')
2628 size_t length = len(data)
2631 ret = rados_write_full(self.io, _key, _data, length)
2635 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2638 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2639 returned %d, but should return zero on success." % (self.name, ret))
2641 @requires(('key', str_type), ('data', bytes))
2642 def append(self, key, data):
2644 Append data to an object synchronously
2646 :param key: name of the object
2648 :param data: data to write
2651 :raises: :class:`TypeError`
2652 :raises: :class:`LogicError`
2653 :returns: int - 0 on success
2655 self.require_ioctx_open()
2656 key = cstr(key, 'key')
2660 size_t length = len(data)
2663 ret = rados_append(self.io, _key, _data, length)
2667 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2670 raise LogicError("Ioctx.append(%s): rados_append \
2671 returned %d, but should return zero on success." % (self.name, ret))
2673 @requires(('key', str_type))
2674 def read(self, key, length=8192, offset=0):
2676 Read data from an object synchronously
2678 :param key: name of the object
2680 :param length: the number of bytes to read (default=8192)
2682 :param offset: byte offset in the object to begin reading at
2685 :raises: :class:`TypeError`
2686 :raises: :class:`Error`
2687 :returns: str - data read from object
2689 self.require_ioctx_open()
2690 key = cstr(key, 'key')
2694 uint64_t _offset = offset
2695 size_t _length = length
2696 PyObject* ret_s = NULL
2698 ret_s = PyBytes_FromStringAndSize(NULL, length)
2700 ret_buf = PyBytes_AsString(ret_s)
2702 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2704 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2707 _PyBytes_Resize(&ret_s, ret)
2709 return <object>ret_s
2711 # We DECREF unconditionally: the cast to object above will have
2712 # INCREFed if necessary. This also takes care of exceptions,
2713 # including if _PyString_Resize fails (that will free the string
2714 # itself and set ret_s to NULL, hence XDECREF).
2715 ref.Py_XDECREF(ret_s)
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output bytes object up front so librados can write
    # straight into its buffer (no intermediate copy).
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the call that actually failed (this used to claim that
            # Ioctx.read had failed); same wording as aio_execute.
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s::%s on %s"
                          % (self.name, cls, method, key))

        # Shrink the bytes object to the number of bytes actually produced.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyString_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
def get_stats(self):
    """
    Get pool usage statistics

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        rc = rados_ioctx_pool_stat(self.io, &pool_stat)
    if rc < 0:
        raise make_ex(rc, "Ioctx.get_stats(%s): get_stats failed" % self.name)
    # Copy each field of the C struct into a plain Python dict.
    result = {}
    result['num_bytes'] = pool_stat.num_bytes
    result['num_kb'] = pool_stat.num_kb
    result['num_objects'] = pool_stat.num_objects
    result['num_object_clones'] = pool_stat.num_object_clones
    result['num_object_copies'] = pool_stat.num_object_copies
    result["num_objects_missing_on_primary"] = pool_stat.num_objects_missing_on_primary
    result["num_objects_unfound"] = pool_stat.num_objects_unfound
    result["num_objects_degraded"] = pool_stat.num_objects_degraded
    result["num_rd"] = pool_stat.num_rd
    result["num_rd_kb"] = pool_stat.num_rd_kb
    result["num_wr"] = pool_stat.num_wr
    result["num_wr_kb"] = pool_stat.num_wr_kb
    return result
2823 @requires(('key', str_type))
2824 def remove_object(self, key):
2828 This does not delete any snapshots of the object.
2830 :param key: the name of the object to delete
2833 :raises: :class:`TypeError`
2834 :raises: :class:`Error`
2835 :returns: bool - True on success
2837 self.require_ioctx_open()
2838 key = cstr(key, 'key')
2843 ret = rados_remove(self.io, _key)
2845 raise make_ex(ret, "Failed to remove '%s'" % key)
2848 @requires(('key', str_type))
2849 def trunc(self, key, size):
2853 If this enlarges the object, the new area is logically filled with
2854 zeroes. If this shrinks the object, the excess data is removed.
2856 :param key: the name of the object to resize
2858 :param size: the new size of the object in bytes
2861 :raises: :class:`TypeError`
2862 :raises: :class:`Error`
2863 :returns: int - 0 on success, otherwise raises error
2866 self.require_ioctx_open()
2867 key = cstr(key, 'key')
2870 uint64_t _size = size
2873 ret = rados_trunc(self.io, _key, _size)
2875 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2878 @requires(('key', str_type))
2879 def stat(self, key):
2881 Get object stats (size/mtime)
2883 :param key: the name of the object to get stats from
2886 :raises: :class:`TypeError`
2887 :raises: :class:`Error`
2888 :returns: (size,timestamp)
2890 self.require_ioctx_open()
2892 key = cstr(key, 'key')
2899 ret = rados_stat(self.io, _key, &psize, &pmtime)
2901 raise make_ex(ret, "Failed to stat %r" % key)
2902 return psize, time.localtime(pmtime)
2904 @requires(('key', str_type), ('xattr_name', str_type))
2905 def get_xattr(self, key, xattr_name):
2907 Get the value of an extended attribute on an object.
2909 :param key: the name of the object to get xattr from
2911 :param xattr_name: which extended attribute to read
2912 :type xattr_name: str
2914 :raises: :class:`TypeError`
2915 :raises: :class:`Error`
2916 :returns: str - value of the xattr
2918 self.require_ioctx_open()
2920 key = cstr(key, 'key')
2921 xattr_name = cstr(xattr_name, 'xattr_name')
2924 char *_xattr_name = xattr_name
2925 size_t ret_length = 4096
2926 char *ret_buf = NULL
2929 while ret_length < 4096 * 1024 * 1024:
2930 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2932 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2933 if ret == -errno.ERANGE:
2936 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2939 return ret_buf[:ret]
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Begin an iteration over the extended attributes of an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2958 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
2959 def set_xattr(self, key, xattr_name, xattr_value):
2961 Set an extended attribute on an object.
2963 :param key: the name of the object to set xattr to
2965 :param xattr_name: which extended attribute to set
2966 :type xattr_name: str
2967 :param xattr_value: the value of the extended attribute
2968 :type xattr_value: bytes
2970 :raises: :class:`TypeError`
2971 :raises: :class:`Error`
2972 :returns: bool - True on success, otherwise raise an error
2974 self.require_ioctx_open()
2976 key = cstr(key, 'key')
2977 xattr_name = cstr(xattr_name, 'xattr_name')
2980 char *_xattr_name = xattr_name
2981 char *_xattr_value = xattr_value
2982 size_t _xattr_value_len = len(xattr_value)
2985 ret = rados_setxattr(self.io, _key, _xattr_name,
2986 _xattr_value, _xattr_value_len)
2988 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2991 @requires(('key', str_type), ('xattr_name', str_type))
2992 def rm_xattr(self, key, xattr_name):
2994 Removes an extended attribute on from an object.
2996 :param key: the name of the object to remove xattr from
2998 :param xattr_name: which extended attribute to remove
2999 :type xattr_name: str
3001 :raises: :class:`TypeError`
3002 :raises: :class:`Error`
3003 :returns: bool - True on success, otherwise raise an error
3005 self.require_ioctx_open()
3007 key = cstr(key, 'key')
3008 xattr_name = cstr(xattr_name, 'xattr_name')
3011 char *_xattr_name = xattr_name
3014 ret = rados_rmxattr(self.io, _key, _xattr_name)
3016 raise make_ex(ret, "Failed to delete key %r xattr %r" %
def list_objects(self):
    """
    Build an ObjectIterator bound to this rados.Ioctx.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
def list_snaps(self):
    """
    Build a SnapIterator bound to this rados.Ioctx.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3038 @requires(('snap_name', str_type))
3039 def create_snap(self, snap_name):
3041 Create a pool-wide snapshot
3043 :param snap_name: the name of the snapshot
3044 :type snap_name: str
3046 :raises: :class:`TypeError`
3047 :raises: :class:`Error`
3049 self.require_ioctx_open()
3050 snap_name = cstr(snap_name, 'snap_name')
3051 cdef char *_snap_name = snap_name
3054 ret = rados_ioctx_snap_create(self.io, _snap_name)
3056 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3058 @requires(('snap_name', str_type))
3059 def remove_snap(self, snap_name):
3061 Removes a pool-wide snapshot
3063 :param snap_name: the name of the snapshot
3064 :type snap_name: str
3066 :raises: :class:`TypeError`
3067 :raises: :class:`Error`
3069 self.require_ioctx_open()
3070 snap_name = cstr(snap_name, 'snap_name')
3071 cdef char *_snap_name = snap_name
3074 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3076 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3078 @requires(('snap_name', str_type))
3079 def lookup_snap(self, snap_name):
3081 Get the id of a pool snapshot
3083 :param snap_name: the name of the snapshot to lookop
3084 :type snap_name: str
3086 :raises: :class:`TypeError`
3087 :raises: :class:`Error`
3088 :returns: Snap - on success
3090 self.require_ioctx_open()
3091 csnap_name = cstr(snap_name, 'snap_name')
3093 char *_snap_name = csnap_name
3094 rados_snap_t snap_id
3097 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3099 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3100 return Snap(self, snap_name, int(snap_id))
3102 @requires(('oid', str_type), ('snap_name', str_type))
3103 def snap_rollback(self, oid, snap_name):
3105 Rollback an object to a snapshot
3107 :param oid: the name of the object
3109 :param snap_name: the name of the snapshot
3110 :type snap_name: str
3112 :raises: :class:`TypeError`
3113 :raises: :class:`Error`
3115 self.require_ioctx_open()
3116 oid = cstr(oid, 'oid')
3117 snap_name = cstr(snap_name, 'snap_name')
3119 char *_snap_name = snap_name
3123 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3125 raise make_ex(ret, "Failed to rollback %s" % oid)
3127 def create_self_managed_snap(self):
3129 Creates a self-managed snapshot
3131 :returns: snap id on success
3133 :raises: :class:`Error`
3135 self.require_ioctx_open()
3137 rados_snap_t _snap_id
3139 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3141 raise make_ex(ret, "Failed to create self-managed snapshot")
3142 return int(_snap_id)
3144 @requires(('snap_id', int))
3145 def remove_self_managed_snap(self, snap_id):
3147 Removes a self-managed snapshot
3149 :param snap_id: the name of the snapshot
3152 :raises: :class:`TypeError`
3153 :raises: :class:`Error`
3155 self.require_ioctx_open()
3157 rados_snap_t _snap_id = snap_id
3159 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3161 raise make_ex(ret, "Failed to remove self-managed snapshot")
def set_self_managed_snap_write(self, snaps):
    """
    Updates the write context to the specified self-managed
    snapshot ids.

    The ids are passed to librados in descending order and the largest id
    is used as the snapshot sequence. ``None`` or an empty sequence
    results in an empty write context with sequence 0.

    :param snaps: all associated self-managed snapshot ids
    :type snaps: list

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    sorted_snaps = []
    if snaps is not None:
        sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
    # An empty sequence used to raise IndexError on sorted_snaps[0].
    snap_seq = sorted_snaps[0] if sorted_snaps else 0

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)
    try:
        _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
        # malloc(0) may legitimately return NULL, so only treat NULL as an
        # allocation failure when we actually asked for memory.
        if _snaps == NULL and _num_snaps > 0:
            raise MemoryError("failed to allocate snapshot id array")
        for i in range(len(sorted_snaps)):
            _snaps[i] = sorted_snaps[i]
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_snaps)
3199 @requires(('oid', str_type), ('snap_id', int))
3200 def rollback_self_managed_snap(self, oid, snap_id):
3202 Rolls an specific object back to a self-managed snapshot revision
3204 :param oid: the name of the object
3206 :param snap_id: the name of the snapshot
3209 :raises: :class:`TypeError`
3210 :raises: :class:`Error`
3212 self.require_ioctx_open()
3213 oid = cstr(oid, 'oid')
3216 rados_snap_t _snap_id = snap_id
3218 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3220 raise make_ex(ret, "Failed to rollback %s" % oid)
3222 def get_last_version(self):
3224 Return the version of the last object read or written to.
3226 This exposes the internal version number of the last object read or
3227 written via this io context
3229 :returns: version of the last object used
3231 self.require_ioctx_open()
3233 ret = rados_get_last_version(self.io)
def create_write_op(self):
    """
    Create a write operation object.
    The caller must call release_write_op once finished with it.
    """
    new_op = WriteOp()
    return new_op.create()
def create_read_op(self):
    """
    Create a read operation object.
    The caller must call release_read_op once finished with it.
    """
    new_op = ReadOp()
    return new_op.create()
3250 def release_write_op(self, write_op):
3252 release memory alloc by create_write_op
3256 def release_read_op(self, read_op):
3258 release memory alloc by create_read_op
3259 :para read_op: read_op object
3264 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3265 def set_omap(self, write_op, keys, values):
3267 set keys values to write_op
3268 :para write_op: write_operation object
3269 :type write_op: WriteOp
3270 :para keys: a tuple of keys
3272 :para values: a tuple of values
3276 if len(keys) != len(values):
3277 raise Error("Rados(): keys and values must have the same number of items")
3279 keys = cstr_list(keys, 'keys')
3281 WriteOp _write_op = write_op
3282 size_t key_num = len(keys)
3283 char **_keys = to_bytes_array(keys)
3284 char **_values = to_bytes_array(values)
3285 size_t *_lens = to_csize_t_array([len(v) for v in values])
3289 rados_write_op_omap_set(_write_op.write_op,
3290 <const char**>_keys,
3291 <const char**>_values,
3292 <const size_t*>_lens, key_num)
3298 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3299 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3301 excute the real write operation
3302 :para write_op: write operation object
3303 :type write_op: WriteOp
3304 :para oid: object name
3306 :para mtime: the time to set the mtime to, 0 for the current time
3308 :para flags: flags to apply to the entire operation
3312 oid = cstr(oid, 'oid')
3314 WriteOp _write_op = write_op
3316 time_t _mtime = mtime
3320 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3322 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3324 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3325 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3327 excute the real write operation asynchronously
3328 :para write_op: write operation object
3329 :type write_op: WriteOp
3330 :para oid: object name
3332 :param oncomplete: what to do when the remove is safe and complete in memory
3334 :type oncomplete: completion
3335 :param onsafe: what to do when the remove is safe and complete on storage
3337 :type onsafe: completion
3338 :para mtime: the time to set the mtime to, 0 for the current time
3340 :para flags: flags to apply to the entire operation
3343 :raises: :class:`Error`
3344 :returns: completion object
3347 oid = cstr(oid, 'oid')
3349 WriteOp _write_op = write_op
3351 Completion completion
3352 time_t _mtime = mtime
3355 completion = self.__get_completion(oncomplete, onsafe)
3356 self.__track_completion(completion)
3359 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3362 completion._cleanup()
3363 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3366 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3367 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3369 excute the real read operation
3370 :para read_op: read operation object
3371 :type read_op: ReadOp
3372 :para oid: object name
3374 :para flag: flags to apply to the entire operation
3377 oid = cstr(oid, 'oid')
3379 ReadOp _read_op = read_op
3384 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3386 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3388 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3389 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3391 excute the real read operation
3392 :para read_op: read operation object
3393 :type read_op: ReadOp
3394 :para oid: object name
3396 :param oncomplete: what to do when the remove is safe and complete in memory
3398 :type oncomplete: completion
3399 :param onsafe: what to do when the remove is safe and complete on storage
3401 :type onsafe: completion
3402 :para flag: flags to apply to the entire operation
3405 oid = cstr(oid, 'oid')
3407 ReadOp _read_op = read_op
3409 Completion completion
3412 completion = self.__get_completion(oncomplete, onsafe)
3413 self.__track_completion(completion)
3416 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3418 completion._cleanup()
3419 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3422 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3423 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3426 :para read_op: read operation object
3427 :type read_op: ReadOp
3428 :para start_after: list keys starting after start_after
3429 :type start_after: str
3430 :para filter_prefix: list only keys beginning with filter_prefix
3431 :type filter_prefix: str
3432 :para max_return: list no more than max_return key/value pairs
3433 :type max_return: int
3434 :returns: an iterator over the requested omap values, return value from this action
3437 start_after = cstr(start_after, 'start_after') if start_after else None
3438 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3440 char *_start_after = opt_str(start_after)
3441 char *_filter_prefix = opt_str(filter_prefix)
3442 ReadOp _read_op = read_op
3443 rados_omap_iter_t iter_addr = NULL
3444 int _max_return = max_return
3448 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3449 _max_return, &iter_addr, NULL, &prval)
3450 it = OmapIterator(self)
3452 return it, int(prval)
3454 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3455 def get_omap_keys(self, read_op, start_after, max_return):
3458 :para read_op: read operation object
3459 :type read_op: ReadOp
3460 :para start_after: list keys starting after start_after
3461 :type start_after: str
3462 :para max_return: list no more than max_return key/value pairs
3463 :type max_return: int
3464 :returns: an iterator over the requested omap values, return value from this action
3466 start_after = cstr(start_after, 'start_after') if start_after else None
3468 char *_start_after = opt_str(start_after)
3469 ReadOp _read_op = read_op
3470 rados_omap_iter_t iter_addr = NULL
3471 int _max_return = max_return
3475 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3476 _max_return, &iter_addr, NULL, &prval)
3477 it = OmapIterator(self)
3479 return it, int(prval)
3481 @requires(('read_op', ReadOp), ('keys', tuple))
3482 def get_omap_vals_by_keys(self, read_op, keys):
3484 get the omap values by keys
3485 :para read_op: read operation object
3486 :type read_op: ReadOp
3487 :para keys: input key tuple
3489 :returns: an iterator over the requested omap values, return value from this action
3491 keys = cstr_list(keys, 'keys')
3493 ReadOp _read_op = read_op
3494 rados_omap_iter_t iter_addr
3495 char **_keys = to_bytes_array(keys)
3496 size_t key_num = len(keys)
3501 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3502 <const char**>_keys,
3503 key_num, &iter_addr, &prval)
3504 it = OmapIterator(self)
3506 return it, int(prval)
3510 @requires(('write_op', WriteOp), ('keys', tuple))
3511 def remove_omap_keys(self, write_op, keys):
3513 remove omap keys specifiled
3514 :para write_op: write operation object
3515 :type write_op: WriteOp
3516 :para keys: input key tuple
3520 keys = cstr_list(keys, 'keys')
3522 WriteOp _write_op = write_op
3523 size_t key_num = len(keys)
3524 char **_keys = to_bytes_array(keys)
3528 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3532 @requires(('write_op', WriteOp))
3533 def clear_omap(self, write_op):
3535 Remove all key/value pairs from an object
3536 :para write_op: write operation object
3537 :type write_op: WriteOp
3541 WriteOp _write_op = write_op
3544 rados_write_op_omap_clear(_write_op.write_op)
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes NULL to
        librados instead of a timeval
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct lives on the stack; clear the microseconds field so
        # librados does not read an uninitialised value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes NULL to
        librados instead of a timeval
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct lives on the stack; clear the microseconds field so
        # librados does not read an uninitialised value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that failed; the message used to say
        # rados_lock_exclusive.
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Describe the unlock failure; the message used to be a copy of the
        # lock_exclusive one ("failed to set lock").
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3683 def application_enable(self, app_name, force=False):
3685 Enable an application on an OSD pool
3687 :param app_name: application name
3689 :param force: False if only a single app should exist per pool
3690 :type expire_seconds: boool
3692 :raises: :class:`Error`
3694 app_name = cstr(app_name, 'app_name')
3696 char *_app_name = app_name
3697 int _force = (1 if force else 0)
3700 ret = rados_application_enable(self.io, _app_name, _force)
3702 raise make_ex(ret, "error enabling application")
3704 def application_list(self):
3706 Returns a list of enabled applications
3708 :returns: list of app name string
3716 apps = <char *>realloc_chk(apps, length)
3718 ret = rados_application_list(self.io, apps, &length)
3720 return [decode_cstr(app) for app in
3721 apps[:length].split(b'\0') if app]
3722 elif ret == -errno.ENOENT:
3724 elif ret == -errno.ERANGE:
3727 raise make_ex(ret, "error listing applications")
3731 def application_metadata_set(self, app_name, key, value):
3733 Sets application metadata on an OSD pool
3735 :param app_name: application name
3737 :param key: metadata key
3739 :param value: metadata value
3742 :raises: :class:`Error`
3744 app_name = cstr(app_name, 'app_name')
3745 key = cstr(key, 'key')
3746 value = cstr(value, 'value')
3748 char *_app_name = app_name
3750 char *_value = value
3753 ret = rados_application_metadata_set(self.io, _app_name, _key,
3756 raise make_ex(ret, "error setting application metadata")
3758 def application_metadata_remove(self, app_name, key):
3760 Remove application metadata from an OSD pool
3762 :param app_name: application name
3764 :param key: metadata key
3767 :raises: :class:`Error`
3769 app_name = cstr(app_name, 'app_name')
3770 key = cstr(key, 'key')
3772 char *_app_name = app_name
3776 ret = rados_application_metadata_remove(self.io, _app_name, _key)
3778 raise make_ex(ret, "error removing application metadata")
def application_metadata_list(self, app_name):
    """
    Returns the metadata key/value pairs stored for an application
    on an OSD pool

    :param app_name: application name
    :type app_name: str
    :returns: list of key/value tuples
    """
    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        # On -ERANGE the call is retried with the buffers reallocated to
        # the lengths written back into key_length / val_length.
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, _app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == 0:
                # Both buffers hold NUL-terminated strings back to back.
                # Empty entries must be kept (a metadata value may be
                # empty), otherwise keys and values get out of step; only
                # the empty piece after the final terminator is dropped.
                keys = [decode_cstr(key) for key in
                            c_keys[:key_length].split(b'\0')]
                vals = [decode_cstr(val) for val in
                            c_vals[:val_length].split(b'\0')]
                # Materialise the pairs: on Python 3 zip() is a one-shot
                # iterator, not the documented list.
                return list(zip(keys, vals))[:-1]
            elif ret == -errno.ERANGE:
                pass
            else:
                raise make_ex(ret, "error listing application metadata")
    finally:
        free(c_keys)
        free(c_vals)
def set_object_locator(func):
    """
    Decorator that runs ``func`` with the locator key of ``self.ioctx``
    temporarily set to ``self.locator_key``.

    When ``self.locator_key`` is None the ioctx is left untouched and
    ``func`` is called directly. Otherwise the previous locator key is
    read first and put back afterwards, whether ``func`` returns or
    raises.

    :param func: method taking ``self`` as its first argument
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # caller's ioctx pointing at this object's locator key.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
def set_object_namespace(func):
    """
    Decorator that runs ``func`` with the namespace of ``self.ioctx``
    temporarily set to ``self.nspace``.

    The previous namespace is read first and put back afterwards,
    whether ``func`` returns or raises.

    :param func: method taking ``self`` as its first argument
    :returns: the wrapping function
    :raises: :class:`LogicError` (from the wrapper) if ``self.nspace``
             is None
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # caller's ioctx switched to this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: I/O context every operation is forwarded to
        :param key: name of the object inside the ioctx
        :param locator_key: locator key applied around each operation;
                            None leaves the ioctx locator untouched
        :param nspace: namespace applied around each operation; None is
                       stored as "" (shown as "--default--" by __str__)
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read()/write(); moved by them and by seek().
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal is not a
        # reliable emptiness test.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """
        :raises: :class:`ObjectStateError` unless the state is "exists"
        """
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """
        Read up to ``length`` bytes at the current offset and advance
        the offset by the amount actually returned.
        """
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write ``string_to_write`` at the current offset. The offset is
        advanced only when the ioctx write returns 0.

        :returns: the return value of the ioctx write
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the ioctx stat() result for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by the next read() or write()."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the ioctx get_xattr() result for ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the ioctx get_xattrs() result for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set ``xattr_name`` to ``xattr_value`` through the ioctx."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove ``xattr_name`` from this object through the ioctx."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    For watching cluster log messages.  Instantiate an object and keep
    it around while callback is periodically called.  Construct with
    'level' to monitor 'level' messages (one of MONITOR_LEVELS).
    arg will be passed to the callback.

    callback will be called with:
        arg        (given to __init__)
        line       (the full line, including timestamp, who, level, msg)
        who        (which entity issued the log message)
        timestamp_sec  (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq        (sequence number)
        level      (string representing the level of the log message)
        msg        (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Keep the constructor arguments on the instance, then hand the
        # actual registration over to the cluster handle.
        self.level = level
        self.callback = callback
        self.arg = arg
        self.cluster = cluster
        self.cluster.monitor_log(level, callback, arg)