1 # cython: embedsignature=True
3 This module is a thin wrapper around librados.
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
27 from collections.abc import Callable
29 from collections import Callable
30 from datetime import datetime
31 from functools import partial, wraps
32 from itertools import chain
34 # Are we running Python 2.x
35 if sys.version_info[0] < 3:
41 cdef extern from "Python.h":
42 # These are in cpython/string.pxd, but use "object" types instead of
43 # PyObject*, which invokes assumptions in cpython that we need to
44 # legitimately break to implement zero-copy string buffers in Ioctx.read().
45 # This is valid use of the Python API and documented as a special case.
46 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
47 char* PyBytes_AsString(PyObject *string) except NULL
48 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
49 void PyEval_InitThreads()
52 cdef extern from "time.h":
53 ctypedef long int time_t
54 ctypedef long int suseconds_t
57 cdef extern from "sys/time.h":
63 cdef extern from "rados/rados_types.h" nogil:
64 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
67 cdef extern from "rados/librados.h" nogil:
69 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
70 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
71 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
72 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
73 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
74 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
75 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
79 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
80 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
81 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
82 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
83 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
84 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
85 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
86 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
87 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
89 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
105 cdef struct rados_cluster_stat_t:
111 cdef struct rados_pool_stat_t:
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
125 void rados_buffer_free(char *buf)
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 uint64_t rados_get_instance_id(rados_t cluster)
134 int rados_conf_read_file(rados_t cluster, const char *path)
135 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
136 int rados_conf_parse_env(rados_t cluster, const char *var)
137 int rados_conf_set(rados_t cluster, char *option, const char *value)
138 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
140 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
141 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
142 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
143 int rados_pool_create(rados_t cluster, const char *pool_name)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
146 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
147 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
148 int rados_pool_list(rados_t cluster, char *buf, size_t len)
149 int rados_pool_delete(rados_t cluster, const char *pool_name)
150 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
152 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
153 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
154 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
155 int rados_getaddrs(rados_t cluster, char** addrs)
156 int rados_application_enable(rados_ioctx_t io, const char *app_name,
158 void rados_set_osdmap_full_try(rados_ioctx_t io)
159 void rados_unset_osdmap_full_try(rados_ioctx_t io)
160 int rados_application_list(rados_ioctx_t io, char *values,
162 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
163 const char *key, char *value,
165 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
166 const char *key, const char *value)
167 int rados_application_metadata_remove(rados_ioctx_t io,
168 const char *app_name, const char *key)
169 int rados_application_metadata_list(rados_ioctx_t io,
170 const char *app_name, char *keys,
171 size_t *key_len, char *values,
173 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
174 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_mgr_command_target(rados_t cluster,
184 const char **cmd, size_t cmdlen,
185 const char *inbuf, size_t inbuflen,
186 char **outbuf, size_t *outbuflen,
187 char **outs, size_t *outslen)
188 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
189 const char *inbuf, size_t inbuflen,
190 char **outbuf, size_t *outbuflen,
191 char **outs, size_t *outslen)
192 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
193 const char *inbuf, size_t inbuflen,
194 char **outbuf, size_t *outbuflen,
195 char **outs, size_t *outslen)
196 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
197 const char *inbuf, size_t inbuflen,
198 char **outbuf, size_t *outbuflen,
199 char **outs, size_t *outslen)
200 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
201 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
203 int rados_wait_for_latest_osdmap(rados_t cluster)
205 int rados_service_register(rados_t cluster, const char *service, const char *daemon, const char *metadata_dict)
206 int rados_service_update_status(rados_t cluster, const char *status_dict)
208 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
209 int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
210 void rados_ioctx_destroy(rados_ioctx_t io)
211 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
212 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
214 uint64_t rados_get_last_version(rados_ioctx_t io)
215 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
216 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
217 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
218 int rados_writesame(rados_ioctx_t io, const char *oid, const char *buf, size_t data_len, size_t write_len, uint64_t off)
219 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
220 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
221 int rados_remove(rados_ioctx_t io, const char *oid)
222 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
223 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
224 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
225 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
226 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
227 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
228 void rados_getxattrs_end(rados_xattrs_iter_t iter)
230 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
231 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
232 void rados_nobjects_list_close(rados_list_ctx_t ctx)
234 int rados_ioctx_pool_requires_alignment2(rados_ioctx_t io, int * requires)
235 int rados_ioctx_pool_required_alignment2(rados_ioctx_t io, uint64_t * alignment)
237 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
238 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
239 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
240 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
241 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
242 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
243 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
244 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
245 uint64_t rados_ioctx_get_id(rados_ioctx_t io)
246 int rados_ioctx_get_pool_name(rados_ioctx_t io, char *buf, unsigned maxlen)
248 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
249 rados_snap_t *snapid)
250 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
252 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
253 rados_snap_t snap_seq,
256 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
259 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
260 const char * cookie, const char * desc,
261 timeval * duration, uint8_t flags)
262 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
263 const char * cookie, const char * tag, const char * desc,
264 timeval * duration, uint8_t flags)
265 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
267 rados_write_op_t rados_create_write_op()
268 void rados_release_write_op(rados_write_op_t write_op)
270 rados_read_op_t rados_create_read_op()
271 void rados_release_read_op(rados_read_op_t read_op)
273 int rados_aio_create_completion2(void * cb_arg, rados_callback_t cb_complete, rados_completion_t * pc)
274 void rados_aio_release(rados_completion_t c)
275 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
276 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
277 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
278 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
279 int rados_aio_writesame(rados_ioctx_t io, const char *oid, rados_completion_t completion, const char *buf, size_t data_len, size_t write_len, uint64_t off)
280 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
281 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
282 int rados_aio_flush(rados_ioctx_t io)
284 int rados_aio_get_return_value(rados_completion_t c)
285 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
286 int rados_aio_wait_for_complete(rados_completion_t c)
287 int rados_aio_is_complete(rados_completion_t c)
289 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
290 const char * in_buf, size_t in_len, char * buf, size_t out_len)
291 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
292 const char * in_buf, size_t in_len, char * buf, size_t out_len)
294 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
295 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
296 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
297 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
298 void rados_write_op_omap_clear(rados_write_op_t write_op)
299 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
300 void rados_write_op_setxattr(rados_write_op_t write_op, const char *name, const char *value, size_t value_len)
301 void rados_write_op_rmxattr(rados_write_op_t write_op, const char *name)
303 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
304 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
305 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
306 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
307 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
308 void rados_write_op_remove(rados_write_op_t write_op)
309 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
310 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
311 void rados_write_op_exec(rados_write_op_t write_op, const char *cls, const char *method, const char *in_buf, size_t in_len, int *prval)
312 void rados_write_op_writesame(rados_write_op_t write_op, const char *buffer, size_t data_len, size_t write_len, uint64_t offset)
313 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
314 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
315 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
316 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
317 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
318 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
319 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
320 void rados_omap_get_end(rados_omap_iter_t iter)
321 int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
324 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
325 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
326 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
327 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
328 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
329 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
330 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
332 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
334 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
335 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
336 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
337 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
338 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
339 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
340 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
342 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
344 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
345 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
347 ANONYMOUS_AUID = 0xffffffffffffffff
351 class Error(Exception):
352 """ `Error` class, derived from `Exception` """
353 def __init__(self, message, errno=None):
354 super(Exception, self).__init__(message)
358 msg = super(Exception, self).__str__()
359 if self.errno is None:
361 return '[errno {0}] {1}'.format(self.errno, msg)
363 def __reduce__(self):
364 return (self.__class__, (self.message, self.errno))
366 class InvalidArgumentError(Error):
367 def __init__(self, message, errno=None):
368 super(InvalidArgumentError, self).__init__(
369 "RADOS invalid argument (%s)" % message, errno)
372 class OSError(Error):
373 """ `OSError` class, derived from `Error` """
376 class InterruptedOrTimeoutError(OSError):
377 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
378 def __init__(self, message, errno=None):
379 super(InterruptedOrTimeoutError, self).__init__(
380 "RADOS interrupted or timeout (%s)" % message, errno)
383 class PermissionError(OSError):
384 """ `PermissionError` class, derived from `OSError` """
385 def __init__(self, message, errno=None):
386 super(PermissionError, self).__init__(
387 "RADOS permission error (%s)" % message, errno)
390 class PermissionDeniedError(OSError):
391 """ deal with EACCES related. """
392 def __init__(self, message, errno=None):
393 super(PermissionDeniedError, self).__init__(
394 "RADOS permission denied (%s)" % message, errno)
397 class ObjectNotFound(OSError):
398 """ `ObjectNotFound` class, derived from `OSError` """
399 def __init__(self, message, errno=None):
400 super(ObjectNotFound, self).__init__(
401 "RADOS object not found (%s)" % message, errno)
404 class NoData(OSError):
405 """ `NoData` class, derived from `OSError` """
406 def __init__(self, message, errno=None):
407 super(NoData, self).__init__(
408 "RADOS no data (%s)" % message, errno)
411 class ObjectExists(OSError):
412 """ `ObjectExists` class, derived from `OSError` """
413 def __init__(self, message, errno=None):
414 super(ObjectExists, self).__init__(
415 "RADOS object exists (%s)" % message, errno)
418 class ObjectBusy(OSError):
419 """ `ObjectBusy` class, derived from `IOError` """
420 def __init__(self, message, errno=None):
421 super(ObjectBusy, self).__init__(
422 "RADOS object busy (%s)" % message, errno)
425 class IOError(OSError):
426 """ `ObjectBusy` class, derived from `OSError` """
427 def __init__(self, message, errno=None):
428 super(IOError, self).__init__(
429 "RADOS I/O error (%s)" % message, errno)
432 class NoSpace(OSError):
433 """ `NoSpace` class, derived from `OSError` """
434 def __init__(self, message, errno=None):
435 super(NoSpace, self).__init__(
436 "RADOS no space (%s)" % message, errno)
439 class RadosStateError(Error):
440 """ `RadosStateError` class, derived from `Error` """
441 def __init__(self, message, errno=None):
442 super(RadosStateError, self).__init__(
443 "RADOS rados state (%s)" % message, errno)
446 class IoctxStateError(Error):
447 """ `IoctxStateError` class, derived from `Error` """
448 def __init__(self, message, errno=None):
449 super(IoctxStateError, self).__init__(
450 "RADOS Ioctx state error (%s)" % message, errno)
453 class ObjectStateError(Error):
454 """ `ObjectStateError` class, derived from `Error` """
455 def __init__(self, message, errno=None):
456 super(ObjectStateError, self).__init__(
457 "RADOS object state error (%s)" % message, errno)
460 class LogicError(Error):
461 """ `` class, derived from `Error` """
462 def __init__(self, message, errno=None):
463 super(LogicError, self).__init__(
464 "RADOS logic error (%s)" % message, errno)
467 class TimedOut(OSError):
468 """ `TimedOut` class, derived from `OSError` """
469 def __init__(self, message, errno=None):
470 super(TimedOut, self).__init__(
471 "RADOS timed out (%s)" % message, errno)
474 class InProgress(Error):
475 """ `InProgress` class, derived from `Error` """
476 def __init__(self, message, errno=None):
477 super(InProgress, self).__init__(
478 "RADOS in progress error (%s)" % message, errno)
481 class IsConnected(Error):
482 """ `IsConnected` class, derived from `Error` """
483 def __init__(self, message, errno=None):
484 super(IsConnected, self).__init__(
485 "RADOS is connected error (%s)" % message, errno)
488 IF UNAME_SYSNAME == "FreeBSD":
489 cdef errno_to_exception = {
490 errno.EPERM : PermissionError,
491 errno.ENOENT : ObjectNotFound,
493 errno.ENOSPC : NoSpace,
494 errno.EEXIST : ObjectExists,
495 errno.EBUSY : ObjectBusy,
496 errno.ENOATTR : NoData,
497 errno.EINTR : InterruptedOrTimeoutError,
498 errno.ETIMEDOUT : TimedOut,
499 errno.EACCES : PermissionDeniedError,
500 errno.EINPROGRESS : InProgress,
501 errno.EISCONN : IsConnected,
502 errno.EINVAL : InvalidArgumentError,
505 cdef errno_to_exception = {
506 errno.EPERM : PermissionError,
507 errno.ENOENT : ObjectNotFound,
509 errno.ENOSPC : NoSpace,
510 errno.EEXIST : ObjectExists,
511 errno.EBUSY : ObjectBusy,
512 errno.ENODATA : NoData,
513 errno.EINTR : InterruptedOrTimeoutError,
514 errno.ETIMEDOUT : TimedOut,
515 errno.EACCES : PermissionDeniedError,
516 errno.EINPROGRESS : InProgress,
517 errno.EISCONN : IsConnected,
518 errno.EINVAL : InvalidArgumentError,
522 cdef make_ex(ret, msg):
524 Translate a librados return code into an exception.
526 :param ret: the return code
528 :param msg: the error message to use
530 :returns: a subclass of :class:`Error`
533 if ret in errno_to_exception:
534 return errno_to_exception[ret](msg, errno=ret)
536 return OSError(msg, errno=ret)
539 # helper to specify an optional argument, where in addition to `cls`, `None`
545 # validate argument types of an instance method
546 # kwargs is an un-ordered dict, so use args instead
547 def requires(*types):
548 def is_type_of(v, t):
552 return isinstance(v, t)
554 def check_type(val, arg_name, arg_type):
555 if isinstance(arg_type, tuple):
556 if any(is_type_of(val, t) for t in arg_type):
558 type_names = ' or '.join('None' if t is None else t.__name__
560 raise TypeError('%s must be %s' % (arg_name, type_names))
562 if is_type_of(val, arg_type):
564 assert(arg_type is not None)
565 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
568 # FIXME(sileht): this stop with
569 # AttributeError: 'method_descriptor' object has no attribute '__module__'
571 def validate_func(*args, **kwargs):
572 # ignore the `self` arg
573 pos_args = zip(args[1:], types)
574 named_args = ((kwargs[name], (name, spec)) for name, spec in types
576 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
577 check_type(arg_val, arg_name, arg_type)
578 return f(*args, **kwargs)
583 def cstr(val, name, encoding="utf-8", opt=False):
585 Create a byte string from a Python string
587 :param basestring val: Python string
588 :param str name: Name of the string parameter, for exceptions
589 :param str encoding: Encoding to use
590 :param bool opt: If True, None is allowed
592 :raises: :class:`InvalidArgument`
594 if opt and val is None:
596 if isinstance(val, bytes):
598 elif isinstance(val, unicode):
599 return val.encode(encoding)
601 raise TypeError('%s must be a string' % name)
604 def cstr_list(list_str, name, encoding="utf-8"):
605 return [cstr(s, name) for s in list_str]
608 def decode_cstr(val, encoding="utf-8"):
610 Decode a byte string into a Python string.
612 :param bytes val: byte string
613 :rtype: unicode or None
618 return val.decode(encoding)
621 def flatten_dict(d, name):
622 items = chain.from_iterable(d.items())
623 return cstr(''.join(i + '\0' for i in items), name)
626 cdef char* opt_str(s) except? NULL:
632 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
633 cdef void *ret = realloc(ptr, size)
635 raise MemoryError("realloc failed")
639 cdef size_t * to_csize_t_array(list_int):
640 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
642 raise MemoryError("malloc failed")
643 for i in xrange(len(list_int)):
644 ret[i] = <size_t>list_int[i]
648 cdef char ** to_bytes_array(list_bytes):
649 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
651 raise MemoryError("malloc failed")
652 for i in xrange(len(list_bytes)):
653 ret[i] = <char *>list_bytes[i]
658 cdef int __monitor_callback(void *arg, const char *line, const char *who,
659 uint64_t sec, uint64_t nsec, uint64_t seq,
660 const char *level, const char *msg) with gil:
661 cdef object cb_info = <object>arg
662 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
665 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
668 uint64_t sec, uint64_t nsec, uint64_t seq,
669 const char *level, const char *msg) with gil:
670 cdef object cb_info = <object>arg
671 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
675 class Version(object):
676 """ Version information """
677 def __init__(self, major, minor, extra):
683 return "%d.%d.%d" % (self.major, self.minor, self.extra)
686 cdef class Rados(object):
687 """This class wraps librados functions"""
688 # NOTE(sileht): attributes declared in .pyd
690 def __init__(self, *args, **kwargs):
692 self.__setup(*args, **kwargs)
694 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
695 ('conffile', opt(str_type)))
696 def __setup(self, rados_id=None, name=None, clustername=None,
697 conf_defaults=None, conffile=None, conf=None, flags=0,
699 self.monitor_callback = None
700 self.monitor_callback2 = None
701 self.parsed_args = []
702 self.conf_defaults = conf_defaults
703 self.conffile = conffile
704 self.rados_id = rados_id
706 if rados_id and name:
707 raise Error("Rados(): can't supply both rados_id and name")
709 name = 'client.' + rados_id
711 name = 'client.admin'
712 if clustername is None:
715 name = cstr(name, 'name')
716 clustername = cstr(clustername, 'clustername')
719 char *_clustername = clustername
724 # Unpack void* (aka rados_config_t) from capsule
725 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
727 ret = rados_create_with_context(&self.cluster, rados_config)
730 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
732 raise Error("rados_initialize failed with error code: %d" % ret)
734 self.state = "configuring"
735 # order is important: conf_defaults, then conffile, then conf
737 for key, value in conf_defaults.items():
738 self.conf_set(key, value)
739 if conffile is not None:
740 # read the default conf file when '' is given
743 self.conf_read_file(conffile)
745 for key, value in conf.items():
746 self.conf_set(key, value)
750 Get associated client addresses with this RADOS session.
752 self.require_state("configuring", "connected")
760 ret = rados_getaddrs(self.cluster, &addrs)
762 raise make_ex(ret, "error calling getaddrs")
764 return decode_cstr(addrs)
768 def require_state(self, *args):
770 Checks if the Rados object is in a special state
772 :raises: :class:`RadosStateError`
774 if self.state in args:
776 raise RadosStateError("You cannot perform that operation on a \
777 Rados object in state %s." % self.state)
781 Disconnects from the cluster. Call this explicitly when a
782 Rados.connect()ed object is no longer used.
784 if self.state != "shutdown":
786 rados_shutdown(self.cluster)
787 self.state = "shutdown"
793 def __exit__(self, type_, value, traceback):
799 Get the version number of the ``librados`` C library.
801 :returns: a tuple of ``(major, minor, extra)`` components of the
808 rados_version(&major, &minor, &extra)
809 return Version(major, minor, extra)
811 @requires(('path', opt(str_type)))
812 def conf_read_file(self, path=None):
814 Configure the cluster handle using a Ceph config file.
816 :param path: path to the config file
819 self.require_state("configuring", "connected")
820 path = cstr(path, 'path', opt=True)
822 char *_path = opt_str(path)
824 ret = rados_conf_read_file(self.cluster, _path)
826 raise make_ex(ret, "error calling conf_read_file")
828 def conf_parse_argv(self, args):
830 Parse known arguments from args, and remove; returned
831 args contain only those unknown to ceph
833 self.require_state("configuring", "connected")
837 cargs = cstr_list(args, 'args')
839 int _argc = len(args)
840 char **_argv = to_bytes_array(cargs)
841 char **_remargv = NULL
844 _remargv = <char **>malloc(_argc * sizeof(char *))
846 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
848 <const char**>_remargv)
850 raise make_ex(ret, "error calling conf_parse_argv_remainder")
852 # _remargv was allocated with fixed argc; collapse return
853 # list to eliminate any missing args
854 retargs = [decode_cstr(a) for a in _remargv[:_argc]
856 self.parsed_args = args
862 def conf_parse_env(self, var='CEPH_ARGS'):
864 Parse known arguments from an environment variable, normally
867 self.require_state("configuring", "connected")
871 var = cstr(var, 'var')
875 ret = rados_conf_parse_env(self.cluster, _var)
877 raise make_ex(ret, "error calling conf_parse_env")
879 @requires(('option', str_type))
880 def conf_get(self, option):
882 Get the value of a configuration option
884 :param option: which option to read
887 :returns: str - value of the option or None
888 :raises: :class:`TypeError`
890 self.require_state("configuring", "connected")
891 option = cstr(option, 'option')
893 char *_option = option
899 ret_buf = <char *>realloc_chk(ret_buf, length)
901 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
903 return decode_cstr(ret_buf)
904 elif ret == -errno.ENAMETOOLONG:
906 elif ret == -errno.ENOENT:
909 raise make_ex(ret, "error calling conf_get")
913 @requires(('option', str_type), ('val', str_type))
914 def conf_set(self, option, val):
916 Set the value of a configuration option
918 :param option: which option to set
920 :param option: value of the option
923 :raises: :class:`TypeError`, :class:`ObjectNotFound`
925 self.require_state("configuring", "connected")
926 option = cstr(option, 'option')
927 val = cstr(val, 'val')
929 char *_option = option
933 ret = rados_conf_set(self.cluster, _option, _val)
935 raise make_ex(ret, "error calling conf_set")
937 def ping_monitor(self, mon_id):
939 Ping a monitor to assess liveness
941 May be used as a simply way to assess liveness, or to obtain
942 information about the monitor in a simple way even in the
945 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
947 :returns: the string reply from the monitor
950 self.require_state("configuring", "connected")
952 mon_id = cstr(mon_id, 'mon_id')
954 char *_mon_id = mon_id
959 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
962 raise make_ex(ret, "error calling ping_monitor")
965 my_outstr = outstr[:outstrlen]
966 rados_buffer_free(outstr)
967 return decode_cstr(my_outstr)
969 def connect(self, timeout=0):
971 Connect to the cluster. Use shutdown() to release resources.
973 self.require_state("configuring")
974 # NOTE(sileht): timeout was supported by old python API,
975 # but this is not something available in C API, so ignore
976 # for now and remove it later
978 ret = rados_connect(self.cluster)
980 raise make_ex(ret, "error connecting to the cluster")
981 self.state = "connected"
983 def get_instance_id(self):
985 Get a global id for current instance
987 self.require_state("connected")
989 ret = rados_get_instance_id(self.cluster)
992 def get_cluster_stats(self):
994 Read usage info about the cluster
996 This tells you total space, space used, space available, and number
997 of objects. These are not updated immediately when data is written,
998 they are eventually consistent.
1000 :returns: dict - contains the following keys:
1002 - ``kb`` (int) - total space
1004 - ``kb_used`` (int) - space used
1006 - ``kb_avail`` (int) - free space available
1008 - ``num_objects`` (int) - number of objects
1012 rados_cluster_stat_t stats
1015 ret = rados_cluster_stat(self.cluster, &stats)
1019 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
1020 return {'kb': stats.kb,
1021 'kb_used': stats.kb_used,
1022 'kb_avail': stats.kb_avail,
1023 'num_objects': stats.num_objects}
1025 @requires(('pool_name', str_type))
1026 def pool_exists(self, pool_name):
1028 Checks if a given pool exists.
1030 :param pool_name: name of the pool to check
1031 :type pool_name: str
1033 :raises: :class:`TypeError`, :class:`Error`
1034 :returns: bool - whether the pool exists, false otherwise.
1036 self.require_state("connected")
1038 pool_name = cstr(pool_name, 'pool_name')
1040 char *_pool_name = pool_name
1043 ret = rados_pool_lookup(self.cluster, _pool_name)
1046 elif ret == -errno.ENOENT:
1049 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
1051 @requires(('pool_name', str_type))
1052 def pool_lookup(self, pool_name):
1054 Returns a pool's ID based on its name.
1056 :param pool_name: name of the pool to look up
1057 :type pool_name: str
1059 :raises: :class:`TypeError`, :class:`Error`
1060 :returns: int - pool ID, or None if it doesn't exist
1062 self.require_state("connected")
1063 pool_name = cstr(pool_name, 'pool_name')
1065 char *_pool_name = pool_name
1068 ret = rados_pool_lookup(self.cluster, _pool_name)
1071 elif ret == -errno.ENOENT:
1074 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
1076 @requires(('pool_id', int))
1077 def pool_reverse_lookup(self, pool_id):
1079 Returns a pool's name based on its ID.
1081 :param pool_id: ID of the pool to look up
1084 :raises: :class:`TypeError`, :class:`Error`
1085 :returns: string - pool name, or None if it doesn't exist
1087 self.require_state("connected")
1089 int64_t _pool_id = pool_id
1095 name = <char *>realloc_chk(name, size)
1097 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
1100 elif ret != -errno.ERANGE and size <= 4096:
1102 elif ret == -errno.ENOENT:
1105 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1107 return decode_cstr(name)
1112 @requires(('pool_name', str_type), ('crush_rule', opt(int)), ('auid', opt(int)))
1113 def create_pool(self, pool_name, crush_rule=None, auid=None):
1116 - with default settings: if crush_rule=None and auid=None
1117 - with a specific CRUSH rule: crush_rule given
1118 - with a specific auid: auid given
1119 - with a specific CRUSH rule and auid: crush_rule and auid given
1121 :param pool_name: name of the pool to create
1122 :type pool_name: str
1123 :param crush_rule: rule to use for placement in the new pool
1124 :type crush_rule: int
1125 :param auid: id of the owner of the new pool
1128 :raises: :class:`TypeError`, :class:`Error`
1130 self.require_state("connected")
1132 pool_name = cstr(pool_name, 'pool_name')
1134 char *_pool_name = pool_name
1138 if crush_rule is None and auid is None:
1140 ret = rados_pool_create(self.cluster, _pool_name)
1141 elif crush_rule is not None and auid is None:
1142 _crush_rule = crush_rule
1144 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1145 elif crush_rule is None and auid is not None:
1148 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1150 _crush_rule = crush_rule
1153 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1155 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1157 @requires(('pool_id', int))
1158 def get_pool_base_tier(self, pool_id):
1162 :returns: base pool, or pool_id if tiering is not configured for the pool
1164 self.require_state("connected")
1166 int64_t base_tier = 0
1167 int64_t _pool_id = pool_id
1170 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1172 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1173 return int(base_tier)
1175 @requires(('pool_name', str_type))
1176 def delete_pool(self, pool_name):
1178 Delete a pool and all data inside it.
1180 The pool is removed from the cluster immediately,
1181 but the actual data is deleted in the background.
1183 :param pool_name: name of the pool to delete
1184 :type pool_name: str
1186 :raises: :class:`TypeError`, :class:`Error`
1188 self.require_state("connected")
1190 pool_name = cstr(pool_name, 'pool_name')
1192 char *_pool_name = pool_name
1195 ret = rados_pool_delete(self.cluster, _pool_name)
1197 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1199 @requires(('pool_id', int))
1200 def get_inconsistent_pgs(self, pool_id):
1202 List inconsistent placement groups in the given pool
1204 :param pool_id: ID of the pool in which PGs are listed
1206 :returns: list - inconsistent placement groups
1208 self.require_state("connected")
1210 int64_t pool = pool_id
1216 pgs = <char *>realloc_chk(pgs, size);
1218 ret = rados_inconsistent_pg_list(self.cluster, pool,
1225 raise make_ex(ret, "error calling inconsistent_pg_list")
1226 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1230 def list_pools(self):
1232 Gets a list of pool names.
1234 :returns: list - of pool names.
1236 self.require_state("connected")
1239 char *c_names = NULL
1243 c_names = <char *>realloc_chk(c_names, size)
1245 ret = rados_pool_list(self.cluster, c_names, size)
1250 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1257 Get the fsid of the cluster as a hexadecimal string.
1259 :raises: :class:`Error`
1260 :returns: str - cluster fsid
1262 self.require_state("connected")
1264 char *ret_buf = NULL
1269 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
1271 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1272 if ret == -errno.ERANGE:
1273 buf_len = buf_len * 2
1275 raise make_ex(ret, "error getting cluster fsid")
1278 return decode_cstr(ret_buf)
1282 @requires(('ioctx_name', str_type))
1283 def open_ioctx(self, ioctx_name):
1285 Create an io context
1287 The io context allows you to perform operations within a particular
1290 :param ioctx_name: name of the pool
1291 :type ioctx_name: str
1293 :raises: :class:`TypeError`, :class:`Error`
1294 :returns: Ioctx - Rados Ioctx object
1296 self.require_state("connected")
1297 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1300 char *_ioctx_name = ioctx_name
1302 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1304 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1305 io = Ioctx(ioctx_name)
1309 @requires(('pool_id', int))
1310 def open_ioctx2(self, pool_id):
1312 Create an io context
1314 The io context allows you to perform operations within a particular
1317 :param pool_id: ID of the pool
1320 :raises: :class:`TypeError`, :class:`Error`
1321 :returns: Ioctx - Rados Ioctx object
1323 self.require_state("connected")
1326 int64_t _pool_id = pool_id
1328 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1330 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1331 io = Ioctx(str(pool_id))
1335 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1337 Send a command to the mon.
1339 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1341 :param cmd: JSON formatted string.
1342 :param inbuf: optional string.
1343 :param timeout: This parameter is ignored.
1344 :param target: name of a specific mon. Optional
1345 :return: (int ret, string outbuf, string outs)
1350 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1352 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1353 >>> c.mon_command(cmd, b'')
1355 # NOTE(sileht): timeout is ignored because C API doesn't provide
1356 # timeout argument, but we keep it for backward compat with old python binding
1358 self.require_state("connected")
1359 cmd = cstr_list(cmd, 'c')
1361 if isinstance(target, int):
1362 # NOTE(sileht): looks weird but test_monmap_dump pass int
1363 target = str(target)
1365 target = cstr(target, 'target', opt=True)
1366 inbuf = cstr(inbuf, 'inbuf')
1369 char *_target = opt_str(target)
1370 char **_cmd = to_bytes_array(cmd)
1371 size_t _cmdlen = len(cmd)
1373 char *_inbuf = inbuf
1374 size_t _inbuf_len = len(inbuf)
1384 ret = rados_mon_command_target(self.cluster, _target,
1385 <const char **>_cmd, _cmdlen,
1386 <const char*>_inbuf, _inbuf_len,
1387 &_outbuf, &_outbuf_len,
1391 ret = rados_mon_command(self.cluster,
1392 <const char **>_cmd, _cmdlen,
1393 <const char*>_inbuf, _inbuf_len,
1394 &_outbuf, &_outbuf_len,
1397 my_outs = decode_cstr(_outs[:_outs_len])
1398 my_outbuf = _outbuf[:_outbuf_len]
1400 rados_buffer_free(_outs)
1402 rados_buffer_free(_outbuf)
1403 return (ret, my_outbuf, my_outs)
1407 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1409 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1411 :return: (int ret, string outbuf, string outs)
1413 # NOTE(sileht): timeout is ignored because C API doesn't provide
1414 # timeout argument, but we keep it for backward compat with old python binding
1415 self.require_state("connected")
1417 cmd = cstr_list(cmd, 'cmd')
1418 inbuf = cstr(inbuf, 'inbuf')
1422 char **_cmd = to_bytes_array(cmd)
1423 size_t _cmdlen = len(cmd)
1425 char *_inbuf = inbuf
1426 size_t _inbuf_len = len(inbuf)
1435 ret = rados_osd_command(self.cluster, _osdid,
1436 <const char **>_cmd, _cmdlen,
1437 <const char*>_inbuf, _inbuf_len,
1438 &_outbuf, &_outbuf_len,
1441 my_outs = decode_cstr(_outs[:_outs_len])
1442 my_outbuf = _outbuf[:_outbuf_len]
1444 rados_buffer_free(_outs)
1446 rados_buffer_free(_outbuf)
1447 return (ret, my_outbuf, my_outs)
1451 def mgr_command(self, cmd, inbuf, timeout=0, target=None):
1453 :return: (int ret, string outbuf, string outs)
1455 # NOTE(sileht): timeout is ignored because C API doesn't provide
1456 # timeout argument, but we keep it for backward compat with old python binding
1457 self.require_state("connected")
1459 cmd = cstr_list(cmd, 'cmd')
1460 inbuf = cstr(inbuf, 'inbuf')
1461 target = cstr(target, 'target', opt=True)
1464 char *_target = opt_str(target)
1466 char **_cmd = to_bytes_array(cmd)
1467 size_t _cmdlen = len(cmd)
1469 char *_inbuf = inbuf
1470 size_t _inbuf_len = len(inbuf)
1478 if target is not None:
1480 ret = rados_mgr_command_target(self.cluster,
1481 <const char*>_target,
1482 <const char **>_cmd, _cmdlen,
1483 <const char*>_inbuf, _inbuf_len,
1484 &_outbuf, &_outbuf_len,
1488 ret = rados_mgr_command(self.cluster,
1489 <const char **>_cmd, _cmdlen,
1490 <const char*>_inbuf, _inbuf_len,
1491 &_outbuf, &_outbuf_len,
1494 my_outs = decode_cstr(_outs[:_outs_len])
1495 my_outbuf = _outbuf[:_outbuf_len]
1497 rados_buffer_free(_outs)
1499 rados_buffer_free(_outbuf)
1500 return (ret, my_outbuf, my_outs)
1504 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1506 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1508 :return: (int ret, string outbuf, string outs)
1510 # NOTE(sileht): timeout is ignored because C API doesn't provide
1511 # timeout argument, but we keep it for backward compat with old python binding
1512 self.require_state("connected")
1514 pgid = cstr(pgid, 'pgid')
1515 cmd = cstr_list(cmd, 'cmd')
1516 inbuf = cstr(inbuf, 'inbuf')
1520 char **_cmd = to_bytes_array(cmd)
1521 size_t _cmdlen = len(cmd)
1523 char *_inbuf = inbuf
1524 size_t _inbuf_len = len(inbuf)
1533 ret = rados_pg_command(self.cluster, _pgid,
1534 <const char **>_cmd, _cmdlen,
1535 <const char *>_inbuf, _inbuf_len,
1536 &_outbuf, &_outbuf_len,
1539 my_outs = decode_cstr(_outs[:_outs_len])
1540 my_outbuf = _outbuf[:_outbuf_len]
1542 rados_buffer_free(_outs)
1544 rados_buffer_free(_outbuf)
1545 return (ret, my_outbuf, my_outs)
1549 def wait_for_latest_osdmap(self):
1550 self.require_state("connected")
1552 ret = rados_wait_for_latest_osdmap(self.cluster)
1555 def blacklist_add(self, client_address, expire_seconds=0):
1557 Blacklist a client from the OSDs
1559 :param client_address: client address
1560 :type client_address: str
1561 :param expire_seconds: number of seconds to blacklist
1562 :type expire_seconds: int
1564 :raises: :class:`Error`
1566 self.require_state("connected")
1567 client_address = cstr(client_address, 'client_address')
1569 uint32_t _expire_seconds = expire_seconds
1570 char *_client_address = client_address
1573 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1575 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1577 def monitor_log(self, level, callback, arg):
1578 if level not in MONITOR_LEVELS:
1579 raise LogicError("invalid monitor level " + level)
1580 if callback is not None and not callable(callback):
1581 raise LogicError("callback must be a callable function or None")
1583 level = cstr(level, 'level')
1584 cdef char *_level = level
1586 if callback is None:
1588 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1589 self.monitor_callback = None
1590 self.monitor_callback2 = None
1593 cb = (callback, arg)
1594 cdef PyObject* _arg = <PyObject*>cb
1596 r = rados_monitor_log(self.cluster, <const char*>_level,
1597 <rados_log_callback_t>&__monitor_callback, _arg)
1600 raise make_ex(r, 'error calling rados_monitor_log')
1601 # NOTE(sileht): Prevents the callback method from being garbage collected
1602 self.monitor_callback = cb
1603 self.monitor_callback2 = None
1605 def monitor_log2(self, level, callback, arg):
1606 if level not in MONITOR_LEVELS:
1607 raise LogicError("invalid monitor level " + level)
1608 if callback is not None and not callable(callback):
1609 raise LogicError("callback must be a callable function or None")
1611 level = cstr(level, 'level')
1612 cdef char *_level = level
1614 if callback is None:
1616 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1617 self.monitor_callback = None
1618 self.monitor_callback2 = None
1621 cb = (callback, arg)
1622 cdef PyObject* _arg = <PyObject*>cb
1624 r = rados_monitor_log2(self.cluster, <const char*>_level,
1625 <rados_log_callback2_t>&__monitor_callback2, _arg)
1628 raise make_ex(r, 'error calling rados_monitor_log')
1629 # NOTE(sileht): Prevents the callback method from being garbage collected
1630 self.monitor_callback = None
1631 self.monitor_callback2 = cb
1633 @requires(('service', str_type), ('daemon', str_type), ('metadata', dict))
1634 def service_daemon_register(self, service, daemon, metadata):
1636 :param str service: service name (e.g. "rgw")
1637 :param str daemon: daemon name (e.g. "gwfoo")
1638 :param dict metadata: static metadata about the register daemon
1639 (e.g., the version of Ceph, the kernel version.)
1641 service = cstr(service, 'service')
1642 daemon = cstr(daemon, 'daemon')
1643 metadata_dict = flatten_dict(metadata, 'metadata')
1645 char *_service = service
1646 char *_daemon = daemon
1647 char *_metadata = metadata_dict
1650 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1652 raise make_ex(ret, "error calling service_register()")
1654 @requires(('metadata', dict))
1655 def service_daemon_update(self, status):
1656 status_dict = flatten_dict(status, 'status')
1658 char *_status = status_dict
1661 ret = rados_service_update_status(self.cluster, _status)
1663 raise make_ex(ret, "error calling service_daemon_update()")
1666 cdef class OmapIterator(object):
1669 cdef public Ioctx ioctx
1670 cdef rados_omap_iter_t ctx
1672 def __cinit__(self, Ioctx ioctx):
1680 Get the next key-value pair in the object
1681 :returns: next rados.OmapItem
1689 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1692 raise make_ex(ret, "error iterating over the omap")
1694 raise StopIteration()
1695 key = decode_cstr(key_)
1701 def __dealloc__(self):
1703 rados_omap_get_end(self.ctx)
1706 cdef class ObjectIterator(object):
1707 """rados.Ioctx Object iterator"""
1709 cdef rados_list_ctx_t ctx
1711 cdef public object ioctx
1713 def __cinit__(self, Ioctx ioctx):
1717 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1719 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1727 Get the next object name and locator in the pool
1729 :raises: StopIteration
1730 :returns: next rados.Ioctx Object
1733 const char *key_ = NULL
1734 const char *locator_ = NULL
1735 const char *nspace_ = NULL
1738 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1741 raise StopIteration()
1743 key = decode_cstr(key_)
1744 locator = decode_cstr(locator_) if locator_ != NULL else None
1745 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1746 return Object(self.ioctx, key, locator, nspace)
1748 def __dealloc__(self):
1750 rados_nobjects_list_close(self.ctx)
1753 cdef class XattrIterator(object):
1754 """Extended attribute iterator"""
1756 cdef rados_xattrs_iter_t it
1759 cdef public Ioctx ioctx
1760 cdef public object oid
1762 def __cinit__(self, Ioctx ioctx, oid):
1764 self.oid = cstr(oid, 'oid')
1765 self._oid = self.oid
1768 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1770 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1777 Get the next xattr on the object
1779 :raises: StopIteration
1780 :returns: pair - of name and value of the next Xattr
1783 const char *name_ = NULL
1784 const char *val_ = NULL
1788 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1790 raise make_ex(ret, "error iterating over the extended attributes \
1791 in '%s'" % self.oid)
1793 raise StopIteration()
1794 name = decode_cstr(name_)
1798 def __dealloc__(self):
1800 rados_getxattrs_end(self.it)
1803 cdef class SnapIterator(object):
1804 """Snapshot iterator"""
1806 cdef public Ioctx ioctx
1808 cdef rados_snap_t *snaps
1812 def __cinit__(self, Ioctx ioctx):
1814 # We don't know how big a buffer we need until we've called the
1815 # function. So use the exponential doubling strategy.
1816 cdef int num_snaps = 10
1818 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1820 sizeof(rados_snap_t))
1823 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1827 elif ret != -errno.ERANGE:
1828 raise make_ex(ret, "error calling rados_snap_list for \
1829 ioctx '%s'" % self.ioctx.name)
1830 num_snaps = num_snaps * 2
1838 Get the next Snapshot
1840 :raises: :class:`Error`, StopIteration
1841 :returns: Snap - next snapshot
1843 if self.cur_snap >= self.max_snap:
1847 rados_snap_t snap_id = self.snaps[self.cur_snap]
1853 name = <char *>realloc_chk(name, name_len)
1855 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1858 elif ret != -errno.ERANGE:
1859 raise make_ex(ret, "rados_snap_get_name error")
1861 name_len = name_len * 2
1863 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1864 self.cur_snap = self.cur_snap + 1
1870 cdef class Snap(object):
1871 """Snapshot object"""
1872 cdef public Ioctx ioctx
1873 cdef public object name
1875 # NOTE(sileht): old API was storing the ctypes object
1876 # instead of the value ....
1877 cdef public rados_snap_t snap_id
1879 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1882 self.snap_id = snap_id
1885 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1886 % (str(self.ioctx), self.name, self.snap_id)
1888 def get_timestamp(self):
1890 Find when a snapshot in the current pool occurred
1892 :raises: :class:`Error`
1893 :returns: datetime - the data and time the snapshot was created
1895 cdef time_t snap_time
1898 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1900 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1901 return datetime.fromtimestamp(snap_time)
1904 cdef class Completion(object):
1905 """completion object"""
1913 rados_callback_t complete_cb
1914 rados_callback_t safe_cb
1915 rados_completion_t rados_comp
1918 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1919 self.oncomplete = oncomplete
1920 self.onsafe = onsafe
1925 Is an asynchronous operation safe?
1927 This does not imply that the safe callback has finished.
1929 :returns: True if the operation is safe
1931 return self.is_complete()
1933 def is_complete(self):
1935 Has an asynchronous operation completed?
1937 This does not imply that the safe callback has finished.
1939 :returns: True if the operation is completed
1942 ret = rados_aio_is_complete(self.rados_comp)
1945 def wait_for_safe(self):
1947 Wait for an asynchronous operation to be marked safe
1949 wait_for_safe() is an alias of wait_for_complete() since Luminous
1951 self.wait_for_complete()
1953 def wait_for_complete(self):
1955 Wait for an asynchronous operation to complete
1957 This does not imply that the complete callback has finished.
1960 rados_aio_wait_for_complete(self.rados_comp)
1962 def wait_for_safe_and_cb(self):
1964 Wait for an asynchronous operation to be marked safe and for
1965 the safe callback to have returned
1967 return self.wait_for_complete_and_cb()
1969 def wait_for_complete_and_cb(self):
1971 Wait for an asynchronous operation to complete and for the
1972 complete callback to have returned
1974 :returns: whether the operation is completed
1977 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1980 def get_return_value(self):
1982 Get the return value of an asychronous operation
1984 The return value is set when the operation is complete or safe,
1985 whichever comes first.
1987 :returns: int - return value of the operation
1990 ret = rados_aio_get_return_value(self.rados_comp)
1993 def __dealloc__(self):
1995 Release a completion
1997 Call this when you no longer need the completion. It may not be
1998 freed immediately if the operation is not acked and committed.
2000 ref.Py_XDECREF(self.buf)
2002 if self.rados_comp != NULL:
2004 rados_aio_release(self.rados_comp)
2005 self.rados_comp = NULL
2007 def _complete(self):
2008 self.oncomplete(self)
2014 with self.ioctx.lock:
2016 self.ioctx.complete_completions.remove(self)
2018 self.ioctx.safe_completions.remove(self)
2021 class OpCtx(object):
2022 def __enter__(self):
2023 return self.create()
2025 def __exit__(self, type, msg, traceback):
2029 cdef class WriteOp(object):
2030 cdef rados_write_op_t write_op
2034 self.write_op = rados_create_write_op()
2039 rados_release_write_op(self.write_op)
2041 @requires(('exclusive', opt(int)))
2042 def new(self, exclusive=None):
2048 int _exclusive = exclusive
2051 rados_write_op_create(self.write_op, _exclusive, NULL)
2059 rados_write_op_remove(self.write_op)
2061 @requires(('flags', int))
2062 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
2064 Set flags for the last operation added to this write_op.
2065 :para flags: flags to apply to the last operation
2073 rados_write_op_set_flags(self.write_op, _flags)
2075 @requires(('xattr_name', str_type), ('xattr_value', bytes))
2076 def set_xattr(self, xattr_name, xattr_value):
2078 Set an extended attribute on an object.
2079 :param xattr_name: name of the xattr
2080 :type xattr_name: str
2081 :param xattr_value: buffer to set xattr to
2082 :type xattr_value: bytes
2084 xattr_name = cstr(xattr_name, 'xattr_name')
2086 char *_xattr_name = xattr_name
2087 char *_xattr_value = xattr_value
2088 size_t _xattr_value_len = len(xattr_value)
2090 rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
2092 @requires(('xattr_name', str_type))
2093 def rm_xattr(self, xattr_name):
2095 Removes an extended attribute on from an object.
2096 :param xattr_name: name of the xattr to remove
2097 :type xattr_name: str
2099 xattr_name = cstr(xattr_name, 'xattr_name')
2101 char *_xattr_name = xattr_name
2103 rados_write_op_rmxattr(self.write_op, _xattr_name)
2105 @requires(('to_write', bytes))
2106 def append(self, to_write):
2108 Append data to an object synchronously
2109 :param to_write: data to write
2110 :type to_write: bytes
2114 char *_to_write = to_write
2115 size_t length = len(to_write)
2118 rados_write_op_append(self.write_op, _to_write, length)
2120 @requires(('to_write', bytes))
2121 def write_full(self, to_write):
2123 Write whole object, atomically replacing it.
2124 :param to_write: data to write
2125 :type to_write: bytes
2129 char *_to_write = to_write
2130 size_t length = len(to_write)
2133 rados_write_op_write_full(self.write_op, _to_write, length)
2135 @requires(('to_write', bytes), ('offset', int))
2136 def write(self, to_write, offset=0):
2139 :param to_write: data to write
2140 :type to_write: bytes
2141 :param offset: byte offset in the object to begin writing at
2146 char *_to_write = to_write
2147 size_t length = len(to_write)
2148 uint64_t _offset = offset
2151 rados_write_op_write(self.write_op, _to_write, length, _offset)
2153 @requires(('version', int))
2154 def assert_version(self, version):
2156 Check if object's version is the expected one.
2157 :param version: expected version of the object
2161 uint64_t _version = version
2164 rados_write_op_assert_version(self.write_op, _version)
2166 @requires(('offset', int), ('length', int))
2167 def zero(self, offset, length):
2169 Zero part of an object.
2170 :param offset: byte offset in the object to begin writing at
2172 :param offset: number of zero to write
2177 size_t _length = length
2178 uint64_t _offset = offset
2181 rados_write_op_zero(self.write_op, _length, _offset)
2183 @requires(('offset', int))
2184 def truncate(self, offset):
2187 :param offset: byte offset in the object to begin truncating at
2192 uint64_t _offset = offset
2195 rados_write_op_truncate(self.write_op, _offset)
2197 @requires(('cls', str_type), ('method', str_type), ('data', bytes))
2198 def execute(self, cls, method, data):
2200 Execute an OSD class method on an object
2202 :param cls: name of the object class
2204 :param method: name of the method
2206 :param data: input data
2210 cls = cstr(cls, 'cls')
2211 method = cstr(method, 'method')
2214 char *_method = method
2216 size_t _data_len = len(data)
2219 rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)
2221 @requires(('to_write', bytes), ('write_len', int), ('offset', int))
2222 def writesame(self, to_write, write_len, offset=0):
2224 Write the same buffer multiple times
2225 :param to_write: data to write
2226 :type to_write: bytes
2227 :param write_len: total number of bytes to write
2229 :param offset: byte offset in the object to begin writing at
2233 char *_to_write = to_write
2234 size_t _data_len = len(to_write)
2235 size_t _write_len = write_len
2236 uint64_t _offset = offset
2238 rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)
2240 class WriteOpCtx(WriteOp, OpCtx):
2241 """write operation context manager"""
2244 cdef class ReadOp(object):
2245 cdef rados_read_op_t read_op
2249 self.read_op = rados_create_read_op()
2254 rados_release_read_op(self.read_op)
2256 @requires(('flags', int))
2257 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
2259 Set flags for the last operation added to this read_op.
2260 :para flags: flags to apply to the last operation
2268 rados_read_op_set_flags(self.read_op, _flags)
2271 class ReadOpCtx(ReadOp, OpCtx):
2272 """read operation context manager"""
2275 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2277 Callback to oncomplete() for asynchronous operations
2279 cdef object cb = <object>args
2284 cdef class Ioctx(object):
2285 """rados.Ioctx object"""
2286 # NOTE(sileht): attributes declared in .pyd
2288 def __init__(self, name):
2292 self.locator_key = ""
2294 self.lock = threading.Lock()
2295 self.safe_completions = []
2296 self.complete_completions = []
2298 def __enter__(self):
2301 def __exit__(self, type_, value, traceback):
2305 def __dealloc__(self):
2308 def __track_completion(self, completion_obj):
2309 if completion_obj.oncomplete:
2311 self.complete_completions.append(completion_obj)
2312 if completion_obj.onsafe:
2314 self.safe_completions.append(completion_obj)
2316 def __get_completion(self, oncomplete, onsafe):
2318 Constructs a completion to use with asynchronous operations
2320 :param oncomplete: what to do when the write is safe and complete in memory
2322 :type oncomplete: completion
2323 :param onsafe: what to do when the write is safe and complete on storage
2325 :type onsafe: completion
2327 :raises: :class:`Error`
2328 :returns: completion object
2331 completion_obj = Completion(self, oncomplete, onsafe)
2334 rados_callback_t complete_cb = NULL
2335 rados_completion_t completion
2336 PyObject* p_completion_obj= <PyObject*>completion_obj
2339 complete_cb = <rados_callback_t>&__aio_complete_cb
2342 ret = rados_aio_create_completion2(p_completion_obj, complete_cb,
2345 raise make_ex(ret, "error getting a completion")
2347 completion_obj.rados_comp = completion
2348 return completion_obj
2350 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2351 def aio_stat(self, object_name, oncomplete):
2353 Asynchronously get object stats (size/mtime)
2355 oncomplete will be called with the returned size and mtime
2356 as well as the completion:
2358 oncomplete(completion, size, mtime)
2360 :param object_name: the name of the object to get stats from
2361 :type object_name: str
2362 :param oncomplete: what to do when the stat is complete
2363 :type oncomplete: completion
2365 :raises: :class:`Error`
2366 :returns: completion object
2369 object_name = cstr(object_name, 'object_name')
2372 Completion completion
2373 char *_object_name = object_name
2377 def oncomplete_(completion_v):
2378 cdef Completion _completion_v = completion_v
2379 return_value = _completion_v.get_return_value()
2380 if return_value >= 0:
2381 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2383 return oncomplete(_completion_v, None, None)
2385 completion = self.__get_completion(oncomplete_, None)
2386 self.__track_completion(completion)
2388 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2392 completion._cleanup()
2393 raise make_ex(ret, "error stating %s" % object_name)
2396 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2397 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2398 def aio_write(self, object_name, to_write, offset=0,
2399 oncomplete=None, onsafe=None):
2401 Write data to an object asynchronously
2403 Queues the write and returns.
2405 :param object_name: name of the object
2406 :type object_name: str
2407 :param to_write: data to write
2408 :type to_write: bytes
2409 :param offset: byte offset in the object to begin writing at
2411 :param oncomplete: what to do when the write is safe and complete in memory
2413 :type oncomplete: completion
2414 :param onsafe: what to do when the write is safe and complete on storage
2416 :type onsafe: completion
2418 :raises: :class:`Error`
2419 :returns: completion object
2422 object_name = cstr(object_name, 'object_name')
2425 Completion completion
2426 char* _object_name = object_name
2427 char* _to_write = to_write
2428 size_t size = len(to_write)
2429 uint64_t _offset = offset
2431 completion = self.__get_completion(oncomplete, onsafe)
2432 self.__track_completion(completion)
2434 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2435 _to_write, size, _offset)
2437 completion._cleanup()
2438 raise make_ex(ret, "error writing object %s" % object_name)
2441 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2442 ('onsafe', opt(Callable)))
2443 def aio_write_full(self, object_name, to_write,
2444 oncomplete=None, onsafe=None):
2446 Asynchronously write an entire object
2448 The object is filled with the provided data. If the object exists,
2449 it is atomically truncated and then written.
2450 Queues the write and returns.
2452 :param object_name: name of the object
2453 :type object_name: str
2454 :param to_write: data to write
2456 :param oncomplete: what to do when the write is safe and complete in memory
2458 :type oncomplete: completion
2459 :param onsafe: what to do when the write is safe and complete on storage
2461 :type onsafe: completion
2463 :raises: :class:`Error`
2464 :returns: completion object
2467 object_name = cstr(object_name, 'object_name')
2470 Completion completion
2471 char* _object_name = object_name
2472 char* _to_write = to_write
2473 size_t size = len(to_write)
2475 completion = self.__get_completion(oncomplete, onsafe)
2476 self.__track_completion(completion)
2478 ret = rados_aio_write_full(self.io, _object_name,
2479 completion.rados_comp,
2482 completion._cleanup()
2483 raise make_ex(ret, "error writing object %s" % object_name)
2486 @requires(('object_name', str_type), ('to_write', bytes), ('write_len', int),
2487 ('offset', int), ('oncomplete', opt(Callable)))
2488 def aio_writesame(self, object_name, to_write, write_len, offset=0,
2491 Asynchronously write the same buffer multiple times
2493 :param object_name: name of the object
2494 :type object_name: str
2495 :param to_write: data to write
2496 :type to_write: bytes
2497 :param write_len: total number of bytes to write
2498 :type write_len: int
2499 :param offset: byte offset in the object to begin writing at
2501 :param oncomplete: what to do when the writesame is safe and
2502 complete in memory on all replicas
2503 :type oncomplete: completion
2504 :raises: :class:`Error`
2505 :returns: completion object
2508 object_name = cstr(object_name, 'object_name')
2511 Completion completion
2512 char* _object_name = object_name
2513 char* _to_write = to_write
2514 size_t _data_len = len(to_write)
2515 size_t _write_len = write_len
2516 uint64_t _offset = offset
2518 completion = self.__get_completion(oncomplete, None)
2519 self.__track_completion(completion)
2521 ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
2522 _to_write, _data_len, _write_len, _offset)
2525 completion._cleanup()
2526 raise make_ex(ret, "error writing object %s" % object_name)
2529 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2530 ('onsafe', opt(Callable)))
2531 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2533 Asynchronously append data to an object
2535 Queues the write and returns.
2537 :param object_name: name of the object
2538 :type object_name: str
2539 :param to_append: data to append
2540 :type to_append: str
2541 :param offset: byte offset in the object to begin writing at
2543 :param oncomplete: what to do when the write is safe and complete in memory
2545 :type oncomplete: completion
2546 :param onsafe: what to do when the write is safe and complete on storage
2548 :type onsafe: completion
2550 :raises: :class:`Error`
2551 :returns: completion object
2553 object_name = cstr(object_name, 'object_name')
2556 Completion completion
2557 char* _object_name = object_name
2558 char* _to_append = to_append
2559 size_t size = len(to_append)
2561 completion = self.__get_completion(oncomplete, onsafe)
2562 self.__track_completion(completion)
2564 ret = rados_aio_append(self.io, _object_name,
2565 completion.rados_comp,
2568 completion._cleanup()
2569 raise make_ex(ret, "error appending object %s" % object_name)
2572 def aio_flush(self):
2574 Block until all pending writes in an io context are safe
2576 :raises: :class:`Error`
2579 ret = rados_aio_flush(self.io)
2581 raise make_ex(ret, "error flushing")
2583 @requires(('object_name', str_type), ('length', int), ('offset', int),
2584 ('oncomplete', opt(Callable)))
2585 def aio_read(self, object_name, length, offset, oncomplete):
2587 Asynchronously read data from an object
2589 oncomplete will be called with the returned read value as
2590 well as the completion:
2592 oncomplete(completion, data_read)
2594 :param object_name: name of the object to read from
2595 :type object_name: str
2596 :param length: the number of bytes to read
2598 :param offset: byte offset in the object to begin reading from
2600 :param oncomplete: what to do when the read is complete
2601 :type oncomplete: completion
2603 :raises: :class:`Error`
2604 :returns: completion object
2607 object_name = cstr(object_name, 'object_name')
2610 Completion completion
2611 char* _object_name = object_name
2612 uint64_t _offset = offset
2615 size_t _length = length
2617 def oncomplete_(completion_v):
2618 cdef Completion _completion_v = completion_v
2619 return_value = _completion_v.get_return_value()
2620 if return_value > 0 and return_value != length:
2621 _PyBytes_Resize(&_completion_v.buf, return_value)
2622 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2624 completion = self.__get_completion(oncomplete_, None)
2625 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2626 ret_buf = PyBytes_AsString(completion.buf)
2627 self.__track_completion(completion)
2629 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2630 ret_buf, _length, _offset)
2632 completion._cleanup()
2633 raise make_ex(ret, "error reading %s" % object_name)
2636 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2637 ('data', bytes), ('length', int),
2638 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2639 def aio_execute(self, object_name, cls, method, data,
2640 length=8192, oncomplete=None, onsafe=None):
2642 Asynchronously execute an OSD class method on an object.
2644 oncomplete and onsafe will be called with the data returned from
2645 the plugin as well as the completion:
2647 oncomplete(completion, data)
2648 onsafe(completion, data)
2650 :param object_name: name of the object
2651 :type object_name: str
2652 :param cls: name of the object class
2654 :param method: name of the method
2656 :param data: input data
2658 :param length: size of output buffer in bytes (default=8192)
2660 :param oncomplete: what to do when the execution is complete
2661 :type oncomplete: completion
2662 :param onsafe: what to do when the execution is safe and complete
2663 :type onsafe: completion
2665 :raises: :class:`Error`
2666 :returns: completion object
2669 object_name = cstr(object_name, 'object_name')
2670 cls = cstr(cls, 'cls')
2671 method = cstr(method, 'method')
2673 Completion completion
2674 char *_object_name = object_name
2676 char *_method = method
2678 size_t _data_len = len(data)
2681 size_t _length = length
2683 def oncomplete_(completion_v):
2684 cdef Completion _completion_v = completion_v
2685 return_value = _completion_v.get_return_value()
2686 if return_value > 0 and return_value != length:
2687 _PyBytes_Resize(&_completion_v.buf, return_value)
2688 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2690 def onsafe_(completion_v):
2691 cdef Completion _completion_v = completion_v
2692 return_value = _completion_v.get_return_value()
2693 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2695 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2696 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2697 ret_buf = PyBytes_AsString(completion.buf)
2698 self.__track_completion(completion)
2700 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2701 _cls, _method, _data, _data_len, ret_buf, _length)
2703 completion._cleanup()
2704 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2707 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2708 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2710 Asynchronously remove an object
2712 :param object_name: name of the object to remove
2713 :type object_name: str
2714 :param oncomplete: what to do when the remove is safe and complete in memory
2716 :type oncomplete: completion
2717 :param onsafe: what to do when the remove is safe and complete on storage
2719 :type onsafe: completion
2721 :raises: :class:`Error`
2722 :returns: completion object
2724 object_name = cstr(object_name, 'object_name')
2727 Completion completion
2728 char* _object_name = object_name
2730 completion = self.__get_completion(oncomplete, onsafe)
2731 self.__track_completion(completion)
2733 ret = rados_aio_remove(self.io, _object_name,
2734 completion.rados_comp)
2736 completion._cleanup()
2737 raise make_ex(ret, "error removing %s" % object_name)
2740 def require_ioctx_open(self):
2742 Checks if the rados.Ioctx object state is 'open'
2744 :raises: IoctxStateError
2746 if self.state != "open":
2747 raise IoctxStateError("The pool is %s" % self.state)
2749 @requires(('loc_key', str_type))
2750 def set_locator_key(self, loc_key):
2752 Set the key for mapping objects to pgs within an io context.
2754 The key is used instead of the object name to determine which
2755 placement groups an object is put in. This affects all subsequent
2756 operations of the io context - until a different locator key is
2757 set, all objects in this io context will be placed in the same pg.
2759 :param loc_key: the key to use as the object locator, or NULL to discard
2760 any previously set key
2763 :raises: :class:`TypeError`
2765 self.require_ioctx_open()
2766 cloc_key = cstr(loc_key, 'loc_key')
2767 cdef char *_loc_key = cloc_key
2769 rados_ioctx_locator_set_key(self.io, _loc_key)
2770 self.locator_key = loc_key
2772 def get_locator_key(self):
2774 Get the locator_key of context
2776 :returns: locator_key
2778 return self.locator_key
2780 @requires(('snap_id', long))
2781 def set_read(self, snap_id):
2783 Set the snapshot for reading objects.
2785 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2787 :param snap_id: the snapshot Id
2790 :raises: :class:`TypeError`
2792 self.require_ioctx_open()
2793 cdef rados_snap_t _snap_id = snap_id
2795 rados_ioctx_snap_set_read(self.io, _snap_id)
2797 @requires(('nspace', str_type))
2798 def set_namespace(self, nspace):
2800 Set the namespace for objects within an io context.
2802 The namespace in addition to the object name fully identifies
2803 an object. This affects all subsequent operations of the io context
2804 - until a different namespace is set, all objects in this io context
2805 will be placed in the same namespace.
2807 :param nspace: the namespace to use, or None/"" for the default namespace
2810 :raises: :class:`TypeError`
2812 self.require_ioctx_open()
2815 cnspace = cstr(nspace, 'nspace')
2816 cdef char *_nspace = cnspace
2818 rados_ioctx_set_namespace(self.io, _nspace)
2819 self.nspace = nspace
2821 def get_namespace(self):
2823 Get the namespace of context
2831 Close a rados.Ioctx object.
2833 This just tells librados that you no longer need to use the io context.
2834 It may not be freed immediately if there are pending asynchronous
2835 requests on it, but you should not use an io context again after
2836 calling this function on it.
2838 if self.state == "open":
2839 self.require_ioctx_open()
2841 rados_ioctx_destroy(self.io)
2842 self.state = "closed"
2845 @requires(('key', str_type), ('data', bytes))
2846 def write(self, key, data, offset=0):
2848 Write data to an object synchronously
2850 :param key: name of the object
2852 :param data: data to write
2854 :param offset: byte offset in the object to begin writing at
2857 :raises: :class:`TypeError`
2858 :raises: :class:`LogicError`
2859 :returns: int - 0 on success
2861 self.require_ioctx_open()
2863 key = cstr(key, 'key')
2867 size_t length = len(data)
2868 uint64_t _offset = offset
2871 ret = rados_write(self.io, _key, _data, length, _offset)
2875 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2878 raise LogicError("Ioctx.write(%s): rados_write \
2879 returned %d, but should return zero on success." % (self.name, ret))
2881 @requires(('key', str_type), ('data', bytes))
2882 def write_full(self, key, data):
2884 Write an entire object synchronously.
2886 The object is filled with the provided data. If the object exists,
2887 it is atomically truncated and then written.
2889 :param key: name of the object
2891 :param data: data to write
2894 :raises: :class:`TypeError`
2895 :raises: :class:`Error`
2896 :returns: int - 0 on success
2898 self.require_ioctx_open()
2899 key = cstr(key, 'key')
2903 size_t length = len(data)
2906 ret = rados_write_full(self.io, _key, _data, length)
2910 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2913 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2914 returned %d, but should return zero on success." % (self.name, ret))
2916 @requires(('key', str_type), ('data', bytes), ('write_len', int), ('offset', int))
2917 def writesame(self, key, data, write_len, offset=0):
2919 Write the same buffer multiple times
2920 :param key: name of the object
2922 :param data: data to write
2924 :param write_len: total number of bytes to write
2925 :type write_len: int
2926 :param offset: byte offset in the object to begin writing at
2929 :raises: :class:`TypeError`
2930 :raises: :class:`LogicError`
2932 self.require_ioctx_open()
2934 key = cstr(key, 'key')
2938 size_t _data_len = len(data)
2939 size_t _write_len = write_len
2940 uint64_t _offset = offset
2943 ret = rados_writesame(self.io, _key, _data, _data_len, _write_len, _offset)
2945 raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
2949 @requires(('key', str_type), ('data', bytes))
2950 def append(self, key, data):
2952 Append data to an object synchronously
2954 :param key: name of the object
2956 :param data: data to write
2959 :raises: :class:`TypeError`
2960 :raises: :class:`LogicError`
2961 :returns: int - 0 on success
2963 self.require_ioctx_open()
2964 key = cstr(key, 'key')
2968 size_t length = len(data)
2971 ret = rados_append(self.io, _key, _data, length)
2975 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2978 raise LogicError("Ioctx.append(%s): rados_append \
2979 returned %d, but should return zero on success." % (self.name, ret))
2981 @requires(('key', str_type))
2982 def read(self, key, length=8192, offset=0):
2984 Read data from an object synchronously
2986 :param key: name of the object
2988 :param length: the number of bytes to read (default=8192)
2990 :param offset: byte offset in the object to begin reading at
2993 :raises: :class:`TypeError`
2994 :raises: :class:`Error`
2995 :returns: str - data read from object
2997 self.require_ioctx_open()
2998 key = cstr(key, 'key')
3002 uint64_t _offset = offset
3003 size_t _length = length
3004 PyObject* ret_s = NULL
3006 ret_s = PyBytes_FromStringAndSize(NULL, length)
3008 ret_buf = PyBytes_AsString(ret_s)
3010 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
3012 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
3015 _PyBytes_Resize(&ret_s, ret)
3017 return <object>ret_s
3019 # We DECREF unconditionally: the cast to object above will have
3020 # INCREFed if necessary. This also takes care of exceptions,
3021 # including if _PyString_Resize fails (that will free the string
3022 # itself and set ret_s to NULL, hence XDECREF).
3023 ref.Py_XDECREF(ret_s)
3025 @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
3026 def execute(self, key, cls, method, data, length=8192):
3028 Execute an OSD class method on an object.
3030 :param key: name of the object
3032 :param cls: name of the object class
3034 :param method: name of the method
3036 :param data: input data
3038 :param length: size of output buffer in bytes (default=8192)
3041 :raises: :class:`TypeError`
3042 :raises: :class:`Error`
3043 :returns: (ret, method output)
3045 self.require_ioctx_open()
3047 key = cstr(key, 'key')
3048 cls = cstr(cls, 'cls')
3049 method = cstr(method, 'method')
3053 char *_method = method
3055 size_t _data_len = len(data)
3058 size_t _length = length
3059 PyObject* ret_s = NULL
3061 ret_s = PyBytes_FromStringAndSize(NULL, length)
3063 ret_buf = PyBytes_AsString(ret_s)
3065 ret = rados_exec(self.io, _key, _cls, _method, _data,
3066 _data_len, ret_buf, _length)
3068 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
3071 _PyBytes_Resize(&ret_s, ret)
3073 return ret, <object>ret_s
3075 # We DECREF unconditionally: the cast to object above will have
3076 # INCREFed if necessary. This also takes care of exceptions,
3077 # including if _PyString_Resize fails (that will free the string
3078 # itself and set ret_s to NULL, hence XDECREF).
3079 ref.Py_XDECREF(ret_s)
3081 def get_stats(self):
3083 Get pool usage statistics
3085 :returns: dict - contains the following keys:
3087 - ``num_bytes`` (int) - size of pool in bytes
3089 - ``num_kb`` (int) - size of pool in kbytes
3091 - ``num_objects`` (int) - number of objects in the pool
3093 - ``num_object_clones`` (int) - number of object clones
3095 - ``num_object_copies`` (int) - number of object copies
3097 - ``num_objects_missing_on_primary`` (int) - number of objets
3100 - ``num_objects_unfound`` (int) - number of unfound objects
3102 - ``num_objects_degraded`` (int) - number of degraded objects
3104 - ``num_rd`` (int) - bytes read
3106 - ``num_rd_kb`` (int) - kbytes read
3108 - ``num_wr`` (int) - bytes written
3110 - ``num_wr_kb`` (int) - kbytes written
3112 self.require_ioctx_open()
3113 cdef rados_pool_stat_t stats
3115 ret = rados_ioctx_pool_stat(self.io, &stats)
3117 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
3118 return {'num_bytes': stats.num_bytes,
3119 'num_kb': stats.num_kb,
3120 'num_objects': stats.num_objects,
3121 'num_object_clones': stats.num_object_clones,
3122 'num_object_copies': stats.num_object_copies,
3123 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
3124 "num_objects_unfound": stats.num_objects_unfound,
3125 "num_objects_degraded": stats.num_objects_degraded,
3126 "num_rd": stats.num_rd,
3127 "num_rd_kb": stats.num_rd_kb,
3128 "num_wr": stats.num_wr,
3129 "num_wr_kb": stats.num_wr_kb}
3131 @requires(('key', str_type))
3132 def remove_object(self, key):
3136 This does not delete any snapshots of the object.
3138 :param key: the name of the object to delete
3141 :raises: :class:`TypeError`
3142 :raises: :class:`Error`
3143 :returns: bool - True on success
3145 self.require_ioctx_open()
3146 key = cstr(key, 'key')
3151 ret = rados_remove(self.io, _key)
3153 raise make_ex(ret, "Failed to remove '%s'" % key)
3156 @requires(('key', str_type))
3157 def trunc(self, key, size):
3161 If this enlarges the object, the new area is logically filled with
3162 zeroes. If this shrinks the object, the excess data is removed.
3164 :param key: the name of the object to resize
3166 :param size: the new size of the object in bytes
3169 :raises: :class:`TypeError`
3170 :raises: :class:`Error`
3171 :returns: int - 0 on success, otherwise raises error
3174 self.require_ioctx_open()
3175 key = cstr(key, 'key')
3178 uint64_t _size = size
3181 ret = rados_trunc(self.io, _key, _size)
3183 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
3186 @requires(('key', str_type))
3187 def stat(self, key):
3189 Get object stats (size/mtime)
3191 :param key: the name of the object to get stats from
3194 :raises: :class:`TypeError`
3195 :raises: :class:`Error`
3196 :returns: (size,timestamp)
3198 self.require_ioctx_open()
3200 key = cstr(key, 'key')
3207 ret = rados_stat(self.io, _key, &psize, &pmtime)
3209 raise make_ex(ret, "Failed to stat %r" % key)
3210 return psize, time.localtime(pmtime)
3212 @requires(('key', str_type), ('xattr_name', str_type))
3213 def get_xattr(self, key, xattr_name):
3215 Get the value of an extended attribute on an object.
3217 :param key: the name of the object to get xattr from
3219 :param xattr_name: which extended attribute to read
3220 :type xattr_name: str
3222 :raises: :class:`TypeError`
3223 :raises: :class:`Error`
3224 :returns: str - value of the xattr
3226 self.require_ioctx_open()
3228 key = cstr(key, 'key')
3229 xattr_name = cstr(xattr_name, 'xattr_name')
3232 char *_xattr_name = xattr_name
3233 size_t ret_length = 4096
3234 char *ret_buf = NULL
3237 while ret_length < 4096 * 1024 * 1024:
3238 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
3240 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
3241 if ret == -errno.ERANGE:
3244 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
3247 return ret_buf[:ret]
3251 @requires(('oid', str_type))
3252 def get_xattrs(self, oid):
3254 Start iterating over xattrs on an object.
3256 :param oid: the name of the object to get xattrs from
3259 :raises: :class:`TypeError`
3260 :raises: :class:`Error`
3261 :returns: XattrIterator
3263 self.require_ioctx_open()
3264 return XattrIterator(self, oid)
3266 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
3267 def set_xattr(self, key, xattr_name, xattr_value):
3269 Set an extended attribute on an object.
3271 :param key: the name of the object to set xattr to
3273 :param xattr_name: which extended attribute to set
3274 :type xattr_name: str
3275 :param xattr_value: the value of the extended attribute
3276 :type xattr_value: bytes
3278 :raises: :class:`TypeError`
3279 :raises: :class:`Error`
3280 :returns: bool - True on success, otherwise raise an error
3282 self.require_ioctx_open()
3284 key = cstr(key, 'key')
3285 xattr_name = cstr(xattr_name, 'xattr_name')
3288 char *_xattr_name = xattr_name
3289 char *_xattr_value = xattr_value
3290 size_t _xattr_value_len = len(xattr_value)
3293 ret = rados_setxattr(self.io, _key, _xattr_name,
3294 _xattr_value, _xattr_value_len)
3296 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3299 @requires(('key', str_type), ('xattr_name', str_type))
3300 def rm_xattr(self, key, xattr_name):
3302 Removes an extended attribute on from an object.
3304 :param key: the name of the object to remove xattr from
3306 :param xattr_name: which extended attribute to remove
3307 :type xattr_name: str
3309 :raises: :class:`TypeError`
3310 :raises: :class:`Error`
3311 :returns: bool - True on success, otherwise raise an error
3313 self.require_ioctx_open()
3315 key = cstr(key, 'key')
3316 xattr_name = cstr(xattr_name, 'xattr_name')
3319 char *_xattr_name = xattr_name
3322 ret = rados_rmxattr(self.io, _key, _xattr_name)
3324 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3328 @requires(('obj', str_type), ('msg', str_type), ('timeout_ms', int))
3329 def notify(self, obj, msg='', timeout_ms=5000):
3331 Send a rados notification to an object.
3333 :param obj: the name of the object to notify
3335 :param msg: optional message to send in the notification
3337 :param timeout_ms: notify timeout (in ms)
3338 :type timeout_ms: int
3340 :raises: :class:`TypeError`
3341 :raises: :class:`Error`
3342 :returns: bool - True on success, otherwise raise an error
3344 self.require_ioctx_open()
3347 obj = cstr(obj, 'obj')
3348 msg = cstr(msg, 'msg')
3352 int _msglen = msglen
3353 uint64_t _timeout_ms = timeout_ms
3356 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3359 raise make_ex(ret, "Failed to notify %r" % (obj))
3362 def list_objects(self):
3364 Get ObjectIterator on rados.Ioctx object.
3366 :returns: ObjectIterator
3368 self.require_ioctx_open()
3369 return ObjectIterator(self)
3371 def list_snaps(self):
3373 Get SnapIterator on rados.Ioctx object.
3375 :returns: SnapIterator
3377 self.require_ioctx_open()
3378 return SnapIterator(self)
3380 def get_pool_id(self):
3384 :returns: int - pool id
3387 ret = rados_ioctx_get_id(self.io)
3390 def get_pool_name(self):
3394 :returns: str - pool name
3402 name = <char *>realloc_chk(name, name_len)
3404 ret = rados_ioctx_get_pool_name(self.io, name, name_len)
3407 elif ret != -errno.ERANGE:
3408 raise make_ex(ret, "get pool name error")
3410 name_len = name_len * 2
3412 return decode_cstr(name)
3417 @requires(('snap_name', str_type))
3418 def create_snap(self, snap_name):
3420 Create a pool-wide snapshot
3422 :param snap_name: the name of the snapshot
3423 :type snap_name: str
3425 :raises: :class:`TypeError`
3426 :raises: :class:`Error`
3428 self.require_ioctx_open()
3429 snap_name = cstr(snap_name, 'snap_name')
3430 cdef char *_snap_name = snap_name
3433 ret = rados_ioctx_snap_create(self.io, _snap_name)
3435 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3437 @requires(('snap_name', str_type))
3438 def remove_snap(self, snap_name):
3440 Removes a pool-wide snapshot
3442 :param snap_name: the name of the snapshot
3443 :type snap_name: str
3445 :raises: :class:`TypeError`
3446 :raises: :class:`Error`
3448 self.require_ioctx_open()
3449 snap_name = cstr(snap_name, 'snap_name')
3450 cdef char *_snap_name = snap_name
3453 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3455 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3457 @requires(('snap_name', str_type))
3458 def lookup_snap(self, snap_name):
3460 Get the id of a pool snapshot
3462 :param snap_name: the name of the snapshot to lookop
3463 :type snap_name: str
3465 :raises: :class:`TypeError`
3466 :raises: :class:`Error`
3467 :returns: Snap - on success
3469 self.require_ioctx_open()
3470 csnap_name = cstr(snap_name, 'snap_name')
3472 char *_snap_name = csnap_name
3473 rados_snap_t snap_id
3476 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3478 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3479 return Snap(self, snap_name, int(snap_id))
3481 @requires(('oid', str_type), ('snap_name', str_type))
3482 def snap_rollback(self, oid, snap_name):
3484 Rollback an object to a snapshot
3486 :param oid: the name of the object
3488 :param snap_name: the name of the snapshot
3489 :type snap_name: str
3491 :raises: :class:`TypeError`
3492 :raises: :class:`Error`
3494 self.require_ioctx_open()
3495 oid = cstr(oid, 'oid')
3496 snap_name = cstr(snap_name, 'snap_name')
3498 char *_snap_name = snap_name
3502 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3504 raise make_ex(ret, "Failed to rollback %s" % oid)
3506 def create_self_managed_snap(self):
3508 Creates a self-managed snapshot
3510 :returns: snap id on success
3512 :raises: :class:`Error`
3514 self.require_ioctx_open()
3516 rados_snap_t _snap_id
3518 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3520 raise make_ex(ret, "Failed to create self-managed snapshot")
3521 return int(_snap_id)
3523 @requires(('snap_id', int))
3524 def remove_self_managed_snap(self, snap_id):
3526 Removes a self-managed snapshot
3528 :param snap_id: the name of the snapshot
3531 :raises: :class:`TypeError`
3532 :raises: :class:`Error`
3534 self.require_ioctx_open()
3536 rados_snap_t _snap_id = snap_id
3538 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3540 raise make_ex(ret, "Failed to remove self-managed snapshot")
3542 def set_self_managed_snap_write(self, snaps):
3544 Updates the write context to the specified self-managed
3547 :param snaps: all associated self-managed snapshot ids
3550 :raises: :class:`TypeError`
3551 :raises: :class:`Error`
3553 self.require_ioctx_open()
3557 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3558 snap_seq = sorted_snaps[0]
3561 rados_snap_t _snap_seq = snap_seq
3562 rados_snap_t *_snaps = NULL
3563 int _num_snaps = len(sorted_snaps)
3565 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3566 for i in range(len(sorted_snaps)):
3567 _snaps[i] = sorted_snaps[i]
3569 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3574 raise make_ex(ret, "Failed to update snapshot write context")
3578 @requires(('oid', str_type), ('snap_id', int))
3579 def rollback_self_managed_snap(self, oid, snap_id):
3581 Rolls an specific object back to a self-managed snapshot revision
3583 :param oid: the name of the object
3585 :param snap_id: the name of the snapshot
3588 :raises: :class:`TypeError`
3589 :raises: :class:`Error`
3591 self.require_ioctx_open()
3592 oid = cstr(oid, 'oid')
3595 rados_snap_t _snap_id = snap_id
3597 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3599 raise make_ex(ret, "Failed to rollback %s" % oid)
3601 def get_last_version(self):
3603 Return the version of the last object read or written to.
3605 This exposes the internal version number of the last object read or
3606 written via this io context
3608 :returns: version of the last object used
3610 self.require_ioctx_open()
3612 ret = rados_get_last_version(self.io)
3615 def create_write_op(self):
3617 create write operation object.
3618 need call release_write_op after use
3620 return WriteOp().create()
3622 def create_read_op(self):
3624 create read operation object.
3625 need call release_read_op after use
3627 return ReadOp().create()
3629 def release_write_op(self, write_op):
3631 release memory alloc by create_write_op
3635 def release_read_op(self, read_op):
3637 release memory alloc by create_read_op
3638 :para read_op: read_op object
3643 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3644 def set_omap(self, write_op, keys, values):
3646 set keys values to write_op
3647 :para write_op: write_operation object
3648 :type write_op: WriteOp
3649 :para keys: a tuple of keys
3651 :para values: a tuple of values
3655 if len(keys) != len(values):
3656 raise Error("Rados(): keys and values must have the same number of items")
3658 keys = cstr_list(keys, 'keys')
3659 values = cstr_list(values, 'values')
3660 lens = [len(v) for v in values]
3662 WriteOp _write_op = write_op
3663 size_t key_num = len(keys)
3664 char **_keys = to_bytes_array(keys)
3665 char **_values = to_bytes_array(values)
3666 size_t *_lens = to_csize_t_array(lens)
3670 rados_write_op_omap_set(_write_op.write_op,
3671 <const char**>_keys,
3672 <const char**>_values,
3673 <const size_t*>_lens, key_num)
3679 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3680 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3682 execute the real write operation
3683 :para write_op: write operation object
3684 :type write_op: WriteOp
3685 :para oid: object name
3687 :para mtime: the time to set the mtime to, 0 for the current time
3689 :para flags: flags to apply to the entire operation
3693 oid = cstr(oid, 'oid')
3695 WriteOp _write_op = write_op
3697 time_t _mtime = mtime
3701 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3703 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3705 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3706 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3708 execute the real write operation asynchronously
3709 :para write_op: write operation object
3710 :type write_op: WriteOp
3711 :para oid: object name
3713 :param oncomplete: what to do when the remove is safe and complete in memory
3715 :type oncomplete: completion
3716 :param onsafe: what to do when the remove is safe and complete on storage
3718 :type onsafe: completion
3719 :para mtime: the time to set the mtime to, 0 for the current time
3721 :para flags: flags to apply to the entire operation
3724 :raises: :class:`Error`
3725 :returns: completion object
3728 oid = cstr(oid, 'oid')
3730 WriteOp _write_op = write_op
3732 Completion completion
3733 time_t _mtime = mtime
3736 completion = self.__get_completion(oncomplete, onsafe)
3737 self.__track_completion(completion)
3740 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3743 completion._cleanup()
3744 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3747 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3748 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3750 execute the real read operation
3751 :para read_op: read operation object
3752 :type read_op: ReadOp
3753 :para oid: object name
3755 :para flag: flags to apply to the entire operation
3758 oid = cstr(oid, 'oid')
3760 ReadOp _read_op = read_op
3765 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3767 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3769 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3770 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3772 execute the real read operation
3773 :para read_op: read operation object
3774 :type read_op: ReadOp
3775 :para oid: object name
3777 :param oncomplete: what to do when the remove is safe and complete in memory
3779 :type oncomplete: completion
3780 :param onsafe: what to do when the remove is safe and complete on storage
3782 :type onsafe: completion
3783 :para flag: flags to apply to the entire operation
3786 oid = cstr(oid, 'oid')
3788 ReadOp _read_op = read_op
3790 Completion completion
3793 completion = self.__get_completion(oncomplete, onsafe)
3794 self.__track_completion(completion)
3797 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3799 completion._cleanup()
3800 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3803 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3804 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3807 :para read_op: read operation object
3808 :type read_op: ReadOp
3809 :para start_after: list keys starting after start_after
3810 :type start_after: str
3811 :para filter_prefix: list only keys beginning with filter_prefix
3812 :type filter_prefix: str
3813 :para max_return: list no more than max_return key/value pairs
3814 :type max_return: int
3815 :returns: an iterator over the requested omap values, return value from this action
3818 start_after = cstr(start_after, 'start_after') if start_after else None
3819 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3821 char *_start_after = opt_str(start_after)
3822 char *_filter_prefix = opt_str(filter_prefix)
3823 ReadOp _read_op = read_op
3824 rados_omap_iter_t iter_addr = NULL
3825 int _max_return = max_return
3828 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3829 _max_return, &iter_addr, NULL, NULL)
3830 it = OmapIterator(self)
3832 return it, 0 # 0 is meaningless; there for backward-compat
3834 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3835 def get_omap_keys(self, read_op, start_after, max_return):
3838 :para read_op: read operation object
3839 :type read_op: ReadOp
3840 :para start_after: list keys starting after start_after
3841 :type start_after: str
3842 :para max_return: list no more than max_return key/value pairs
3843 :type max_return: int
3844 :returns: an iterator over the requested omap values, return value from this action
3846 start_after = cstr(start_after, 'start_after') if start_after else None
3848 char *_start_after = opt_str(start_after)
3849 ReadOp _read_op = read_op
3850 rados_omap_iter_t iter_addr = NULL
3851 int _max_return = max_return
3854 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3855 _max_return, &iter_addr, NULL, NULL)
3856 it = OmapIterator(self)
3858 return it, 0 # 0 is meaningless; there for backward-compat
3860 @requires(('read_op', ReadOp), ('keys', tuple))
3861 def get_omap_vals_by_keys(self, read_op, keys):
3863 get the omap values by keys
3864 :para read_op: read operation object
3865 :type read_op: ReadOp
3866 :para keys: input key tuple
3868 :returns: an iterator over the requested omap values, return value from this action
3870 keys = cstr_list(keys, 'keys')
3872 ReadOp _read_op = read_op
3873 rados_omap_iter_t iter_addr
3874 char **_keys = to_bytes_array(keys)
3875 size_t key_num = len(keys)
3879 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3880 <const char**>_keys,
3881 key_num, &iter_addr, NULL)
3882 it = OmapIterator(self)
3884 return it, 0 # 0 is meaningless; there for backward-compat
3888 @requires(('write_op', WriteOp), ('keys', tuple))
3889 def remove_omap_keys(self, write_op, keys):
3891 remove omap keys specifiled
3892 :para write_op: write operation object
3893 :type write_op: WriteOp
3894 :para keys: input key tuple
3898 keys = cstr_list(keys, 'keys')
3900 WriteOp _write_op = write_op
3901 size_t key_num = len(keys)
3902 char **_keys = to_bytes_array(keys)
3906 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3910 @requires(('write_op', WriteOp))
3911 def clear_omap(self, write_op):
3913 Remove all key/value pairs from an object
3914 :para write_op: write operation object
3915 :type write_op: WriteOp
3919 WriteOp _write_op = write_op
3922 rados_write_op_omap_clear(_write_op.write_op)
3924 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3925 ('duration', opt(int)), ('flags', int))
3926 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3929 Take an exclusive lock on an object
3931 :param key: name of the object
3933 :param name: name of the lock
3935 :param cookie: cookie of the lock
3937 :param desc: description of the lock
3939 :param duration: duration of the lock in seconds
3944 :raises: :class:`TypeError`
3945 :raises: :class:`Error`
3947 self.require_ioctx_open()
3949 key = cstr(key, 'key')
3950 name = cstr(name, 'name')
3951 cookie = cstr(cookie, 'cookie')
3952 desc = cstr(desc, 'desc')
3957 char* _cookie = cookie
3959 uint8_t _flags = flags
3962 if duration is None:
3964 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3967 _duration.tv_sec = duration
3969 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3973 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3975 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3976 ('desc', str_type), ('duration', opt(int)), ('flags', int))
3977 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3980 Take a shared lock on an object
3982 :param key: name of the object
3984 :param name: name of the lock
3986 :param cookie: cookie of the lock
3988 :param tag: tag of the lock
3990 :param desc: description of the lock
3992 :param duration: duration of the lock in seconds
3997 :raises: :class:`TypeError`
3998 :raises: :class:`Error`
4000 self.require_ioctx_open()
4002 key = cstr(key, 'key')
4003 tag = cstr(tag, 'tag')
4004 name = cstr(name, 'name')
4005 cookie = cstr(cookie, 'cookie')
4006 desc = cstr(desc, 'desc')
4012 char* _cookie = cookie
4014 uint8_t _flags = flags
4017 if duration is None:
4019 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
4022 _duration.tv_sec = duration
4024 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
4027 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
4029 @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
4030 def unlock(self, key, name, cookie):
4033 Release a shared or exclusive lock on an object
4035 :param key: name of the object
4037 :param name: name of the lock
4039 :param cookie: cookie of the lock
4042 :raises: :class:`TypeError`
4043 :raises: :class:`Error`
4045 self.require_ioctx_open()
4047 key = cstr(key, 'key')
4048 name = cstr(name, 'name')
4049 cookie = cstr(cookie, 'cookie')
4054 char* _cookie = cookie
4057 ret = rados_unlock(self.io, _key, _name, _cookie)
4059 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
4061 def set_osdmap_full_try(self):
4063 Set global osdmap_full_try label to true
4066 rados_set_osdmap_full_try(self.io)
4068 def unset_osdmap_full_try(self):
4073 rados_unset_osdmap_full_try(self.io)
4075 def application_enable(self, app_name, force=False):
4077 Enable an application on an OSD pool
4079 :param app_name: application name
4081 :param force: False if only a single app should exist per pool
4082 :type expire_seconds: boool
4084 :raises: :class:`Error`
4086 app_name = cstr(app_name, 'app_name')
4088 char *_app_name = app_name
4089 int _force = (1 if force else 0)
4092 ret = rados_application_enable(self.io, _app_name, _force)
4094 raise make_ex(ret, "error enabling application")
4096 def application_list(self):
4098 Returns a list of enabled applications
4100 :returns: list of app name string
4108 apps = <char *>realloc_chk(apps, length)
4110 ret = rados_application_list(self.io, apps, &length)
4112 return [decode_cstr(app) for app in
4113 apps[:length].split(b'\0') if app]
4114 elif ret == -errno.ENOENT:
4116 elif ret == -errno.ERANGE:
4119 raise make_ex(ret, "error listing applications")
4123 def application_metadata_get(self, app_name, key):
4125 Gets application metadata on an OSD pool for the given key
4127 :param app_name: application name
4129 :param key: metadata key
4131 :returns: str - metadata value
4133 :raises: :class:`Error`
4136 app_name = cstr(app_name, 'app_name')
4137 key = cstr(key, 'key')
4139 char *_app_name = app_name
4146 value = <char *>realloc_chk(value, size)
4148 ret = rados_application_metadata_get(self.io, _app_name,
4150 if ret != -errno.ERANGE:
4152 if ret == -errno.ENOENT:
4153 raise KeyError('no metadata %s for application %s' % (key, _app_name))
4155 raise make_ex(ret, 'error getting metadata %s for application %s' %
4157 return decode_cstr(value)
4161 def application_metadata_set(self, app_name, key, value):
4163 Sets application metadata on an OSD pool
4165 :param app_name: application name
4167 :param key: metadata key
4169 :param value: metadata value
4172 :raises: :class:`Error`
4174 app_name = cstr(app_name, 'app_name')
4175 key = cstr(key, 'key')
4176 value = cstr(value, 'value')
4178 char *_app_name = app_name
4180 char *_value = value
4183 ret = rados_application_metadata_set(self.io, _app_name, _key,
4186 raise make_ex(ret, "error setting application metadata")
4188 def application_metadata_remove(self, app_name, key):
4190 Remove application metadata from an OSD pool
4192 :param app_name: application name
4194 :param key: metadata key
4197 :raises: :class:`Error`
4199 app_name = cstr(app_name, 'app_name')
4200 key = cstr(key, 'key')
4202 char *_app_name = app_name
4206 ret = rados_application_metadata_remove(self.io, _app_name, _key)
4208 raise make_ex(ret, "error removing application metadata")
4210 def application_metadata_list(self, app_name):
4212 Returns a list of enabled applications
4214 :param app_name: application name
4216 :returns: list of key/value tuples
4218 app_name = cstr(app_name, 'app_name')
4220 char *_app_name = app_name
4221 size_t key_length = 128
4222 size_t val_length = 128
4228 c_keys = <char *>realloc_chk(c_keys, key_length)
4229 c_vals = <char *>realloc_chk(c_vals, val_length)
4231 ret = rados_application_metadata_list(self.io, _app_name,
4232 c_keys, &key_length,
4233 c_vals, &val_length)
4235 keys = [decode_cstr(key) for key in
4236 c_keys[:key_length].split(b'\0')]
4237 vals = [decode_cstr(val) for val in
4238 c_vals[:val_length].split(b'\0')]
4239 return list(zip(keys, vals))[:-1]
4240 elif ret == -errno.ERANGE:
4243 raise make_ex(ret, "error listing application metadata")
4248 def alignment(self):
4250 Returns pool alignment
4253 Number of alignment bytes required by the current pool, or None if
4254 alignment is not required.
4261 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
4263 raise make_ex(ret, "error checking alignment")
4268 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
4270 raise make_ex(ret, "error querying alignment")
4271 alignment = _alignment
4275 def set_object_locator(func):
4276 def retfunc(self, *args, **kwargs):
4277 if self.locator_key is not None:
4278 old_locator = self.ioctx.get_locator_key()
4279 self.ioctx.set_locator_key(self.locator_key)
4280 retval = func(self, *args, **kwargs)
4281 self.ioctx.set_locator_key(old_locator)
4284 return func(self, *args, **kwargs)
4288 def set_object_namespace(func):
4289 def retfunc(self, *args, **kwargs):
4290 if self.nspace is None:
4291 raise LogicError("Namespace not set properly in context")
4292 old_nspace = self.ioctx.get_namespace()
4293 self.ioctx.set_namespace(self.nspace)
4294 retval = func(self, *args, **kwargs)
4295 self.ioctx.set_namespace(old_nspace)
4300 class Object(object):
4301 """Rados object wrapper, makes the object look like a file"""
4302 def __init__(self, ioctx, key, locator_key=None, nspace=None):
4306 self.state = "exists"
4307 self.locator_key = locator_key
4308 self.nspace = "" if nspace is None else nspace
4311 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
4312 (str(self.ioctx), self.key, "--default--"
4313 if self.nspace is "" else self.nspace, self.locator_key)
4315 def require_object_exists(self):
4316 if self.state != "exists":
4317 raise ObjectStateError("The object is %s" % self.state)
4320 @set_object_namespace
4321 def read(self, length=1024 * 1024):
4322 self.require_object_exists()
4323 ret = self.ioctx.read(self.key, length, self.offset)
4324 self.offset += len(ret)
4328 @set_object_namespace
4329 def write(self, string_to_write):
4330 self.require_object_exists()
4331 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4333 self.offset += len(string_to_write)
4337 @set_object_namespace
4339 self.require_object_exists()
4340 self.ioctx.remove_object(self.key)
4341 self.state = "removed"
4344 @set_object_namespace
4346 self.require_object_exists()
4347 return self.ioctx.stat(self.key)
4349 def seek(self, position):
4350 self.require_object_exists()
4351 self.offset = position
4354 @set_object_namespace
4355 def get_xattr(self, xattr_name):
4356 self.require_object_exists()
4357 return self.ioctx.get_xattr(self.key, xattr_name)
4360 @set_object_namespace
4361 def get_xattrs(self):
4362 self.require_object_exists()
4363 return self.ioctx.get_xattrs(self.key)
4366 @set_object_namespace
4367 def set_xattr(self, xattr_name, xattr_value):
4368 self.require_object_exists()
4369 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4372 @set_object_namespace
4373 def rm_xattr(self, xattr_name):
4374 self.require_object_exists()
4375 return self.ioctx.rm_xattr(self.key, xattr_name)
4386 class MonitorLog(object):
4387 # NOTE(sileht): Keep this class for backward compat
4388 # method moved to Rados.monitor_log()
4390 For watching cluster log messages. Instantiate an object and keep
4391 it around while callback is periodically called. Construct with
4392 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4393 arg will be passed to the callback.
4395 callback will be called with:
4396 arg (given to __init__)
4397 line (the full line, including timestamp, who, level, msg)
4398 who (which entity issued the log message)
4399 timestamp_sec (sec of a struct timespec)
4400 timestamp_nsec (sec of a struct timespec)
4401 seq (sequence number)
4402 level (string representing the level of the log message)
4403 msg (the message itself)
4404 callback's return value is ignored
4406 def __init__(self, cluster, level, callback, arg):
4408 self.callback = callback
4410 self.cluster = cluster
4411 self.cluster.monitor_log(level, callback, arg)