1 # cython: embedsignature=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
54 cdef extern from "sys/time.h":
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
64 cdef extern from "rados/librados.h" nogil:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
105 cdef struct rados_cluster_stat_t:
111 cdef struct rados_pool_stat_t:
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
125 void rados_buffer_free(char *buf)
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154 int rados_application_enable(rados_ioctx_t io, const char *app_name,
156 int rados_application_list(rados_ioctx_t io, char *values,
158 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159 const char *key, char *value,
161 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162 const char *key, const char *value)
163 int rados_application_metadata_remove(rados_ioctx_t io,
164 const char *app_name, const char *key)
165 int rados_application_metadata_list(rados_ioctx_t io,
166 const char *app_name, char *keys,
167 size_t *key_len, char *values,
169 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183 const char *inbuf, size_t inbuflen,
184 char **outbuf, size_t *outbuflen,
185 char **outs, size_t *outslen)
186 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187 const char *inbuf, size_t inbuflen,
188 char **outbuf, size_t *outbuflen,
189 char **outs, size_t *outslen)
190 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
191 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
193 int rados_wait_for_latest_osdmap(rados_t cluster)
195 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196 void rados_ioctx_destroy(rados_ioctx_t io)
197 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
201 uint64_t rados_get_last_version(rados_ioctx_t io)
202 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207 int rados_remove(rados_ioctx_t io, const char *oid)
208 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214 void rados_getxattrs_end(rados_xattrs_iter_t iter)
216 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218 void rados_nobjects_list_close(rados_list_ctx_t ctx)
220 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
229 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
230 rados_snap_t *snapid)
231 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
233 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
234 rados_snap_t snap_seq,
237 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
240 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
241 const char * cookie, const char * desc,
242 timeval * duration, uint8_t flags)
243 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
244 const char * cookie, const char * tag, const char * desc,
245 timeval * duration, uint8_t flags)
246 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
248 rados_write_op_t rados_create_write_op()
249 void rados_release_write_op(rados_write_op_t write_op)
251 rados_read_op_t rados_create_read_op()
252 void rados_release_read_op(rados_read_op_t read_op)
254 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
255 void rados_aio_release(rados_completion_t c)
256 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
257 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
258 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
259 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
260 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
261 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
262 int rados_aio_flush(rados_ioctx_t io)
264 int rados_aio_get_return_value(rados_completion_t c)
265 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
266 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
267 int rados_aio_wait_for_complete(rados_completion_t c)
268 int rados_aio_wait_for_safe(rados_completion_t c)
269 int rados_aio_is_complete(rados_completion_t c)
270 int rados_aio_is_safe(rados_completion_t c)
272 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
273 const char * in_buf, size_t in_len, char * buf, size_t out_len)
274 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
275 const char * in_buf, size_t in_len, char * buf, size_t out_len)
277 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
278 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
279 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
280 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
281 void rados_write_op_omap_clear(rados_write_op_t write_op)
282 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
284 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
285 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
286 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
287 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
288 void rados_write_op_remove(rados_write_op_t write_op)
289 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
290 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
292 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
293 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
294 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
295 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
296 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
297 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
298 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
299 void rados_omap_get_end(rados_omap_iter_t iter)
302 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
303 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
304 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
305 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
306 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
307 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
308 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
310 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
312 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
313 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
314 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
315 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
316 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
317 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
318 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
320 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
322 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
323 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
325 ANONYMOUS_AUID = 0xffffffffffffffff
329 class Error(Exception):
330 """ `Error` class, derived from `Exception` """
334 class InvalidArgumentError(Error):
338 class OSError(Error):
339 """ `OSError` class, derived from `Error` """
340 def __init__(self, message, errno=None):
341 super(OSError, self).__init__(message)
345 msg = super(OSError, self).__str__()
346 if self.errno is None:
348 return '[errno {0}] {1}'.format(self.errno, msg)
350 def __reduce__(self):
351 return (self.__class__, (self.message, self.errno))
353 class InterruptedOrTimeoutError(OSError):
354 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
358 class PermissionError(OSError):
359 """ `PermissionError` class, derived from `OSError` """
363 class PermissionDeniedError(OSError):
364 """ deal with EACCES related. """
368 class ObjectNotFound(OSError):
369 """ `ObjectNotFound` class, derived from `OSError` """
373 class NoData(OSError):
374 """ `NoData` class, derived from `OSError` """
378 class ObjectExists(OSError):
379 """ `ObjectExists` class, derived from `OSError` """
383 class ObjectBusy(OSError):
384 """ `ObjectBusy` class, derived from `IOError` """
388 class IOError(OSError):
389 """ `ObjectBusy` class, derived from `OSError` """
393 class NoSpace(OSError):
394 """ `NoSpace` class, derived from `OSError` """
398 class RadosStateError(Error):
399 """ `RadosStateError` class, derived from `Error` """
403 class IoctxStateError(Error):
404 """ `IoctxStateError` class, derived from `Error` """
408 class ObjectStateError(Error):
409 """ `ObjectStateError` class, derived from `Error` """
413 class LogicError(Error):
414 """ `` class, derived from `Error` """
418 class TimedOut(OSError):
419 """ `TimedOut` class, derived from `OSError` """
423 IF UNAME_SYSNAME == "FreeBSD":
424 cdef errno_to_exception = {
425 errno.EPERM : PermissionError,
426 errno.ENOENT : ObjectNotFound,
428 errno.ENOSPC : NoSpace,
429 errno.EEXIST : ObjectExists,
430 errno.EBUSY : ObjectBusy,
431 errno.ENOATTR : NoData,
432 errno.EINTR : InterruptedOrTimeoutError,
433 errno.ETIMEDOUT : TimedOut,
434 errno.EACCES : PermissionDeniedError,
435 errno.EINVAL : InvalidArgumentError,
438 cdef errno_to_exception = {
439 errno.EPERM : PermissionError,
440 errno.ENOENT : ObjectNotFound,
442 errno.ENOSPC : NoSpace,
443 errno.EEXIST : ObjectExists,
444 errno.EBUSY : ObjectBusy,
445 errno.ENODATA : NoData,
446 errno.EINTR : InterruptedOrTimeoutError,
447 errno.ETIMEDOUT : TimedOut,
448 errno.EACCES : PermissionDeniedError,
449 errno.EINVAL : InvalidArgumentError,
453 cdef make_ex(ret, msg):
455 Translate a librados return code into an exception.
457 :param ret: the return code
459 :param msg: the error message to use
461 :returns: a subclass of :class:`Error`
464 if ret in errno_to_exception:
465 return errno_to_exception[ret](msg, errno=ret)
467 return OSError(msg, errno=ret)
470 # helper to specify an optional argument, where in addition to `cls`, `None`
476 # validate argument types of an instance method
477 # kwargs is an un-ordered dict, so use args instead
478 def requires(*types):
479 def is_type_of(v, t):
483 return isinstance(v, t)
485 def check_type(val, arg_name, arg_type):
486 if isinstance(arg_type, tuple):
487 if any(is_type_of(val, t) for t in arg_type):
489 type_names = ' or '.join('None' if t is None else t.__name__
491 raise TypeError('%s must be %s' % (arg_name, type_names))
493 if is_type_of(val, arg_type):
495 assert(arg_type is not None)
496 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
499 # FIXME(sileht): this stop with
500 # AttributeError: 'method_descriptor' object has no attribute '__module__'
502 def validate_func(*args, **kwargs):
503 # ignore the `self` arg
504 pos_args = zip(args[1:], types)
505 named_args = ((kwargs[name], (name, spec)) for name, spec in types
507 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
508 check_type(arg_val, arg_name, arg_type)
509 return f(*args, **kwargs)
514 def cstr(val, name, encoding="utf-8", opt=False):
516 Create a byte string from a Python string
518 :param basestring val: Python string
519 :param str name: Name of the string parameter, for exceptions
520 :param str encoding: Encoding to use
521 :param bool opt: If True, None is allowed
523 :raises: :class:`InvalidArgument`
525 if opt and val is None:
527 if isinstance(val, bytes):
529 elif isinstance(val, unicode):
530 return val.encode(encoding)
532 raise TypeError('%s must be a string' % name)
535 def cstr_list(list_str, name, encoding="utf-8"):
536 return [cstr(s, name) for s in list_str]
539 def decode_cstr(val, encoding="utf-8"):
541 Decode a byte string into a Python string.
543 :param bytes val: byte string
544 :rtype: unicode or None
549 return val.decode(encoding)
552 cdef char* opt_str(s) except? NULL:
558 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
559 cdef void *ret = realloc(ptr, size)
561 raise MemoryError("realloc failed")
565 cdef size_t * to_csize_t_array(list_int):
566 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
568 raise MemoryError("malloc failed")
569 for i in xrange(len(list_int)):
570 ret[i] = <size_t>list_int[i]
574 cdef char ** to_bytes_array(list_bytes):
575 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
577 raise MemoryError("malloc failed")
578 for i in xrange(len(list_bytes)):
579 ret[i] = <char *>list_bytes[i]
584 cdef int __monitor_callback(void *arg, const char *line, const char *who,
585 uint64_t sec, uint64_t nsec, uint64_t seq,
586 const char *level, const char *msg) with gil:
587 cdef object cb_info = <object>arg
588 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
591 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
594 uint64_t sec, uint64_t nsec, uint64_t seq,
595 const char *level, const char *msg) with gil:
596 cdef object cb_info = <object>arg
597 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
601 class Version(object):
602 """ Version information """
603 def __init__(self, major, minor, extra):
609 return "%d.%d.%d" % (self.major, self.minor, self.extra)
612 cdef class Rados(object):
613 """This class wraps librados functions"""
614 # NOTE(sileht): attributes declared in .pyd
616 def __init__(self, *args, **kwargs):
618 self.__setup(*args, **kwargs)
620 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
621 ('conffile', opt(str_type)))
622 def __setup(self, rados_id=None, name=None, clustername=None,
623 conf_defaults=None, conffile=None, conf=None, flags=0,
625 self.monitor_callback = None
626 self.monitor_callback2 = None
627 self.parsed_args = []
628 self.conf_defaults = conf_defaults
629 self.conffile = conffile
630 self.rados_id = rados_id
632 if rados_id and name:
633 raise Error("Rados(): can't supply both rados_id and name")
635 name = 'client.' + rados_id
637 name = 'client.admin'
638 if clustername is None:
641 name = cstr(name, 'name')
642 clustername = cstr(clustername, 'clustername')
645 char *_clustername = clustername
650 # Unpack void* (aka rados_config_t) from capsule
651 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
653 ret = rados_create_with_context(&self.cluster, rados_config)
656 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
658 raise Error("rados_initialize failed with error code: %d" % ret)
660 self.state = "configuring"
661 # order is important: conf_defaults, then conffile, then conf
663 for key, value in conf_defaults.items():
664 self.conf_set(key, value)
665 if conffile is not None:
666 # read the default conf file when '' is given
669 self.conf_read_file(conffile)
671 for key, value in conf.items():
672 self.conf_set(key, value)
674 def require_state(self, *args):
676 Checks if the Rados object is in a special state
678 :raises: RadosStateError
680 if self.state in args:
682 raise RadosStateError("You cannot perform that operation on a \
683 Rados object in state %s." % self.state)
687 Disconnects from the cluster. Call this explicitly when a
688 Rados.connect()ed object is no longer used.
690 if self.state != "shutdown":
692 rados_shutdown(self.cluster)
693 self.state = "shutdown"
699 def __exit__(self, type_, value, traceback):
705 Get the version number of the ``librados`` C library.
707 :returns: a tuple of ``(major, minor, extra)`` components of the
714 rados_version(&major, &minor, &extra)
715 return Version(major, minor, extra)
717 @requires(('path', opt(str_type)))
718 def conf_read_file(self, path=None):
720 Configure the cluster handle using a Ceph config file.
722 :param path: path to the config file
725 self.require_state("configuring", "connected")
726 path = cstr(path, 'path', opt=True)
728 char *_path = opt_str(path)
730 ret = rados_conf_read_file(self.cluster, _path)
732 raise make_ex(ret, "error calling conf_read_file")
734 def conf_parse_argv(self, args):
736 Parse known arguments from args, and remove; returned
737 args contain only those unknown to ceph
739 self.require_state("configuring", "connected")
743 cargs = cstr_list(args, 'args')
745 int _argc = len(args)
746 char **_argv = to_bytes_array(cargs)
747 char **_remargv = NULL
750 _remargv = <char **>malloc(_argc * sizeof(char *))
752 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
754 <const char**>_remargv)
756 raise make_ex(ret, "error calling conf_parse_argv_remainder")
758 # _remargv was allocated with fixed argc; collapse return
759 # list to eliminate any missing args
760 retargs = [decode_cstr(a) for a in _remargv[:_argc]
762 self.parsed_args = args
768 def conf_parse_env(self, var='CEPH_ARGS'):
770 Parse known arguments from an environment variable, normally
773 self.require_state("configuring", "connected")
777 var = cstr(var, 'var')
781 ret = rados_conf_parse_env(self.cluster, _var)
783 raise make_ex(ret, "error calling conf_parse_env")
785 @requires(('option', str_type))
786 def conf_get(self, option):
788 Get the value of a configuration option
790 :param option: which option to read
793 :returns: str - value of the option or None
794 :raises: :class:`TypeError`
796 self.require_state("configuring", "connected")
797 option = cstr(option, 'option')
799 char *_option = option
805 ret_buf = <char *>realloc_chk(ret_buf, length)
807 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
809 return decode_cstr(ret_buf)
810 elif ret == -errno.ENAMETOOLONG:
812 elif ret == -errno.ENOENT:
815 raise make_ex(ret, "error calling conf_get")
819 @requires(('option', str_type), ('val', str_type))
820 def conf_set(self, option, val):
822 Set the value of a configuration option
824 :param option: which option to set
826 :param option: value of the option
829 :raises: :class:`TypeError`, :class:`ObjectNotFound`
831 self.require_state("configuring", "connected")
832 option = cstr(option, 'option')
833 val = cstr(val, 'val')
835 char *_option = option
839 ret = rados_conf_set(self.cluster, _option, _val)
841 raise make_ex(ret, "error calling conf_set")
843 def ping_monitor(self, mon_id):
845 Ping a monitor to assess liveness
847 May be used as a simply way to assess liveness, or to obtain
848 information about the monitor in a simple way even in the
851 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
853 :returns: the string reply from the monitor
856 self.require_state("configuring", "connected")
858 mon_id = cstr(mon_id, 'mon_id')
860 char *_mon_id = mon_id
865 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
868 raise make_ex(ret, "error calling ping_monitor")
871 my_outstr = outstr[:outstrlen]
872 rados_buffer_free(outstr)
873 return decode_cstr(my_outstr)
875 def connect(self, timeout=0):
877 Connect to the cluster. Use shutdown() to release resources.
879 self.require_state("configuring")
880 # NOTE(sileht): timeout was supported by old python API,
881 # but this is not something available in C API, so ignore
882 # for now and remove it later
884 ret = rados_connect(self.cluster)
886 raise make_ex(ret, "error connecting to the cluster")
887 self.state = "connected"
889 def get_cluster_stats(self):
891 Read usage info about the cluster
893 This tells you total space, space used, space available, and number
894 of objects. These are not updated immediately when data is written,
895 they are eventually consistent.
897 :returns: dict - contains the following keys:
899 - ``kb`` (int) - total space
901 - ``kb_used`` (int) - space used
903 - ``kb_avail`` (int) - free space available
905 - ``num_objects`` (int) - number of objects
909 rados_cluster_stat_t stats
912 ret = rados_cluster_stat(self.cluster, &stats)
916 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
917 return {'kb': stats.kb,
918 'kb_used': stats.kb_used,
919 'kb_avail': stats.kb_avail,
920 'num_objects': stats.num_objects}
922 @requires(('pool_name', str_type))
923 def pool_exists(self, pool_name):
925 Checks if a given pool exists.
927 :param pool_name: name of the pool to check
930 :raises: :class:`TypeError`, :class:`Error`
931 :returns: bool - whether the pool exists, false otherwise.
933 self.require_state("connected")
935 pool_name = cstr(pool_name, 'pool_name')
937 char *_pool_name = pool_name
940 ret = rados_pool_lookup(self.cluster, _pool_name)
943 elif ret == -errno.ENOENT:
946 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
948 @requires(('pool_name', str_type))
949 def pool_lookup(self, pool_name):
951 Returns a pool's ID based on its name.
953 :param pool_name: name of the pool to look up
956 :raises: :class:`TypeError`, :class:`Error`
957 :returns: int - pool ID, or None if it doesn't exist
959 self.require_state("connected")
960 pool_name = cstr(pool_name, 'pool_name')
962 char *_pool_name = pool_name
965 ret = rados_pool_lookup(self.cluster, _pool_name)
968 elif ret == -errno.ENOENT:
971 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
973 @requires(('pool_id', int))
974 def pool_reverse_lookup(self, pool_id):
976 Returns a pool's name based on its ID.
978 :param pool_id: ID of the pool to look up
981 :raises: :class:`TypeError`, :class:`Error`
982 :returns: string - pool name, or None if it doesn't exist
984 self.require_state("connected")
986 int64_t _pool_id = pool_id
992 name = <char *>realloc_chk(name, size)
994 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
997 elif ret != -errno.ERANGE and size <= 4096:
999 elif ret == -errno.ENOENT:
1002 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1004 return decode_cstr(name)
1009 @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
1010 def create_pool(self, pool_name, auid=None, crush_rule=None):
1013 - with default settings: if auid=None and crush_rule=None
1014 - owned by a specific auid: auid given and crush_rule=None
1015 - with a specific CRUSH rule: if auid=None and crush_rule given
1016 - with a specific CRUSH rule and auid: if auid and crush_rule given
1018 :param pool_name: name of the pool to create
1019 :type pool_name: str
1020 :param auid: the id of the owner of the new pool
1022 :param crush_rule: rule to use for placement in the new pool
1023 :type crush_rule: int
1025 :raises: :class:`TypeError`, :class:`Error`
1027 self.require_state("connected")
1029 pool_name = cstr(pool_name, 'pool_name')
1031 char *_pool_name = pool_name
1035 if auid is None and crush_rule is None:
1037 ret = rados_pool_create(self.cluster, _pool_name)
1039 _crush_rule = crush_rule
1041 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1042 elif crush_rule is None:
1045 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1048 _crush_rule = crush_rule
1050 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1052 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1054 @requires(('pool_id', int))
1055 def get_pool_base_tier(self, pool_id):
1059 :returns: base pool, or pool_id if tiering is not configured for the pool
1061 self.require_state("connected")
1063 int64_t base_tier = 0
1064 int64_t _pool_id = pool_id
1067 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1069 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1070 return int(base_tier)
1072 @requires(('pool_name', str_type))
1073 def delete_pool(self, pool_name):
1075 Delete a pool and all data inside it.
1077 The pool is removed from the cluster immediately,
1078 but the actual data is deleted in the background.
1080 :param pool_name: name of the pool to delete
1081 :type pool_name: str
1083 :raises: :class:`TypeError`, :class:`Error`
1085 self.require_state("connected")
1087 pool_name = cstr(pool_name, 'pool_name')
1089 char *_pool_name = pool_name
1092 ret = rados_pool_delete(self.cluster, _pool_name)
1094 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1096 @requires(('pool_id', int))
1097 def get_inconsistent_pgs(self, pool_id):
1099 List inconsistent placement groups in the given pool
1101 :param pool_id: ID of the pool in which PGs are listed
1103 :returns: list - inconsistent placement groups
1105 self.require_state("connected")
1107 int64_t pool = pool_id
1113 pgs = <char *>realloc_chk(pgs, size);
1115 ret = rados_inconsistent_pg_list(self.cluster, pool,
1122 raise make_ex(ret, "error calling inconsistent_pg_list")
1123 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1127 def list_pools(self):
1129 Gets a list of pool names.
1131 :returns: list - of pool names.
1133 self.require_state("connected")
1136 char *c_names = NULL
1140 c_names = <char *>realloc_chk(c_names, size)
1142 ret = rados_pool_list(self.cluster, c_names, size)
1147 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1154 Get the fsid of the cluster as a hexadecimal string.
1156 :raises: :class:`Error`
1157 :returns: str - cluster fsid
1159 self.require_state("connected")
1163 PyObject* ret_s = NULL
1165 ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1167 ret_buf = PyBytes_AsString(ret_s)
1169 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1171 raise make_ex(ret, "error getting cluster fsid")
1172 if ret != <int>buf_len:
1173 _PyBytes_Resize(&ret_s, ret)
1174 return <object>ret_s
1176 # We DECREF unconditionally: the cast to object above will have
1177 # INCREFed if necessary. This also takes care of exceptions,
1178 # including if _PyString_Resize fails (that will free the string
1179 # itself and set ret_s to NULL, hence XDECREF).
1180 ref.Py_XDECREF(ret_s)
1182 @requires(('ioctx_name', str_type))
1183 def open_ioctx(self, ioctx_name):
1185 Create an io context
1187 The io context allows you to perform operations within a particular
1190 :param ioctx_name: name of the pool
1191 :type ioctx_name: str
1193 :raises: :class:`TypeError`, :class:`Error`
1194 :returns: Ioctx - Rados Ioctx object
1196 self.require_state("connected")
1197 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1200 char *_ioctx_name = ioctx_name
1202 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1204 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1205 io = Ioctx(ioctx_name)
1209 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1211 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1212 returns (int ret, string outbuf, string outs)
1214 # NOTE(sileht): timeout is ignored because C API doesn't provide
1215 # timeout argument, but we keep it for backward compat with old python binding
1217 self.require_state("connected")
1218 cmd = cstr_list(cmd, 'c')
1220 if isinstance(target, int):
1221 # NOTE(sileht): looks weird but test_monmap_dump pass int
1222 target = str(target)
1224 target = cstr(target, 'target', opt=True)
1225 inbuf = cstr(inbuf, 'inbuf')
1228 char *_target = opt_str(target)
1229 char **_cmd = to_bytes_array(cmd)
1230 size_t _cmdlen = len(cmd)
1232 char *_inbuf = inbuf
1233 size_t _inbuf_len = len(inbuf)
1243 ret = rados_mon_command_target(self.cluster, _target,
1244 <const char **>_cmd, _cmdlen,
1245 <const char*>_inbuf, _inbuf_len,
1246 &_outbuf, &_outbuf_len,
1250 ret = rados_mon_command(self.cluster,
1251 <const char **>_cmd, _cmdlen,
1252 <const char*>_inbuf, _inbuf_len,
1253 &_outbuf, &_outbuf_len,
1256 my_outs = decode_cstr(_outs[:_outs_len])
1257 my_outbuf = _outbuf[:_outbuf_len]
1259 rados_buffer_free(_outs)
1261 rados_buffer_free(_outbuf)
1262 return (ret, my_outbuf, my_outs)
1266 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1268 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1269 returns (int ret, string outbuf, string outs)
1271 # NOTE(sileht): timeout is ignored because C API doesn't provide
1272 # timeout argument, but we keep it for backward compat with old python binding
1273 self.require_state("connected")
1275 cmd = cstr_list(cmd, 'cmd')
1276 inbuf = cstr(inbuf, 'inbuf')
1280 char **_cmd = to_bytes_array(cmd)
1281 size_t _cmdlen = len(cmd)
1283 char *_inbuf = inbuf
1284 size_t _inbuf_len = len(inbuf)
1293 ret = rados_osd_command(self.cluster, _osdid,
1294 <const char **>_cmd, _cmdlen,
1295 <const char*>_inbuf, _inbuf_len,
1296 &_outbuf, &_outbuf_len,
1299 my_outs = decode_cstr(_outs[:_outs_len])
1300 my_outbuf = _outbuf[:_outbuf_len]
1302 rados_buffer_free(_outs)
1304 rados_buffer_free(_outbuf)
1305 return (ret, my_outbuf, my_outs)
1309 def mgr_command(self, cmd, inbuf, timeout=0):
1311 returns (int ret, string outbuf, string outs)
1313 # NOTE(sileht): timeout is ignored because C API doesn't provide
1314 # timeout argument, but we keep it for backward compat with old python binding
1315 self.require_state("connected")
1317 cmd = cstr_list(cmd, 'cmd')
1318 inbuf = cstr(inbuf, 'inbuf')
1321 char **_cmd = to_bytes_array(cmd)
1322 size_t _cmdlen = len(cmd)
1324 char *_inbuf = inbuf
1325 size_t _inbuf_len = len(inbuf)
1334 ret = rados_mgr_command(self.cluster,
1335 <const char **>_cmd, _cmdlen,
1336 <const char*>_inbuf, _inbuf_len,
1337 &_outbuf, &_outbuf_len,
1340 my_outs = decode_cstr(_outs[:_outs_len])
1341 my_outbuf = _outbuf[:_outbuf_len]
1343 rados_buffer_free(_outs)
1345 rados_buffer_free(_outbuf)
1346 return (ret, my_outbuf, my_outs)
1350 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1352 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1353 returns (int ret, string outbuf, string outs)
1355 # NOTE(sileht): timeout is ignored because C API doesn't provide
1356 # timeout argument, but we keep it for backward compat with old python binding
1357 self.require_state("connected")
1359 pgid = cstr(pgid, 'pgid')
1360 cmd = cstr_list(cmd, 'cmd')
1361 inbuf = cstr(inbuf, 'inbuf')
1365 char **_cmd = to_bytes_array(cmd)
1366 size_t _cmdlen = len(cmd)
1368 char *_inbuf = inbuf
1369 size_t _inbuf_len = len(inbuf)
1378 ret = rados_pg_command(self.cluster, _pgid,
1379 <const char **>_cmd, _cmdlen,
1380 <const char *>_inbuf, _inbuf_len,
1381 &_outbuf, &_outbuf_len,
1384 my_outs = decode_cstr(_outs[:_outs_len])
1385 my_outbuf = _outbuf[:_outbuf_len]
1387 rados_buffer_free(_outs)
1389 rados_buffer_free(_outbuf)
1390 return (ret, my_outbuf, my_outs)
1394 def wait_for_latest_osdmap(self):
1395 self.require_state("connected")
1397 ret = rados_wait_for_latest_osdmap(self.cluster)
1400 def blacklist_add(self, client_address, expire_seconds=0):
1402 Blacklist a client from the OSDs
1404 :param client_address: client address
1405 :type client_address: str
1406 :param expire_seconds: number of seconds to blacklist
1407 :type expire_seconds: int
1409 :raises: :class:`Error`
1411 self.require_state("connected")
1412 client_address = cstr(client_address, 'client_address')
1414 uint32_t _expire_seconds = expire_seconds
1415 char *_client_address = client_address
1418 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1420 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1422 def monitor_log(self, level, callback, arg):
1423 if level not in MONITOR_LEVELS:
1424 raise LogicError("invalid monitor level " + level)
1425 if callback is not None and not callable(callback):
1426 raise LogicError("callback must be a callable function or None")
1428 level = cstr(level, 'level')
1429 cdef char *_level = level
1431 if callback is None:
1433 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1434 self.monitor_callback = None
1435 self.monitor_callback2 = None
1438 cb = (callback, arg)
1439 cdef PyObject* _arg = <PyObject*>cb
1441 r = rados_monitor_log(self.cluster, <const char*>_level,
1442 <rados_log_callback_t>&__monitor_callback, _arg)
1445 raise make_ex(r, 'error calling rados_monitor_log')
1446 # NOTE(sileht): Prevents the callback method from being garbage collected
1447 self.monitor_callback = cb
1448 self.monitor_callback2 = None
1450 def monitor_log2(self, level, callback, arg):
1451 if level not in MONITOR_LEVELS:
1452 raise LogicError("invalid monitor level " + level)
1453 if callback is not None and not callable(callback):
1454 raise LogicError("callback must be a callable function or None")
1456 level = cstr(level, 'level')
1457 cdef char *_level = level
1459 if callback is None:
1461 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1462 self.monitor_callback = None
1463 self.monitor_callback2 = None
1466 cb = (callback, arg)
1467 cdef PyObject* _arg = <PyObject*>cb
1469 r = rados_monitor_log2(self.cluster, <const char*>_level,
1470 <rados_log_callback2_t>&__monitor_callback2, _arg)
1473 raise make_ex(r, 'error calling rados_monitor_log')
1474 # NOTE(sileht): Prevents the callback method from being garbage collected
1475 self.monitor_callback = None
1476 self.monitor_callback2 = cb
1479 cdef class OmapIterator(object):
1482 cdef public Ioctx ioctx
1483 cdef rados_omap_iter_t ctx
1485 def __cinit__(self, Ioctx ioctx):
1493 Get the next key-value pair in the object
1494 :returns: next rados.OmapItem
1502 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1505 raise make_ex(ret, "error iterating over the omap")
1507 raise StopIteration()
1508 key = decode_cstr(key_)
1514 def __dealloc__(self):
1516 rados_omap_get_end(self.ctx)
1519 cdef class ObjectIterator(object):
1520 """rados.Ioctx Object iterator"""
1522 cdef rados_list_ctx_t ctx
1524 cdef public object ioctx
1526 def __cinit__(self, Ioctx ioctx):
1530 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1532 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1540 Get the next object name and locator in the pool
1542 :raises: StopIteration
1543 :returns: next rados.Ioctx Object
1546 const char *key_ = NULL
1547 const char *locator_ = NULL
1548 const char *nspace_ = NULL
1551 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1554 raise StopIteration()
1556 key = decode_cstr(key_)
1557 locator = decode_cstr(locator_) if locator_ != NULL else None
1558 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1559 return Object(self.ioctx, key, locator, nspace)
1561 def __dealloc__(self):
1563 rados_nobjects_list_close(self.ctx)
1566 cdef class XattrIterator(object):
1567 """Extended attribute iterator"""
1569 cdef rados_xattrs_iter_t it
1572 cdef public Ioctx ioctx
1573 cdef public object oid
1575 def __cinit__(self, Ioctx ioctx, oid):
1577 self.oid = cstr(oid, 'oid')
1578 self._oid = self.oid
1581 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1583 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1590 Get the next xattr on the object
1592 :raises: StopIteration
1593 :returns: pair - of name and value of the next Xattr
1596 const char *name_ = NULL
1597 const char *val_ = NULL
1601 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1603 raise make_ex(ret, "error iterating over the extended attributes \
1604 in '%s'" % self.oid)
1606 raise StopIteration()
1607 name = decode_cstr(name_)
1611 def __dealloc__(self):
1613 rados_getxattrs_end(self.it)
1616 cdef class SnapIterator(object):
1617 """Snapshot iterator"""
1619 cdef public Ioctx ioctx
1621 cdef rados_snap_t *snaps
1625 def __cinit__(self, Ioctx ioctx):
1627 # We don't know how big a buffer we need until we've called the
1628 # function. So use the exponential doubling strategy.
1629 cdef int num_snaps = 10
1631 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1633 sizeof(rados_snap_t))
1636 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1640 elif ret != -errno.ERANGE:
1641 raise make_ex(ret, "error calling rados_snap_list for \
1642 ioctx '%s'" % self.ioctx.name)
1643 num_snaps = num_snaps * 2
1651 Get the next Snapshot
1653 :raises: :class:`Error`, StopIteration
1654 :returns: Snap - next snapshot
1656 if self.cur_snap >= self.max_snap:
1660 rados_snap_t snap_id = self.snaps[self.cur_snap]
1666 name = <char *>realloc_chk(name, name_len)
1668 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1671 elif ret != -errno.ERANGE:
1672 raise make_ex(ret, "rados_snap_get_name error")
1674 name_len = name_len * 2
1676 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1677 self.cur_snap = self.cur_snap + 1
1683 cdef class Snap(object):
1684 """Snapshot object"""
1685 cdef public Ioctx ioctx
1686 cdef public object name
1688 # NOTE(sileht): old API was storing the ctypes object
1689 # instead of the value ....
1690 cdef public rados_snap_t snap_id
1692 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1695 self.snap_id = snap_id
1698 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1699 % (str(self.ioctx), self.name, self.snap_id)
1701 def get_timestamp(self):
1703 Find when a snapshot in the current pool occurred
1705 :raises: :class:`Error`
1706 :returns: datetime - the data and time the snapshot was created
1708 cdef time_t snap_time
1711 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1713 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1714 return datetime.fromtimestamp(snap_time)
1717 cdef class Completion(object):
1718 """completion object"""
1726 rados_callback_t complete_cb
1727 rados_callback_t safe_cb
1728 rados_completion_t rados_comp
1731 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1732 self.oncomplete = oncomplete
1733 self.onsafe = onsafe
1738 Is an asynchronous operation safe?
1740 This does not imply that the safe callback has finished.
1742 :returns: True if the operation is safe
1745 ret = rados_aio_is_safe(self.rados_comp)
1748 def is_complete(self):
1750 Has an asynchronous operation completed?
1752 This does not imply that the safe callback has finished.
1754 :returns: True if the operation is completed
1757 ret = rados_aio_is_complete(self.rados_comp)
1760 def wait_for_safe(self):
1762 Wait for an asynchronous operation to be marked safe
1764 This does not imply that the safe callback has finished.
1767 rados_aio_wait_for_safe(self.rados_comp)
1769 def wait_for_complete(self):
1771 Wait for an asynchronous operation to complete
1773 This does not imply that the complete callback has finished.
1776 rados_aio_wait_for_complete(self.rados_comp)
1778 def wait_for_safe_and_cb(self):
1780 Wait for an asynchronous operation to be marked safe and for
1781 the safe callback to have returned
1784 rados_aio_wait_for_safe_and_cb(self.rados_comp)
1786 def wait_for_complete_and_cb(self):
1788 Wait for an asynchronous operation to complete and for the
1789 complete callback to have returned
1791 :returns: whether the operation is completed
1794 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1797 def get_return_value(self):
1799 Get the return value of an asychronous operation
1801 The return value is set when the operation is complete or safe,
1802 whichever comes first.
1804 :returns: int - return value of the operation
1807 ret = rados_aio_get_return_value(self.rados_comp)
1810 def __dealloc__(self):
1812 Release a completion
1814 Call this when you no longer need the completion. It may not be
1815 freed immediately if the operation is not acked and committed.
1817 ref.Py_XDECREF(self.buf)
1819 if self.rados_comp != NULL:
1821 rados_aio_release(self.rados_comp)
1822 self.rados_comp = NULL
1824 def _complete(self):
1825 self.oncomplete(self)
1826 with self.ioctx.lock:
1828 self.ioctx.complete_completions.remove(self)
1832 with self.ioctx.lock:
1834 self.ioctx.safe_completions.remove(self)
1837 with self.ioctx.lock:
1839 self.ioctx.complete_completions.remove(self)
1841 self.ioctx.safe_completions.remove(self)
1844 class OpCtx(object):
1845 def __enter__(self):
1846 return self.create()
1848 def __exit__(self, type, msg, traceback):
1852 cdef class WriteOp(object):
1853 cdef rados_write_op_t write_op
1857 self.write_op = rados_create_write_op()
1862 rados_release_write_op(self.write_op)
1864 @requires(('exclusive', opt(int)))
1865 def new(self, exclusive=None):
1871 int _exclusive = exclusive
1874 rados_write_op_create(self.write_op, _exclusive, NULL)
1882 rados_write_op_remove(self.write_op)
1884 @requires(('flags', int))
1885 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1887 Set flags for the last operation added to this write_op.
1888 :para flags: flags to apply to the last operation
1896 rados_write_op_set_flags(self.write_op, _flags)
1898 @requires(('to_write', bytes))
1899 def append(self, to_write):
1901 Append data to an object synchronously
1902 :param to_write: data to write
1903 :type to_write: bytes
1907 char *_to_write = to_write
1908 size_t length = len(to_write)
1911 rados_write_op_append(self.write_op, _to_write, length)
1913 @requires(('to_write', bytes))
1914 def write_full(self, to_write):
1916 Write whole object, atomically replacing it.
1917 :param to_write: data to write
1918 :type to_write: bytes
1922 char *_to_write = to_write
1923 size_t length = len(to_write)
1926 rados_write_op_write_full(self.write_op, _to_write, length)
1928 @requires(('to_write', bytes), ('offset', int))
1929 def write(self, to_write, offset=0):
1932 :param to_write: data to write
1933 :type to_write: bytes
1934 :param offset: byte offset in the object to begin writing at
1939 char *_to_write = to_write
1940 size_t length = len(to_write)
1941 uint64_t _offset = offset
1944 rados_write_op_write(self.write_op, _to_write, length, _offset)
1946 @requires(('offset', int), ('length', int))
1947 def zero(self, offset, length):
1949 Zero part of an object.
1950 :param offset: byte offset in the object to begin writing at
1952 :param offset: number of zero to write
1957 size_t _length = length
1958 uint64_t _offset = offset
1961 rados_write_op_zero(self.write_op, _length, _offset)
1963 @requires(('offset', int))
1964 def truncate(self, offset):
1967 :param offset: byte offset in the object to begin truncating at
1972 uint64_t _offset = offset
1975 rados_write_op_truncate(self.write_op, _offset)
1978 class WriteOpCtx(WriteOp, OpCtx):
1979 """write operation context manager"""
1982 cdef class ReadOp(object):
1983 cdef rados_read_op_t read_op
1987 self.read_op = rados_create_read_op()
1992 rados_release_read_op(self.read_op)
1994 @requires(('flags', int))
1995 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1997 Set flags for the last operation added to this read_op.
1998 :para flags: flags to apply to the last operation
2006 rados_read_op_set_flags(self.read_op, _flags)
2009 class ReadOpCtx(ReadOp, OpCtx):
2010 """read operation context manager"""
2013 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2015 Callback to onsafe() for asynchronous operations
2017 cdef object cb = <object>args
2022 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2024 Callback to oncomplete() for asynchronous operations
2026 cdef object cb = <object>args
2031 cdef class Ioctx(object):
2032 """rados.Ioctx object"""
2033 # NOTE(sileht): attributes declared in .pyd
2035 def __init__(self, name):
2039 self.locator_key = ""
2041 self.lock = threading.Lock()
2042 self.safe_completions = []
2043 self.complete_completions = []
2045 def __enter__(self):
2048 def __exit__(self, type_, value, traceback):
2052 def __dealloc__(self):
2055 def __track_completion(self, completion_obj):
2056 if completion_obj.oncomplete:
2058 self.complete_completions.append(completion_obj)
2059 if completion_obj.onsafe:
2061 self.safe_completions.append(completion_obj)
2063 def __get_completion(self, oncomplete, onsafe):
2065 Constructs a completion to use with asynchronous operations
2067 :param oncomplete: what to do when the write is safe and complete in memory
2069 :type oncomplete: completion
2070 :param onsafe: what to do when the write is safe and complete on storage
2072 :type onsafe: completion
2074 :raises: :class:`Error`
2075 :returns: completion object
2078 completion_obj = Completion(self, oncomplete, onsafe)
2081 rados_callback_t complete_cb = NULL
2082 rados_callback_t safe_cb = NULL
2083 rados_completion_t completion
2084 PyObject* p_completion_obj= <PyObject*>completion_obj
2087 complete_cb = <rados_callback_t>&__aio_complete_cb
2089 safe_cb = <rados_callback_t>&__aio_safe_cb
2092 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2095 raise make_ex(ret, "error getting a completion")
2097 completion_obj.rados_comp = completion
2098 return completion_obj
2100 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2101 def aio_stat(self, object_name, oncomplete):
2103 Asynchronously get object stats (size/mtime)
2105 oncomplete will be called with the returned size and mtime
2106 as well as the completion:
2108 oncomplete(completion, size, mtime)
2110 :param object_name: the name of the object to get stats from
2111 :type object_name: str
2112 :param oncomplete: what to do when the stat is complete
2113 :type oncomplete: completion
2115 :raises: :class:`Error`
2116 :returns: completion object
2119 object_name = cstr(object_name, 'object_name')
2122 Completion completion
2123 char *_object_name = object_name
2127 def oncomplete_(completion_v):
2128 cdef Completion _completion_v = completion_v
2129 return_value = _completion_v.get_return_value()
2130 if return_value >= 0:
2131 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2133 return oncomplete(_completion_v, None, None)
2135 completion = self.__get_completion(oncomplete_, None)
2136 self.__track_completion(completion)
2138 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2142 completion._cleanup()
2143 raise make_ex(ret, "error stating %s" % object_name)
2146 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2147 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2148 def aio_write(self, object_name, to_write, offset=0,
2149 oncomplete=None, onsafe=None):
2151 Write data to an object asynchronously
2153 Queues the write and returns.
2155 :param object_name: name of the object
2156 :type object_name: str
2157 :param to_write: data to write
2158 :type to_write: bytes
2159 :param offset: byte offset in the object to begin writing at
2161 :param oncomplete: what to do when the write is safe and complete in memory
2163 :type oncomplete: completion
2164 :param onsafe: what to do when the write is safe and complete on storage
2166 :type onsafe: completion
2168 :raises: :class:`Error`
2169 :returns: completion object
2172 object_name = cstr(object_name, 'object_name')
2175 Completion completion
2176 char* _object_name = object_name
2177 char* _to_write = to_write
2178 size_t size = len(to_write)
2179 uint64_t _offset = offset
2181 completion = self.__get_completion(oncomplete, onsafe)
2182 self.__track_completion(completion)
2184 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2185 _to_write, size, _offset)
2187 completion._cleanup()
2188 raise make_ex(ret, "error writing object %s" % object_name)
2191 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2192 ('onsafe', opt(Callable)))
2193 def aio_write_full(self, object_name, to_write,
2194 oncomplete=None, onsafe=None):
2196 Asychronously write an entire object
2198 The object is filled with the provided data. If the object exists,
2199 it is atomically truncated and then written.
2200 Queues the write and returns.
2202 :param object_name: name of the object
2203 :type object_name: str
2204 :param to_write: data to write
2206 :param oncomplete: what to do when the write is safe and complete in memory
2208 :type oncomplete: completion
2209 :param onsafe: what to do when the write is safe and complete on storage
2211 :type onsafe: completion
2213 :raises: :class:`Error`
2214 :returns: completion object
2217 object_name = cstr(object_name, 'object_name')
2220 Completion completion
2221 char* _object_name = object_name
2222 char* _to_write = to_write
2223 size_t size = len(to_write)
2225 completion = self.__get_completion(oncomplete, onsafe)
2226 self.__track_completion(completion)
2228 ret = rados_aio_write_full(self.io, _object_name,
2229 completion.rados_comp,
2232 completion._cleanup()
2233 raise make_ex(ret, "error writing object %s" % object_name)
2236 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2237 ('onsafe', opt(Callable)))
2238 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2240 Asychronously append data to an object
2242 Queues the write and returns.
2244 :param object_name: name of the object
2245 :type object_name: str
2246 :param to_append: data to append
2247 :type to_append: str
2248 :param offset: byte offset in the object to begin writing at
2250 :param oncomplete: what to do when the write is safe and complete in memory
2252 :type oncomplete: completion
2253 :param onsafe: what to do when the write is safe and complete on storage
2255 :type onsafe: completion
2257 :raises: :class:`Error`
2258 :returns: completion object
2260 object_name = cstr(object_name, 'object_name')
2263 Completion completion
2264 char* _object_name = object_name
2265 char* _to_append = to_append
2266 size_t size = len(to_append)
2268 completion = self.__get_completion(oncomplete, onsafe)
2269 self.__track_completion(completion)
2271 ret = rados_aio_append(self.io, _object_name,
2272 completion.rados_comp,
2275 completion._cleanup()
2276 raise make_ex(ret, "error appending object %s" % object_name)
2279 def aio_flush(self):
2281 Block until all pending writes in an io context are safe
2283 :raises: :class:`Error`
2286 ret = rados_aio_flush(self.io)
2288 raise make_ex(ret, "error flushing")
2290 @requires(('object_name', str_type), ('length', int), ('offset', int),
2291 ('oncomplete', opt(Callable)))
2292 def aio_read(self, object_name, length, offset, oncomplete):
2294 Asychronously read data from an object
2296 oncomplete will be called with the returned read value as
2297 well as the completion:
2299 oncomplete(completion, data_read)
2301 :param object_name: name of the object to read from
2302 :type object_name: str
2303 :param length: the number of bytes to read
2305 :param offset: byte offset in the object to begin reading from
2307 :param oncomplete: what to do when the read is complete
2308 :type oncomplete: completion
2310 :raises: :class:`Error`
2311 :returns: completion object
2314 object_name = cstr(object_name, 'object_name')
2317 Completion completion
2318 char* _object_name = object_name
2319 uint64_t _offset = offset
2322 size_t _length = length
2324 def oncomplete_(completion_v):
2325 cdef Completion _completion_v = completion_v
2326 return_value = _completion_v.get_return_value()
2327 if return_value > 0 and return_value != length:
2328 _PyBytes_Resize(&_completion_v.buf, return_value)
2329 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2331 completion = self.__get_completion(oncomplete_, None)
2332 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2333 ret_buf = PyBytes_AsString(completion.buf)
2334 self.__track_completion(completion)
2336 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2337 ret_buf, _length, _offset)
2339 completion._cleanup()
2340 raise make_ex(ret, "error reading %s" % object_name)
2343 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2344 ('data', bytes), ('length', int),
2345 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2346 def aio_execute(self, object_name, cls, method, data,
2347 length=8192, oncomplete=None, onsafe=None):
2349 Asynchronously execute an OSD class method on an object.
2351 oncomplete and onsafe will be called with the data returned from
2352 the plugin as well as the completion:
2354 oncomplete(completion, data)
2355 onsafe(completion, data)
2357 :param object_name: name of the object
2358 :type object_name: str
2359 :param cls: name of the object class
2361 :param method: name of the method
2363 :param data: input data
2365 :param length: size of output buffer in bytes (default=8192)
2367 :param oncomplete: what to do when the execution is complete
2368 :type oncomplete: completion
2369 :param onsafe: what to do when the execution is safe and complete
2370 :type onsafe: completion
2372 :raises: :class:`Error`
2373 :returns: completion object
2376 object_name = cstr(object_name, 'object_name')
2377 cls = cstr(cls, 'cls')
2378 method = cstr(method, 'method')
2380 Completion completion
2381 char *_object_name = object_name
2383 char *_method = method
2385 size_t _data_len = len(data)
2388 size_t _length = length
2390 def oncomplete_(completion_v):
2391 cdef Completion _completion_v = completion_v
2392 return_value = _completion_v.get_return_value()
2393 if return_value > 0 and return_value != length:
2394 _PyBytes_Resize(&_completion_v.buf, return_value)
2395 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2397 def onsafe_(completion_v):
2398 cdef Completion _completion_v = completion_v
2399 return_value = _completion_v.get_return_value()
2400 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2402 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2403 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2404 ret_buf = PyBytes_AsString(completion.buf)
2405 self.__track_completion(completion)
2407 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2408 _cls, _method, _data, _data_len, ret_buf, _length)
2410 completion._cleanup()
2411 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2414 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2415 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2417 Asychronously remove an object
2419 :param object_name: name of the object to remove
2420 :type object_name: str
2421 :param oncomplete: what to do when the remove is safe and complete in memory
2423 :type oncomplete: completion
2424 :param onsafe: what to do when the remove is safe and complete on storage
2426 :type onsafe: completion
2428 :raises: :class:`Error`
2429 :returns: completion object
2431 object_name = cstr(object_name, 'object_name')
2434 Completion completion
2435 char* _object_name = object_name
2437 completion = self.__get_completion(oncomplete, onsafe)
2438 self.__track_completion(completion)
2440 ret = rados_aio_remove(self.io, _object_name,
2441 completion.rados_comp)
2443 completion._cleanup()
2444 raise make_ex(ret, "error removing %s" % object_name)
2447 def require_ioctx_open(self):
2449 Checks if the rados.Ioctx object state is 'open'
2451 :raises: IoctxStateError
2453 if self.state != "open":
2454 raise IoctxStateError("The pool is %s" % self.state)
2456 def change_auid(self, auid):
2458 Attempt to change an io context's associated auid "owner."
2460 Requires that you have write permission on both the current and new
2463 :raises: :class:`Error`
2465 self.require_ioctx_open()
2468 uint64_t _auid = auid
2471 ret = rados_ioctx_pool_set_auid(self.io, _auid)
2473 raise make_ex(ret, "error changing auid of '%s' to %d"
2474 % (self.name, auid))
2476 @requires(('loc_key', str_type))
2477 def set_locator_key(self, loc_key):
2479 Set the key for mapping objects to pgs within an io context.
2481 The key is used instead of the object name to determine which
2482 placement groups an object is put in. This affects all subsequent
2483 operations of the io context - until a different locator key is
2484 set, all objects in this io context will be placed in the same pg.
2486 :param loc_key: the key to use as the object locator, or NULL to discard
2487 any previously set key
2490 :raises: :class:`TypeError`
2492 self.require_ioctx_open()
2493 cloc_key = cstr(loc_key, 'loc_key')
2494 cdef char *_loc_key = cloc_key
2496 rados_ioctx_locator_set_key(self.io, _loc_key)
2497 self.locator_key = loc_key
2499 def get_locator_key(self):
2501 Get the locator_key of context
2503 :returns: locator_key
2505 return self.locator_key
2507 @requires(('snap_id', long))
2508 def set_read(self, snap_id):
2510 Set the snapshot for reading objects.
2512 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2514 :param snap_id: the snapshot Id
2517 :raises: :class:`TypeError`
2519 self.require_ioctx_open()
2520 cdef rados_snap_t _snap_id = snap_id
2522 rados_ioctx_snap_set_read(self.io, _snap_id)
2524 @requires(('nspace', str_type))
2525 def set_namespace(self, nspace):
2527 Set the namespace for objects within an io context.
2529 The namespace in addition to the object name fully identifies
2530 an object. This affects all subsequent operations of the io context
2531 - until a different namespace is set, all objects in this io context
2532 will be placed in the same namespace.
2534 :param nspace: the namespace to use, or None/"" for the default namespace
2537 :raises: :class:`TypeError`
2539 self.require_ioctx_open()
2542 cnspace = cstr(nspace, 'nspace')
2543 cdef char *_nspace = cnspace
2545 rados_ioctx_set_namespace(self.io, _nspace)
2546 self.nspace = nspace
2548 def get_namespace(self):
2550 Get the namespace of context
2558 Close a rados.Ioctx object.
2560 This just tells librados that you no longer need to use the io context.
2561 It may not be freed immediately if there are pending asynchronous
2562 requests on it, but you should not use an io context again after
2563 calling this function on it.
2565 if self.state == "open":
2566 self.require_ioctx_open()
2568 rados_ioctx_destroy(self.io)
2569 self.state = "closed"
2572 @requires(('key', str_type), ('data', bytes))
2573 def write(self, key, data, offset=0):
2575 Write data to an object synchronously
2577 :param key: name of the object
2579 :param data: data to write
2581 :param offset: byte offset in the object to begin writing at
2584 :raises: :class:`TypeError`
2585 :raises: :class:`LogicError`
2586 :returns: int - 0 on success
2588 self.require_ioctx_open()
2590 key = cstr(key, 'key')
2594 size_t length = len(data)
2595 uint64_t _offset = offset
2598 ret = rados_write(self.io, _key, _data, length, _offset)
2602 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2605 raise LogicError("Ioctx.write(%s): rados_write \
2606 returned %d, but should return zero on success." % (self.name, ret))
2608 @requires(('key', str_type), ('data', bytes))
2609 def write_full(self, key, data):
2611 Write an entire object synchronously.
2613 The object is filled with the provided data. If the object exists,
2614 it is atomically truncated and then written.
2616 :param key: name of the object
2618 :param data: data to write
2621 :raises: :class:`TypeError`
2622 :raises: :class:`Error`
2623 :returns: int - 0 on success
2625 self.require_ioctx_open()
2626 key = cstr(key, 'key')
2630 size_t length = len(data)
2633 ret = rados_write_full(self.io, _key, _data, length)
2637 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2640 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2641 returned %d, but should return zero on success." % (self.name, ret))
2643 @requires(('key', str_type), ('data', bytes))
2644 def append(self, key, data):
2646 Append data to an object synchronously
2648 :param key: name of the object
2650 :param data: data to write
2653 :raises: :class:`TypeError`
2654 :raises: :class:`LogicError`
2655 :returns: int - 0 on success
2657 self.require_ioctx_open()
2658 key = cstr(key, 'key')
2662 size_t length = len(data)
2665 ret = rados_append(self.io, _key, _data, length)
2669 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2672 raise LogicError("Ioctx.append(%s): rados_append \
2673 returned %d, but should return zero on success." % (self.name, ret))
2675 @requires(('key', str_type))
2676 def read(self, key, length=8192, offset=0):
2678 Read data from an object synchronously
2680 :param key: name of the object
2682 :param length: the number of bytes to read (default=8192)
2684 :param offset: byte offset in the object to begin reading at
2687 :raises: :class:`TypeError`
2688 :raises: :class:`Error`
2689 :returns: str - data read from object
2691 self.require_ioctx_open()
2692 key = cstr(key, 'key')
2696 uint64_t _offset = offset
2697 size_t _length = length
2698 PyObject* ret_s = NULL
2700 ret_s = PyBytes_FromStringAndSize(NULL, length)
2702 ret_buf = PyBytes_AsString(ret_s)
2704 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2706 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2709 _PyBytes_Resize(&ret_s, ret)
2711 return <object>ret_s
2713 # We DECREF unconditionally: the cast to object above will have
2714 # INCREFed if necessary. This also takes care of exceptions,
2715 # including if _PyString_Resize fails (that will free the string
2716 # itself and set ret_s to NULL, hence XDECREF).
2717 ref.Py_XDECREF(ret_s)
2719 @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
2720 def execute(self, key, cls, method, data, length=8192):
2722 Execute an OSD class method on an object.
2724 :param key: name of the object
2726 :param cls: name of the object class
2728 :param method: name of the method
2730 :param data: input data
2732 :param length: size of output buffer in bytes (default=8192)
2735 :raises: :class:`TypeError`
2736 :raises: :class:`Error`
2737 :returns: (ret, method output)
2739 self.require_ioctx_open()
2741 key = cstr(key, 'key')
2742 cls = cstr(cls, 'cls')
2743 method = cstr(method, 'method')
2747 char *_method = method
2749 size_t _data_len = len(data)
2752 size_t _length = length
2753 PyObject* ret_s = NULL
2755 ret_s = PyBytes_FromStringAndSize(NULL, length)
2757 ret_buf = PyBytes_AsString(ret_s)
2759 ret = rados_exec(self.io, _key, _cls, _method, _data,
2760 _data_len, ret_buf, _length)
2762 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2765 _PyBytes_Resize(&ret_s, ret)
2767 return ret, <object>ret_s
2769 # We DECREF unconditionally: the cast to object above will have
2770 # INCREFed if necessary. This also takes care of exceptions,
2771 # including if _PyString_Resize fails (that will free the string
2772 # itself and set ret_s to NULL, hence XDECREF).
2773 ref.Py_XDECREF(ret_s)
2775 def get_stats(self):
2777 Get pool usage statistics
2779 :returns: dict - contains the following keys:
2781 - ``num_bytes`` (int) - size of pool in bytes
2783 - ``num_kb`` (int) - size of pool in kbytes
2785 - ``num_objects`` (int) - number of objects in the pool
2787 - ``num_object_clones`` (int) - number of object clones
2789 - ``num_object_copies`` (int) - number of object copies
2791 - ``num_objects_missing_on_primary`` (int) - number of objets
2794 - ``num_objects_unfound`` (int) - number of unfound objects
2796 - ``num_objects_degraded`` (int) - number of degraded objects
2798 - ``num_rd`` (int) - bytes read
2800 - ``num_rd_kb`` (int) - kbytes read
2802 - ``num_wr`` (int) - bytes written
2804 - ``num_wr_kb`` (int) - kbytes written
2806 self.require_ioctx_open()
2807 cdef rados_pool_stat_t stats
2809 ret = rados_ioctx_pool_stat(self.io, &stats)
2811 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
2812 return {'num_bytes': stats.num_bytes,
2813 'num_kb': stats.num_kb,
2814 'num_objects': stats.num_objects,
2815 'num_object_clones': stats.num_object_clones,
2816 'num_object_copies': stats.num_object_copies,
2817 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
2818 "num_objects_unfound": stats.num_objects_unfound,
2819 "num_objects_degraded": stats.num_objects_degraded,
2820 "num_rd": stats.num_rd,
2821 "num_rd_kb": stats.num_rd_kb,
2822 "num_wr": stats.num_wr,
2823 "num_wr_kb": stats.num_wr_kb}
2825 @requires(('key', str_type))
2826 def remove_object(self, key):
2830 This does not delete any snapshots of the object.
2832 :param key: the name of the object to delete
2835 :raises: :class:`TypeError`
2836 :raises: :class:`Error`
2837 :returns: bool - True on success
2839 self.require_ioctx_open()
2840 key = cstr(key, 'key')
2845 ret = rados_remove(self.io, _key)
2847 raise make_ex(ret, "Failed to remove '%s'" % key)
2850 @requires(('key', str_type))
2851 def trunc(self, key, size):
2855 If this enlarges the object, the new area is logically filled with
2856 zeroes. If this shrinks the object, the excess data is removed.
2858 :param key: the name of the object to resize
2860 :param size: the new size of the object in bytes
2863 :raises: :class:`TypeError`
2864 :raises: :class:`Error`
2865 :returns: int - 0 on success, otherwise raises error
2868 self.require_ioctx_open()
2869 key = cstr(key, 'key')
2872 uint64_t _size = size
2875 ret = rados_trunc(self.io, _key, _size)
2877 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2880 @requires(('key', str_type))
2881 def stat(self, key):
2883 Get object stats (size/mtime)
2885 :param key: the name of the object to get stats from
2888 :raises: :class:`TypeError`
2889 :raises: :class:`Error`
2890 :returns: (size,timestamp)
2892 self.require_ioctx_open()
2894 key = cstr(key, 'key')
2901 ret = rados_stat(self.io, _key, &psize, &pmtime)
2903 raise make_ex(ret, "Failed to stat %r" % key)
2904 return psize, time.localtime(pmtime)
2906 @requires(('key', str_type), ('xattr_name', str_type))
2907 def get_xattr(self, key, xattr_name):
2909 Get the value of an extended attribute on an object.
2911 :param key: the name of the object to get xattr from
2913 :param xattr_name: which extended attribute to read
2914 :type xattr_name: str
2916 :raises: :class:`TypeError`
2917 :raises: :class:`Error`
2918 :returns: str - value of the xattr
2920 self.require_ioctx_open()
2922 key = cstr(key, 'key')
2923 xattr_name = cstr(xattr_name, 'xattr_name')
2926 char *_xattr_name = xattr_name
2927 size_t ret_length = 4096
2928 char *ret_buf = NULL
2931 while ret_length < 4096 * 1024 * 1024:
2932 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2934 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2935 if ret == -errno.ERANGE:
2938 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2941 return ret_buf[:ret]
2945 @requires(('oid', str_type))
2946 def get_xattrs(self, oid):
2948 Start iterating over xattrs on an object.
2950 :param oid: the name of the object to get xattrs from
2953 :raises: :class:`TypeError`
2954 :raises: :class:`Error`
2955 :returns: XattrIterator
2957 self.require_ioctx_open()
2958 return XattrIterator(self, oid)
2960 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
2961 def set_xattr(self, key, xattr_name, xattr_value):
2963 Set an extended attribute on an object.
2965 :param key: the name of the object to set xattr to
2967 :param xattr_name: which extended attribute to set
2968 :type xattr_name: str
2969 :param xattr_value: the value of the extended attribute
2970 :type xattr_value: bytes
2972 :raises: :class:`TypeError`
2973 :raises: :class:`Error`
2974 :returns: bool - True on success, otherwise raise an error
2976 self.require_ioctx_open()
2978 key = cstr(key, 'key')
2979 xattr_name = cstr(xattr_name, 'xattr_name')
2982 char *_xattr_name = xattr_name
2983 char *_xattr_value = xattr_value
2984 size_t _xattr_value_len = len(xattr_value)
2987 ret = rados_setxattr(self.io, _key, _xattr_name,
2988 _xattr_value, _xattr_value_len)
2990 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2993 @requires(('key', str_type), ('xattr_name', str_type))
2994 def rm_xattr(self, key, xattr_name):
2996 Removes an extended attribute on from an object.
2998 :param key: the name of the object to remove xattr from
3000 :param xattr_name: which extended attribute to remove
3001 :type xattr_name: str
3003 :raises: :class:`TypeError`
3004 :raises: :class:`Error`
3005 :returns: bool - True on success, otherwise raise an error
3007 self.require_ioctx_open()
3009 key = cstr(key, 'key')
3010 xattr_name = cstr(xattr_name, 'xattr_name')
3013 char *_xattr_name = xattr_name
3016 ret = rados_rmxattr(self.io, _key, _xattr_name)
3018 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3022 def list_objects(self):
3024 Get ObjectIterator on rados.Ioctx object.
3026 :returns: ObjectIterator
3028 self.require_ioctx_open()
3029 return ObjectIterator(self)
3031 def list_snaps(self):
3033 Get SnapIterator on rados.Ioctx object.
3035 :returns: SnapIterator
3037 self.require_ioctx_open()
3038 return SnapIterator(self)
3040 @requires(('snap_name', str_type))
3041 def create_snap(self, snap_name):
3043 Create a pool-wide snapshot
3045 :param snap_name: the name of the snapshot
3046 :type snap_name: str
3048 :raises: :class:`TypeError`
3049 :raises: :class:`Error`
3051 self.require_ioctx_open()
3052 snap_name = cstr(snap_name, 'snap_name')
3053 cdef char *_snap_name = snap_name
3056 ret = rados_ioctx_snap_create(self.io, _snap_name)
3058 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3060 @requires(('snap_name', str_type))
3061 def remove_snap(self, snap_name):
3063 Removes a pool-wide snapshot
3065 :param snap_name: the name of the snapshot
3066 :type snap_name: str
3068 :raises: :class:`TypeError`
3069 :raises: :class:`Error`
3071 self.require_ioctx_open()
3072 snap_name = cstr(snap_name, 'snap_name')
3073 cdef char *_snap_name = snap_name
3076 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3078 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3080 @requires(('snap_name', str_type))
3081 def lookup_snap(self, snap_name):
3083 Get the id of a pool snapshot
3085 :param snap_name: the name of the snapshot to lookop
3086 :type snap_name: str
3088 :raises: :class:`TypeError`
3089 :raises: :class:`Error`
3090 :returns: Snap - on success
3092 self.require_ioctx_open()
3093 csnap_name = cstr(snap_name, 'snap_name')
3095 char *_snap_name = csnap_name
3096 rados_snap_t snap_id
3099 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3101 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3102 return Snap(self, snap_name, int(snap_id))
3104 @requires(('oid', str_type), ('snap_name', str_type))
3105 def snap_rollback(self, oid, snap_name):
3107 Rollback an object to a snapshot
3109 :param oid: the name of the object
3111 :param snap_name: the name of the snapshot
3112 :type snap_name: str
3114 :raises: :class:`TypeError`
3115 :raises: :class:`Error`
3117 self.require_ioctx_open()
3118 oid = cstr(oid, 'oid')
3119 snap_name = cstr(snap_name, 'snap_name')
3121 char *_snap_name = snap_name
3125 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3127 raise make_ex(ret, "Failed to rollback %s" % oid)
3129 def create_self_managed_snap(self):
3131 Creates a self-managed snapshot
3133 :returns: snap id on success
3135 :raises: :class:`Error`
3137 self.require_ioctx_open()
3139 rados_snap_t _snap_id
3141 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3143 raise make_ex(ret, "Failed to create self-managed snapshot")
3144 return int(_snap_id)
3146 @requires(('snap_id', int))
3147 def remove_self_managed_snap(self, snap_id):
3149 Removes a self-managed snapshot
3151 :param snap_id: the name of the snapshot
3154 :raises: :class:`TypeError`
3155 :raises: :class:`Error`
3157 self.require_ioctx_open()
3159 rados_snap_t _snap_id = snap_id
3161 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3163 raise make_ex(ret, "Failed to remove self-managed snapshot")
3165 def set_self_managed_snap_write(self, snaps):
3167 Updates the write context to the specified self-managed
3170 :param snaps: all associated self-managed snapshot ids
3173 :raises: :class:`TypeError`
3174 :raises: :class:`Error`
3176 self.require_ioctx_open()
3180 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3181 snap_seq = sorted_snaps[0]
3184 rados_snap_t _snap_seq = snap_seq
3185 rados_snap_t *_snaps = NULL
3186 int _num_snaps = len(sorted_snaps)
3188 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3189 for i in range(len(sorted_snaps)):
3190 _snaps[i] = sorted_snaps[i]
3192 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3197 raise make_ex(ret, "Failed to update snapshot write context")
3201 @requires(('oid', str_type), ('snap_id', int))
3202 def rollback_self_managed_snap(self, oid, snap_id):
3204 Rolls an specific object back to a self-managed snapshot revision
3206 :param oid: the name of the object
3208 :param snap_id: the name of the snapshot
3211 :raises: :class:`TypeError`
3212 :raises: :class:`Error`
3214 self.require_ioctx_open()
3215 oid = cstr(oid, 'oid')
3218 rados_snap_t _snap_id = snap_id
3220 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3222 raise make_ex(ret, "Failed to rollback %s" % oid)
3224 def get_last_version(self):
3226 Return the version of the last object read or written to.
3228 This exposes the internal version number of the last object read or
3229 written via this io context
3231 :returns: version of the last object used
3233 self.require_ioctx_open()
3235 ret = rados_get_last_version(self.io)
3238 def create_write_op(self):
3240 create write operation object.
3241 need call release_write_op after use
3243 return WriteOp().create()
3245 def create_read_op(self):
3247 create read operation object.
3248 need call release_read_op after use
3250 return ReadOp().create()
3252 def release_write_op(self, write_op):
3254 release memory alloc by create_write_op
3258 def release_read_op(self, read_op):
3260 release memory alloc by create_read_op
3261 :para read_op: read_op object
3266 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3267 def set_omap(self, write_op, keys, values):
3269 set keys values to write_op
3270 :para write_op: write_operation object
3271 :type write_op: WriteOp
3272 :para keys: a tuple of keys
3274 :para values: a tuple of values
3278 if len(keys) != len(values):
3279 raise Error("Rados(): keys and values must have the same number of items")
3281 keys = cstr_list(keys, 'keys')
3283 WriteOp _write_op = write_op
3284 size_t key_num = len(keys)
3285 char **_keys = to_bytes_array(keys)
3286 char **_values = to_bytes_array(values)
3287 size_t *_lens = to_csize_t_array([len(v) for v in values])
3291 rados_write_op_omap_set(_write_op.write_op,
3292 <const char**>_keys,
3293 <const char**>_values,
3294 <const size_t*>_lens, key_num)
3300 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3301 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3303 excute the real write operation
3304 :para write_op: write operation object
3305 :type write_op: WriteOp
3306 :para oid: object name
3308 :para mtime: the time to set the mtime to, 0 for the current time
3310 :para flags: flags to apply to the entire operation
3314 oid = cstr(oid, 'oid')
3316 WriteOp _write_op = write_op
3318 time_t _mtime = mtime
3322 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3324 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3326 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3327 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3329 excute the real write operation asynchronously
3330 :para write_op: write operation object
3331 :type write_op: WriteOp
3332 :para oid: object name
3334 :param oncomplete: what to do when the remove is safe and complete in memory
3336 :type oncomplete: completion
3337 :param onsafe: what to do when the remove is safe and complete on storage
3339 :type onsafe: completion
3340 :para mtime: the time to set the mtime to, 0 for the current time
3342 :para flags: flags to apply to the entire operation
3345 :raises: :class:`Error`
3346 :returns: completion object
3349 oid = cstr(oid, 'oid')
3351 WriteOp _write_op = write_op
3353 Completion completion
3354 time_t _mtime = mtime
3357 completion = self.__get_completion(oncomplete, onsafe)
3358 self.__track_completion(completion)
3361 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3364 completion._cleanup()
3365 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3368 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3369 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3371 excute the real read operation
3372 :para read_op: read operation object
3373 :type read_op: ReadOp
3374 :para oid: object name
3376 :para flag: flags to apply to the entire operation
3379 oid = cstr(oid, 'oid')
3381 ReadOp _read_op = read_op
3386 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3388 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3390 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3391 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3393 excute the real read operation
3394 :para read_op: read operation object
3395 :type read_op: ReadOp
3396 :para oid: object name
3398 :param oncomplete: what to do when the remove is safe and complete in memory
3400 :type oncomplete: completion
3401 :param onsafe: what to do when the remove is safe and complete on storage
3403 :type onsafe: completion
3404 :para flag: flags to apply to the entire operation
3407 oid = cstr(oid, 'oid')
3409 ReadOp _read_op = read_op
3411 Completion completion
3414 completion = self.__get_completion(oncomplete, onsafe)
3415 self.__track_completion(completion)
3418 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3420 completion._cleanup()
3421 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3424 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3425 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3428 :para read_op: read operation object
3429 :type read_op: ReadOp
3430 :para start_after: list keys starting after start_after
3431 :type start_after: str
3432 :para filter_prefix: list only keys beginning with filter_prefix
3433 :type filter_prefix: str
3434 :para max_return: list no more than max_return key/value pairs
3435 :type max_return: int
3436 :returns: an iterator over the requested omap values, return value from this action
3439 start_after = cstr(start_after, 'start_after') if start_after else None
3440 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3442 char *_start_after = opt_str(start_after)
3443 char *_filter_prefix = opt_str(filter_prefix)
3444 ReadOp _read_op = read_op
3445 rados_omap_iter_t iter_addr = NULL
3446 int _max_return = max_return
3450 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3451 _max_return, &iter_addr, NULL, &prval)
3452 it = OmapIterator(self)
3454 return it, int(prval)
3456 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3457 def get_omap_keys(self, read_op, start_after, max_return):
3460 :para read_op: read operation object
3461 :type read_op: ReadOp
3462 :para start_after: list keys starting after start_after
3463 :type start_after: str
3464 :para max_return: list no more than max_return key/value pairs
3465 :type max_return: int
3466 :returns: an iterator over the requested omap values, return value from this action
3468 start_after = cstr(start_after, 'start_after') if start_after else None
3470 char *_start_after = opt_str(start_after)
3471 ReadOp _read_op = read_op
3472 rados_omap_iter_t iter_addr = NULL
3473 int _max_return = max_return
3477 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3478 _max_return, &iter_addr, NULL, &prval)
3479 it = OmapIterator(self)
3481 return it, int(prval)
3483 @requires(('read_op', ReadOp), ('keys', tuple))
3484 def get_omap_vals_by_keys(self, read_op, keys):
3486 get the omap values by keys
3487 :para read_op: read operation object
3488 :type read_op: ReadOp
3489 :para keys: input key tuple
3491 :returns: an iterator over the requested omap values, return value from this action
3493 keys = cstr_list(keys, 'keys')
3495 ReadOp _read_op = read_op
3496 rados_omap_iter_t iter_addr
3497 char **_keys = to_bytes_array(keys)
3498 size_t key_num = len(keys)
3503 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3504 <const char**>_keys,
3505 key_num, &iter_addr, &prval)
3506 it = OmapIterator(self)
3508 return it, int(prval)
3512 @requires(('write_op', WriteOp), ('keys', tuple))
3513 def remove_omap_keys(self, write_op, keys):
3515 remove omap keys specifiled
3516 :para write_op: write operation object
3517 :type write_op: WriteOp
3518 :para keys: input key tuple
3522 keys = cstr_list(keys, 'keys')
3524 WriteOp _write_op = write_op
3525 size_t key_num = len(keys)
3526 char **_keys = to_bytes_array(keys)
3530 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3534 @requires(('write_op', WriteOp))
3535 def clear_omap(self, write_op):
3537 Remove all key/value pairs from an object
3538 :para write_op: write operation object
3539 :type write_op: WriteOp
3543 WriteOp _write_op = write_op
3546 rados_write_op_omap_clear(_write_op.write_op)
3548 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3549 ('duration', opt(int)), ('flags', int))
3550 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3553 Take an exclusive lock on an object
3555 :param key: name of the object
3557 :param name: name of the lock
3559 :param cookie: cookie of the lock
3561 :param desc: description of the lock
3563 :param duration: duration of the lock in seconds
3568 :raises: :class:`TypeError`
3569 :raises: :class:`Error`
3571 self.require_ioctx_open()
3573 key = cstr(key, 'key')
3574 name = cstr(name, 'name')
3575 cookie = cstr(cookie, 'cookie')
3576 desc = cstr(desc, 'desc')
3581 char* _cookie = cookie
3583 uint8_t _flags = flags
3586 if duration is None:
3588 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3591 _duration.tv_sec = duration
3593 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3597 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3599 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3600 ('desc', str_type), ('duration', opt(int)), ('flags', int))
3601 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3604 Take a shared lock on an object
3606 :param key: name of the object
3608 :param name: name of the lock
3610 :param cookie: cookie of the lock
3612 :param tag: tag of the lock
3614 :param desc: description of the lock
3616 :param duration: duration of the lock in seconds
3621 :raises: :class:`TypeError`
3622 :raises: :class:`Error`
3624 self.require_ioctx_open()
3626 key = cstr(key, 'key')
3627 tag = cstr(tag, 'tag')
3628 name = cstr(name, 'name')
3629 cookie = cstr(cookie, 'cookie')
3630 desc = cstr(desc, 'desc')
3636 char* _cookie = cookie
3638 uint8_t _flags = flags
3641 if duration is None:
3643 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3646 _duration.tv_sec = duration
3648 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3651 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3653 @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
3654 def unlock(self, key, name, cookie):
3657 Release a shared or exclusive lock on an object
3659 :param key: name of the object
3661 :param name: name of the lock
3663 :param cookie: cookie of the lock
3666 :raises: :class:`TypeError`
3667 :raises: :class:`Error`
3669 self.require_ioctx_open()
3671 key = cstr(key, 'key')
3672 name = cstr(name, 'name')
3673 cookie = cstr(cookie, 'cookie')
3678 char* _cookie = cookie
3681 ret = rados_unlock(self.io, _key, _name, _cookie)
3683 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3685 def application_enable(self, app_name, force=False):
3687 Enable an application on an OSD pool
3689 :param app_name: application name
3691 :param force: False if only a single app should exist per pool
3692 :type expire_seconds: boool
3694 :raises: :class:`Error`
3696 app_name = cstr(app_name, 'app_name')
3698 char *_app_name = app_name
3699 int _force = (1 if force else 0)
3702 ret = rados_application_enable(self.io, _app_name, _force)
3704 raise make_ex(ret, "error enabling application")
3706 def application_list(self):
3708 Returns a list of enabled applications
3710 :returns: list of app name string
3718 apps = <char *>realloc_chk(apps, length)
3720 ret = rados_application_list(self.io, apps, &length)
3722 return [decode_cstr(app) for app in
3723 apps[:length].split(b'\0') if app]
3724 elif ret == -errno.ENOENT:
3726 elif ret == -errno.ERANGE:
3729 raise make_ex(ret, "error listing applications")
3733 def application_metadata_set(self, app_name, key, value):
3735 Sets application metadata on an OSD pool
3737 :param app_name: application name
3739 :param key: metadata key
3741 :param value: metadata value
3744 :raises: :class:`Error`
3746 app_name = cstr(app_name, 'app_name')
3747 key = cstr(key, 'key')
3748 value = cstr(value, 'value')
3750 char *_app_name = app_name
3752 char *_value = value
3755 ret = rados_application_metadata_set(self.io, _app_name, _key,
3758 raise make_ex(ret, "error setting application metadata")
3760 def application_metadata_remove(self, app_name, key):
3762 Remove application metadata from an OSD pool
3764 :param app_name: application name
3766 :param key: metadata key
3769 :raises: :class:`Error`
3771 app_name = cstr(app_name, 'app_name')
3772 key = cstr(key, 'key')
3774 char *_app_name = app_name
3778 ret = rados_application_metadata_remove(self.io, _app_name, _key)
3780 raise make_ex(ret, "error removing application metadata")
3782 def application_metadata_list(self, app_name):
3784 Returns a list of enabled applications
3786 :param app_name: application name
3788 :returns: list of key/value tuples
3790 app_name = cstr(app_name, 'app_name')
3792 char *_app_name = app_name
3793 size_t key_length = 128
3794 size_t val_length = 128
3800 c_keys = <char *>realloc_chk(c_keys, key_length)
3801 c_vals = <char *>realloc_chk(c_vals, val_length)
3803 ret = rados_application_metadata_list(self.io, _app_name,
3804 c_keys, &key_length,
3805 c_vals, &val_length)
3807 keys = [decode_cstr(key) for key in
3808 c_keys[:key_length].split(b'\0') if key]
3809 vals = [decode_cstr(val) for val in
3810 c_vals[:val_length].split(b'\0') if val]
3811 return zip(keys, vals)
3812 elif ret == -errno.ERANGE:
3815 raise make_ex(ret, "error listing application metadata")
3821 def set_object_locator(func):
3822 def retfunc(self, *args, **kwargs):
3823 if self.locator_key is not None:
3824 old_locator = self.ioctx.get_locator_key()
3825 self.ioctx.set_locator_key(self.locator_key)
3826 retval = func(self, *args, **kwargs)
3827 self.ioctx.set_locator_key(old_locator)
3830 return func(self, *args, **kwargs)
3834 def set_object_namespace(func):
3835 def retfunc(self, *args, **kwargs):
3836 if self.nspace is None:
3837 raise LogicError("Namespace not set properly in context")
3838 old_nspace = self.ioctx.get_namespace()
3839 self.ioctx.set_namespace(self.nspace)
3840 retval = func(self, *args, **kwargs)
3841 self.ioctx.set_namespace(old_nspace)
3846 class Object(object):
3847 """Rados object wrapper, makes the object look like a file"""
3848 def __init__(self, ioctx, key, locator_key=None, nspace=None):
3852 self.state = "exists"
3853 self.locator_key = locator_key
3854 self.nspace = "" if nspace is None else nspace
3857 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
3858 (str(self.ioctx), self.key, "--default--"
3859 if self.nspace is "" else self.nspace, self.locator_key)
3861 def require_object_exists(self):
3862 if self.state != "exists":
3863 raise ObjectStateError("The object is %s" % self.state)
3866 @set_object_namespace
3867 def read(self, length=1024 * 1024):
3868 self.require_object_exists()
3869 ret = self.ioctx.read(self.key, length, self.offset)
3870 self.offset += len(ret)
3874 @set_object_namespace
3875 def write(self, string_to_write):
3876 self.require_object_exists()
3877 ret = self.ioctx.write(self.key, string_to_write, self.offset)
3879 self.offset += len(string_to_write)
3883 @set_object_namespace
3885 self.require_object_exists()
3886 self.ioctx.remove_object(self.key)
3887 self.state = "removed"
3890 @set_object_namespace
3892 self.require_object_exists()
3893 return self.ioctx.stat(self.key)
3895 def seek(self, position):
3896 self.require_object_exists()
3897 self.offset = position
3900 @set_object_namespace
3901 def get_xattr(self, xattr_name):
3902 self.require_object_exists()
3903 return self.ioctx.get_xattr(self.key, xattr_name)
3906 @set_object_namespace
3907 def get_xattrs(self):
3908 self.require_object_exists()
3909 return self.ioctx.get_xattrs(self.key)
3912 @set_object_namespace
3913 def set_xattr(self, xattr_name, xattr_value):
3914 self.require_object_exists()
3915 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
3918 @set_object_namespace
3919 def rm_xattr(self, xattr_name):
3920 self.require_object_exists()
3921 return self.ioctx.rm_xattr(self.key, xattr_name)
3932 class MonitorLog(object):
3933 # NOTE(sileht): Keep this class for backward compat
3934 # method moved to Rados.monitor_log()
3936 For watching cluster log messages. Instantiate an object and keep
3937 it around while callback is periodically called. Construct with
3938 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
3939 arg will be passed to the callback.
3941 callback will be called with:
3942 arg (given to __init__)
3943 line (the full line, including timestamp, who, level, msg)
3944 who (which entity issued the log message)
3945 timestamp_sec (sec of a struct timespec)
3946 timestamp_nsec (sec of a struct timespec)
3947 seq (sequence number)
3948 level (string representing the level of the log message)
3949 msg (the message itself)
3950 callback's return value is ignored
3952 def __init__(self, cluster, level, callback, arg):
3954 self.callback = callback
3956 self.cluster = cluster
3957 self.cluster.monitor_log(level, callback, arg)