1 # cython: embedsignature=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
27 from collections.abc import Callable
29 from collections import Callable
30 from datetime import datetime
31 from functools import partial, wraps
32 from itertools import chain
34 # Are we running Python 2.x
35 if sys.version_info[0] < 3:
41 cdef extern from "Python.h":
42 # These are in cpython/string.pxd, but use "object" types instead of
43 # PyObject*, which invokes assumptions in cpython that we need to
44 # legitimately break to implement zero-copy string buffers in Ioctx.read().
45 # This is valid use of the Python API and documented as a special case.
46 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
47 char* PyBytes_AsString(PyObject *string) except NULL
48 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
49 void PyEval_InitThreads()
52 cdef extern from "time.h":
53 ctypedef long int time_t
54 ctypedef long int suseconds_t
57 cdef extern from "sys/time.h":
63 cdef extern from "rados/rados_types.h" nogil:
64 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
67 cdef extern from "rados/librados.h" nogil:
69 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
70 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
71 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
72 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
73 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
74 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
75 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
79 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
80 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
81 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
82 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
83 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
84 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
85 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
86 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
87 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
89 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
105 cdef struct rados_cluster_stat_t:
111 cdef struct rados_pool_stat_t:
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
125 void rados_buffer_free(char *buf)
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 uint64_t rados_get_instance_id(rados_t cluster)
134 int rados_conf_read_file(rados_t cluster, const char *path)
135 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
136 int rados_conf_parse_env(rados_t cluster, const char *var)
137 int rados_conf_set(rados_t cluster, char *option, const char *value)
138 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
140 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
141 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
142 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
143 int rados_pool_create(rados_t cluster, const char *pool_name)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
146 int rados_pool_list(rados_t cluster, char *buf, size_t len)
147 int rados_pool_delete(rados_t cluster, const char *pool_name)
148 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
151 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
152 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
153 int rados_application_enable(rados_ioctx_t io, const char *app_name,
155 void rados_set_osdmap_full_try(rados_ioctx_t io)
156 void rados_unset_osdmap_full_try(rados_ioctx_t io)
157 int rados_application_list(rados_ioctx_t io, char *values,
159 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
160 const char *key, char *value,
162 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
163 const char *key, const char *value)
164 int rados_application_metadata_remove(rados_ioctx_t io,
165 const char *app_name, const char *key)
166 int rados_application_metadata_list(rados_ioctx_t io,
167 const char *app_name, char *keys,
168 size_t *key_len, char *values,
170 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
171 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
172 const char *inbuf, size_t inbuflen,
173 char **outbuf, size_t *outbuflen,
174 char **outs, size_t *outslen)
175 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
176 const char *inbuf, size_t inbuflen,
177 char **outbuf, size_t *outbuflen,
178 char **outs, size_t *outslen)
179 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
180 const char *inbuf, size_t inbuflen,
181 char **outbuf, size_t *outbuflen,
182 char **outs, size_t *outslen)
183 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
184 const char *inbuf, size_t inbuflen,
185 char **outbuf, size_t *outbuflen,
186 char **outs, size_t *outslen)
187 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
188 const char *inbuf, size_t inbuflen,
189 char **outbuf, size_t *outbuflen,
190 char **outs, size_t *outslen)
191 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
192 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
194 int rados_wait_for_latest_osdmap(rados_t cluster)
196 int rados_service_register(rados_t cluster, const char *service, const char *daemon, const char *metadata_dict)
197 int rados_service_update_status(rados_t cluster, const char *status_dict)
199 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
200 int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
201 void rados_ioctx_destroy(rados_ioctx_t io)
202 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
203 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
205 uint64_t rados_get_last_version(rados_ioctx_t io)
206 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
207 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
208 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
209 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
210 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
211 int rados_remove(rados_ioctx_t io, const char *oid)
212 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
213 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
214 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
215 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
216 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
217 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
218 void rados_getxattrs_end(rados_xattrs_iter_t iter)
220 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
221 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
222 void rados_nobjects_list_close(rados_list_ctx_t ctx)
224 int rados_ioctx_pool_requires_alignment2(rados_ioctx_t io, int * requires)
225 int rados_ioctx_pool_required_alignment2(rados_ioctx_t io, uint64_t * alignment)
227 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
228 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
229 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
230 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
231 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
232 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
233 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
234 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
236 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
237 rados_snap_t *snapid)
238 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
240 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
241 rados_snap_t snap_seq,
244 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
247 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
248 const char * cookie, const char * desc,
249 timeval * duration, uint8_t flags)
250 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
251 const char * cookie, const char * tag, const char * desc,
252 timeval * duration, uint8_t flags)
253 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
255 rados_write_op_t rados_create_write_op()
256 void rados_release_write_op(rados_write_op_t write_op)
258 rados_read_op_t rados_create_read_op()
259 void rados_release_read_op(rados_read_op_t read_op)
261 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
262 void rados_aio_release(rados_completion_t c)
263 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
264 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
265 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
266 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
267 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
268 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
269 int rados_aio_flush(rados_ioctx_t io)
271 int rados_aio_get_return_value(rados_completion_t c)
272 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
273 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
274 int rados_aio_wait_for_complete(rados_completion_t c)
275 int rados_aio_wait_for_safe(rados_completion_t c)
276 int rados_aio_is_complete(rados_completion_t c)
277 int rados_aio_is_safe(rados_completion_t c)
279 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
280 const char * in_buf, size_t in_len, char * buf, size_t out_len)
281 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
282 const char * in_buf, size_t in_len, char * buf, size_t out_len)
284 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
285 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
286 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
287 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
288 void rados_write_op_omap_clear(rados_write_op_t write_op)
289 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
291 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
292 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
293 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
294 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
295 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
296 void rados_write_op_remove(rados_write_op_t write_op)
297 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
298 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
300 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
301 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
302 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
303 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
304 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
305 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
306 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
307 void rados_omap_get_end(rados_omap_iter_t iter)
308 int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
311 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
312 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
313 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
314 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
315 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
316 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
317 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
319 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
321 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
322 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
323 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
324 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
325 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
326 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
327 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
329 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
331 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
332 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
334 ANONYMOUS_AUID = 0xffffffffffffffff
338 class Error(Exception):
339 """ `Error` class, derived from `Exception` """
340 def __init__(self, message, errno=None):
341 super(Exception, self).__init__(message)
345 msg = super(Exception, self).__str__()
346 if self.errno is None:
348 return '[errno {0}] {1}'.format(self.errno, msg)
350 def __reduce__(self):
351 return (self.__class__, (self.message, self.errno))
353 class InvalidArgumentError(Error):
356 class OSError(Error):
357 """ `OSError` class, derived from `Error` """
360 class InterruptedOrTimeoutError(OSError):
361 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
365 class PermissionError(OSError):
366 """ `PermissionError` class, derived from `OSError` """
370 class PermissionDeniedError(OSError):
371 """ deal with EACCES related. """
375 class ObjectNotFound(OSError):
376 """ `ObjectNotFound` class, derived from `OSError` """
380 class NoData(OSError):
381 """ `NoData` class, derived from `OSError` """
385 class ObjectExists(OSError):
386 """ `ObjectExists` class, derived from `OSError` """
390 class ObjectBusy(OSError):
391 """ `ObjectBusy` class, derived from `IOError` """
395 class IOError(OSError):
396 """ `ObjectBusy` class, derived from `OSError` """
400 class NoSpace(OSError):
401 """ `NoSpace` class, derived from `OSError` """
405 class RadosStateError(Error):
406 """ `RadosStateError` class, derived from `Error` """
410 class IoctxStateError(Error):
411 """ `IoctxStateError` class, derived from `Error` """
415 class ObjectStateError(Error):
416 """ `ObjectStateError` class, derived from `Error` """
420 class LogicError(Error):
421 """ `` class, derived from `Error` """
425 class TimedOut(OSError):
426 """ `TimedOut` class, derived from `OSError` """
430 IF UNAME_SYSNAME == "FreeBSD":
431 cdef errno_to_exception = {
432 errno.EPERM : PermissionError,
433 errno.ENOENT : ObjectNotFound,
435 errno.ENOSPC : NoSpace,
436 errno.EEXIST : ObjectExists,
437 errno.EBUSY : ObjectBusy,
438 errno.ENOATTR : NoData,
439 errno.EINTR : InterruptedOrTimeoutError,
440 errno.ETIMEDOUT : TimedOut,
441 errno.EACCES : PermissionDeniedError,
442 errno.EINVAL : InvalidArgumentError,
445 cdef errno_to_exception = {
446 errno.EPERM : PermissionError,
447 errno.ENOENT : ObjectNotFound,
449 errno.ENOSPC : NoSpace,
450 errno.EEXIST : ObjectExists,
451 errno.EBUSY : ObjectBusy,
452 errno.ENODATA : NoData,
453 errno.EINTR : InterruptedOrTimeoutError,
454 errno.ETIMEDOUT : TimedOut,
455 errno.EACCES : PermissionDeniedError,
456 errno.EINVAL : InvalidArgumentError,
460 cdef make_ex(ret, msg):
462 Translate a librados return code into an exception.
464 :param ret: the return code
466 :param msg: the error message to use
468 :returns: a subclass of :class:`Error`
471 if ret in errno_to_exception:
472 return errno_to_exception[ret](msg, errno=ret)
474 return OSError(msg, errno=ret)
477 # helper to specify an optional argument, where in addition to `cls`, `None`
483 # validate argument types of an instance method
484 # kwargs is an un-ordered dict, so use args instead
485 def requires(*types):
486 def is_type_of(v, t):
490 return isinstance(v, t)
492 def check_type(val, arg_name, arg_type):
493 if isinstance(arg_type, tuple):
494 if any(is_type_of(val, t) for t in arg_type):
496 type_names = ' or '.join('None' if t is None else t.__name__
498 raise TypeError('%s must be %s' % (arg_name, type_names))
500 if is_type_of(val, arg_type):
502 assert(arg_type is not None)
503 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
506 # FIXME(sileht): this stop with
507 # AttributeError: 'method_descriptor' object has no attribute '__module__'
509 def validate_func(*args, **kwargs):
510 # ignore the `self` arg
511 pos_args = zip(args[1:], types)
512 named_args = ((kwargs[name], (name, spec)) for name, spec in types
514 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
515 check_type(arg_val, arg_name, arg_type)
516 return f(*args, **kwargs)
521 def cstr(val, name, encoding="utf-8", opt=False):
523 Create a byte string from a Python string
525 :param basestring val: Python string
526 :param str name: Name of the string parameter, for exceptions
527 :param str encoding: Encoding to use
528 :param bool opt: If True, None is allowed
530 :raises: :class:`InvalidArgument`
532 if opt and val is None:
534 if isinstance(val, bytes):
536 elif isinstance(val, unicode):
537 return val.encode(encoding)
539 raise TypeError('%s must be a string' % name)
542 def cstr_list(list_str, name, encoding="utf-8"):
543 return [cstr(s, name) for s in list_str]
546 def decode_cstr(val, encoding="utf-8"):
548 Decode a byte string into a Python string.
550 :param bytes val: byte string
551 :rtype: unicode or None
556 return val.decode(encoding)
559 cdef char* opt_str(s) except? NULL:
565 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
566 cdef void *ret = realloc(ptr, size)
568 raise MemoryError("realloc failed")
572 cdef size_t * to_csize_t_array(list_int):
573 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
575 raise MemoryError("malloc failed")
576 for i in xrange(len(list_int)):
577 ret[i] = <size_t>list_int[i]
581 cdef char ** to_bytes_array(list_bytes):
582 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
584 raise MemoryError("malloc failed")
585 for i in xrange(len(list_bytes)):
586 ret[i] = <char *>list_bytes[i]
591 cdef int __monitor_callback(void *arg, const char *line, const char *who,
592 uint64_t sec, uint64_t nsec, uint64_t seq,
593 const char *level, const char *msg) with gil:
594 cdef object cb_info = <object>arg
595 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
598 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
601 uint64_t sec, uint64_t nsec, uint64_t seq,
602 const char *level, const char *msg) with gil:
603 cdef object cb_info = <object>arg
604 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
608 class Version(object):
609 """ Version information """
610 def __init__(self, major, minor, extra):
616 return "%d.%d.%d" % (self.major, self.minor, self.extra)
619 cdef class Rados(object):
620 """This class wraps librados functions"""
621 # NOTE(sileht): attributes declared in .pyd
623 def __init__(self, *args, **kwargs):
625 self.__setup(*args, **kwargs)
627 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
628 ('conffile', opt(str_type)))
629 def __setup(self, rados_id=None, name=None, clustername=None,
630 conf_defaults=None, conffile=None, conf=None, flags=0,
632 self.monitor_callback = None
633 self.monitor_callback2 = None
634 self.parsed_args = []
635 self.conf_defaults = conf_defaults
636 self.conffile = conffile
637 self.rados_id = rados_id
639 if rados_id and name:
640 raise Error("Rados(): can't supply both rados_id and name")
642 name = 'client.' + rados_id
644 name = 'client.admin'
645 if clustername is None:
648 name = cstr(name, 'name')
649 clustername = cstr(clustername, 'clustername')
652 char *_clustername = clustername
657 # Unpack void* (aka rados_config_t) from capsule
658 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
660 ret = rados_create_with_context(&self.cluster, rados_config)
663 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
665 raise Error("rados_initialize failed with error code: %d" % ret)
667 self.state = "configuring"
668 # order is important: conf_defaults, then conffile, then conf
670 for key, value in conf_defaults.items():
671 self.conf_set(key, value)
672 if conffile is not None:
673 # read the default conf file when '' is given
676 self.conf_read_file(conffile)
678 for key, value in conf.items():
679 self.conf_set(key, value)
681 def require_state(self, *args):
683 Checks if the Rados object is in a special state
685 :raises: :class:`RadosStateError`
687 if self.state in args:
689 raise RadosStateError("You cannot perform that operation on a \
690 Rados object in state %s." % self.state)
694 Disconnects from the cluster. Call this explicitly when a
695 Rados.connect()ed object is no longer used.
697 if self.state != "shutdown":
699 rados_shutdown(self.cluster)
700 self.state = "shutdown"
706 def __exit__(self, type_, value, traceback):
712 Get the version number of the ``librados`` C library.
714 :returns: a tuple of ``(major, minor, extra)`` components of the
721 rados_version(&major, &minor, &extra)
722 return Version(major, minor, extra)
724 @requires(('path', opt(str_type)))
725 def conf_read_file(self, path=None):
727 Configure the cluster handle using a Ceph config file.
729 :param path: path to the config file
732 self.require_state("configuring", "connected")
733 path = cstr(path, 'path', opt=True)
735 char *_path = opt_str(path)
737 ret = rados_conf_read_file(self.cluster, _path)
739 raise make_ex(ret, "error calling conf_read_file")
741 def conf_parse_argv(self, args):
743 Parse known arguments from args, and remove; returned
744 args contain only those unknown to ceph
746 self.require_state("configuring", "connected")
750 cargs = cstr_list(args, 'args')
752 int _argc = len(args)
753 char **_argv = to_bytes_array(cargs)
754 char **_remargv = NULL
757 _remargv = <char **>malloc(_argc * sizeof(char *))
759 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
761 <const char**>_remargv)
763 raise make_ex(ret, "error calling conf_parse_argv_remainder")
765 # _remargv was allocated with fixed argc; collapse return
766 # list to eliminate any missing args
767 retargs = [decode_cstr(a) for a in _remargv[:_argc]
769 self.parsed_args = args
775 def conf_parse_env(self, var='CEPH_ARGS'):
777 Parse known arguments from an environment variable, normally
780 self.require_state("configuring", "connected")
784 var = cstr(var, 'var')
788 ret = rados_conf_parse_env(self.cluster, _var)
790 raise make_ex(ret, "error calling conf_parse_env")
792 @requires(('option', str_type))
793 def conf_get(self, option):
795 Get the value of a configuration option
797 :param option: which option to read
800 :returns: str - value of the option or None
801 :raises: :class:`TypeError`
803 self.require_state("configuring", "connected")
804 option = cstr(option, 'option')
806 char *_option = option
812 ret_buf = <char *>realloc_chk(ret_buf, length)
814 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
816 return decode_cstr(ret_buf)
817 elif ret == -errno.ENAMETOOLONG:
819 elif ret == -errno.ENOENT:
822 raise make_ex(ret, "error calling conf_get")
826 @requires(('option', str_type), ('val', str_type))
827 def conf_set(self, option, val):
829 Set the value of a configuration option
831 :param option: which option to set
833 :param option: value of the option
836 :raises: :class:`TypeError`, :class:`ObjectNotFound`
838 self.require_state("configuring", "connected")
839 option = cstr(option, 'option')
840 val = cstr(val, 'val')
842 char *_option = option
846 ret = rados_conf_set(self.cluster, _option, _val)
848 raise make_ex(ret, "error calling conf_set")
850 def ping_monitor(self, mon_id):
852 Ping a monitor to assess liveness
854 May be used as a simply way to assess liveness, or to obtain
855 information about the monitor in a simple way even in the
858 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
860 :returns: the string reply from the monitor
863 self.require_state("configuring", "connected")
865 mon_id = cstr(mon_id, 'mon_id')
867 char *_mon_id = mon_id
872 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
875 raise make_ex(ret, "error calling ping_monitor")
878 my_outstr = outstr[:outstrlen]
879 rados_buffer_free(outstr)
880 return decode_cstr(my_outstr)
882 def connect(self, timeout=0):
884 Connect to the cluster. Use shutdown() to release resources.
886 self.require_state("configuring")
887 # NOTE(sileht): timeout was supported by old python API,
888 # but this is not something available in C API, so ignore
889 # for now and remove it later
891 ret = rados_connect(self.cluster)
893 raise make_ex(ret, "error connecting to the cluster")
894 self.state = "connected"
896 def get_instance_id(self):
898 Get a global id for current instance
900 self.require_state("connected")
902 ret = rados_get_instance_id(self.cluster)
905 def get_cluster_stats(self):
907 Read usage info about the cluster
909 This tells you total space, space used, space available, and number
910 of objects. These are not updated immediately when data is written,
911 they are eventually consistent.
913 :returns: dict - contains the following keys:
915 - ``kb`` (int) - total space
917 - ``kb_used`` (int) - space used
919 - ``kb_avail`` (int) - free space available
921 - ``num_objects`` (int) - number of objects
925 rados_cluster_stat_t stats
928 ret = rados_cluster_stat(self.cluster, &stats)
932 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
933 return {'kb': stats.kb,
934 'kb_used': stats.kb_used,
935 'kb_avail': stats.kb_avail,
936 'num_objects': stats.num_objects}
938 @requires(('pool_name', str_type))
939 def pool_exists(self, pool_name):
941 Checks if a given pool exists.
943 :param pool_name: name of the pool to check
946 :raises: :class:`TypeError`, :class:`Error`
947 :returns: bool - whether the pool exists, false otherwise.
949 self.require_state("connected")
951 pool_name = cstr(pool_name, 'pool_name')
953 char *_pool_name = pool_name
956 ret = rados_pool_lookup(self.cluster, _pool_name)
959 elif ret == -errno.ENOENT:
962 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
964 @requires(('pool_name', str_type))
965 def pool_lookup(self, pool_name):
967 Returns a pool's ID based on its name.
969 :param pool_name: name of the pool to look up
972 :raises: :class:`TypeError`, :class:`Error`
973 :returns: int - pool ID, or None if it doesn't exist
975 self.require_state("connected")
976 pool_name = cstr(pool_name, 'pool_name')
978 char *_pool_name = pool_name
981 ret = rados_pool_lookup(self.cluster, _pool_name)
984 elif ret == -errno.ENOENT:
987 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
989 @requires(('pool_id', int))
990 def pool_reverse_lookup(self, pool_id):
992 Returns a pool's name based on its ID.
994 :param pool_id: ID of the pool to look up
997 :raises: :class:`TypeError`, :class:`Error`
998 :returns: string - pool name, or None if it doesn't exist
1000 self.require_state("connected")
1002 int64_t _pool_id = pool_id
1008 name = <char *>realloc_chk(name, size)
1010 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
1013 elif ret != -errno.ERANGE and size <= 4096:
1015 elif ret == -errno.ENOENT:
1018 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1020 return decode_cstr(name)
1025 @requires(('pool_name', str_type), ('crush_rule', opt(int)))
1026 def create_pool(self, pool_name, crush_rule=None):
1029 - with default settings: if crush_rule=None
1030 - with a specific CRUSH rule: crush_rule given
1032 :param pool_name: name of the pool to create
1033 :type pool_name: str
1034 :param crush_rule: rule to use for placement in the new pool
1035 :type crush_rule: int
1037 :raises: :class:`TypeError`, :class:`Error`
1039 self.require_state("connected")
1041 pool_name = cstr(pool_name, 'pool_name')
1043 char *_pool_name = pool_name
1046 if crush_rule is None:
1048 ret = rados_pool_create(self.cluster, _pool_name)
1050 _crush_rule = crush_rule
1052 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1054 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1056 @requires(('pool_id', int))
1057 def get_pool_base_tier(self, pool_id):
1061 :returns: base pool, or pool_id if tiering is not configured for the pool
1063 self.require_state("connected")
1065 int64_t base_tier = 0
1066 int64_t _pool_id = pool_id
1069 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1071 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1072 return int(base_tier)
1074 @requires(('pool_name', str_type))
1075 def delete_pool(self, pool_name):
1077 Delete a pool and all data inside it.
1079 The pool is removed from the cluster immediately,
1080 but the actual data is deleted in the background.
1082 :param pool_name: name of the pool to delete
1083 :type pool_name: str
1085 :raises: :class:`TypeError`, :class:`Error`
1087 self.require_state("connected")
1089 pool_name = cstr(pool_name, 'pool_name')
1091 char *_pool_name = pool_name
1094 ret = rados_pool_delete(self.cluster, _pool_name)
1096 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1098 @requires(('pool_id', int))
1099 def get_inconsistent_pgs(self, pool_id):
1101 List inconsistent placement groups in the given pool
1103 :param pool_id: ID of the pool in which PGs are listed
1105 :returns: list - inconsistent placement groups
1107 self.require_state("connected")
1109 int64_t pool = pool_id
1115 pgs = <char *>realloc_chk(pgs, size);
1117 ret = rados_inconsistent_pg_list(self.cluster, pool,
1124 raise make_ex(ret, "error calling inconsistent_pg_list")
1125 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1129 def list_pools(self):
1131 Gets a list of pool names.
1133 :returns: list - of pool names.
1135 self.require_state("connected")
1138 char *c_names = NULL
1142 c_names = <char *>realloc_chk(c_names, size)
1144 ret = rados_pool_list(self.cluster, c_names, size)
1149 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1156 Get the fsid of the cluster as a hexadecimal string.
1158 :raises: :class:`Error`
1159 :returns: str - cluster fsid
1161 self.require_state("connected")
1165 PyObject* ret_s = NULL
1167 ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1169 ret_buf = PyBytes_AsString(ret_s)
1171 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1173 raise make_ex(ret, "error getting cluster fsid")
1174 if ret != <int>buf_len:
1175 _PyBytes_Resize(&ret_s, ret)
1176 return <object>ret_s
1178 # We DECREF unconditionally: the cast to object above will have
1179 # INCREFed if necessary. This also takes care of exceptions,
1180 # including if _PyString_Resize fails (that will free the string
1181 # itself and set ret_s to NULL, hence XDECREF).
1182 ref.Py_XDECREF(ret_s)
1184 @requires(('ioctx_name', str_type))
1185 def open_ioctx(self, ioctx_name):
1187 Create an io context
1189 The io context allows you to perform operations within a particular
1192 :param ioctx_name: name of the pool
1193 :type ioctx_name: str
1195 :raises: :class:`TypeError`, :class:`Error`
1196 :returns: Ioctx - Rados Ioctx object
1198 self.require_state("connected")
1199 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1202 char *_ioctx_name = ioctx_name
1204 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1206 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1207 io = Ioctx(ioctx_name)
1211 @requires(('pool_id', int))
1212 def open_ioctx2(self, pool_id):
1214 Create an io context
1216 The io context allows you to perform operations within a particular
1219 :param pool_id: ID of the pool
1222 :raises: :class:`TypeError`, :class:`Error`
1223 :returns: Ioctx - Rados Ioctx object
1225 self.require_state("connected")
1228 int64_t _pool_id = pool_id
1230 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1232 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1233 io = Ioctx(str(pool_id))
1237 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1239 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1240 returns (int ret, string outbuf, string outs)
1242 # NOTE(sileht): timeout is ignored because C API doesn't provide
1243 # timeout argument, but we keep it for backward compat with old python binding
1245 self.require_state("connected")
1246 cmd = cstr_list(cmd, 'c')
1248 if isinstance(target, int):
1249 # NOTE(sileht): looks weird but test_monmap_dump pass int
1250 target = str(target)
1252 target = cstr(target, 'target', opt=True)
1253 inbuf = cstr(inbuf, 'inbuf')
1256 char *_target = opt_str(target)
1257 char **_cmd = to_bytes_array(cmd)
1258 size_t _cmdlen = len(cmd)
1260 char *_inbuf = inbuf
1261 size_t _inbuf_len = len(inbuf)
1271 ret = rados_mon_command_target(self.cluster, _target,
1272 <const char **>_cmd, _cmdlen,
1273 <const char*>_inbuf, _inbuf_len,
1274 &_outbuf, &_outbuf_len,
1278 ret = rados_mon_command(self.cluster,
1279 <const char **>_cmd, _cmdlen,
1280 <const char*>_inbuf, _inbuf_len,
1281 &_outbuf, &_outbuf_len,
1284 my_outs = decode_cstr(_outs[:_outs_len])
1285 my_outbuf = _outbuf[:_outbuf_len]
1287 rados_buffer_free(_outs)
1289 rados_buffer_free(_outbuf)
1290 return (ret, my_outbuf, my_outs)
1294 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1296 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1297 returns (int ret, string outbuf, string outs)
1299 # NOTE(sileht): timeout is ignored because C API doesn't provide
1300 # timeout argument, but we keep it for backward compat with old python binding
1301 self.require_state("connected")
1303 cmd = cstr_list(cmd, 'cmd')
1304 inbuf = cstr(inbuf, 'inbuf')
1308 char **_cmd = to_bytes_array(cmd)
1309 size_t _cmdlen = len(cmd)
1311 char *_inbuf = inbuf
1312 size_t _inbuf_len = len(inbuf)
1321 ret = rados_osd_command(self.cluster, _osdid,
1322 <const char **>_cmd, _cmdlen,
1323 <const char*>_inbuf, _inbuf_len,
1324 &_outbuf, &_outbuf_len,
1327 my_outs = decode_cstr(_outs[:_outs_len])
1328 my_outbuf = _outbuf[:_outbuf_len]
1330 rados_buffer_free(_outs)
1332 rados_buffer_free(_outbuf)
1333 return (ret, my_outbuf, my_outs)
1337 def mgr_command(self, cmd, inbuf, timeout=0):
1339 returns (int ret, string outbuf, string outs)
1341 # NOTE(sileht): timeout is ignored because C API doesn't provide
1342 # timeout argument, but we keep it for backward compat with old python binding
1343 self.require_state("connected")
1345 cmd = cstr_list(cmd, 'cmd')
1346 inbuf = cstr(inbuf, 'inbuf')
1349 char **_cmd = to_bytes_array(cmd)
1350 size_t _cmdlen = len(cmd)
1352 char *_inbuf = inbuf
1353 size_t _inbuf_len = len(inbuf)
1362 ret = rados_mgr_command(self.cluster,
1363 <const char **>_cmd, _cmdlen,
1364 <const char*>_inbuf, _inbuf_len,
1365 &_outbuf, &_outbuf_len,
1368 my_outs = decode_cstr(_outs[:_outs_len])
1369 my_outbuf = _outbuf[:_outbuf_len]
1371 rados_buffer_free(_outs)
1373 rados_buffer_free(_outbuf)
1374 return (ret, my_outbuf, my_outs)
1378 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1380 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1381 returns (int ret, string outbuf, string outs)
1383 # NOTE(sileht): timeout is ignored because C API doesn't provide
1384 # timeout argument, but we keep it for backward compat with old python binding
1385 self.require_state("connected")
1387 pgid = cstr(pgid, 'pgid')
1388 cmd = cstr_list(cmd, 'cmd')
1389 inbuf = cstr(inbuf, 'inbuf')
1393 char **_cmd = to_bytes_array(cmd)
1394 size_t _cmdlen = len(cmd)
1396 char *_inbuf = inbuf
1397 size_t _inbuf_len = len(inbuf)
1406 ret = rados_pg_command(self.cluster, _pgid,
1407 <const char **>_cmd, _cmdlen,
1408 <const char *>_inbuf, _inbuf_len,
1409 &_outbuf, &_outbuf_len,
1412 my_outs = decode_cstr(_outs[:_outs_len])
1413 my_outbuf = _outbuf[:_outbuf_len]
1415 rados_buffer_free(_outs)
1417 rados_buffer_free(_outbuf)
1418 return (ret, my_outbuf, my_outs)
1422 def wait_for_latest_osdmap(self):
1423 self.require_state("connected")
1425 ret = rados_wait_for_latest_osdmap(self.cluster)
1428 def blacklist_add(self, client_address, expire_seconds=0):
1430 Blacklist a client from the OSDs
1432 :param client_address: client address
1433 :type client_address: str
1434 :param expire_seconds: number of seconds to blacklist
1435 :type expire_seconds: int
1437 :raises: :class:`Error`
1439 self.require_state("connected")
1440 client_address = cstr(client_address, 'client_address')
1442 uint32_t _expire_seconds = expire_seconds
1443 char *_client_address = client_address
1446 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1448 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1450 def monitor_log(self, level, callback, arg):
1451 if level not in MONITOR_LEVELS:
1452 raise LogicError("invalid monitor level " + level)
1453 if callback is not None and not callable(callback):
1454 raise LogicError("callback must be a callable function or None")
1456 level = cstr(level, 'level')
1457 cdef char *_level = level
1459 if callback is None:
1461 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1462 self.monitor_callback = None
1463 self.monitor_callback2 = None
1466 cb = (callback, arg)
1467 cdef PyObject* _arg = <PyObject*>cb
1469 r = rados_monitor_log(self.cluster, <const char*>_level,
1470 <rados_log_callback_t>&__monitor_callback, _arg)
1473 raise make_ex(r, 'error calling rados_monitor_log')
1474 # NOTE(sileht): Prevents the callback method from being garbage collected
1475 self.monitor_callback = cb
1476 self.monitor_callback2 = None
1478 def monitor_log2(self, level, callback, arg):
1479 if level not in MONITOR_LEVELS:
1480 raise LogicError("invalid monitor level " + level)
1481 if callback is not None and not callable(callback):
1482 raise LogicError("callback must be a callable function or None")
1484 level = cstr(level, 'level')
1485 cdef char *_level = level
1487 if callback is None:
1489 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1490 self.monitor_callback = None
1491 self.monitor_callback2 = None
1494 cb = (callback, arg)
1495 cdef PyObject* _arg = <PyObject*>cb
1497 r = rados_monitor_log2(self.cluster, <const char*>_level,
1498 <rados_log_callback2_t>&__monitor_callback2, _arg)
1501 raise make_ex(r, 'error calling rados_monitor_log')
1502 # NOTE(sileht): Prevents the callback method from being garbage collected
1503 self.monitor_callback = None
1504 self.monitor_callback2 = cb
1506 @requires(('service', str_type), ('daemon', str_type), ('metadata', dict))
1507 def service_daemon_register(self, service, daemon, metadata):
1509 :param str service: service name (e.g. "rgw")
1510 :param str daemon: daemon name (e.g. "gwfoo")
1511 :param dict metadata: static metadata about the register daemon
1512 (e.g., the version of Ceph, the kernel version.)
1514 service = cstr(service, 'service')
1515 daemon = cstr(daemon, 'daemon')
1516 metadata_dict = '\0'.join(chain.from_iterable(metadata.items()))
1517 metadata_dict += '\0'
1519 char *_service = service
1520 char *_daemon = daemon
1521 char *_metadata = metadata_dict
1524 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1526 raise make_ex(ret, "error calling service_register()")
1528 @requires(('metadata', dict))
1529 def service_daemon_update(self, status):
1530 status_dict = '\0'.join(chain.from_iterable(status.items()))
1533 char *_status = status_dict
1536 ret = rados_service_update_status(self.cluster, _status)
1538 raise make_ex(ret, "error calling service_daemon_update()")
1541 cdef class OmapIterator(object):
1544 cdef public Ioctx ioctx
1545 cdef rados_omap_iter_t ctx
1547 def __cinit__(self, Ioctx ioctx):
1555 Get the next key-value pair in the object
1556 :returns: next rados.OmapItem
1564 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1567 raise make_ex(ret, "error iterating over the omap")
1569 raise StopIteration()
1570 key = decode_cstr(key_)
1576 def __dealloc__(self):
1578 rados_omap_get_end(self.ctx)
1581 cdef class ObjectIterator(object):
1582 """rados.Ioctx Object iterator"""
1584 cdef rados_list_ctx_t ctx
1586 cdef public object ioctx
1588 def __cinit__(self, Ioctx ioctx):
1592 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1594 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1602 Get the next object name and locator in the pool
1604 :raises: StopIteration
1605 :returns: next rados.Ioctx Object
1608 const char *key_ = NULL
1609 const char *locator_ = NULL
1610 const char *nspace_ = NULL
1613 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1616 raise StopIteration()
1618 key = decode_cstr(key_)
1619 locator = decode_cstr(locator_) if locator_ != NULL else None
1620 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1621 return Object(self.ioctx, key, locator, nspace)
1623 def __dealloc__(self):
1625 rados_nobjects_list_close(self.ctx)
1628 cdef class XattrIterator(object):
1629 """Extended attribute iterator"""
1631 cdef rados_xattrs_iter_t it
1634 cdef public Ioctx ioctx
1635 cdef public object oid
1637 def __cinit__(self, Ioctx ioctx, oid):
1639 self.oid = cstr(oid, 'oid')
1640 self._oid = self.oid
1643 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1645 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1652 Get the next xattr on the object
1654 :raises: StopIteration
1655 :returns: pair - of name and value of the next Xattr
1658 const char *name_ = NULL
1659 const char *val_ = NULL
1663 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1665 raise make_ex(ret, "error iterating over the extended attributes \
1666 in '%s'" % self.oid)
1668 raise StopIteration()
1669 name = decode_cstr(name_)
1673 def __dealloc__(self):
1675 rados_getxattrs_end(self.it)
1678 cdef class SnapIterator(object):
1679 """Snapshot iterator"""
1681 cdef public Ioctx ioctx
1683 cdef rados_snap_t *snaps
1687 def __cinit__(self, Ioctx ioctx):
1689 # We don't know how big a buffer we need until we've called the
1690 # function. So use the exponential doubling strategy.
1691 cdef int num_snaps = 10
1693 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1695 sizeof(rados_snap_t))
1698 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1702 elif ret != -errno.ERANGE:
1703 raise make_ex(ret, "error calling rados_snap_list for \
1704 ioctx '%s'" % self.ioctx.name)
1705 num_snaps = num_snaps * 2
1713 Get the next Snapshot
1715 :raises: :class:`Error`, StopIteration
1716 :returns: Snap - next snapshot
1718 if self.cur_snap >= self.max_snap:
1722 rados_snap_t snap_id = self.snaps[self.cur_snap]
1728 name = <char *>realloc_chk(name, name_len)
1730 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1733 elif ret != -errno.ERANGE:
1734 raise make_ex(ret, "rados_snap_get_name error")
1736 name_len = name_len * 2
1738 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1739 self.cur_snap = self.cur_snap + 1
1745 cdef class Snap(object):
1746 """Snapshot object"""
1747 cdef public Ioctx ioctx
1748 cdef public object name
1750 # NOTE(sileht): old API was storing the ctypes object
1751 # instead of the value ....
1752 cdef public rados_snap_t snap_id
1754 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1757 self.snap_id = snap_id
1760 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1761 % (str(self.ioctx), self.name, self.snap_id)
1763 def get_timestamp(self):
1765 Find when a snapshot in the current pool occurred
1767 :raises: :class:`Error`
1768 :returns: datetime - the data and time the snapshot was created
1770 cdef time_t snap_time
1773 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1775 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1776 return datetime.fromtimestamp(snap_time)
1779 cdef class Completion(object):
1780 """completion object"""
1788 rados_callback_t complete_cb
1789 rados_callback_t safe_cb
1790 rados_completion_t rados_comp
1793 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1794 self.oncomplete = oncomplete
1795 self.onsafe = onsafe
1800 Is an asynchronous operation safe?
1802 This does not imply that the safe callback has finished.
1804 :returns: True if the operation is safe
1807 ret = rados_aio_is_safe(self.rados_comp)
1810 def is_complete(self):
1812 Has an asynchronous operation completed?
1814 This does not imply that the safe callback has finished.
1816 :returns: True if the operation is completed
1819 ret = rados_aio_is_complete(self.rados_comp)
1822 def wait_for_safe(self):
1824 Wait for an asynchronous operation to be marked safe
1826 This does not imply that the safe callback has finished.
1829 rados_aio_wait_for_safe(self.rados_comp)
1831 def wait_for_complete(self):
1833 Wait for an asynchronous operation to complete
1835 This does not imply that the complete callback has finished.
1838 rados_aio_wait_for_complete(self.rados_comp)
1840 def wait_for_safe_and_cb(self):
1842 Wait for an asynchronous operation to be marked safe and for
1843 the safe callback to have returned
1846 rados_aio_wait_for_safe_and_cb(self.rados_comp)
1848 def wait_for_complete_and_cb(self):
1850 Wait for an asynchronous operation to complete and for the
1851 complete callback to have returned
1853 :returns: whether the operation is completed
1856 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1859 def get_return_value(self):
1861 Get the return value of an asychronous operation
1863 The return value is set when the operation is complete or safe,
1864 whichever comes first.
1866 :returns: int - return value of the operation
1869 ret = rados_aio_get_return_value(self.rados_comp)
1872 def __dealloc__(self):
1874 Release a completion
1876 Call this when you no longer need the completion. It may not be
1877 freed immediately if the operation is not acked and committed.
1879 ref.Py_XDECREF(self.buf)
1881 if self.rados_comp != NULL:
1883 rados_aio_release(self.rados_comp)
1884 self.rados_comp = NULL
1886 def _complete(self):
1887 self.oncomplete(self)
1888 with self.ioctx.lock:
1890 self.ioctx.complete_completions.remove(self)
1894 with self.ioctx.lock:
1896 self.ioctx.safe_completions.remove(self)
1899 with self.ioctx.lock:
1901 self.ioctx.complete_completions.remove(self)
1903 self.ioctx.safe_completions.remove(self)
1906 class OpCtx(object):
1907 def __enter__(self):
1908 return self.create()
1910 def __exit__(self, type, msg, traceback):
1914 cdef class WriteOp(object):
1915 cdef rados_write_op_t write_op
1919 self.write_op = rados_create_write_op()
1924 rados_release_write_op(self.write_op)
1926 @requires(('exclusive', opt(int)))
1927 def new(self, exclusive=None):
1933 int _exclusive = exclusive
1936 rados_write_op_create(self.write_op, _exclusive, NULL)
1944 rados_write_op_remove(self.write_op)
1946 @requires(('flags', int))
1947 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1949 Set flags for the last operation added to this write_op.
1950 :para flags: flags to apply to the last operation
1958 rados_write_op_set_flags(self.write_op, _flags)
1960 @requires(('to_write', bytes))
1961 def append(self, to_write):
1963 Append data to an object synchronously
1964 :param to_write: data to write
1965 :type to_write: bytes
1969 char *_to_write = to_write
1970 size_t length = len(to_write)
1973 rados_write_op_append(self.write_op, _to_write, length)
1975 @requires(('to_write', bytes))
1976 def write_full(self, to_write):
1978 Write whole object, atomically replacing it.
1979 :param to_write: data to write
1980 :type to_write: bytes
1984 char *_to_write = to_write
1985 size_t length = len(to_write)
1988 rados_write_op_write_full(self.write_op, _to_write, length)
1990 @requires(('to_write', bytes), ('offset', int))
1991 def write(self, to_write, offset=0):
1994 :param to_write: data to write
1995 :type to_write: bytes
1996 :param offset: byte offset in the object to begin writing at
2001 char *_to_write = to_write
2002 size_t length = len(to_write)
2003 uint64_t _offset = offset
2006 rados_write_op_write(self.write_op, _to_write, length, _offset)
2008 @requires(('version', int))
2009 def assert_version(self, version):
2011 Check if object's version is the expected one.
2012 :param version: expected version of the object
2016 uint64_t _version = version
2019 rados_write_op_assert_version(self.write_op, _version)
2021 @requires(('offset', int), ('length', int))
2022 def zero(self, offset, length):
2024 Zero part of an object.
2025 :param offset: byte offset in the object to begin writing at
2027 :param offset: number of zero to write
2032 size_t _length = length
2033 uint64_t _offset = offset
2036 rados_write_op_zero(self.write_op, _length, _offset)
2038 @requires(('offset', int))
2039 def truncate(self, offset):
2042 :param offset: byte offset in the object to begin truncating at
2047 uint64_t _offset = offset
2050 rados_write_op_truncate(self.write_op, _offset)
2053 class WriteOpCtx(WriteOp, OpCtx):
2054 """write operation context manager"""
2057 cdef class ReadOp(object):
2058 cdef rados_read_op_t read_op
2062 self.read_op = rados_create_read_op()
2067 rados_release_read_op(self.read_op)
2069 @requires(('flags', int))
2070 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
2072 Set flags for the last operation added to this read_op.
2073 :para flags: flags to apply to the last operation
2081 rados_read_op_set_flags(self.read_op, _flags)
2084 class ReadOpCtx(ReadOp, OpCtx):
2085 """read operation context manager"""
2088 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2090 Callback to onsafe() for asynchronous operations
2092 cdef object cb = <object>args
2097 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2099 Callback to oncomplete() for asynchronous operations
2101 cdef object cb = <object>args
2106 cdef class Ioctx(object):
2107 """rados.Ioctx object"""
2108 # NOTE(sileht): attributes declared in .pyd
2110 def __init__(self, name):
2114 self.locator_key = ""
2116 self.lock = threading.Lock()
2117 self.safe_completions = []
2118 self.complete_completions = []
2120 def __enter__(self):
2123 def __exit__(self, type_, value, traceback):
2127 def __dealloc__(self):
2130 def __track_completion(self, completion_obj):
2131 if completion_obj.oncomplete:
2133 self.complete_completions.append(completion_obj)
2134 if completion_obj.onsafe:
2136 self.safe_completions.append(completion_obj)
2138 def __get_completion(self, oncomplete, onsafe):
2140 Constructs a completion to use with asynchronous operations
2142 :param oncomplete: what to do when the write is safe and complete in memory
2144 :type oncomplete: completion
2145 :param onsafe: what to do when the write is safe and complete on storage
2147 :type onsafe: completion
2149 :raises: :class:`Error`
2150 :returns: completion object
2153 completion_obj = Completion(self, oncomplete, onsafe)
2156 rados_callback_t complete_cb = NULL
2157 rados_callback_t safe_cb = NULL
2158 rados_completion_t completion
2159 PyObject* p_completion_obj= <PyObject*>completion_obj
2162 complete_cb = <rados_callback_t>&__aio_complete_cb
2164 safe_cb = <rados_callback_t>&__aio_safe_cb
2167 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2170 raise make_ex(ret, "error getting a completion")
2172 completion_obj.rados_comp = completion
2173 return completion_obj
2175 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2176 def aio_stat(self, object_name, oncomplete):
2178 Asynchronously get object stats (size/mtime)
2180 oncomplete will be called with the returned size and mtime
2181 as well as the completion:
2183 oncomplete(completion, size, mtime)
2185 :param object_name: the name of the object to get stats from
2186 :type object_name: str
2187 :param oncomplete: what to do when the stat is complete
2188 :type oncomplete: completion
2190 :raises: :class:`Error`
2191 :returns: completion object
2194 object_name = cstr(object_name, 'object_name')
2197 Completion completion
2198 char *_object_name = object_name
2202 def oncomplete_(completion_v):
2203 cdef Completion _completion_v = completion_v
2204 return_value = _completion_v.get_return_value()
2205 if return_value >= 0:
2206 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2208 return oncomplete(_completion_v, None, None)
2210 completion = self.__get_completion(oncomplete_, None)
2211 self.__track_completion(completion)
2213 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2217 completion._cleanup()
2218 raise make_ex(ret, "error stating %s" % object_name)
2221 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2222 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2223 def aio_write(self, object_name, to_write, offset=0,
2224 oncomplete=None, onsafe=None):
2226 Write data to an object asynchronously
2228 Queues the write and returns.
2230 :param object_name: name of the object
2231 :type object_name: str
2232 :param to_write: data to write
2233 :type to_write: bytes
2234 :param offset: byte offset in the object to begin writing at
2236 :param oncomplete: what to do when the write is safe and complete in memory
2238 :type oncomplete: completion
2239 :param onsafe: what to do when the write is safe and complete on storage
2241 :type onsafe: completion
2243 :raises: :class:`Error`
2244 :returns: completion object
2247 object_name = cstr(object_name, 'object_name')
2250 Completion completion
2251 char* _object_name = object_name
2252 char* _to_write = to_write
2253 size_t size = len(to_write)
2254 uint64_t _offset = offset
2256 completion = self.__get_completion(oncomplete, onsafe)
2257 self.__track_completion(completion)
2259 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2260 _to_write, size, _offset)
2262 completion._cleanup()
2263 raise make_ex(ret, "error writing object %s" % object_name)
2266 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2267 ('onsafe', opt(Callable)))
2268 def aio_write_full(self, object_name, to_write,
2269 oncomplete=None, onsafe=None):
2271 Asynchronously write an entire object
2273 The object is filled with the provided data. If the object exists,
2274 it is atomically truncated and then written.
2275 Queues the write and returns.
2277 :param object_name: name of the object
2278 :type object_name: str
2279 :param to_write: data to write
2281 :param oncomplete: what to do when the write is safe and complete in memory
2283 :type oncomplete: completion
2284 :param onsafe: what to do when the write is safe and complete on storage
2286 :type onsafe: completion
2288 :raises: :class:`Error`
2289 :returns: completion object
2292 object_name = cstr(object_name, 'object_name')
2295 Completion completion
2296 char* _object_name = object_name
2297 char* _to_write = to_write
2298 size_t size = len(to_write)
2300 completion = self.__get_completion(oncomplete, onsafe)
2301 self.__track_completion(completion)
2303 ret = rados_aio_write_full(self.io, _object_name,
2304 completion.rados_comp,
2307 completion._cleanup()
2308 raise make_ex(ret, "error writing object %s" % object_name)
2311 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2312 ('onsafe', opt(Callable)))
2313 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2315 Asynchronously append data to an object
2317 Queues the write and returns.
2319 :param object_name: name of the object
2320 :type object_name: str
2321 :param to_append: data to append
2322 :type to_append: str
2323 :param offset: byte offset in the object to begin writing at
2325 :param oncomplete: what to do when the write is safe and complete in memory
2327 :type oncomplete: completion
2328 :param onsafe: what to do when the write is safe and complete on storage
2330 :type onsafe: completion
2332 :raises: :class:`Error`
2333 :returns: completion object
2335 object_name = cstr(object_name, 'object_name')
2338 Completion completion
2339 char* _object_name = object_name
2340 char* _to_append = to_append
2341 size_t size = len(to_append)
2343 completion = self.__get_completion(oncomplete, onsafe)
2344 self.__track_completion(completion)
2346 ret = rados_aio_append(self.io, _object_name,
2347 completion.rados_comp,
2350 completion._cleanup()
2351 raise make_ex(ret, "error appending object %s" % object_name)
2354 def aio_flush(self):
2356 Block until all pending writes in an io context are safe
2358 :raises: :class:`Error`
2361 ret = rados_aio_flush(self.io)
2363 raise make_ex(ret, "error flushing")
2365 @requires(('object_name', str_type), ('length', int), ('offset', int),
2366 ('oncomplete', opt(Callable)))
2367 def aio_read(self, object_name, length, offset, oncomplete):
2369 Asynchronously read data from an object
2371 oncomplete will be called with the returned read value as
2372 well as the completion:
2374 oncomplete(completion, data_read)
2376 :param object_name: name of the object to read from
2377 :type object_name: str
2378 :param length: the number of bytes to read
2380 :param offset: byte offset in the object to begin reading from
2382 :param oncomplete: what to do when the read is complete
2383 :type oncomplete: completion
2385 :raises: :class:`Error`
2386 :returns: completion object
2389 object_name = cstr(object_name, 'object_name')
2392 Completion completion
2393 char* _object_name = object_name
2394 uint64_t _offset = offset
2397 size_t _length = length
2399 def oncomplete_(completion_v):
2400 cdef Completion _completion_v = completion_v
2401 return_value = _completion_v.get_return_value()
2402 if return_value > 0 and return_value != length:
2403 _PyBytes_Resize(&_completion_v.buf, return_value)
2404 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2406 completion = self.__get_completion(oncomplete_, None)
2407 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2408 ret_buf = PyBytes_AsString(completion.buf)
2409 self.__track_completion(completion)
2411 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2412 ret_buf, _length, _offset)
2414 completion._cleanup()
2415 raise make_ex(ret, "error reading %s" % object_name)
2418 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2419 ('data', bytes), ('length', int),
2420 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2421 def aio_execute(self, object_name, cls, method, data,
2422 length=8192, oncomplete=None, onsafe=None):
2424 Asynchronously execute an OSD class method on an object.
2426 oncomplete and onsafe will be called with the data returned from
2427 the plugin as well as the completion:
2429 oncomplete(completion, data)
2430 onsafe(completion, data)
2432 :param object_name: name of the object
2433 :type object_name: str
2434 :param cls: name of the object class
2436 :param method: name of the method
2438 :param data: input data
2440 :param length: size of output buffer in bytes (default=8192)
2442 :param oncomplete: what to do when the execution is complete
2443 :type oncomplete: completion
2444 :param onsafe: what to do when the execution is safe and complete
2445 :type onsafe: completion
2447 :raises: :class:`Error`
2448 :returns: completion object
2451 object_name = cstr(object_name, 'object_name')
2452 cls = cstr(cls, 'cls')
2453 method = cstr(method, 'method')
2455 Completion completion
2456 char *_object_name = object_name
2458 char *_method = method
2460 size_t _data_len = len(data)
2463 size_t _length = length
2465 def oncomplete_(completion_v):
2466 cdef Completion _completion_v = completion_v
2467 return_value = _completion_v.get_return_value()
2468 if return_value > 0 and return_value != length:
2469 _PyBytes_Resize(&_completion_v.buf, return_value)
2470 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2472 def onsafe_(completion_v):
2473 cdef Completion _completion_v = completion_v
2474 return_value = _completion_v.get_return_value()
2475 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2477 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2478 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2479 ret_buf = PyBytes_AsString(completion.buf)
2480 self.__track_completion(completion)
2482 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2483 _cls, _method, _data, _data_len, ret_buf, _length)
2485 completion._cleanup()
2486 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2489 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2490 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2492 Asynchronously remove an object
2494 :param object_name: name of the object to remove
2495 :type object_name: str
2496 :param oncomplete: what to do when the remove is safe and complete in memory
2498 :type oncomplete: completion
2499 :param onsafe: what to do when the remove is safe and complete on storage
2501 :type onsafe: completion
2503 :raises: :class:`Error`
2504 :returns: completion object
2506 object_name = cstr(object_name, 'object_name')
2509 Completion completion
2510 char* _object_name = object_name
2512 completion = self.__get_completion(oncomplete, onsafe)
2513 self.__track_completion(completion)
2515 ret = rados_aio_remove(self.io, _object_name,
2516 completion.rados_comp)
2518 completion._cleanup()
2519 raise make_ex(ret, "error removing %s" % object_name)
2522 def require_ioctx_open(self):
2524 Checks if the rados.Ioctx object state is 'open'
2526 :raises: IoctxStateError
2528 if self.state != "open":
2529 raise IoctxStateError("The pool is %s" % self.state)
2531 @requires(('loc_key', str_type))
2532 def set_locator_key(self, loc_key):
2534 Set the key for mapping objects to pgs within an io context.
2536 The key is used instead of the object name to determine which
2537 placement groups an object is put in. This affects all subsequent
2538 operations of the io context - until a different locator key is
2539 set, all objects in this io context will be placed in the same pg.
2541 :param loc_key: the key to use as the object locator, or NULL to discard
2542 any previously set key
2545 :raises: :class:`TypeError`
2547 self.require_ioctx_open()
2548 cloc_key = cstr(loc_key, 'loc_key')
2549 cdef char *_loc_key = cloc_key
2551 rados_ioctx_locator_set_key(self.io, _loc_key)
2552 self.locator_key = loc_key
2554 def get_locator_key(self):
2556 Get the locator_key of context
2558 :returns: locator_key
2560 return self.locator_key
2562 @requires(('snap_id', long))
2563 def set_read(self, snap_id):
2565 Set the snapshot for reading objects.
2567 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2569 :param snap_id: the snapshot Id
2572 :raises: :class:`TypeError`
2574 self.require_ioctx_open()
2575 cdef rados_snap_t _snap_id = snap_id
2577 rados_ioctx_snap_set_read(self.io, _snap_id)
2579 @requires(('nspace', str_type))
2580 def set_namespace(self, nspace):
2582 Set the namespace for objects within an io context.
2584 The namespace in addition to the object name fully identifies
2585 an object. This affects all subsequent operations of the io context
2586 - until a different namespace is set, all objects in this io context
2587 will be placed in the same namespace.
2589 :param nspace: the namespace to use, or None/"" for the default namespace
2592 :raises: :class:`TypeError`
2594 self.require_ioctx_open()
2597 cnspace = cstr(nspace, 'nspace')
2598 cdef char *_nspace = cnspace
2600 rados_ioctx_set_namespace(self.io, _nspace)
2601 self.nspace = nspace
2603 def get_namespace(self):
2605 Get the namespace of context
2613 Close a rados.Ioctx object.
2615 This just tells librados that you no longer need to use the io context.
2616 It may not be freed immediately if there are pending asynchronous
2617 requests on it, but you should not use an io context again after
2618 calling this function on it.
2620 if self.state == "open":
2621 self.require_ioctx_open()
2623 rados_ioctx_destroy(self.io)
2624 self.state = "closed"
2627 @requires(('key', str_type), ('data', bytes))
2628 def write(self, key, data, offset=0):
2630 Write data to an object synchronously
2632 :param key: name of the object
2634 :param data: data to write
2636 :param offset: byte offset in the object to begin writing at
2639 :raises: :class:`TypeError`
2640 :raises: :class:`LogicError`
2641 :returns: int - 0 on success
2643 self.require_ioctx_open()
2645 key = cstr(key, 'key')
2649 size_t length = len(data)
2650 uint64_t _offset = offset
2653 ret = rados_write(self.io, _key, _data, length, _offset)
2657 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2660 raise LogicError("Ioctx.write(%s): rados_write \
2661 returned %d, but should return zero on success." % (self.name, ret))
2663 @requires(('key', str_type), ('data', bytes))
2664 def write_full(self, key, data):
2666 Write an entire object synchronously.
2668 The object is filled with the provided data. If the object exists,
2669 it is atomically truncated and then written.
2671 :param key: name of the object
2673 :param data: data to write
2676 :raises: :class:`TypeError`
2677 :raises: :class:`Error`
2678 :returns: int - 0 on success
2680 self.require_ioctx_open()
2681 key = cstr(key, 'key')
2685 size_t length = len(data)
2688 ret = rados_write_full(self.io, _key, _data, length)
2692 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2695 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2696 returned %d, but should return zero on success." % (self.name, ret))
2698 @requires(('key', str_type), ('data', bytes))
2699 def append(self, key, data):
2701 Append data to an object synchronously
2703 :param key: name of the object
2705 :param data: data to write
2708 :raises: :class:`TypeError`
2709 :raises: :class:`LogicError`
2710 :returns: int - 0 on success
2712 self.require_ioctx_open()
2713 key = cstr(key, 'key')
2717 size_t length = len(data)
2720 ret = rados_append(self.io, _key, _data, length)
2724 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2727 raise LogicError("Ioctx.append(%s): rados_append \
2728 returned %d, but should return zero on success." % (self.name, ret))
2730 @requires(('key', str_type))
2731 def read(self, key, length=8192, offset=0):
2733 Read data from an object synchronously
2735 :param key: name of the object
2737 :param length: the number of bytes to read (default=8192)
2739 :param offset: byte offset in the object to begin reading at
2742 :raises: :class:`TypeError`
2743 :raises: :class:`Error`
2744 :returns: str - data read from object
2746 self.require_ioctx_open()
2747 key = cstr(key, 'key')
2751 uint64_t _offset = offset
2752 size_t _length = length
2753 PyObject* ret_s = NULL
2755 ret_s = PyBytes_FromStringAndSize(NULL, length)
2757 ret_buf = PyBytes_AsString(ret_s)
2759 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2761 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2764 _PyBytes_Resize(&ret_s, ret)
2766 return <object>ret_s
2768 # We DECREF unconditionally: the cast to object above will have
2769 # INCREFed if necessary. This also takes care of exceptions,
2770 # including if _PyString_Resize fails (that will free the string
2771 # itself and set ret_s to NULL, hence XDECREF).
2772 ref.Py_XDECREF(ret_s)
2774 @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
2775 def execute(self, key, cls, method, data, length=8192):
2777 Execute an OSD class method on an object.
2779 :param key: name of the object
2781 :param cls: name of the object class
2783 :param method: name of the method
2785 :param data: input data
2787 :param length: size of output buffer in bytes (default=8192)
2790 :raises: :class:`TypeError`
2791 :raises: :class:`Error`
2792 :returns: (ret, method output)
2794 self.require_ioctx_open()
2796 key = cstr(key, 'key')
2797 cls = cstr(cls, 'cls')
2798 method = cstr(method, 'method')
2802 char *_method = method
2804 size_t _data_len = len(data)
2807 size_t _length = length
2808 PyObject* ret_s = NULL
2810 ret_s = PyBytes_FromStringAndSize(NULL, length)
2812 ret_buf = PyBytes_AsString(ret_s)
2814 ret = rados_exec(self.io, _key, _cls, _method, _data,
2815 _data_len, ret_buf, _length)
2817 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2820 _PyBytes_Resize(&ret_s, ret)
2822 return ret, <object>ret_s
2824 # We DECREF unconditionally: the cast to object above will have
2825 # INCREFed if necessary. This also takes care of exceptions,
2826 # including if _PyString_Resize fails (that will free the string
2827 # itself and set ret_s to NULL, hence XDECREF).
2828 ref.Py_XDECREF(ret_s)
2830 def get_stats(self):
2832 Get pool usage statistics
2834 :returns: dict - contains the following keys:
2836 - ``num_bytes`` (int) - size of pool in bytes
2838 - ``num_kb`` (int) - size of pool in kbytes
2840 - ``num_objects`` (int) - number of objects in the pool
2842 - ``num_object_clones`` (int) - number of object clones
2844 - ``num_object_copies`` (int) - number of object copies
2846 - ``num_objects_missing_on_primary`` (int) - number of objets
2849 - ``num_objects_unfound`` (int) - number of unfound objects
2851 - ``num_objects_degraded`` (int) - number of degraded objects
2853 - ``num_rd`` (int) - bytes read
2855 - ``num_rd_kb`` (int) - kbytes read
2857 - ``num_wr`` (int) - bytes written
2859 - ``num_wr_kb`` (int) - kbytes written
2861 self.require_ioctx_open()
2862 cdef rados_pool_stat_t stats
2864 ret = rados_ioctx_pool_stat(self.io, &stats)
2866 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
2867 return {'num_bytes': stats.num_bytes,
2868 'num_kb': stats.num_kb,
2869 'num_objects': stats.num_objects,
2870 'num_object_clones': stats.num_object_clones,
2871 'num_object_copies': stats.num_object_copies,
2872 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
2873 "num_objects_unfound": stats.num_objects_unfound,
2874 "num_objects_degraded": stats.num_objects_degraded,
2875 "num_rd": stats.num_rd,
2876 "num_rd_kb": stats.num_rd_kb,
2877 "num_wr": stats.num_wr,
2878 "num_wr_kb": stats.num_wr_kb}
2880 @requires(('key', str_type))
2881 def remove_object(self, key):
2885 This does not delete any snapshots of the object.
2887 :param key: the name of the object to delete
2890 :raises: :class:`TypeError`
2891 :raises: :class:`Error`
2892 :returns: bool - True on success
2894 self.require_ioctx_open()
2895 key = cstr(key, 'key')
2900 ret = rados_remove(self.io, _key)
2902 raise make_ex(ret, "Failed to remove '%s'" % key)
2905 @requires(('key', str_type))
2906 def trunc(self, key, size):
2910 If this enlarges the object, the new area is logically filled with
2911 zeroes. If this shrinks the object, the excess data is removed.
2913 :param key: the name of the object to resize
2915 :param size: the new size of the object in bytes
2918 :raises: :class:`TypeError`
2919 :raises: :class:`Error`
2920 :returns: int - 0 on success, otherwise raises error
2923 self.require_ioctx_open()
2924 key = cstr(key, 'key')
2927 uint64_t _size = size
2930 ret = rados_trunc(self.io, _key, _size)
2932 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2935 @requires(('key', str_type))
2936 def stat(self, key):
2938 Get object stats (size/mtime)
2940 :param key: the name of the object to get stats from
2943 :raises: :class:`TypeError`
2944 :raises: :class:`Error`
2945 :returns: (size,timestamp)
2947 self.require_ioctx_open()
2949 key = cstr(key, 'key')
2956 ret = rados_stat(self.io, _key, &psize, &pmtime)
2958 raise make_ex(ret, "Failed to stat %r" % key)
2959 return psize, time.localtime(pmtime)
2961 @requires(('key', str_type), ('xattr_name', str_type))
2962 def get_xattr(self, key, xattr_name):
2964 Get the value of an extended attribute on an object.
2966 :param key: the name of the object to get xattr from
2968 :param xattr_name: which extended attribute to read
2969 :type xattr_name: str
2971 :raises: :class:`TypeError`
2972 :raises: :class:`Error`
2973 :returns: str - value of the xattr
2975 self.require_ioctx_open()
2977 key = cstr(key, 'key')
2978 xattr_name = cstr(xattr_name, 'xattr_name')
2981 char *_xattr_name = xattr_name
2982 size_t ret_length = 4096
2983 char *ret_buf = NULL
2986 while ret_length < 4096 * 1024 * 1024:
2987 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2989 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2990 if ret == -errno.ERANGE:
2993 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2996 return ret_buf[:ret]
3000 @requires(('oid', str_type))
3001 def get_xattrs(self, oid):
3003 Start iterating over xattrs on an object.
3005 :param oid: the name of the object to get xattrs from
3008 :raises: :class:`TypeError`
3009 :raises: :class:`Error`
3010 :returns: XattrIterator
3012 self.require_ioctx_open()
3013 return XattrIterator(self, oid)
3015 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
3016 def set_xattr(self, key, xattr_name, xattr_value):
3018 Set an extended attribute on an object.
3020 :param key: the name of the object to set xattr to
3022 :param xattr_name: which extended attribute to set
3023 :type xattr_name: str
3024 :param xattr_value: the value of the extended attribute
3025 :type xattr_value: bytes
3027 :raises: :class:`TypeError`
3028 :raises: :class:`Error`
3029 :returns: bool - True on success, otherwise raise an error
3031 self.require_ioctx_open()
3033 key = cstr(key, 'key')
3034 xattr_name = cstr(xattr_name, 'xattr_name')
3037 char *_xattr_name = xattr_name
3038 char *_xattr_value = xattr_value
3039 size_t _xattr_value_len = len(xattr_value)
3042 ret = rados_setxattr(self.io, _key, _xattr_name,
3043 _xattr_value, _xattr_value_len)
3045 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3048 @requires(('key', str_type), ('xattr_name', str_type))
3049 def rm_xattr(self, key, xattr_name):
3051 Removes an extended attribute on from an object.
3053 :param key: the name of the object to remove xattr from
3055 :param xattr_name: which extended attribute to remove
3056 :type xattr_name: str
3058 :raises: :class:`TypeError`
3059 :raises: :class:`Error`
3060 :returns: bool - True on success, otherwise raise an error
3062 self.require_ioctx_open()
3064 key = cstr(key, 'key')
3065 xattr_name = cstr(xattr_name, 'xattr_name')
3068 char *_xattr_name = xattr_name
3071 ret = rados_rmxattr(self.io, _key, _xattr_name)
3073 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3077 @requires(('obj', str_type), ('msg', str_type), ('timeout_ms', int))
3078 def notify(self, obj, msg='', timeout_ms=5000):
3080 Send a rados notification to an object.
3082 :param obj: the name of the object to notify
3084 :param msg: optional message to send in the notification
3086 :param timeout_ms: notify timeout (in ms)
3087 :type timeout_ms: int
3089 :raises: :class:`TypeError`
3090 :raises: :class:`Error`
3091 :returns: bool - True on success, otherwise raise an error
3093 self.require_ioctx_open()
3096 obj = cstr(obj, 'obj')
3097 msg = cstr(msg, 'msg')
3101 int _msglen = msglen
3102 uint64_t _timeout_ms = timeout_ms
3105 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3108 raise make_ex(ret, "Failed to notify %r" % (obj))
3111 def list_objects(self):
3113 Get ObjectIterator on rados.Ioctx object.
3115 :returns: ObjectIterator
3117 self.require_ioctx_open()
3118 return ObjectIterator(self)
3120 def list_snaps(self):
3122 Get SnapIterator on rados.Ioctx object.
3124 :returns: SnapIterator
3126 self.require_ioctx_open()
3127 return SnapIterator(self)
3129 @requires(('snap_name', str_type))
3130 def create_snap(self, snap_name):
3132 Create a pool-wide snapshot
3134 :param snap_name: the name of the snapshot
3135 :type snap_name: str
3137 :raises: :class:`TypeError`
3138 :raises: :class:`Error`
3140 self.require_ioctx_open()
3141 snap_name = cstr(snap_name, 'snap_name')
3142 cdef char *_snap_name = snap_name
3145 ret = rados_ioctx_snap_create(self.io, _snap_name)
3147 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3149 @requires(('snap_name', str_type))
3150 def remove_snap(self, snap_name):
3152 Removes a pool-wide snapshot
3154 :param snap_name: the name of the snapshot
3155 :type snap_name: str
3157 :raises: :class:`TypeError`
3158 :raises: :class:`Error`
3160 self.require_ioctx_open()
3161 snap_name = cstr(snap_name, 'snap_name')
3162 cdef char *_snap_name = snap_name
3165 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3167 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3169 @requires(('snap_name', str_type))
3170 def lookup_snap(self, snap_name):
3172 Get the id of a pool snapshot
3174 :param snap_name: the name of the snapshot to lookop
3175 :type snap_name: str
3177 :raises: :class:`TypeError`
3178 :raises: :class:`Error`
3179 :returns: Snap - on success
3181 self.require_ioctx_open()
3182 csnap_name = cstr(snap_name, 'snap_name')
3184 char *_snap_name = csnap_name
3185 rados_snap_t snap_id
3188 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3190 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3191 return Snap(self, snap_name, int(snap_id))
3193 @requires(('oid', str_type), ('snap_name', str_type))
3194 def snap_rollback(self, oid, snap_name):
3196 Rollback an object to a snapshot
3198 :param oid: the name of the object
3200 :param snap_name: the name of the snapshot
3201 :type snap_name: str
3203 :raises: :class:`TypeError`
3204 :raises: :class:`Error`
3206 self.require_ioctx_open()
3207 oid = cstr(oid, 'oid')
3208 snap_name = cstr(snap_name, 'snap_name')
3210 char *_snap_name = snap_name
3214 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3216 raise make_ex(ret, "Failed to rollback %s" % oid)
3218 def create_self_managed_snap(self):
3220 Creates a self-managed snapshot
3222 :returns: snap id on success
3224 :raises: :class:`Error`
3226 self.require_ioctx_open()
3228 rados_snap_t _snap_id
3230 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3232 raise make_ex(ret, "Failed to create self-managed snapshot")
3233 return int(_snap_id)
3235 @requires(('snap_id', int))
3236 def remove_self_managed_snap(self, snap_id):
3238 Removes a self-managed snapshot
3240 :param snap_id: the name of the snapshot
3243 :raises: :class:`TypeError`
3244 :raises: :class:`Error`
3246 self.require_ioctx_open()
3248 rados_snap_t _snap_id = snap_id
3250 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3252 raise make_ex(ret, "Failed to remove self-managed snapshot")
3254 def set_self_managed_snap_write(self, snaps):
3256 Updates the write context to the specified self-managed
3259 :param snaps: all associated self-managed snapshot ids
3262 :raises: :class:`TypeError`
3263 :raises: :class:`Error`
3265 self.require_ioctx_open()
3269 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3270 snap_seq = sorted_snaps[0]
3273 rados_snap_t _snap_seq = snap_seq
3274 rados_snap_t *_snaps = NULL
3275 int _num_snaps = len(sorted_snaps)
3277 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3278 for i in range(len(sorted_snaps)):
3279 _snaps[i] = sorted_snaps[i]
3281 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3286 raise make_ex(ret, "Failed to update snapshot write context")
3290 @requires(('oid', str_type), ('snap_id', int))
3291 def rollback_self_managed_snap(self, oid, snap_id):
3293 Rolls an specific object back to a self-managed snapshot revision
3295 :param oid: the name of the object
3297 :param snap_id: the name of the snapshot
3300 :raises: :class:`TypeError`
3301 :raises: :class:`Error`
3303 self.require_ioctx_open()
3304 oid = cstr(oid, 'oid')
3307 rados_snap_t _snap_id = snap_id
3309 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3311 raise make_ex(ret, "Failed to rollback %s" % oid)
3313 def get_last_version(self):
3315 Return the version of the last object read or written to.
3317 This exposes the internal version number of the last object read or
3318 written via this io context
3320 :returns: version of the last object used
3322 self.require_ioctx_open()
3324 ret = rados_get_last_version(self.io)
3327 def create_write_op(self):
3329 create write operation object.
3330 need call release_write_op after use
3332 return WriteOp().create()
3334 def create_read_op(self):
3336 create read operation object.
3337 need call release_read_op after use
3339 return ReadOp().create()
3341 def release_write_op(self, write_op):
3343 release memory alloc by create_write_op
3347 def release_read_op(self, read_op):
3349 release memory alloc by create_read_op
3350 :para read_op: read_op object
3355 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3356 def set_omap(self, write_op, keys, values):
3358 set keys values to write_op
3359 :para write_op: write_operation object
3360 :type write_op: WriteOp
3361 :para keys: a tuple of keys
3363 :para values: a tuple of values
3367 if len(keys) != len(values):
3368 raise Error("Rados(): keys and values must have the same number of items")
3370 keys = cstr_list(keys, 'keys')
3372 WriteOp _write_op = write_op
3373 size_t key_num = len(keys)
3374 char **_keys = to_bytes_array(keys)
3375 char **_values = to_bytes_array(values)
3376 size_t *_lens = to_csize_t_array([len(v) for v in values])
3380 rados_write_op_omap_set(_write_op.write_op,
3381 <const char**>_keys,
3382 <const char**>_values,
3383 <const size_t*>_lens, key_num)
3389 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3390 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3392 execute the real write operation
3393 :para write_op: write operation object
3394 :type write_op: WriteOp
3395 :para oid: object name
3397 :para mtime: the time to set the mtime to, 0 for the current time
3399 :para flags: flags to apply to the entire operation
3403 oid = cstr(oid, 'oid')
3405 WriteOp _write_op = write_op
3407 time_t _mtime = mtime
3411 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3413 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3415 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3416 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3418 execute the real write operation asynchronously
3419 :para write_op: write operation object
3420 :type write_op: WriteOp
3421 :para oid: object name
3423 :param oncomplete: what to do when the remove is safe and complete in memory
3425 :type oncomplete: completion
3426 :param onsafe: what to do when the remove is safe and complete on storage
3428 :type onsafe: completion
3429 :para mtime: the time to set the mtime to, 0 for the current time
3431 :para flags: flags to apply to the entire operation
3434 :raises: :class:`Error`
3435 :returns: completion object
3438 oid = cstr(oid, 'oid')
3440 WriteOp _write_op = write_op
3442 Completion completion
3443 time_t _mtime = mtime
3446 completion = self.__get_completion(oncomplete, onsafe)
3447 self.__track_completion(completion)
3450 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3453 completion._cleanup()
3454 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3457 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3458 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3460 execute the real read operation
3461 :para read_op: read operation object
3462 :type read_op: ReadOp
3463 :para oid: object name
3465 :para flag: flags to apply to the entire operation
3468 oid = cstr(oid, 'oid')
3470 ReadOp _read_op = read_op
3475 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3477 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3479 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3480 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3482 execute the real read operation
3483 :para read_op: read operation object
3484 :type read_op: ReadOp
3485 :para oid: object name
3487 :param oncomplete: what to do when the remove is safe and complete in memory
3489 :type oncomplete: completion
3490 :param onsafe: what to do when the remove is safe and complete on storage
3492 :type onsafe: completion
3493 :para flag: flags to apply to the entire operation
3496 oid = cstr(oid, 'oid')
3498 ReadOp _read_op = read_op
3500 Completion completion
3503 completion = self.__get_completion(oncomplete, onsafe)
3504 self.__track_completion(completion)
3507 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3509 completion._cleanup()
3510 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3513 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3514 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3517 :para read_op: read operation object
3518 :type read_op: ReadOp
3519 :para start_after: list keys starting after start_after
3520 :type start_after: str
3521 :para filter_prefix: list only keys beginning with filter_prefix
3522 :type filter_prefix: str
3523 :para max_return: list no more than max_return key/value pairs
3524 :type max_return: int
3525 :returns: an iterator over the requested omap values, return value from this action
3528 start_after = cstr(start_after, 'start_after') if start_after else None
3529 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3531 char *_start_after = opt_str(start_after)
3532 char *_filter_prefix = opt_str(filter_prefix)
3533 ReadOp _read_op = read_op
3534 rados_omap_iter_t iter_addr = NULL
3535 int _max_return = max_return
3538 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3539 _max_return, &iter_addr, NULL, NULL)
3540 it = OmapIterator(self)
3542 return it, 0 # 0 is meaningless; there for backward-compat
3544 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3545 def get_omap_keys(self, read_op, start_after, max_return):
3548 :para read_op: read operation object
3549 :type read_op: ReadOp
3550 :para start_after: list keys starting after start_after
3551 :type start_after: str
3552 :para max_return: list no more than max_return key/value pairs
3553 :type max_return: int
3554 :returns: an iterator over the requested omap values, return value from this action
3556 start_after = cstr(start_after, 'start_after') if start_after else None
3558 char *_start_after = opt_str(start_after)
3559 ReadOp _read_op = read_op
3560 rados_omap_iter_t iter_addr = NULL
3561 int _max_return = max_return
3564 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3565 _max_return, &iter_addr, NULL, NULL)
3566 it = OmapIterator(self)
3568 return it, 0 # 0 is meaningless; there for backward-compat
3570 @requires(('read_op', ReadOp), ('keys', tuple))
3571 def get_omap_vals_by_keys(self, read_op, keys):
3573 get the omap values by keys
3574 :para read_op: read operation object
3575 :type read_op: ReadOp
3576 :para keys: input key tuple
3578 :returns: an iterator over the requested omap values, return value from this action
3580 keys = cstr_list(keys, 'keys')
3582 ReadOp _read_op = read_op
3583 rados_omap_iter_t iter_addr
3584 char **_keys = to_bytes_array(keys)
3585 size_t key_num = len(keys)
3589 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3590 <const char**>_keys,
3591 key_num, &iter_addr, NULL)
3592 it = OmapIterator(self)
3594 return it, 0 # 0 is meaningless; there for backward-compat
3598 @requires(('write_op', WriteOp), ('keys', tuple))
3599 def remove_omap_keys(self, write_op, keys):
3601 remove omap keys specifiled
3602 :para write_op: write operation object
3603 :type write_op: WriteOp
3604 :para keys: input key tuple
3608 keys = cstr_list(keys, 'keys')
3610 WriteOp _write_op = write_op
3611 size_t key_num = len(keys)
3612 char **_keys = to_bytes_array(keys)
3616 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3620 @requires(('write_op', WriteOp))
3621 def clear_omap(self, write_op):
3623 Remove all key/value pairs from an object
3624 :para write_op: write operation object
3625 :type write_op: WriteOp
3629 WriteOp _write_op = write_op
3632 rados_write_op_omap_clear(_write_op.write_op)
3634 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3635 ('duration', opt(int)), ('flags', int))
3636 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3639 Take an exclusive lock on an object
3641 :param key: name of the object
3643 :param name: name of the lock
3645 :param cookie: cookie of the lock
3647 :param desc: description of the lock
3649 :param duration: duration of the lock in seconds
3654 :raises: :class:`TypeError`
3655 :raises: :class:`Error`
3657 self.require_ioctx_open()
3659 key = cstr(key, 'key')
3660 name = cstr(name, 'name')
3661 cookie = cstr(cookie, 'cookie')
3662 desc = cstr(desc, 'desc')
3667 char* _cookie = cookie
3669 uint8_t _flags = flags
3672 if duration is None:
3674 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3677 _duration.tv_sec = duration
3679 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3683 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3685 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3686 ('desc', str_type), ('duration', opt(int)), ('flags', int))
3687 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3690 Take a shared lock on an object
3692 :param key: name of the object
3694 :param name: name of the lock
3696 :param cookie: cookie of the lock
3698 :param tag: tag of the lock
3700 :param desc: description of the lock
3702 :param duration: duration of the lock in seconds
3707 :raises: :class:`TypeError`
3708 :raises: :class:`Error`
3710 self.require_ioctx_open()
3712 key = cstr(key, 'key')
3713 tag = cstr(tag, 'tag')
3714 name = cstr(name, 'name')
3715 cookie = cstr(cookie, 'cookie')
3716 desc = cstr(desc, 'desc')
3722 char* _cookie = cookie
3724 uint8_t _flags = flags
3727 if duration is None:
3729 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3732 _duration.tv_sec = duration
3734 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3737 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3739 @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
3740 def unlock(self, key, name, cookie):
3743 Release a shared or exclusive lock on an object
3745 :param key: name of the object
3747 :param name: name of the lock
3749 :param cookie: cookie of the lock
3752 :raises: :class:`TypeError`
3753 :raises: :class:`Error`
3755 self.require_ioctx_open()
3757 key = cstr(key, 'key')
3758 name = cstr(name, 'name')
3759 cookie = cstr(cookie, 'cookie')
3764 char* _cookie = cookie
3767 ret = rados_unlock(self.io, _key, _name, _cookie)
3769 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3771 def set_osdmap_full_try(self):
3773 Set global osdmap_full_try label to true
3776 rados_set_osdmap_full_try(self.io)
3778 def unset_osdmap_full_try(self):
3783 rados_unset_osdmap_full_try(self.io)
3785 def application_enable(self, app_name, force=False):
3787 Enable an application on an OSD pool
3789 :param app_name: application name
3791 :param force: False if only a single app should exist per pool
3792 :type expire_seconds: boool
3794 :raises: :class:`Error`
3796 app_name = cstr(app_name, 'app_name')
3798 char *_app_name = app_name
3799 int _force = (1 if force else 0)
3802 ret = rados_application_enable(self.io, _app_name, _force)
3804 raise make_ex(ret, "error enabling application")
3806 def application_list(self):
3808 Returns a list of enabled applications
3810 :returns: list of app name string
3818 apps = <char *>realloc_chk(apps, length)
3820 ret = rados_application_list(self.io, apps, &length)
3822 return [decode_cstr(app) for app in
3823 apps[:length].split(b'\0') if app]
3824 elif ret == -errno.ENOENT:
3826 elif ret == -errno.ERANGE:
3829 raise make_ex(ret, "error listing applications")
3833 def application_metadata_set(self, app_name, key, value):
3835 Sets application metadata on an OSD pool
3837 :param app_name: application name
3839 :param key: metadata key
3841 :param value: metadata value
3844 :raises: :class:`Error`
3846 app_name = cstr(app_name, 'app_name')
3847 key = cstr(key, 'key')
3848 value = cstr(value, 'value')
3850 char *_app_name = app_name
3852 char *_value = value
3855 ret = rados_application_metadata_set(self.io, _app_name, _key,
3858 raise make_ex(ret, "error setting application metadata")
3860 def application_metadata_remove(self, app_name, key):
3862 Remove application metadata from an OSD pool
3864 :param app_name: application name
3866 :param key: metadata key
3869 :raises: :class:`Error`
3871 app_name = cstr(app_name, 'app_name')
3872 key = cstr(key, 'key')
3874 char *_app_name = app_name
3878 ret = rados_application_metadata_remove(self.io, _app_name, _key)
3880 raise make_ex(ret, "error removing application metadata")
3882 def application_metadata_list(self, app_name):
3884 Returns a list of enabled applications
3886 :param app_name: application name
3888 :returns: list of key/value tuples
3890 app_name = cstr(app_name, 'app_name')
3892 char *_app_name = app_name
3893 size_t key_length = 128
3894 size_t val_length = 128
3900 c_keys = <char *>realloc_chk(c_keys, key_length)
3901 c_vals = <char *>realloc_chk(c_vals, val_length)
3903 ret = rados_application_metadata_list(self.io, _app_name,
3904 c_keys, &key_length,
3905 c_vals, &val_length)
3907 keys = [decode_cstr(key) for key in
3908 c_keys[:key_length].split(b'\0')]
3909 vals = [decode_cstr(val) for val in
3910 c_vals[:val_length].split(b'\0')]
3911 return zip(keys, vals)[:-1]
3912 elif ret == -errno.ERANGE:
3915 raise make_ex(ret, "error listing application metadata")
3920 def alignment(self):
3922 Returns pool alignment
3925 Number of alignment bytes required by the current pool, or None if
3926 alignment is not required.
3933 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
3935 raise make_ex(ret, "error checking alignment")
3940 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
3942 raise make_ex(ret, "error querying alignment")
3943 alignment = _alignment
3947 def set_object_locator(func):
3948 def retfunc(self, *args, **kwargs):
3949 if self.locator_key is not None:
3950 old_locator = self.ioctx.get_locator_key()
3951 self.ioctx.set_locator_key(self.locator_key)
3952 retval = func(self, *args, **kwargs)
3953 self.ioctx.set_locator_key(old_locator)
3956 return func(self, *args, **kwargs)
3960 def set_object_namespace(func):
3961 def retfunc(self, *args, **kwargs):
3962 if self.nspace is None:
3963 raise LogicError("Namespace not set properly in context")
3964 old_nspace = self.ioctx.get_namespace()
3965 self.ioctx.set_namespace(self.nspace)
3966 retval = func(self, *args, **kwargs)
3967 self.ioctx.set_namespace(old_nspace)
3972 class Object(object):
3973 """Rados object wrapper, makes the object look like a file"""
3974 def __init__(self, ioctx, key, locator_key=None, nspace=None):
3978 self.state = "exists"
3979 self.locator_key = locator_key
3980 self.nspace = "" if nspace is None else nspace
3983 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
3984 (str(self.ioctx), self.key, "--default--"
3985 if self.nspace is "" else self.nspace, self.locator_key)
3987 def require_object_exists(self):
3988 if self.state != "exists":
3989 raise ObjectStateError("The object is %s" % self.state)
3992 @set_object_namespace
3993 def read(self, length=1024 * 1024):
3994 self.require_object_exists()
3995 ret = self.ioctx.read(self.key, length, self.offset)
3996 self.offset += len(ret)
4000 @set_object_namespace
4001 def write(self, string_to_write):
4002 self.require_object_exists()
4003 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4005 self.offset += len(string_to_write)
4009 @set_object_namespace
4011 self.require_object_exists()
4012 self.ioctx.remove_object(self.key)
4013 self.state = "removed"
4016 @set_object_namespace
4018 self.require_object_exists()
4019 return self.ioctx.stat(self.key)
4021 def seek(self, position):
4022 self.require_object_exists()
4023 self.offset = position
4026 @set_object_namespace
4027 def get_xattr(self, xattr_name):
4028 self.require_object_exists()
4029 return self.ioctx.get_xattr(self.key, xattr_name)
4032 @set_object_namespace
4033 def get_xattrs(self):
4034 self.require_object_exists()
4035 return self.ioctx.get_xattrs(self.key)
4038 @set_object_namespace
4039 def set_xattr(self, xattr_name, xattr_value):
4040 self.require_object_exists()
4041 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4044 @set_object_namespace
4045 def rm_xattr(self, xattr_name):
4046 self.require_object_exists()
4047 return self.ioctx.rm_xattr(self.key, xattr_name)
4058 class MonitorLog(object):
4059 # NOTE(sileht): Keep this class for backward compat
4060 # method moved to Rados.monitor_log()
4062 For watching cluster log messages. Instantiate an object and keep
4063 it around while callback is periodically called. Construct with
4064 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4065 arg will be passed to the callback.
4067 callback will be called with:
4068 arg (given to __init__)
4069 line (the full line, including timestamp, who, level, msg)
4070 who (which entity issued the log message)
4071 timestamp_sec (sec of a struct timespec)
4072 timestamp_nsec (sec of a struct timespec)
4073 seq (sequence number)
4074 level (string representing the level of the log message)
4075 msg (the message itself)
4076 callback's return value is ignored
4078 def __init__(self, cluster, level, callback, arg):
4080 self.callback = callback
4082 self.cluster = cluster
4083 self.cluster.monitor_log(level, callback, arg)