]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
30
# Type used by the argument validators for "a string": ``basestring`` on
# Python 2 (covers both ``str`` and ``unicode``), ``str`` on Python 3.
str_type = basestring if sys.version_info[0] < 3 else str
36
37
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101
102
103 cdef struct rados_cluster_stat_t:
104 uint64_t kb
105 uint64_t kb_used
106 uint64_t kb_avail
107 uint64_t num_objects
108
109 cdef struct rados_pool_stat_t:
110 uint64_t num_bytes
111 uint64_t num_kb
112 uint64_t num_objects
113 uint64_t num_object_clones
114 uint64_t num_object_copies
115 uint64_t num_objects_missing_on_primary
116 uint64_t num_objects_unfound
117 uint64_t num_objects_degraded
118 uint64_t num_rd
119 uint64_t num_rd_kb
120 uint64_t num_wr
121 uint64_t num_wr_kb
122
123 void rados_buffer_free(char *buf)
124
125 void rados_version(int *major, int *minor, int *extra)
126 int rados_create2(rados_t *pcluster, const char *const clustername,
127 const char * const name, uint64_t flags)
128 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
129 int rados_connect(rados_t cluster)
130 void rados_shutdown(rados_t cluster)
131 int rados_conf_read_file(rados_t cluster, const char *path)
132 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
133 int rados_conf_parse_env(rados_t cluster, const char *var)
134 int rados_conf_set(rados_t cluster, char *option, const char *value)
135 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
136
137 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
138 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
139 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
140 int rados_pool_create(rados_t cluster, const char *pool_name)
141 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
142 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
143 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
144 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
145 int rados_pool_list(rados_t cluster, char *buf, size_t len)
146 int rados_pool_delete(rados_t cluster, const char *pool_name)
147 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
148
149 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
150 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
151 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
152
153 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
154 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
155 const char *inbuf, size_t inbuflen,
156 char **outbuf, size_t *outbuflen,
157 char **outs, size_t *outslen)
158 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
159 const char *inbuf, size_t inbuflen,
160 char **outbuf, size_t *outbuflen,
161 char **outs, size_t *outslen)
162 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
163 const char *inbuf, size_t inbuflen,
164 char **outbuf, size_t *outbuflen,
165 char **outs, size_t *outslen)
166 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
167 const char *inbuf, size_t inbuflen,
168 char **outbuf, size_t *outbuflen,
169 char **outs, size_t *outslen)
170 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
175
176 int rados_wait_for_latest_osdmap(rados_t cluster)
177
178 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
179 void rados_ioctx_destroy(rados_ioctx_t io)
180 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
181 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
182 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
183
184 uint64_t rados_get_last_version(rados_ioctx_t io)
185 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
186 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
187 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
188 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
189 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
190 int rados_remove(rados_ioctx_t io, const char *oid)
191 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
192 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
193 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
194 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
195 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
196 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
197 void rados_getxattrs_end(rados_xattrs_iter_t iter)
198
199 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
200 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
201 void rados_nobjects_list_close(rados_list_ctx_t ctx)
202
203 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
204 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
205 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
206 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
207 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
208 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
209 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
210 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
211
212 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
213 const char * cookie, const char * desc,
214 timeval * duration, uint8_t flags)
215 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
216 const char * cookie, const char * tag, const char * desc,
217 timeval * duration, uint8_t flags)
218 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
219
220 rados_write_op_t rados_create_write_op()
221 void rados_release_write_op(rados_write_op_t write_op)
222
223 rados_read_op_t rados_create_read_op()
224 void rados_release_read_op(rados_read_op_t read_op)
225
226 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
227 void rados_aio_release(rados_completion_t c)
228 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
229 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
230 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
231 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
232 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
233 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
234 int rados_aio_flush(rados_ioctx_t io)
235
236 int rados_aio_get_return_value(rados_completion_t c)
237 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
238 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
239 int rados_aio_wait_for_complete(rados_completion_t c)
240 int rados_aio_wait_for_safe(rados_completion_t c)
241 int rados_aio_is_complete(rados_completion_t c)
242 int rados_aio_is_safe(rados_completion_t c)
243
244 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
245 const char * in_buf, size_t in_len, char * buf, size_t out_len)
246 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
247 const char * in_buf, size_t in_len, char * buf, size_t out_len)
248
249 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
250 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
251 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
252 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
253 void rados_write_op_omap_clear(rados_write_op_t write_op)
254 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
255
256 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
257 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
258 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
259 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
260 void rados_write_op_remove(rados_write_op_t write_op)
261 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
262 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
263
264 void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
265 void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
266 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
267 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
268 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
269 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
270 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
271 void rados_omap_get_end(rados_omap_iter_t iter)
272
273
# Re-export the C constants declared (with a leading underscore) in the
# ``cdef extern`` sections above as ordinary Python module attributes.
# Per-operation flags:
274 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
275 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
276 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
277 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
278 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
279 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
280 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
281
# Declared as a uint64_t in the extern block.
282 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
283
# Flags for whole read/write operations:
284 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
285 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
286 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
287 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
288 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
289 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
290 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
291
# The C side is a ``char*``; expose it as a decoded text string.
292 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
293
294 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
295 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
296
# auid sentinels defined here rather than taken from the C headers;
# ANONYMOUS_AUID is the all-ones 64-bit value.
297 ANONYMOUS_AUID = 0xffffffffffffffff
298 ADMIN_AUID = 0
299
300
class Error(Exception):
    """ `Error` class, derived from `Exception`; base of all rados exceptions """


class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error` """


class OSError(Error):
    """
    `OSError` class, derived from `Error`

    Carries the errno value and the message of a failed librados call.
    Inside this module the name shadows the builtin ``OSError``.
    """
    def __init__(self, errno, strerror):
        self.errno = errno
        self.strerror = strerror

    def __str__(self):
        return '[Errno {0}] {1}'.format(self.errno, self.strerror)


class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError` """


class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError` """


class PermissionDeniedError(OSError):
    """ `PermissionDeniedError` class, derived from `OSError`; used for EACCES """


class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError` """


class NoData(OSError):
    """ `NoData` class, derived from `OSError` """


class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError` """


class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError` """


class IOError(OSError):
    """ `IOError` class, derived from `OSError` """


class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError` """


class RadosStateError(Error):
    """ `RadosStateError` class, derived from `Error` """


class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` """


class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` """


class LogicError(Error):
    """ `LogicError` class, derived from `Error` """


class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError` """
387 pass
388
389
# Table used by make_ex(): maps a positive errno value to the exception class
# raised for it. Codes missing from the table fall back to a plain Error.
# Cython compile-time conditional: the two tables are identical except that
# the FreeBSD build keys NoData on ENOATTR while every other platform uses
# ENODATA.
390 IF UNAME_SYSNAME == "FreeBSD":
391 cdef errno_to_exception = {
392 errno.EPERM : PermissionError,
393 errno.ENOENT : ObjectNotFound,
394 errno.EIO : IOError,
395 errno.ENOSPC : NoSpace,
396 errno.EEXIST : ObjectExists,
397 errno.EBUSY : ObjectBusy,
398 errno.ENOATTR : NoData,
399 errno.EINTR : InterruptedOrTimeoutError,
400 errno.ETIMEDOUT : TimedOut,
401 errno.EACCES : PermissionDeniedError,
402 errno.EINVAL : InvalidArgumentError,
403 }
404 ELSE:
405 cdef errno_to_exception = {
406 errno.EPERM : PermissionError,
407 errno.ENOENT : ObjectNotFound,
408 errno.EIO : IOError,
409 errno.ENOSPC : NoSpace,
410 errno.EEXIST : ObjectExists,
411 errno.EBUSY : ObjectBusy,
412 errno.ENODATA : NoData,
413 errno.EINTR : InterruptedOrTimeoutError,
414 errno.ETIMEDOUT : TimedOut,
415 errno.EACCES : PermissionDeniedError,
416 errno.EINVAL : InvalidArgumentError,
417 }
417 }
418
419
420 cdef make_ex(ret, msg):
421 """
422 Translate a librados return code into an exception.
423
424 :param ret: the return code
425 :type ret: int
426 :param msg: the error message to use
427 :type msg: str
428 :returns: a subclass of :class:`Error`
429 """
430 ret = abs(ret)
431 if ret in errno_to_exception:
432 return errno_to_exception[ret](ret, msg)
433 else:
434 return Error(ret, msg + (": error code %d" % ret))
435
436
def opt(cls):
    """
    Build a type spec for :func:`requires` meaning "`cls` or ``None``".

    :param cls: the type the argument must have when it is not ``None``
    :returns: the tuple ``(cls, None)``
    """
    optional_spec = (cls, None)
    return optional_spec
441
442
def requires(*types):
    """
    Decorator factory that type-checks the arguments of an instance method.

    Each entry of `types` is a ``(arg_name, arg_type)`` pair. ``arg_type`` is
    a type, or a tuple of alternatives in which ``None`` stands for "the value
    may be None". Positional arguments (after ``self``) are matched to the
    pairs by position, keyword arguments by name. kwargs is an un-ordered
    dict, which is why the specs are passed positionally.

    :raises: :class:`TypeError` from the decorated call on a mismatch
    """
    def matches(value, expected):
        if expected is None:
            return value is None
        return isinstance(value, expected)

    def check_type(val, arg_name, arg_type):
        if not isinstance(arg_type, tuple):
            if matches(val, arg_type):
                return
            assert(arg_type is not None)
            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))

        for candidate in arg_type:
            if matches(val, candidate):
                return
        type_names = ' or '.join('None' if t is None else t.__name__
                                 for t in arg_type)
        raise TypeError('%s must be %s' % (arg_name, type_names))

    def wrapper(f):
        # FIXME(sileht): using functools.wraps here stops with
        # AttributeError: 'method_descriptor' object has no attribute '__module__'
        # @wraps(f)
        def validate_func(*args, **kwargs):
            # positional arguments first, skipping `self`
            for arg_val, (arg_name, arg_type) in zip(args[1:], types):
                check_type(arg_val, arg_name, arg_type)
            # then whichever declared arguments were passed by keyword
            for arg_name, arg_type in types:
                if arg_name in kwargs:
                    check_type(kwargs[arg_name], arg_name, arg_type)
            return f(*args, **kwargs)
        return validate_func
    return wrapper
479
480
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Create a byte string from a Python string

    :param basestring val: Python string (bytes are returned unchanged)
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed
    :rtype: bytes
    :raises: :class:`TypeError` if `val` is neither bytes nor text
    """
    if opt and val is None:
        return None
    if isinstance(val, bytes):
        return val
    elif isinstance(val, unicode):
        return val.encode(encoding)
    else:
        raise TypeError('%s must be a string' % name)


def cstr_list(list_str, name, encoding="utf-8"):
    """
    Apply :func:`cstr` to every element of a sequence.

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for each element
    :rtype: list of bytes
    :raises: :class:`TypeError` if any element is not a string
    """
    # the encoding has to be forwarded, otherwise every element is silently
    # encoded as utf-8 no matter what the caller asked for
    return [cstr(s, name, encoding) for s in list_str]
504
505
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string into a Python text string, passing ``None`` through.

    :param bytes val: byte string, or None
    :param str encoding: Encoding to decode with
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
517
518
519 cdef char* opt_str(s) except? NULL:
520 if s is None:
521 return NULL
522 return s
523
524
525 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
526 cdef void *ret = realloc(ptr, size)
527 if ret == NULL:
528 raise MemoryError("realloc failed")
529 return ret
530
531
532 cdef size_t * to_csize_t_array(list_int):
533 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
534 if ret == NULL:
535 raise MemoryError("malloc failed")
536 for i in xrange(len(list_int)):
537 ret[i] = <size_t>list_int[i]
538 return ret
539
540
541 cdef char ** to_bytes_array(list_bytes):
542 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
543 if ret == NULL:
544 raise MemoryError("malloc failed")
545 for i in xrange(len(list_bytes)):
546 ret[i] = <char *>list_bytes[i]
547 return ret
548
549
550
551 cdef int __monitor_callback(void *arg, const char *line, const char *who,
552 uint64_t sec, uint64_t nsec, uint64_t seq,
553 const char *level, const char *msg) with gil:
554 cdef object cb_info = <object>arg
555 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
556 return 0
557
558
class Version(object):
    """ Three-part version number, rendered as ``major.minor.extra`` """

    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        parts = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % parts
568
569
570 cdef class Rados(object):
571 """This class wraps librados functions"""
572 # NOTE(sileht): attributes declared in .pxd
573
def __init__(self, *args, **kwargs):
    """
    Create a cluster handle; all arguments are handed to ``__setup``
    unchanged.
    """
    # Initialise CPython's threading support first; the C callbacks in this
    # module are declared ``with gil``.
    PyEval_InitThreads()
    self.__setup(*args, **kwargs)
577
578 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
579 ('conffile', opt(str_type)))
580 def __setup(self, rados_id=None, name=None, clustername=None,
581 conf_defaults=None, conffile=None, conf=None, flags=0,
582 context=None):
583 self.monitor_callback = None
584 self.parsed_args = []
585 self.conf_defaults = conf_defaults
586 self.conffile = conffile
587 self.rados_id = rados_id
588
589 if rados_id and name:
590 raise Error("Rados(): can't supply both rados_id and name")
591 elif rados_id:
592 name = 'client.' + rados_id
593 elif name is None:
594 name = 'client.admin'
595 if clustername is None:
596 clustername = ''
597
598 name = cstr(name, 'name')
599 clustername = cstr(clustername, 'clustername')
600 cdef:
601 char *_name = name
602 char *_clustername = clustername
603 int _flags = flags
604 int ret
605
606 if context:
607 # Unpack void* (aka rados_config_t) from capsule
608 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
609 with nogil:
610 ret = rados_create_with_context(&self.cluster, rados_config)
611 else:
612 with nogil:
613 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
614 if ret != 0:
615 raise Error("rados_initialize failed with error code: %d" % ret)
616
617 self.state = "configuring"
618 # order is important: conf_defaults, then conffile, then conf
619 if conf_defaults:
620 for key, value in conf_defaults.items():
621 self.conf_set(key, value)
622 if conffile is not None:
623 # read the default conf file when '' is given
624 if conffile == '':
625 conffile = None
626 self.conf_read_file(conffile)
627 if conf:
628 for key, value in conf.items():
629 self.conf_set(key, value)
630
def require_state(self, *args):
    """
    Make sure the Rados object is in one of the given states.

    :param args: the acceptable values of ``self.state``
    :raises: RadosStateError if the current state is not among them
    """
    if self.state not in args:
        raise RadosStateError("You cannot perform that operation on a "
                              "Rados object in state %s." % self.state)
641
def shutdown(self):
    """
    Disconnects from the cluster. Call this explicitly when a
    Rados.connect()ed object is no longer used. Calling it again on an
    object that is already shut down does nothing.
    """
    if self.state == "shutdown":
        return
    with nogil:
        rados_shutdown(self.cluster)
    self.state = "shutdown"
651
def __enter__(self):
    """
    Context-manager entry: connect to the cluster.

    :returns: this Rados object
    """
    self.connect()
    return self
655
def __exit__(self, type_, value, traceback):
    """
    Context-manager exit: shut the handle down.

    :returns: False, so an exception raised inside the ``with`` body is
              not suppressed
    """
    self.shutdown()
    return False
659
660 def version(self):
661 """
662 Get the version number of the ``librados`` C library.
663
664 :returns: a tuple of ``(major, minor, extra)`` components of the
665 librados version
666 """
667 cdef int major = 0
668 cdef int minor = 0
669 cdef int extra = 0
670 with nogil:
671 rados_version(&major, &minor, &extra)
672 return Version(major, minor, extra)
673
674 @requires(('path', opt(str_type)))
675 def conf_read_file(self, path=None):
676 """
677 Configure the cluster handle using a Ceph config file.
678
679 :param path: path to the config file
680 :type path: str
681 """
682 self.require_state("configuring", "connected")
683 path = cstr(path, 'path', opt=True)
684 cdef:
685 char *_path = opt_str(path)
686 with nogil:
687 ret = rados_conf_read_file(self.cluster, _path)
688 if ret != 0:
689 raise make_ex(ret, "error calling conf_read_file")
690
691 def conf_parse_argv(self, args):
692 """
693 Parse known arguments from args, and remove; returned
694 args contain only those unknown to ceph
695 """
696 self.require_state("configuring", "connected")
697 if not args:
698 return
699
700 cargs = cstr_list(args, 'args')
701 cdef:
702 int _argc = len(args)
703 char **_argv = to_bytes_array(cargs)
704 char **_remargv = NULL
705
706 try:
707 _remargv = <char **>malloc(_argc * sizeof(char *))
708 with nogil:
709 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
710 <const char**>_argv,
711 <const char**>_remargv)
712 if ret:
713 raise make_ex(ret, "error calling conf_parse_argv_remainder")
714
715 # _remargv was allocated with fixed argc; collapse return
716 # list to eliminate any missing args
717 retargs = [decode_cstr(a) for a in _remargv[:_argc]
718 if a != NULL]
719 self.parsed_args = args
720 return retargs
721 finally:
722 free(_argv)
723 free(_remargv)
724
725 def conf_parse_env(self, var='CEPH_ARGS'):
726 """
727 Parse known arguments from an environment variable, normally
728 CEPH_ARGS.
729 """
730 self.require_state("configuring", "connected")
731 if not var:
732 return
733
734 var = cstr(var, 'var')
735 cdef:
736 char *_var = var
737 with nogil:
738 ret = rados_conf_parse_env(self.cluster, _var)
739 if ret != 0:
740 raise make_ex(ret, "error calling conf_parse_env")
741
742 @requires(('option', str_type))
743 def conf_get(self, option):
744 """
745 Get the value of a configuration option
746
747 :param option: which option to read
748 :type option: str
749
750 :returns: str - value of the option or None
751 :raises: :class:`TypeError`
752 """
753 self.require_state("configuring", "connected")
754 option = cstr(option, 'option')
755 cdef:
756 char *_option = option
757 size_t length = 20
758 char *ret_buf = NULL
759
760 try:
761 while True:
762 ret_buf = <char *>realloc_chk(ret_buf, length)
763 with nogil:
764 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
765 if ret == 0:
766 return decode_cstr(ret_buf)
767 elif ret == -errno.ENAMETOOLONG:
768 length = length * 2
769 elif ret == -errno.ENOENT:
770 return None
771 else:
772 raise make_ex(ret, "error calling conf_get")
773 finally:
774 free(ret_buf)
775
776 @requires(('option', str_type), ('val', str_type))
777 def conf_set(self, option, val):
778 """
779 Set the value of a configuration option
780
781 :param option: which option to set
782 :type option: str
783 :param option: value of the option
784 :type option: str
785
786 :raises: :class:`TypeError`, :class:`ObjectNotFound`
787 """
788 self.require_state("configuring", "connected")
789 option = cstr(option, 'option')
790 val = cstr(val, 'val')
791 cdef:
792 char *_option = option
793 char *_val = val
794
795 with nogil:
796 ret = rados_conf_set(self.cluster, _option, _val)
797 if ret != 0:
798 raise make_ex(ret, "error calling conf_set")
799
800 def ping_monitor(self, mon_id):
801 """
802 Ping a monitor to assess liveness
803
804 May be used as a simply way to assess liveness, or to obtain
805 information about the monitor in a simple way even in the
806 absence of quorum.
807
808 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
809 :type mon_id: str
810 :returns: the string reply from the monitor
811 """
812
813 self.require_state("configuring", "connected")
814
815 mon_id = cstr(mon_id, 'mon_id')
816 cdef:
817 char *_mon_id = mon_id
818 size_t outstrlen
819 char *outstr
820
821 with nogil:
822 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
823
824 if ret != 0:
825 raise make_ex(ret, "error calling ping_monitor")
826
827 if outstrlen:
828 my_outstr = outstr[:outstrlen]
829 rados_buffer_free(outstr)
830 return decode_cstr(my_outstr)
831
def connect(self, timeout=0):
    """
    Connect to the cluster. Use shutdown() to release resources.

    :param timeout: accepted for compatibility only; it is not used
    :raises: :class:`RadosStateError`, :class:`Error`
    """
    self.require_state("configuring")
    # NOTE(sileht): timeout was supported by old python API,
    # but this is not something available in C API, so ignore
    # for now and remove it later
    with nogil:
        ret = rados_connect(self.cluster)
    if ret == 0:
        self.state = "connected"
        return
    raise make_ex(ret, "error connecting to the cluster")
845
846 def get_cluster_stats(self):
847 """
848 Read usage info about the cluster
849
850 This tells you total space, space used, space available, and number
851 of objects. These are not updated immediately when data is written,
852 they are eventually consistent.
853
854 :returns: dict - contains the following keys:
855
856 - ``kb`` (int) - total space
857
858 - ``kb_used`` (int) - space used
859
860 - ``kb_avail`` (int) - free space available
861
862 - ``num_objects`` (int) - number of objects
863
864 """
865 cdef:
866 rados_cluster_stat_t stats
867
868 with nogil:
869 ret = rados_cluster_stat(self.cluster, &stats)
870
871 if ret < 0:
872 raise make_ex(
873 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
874 return {'kb': stats.kb,
875 'kb_used': stats.kb_used,
876 'kb_avail': stats.kb_avail,
877 'num_objects': stats.num_objects}
878
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Checks if a given pool exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True if the pool exists, False otherwise.
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool)
    # -ENOENT is the "no such pool" answer, not a failure.
    if rc == -errno.ENOENT:
        return False
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return True
904
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Returns a pool's ID based on its name.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if it doesn't exist
    """
    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool)
    # -ENOENT means the pool is unknown; report that as None.
    if rc == -errno.ENOENT:
        return None
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    # A non-negative return value is the pool ID itself.
    return int(rc)
929
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    The name buffer starts at 512 bytes and is doubled while librados
    reports -ERANGE (buffer too small); growth stops once the buffer
    exceeds 4096 bytes, at which point the error is raised.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # Buffer too small: grow and retry. The comparison used
                # to be inverted (!=), which retried on every *other*
                # error (including ENOENT) and never on ERANGE.
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            else:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        # Runs on every exit path (return, break+return, raise).
        free(name)
965
@requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
def create_pool(self, pool_name, auid=None, crush_rule=None):
    """
    Create a pool. Which librados call is used depends on the optional
    arguments:

    - auid=None, crush_rule=None: default settings
    - auid given, crush_rule=None: owned by a specific auid
    - auid=None, crush_rule given: with a specific CRUSH rule
    - both given: with a specific CRUSH rule and auid

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param auid: the id of the owner of the new pool
    :type auid: int
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_name = pool_name
        uint8_t c_rule
        uint64_t c_auid

    # The Python -> C conversions happen outside the nogil sections
    # because they need the GIL.
    if auid is None:
        if crush_rule is None:
            with nogil:
                rc = rados_pool_create(self.cluster, c_name)
        else:
            c_rule = crush_rule
            with nogil:
                rc = rados_pool_create_with_crush_rule(self.cluster, c_name, c_rule)
    elif crush_rule is None:
        c_auid = auid
        with nogil:
            rc = rados_pool_create_with_auid(self.cluster, c_name, c_auid)
    else:
        c_auid = auid
        c_rule = crush_rule
        with nogil:
            rc = rados_pool_create_with_all(self.cluster, c_name, c_auid, c_rule)

    if rc < 0:
        raise make_ex(rc, "error creating pool '%s'" % pool_name)
1010
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Get base pool

    :param pool_id: ID of the pool to query
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    self.require_state("connected")
    cdef:
        int64_t c_pool_id = pool_id
        int64_t tier = 0

    with nogil:
        rc = rados_pool_get_base_tier(self.cluster, c_pool_id, &tier)
    if rc < 0:
        raise make_ex(rc, "get_pool_base_tier(%d)" % pool_id)
    return int(tier)
1028
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Delete a pool and all data inside it.

    The pool disappears from the cluster immediately, while the data it
    held is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_delete(self.cluster, c_pool)
    if rc < 0:
        raise make_ex(rc, "error deleting pool '%s'" % pool_name)
1052
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List inconsistent placement groups in the given pool

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t c_pool_id = pool_id
        size_t buf_size = 512
        char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_size)
            with nogil:
                rc = rados_inconsistent_pg_list(self.cluster, c_pool_id,
                                                buf, buf_size)
            if rc < 0:
                raise make_ex(rc, "error calling inconsistent_pg_list")
            if rc <= <int>buf_size:
                break
            # A return value larger than the buffer means it did not
            # fit: double the buffer and ask again.
            buf_size *= 2
        # Entries are NUL-separated; drop the empty strings produced by
        # the separators.
        entries = decode_cstr(buf[:rc]).split('\0')
        return [pg for pg in entries if pg]
    finally:
        free(buf)
1083
def list_pools(self):
    """
    Gets a list of pool names.

    :raises: :class:`Error`
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                # The list did not fit: double the buffer and retry.
                size *= 2
            elif ret >= 0:
                break
            else:
                # A negative return used to match neither branch and
                # made this loop spin forever; surface it like
                # get_inconsistent_pgs() does.
                raise make_ex(ret, "error calling rados_pool_list")
        # Names are NUL-separated; skip the empty strings produced by
        # the separators.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1108
def get_fsid(self):
    """
    Get the fsid of the cluster as a hexadecimal string.

    :raises: :class:`Error`
    :returns: str - cluster fsid
    """
    self.require_state("connected")
    cdef:
        size_t capacity = 37
        char *raw
        PyObject* fsid_obj = NULL

    # Allocate the result object up front and let librados write
    # straight into its buffer.
    fsid_obj = PyBytes_FromStringAndSize(NULL, capacity)
    try:
        raw = PyBytes_AsString(fsid_obj)
        with nogil:
            rc = rados_cluster_fsid(self.cluster, raw, capacity)
        if rc < 0:
            raise make_ex(rc, "error getting cluster fsid")
        if rc != <int>capacity:
            # Shrink the object to the length actually written.
            _PyBytes_Resize(&fsid_obj, rc)
        return <object>fsid_obj
    finally:
        # Unconditional DECREF: the <object> cast above takes its own
        # reference when needed. This also covers the error paths,
        # including a failed _PyBytes_Resize, which frees the string
        # and sets the pointer to NULL (hence XDECREF).
        ref.Py_XDECREF(fsid_obj)
1138
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Create an io context

    An io context is what operations inside one particular pool are
    performed through.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    cdef:
        char *c_pool = ioctx_name
        rados_ioctx_t handle

    with nogil:
        rc = rados_ioctx_create(self.cluster, c_pool, &handle)
    if rc < 0:
        raise make_ex(rc, "error opening pool '%s'" % ioctx_name)

    # Wrap the raw librados handle in the Python-level object.
    ctx = Ioctx(ioctx_name)
    ctx.io = handle
    return ctx
1165
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    Send a command to the monitors via rados_mon_command(), or to one
    specific monitor via rados_mon_command_target() when target is set.

    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored; kept for backward compatibility
    :param target: optional monitor to address (str, or int which is
                   converted to str)

    :returns: tuple (int ret, bytes outbuf, string outs); ret is the raw
              librados return code and is not turned into an exception
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # Argument name fixed from 'c' to 'cmd' so messages produced for a
    # bad argument match osd_command/mgr_command/pg_command.
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the cleanup in ``finally`` is always safe.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy both buffers into Python objects before they are freed.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Freed here rather than inline so the librados buffers are
        # released even when decoding the status string raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1222
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    Send a command to one OSD via rados_osd_command().

    :param osdid: ID of the OSD to address
    :type osdid: int
    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored; kept for backward compatibility

    :returns: tuple (int ret, bytes outbuf, string outs); ret is the raw
              librados return code and is not turned into an exception
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        int _osdid = osdid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the cleanup in ``finally`` is always safe.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_osd_command(self.cluster, _osdid,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        # Copy both buffers into Python objects before they are freed.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Freed here rather than inline so the librados buffers are
        # released even when decoding the status string raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1265
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Send a command to the manager via rados_mgr_command().

    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored; kept for backward compatibility

    :returns: tuple (int ret, bytes outbuf, string outs); ret is the raw
              librados return code and is not turned into an exception
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the cleanup in ``finally`` is always safe.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_mgr_command(self.cluster,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        # Copy both buffers into Python objects before they are freed.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Freed here rather than inline so the librados buffers are
        # released even when decoding the status string raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1306
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    Send a command to one placement group via rados_pg_command().

    :param pgid: ID of the placement group to address
    :type pgid: str
    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored; kept for backward compatibility

    :returns: tuple (int ret, bytes outbuf, string outs); ret is the raw
              librados return code and is not turned into an exception
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_pgid = pgid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the cleanup in ``finally`` is always safe.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_pg_command(self.cluster, _pgid,
                                   <const char **>_cmd, _cmdlen,
                                   <const char *>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)

        # Copy both buffers into Python objects before they are freed.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Freed here rather than inline so the librados buffers are
        # released even when decoding the status string raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1350
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap() on the cluster handle.

    :returns: the return value of rados_wait_for_latest_osdmap(),
              passed through unchanged (no exception is raised here)
    """
    self.require_state("connected")
    with nogil:
        rc = rados_wait_for_latest_osdmap(self.cluster)
    return rc
1356
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Blacklist a client from the OSDs

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        uint32_t c_expire = expire_seconds
        char *c_addr = client_address

    with nogil:
        rc = rados_blacklist_add(self.cluster, c_addr, c_expire)
    if rc < 0:
        raise make_ex(rc, "error blacklisting client '%s'" % client_address)
1378
def monitor_log(self, level, callback, arg):
    """
    Install or remove a cluster log callback through rados_monitor_log().

    :param level: log level; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the
                     currently registered callback
    :param arg: extra value stored next to the callback; the pair
                (callback, arg) is what gets handed to the C-level
                trampoline

    :raises: :class:`LogicError` for an invalid level or a callback that
             is neither callable nor None, :class:`Error` if registering
             the callback fails
    """
    if level not in MONITOR_LEVELS:
        # %-formatting instead of string concatenation so that a
        # non-string level still produces LogicError, not TypeError.
        raise LogicError("invalid monitor level %s" % (level,))
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Unregister: the return code is not checked on this path.
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>_level,
                              <rados_log_callback_t>&__monitor_callback, _arg)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = cb
1404
1405
cdef class OmapIterator(object):
    """Omap iterator"""

    cdef public Ioctx ioctx
    # librados iterator handle; __cinit__ does not set it, so it has to
    # be assigned by whoever creates the iterator.
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next key-value pair in the object

        :raises: :class:`Error`, StopIteration
        :returns: tuple - (key, value); value is None when librados
                  hands back no value pointer
        """
        cdef:
            char *c_key = NULL
            char *c_val = NULL
            size_t c_val_len

        with nogil:
            rc = rados_omap_get_next(self.ctx, &c_key, &c_val, &c_val_len)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        # A NULL key marks the end of the iteration.
        if c_key == NULL:
            raise StopIteration()

        key = decode_cstr(c_key)
        if c_val == NULL:
            return (key, None)
        # The value is binary data: slice by length instead of relying
        # on a NUL terminator.
        return (key, c_val[:c_val_len])

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1444
1445
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    # librados listing handle, opened in __cinit__ and closed in
    # __dealloc__.
    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if ret < 0:
            raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        :raises: StopIteration when the listing is exhausted,
                 :class:`Error` when librados reports a failure
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *key_ = NULL
            const char *locator_ = NULL
            const char *nspace_ = NULL

        with nogil:
            ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)

        # Only -ENOENT means "no more objects". Any other negative value
        # is a real failure; treating it as the end of the listing would
        # silently hand the caller a truncated result.
        if ret == -errno.ENOENT:
            raise StopIteration()
        if ret < 0:
            raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

        key = decode_cstr(key_)
        locator = decode_cstr(locator_) if locator_ != NULL else None
        nspace = decode_cstr(nspace_) if nspace_ != NULL else None
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1491
1492
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    cdef rados_xattrs_iter_t it
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        # The converted name is stored on self and _oid is taken from
        # that stored object, so the object outlives this call.
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if ret != 0:
            raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: :class:`Error`, StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *name_ = NULL
            const char *val_ = NULL
            size_t len_ = 0

        with nogil:
            ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
        if ret != 0:
            # Adjacent literals instead of a backslash continuation
            # inside the string: the latter splices the indentation of
            # the continuation line into the message.
            raise make_ex(ret, "error iterating over the extended attributes "
                               "in '%s'" % self.oid)
        # A NULL name marks the end of the iteration.
        if name_ == NULL:
            raise StopIteration()
        name = decode_cstr(name_)
        # The value is binary data: slice by length.
        val = val_[:len_]
        return (name, val)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1541
1542
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # Array of snapshot ids fetched once in __cinit__; max_snap is the
    # number of valid entries and cur_snap the index of the next one to
    # hand out.
    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                # Adjacent literals instead of a backslash continuation
                # inside the string: the latter splices the indentation
                # of the continuation line into the message.
                raise make_ex(ret, "error calling rados_snap_list for "
                                   "ioctx '%s'" % self.ioctx.name)
            # -ERANGE: the array was too small, double it and retry.
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id array used to be leaked: nothing ever freed it.
        # free(NULL) is a no-op, so this is also safe when __cinit__
        # failed before allocating.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Same doubling strategy as above, this time for the name
            # buffer.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            # Advance only after the Snap was built successfully.
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1608
1609
cdef class Snap(object):
    """Snapshot object"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): the old API kept the ctypes object here rather than
    # the plain value.
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.snap_id = snap_id
        self.name = name
        self.ioctx = ioctx

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t created_at

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &created_at)
        if rc != 0:
            raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(created_at)
1642
1643
cdef class Completion(object):
    """completion object"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            ret = rados_aio_is_safe(self.rados_comp)
        return ret == 1

    def is_complete(self):
        """
        Has an asynchronous operation completed?

        This does not imply that the complete callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            ret = rados_aio_is_complete(self.rados_comp)
        return ret == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: the return value of rados_aio_wait_for_complete_and_cb()
        """
        with nogil:
            ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return ret

    def get_return_value(self):
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            ret = rados_aio_get_return_value(self.rados_comp)
        return ret

    def __dealloc__(self):
        """
        Release a completion

        Drops the reference held on the attached buffer (if any) and
        releases the librados completion. It may not be freed
        immediately if the operation is not acked and committed.
        """
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        """
        Run the oncomplete callback, then stop tracking this completion
        in the ioctx's complete_completions list.
        """
        try:
            self.oncomplete(self)
        finally:
            # Untrack even when the user callback raises; otherwise the
            # ioctx keeps this completion (and whatever it references)
            # alive forever.
            with self.ioctx.lock:
                if self.oncomplete:
                    self.ioctx.complete_completions.remove(self)

    def _safe(self):
        """
        Run the onsafe callback, then stop tracking this completion in
        the ioctx's safe_completions list.
        """
        try:
            self.onsafe(self)
        finally:
            # Same reasoning as in _complete().
            with self.ioctx.lock:
                if self.onsafe:
                    self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        """
        Stop tracking this completion in the ioctx lists without
        running any callback.
        """
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1769
1770
class OpCtx(object):
    """
    Context-manager mixin for operation objects.

    Relies on the concrete class providing ``create()`` and
    ``release()``.
    """

    def __enter__(self):
        """Return whatever ``create()`` returns as the ``with`` target."""
        op = self.create()
        return op

    def __exit__(self, type, msg, traceback):
        """Call ``release()``; exceptions from the block are not suppressed."""
        self.release()
1777
1778
cdef class WriteOp(object):
    """Wrapper around a librados write operation (rados_write_op_t)."""
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """Release the underlying write operation."""
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: creation flag passed to librados; None (the
                          default) is sent as 0, which librados.h
                          defines as LIBRADOS_CREATE_IDEMPOTENT
        :type exclusive: int
        """

        cdef:
            # None cannot be coerced to a C int, so the documented
            # default used to fail with TypeError; map it to 0.
            int _exclusive = 0 if exclusive is None else exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)


    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of bytes to zero
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        with nogil:
            # librados takes (write_op, offset, len); the two values
            # used to be passed the other way round.
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1903
1904
class WriteOpCtx(WriteOp, OpCtx):
    """
    Write operation context manager.

    Combines WriteOp with the OpCtx mixin: entering the ``with`` block
    calls ``create()`` and leaving it calls ``release()``.
    """
1907
1908
cdef class ReadOp(object):
    """Wrapper around a librados read operation (rados_read_op_t)."""
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying read operation.

        :returns: self
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """Release the underlying read operation."""
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """
        cdef int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
1934
1935
class ReadOpCtx(ReadOp, OpCtx):
    """
    Read operation context manager.

    Combines ReadOp with the OpCtx mixin: entering the ``with`` block
    calls ``create()`` and leaving it calls ``release()``.
    """
1938
1939
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to onsafe() for asynchronous operations

    ``args`` is the Python object registered with the completion; its
    _safe() method is invoked with the GIL held. Always returns 0.
    """
    cdef object target = <object>args
    target._safe()
    return 0
1947
1948
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    ``args`` is the Python object registered with the completion; its
    _complete() method is invoked with the GIL held. Always returns 0.
    """
    cdef object target = <object>args
    target._complete()
    return 0
1956
1957
1958 cdef class Ioctx(object):
1959 """rados.Ioctx object"""
1960 # NOTE(sileht): attributes declared in .pxd
1961
def __init__(self, name):
    """
    Set up the Python-side state of an io context.

    :param name: name stored on the context (open_ioctx passes the
                 pool name)
    """
    self.name = name
    self.state = "open"

    # No locator key and no namespace selected initially.
    self.locator_key = ""
    self.nspace = ""

    # Completions that still have a pending callback are kept in these
    # lists; the lock guards both of them.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
1971
def __enter__(self):
    """Context-manager entry: the io context itself is the ``with`` target."""
    return self
1974
def __exit__(self, type_, value, traceback):
    """
    Context-manager exit: close the io context.

    :returns: False, so an exception raised inside the ``with`` block
              is not suppressed
    """
    self.close()
    return False
1978
def __dealloc__(self):
    # Calls close() when the object is deallocated, so a context that
    # was never closed explicitly is still closed.
    self.close()
1981
def __track_completion(self, completion_obj):
    """
    Remember a completion until its callbacks have run.

    The completion is appended to complete_completions when it has an
    oncomplete callback and to safe_completions when it has an onsafe
    callback, each under the ioctx lock.
    """
    wants_complete = completion_obj.oncomplete
    if wants_complete:
        with self.lock:
            self.complete_completions.append(completion_obj)

    wants_safe = completion_obj.onsafe
    if wants_safe:
        with self.lock:
            self.safe_completions.append(completion_obj)
1989
def __get_completion(self, oncomplete, onsafe):
    """
    Constructs a completion to use with asynchronous operations

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    completion_obj = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t c_on_complete = NULL
        rados_callback_t c_on_safe = NULL
        rados_completion_t c_completion
        PyObject* cb_arg = <PyObject*>completion_obj

    # A C trampoline is registered only for the callbacks the caller
    # actually supplied; the others stay NULL.
    if oncomplete:
        c_on_complete = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        c_on_safe = <rados_callback_t>&__aio_safe_cb

    with nogil:
        rc = rados_aio_create_completion(cb_arg, c_on_complete, c_on_safe,
                                         &c_completion)
    if rc < 0:
        raise make_ex(rc, "error getting a completion")

    completion_obj.rados_comp = c_completion
    return completion_obj
2026
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    When the stat finishes, oncomplete is invoked as:

        oncomplete(completion, size, mtime)

    where mtime is the result of time.localtime(). If the operation's
    return value is negative, size and mtime are both None.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *c_oid = object_name
        # Output slots filled in by librados; read back by the wrapper
        # callback below.
        uint64_t size_out
        time_t mtime_out

    def oncomplete_(completion_v):
        # Wraps the user callback so it receives the stat results.
        cdef Completion _completion_v = completion_v
        if _completion_v.get_return_value() < 0:
            return oncomplete(_completion_v, None, None)
        return oncomplete(_completion_v, size_out, time.localtime(mtime_out))

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_stat(self.io, c_oid, completion.rados_comp,
                            &size_out, &mtime_out)

    if rc < 0:
        # The operation was never queued: stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error stating %s" % object_name)
    return completion
2072
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    The write is queued and the call returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_write
        size_t c_len = len(to_write)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_write(self.io, c_oid, completion.rados_comp,
                             c_data, c_len, c_offset)
    if rc < 0:
        # The write was never queued: stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error writing object %s" % object_name)
    return completion
2117
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written. The write is queued
    and this call returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: callback for when the write is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: callback for when the write is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion comp
        char* c_name = object_name
        char* payload = to_write
        size_t payload_len = len(to_write)

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)
    with nogil:
        rc = rados_aio_write_full(self.io, c_name,
                                  comp.rados_comp,
                                  payload, payload_len)
    if rc < 0:
        # The request was never queued, so release the completion now.
        comp._cleanup()
        raise make_ex(rc, "error writing object %s" % object_name)
    return comp
2162
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object.

    The write is queued and this call returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: callback for when the write is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: callback for when the write is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion comp
        char* c_name = object_name
        char* payload = to_append
        size_t payload_len = len(to_append)

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)
    with nogil:
        rc = rados_aio_append(self.io, c_name,
                              comp.rados_comp,
                              payload, payload_len)
    if rc < 0:
        # The request was never queued, so release the completion now.
        comp._cleanup()
        raise make_ex(rc, "error appending object %s" % object_name)
    return comp
2205
def aio_flush(self):
    """
    Wait until every pending write on this io context is safe.

    :raises: :class:`Error`
    """
    with nogil:
        rc = rados_aio_flush(self.io)
    if rc < 0:
        raise make_ex(rc, "error flushing")
2216
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    ``data_read`` is trimmed to the number of bytes actually read
    (possibly empty), or is None if the read failed.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        # Points into the bytes object owned by the completion.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim on every successful short read, including a zero-byte read;
        # otherwise the caller would receive `length` bytes of
        # uninitialised buffer.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # Allocate the destination buffer up front; librados fills it in place.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2269
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    ``data`` is trimmed to the operation's return value (used as the
    output length), or is None if the operation failed.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Points into the bytes object owned by the completion.
        char *ret_buf
        size_t _length = length

    def result_of(completion_v):
        # Output bytes trimmed to the returned length, or None on failure.
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        if return_value < 0:
            return None
        # Compare against the buffer's current size rather than `length`,
        # so whichever callback runs second does not resize again (the
        # bytes object may already be referenced by user code by then).
        # Computed in its own statement so no temporary reference to the
        # buffer is alive when _PyBytes_Resize runs.
        current_len = len(<object>_completion_v.buf)
        if return_value != current_len:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return <object>_completion_v.buf

    def oncomplete_(completion_v):
        return oncomplete(completion_v, result_of(completion_v))

    def onsafe_(completion_v):
        return onsafe(completion_v, result_of(completion_v))

    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    # Allocate the output buffer up front; librados fills it in place.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2340
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object.

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: callback for when the remove is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: callback for when the remove is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion comp
        char* c_name = object_name

    comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(comp)
    with nogil:
        rc = rados_aio_remove(self.io, c_name,
                              comp.rados_comp)
    if rc < 0:
        # The request was never queued, so release the completion now.
        comp._cleanup()
        raise make_ex(rc, "error removing %s" % object_name)
    return comp
2373
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx object is in the 'open' state.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2382
def change_auid(self, auid):
    """
    Try to change the auid "owner" associated with this io context.

    Write permission on both the current and the new auid is required.

    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    cdef uint64_t new_auid = auid

    with nogil:
        rc = rados_ioctx_pool_set_auid(self.io, new_auid)
    if rc < 0:
        raise make_ex(rc, "error changing auid of '%s' to %d"
                      % (self.name, auid))
2402
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key used to map objects to pgs within this io context.

    The key replaces the object name when determining which placement
    group an object is put in. It applies to all subsequent operations
    of the io context: until a different locator key is set, all
    objects in this io context will be placed in the same pg.

    :param loc_key: the key to use as the object locator, or NULL to discard
                    any previously set key
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    # Remember the caller's (un-encoded) value for get_locator_key().
    self.locator_key = loc_key
2425
def get_locator_key(self):
    """
    Return the locator key stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2433
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Select the snapshot that subsequent object reads use.

    To stop reading from a snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap_id)
2450
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is accepted (via opt() above) as an alias for the default
    # namespace; normalise it to the empty string.
    if nspace is None:
        nspace = ""
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    self.nspace = nspace
2474
def get_namespace(self):
    """
    Return the namespace stored on this io context.

    :returns: namespace
    """
    current_namespace = self.nspace
    return current_namespace
2482
def close(self):
    """
    Close a rados.Ioctx object.

    This only tells librados that the io context is no longer needed.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but the io context should not be used again after
    this call. Calling close() when the state is not "open" does
    nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2497
2498
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Synchronously write data to an object.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)
        uint64_t c_offset = offset

    with nogil:
        rc = rados_write(self.io, c_key, payload, payload_len, c_offset)
    if rc < 0:
        raise make_ex(rc, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # A positive result is not a valid outcome for rados_write.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
2534
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Synchronously write an entire object.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)

    with nogil:
        rc = rados_write_full(self.io, c_key, payload, payload_len)
    if rc < 0:
        raise make_ex(rc, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # A positive result is not a valid outcome for rados_write_full.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
2569
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Synchronously append data to an object.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)

    with nogil:
        rc = rados_append(self.io, c_key, payload, payload_len)
    if rc < 0:
        raise make_ex(rc, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if rc > 0:
        # A positive result is not a valid outcome for rados_append.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success." % (self.name, rc))
    return rc
2601
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Synchronously read data from an object.

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *out_ptr
        uint64_t c_offset = offset
        size_t c_length = length
        PyObject* out_bytes = NULL

    # Allocate the result object first so librados can fill it in place.
    out_bytes = PyBytes_FromStringAndSize(NULL, length)
    try:
        out_ptr = PyBytes_AsString(out_bytes)
        with nogil:
            nread = rados_read(self.io, c_key, out_ptr, c_length, c_offset)
        if nread < 0:
            raise make_ex(nread, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: shrink the bytes object to what was actually read.
        if nread != length:
            _PyBytes_Resize(&out_bytes, nread)

        return <object>out_bytes
    finally:
        # Drop our own reference unconditionally: the <object> cast above
        # takes its own reference when needed. This also covers the error
        # paths, including a failed _PyBytes_Resize (which frees the
        # object and sets the pointer to NULL, hence XDECREF).
        ref.Py_XDECREF(out_bytes)
2645
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Points into the bytes object referenced by ret_s.
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output object first so librados can fill it in place.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s::%s on %s"
                          % (self.name, cls, method, key))

        # Trim the output to the length reported by rados_exec.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the bytes
        # object itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2701
def get_stats(self):
    """
    Return usage statistics for the pool.

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stats
    with nogil:
        rc = rados_ioctx_pool_stat(self.io, &pool_stats)
    if rc < 0:
        raise make_ex(rc, "Ioctx.get_stats(%s): get_stats failed" % self.name)
    # Copy each C struct field into a plain Python dict.
    return dict(
        num_bytes=pool_stats.num_bytes,
        num_kb=pool_stats.num_kb,
        num_objects=pool_stats.num_objects,
        num_object_clones=pool_stats.num_object_clones,
        num_object_copies=pool_stats.num_object_copies,
        num_objects_missing_on_primary=pool_stats.num_objects_missing_on_primary,
        num_objects_unfound=pool_stats.num_objects_unfound,
        num_objects_degraded=pool_stats.num_objects_degraded,
        num_rd=pool_stats.num_rd,
        num_rd_kb=pool_stats.num_rd_kb,
        num_wr=pool_stats.num_wr,
        num_wr_kb=pool_stats.num_wr_kb,
    )
2751
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object.

    Snapshots of the object are not deleted.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef char *c_key = key

    with nogil:
        rc = rados_remove(self.io, c_key)
    if rc < 0:
        raise make_ex(rc, "Failed to remove '%s'" % key)
    return True
2776
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object.

    Growing the object logically fills the new area with zeroes;
    shrinking it removes the excess data.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success, otherwise raises error
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t new_size = size

    with nogil:
        rc = rados_trunc(self.io, c_key, new_size)
    if rc < 0:
        raise make_ex(rc, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
    return rc
2806
@requires(('key', str_type))
def stat(self, key):
    """
    Return an object's stats (size/mtime).

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        rc = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if rc < 0:
        raise make_ex(rc, "Failed to stat %r" % key)
    # The mtime is converted to a local-time struct_time.
    timestamp = time.localtime(obj_mtime)
    return obj_size, timestamp
2832
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The read is retried with a doubled buffer each time librados
    reports -ERANGE, starting at 4096 bytes. If the size cap is reached
    without a successful read, an error is raised.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: double it and try again.
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # The size cap was reached while librados still reported
            # -ERANGE; without this, the slice below would be taken with
            # a negative length.
            raise make_ex(-errno.ERANGE, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2871
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Begin iterating over the xattrs of an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2886
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t value_len = len(xattr_value)

    with nogil:
        rc = rados_setxattr(self.io, c_key, c_name,
                            c_value, value_len)
    if rc < 0:
        raise make_ex(rc, "Failed to set xattr %r" % xattr_name)
    return True
2919
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        rc = rados_rmxattr(self.io, c_key, c_name)
    if rc < 0:
        raise make_ex(rc, "Failed to delete key %r xattr %r" %
                      (key, xattr_name))
    return True
2948
def list_objects(self):
    """
    Create an ObjectIterator for this rados.Ioctx object.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
2957
def list_snaps(self):
    """
    Create a SnapIterator for this rados.Ioctx object.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
2966
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Create a pool-wide snapshot.

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_create(self.io, c_snap_name)
    # Any non-zero result is reported as an error.
    if rc != 0:
        raise make_ex(rc, "Failed to create snap %s" % snap_name)
2986
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Remove a pool-wide snapshot.

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_remove(self.io, c_snap_name)
    # Any non-zero result is reported as an error.
    if rc != 0:
        raise make_ex(rc, "Failed to remove snap %s" % snap_name)
3006
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Look up a pool snapshot by name.

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    encoded_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = encoded_name
        rados_snap_t found_id

    with nogil:
        rc = rados_ioctx_snap_lookup(self.io, c_snap_name, &found_id)
    if rc != 0:
        raise make_ex(rc, "Failed to lookup snap %s" % snap_name)
    # The Snap is built from the caller's original (un-encoded) name.
    return Snap(self, snap_name, int(found_id))
3030
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Roll an object back to a snapshot.

    :param oid: the name of the object
    :type oid: str
    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid = cstr(oid, 'oid')
    snap_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_oid = oid
        char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_rollback(self.io, c_oid, c_snap_name)
    # Any non-zero result is reported as an error.
    if rc != 0:
        raise make_ex(rc, "Failed to rollback %s" % oid)
3055
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This is the internal version number of the last object read or
    written via this io context.

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3069
def create_write_op(self):
    """
    Create a write operation object.
    release_write_op must be called on it after use.
    """
    new_op = WriteOp()
    return new_op.create()
3076
def create_read_op(self):
    """
    Create a read operation object.
    release_read_op must be called on it after use.
    """
    new_op = ReadOp()
    return new_op.create()
3083
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op.

    :param write_op: the write operation object to release
    """
    release = write_op.release
    release()
3089
def release_read_op(self, read_op):
    """
    Release the memory allocated by create_read_op.

    :param read_op: the read operation object to release
    """
    release = read_op.release
    release()
3097
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    set keys values to write_op

    :param write_op: write_operation object
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values
    :type values: tuple

    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        # Start as NULL and allocate inside the try block, so that a
        # failure in a later conversion cannot leak an earlier array
        # (free(NULL) is a no-op).
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array([len(v) for v in values])
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        free(_keys)
        free(_values)
        free(_lens)
3131
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the actual write operation.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    """
    oid = cstr(oid, 'oid')
    cdef:
        WriteOp op = write_op
        char *c_oid = oid
        time_t c_mtime = mtime
        int c_flags = flags

    with nogil:
        rc = rados_write_op_operate(op.write_op, self.io, c_oid, &c_mtime, c_flags)
    # Any non-zero result is reported as an error.
    if rc != 0:
        raise make_ex(rc, "Failed to operate write op for oid %s" % oid)
3157
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a write operation asynchronously on one object.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the write operation is safe and
                       complete in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write operation is safe and
                   complete on storage on all replicas
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time;
                  None is treated as 0
    :type mtime: int
    :param flags: flags to apply to the entire operation; None is treated
                  as LIBRADOS_OPERATION_NOFLAG
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """

    # The argument check admits None for both optional integers, but the
    # C conversions below do not; fall back to the documented defaults.
    if mtime is None:
        mtime = 0
    if flags is None:
        flags = LIBRADOS_OPERATION_NOFLAG

    oid = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
                                         &_mtime, _flags)
    if ret != 0:
        # The operation was never submitted, so undo the tracking done
        # above before reporting the failure.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
    return completion
3199
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a read operation synchronously on one object.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation; None is treated
                 as LIBRADOS_OPERATION_NOFLAG
    :type flag: int

    :raises: :class:`Error`
    """
    # The argument check admits None, but the C conversion below does
    # not; fall back to the documented default.
    if flag is None:
        flag = LIBRADOS_OPERATION_NOFLAG

    oid = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
    if ret != 0:
        raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3221
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a read operation asynchronously on one object.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: callback passed to the completion as its
                       "complete" handler
    :type oncomplete: completion
    :param onsafe: callback passed to the completion as its "safe" handler
    :type onsafe: completion
    :param flag: flags to apply to the entire operation; None is treated
                 as LIBRADOS_OPERATION_NOFLAG
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    # The argument check admits None, but the C conversion below does
    # not; fall back to the documented default.
    if flag is None:
        flag = LIBRADOS_OPERATION_NOFLAG

    oid = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
    if ret != 0:
        # The operation was never submitted, so undo the tracking done
        # above before reporting the failure.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
    return completion
3255
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Add an "get omap values" action to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: tuple of (iterator over the requested omap values, 0).
              The second element carries no information and is kept
              only so existing callers can still unpack two values.
    """

    # An empty string is passed to librados as NULL, like a missing value.
    start_after = cstr(start_after, 'start_after') if start_after else None
    filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
    cdef:
        char *_start_after = opt_str(start_after)
        char *_filter_prefix = opt_str(filter_prefix)
        ReadOp _read_op = read_op
        rados_omap_iter_t iter_addr = NULL
        int _max_return = max_return

    with nogil:
        # prval is NULL on purpose: librados stores that pointer and
        # writes through it when the operation is executed, which happens
        # after this method has returned, so the address of a local
        # variable must not be handed over.
        rados_read_op_omap_get_vals(_read_op.read_op, _start_after, _filter_prefix,
                                    _max_return, &iter_addr, NULL)
    it = OmapIterator(self)
    it.ctx = iter_addr
    return it, 0
3287
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Add a "get omap keys" action to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return keys
    :type max_return: int
    :returns: tuple of (iterator over the requested omap keys, 0).
              The second element carries no information and is kept
              only so existing callers can still unpack two values.
    """
    # An empty string is passed to librados as NULL, like a missing value.
    start_after = cstr(start_after, 'start_after') if start_after else None
    cdef:
        char *_start_after = opt_str(start_after)
        ReadOp _read_op = read_op
        rados_omap_iter_t iter_addr = NULL
        int _max_return = max_return

    with nogil:
        # prval is NULL on purpose: librados stores that pointer and
        # writes through it when the operation is executed, which happens
        # after this method has returned, so the address of a local
        # variable must not be handed over.
        rados_read_op_omap_get_keys(_read_op.read_op, _start_after,
                                    _max_return, &iter_addr, NULL)
    it = OmapIterator(self)
    it.ctx = iter_addr
    return it, 0
3314
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Add a "get omap values for these keys" action to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: tuple of (iterator over the requested omap values, 0).
              The second element carries no information and is kept
              only so existing callers can still unpack two values.
    """
    keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        rados_omap_iter_t iter_addr = NULL
        char **_keys = to_bytes_array(keys)
        size_t key_num = len(keys)

    try:
        with nogil:
            # prval is NULL on purpose: librados stores that pointer and
            # writes through it when the operation is executed, which
            # happens after this method has returned, so the address of
            # a local variable must not be handed over.
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, NULL)
        it = OmapIterator(self)
        it.ctx = iter_addr
        return it, 0
    finally:
        # Only the pointer array is owned here; the strings themselves
        # belong to the Python objects in ``keys``.
        free(_keys)
3343
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Add removal of the given omap keys to a write operation.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: tuple of the keys to remove
    :type keys: tuple
    """

    keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp op = write_op
        size_t n_keys = len(keys)
        char **c_keys = to_bytes_array(keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(op.write_op, <const char**>c_keys, n_keys)
    finally:
        # Release the temporary pointer array whether or not the call
        # above succeeded.
        free(c_keys)
3365
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Add removal of every omap key/value pair of an object to a write
    operation.

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    cdef WriteOp op = write_op

    with nogil:
        rados_write_op_omap_clear(op.write_op)
3379
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; when None no
                     duration is passed to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is an uninitialised C local; clear the microsecond
        # field so the lock lasts exactly ``duration`` seconds rather
        # than that plus whatever happened to be on the stack.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3430
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; when None no
                     duration is passed to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is an uninitialised C local; clear the microsecond
        # field so the lock lasts exactly ``duration`` seconds rather
        # than that plus whatever happened to be on the stack.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that actually failed (this message used to be a
        # copy of the one in lock_exclusive).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3484
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Report the unlock failure as such (this message used to be a
        # copy of the one in lock_exclusive and claimed a lock could not
        # be set).
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3516
3517
def set_object_locator(func):
    """
    Decorator that runs a method with the object's locator key applied.

    When ``self.locator_key`` is not None, the key currently set on
    ``self.ioctx`` is saved, replaced by ``self.locator_key`` for the
    duration of the call, and put back afterwards, including when the
    wrapped method raises. When it is None the method is called unchanged.

    :param func: method taking ``self`` as its first argument
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on failure so that one failed call does not
            # leave the shared ioctx pointing at this object's locator.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3529
3530
def set_object_namespace(func):
    """
    Decorator that runs a method with the object's namespace applied.

    The namespace currently set on ``self.ioctx`` is saved, replaced by
    ``self.nspace`` for the duration of the call, and put back afterwards,
    including when the wrapped method raises.

    :param func: method taking ``self`` as its first argument
    :returns: the wrapping function
    :raises: :class:`LogicError` (from the wrapper) if ``self.nspace``
             is None
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on failure so that one failed call does not
            # leave the shared ioctx in this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3541
3542
class Object(object):
    """Rados object wrapper, makes the object look like a file

    Every operation is forwarded to the ioctx the object was created with,
    using the stored key. read() and write() operate at a current offset
    kept on the instance, which seek() changes.
    """
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context that all operations are forwarded to
        :param key: name of the object
        :param locator_key: locator key to apply around each operation,
                            or None to leave the ioctx's key untouched
        :param nspace: namespace to apply around each operation; None is
                       stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # position used by the next read() or write()
        self.offset = 0
        # "exists" until remove() succeeds, then "removed"
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity with a string literal is not
        # guaranteed to hold even for equal strings.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """
        Check that the object has not been removed through this wrapper.

        :raises: :class:`ObjectStateError` if the state is not "exists"
        """
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """
        Read up to length bytes at the current offset and advance the
        offset by the amount actually returned.

        :param length: maximum number of bytes to request
        :returns: whatever ``ioctx.read`` returns
        """
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write data at the current offset.

        The offset is advanced by ``len(string_to_write)`` only when
        ``ioctx.write`` returns 0.

        :param string_to_write: data to write
        :returns: the return value of ``ioctx.write``
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the result of ``ioctx.stat`` for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """
        Set the offset used by the next read() or write().

        :param position: new offset; stored as given, without validation
        """
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the result of ``ioctx.get_xattr`` for xattr_name."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the result of ``ioctx.get_xattrs`` for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set xattr_name to xattr_value through ``ioctx.set_xattr``."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove xattr_name through ``ioctx.rm_xattr``."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
3619
# Level names accepted for cluster log monitoring (see MonitorLog).
# "warn"/"warning" and "err"/"error" are both listed.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
3627
3628
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    Watch cluster log messages. Create an instance and keep it alive
    for as long as the callback should keep being called. 'level'
    selects which messages are monitored (one of MONITOR_LEVELS) and
    'arg' is handed back to the callback unchanged.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        self.level, self.callback, self.arg = level, callback, arg
        self.cluster = cluster
        # Registration itself is delegated to the cluster handle.
        cluster.monitor_log(level, callback, arg)
3655