]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
9e8430e246eb55e23c8a850f22de5ecd0309501e
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
30
# Base string type used for isinstance() checks: Python 2 provides
# ``basestring``; on Python 3 plain ``str`` is used instead.
str_type = basestring if sys.version_info[0] < 3 else str
36
37
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154
155 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
156 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
157 const char *inbuf, size_t inbuflen,
158 char **outbuf, size_t *outbuflen,
159 char **outs, size_t *outslen)
160 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
161 const char *inbuf, size_t inbuflen,
162 char **outbuf, size_t *outbuflen,
163 char **outs, size_t *outslen)
164 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
165 const char *inbuf, size_t inbuflen,
166 char **outbuf, size_t *outbuflen,
167 char **outs, size_t *outslen)
168 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
169 const char *inbuf, size_t inbuflen,
170 char **outbuf, size_t *outbuflen,
171 char **outs, size_t *outslen)
172 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
173 const char *inbuf, size_t inbuflen,
174 char **outbuf, size_t *outbuflen,
175 char **outs, size_t *outslen)
176 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
177 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
178
179 int rados_wait_for_latest_osdmap(rados_t cluster)
180
181 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
182 void rados_ioctx_destroy(rados_ioctx_t io)
183 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
184 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
185 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
186
187 uint64_t rados_get_last_version(rados_ioctx_t io)
188 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
189 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
190 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
191 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
192 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
193 int rados_remove(rados_ioctx_t io, const char *oid)
194 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
195 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
196 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
197 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
198 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
199 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
200 void rados_getxattrs_end(rados_xattrs_iter_t iter)
201
202 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
203 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
204 void rados_nobjects_list_close(rados_list_ctx_t ctx)
205
206 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
207 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
208 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
209 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
210 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
211 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
212 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
213 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
214
215 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
216 const char * cookie, const char * desc,
217 timeval * duration, uint8_t flags)
218 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
219 const char * cookie, const char * tag, const char * desc,
220 timeval * duration, uint8_t flags)
221 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
222
223 rados_write_op_t rados_create_write_op()
224 void rados_release_write_op(rados_write_op_t write_op)
225
226 rados_read_op_t rados_create_read_op()
227 void rados_release_read_op(rados_read_op_t read_op)
228
229 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
230 void rados_aio_release(rados_completion_t c)
231 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
232 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
233 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
234 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
235 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
236 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
237 int rados_aio_flush(rados_ioctx_t io)
238
239 int rados_aio_get_return_value(rados_completion_t c)
240 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
241 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
242 int rados_aio_wait_for_complete(rados_completion_t c)
243 int rados_aio_wait_for_safe(rados_completion_t c)
244 int rados_aio_is_complete(rados_completion_t c)
245 int rados_aio_is_safe(rados_completion_t c)
246
247 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
248 const char * in_buf, size_t in_len, char * buf, size_t out_len)
249 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
250 const char * in_buf, size_t in_len, char * buf, size_t out_len)
251
252 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
253 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
254 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
255 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
256 void rados_write_op_omap_clear(rados_write_op_t write_op)
257 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
258
259 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
260 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
261 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
262 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
263 void rados_write_op_remove(rados_write_op_t write_op)
264 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
265 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
266
267 void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
268 void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
269 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
270 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
271 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
272 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
273 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
274 void rados_omap_get_end(rados_omap_iter_t iter)
275
276
# Python-visible aliases of the C constants declared (with a leading
# underscore) in the librados extern blocks of this module.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE

# 64-bit snapshot id constant exported by librados.
LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD

# Flags for whole read/write operations (LIBRADOS_OPERATION_*).
LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY

# The C constant is a char*; it is decoded so Python code sees a text string.
LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')

# Object creation modes.
LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT

# Well-known auid values (all 64 bits set, and zero).
ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0
302
303
class Error(Exception):
    """ `Error` class, derived from `Exception`; base of all rados exceptions """


class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error` (used for EINVAL) """


class OSError(Error):
    """
    `OSError` class, derived from `Error`

    Carries the errno value and a message. Note that inside this module the
    name shadows the builtin ``OSError``.

    :param errno: the (positive) error number
    :param strerror: human readable description of the failure
    """
    def __init__(self, errno, strerror):
        # Forward the arguments to Exception so that ``args`` is populated on
        # every Python version; pickling and repr() rely on it.
        super(OSError, self).__init__(errno, strerror)
        self.errno = errno
        self.strerror = strerror

    def __str__(self):
        return '[Errno {0}] {1}'.format(self.errno, self.strerror)


class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError` (EINTR) """


class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError` (EPERM) """


class PermissionDeniedError(OSError):
    """ `PermissionDeniedError` class, derived from `OSError` (EACCES) """


class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError` (ENOENT) """


class NoData(OSError):
    """ `NoData` class, derived from `OSError` (ENODATA, ENOATTR on FreeBSD) """


class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError` (EEXIST) """


class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError` (EBUSY) """


class IOError(OSError):
    """ `IOError` class, derived from `OSError` (EIO) """


class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError` (ENOSPC) """


class RadosStateError(Error):
    """ `RadosStateError` class, derived from `Error` """


class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` """


class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` """


class LogicError(Error):
    """ `LogicError` class, derived from `Error` """


class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError` (ETIMEDOUT) """
391
392
# Maps the errno values reported by librados to the exception class raised
# for them. Codes that are not listed are reported as a plain Error by
# make_ex().
cdef errno_to_exception = {
    errno.EPERM     : PermissionError,
    errno.ENOENT    : ObjectNotFound,
    errno.EIO       : IOError,
    errno.ENOSPC    : NoSpace,
    errno.EEXIST    : ObjectExists,
    errno.EBUSY     : ObjectBusy,
    errno.EINTR     : InterruptedOrTimeoutError,
    errno.ETIMEDOUT : TimedOut,
    errno.EACCES    : PermissionDeniedError,
    errno.EINVAL    : InvalidArgumentError,
}

# The errno that maps to NoData is the only platform dependent entry:
# ENOATTR on FreeBSD, ENODATA everywhere else.
IF UNAME_SYSNAME == "FreeBSD":
    errno_to_exception[errno.ENOATTR] = NoData
ELSE:
    errno_to_exception[errno.ENODATA] = NoData
421
422
cdef make_ex(ret, msg):
    """
    Build the exception matching a librados return code.

    The sign of `ret` is ignored. Codes found in ``errno_to_exception`` give
    an instance of the mapped class; any other code gives a plain
    :class:`Error` whose message has the numeric code appended.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: a subclass of :class:`Error`
    """
    code = abs(ret)
    exc_class = errno_to_exception.get(code)
    if exc_class is None:
        return Error(code, msg + (": error code %d" % code))
    return exc_class(code, msg)
438
439
# Type spec for an optional argument: the value may be an instance of `cls`
# or `None`. The resulting tuple is understood by the `requires` validator.
def opt(cls):
    return cls, None
444
445
# validate argument types of an instance method
# kwargs is an un-ordered dict, so use args instead
def requires(*types):
    """
    Build a decorator that type-checks the arguments of an instance method.

    :param types: one ``(arg_name, arg_type)`` pair per argument, in
                  positional order and not counting ``self``. ``arg_type``
                  is a type, ``None`` (only ``None`` is accepted) or a tuple
                  of those, any member of which may match.
    :returns: a decorator; the decorated method raises :class:`TypeError`
              when an argument does not match its spec and otherwise calls
              the original method with unchanged arguments
    """
    def is_type_of(v, t):
        if t is None:
            return v is None
        return isinstance(v, t)

    def check_type(val, arg_name, arg_type):
        # A single spec is handled like a one-element tuple, so a bare
        # `None` spec reports a TypeError like every other mismatch.
        accepted = arg_type if isinstance(arg_type, tuple) else (arg_type,)
        if any(is_type_of(val, t) for t in accepted):
            return
        type_names = ' or '.join('None' if t is None else t.__name__
                                 for t in accepted)
        raise TypeError('%s must be %s' % (arg_name, type_names))

    def wrapper(f):
        # Method descriptors of extension types may lack attributes such as
        # __module__, which makes a plain @wraps(f) fail on Python 2. Copy
        # only the attributes that are present so name and docstring of the
        # wrapped method survive.
        present = tuple(attr for attr in ('__module__', '__name__', '__doc__')
                        if hasattr(f, attr))

        @wraps(f, assigned=present)
        def validate_func(*args, **kwargs):
            # ignore the `self` arg
            pos_args = zip(args[1:], types)
            named_args = ((kwargs[name], (name, spec)) for name, spec in types
                          if name in kwargs)
            for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
                check_type(arg_val, arg_name, arg_type)
            return f(*args, **kwargs)
        return validate_func
    return wrapper
482
483
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Turn a Python string into a byte string.

    Byte strings are returned unchanged, unicode strings are encoded.

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding applied to unicode input
    :param bool opt: If True, None is allowed and returned as None
    :rtype: bytes
    :raises: :class:`TypeError` if `val` is neither bytes nor unicode
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if not isinstance(val, unicode):
        raise TypeError('%s must be a string' % name)
    return val.encode(encoding)
503
504
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every string of an iterable to a byte string with cstr().

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for unicode items
    :rtype: list of bytes
    :raises: :class:`TypeError` if an item is not a string
    """
    # The encoding used to be dropped here, so a non-default value was
    # silently ignored and utf-8 applied instead.
    return [cstr(s, name, encoding) for s in list_str]
507
508
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string into a Python string; None is passed through.

    :param bytes val: byte string
    :param str encoding: encoding the bytes are stored in
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
520
521
# Convert an optional Python byte string into a C string pointer: None maps
# to NULL, anything else is coerced to ``char*`` by Cython.
# NULL is also a regular result here, hence ``except? NULL``: the caller
# checks for a pending exception only when NULL comes back.
# NOTE(review): the pointer presumably refers to the buffer owned by `s`, so
# `s` has to stay alive while the pointer is in use - confirm.
cdef char* opt_str(s) except? NULL:
    if s is None:
        return NULL
    return s
526
527
# realloc() wrapper that raises MemoryError instead of returning NULL.
# `ptr` is not freed here when the call fails, so the caller remains
# responsible for releasing it.
cdef void* realloc_chk(void* ptr, size_t size) except NULL:
    cdef void *ret = realloc(ptr, size)
    if ret == NULL:
        raise MemoryError("realloc failed")
    return ret
533
534
cdef size_t * to_csize_t_array(list_int) except? NULL:
    """
    Copy a Python sequence of integers into a newly malloc()ed C array.

    The array is not released here; the caller is expected to free() it.

    :param list_int: sequence of non-negative integers
    :returns: pointer to ``len(list_int)`` ``size_t`` values (may be NULL for
              an empty sequence)
    :raises: :class:`MemoryError` if the allocation fails, or whatever the
             integer conversion of an item raises
    """
    n = len(list_int)
    cdef size_t *ret = <size_t *>malloc(n * sizeof(size_t))
    # malloc(0) may legitimately return NULL, so only a failed non-empty
    # allocation is an error. ``except? NULL`` in the signature lets the
    # MemoryError actually reach the caller.
    if ret == NULL and n != 0:
        raise MemoryError("malloc failed")
    try:
        for i in xrange(n):
            ret[i] = <size_t>list_int[i]
    except BaseException:
        # Do not leak the buffer when an item cannot be converted.
        free(ret)
        raise
    return ret
542
543
cdef char ** to_bytes_array(list_bytes) except? NULL:
    """
    Build a newly malloc()ed array of C string pointers from byte strings.

    Only the pointer array is allocated; each entry points into the
    corresponding item of `list_bytes`, so that list must stay alive for as
    long as the array is used. The caller is expected to free() the array.

    :param list_bytes: sequence of byte strings
    :returns: pointer to ``len(list_bytes)`` ``char*`` entries (may be NULL
              for an empty sequence)
    :raises: :class:`MemoryError` if the allocation fails, or
             :class:`TypeError` if an item cannot be used as ``char*``
    """
    n = len(list_bytes)
    cdef char **ret = <char **>malloc(n * sizeof(char *))
    # malloc(0) may legitimately return NULL, so only a failed non-empty
    # allocation is an error. ``except? NULL`` in the signature lets the
    # MemoryError actually reach the caller.
    if ret == NULL and n != 0:
        raise MemoryError("malloc failed")
    try:
        for i in xrange(n):
            ret[i] = <char *>list_bytes[i]
    except BaseException:
        # Do not leak the pointer array when an item is not a byte string.
        free(ret)
        raise
    return ret
551
552
553
# C-level log callback that forwards one log entry to a Python callable.
# `arg` is cast back to a Python object that is indexed as
# (callable, user_arg); the callable is invoked with user_arg followed by
# the log fields. ``with gil`` acquires the GIL before any Python object is
# touched. Always returns 0.
# NOTE(review): presumably registered through rados_monitor_log(); the
# object behind `arg` must be kept alive by whoever registers it - confirm.
cdef int __monitor_callback(void *arg, const char *line, const char *who,
                            uint64_t sec, uint64_t nsec, uint64_t seq,
                            const char *level, const char *msg) with gil:
    cdef object cb_info = <object>arg
    cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
    return 0
560
# Variant of the log callback that additionally receives `name`.
# `arg` is cast back to a Python object that is indexed as
# (callable, user_arg); ``with gil`` acquires the GIL before the Python
# callable is invoked. Always returns 0.
# NOTE(review): the Python callable receives `name` before `who`, i.e. in
# the opposite order of the C parameters - confirm this is intended.
cdef int __monitor_callback2(void *arg, const char *line, const char *who,
                             const char *name,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    cdef object cb_info = <object>arg
    cb_info[0](cb_info[1], line, name, who, sec, nsec, seq, level, msg)
    return 0
568
569
class Version(object):
    """ Version information: the major, minor and extra components """

    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        # Rendered as dotted integers, e.g. "0.69.1".
        parts = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % parts
579
580
581 cdef class Rados(object):
582 """This class wraps librados functions"""
583 # NOTE(sileht): attributes declared in .pyd
584
def __init__(self, *args, **kwargs):
    # PyEval_InitThreads() is called before anything else; the actual
    # initialisation is delegated to __setup(), which receives all
    # arguments unchanged.
    PyEval_InitThreads()
    self.__setup(*args, **kwargs)
588
# `requires` pairs positional arguments with its specs by position, so every
# parameter up to `conffile` needs an entry. `conf_defaults` gets a
# permissive `object` spec; without it a fourth positional argument was
# validated against the `conffile` spec.
@requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
          ('conf_defaults', object), ('conffile', opt(str_type)))
def __setup(self, rados_id=None, name=None, clustername=None,
            conf_defaults=None, conffile=None, conf=None, flags=0,
            context=None):
    """
    Create the cluster handle and apply the initial configuration.

    :param rados_id: client id; the client name becomes 'client.' + rados_id
    :param name: full client name; 'client.admin' is used when neither
                 `rados_id` nor `name` is given
    :param clustername: name of the cluster, '' when None
    :param conf_defaults: dict of options applied first through conf_set()
    :param conffile: config file passed to conf_read_file(); '' reads the
                     default config file, None skips reading a file
    :param conf: dict of options applied last through conf_set()
    :param flags: flags passed to rados_create2()
    :param context: capsule wrapping a ``rados_config_t``; when given, the
                    handle is created with rados_create_with_context()
    :raises: :class:`Error` if both `rados_id` and `name` are given or the
             handle cannot be created
    """
    self.monitor_callback = None
    self.monitor_callback2 = None
    self.parsed_args = []
    self.conf_defaults = conf_defaults
    self.conffile = conffile
    self.rados_id = rados_id

    if rados_id and name:
        raise Error("Rados(): can't supply both rados_id and name")
    elif rados_id:
        name = 'client.' + rados_id
    elif name is None:
        name = 'client.admin'
    if clustername is None:
        clustername = ''

    name = cstr(name, 'name')
    clustername = cstr(clustername, 'clustername')
    # The char* values below point into the byte strings bound to `name` and
    # `clustername`, which stay referenced until the function returns.
    cdef:
        char *_name = name
        char *_clustername = clustername
        int _flags = flags
        int ret

    if context:
        # Unpack void* (aka rados_config_t) from capsule
        rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
        with nogil:
            ret = rados_create_with_context(&self.cluster, rados_config)
    else:
        with nogil:
            ret = rados_create2(&self.cluster, _clustername, _name, _flags)
    if ret != 0:
        raise Error("rados_initialize failed with error code: %d" % ret)

    self.state = "configuring"
    # order is important: conf_defaults, then conffile, then conf
    if conf_defaults:
        for key, value in conf_defaults.items():
            self.conf_set(key, value)
    if conffile is not None:
        # read the default conf file when '' is given
        if conffile == '':
            conffile = None
        self.conf_read_file(conffile)
    if conf:
        for key, value in conf.items():
            self.conf_set(key, value)
642
def require_state(self, *args):
    """
    Check that the Rados object is in one of the given states.

    :param args: names of the states that are acceptable
    :raises: :class:`RadosStateError` if the current state is not one of
             them
    """
    if self.state in args:
        return
    # Adjacent string literals instead of a backslash continuation inside
    # the literal, so the message never picks up source indentation.
    raise RadosStateError("You cannot perform that operation on a "
                          "Rados object in state %s." % self.state)
653
def shutdown(self):
    """
    Disconnect from the cluster and mark the object as shut down.

    Call this explicitly once a Rados.connect()ed object is no longer
    needed. Calling it on an object that is already shut down does nothing.
    """
    if self.state == "shutdown":
        return
    # The GIL is released for the duration of the librados call.
    with nogil:
        rados_shutdown(self.cluster)
    self.state = "shutdown"
663
def __enter__(self):
    # Context manager entry: connect to the cluster and hand back the
    # object itself as the ``with`` target.
    self.connect()
    return self
667
def __exit__(self, type_, value, traceback):
    # Context manager exit: always shut down. Returning False means an
    # exception raised inside the ``with`` block is not suppressed.
    self.shutdown()
    return False
671
def version(self):
    """
    Get the version number of the ``librados`` C library.

    :returns: a :class:`Version` object holding the ``major``, ``minor``
              and ``extra`` components of the librados version
    """
    cdef int major = 0
    cdef int minor = 0
    cdef int extra = 0
    # rados_version() fills the three integers through the pointers.
    with nogil:
        rados_version(&major, &minor, &extra)
    return Version(major, minor, extra)
685
@requires(('path', opt(str_type)))
def conf_read_file(self, path=None):
    """
    Configure the cluster handle using a Ceph config file.

    :param path: path to the config file; with None a NULL path is handed
                 to librados
    :type path: str

    :raises: :class:`RadosStateError` unless the object is in state
             "configuring" or "connected", :class:`Error` if librados
             reports a failure
    """
    self.require_state("configuring", "connected")
    path = cstr(path, 'path', opt=True)
    # opt_str() maps None to NULL; otherwise the pointer refers to `path`.
    cdef:
        char *_path = opt_str(path)
    with nogil:
        ret = rados_conf_read_file(self.cluster, _path)
    if ret != 0:
        raise make_ex(ret, "error calling conf_read_file")
702
def conf_parse_argv(self, args):
    """
    Parse known arguments from args, and remove; returned
    args contain only those unknown to ceph

    :param args: list of argument strings
    :returns: list of the arguments that were not consumed, or None when
              `args` is empty
    :raises: :class:`RadosStateError` unless the object is in state
             "configuring" or "connected", :class:`TypeError` if an
             argument is not a string, :class:`MemoryError`,
             :class:`Error` if librados reports a failure
    """
    self.require_state("configuring", "connected")
    if not args:
        return

    # `cargs` owns the byte strings that the entries of `_argv` point into.
    cargs = cstr_list(args, 'args')
    cdef:
        int _argc = len(args)
        char **_argv = to_bytes_array(cargs)
        char **_remargv = NULL

    try:
        _remargv = <char **>malloc(_argc * sizeof(char *))
        # A failed allocation must not be handed to librados as the
        # output array.
        if _remargv == NULL:
            raise MemoryError("malloc failed")
        with nogil:
            ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
                                                  <const char**>_argv,
                                                  <const char**>_remargv)
        if ret:
            raise make_ex(ret, "error calling conf_parse_argv_remainder")

        # _remargv was allocated with fixed argc; collapse return
        # list to eliminate any missing args
        retargs = [decode_cstr(a) for a in _remargv[:_argc]
                   if a != NULL]
        self.parsed_args = args
        return retargs
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_argv)
        free(_remargv)
736
def conf_parse_env(self, var='CEPH_ARGS'):
    """
    Parse known arguments from an environment variable, normally
    CEPH_ARGS.

    :param var: name of the environment variable; nothing is done when it
                is empty or None
    :type var: str

    :raises: :class:`RadosStateError` unless the object is in state
             "configuring" or "connected", :class:`TypeError` if `var` is
             not a string, :class:`Error` if librados reports a failure
    """
    self.require_state("configuring", "connected")
    if not var:
        return

    var = cstr(var, 'var')
    cdef:
        char *_var = var
    with nogil:
        ret = rados_conf_parse_env(self.cluster, _var)
    if ret != 0:
        raise make_ex(ret, "error calling conf_parse_env")
753
@requires(('option', str_type))
def conf_get(self, option):
    """
    Get the value of a configuration option

    :param option: which option to read
    :type option: str

    :returns: str - value of the option or None
    :raises: :class:`TypeError`, :class:`RadosStateError` unless the object
             is in state "configuring" or "connected", :class:`Error` on
             any other librados failure
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    cdef:
        char *_option = option
        size_t length = 20
        char *ret_buf = NULL

    try:
        # Start with a small buffer and double it for as long as librados
        # answers ENAMETOOLONG.
        while True:
            ret_buf = <char *>realloc_chk(ret_buf, length)
            with nogil:
                ret = rados_conf_get(self.cluster, _option, ret_buf, length)
            if ret == 0:
                return decode_cstr(ret_buf)
            elif ret == -errno.ENAMETOOLONG:
                length = length * 2
            elif ret == -errno.ENOENT:
                # ENOENT is reported to the caller as None.
                return None
            else:
                raise make_ex(ret, "error calling conf_get")
    finally:
        # Runs on every exit path, including the returns above.
        free(ret_buf)
787
@requires(('option', str_type), ('val', str_type))
def conf_set(self, option, val):
    """
    Set the value of a configuration option

    :param option: which option to set
    :type option: str
    :param val: value of the option
    :type val: str

    :raises: :class:`TypeError`, :class:`ObjectNotFound`,
             :class:`RadosStateError` unless the object is in state
             "configuring" or "connected"
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    val = cstr(val, 'val')
    # The pointers refer to the byte strings bound to `option` and `val`.
    cdef:
        char *_option = option
        char *_val = val

    with nogil:
        ret = rados_conf_set(self.cluster, _option, _val)
    if ret != 0:
        raise make_ex(ret, "error calling conf_set")
811
def ping_monitor(self, mon_id):
    """
    Ping a monitor to assess liveness

    May be used as a simple way to assess liveness, or to obtain
    information about the monitor in a simple way even in the
    absence of quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor
    :raises: :class:`RadosStateError` unless the object is in state
             "configuring" or "connected", :class:`TypeError` if `mon_id`
             is not a string, :class:`Error` if librados reports a failure
    """

    self.require_state("configuring", "connected")

    mon_id = cstr(mon_id, 'mon_id')
    cdef:
        char *_mon_id = mon_id
        size_t outstrlen = 0
        char *outstr

    with nogil:
        ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)

    if ret != 0:
        raise make_ex(ret, "error calling ping_monitor")

    if outstrlen:
        # Copy the reply into a Python byte string before handing the
        # buffer back to librados.
        my_outstr = outstr[:outstrlen]
        rados_buffer_free(outstr)
        return decode_cstr(my_outstr)
843
def connect(self, timeout=0):
    """
    Connect to the cluster. Use shutdown() to release resources.

    :param timeout: accepted for compatibility only; the value is ignored
    :raises: :class:`RadosStateError` unless the object is in state
             "configuring", :class:`Error` if the connection fails
    """
    self.require_state("configuring")
    # NOTE(sileht): timeout was supported by old python API,
    # but this is not something available in C API, so ignore
    # for now and remove it later
    with nogil:
        ret = rados_connect(self.cluster)
    if ret != 0:
        raise make_ex(ret, "error connecting to the cluster")
    self.state = "connected"
857
def get_cluster_stats(self):
    """
    Read usage info about the cluster

    This tells you total space, space used, space available, and number
    of objects. These are not updated immediately when data is written,
    they are eventually consistent.

    :returns: dict - contains the following keys:

        - ``kb`` (int) - total space

        - ``kb_used`` (int) - space used

        - ``kb_avail`` (int) - free space available

        - ``num_objects`` (int) - number of objects

    :raises: :class:`Error` if librados reports a failure
    """
    cdef:
        rados_cluster_stat_t stats

    with nogil:
        ret = rados_cluster_stat(self.cluster, &stats)

    # Only negative return values are treated as errors here.
    if ret < 0:
        raise make_ex(
            ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
    return {'kb': stats.kb,
            'kb_used': stats.kb_used,
            'kb_avail': stats.kb_avail,
            'num_objects': stats.num_objects}
890
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Checks if a given pool exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True if the pool exists, False otherwise.
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool_name = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool_name)

    # -ENOENT is the "no such pool" answer, not a failure.
    if rc == -errno.ENOENT:
        return False
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return True
916
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Returns a pool's ID based on its name.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if it doesn't exist
    """
    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool_name = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool_name)

    # A missing pool is reported as None rather than as an exception.
    if rc == -errno.ENOENT:
        return None
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return int(rc)
941
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    The name buffer starts at 512 bytes and is doubled each time librados
    reports -ERANGE (buffer too small). Growth stops once the buffer has
    grown past 4096 bytes; a further -ERANGE is then raised as an error.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # Buffer too small: retry with a larger one. (The previous
                # test was "!=", which retried on every error *except*
                # -ERANGE and raised on -ERANGE itself.)
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            else:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        # realloc_chk() memory is owned here on every exit path.
        free(name)
977
@requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
def create_pool(self, pool_name, auid=None, crush_rule=None):
    """
    Create a pool. Which librados call is used depends on the optional
    arguments:

    - auid=None and crush_rule=None: default settings
    - auid given, crush_rule=None: owned by a specific auid
    - auid=None, crush_rule given: with a specific CRUSH rule
    - auid and crush_rule given: with a specific CRUSH rule and auid

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param auid: the id of the owner of the new pool
    :type auid: int
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_pool_name = pool_name
        uint8_t c_crush_rule
        uint64_t c_auid

    with_auid = auid is not None
    with_rule = crush_rule is not None

    # Convert only the values that were supplied (auid first, as before).
    if with_auid:
        c_auid = auid
    if with_rule:
        c_crush_rule = crush_rule

    if with_auid and with_rule:
        with nogil:
            rc = rados_pool_create_with_all(self.cluster, c_pool_name, c_auid, c_crush_rule)
    elif with_auid:
        with nogil:
            rc = rados_pool_create_with_auid(self.cluster, c_pool_name, c_auid)
    elif with_rule:
        with nogil:
            rc = rados_pool_create_with_crush_rule(self.cluster, c_pool_name, c_crush_rule)
    else:
        with nogil:
            rc = rados_pool_create(self.cluster, c_pool_name)

    if rc < 0:
        raise make_ex(rc, "error creating pool '%s'" % pool_name)
1022
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Get base pool

    :param pool_id: ID of the pool to query
    :type pool_id: int

    :raises: the exception built by make_ex() when
             rados_pool_get_base_tier() returns a negative code
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    self.require_state("connected")
    cdef:
        int64_t base_tier = 0
        int64_t _pool_id = pool_id

    with nogil:
        ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
    if ret < 0:
        raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
    # Convert the C int64 into a Python int for the caller.
    return int(base_tier)
1040
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Delete a pool and all data inside it.

    The pool is removed from the cluster immediately,
    but the actual data is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    # cstr() result is kept in pool_name so the char* below stays valid.
    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name

    with nogil:
        ret = rados_pool_delete(self.cluster, _pool_name)
    if ret < 0:
        raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1064
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List inconsistent placement groups in the given pool

    The result buffer starts at 512 bytes and is doubled until the
    length reported by librados fits into it.

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t c_pool = pool_id
        size_t buf_len = 512
        char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_len)
            with nogil:
                rc = rados_inconsistent_pg_list(self.cluster, c_pool,
                                                buf, buf_len)
            if rc < 0:
                raise make_ex(rc, "error calling inconsistent_pg_list")
            if rc <= <int>buf_len:
                break
            # Reported length does not fit yet: grow and retry.
            buf_len *= 2

        # Entries are NUL-separated; drop the empty fragments.
        entries = decode_cstr(buf[:rc]).split('\0')
        return [pg for pg in entries if pg]
    finally:
        free(buf)
1095
def list_pools(self):
    """
    Gets a list of pool names.

    The name buffer starts at 512 bytes and is doubled until the length
    reported by rados_pool_list() fits into it.

    :raises: :class:`Error` when rados_pool_list() returns a negative code
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                size *= 2
            elif ret >= 0:
                break
            else:
                # Without this branch a negative return matched neither
                # test above and the loop would never terminate.
                raise make_ex(ret, "error calling rados_pool_list")
        # Names are NUL-separated; drop the empty fragments.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1120
def get_fsid(self):
    """
    Get the fsid of the cluster as a hexadecimal string.

    The result is written directly into a freshly allocated bytes object
    of 37 bytes, which is shrunk to the length reported by
    rados_cluster_fsid() when that differs.

    NOTE(review): the object returned is that bytes object itself; it is
    not passed through decode_cstr() here - confirm callers expect bytes.

    :raises: :class:`Error`
    :returns: str - cluster fsid
    """
    self.require_state("connected")
    cdef:
        char *ret_buf
        size_t buf_len = 37
        PyObject* ret_s = NULL

    # Allocate an uninitialised bytes object and let librados fill it.
    ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
        if ret < 0:
            raise make_ex(ret, "error getting cluster fsid")
        if ret != <int>buf_len:
            _PyBytes_Resize(&ret_s, ret)
        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
1150
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Create an io context

    The io context allows you to perform operations within a particular
    pool.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    cdef:
        rados_ioctx_t ioctx
        char *_ioctx_name = ioctx_name
    with nogil:
        ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
    if ret < 0:
        raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
    # Wrap the C handle: the Ioctx is built from the (cstr-converted)
    # name and then given the librados handle.
    io = Ioctx(ioctx_name)
    io.io = ioctx
    return io
1177
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    Send a command to the monitors, or to one specific monitor.

    Wraps rados_mon_command(), or rados_mon_command_target() when a
    non-empty target is given.

    :param cmd: command(s) to send, converted with cstr_list()
    :param inbuf: input buffer sent along with the command
    :param timeout: ignored; kept for backward compatibility with the
                    old Python binding
    :param target: optional monitor to address; an int is converted to
                   its string form
    :returns: tuple (int ret, outbuf, outs) - ret is the librados return
              code (no exception is raised for it), outbuf is the raw
              output buffer and outs the status string passed through
              decode_cstr()
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the slices and frees below are well-defined even
        # if the C call returns without writing the output arguments.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Released here so the librados buffers are not leaked when
        # decoding or slicing above raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1234
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    Send a command to an OSD through rados_osd_command().

    :param osdid: id of the OSD to address (converted to a C int)
    :param cmd: command(s) to send, converted with cstr_list()
    :param inbuf: input buffer sent along with the command
    :param timeout: ignored; kept for backward compatibility with the
                    old Python binding
    :returns: tuple (int ret, outbuf, outs) - ret is the librados return
              code (no exception is raised for it), outbuf is the raw
              output buffer and outs the status string passed through
              decode_cstr()
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        int _osdid = osdid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the slices and frees below are well-defined even
        # if the C call returns without writing the output arguments.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_osd_command(self.cluster, _osdid,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Released here so the librados buffers are not leaked when
        # decoding or slicing above raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1277
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Send a command to the manager through rados_mgr_command().

    :param cmd: command(s) to send, converted with cstr_list()
    :param inbuf: input buffer sent along with the command
    :param timeout: ignored; kept for backward compatibility with the
                    old Python binding
    :returns: tuple (int ret, outbuf, outs) - ret is the librados return
              code (no exception is raised for it), outbuf is the raw
              output buffer and outs the status string passed through
              decode_cstr()
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the slices and frees below are well-defined even
        # if the C call returns without writing the output arguments.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_mgr_command(self.cluster,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Released here so the librados buffers are not leaked when
        # decoding or slicing above raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1318
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    Send a command to a placement group through rados_pg_command().

    :param pgid: placement group id, converted with cstr()
    :param cmd: command(s) to send, converted with cstr_list()
    :param inbuf: input buffer sent along with the command
    :param timeout: ignored; kept for backward compatibility with the
                    old Python binding
    :returns: tuple (int ret, outbuf, outs) - ret is the librados return
              code (no exception is raised for it), outbuf is the raw
              output buffer and outs the status string passed through
              decode_cstr()
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_pgid = pgid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Initialised so the slices and frees below are well-defined even
        # if the C call returns without writing the output arguments
        # (NOTE(review): believed to happen for an unparsable pgid -
        # confirm against librados).
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        with nogil:
            ret = rados_pg_command(self.cluster, _pgid,
                                   <const char **>_cmd, _cmdlen,
                                   <const char *>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)

        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        return (ret, my_outbuf, my_outs)
    finally:
        # Released here so the librados buffers are not leaked when
        # decoding or slicing above raises.
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        free(_cmd)
1362
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap() on this cluster handle.

    :returns: the return code of the librados call, passed through
              unchanged (no exception is raised for error codes)
    """
    self.require_state("connected")
    with nogil:
        ret = rados_wait_for_latest_osdmap(self.cluster)
    return ret
1368
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Blacklist a client from the OSDs

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
                           (converted to an unsigned 32-bit value)
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        uint32_t _expire_seconds = expire_seconds
        char *_client_address = client_address

    with nogil:
        ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
    if ret < 0:
        raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1390
def monitor_log(self, level, callback, arg):
    """
    Register or clear a monitor log callback via rados_monitor_log().

    :param level: log level; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the
                     registration
    :param arg: value stored together with the callback and handed to
                the C layer as part of the (callback, arg) tuple
    :raises: :class:`LogicError` for an invalid level or a non-callable
             callback; the exception built by make_ex() when registering
             returns a non-zero code
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Clearing: the return code of this call is not checked.
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    # Borrowed pointer to the tuple; the tuple is kept alive below.
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>_level,
                              <rados_log_callback_t>&__monitor_callback, _arg)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = cb
    self.monitor_callback2 = None
1418
def monitor_log2(self, level, callback, arg):
    """
    Register or clear a monitor log callback via rados_monitor_log2().

    :param level: log level; must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to clear the
                     registration
    :param arg: value stored together with the callback and handed to
                the C layer as part of the (callback, arg) tuple
    :raises: :class:`LogicError` for an invalid level or a non-callable
             callback; the exception built by make_ex() when registering
             returns a non-zero code
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Clearing: the return code of this call is not checked.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    # Borrowed pointer to the tuple; the tuple is kept alive below.
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (was "rados_monitor_log").
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
1446
1447
cdef class OmapIterator(object):
    """Omap iterator"""

    cdef public Ioctx ioctx
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next key-value pair in the object

        :raises: StopIteration when librados hands back no further key
        :returns: next rados.OmapItem, as a (key, value) tuple; value is
                  None when librados supplies no value pointer
        """
        cdef:
            char *c_key = NULL
            char *c_val = NULL
            size_t val_len

        with nogil:
            rc = rados_omap_get_next(self.ctx, &c_key, &c_val, &val_len)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        if c_key == NULL:
            raise StopIteration()

        name = decode_cstr(c_key)
        # Values are returned as raw bytes of the reported length.
        payload = c_val[:val_len] if c_val != NULL else None
        return (name, payload)

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1486
1487
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            rc = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if rc < 0:
            failure = "error iterating over the objects in ioctx '%s'" % self.ioctx.name
            raise make_ex(rc, failure)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        Any negative return code from librados ends the iteration.

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *c_key = NULL
            const char *c_locator = NULL
            const char *c_nspace = NULL

        with nogil:
            rc = rados_nobjects_list_next(self.ctx, &c_key, &c_locator, &c_nspace)

        if rc < 0:
            raise StopIteration()

        name = decode_cstr(c_key)

        # Locator and namespace are optional: NULL maps to None.
        locator = None
        if c_locator != NULL:
            locator = decode_cstr(c_locator)

        namespace = None
        if c_nspace != NULL:
            namespace = decode_cstr(c_nspace)

        return Object(self.ioctx, name, locator, namespace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1533
1534
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    cdef rados_xattrs_iter_t it
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        # Keep the converted object on self so the char* stays valid.
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if ret != 0:
            raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *name_ = NULL
            const char *val_ = NULL
            size_t len_ = 0

        with nogil:
            ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
        if ret != 0:
            # Built from adjacent literals: a backslash continuation inside
            # the string would embed the source indentation in the message.
            raise make_ex(ret, "error iterating over the extended attributes "
                               "in '%s'" % self.oid)
        if name_ == NULL:
            raise StopIteration()
        name = decode_cstr(name_)
        # The value is returned as raw bytes of the reported length.
        val = val_[:len_]
        return (name, val)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1583
1584
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                # Built from adjacent literals: a backslash continuation
                # inside the string would embed the source indentation in
                # the message.
                raise make_ex(ret, "error calling rados_snap_list for "
                                   "ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The snapshot id array is allocated in __cinit__ and owned by this
        # object; without this it was never released. free(NULL) is a no-op.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Double the name buffer until librados stops reporting -ERANGE.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1650
1651
cdef class Snap(object):
    """Snapshot object"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t created_at

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &created_at)
        if rc == 0:
            return datetime.fromtimestamp(created_at)
        raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
1684
1685
cdef class Completion(object):
    """
    completion object

    Wraps a librados completion handle (``rados_comp``) together with the
    Python callbacks and the Ioctx it belongs to.
    """

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        # Released with Py_XDECREF in __dealloc__; set outside this class.
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            ret = rados_aio_is_safe(self.rados_comp)
        return ret == 1

    def is_complete(self):
        """
        Has an asynchronous operation completed?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            ret = rados_aio_is_complete(self.rados_comp)
        return ret == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return ret

    def get_return_value(self):
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            ret = rados_aio_get_return_value(self.rados_comp)
        return ret

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # Drop the reference held on the buffer, if any.
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback, then stop tracking this completion on the
        # owning Ioctx (under its lock).
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Same as _complete(), for the "safe" callback and list.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Untrack this completion from both Ioctx lists without invoking
        # any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1811
1812
class OpCtx(object):
    """
    Context-manager mixin: calls ``create()`` on entry and ``release()``
    on exit. Both methods are expected to come from the class it is
    combined with.
    """
    def __enter__(self):
        # The with-statement target is whatever create() returns.
        return self.create()

    def __exit__(self, type, msg, traceback):
        # Returns None, so exceptions from the with-body propagate.
        self.release()
1819
1820
cdef class WriteOp(object):
    """
    Wrapper around a librados write operation handle.

    create() allocates the handle and release() frees it; the other
    methods queue steps on it.
    """
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """
        Release the underlying write operation.
        """
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: flag passed to rados_write_op_create(); None
                          (the default) is sent as 0
        :type exclusive: int
        """

        # The default None cannot be converted to a C int, so the old
        # direct assignment raised TypeError for a plain new() call.
        # NOTE(review): 0 is presumed to be the non-exclusive
        # (idempotent) create flag - confirm against librados.h.
        cdef int _exclusive = 0
        if exclusive is not None:
            _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of zero bytes to write
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        with nogil:
            # librados takes (offset, len); the two were passed swapped.
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1945
1946
class WriteOpCtx(WriteOp, OpCtx):
    """
    write operation context manager

    Combines WriteOp with OpCtx so the operation can be used in a
    ``with`` statement.
    """
1949
1950
cdef class ReadOp(object):
    """
    Wrapper around a librados read operation handle.

    create() allocates the handle and release() frees it.
    """
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying read operation.

        :returns: self
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the underlying read operation.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, _flags)
1976
1977
class ReadOpCtx(ReadOp, OpCtx):
    """
    read operation context manager

    Combines ReadOp with OpCtx so the operation can be used in a
    ``with`` statement.
    """
1980
1981
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to onsafe() for asynchronous operations

    Acquires the GIL (``with gil``), treats ``args`` as a Python object
    and calls its ``_safe()`` method. Always returns 0.
    """
    cdef object cb = <object>args
    cb._safe()
    return 0
1989
1990
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    Acquires the GIL (``with gil``), treats ``args`` as a Python object
    and calls its ``_complete()`` method. Always returns 0.
    """
    cdef object cb = <object>args
    cb._complete()
    return 0
1998
1999
2000 cdef class Ioctx(object):
2001 """rados.Ioctx object"""
    # NOTE(sileht): attributes declared in .pxd
2003
def __init__(self, name):
    """
    Initialise the Python-side state of an io context.

    :param name: stored as ``self.name``
    """
    self.name = name
    self.state = "open"

    self.locator_key = ""
    self.nspace = ""
    # Guards the two completion-tracking lists below.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
2013
def __enter__(self):
    """Enter a ``with`` block; the io context itself is the target."""
    return self
2016
def __exit__(self, type_, value, traceback):
    """Close the io context on leaving a ``with`` block."""
    self.close()
    # False: exceptions raised inside the block are not suppressed.
    return False
2020
def __dealloc__(self):
    # Close the io context when the object is deallocated.
    self.close()
2023
def __track_completion(self, completion_obj):
    """
    Record a completion on this io context.

    The completion is appended to ``complete_completions`` and/or
    ``safe_completions`` depending on which of its callbacks are set;
    each append is done under ``self.lock``.

    :param completion_obj: completion whose ``oncomplete`` / ``onsafe``
                           attributes are inspected
    """
    if completion_obj.oncomplete:
        with self.lock:
            self.complete_completions.append(completion_obj)
    if completion_obj.onsafe:
        with self.lock:
            self.safe_completions.append(completion_obj)
2031
def __get_completion(self, oncomplete, onsafe):
    """
    Constructs a completion to use with asynchronous operations

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe:  what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    completion_obj = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t complete_cb = NULL
        rados_callback_t safe_cb = NULL
        rados_completion_t completion
        # Raw pointer to the Completion, handed to librados as the
        # callback argument.
        PyObject* p_completion_obj= <PyObject*>completion_obj

    # Only register the C trampolines for callbacks that were supplied.
    if oncomplete:
        complete_cb = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        safe_cb = <rados_callback_t>&__aio_safe_cb

    with nogil:
        ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
                                          &completion)
    if ret < 0:
        raise make_ex(ret, "error getting a completion")

    completion_obj.rados_comp = completion
    return completion_obj
2068
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    When the operation finishes, oncomplete is invoked as::

        oncomplete(completion, size, mtime)

    where size and mtime are both None if the operation's return value is
    negative, and mtime is otherwise a ``time.localtime`` struct.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *c_oid = object_name
        uint64_t obj_size
        time_t obj_mtime

    def on_stat_done(completion_v):
        # Translate the raw results into the caller's callback signature.
        cdef Completion finished = completion_v
        if finished.get_return_value() < 0:
            return oncomplete(finished, None, None)
        return oncomplete(finished, obj_size, time.localtime(obj_mtime))

    completion = self.__get_completion(on_stat_done, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, c_oid, completion.rados_comp,
                             &obj_size, &obj_mtime)

    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error stating %s" % object_name)
    return completion
2114
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    The write is queued and the call returns without waiting for it.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in
                       memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_write
        size_t c_len = len(to_write)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, c_oid, completion.rados_comp,
                              c_data, c_len, c_offset)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2159
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    The write is queued and the call returns without waiting for it.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: what to do when the write is safe and complete in
                       memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_write
        size_t c_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, c_oid,
                                   completion.rados_comp,
                                   c_data, c_len)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2204
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    The append is queued and the call returns without waiting for it.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in
                       memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_append
        size_t c_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, c_oid,
                               completion.rados_comp,
                               c_data, c_len)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error appending object %s" % object_name)
    return completion
2247
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    The GIL is released while waiting in librados.

    :raises: :class:`Error`
    """
    with nogil:
        ret = rados_aio_flush(self.io)
    if ret >= 0:
        return
    raise make_ex(ret, "error flushing")
2258
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    data_read is None when the operation's return value is negative;
    otherwise it is a bytes object trimmed to the number of bytes read.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        # Declared under the name that is actually used below.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim the pre-allocated buffer to what was really read. A
        # zero-byte read must be trimmed as well, otherwise the callback
        # would receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # The destination buffer is stored on the completion, which is what the
    # callback later reads it back from.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2311
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    data is None when the operation's return value is negative.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Declared under the name that is actually used below.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim the output buffer to the size reported by the operation.
        # An empty result must be trimmed too, otherwise the callback
        # would receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # NOTE(review): unlike oncomplete_, this path does not trim the
        # buffer; if only onsafe is supplied the data is handed over at
        # its full allocated length - confirm whether that is intended.
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only register the wrappers for callbacks the caller really supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2382
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: what to do when the remove is safe and complete in
                       memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the remove is safe and complete on
                   storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, c_oid,
                               completion.rados_comp)
    if ret < 0:
        # Submission failed: release the completion before reporting.
        completion._cleanup()
        raise make_ex(ret, "error removing %s" % object_name)
    return completion
2415
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx object is in the 'open' state.

    :raises: IoctxStateError if the state is anything other than "open"
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2424
def change_auid(self, auid):
    """
    Attempt to change an io context's associated auid "owner."

    Requires that you have write permission on both the current and new
    auid.

    :param auid: the new auid; converted to an unsigned 64-bit integer

    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    cdef:
        uint64_t c_auid = auid

    with nogil:
        ret = rados_ioctx_pool_set_auid(self.io, c_auid)
    if ret >= 0:
        return
    raise make_ex(ret, "error changing auid of '%s' to %d"
                  % (self.name, auid))
2444
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    The key as passed in (not its encoded form) is remembered and is what
    get_locator_key() returns.

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_key)
    self.locator_key = loc_key
2467
def get_locator_key(self):
    """
    Return the locator key currently stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2475
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Set the snapshot for reading objects.

    To stop reading from a snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap_id)
2492
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # The argument check uses opt(str_type) so that the documented None
    # value reaches this point instead of being rejected up front.
    if nspace is None:
        nspace = ""
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    # Remember the (normalised) namespace for get_namespace().
    self.nspace = nspace
2516
def get_namespace(self):
    """
    Return the namespace currently stored on this io context.

    :returns: namespace
    """
    current_nspace = self.nspace
    return current_nspace
2524
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling close() on a context that is not "open" does nothing.
    """
    if self.state != "open":
        return

    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2539
2540
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Write data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError` if librados returns a positive value
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t c_len = len(data)
        uint64_t c_offset = offset

    with nogil:
        ret = rados_write(self.io, c_key, c_data, c_len, c_offset)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is not part of the expected contract.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2576
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Write an entire object synchronously.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t c_len = len(data)

    with nogil:
        ret = rados_write_full(self.io, c_key, c_data, c_len)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is not part of the expected contract.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2611
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Append data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError` if librados returns a positive value
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t c_len = len(data)

    with nogil:
        ret = rados_append(self.io, c_key, c_data, c_len)

    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is not part of the expected contract.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2643
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    A bytes object of ``length`` bytes is allocated up front, filled in
    place by librados, and shrunk to the number of bytes actually read.

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *out_ptr
        uint64_t c_offset = offset
        size_t c_length = length
        PyObject* out_bytes = NULL

    out_bytes = PyBytes_FromStringAndSize(NULL, length)
    try:
        out_ptr = PyBytes_AsString(out_bytes)
        with nogil:
            ret = rados_read(self.io, c_key, out_ptr, c_length, c_offset)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: trim the buffer to the bytes actually returned.
        if ret != length:
            _PyBytes_Resize(&out_bytes, ret)

        return <object>out_bytes
    finally:
        # Drop our own reference in every case. The <object> cast above
        # takes its own reference for the returned value, and a failed
        # resize leaves out_bytes NULL, which XDECREF tolerates.
        ref.Py_XDECREF(out_bytes)
2687
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Declared under the name that is actually used below.
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the failing class method rather than a "read" error.
            raise make_ex(ret, "Ioctx.execute(%s): failed to exec %s:%s on %s" %
                          (self.name, cls, method, key))

        # Trim the output buffer to the size reported by rados_exec.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2743
def get_stats(self):
    """
    Get pool usage statistics

    :raises: :class:`Error`
    :returns: dict - each key carries the value of the identically named
              field of the ``rados_pool_stat_t`` filled in by librados:

        - ``num_bytes``

        - ``num_kb``

        - ``num_objects``

        - ``num_object_clones``

        - ``num_object_copies``

        - ``num_objects_missing_on_primary``

        - ``num_objects_unfound``

        - ``num_objects_degraded``

        - ``num_rd``

        - ``num_rd_kb``

        - ``num_wr``

        - ``num_wr_kb``
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &pool_stat)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    return {
        "num_bytes": pool_stat.num_bytes,
        "num_kb": pool_stat.num_kb,
        "num_objects": pool_stat.num_objects,
        "num_object_clones": pool_stat.num_object_clones,
        "num_object_copies": pool_stat.num_object_copies,
        "num_objects_missing_on_primary": pool_stat.num_objects_missing_on_primary,
        "num_objects_unfound": pool_stat.num_objects_unfound,
        "num_objects_degraded": pool_stat.num_objects_degraded,
        "num_rd": pool_stat.num_rd,
        "num_rd_kb": pool_stat.num_rd_kb,
        "num_wr": pool_stat.num_wr,
        "num_wr_kb": pool_stat.num_wr_kb,
    }
2793
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key

    with nogil:
        ret = rados_remove(self.io, c_key)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to remove '%s'" % key)
2818
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - the non-negative value returned by librados
              (0 on success); a negative value raises instead
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t c_size = size

    with nogil:
        ret = rados_trunc(self.io, c_key, c_size)
    if ret >= 0:
        return ret
    raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2848
@requires(('key', str_type))
def stat(self, key):
    """
    Get object stats (size/mtime)

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size, timestamp) - timestamp is the mtime converted with
              ``time.localtime``
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        ret = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if ret < 0:
        raise make_ex(ret, "Failed to stat %r" % key)

    local_mtime = time.localtime(obj_mtime)
    return obj_size, local_mtime
2874
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    doubled every time librados reports ERANGE. If the buffer size limit
    is reached without a successful read, an error is raised.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with twice the room.
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # The size limit was hit while librados still reported ERANGE.
            # Fail explicitly instead of slicing the buffer with a
            # negative length.
            raise make_ex(-errno.ERANGE, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        # The slice above copies the data, so the scratch buffer can go.
        free(ret_buf)
2913
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Start iterating over xattrs on an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2928
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t c_value_len = len(xattr_value)

    with nogil:
        ret = rados_setxattr(self.io, c_key, c_name,
                             c_value, c_value_len)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2961
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        ret = rados_rmxattr(self.io, c_key, c_name)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to delete key %r xattr %r" %
                  (key, xattr_name))
2990
def list_objects(self):
    """
    Create an ObjectIterator over this rados.Ioctx object.

    :raises: IoctxStateError if the io context is not open
    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
2999
def list_snaps(self):
    """
    Create a SnapIterator over this rados.Ioctx object.

    :raises: IoctxStateError if the io context is not open
    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3008
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Create a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        ret = rados_ioctx_snap_create(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to create snap %s" % snap_name)
3028
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Remove a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        ret = rados_ioctx_snap_remove(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3048
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Get the id of a pool snapshot

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    encoded_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = encoded_name
        rados_snap_t found_id

    with nogil:
        ret = rados_ioctx_snap_lookup(self.io, c_snap_name, &found_id)
    if ret != 0:
        raise make_ex(ret, "Failed to lookup snap %s" % snap_name)

    # The Snap is built from the name as passed in, not the encoded copy.
    return Snap(self, snap_name, int(found_id))
3072
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Rollback an object to a snapshot

    :param oid: the name of the object
    :type oid: str
    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid = cstr(oid, 'oid')
    snap_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = snap_name
        char *c_oid = oid

    with nogil:
        ret = rados_ioctx_snap_rollback(self.io, c_oid, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid)
3097
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context

    :raises: IoctxStateError if the io context is not open
    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3111
def create_write_op(self):
    """
    Create a write operation object.

    The caller needs to call release_write_op after use.

    :returns: the result of ``WriteOp().create()``
    """
    new_op = WriteOp()
    return new_op.create()
3118
def create_read_op(self):
    """
    Create a read operation object.

    The caller needs to call release_read_op after use.

    :returns: the result of ``ReadOp().create()``
    """
    new_op = ReadOp()
    return new_op.create()
3125
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op

    :param write_op: write operation object; its ``release()`` is called
    """
    release = write_op.release
    release()
3131
def release_read_op(self, read_op):
    """
    Release the memory allocated by create_read_op

    :param read_op: read operation object; its ``release()`` is called
    """
    release = read_op.release
    release()
3139
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Add key/value pairs to be set by a write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values
    :type values: tuple

    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    # Convert the values the same way as the keys so that the lengths
    # below are measured on what is actually handed to C.
    # NOTE(review): assumes cstr_list passes byte strings through
    # unchanged, so existing callers that supply bytes keep working.
    values = cstr_list(values, 'values')
    lens = [len(v) for v in values]
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try block so that whatever was already
        # allocated is freed if a later conversion fails; free(NULL) is a
        # no-op for the arrays that were never created.
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array(lens)
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        free(_keys)
        free(_values)
        free(_lens)
3173
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    """
    # Refuse to hand self.io to librados unless the context is open, as
    # the other Ioctx methods that use self.io do.
    self.require_ioctx_open()

    # Keep the caller's oid for the error message; only the converted
    # value is passed to C.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        time_t _mtime = mtime
        int _flags = flags

    with nogil:
        ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
    if ret != 0:
        raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3199
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation asynchronously

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the operation is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the operation is safe and complete on
                   storage on all replicas
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """
    # Refuse to hand self.io to librados unless the context is open, as
    # the other Ioctx methods that use self.io do. This runs before a
    # completion is created so nothing needs cleaning up if it raises.
    self.require_ioctx_open()

    # Keep the caller's oid for the error message; only the converted
    # value is passed to C.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
                                         &_mtime, _flags)
    if ret != 0:
        # The operation was never submitted, so undo the tracking.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
    return completion
3241
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    """
    # Refuse to hand self.io to librados unless the context is open, as
    # the other Ioctx methods that use self.io do.
    self.require_ioctx_open()

    # Keep the caller's oid for the error message; only the converted
    # value is passed to C.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
    if ret != 0:
        raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3263
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation asynchronously

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the operation is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the operation is safe and complete on
                   storage on all replicas
    :type onsafe: completion
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    # Refuse to hand self.io to librados unless the context is open, as
    # the other Ioctx methods that use self.io do. This runs before a
    # completion is created so nothing needs cleaning up if it raises.
    self.require_ioctx_open()

    # Keep the caller's oid for the error message; only the converted
    # value is passed to C.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
    if ret != 0:
        # The operation was never submitted, so undo the tracking.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
    return completion
3297
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Add an "omap get values" step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """

    # An empty start_after / filter_prefix is passed on as None, which
    # opt_str() is then given in place of a byte string.
    start_raw = None
    if start_after:
        start_raw = cstr(start_after, 'start_after')
    prefix_raw = None
    if filter_prefix:
        prefix_raw = cstr(filter_prefix, 'filter_prefix')
    cdef:
        char *c_start = opt_str(start_raw)
        char *c_prefix = opt_str(prefix_raw)
        ReadOp op = read_op
        rados_omap_iter_t omap_ctx = NULL
        int limit = max_return
        int prval = 0

    # NOTE(review): prval is a local whose address is given to librados
    # and whose value is returned straight away -- confirm whether
    # librados fills it in during this call or only once the read op is
    # executed.
    with nogil:
        rados_read_op_omap_get_vals(op.read_op, c_start, c_prefix,
                                    limit, &omap_ctx, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = omap_ctx
    return omap_iter, int(prval)
3329
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Add an "omap get keys" step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """
    # An empty start_after is passed on as None, which opt_str() is then
    # given in place of a byte string.
    start_raw = None
    if start_after:
        start_raw = cstr(start_after, 'start_after')
    cdef:
        char *c_start = opt_str(start_raw)
        ReadOp op = read_op
        rados_omap_iter_t omap_ctx = NULL
        int limit = max_return
        int prval = 0

    # NOTE(review): prval is a local whose address is given to librados
    # and whose value is returned straight away -- confirm whether
    # librados fills it in during this call or only once the read op is
    # executed.
    with nogil:
        rados_read_op_omap_get_keys(op.read_op, c_start,
                                    limit, &omap_ctx, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = omap_ctx
    return omap_iter, int(prval)
3356
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Add an "omap get values by keys" step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: an iterator over the requested omap values, return value from this action
    """
    keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        # Start from NULL, as get_omap_vals() and get_omap_keys() do, so
        # the iterator never carries an uninitialised pointer.
        rados_omap_iter_t iter_addr = NULL
        char **_keys = to_bytes_array(keys)
        size_t key_num = len(keys)
        int prval = 0

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, &prval)
        it = OmapIterator(self)
        it.ctx = iter_addr
        return it, int(prval)
    finally:
        # The temporary key array is released on every exit path.
        free(_keys)
3385
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Add a step to a write operation that removes the specified omap keys

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: input key tuple
    :type keys: tuple
    """

    raw_keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp op = write_op
        size_t n_keys = len(raw_keys)
        char **c_keys = to_bytes_array(raw_keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(op.write_op, <const char**>c_keys, n_keys)
    finally:
        # The temporary key array is released whether or not the call
        # above completed.
        free(c_keys)
3407
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Add a step to a write operation that removes all key/value pairs
    from an object

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    cdef WriteOp op = write_op

    with nogil:
        rados_write_op_omap_clear(op.write_op)
3421
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # The converted values get their own names so the error message below
    # still shows what the caller passed in.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        # No duration: a NULL timeval is passed.
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is a plain C local, so the microseconds field must be
        # set explicitly or it would hold an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3472
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # The converted values get their own names so the error message below
    # still shows what the caller passed in.
    key_raw = cstr(key, 'key')
    tag_raw = cstr(tag, 'tag')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _tag = tag_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        # No duration: a NULL timeval is passed.
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is a plain C local, so the microseconds field must be
        # set explicitly or it would hold an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that actually failed (this message previously
        # referred to rados_lock_exclusive).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3526
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # The converted values get their own names so the error message below
    # still shows what the caller passed in.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Describe the unlock failure (this message previously claimed
        # that rados_lock_exclusive had failed to set the lock).
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3558
3559
def set_object_locator(func):
    """
    Decorator for methods of an object that has ``locator_key`` and
    ``ioctx`` attributes.

    When ``self.locator_key`` is not None, the ioctx locator key is
    switched to it for the duration of the wrapped call and the previous
    key is put back afterwards, including when the call raises. When it
    is None the wrapped function is called directly.
    """
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on every exit path so that a failing call does not
            # leave this object's locator key set on the shared ioctx.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3571
3572
def set_object_namespace(func):
    """
    Decorator for methods of an object that has ``nspace`` and ``ioctx``
    attributes.

    The ioctx namespace is switched to ``self.nspace`` for the duration
    of the wrapped call and the previous namespace is put back
    afterwards, including when the call raises.

    :raises: :class:`LogicError` if ``self.nspace`` is None
    """
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on every exit path so that a failing call does not
            # leave this object's namespace set on the shared ioctx.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3583
3584
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context that every operation is forwarded to
        :param key: name of the object
        :param locator_key: locator key to apply around each call, or None
        :param nspace: namespace to apply around each call; None is
                       stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read() and write(); moved by seek().
        self.offset = 0
        # Becomes "removed" after remove(); see require_object_exists().
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: an identity test against a string literal
        # depends on interning and is not a reliable emptiness check.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise :class:`ObjectStateError` unless the state is "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """
        Read up to ``length`` from the current offset and advance the
        offset by the length of what was returned.
        """
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write ``string_to_write`` at the current offset and return the
        result of ``ioctx.write``; the offset only advances when that
        result is 0.
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object through the ioctx and mark it "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return ``ioctx.stat`` for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by the next read() or write()."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return ``ioctx.get_xattr`` for ``xattr_name`` on this object."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return ``ioctx.get_xattrs`` for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Return ``ioctx.set_xattr`` for the given name and value."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Return ``ioctx.rm_xattr`` for ``xattr_name`` on this object."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
3661
# Level names that may be given as the 'level' of a MonitorLog.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
3669
3670
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    For watching cluster log messages.  Instantiate an object and keep
    it around while callback is periodically called.  Construct with
    'level' to monitor 'level' messages (one of MONITOR_LEVELS).
    arg will be passed to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Remember the arguments on the instance, then register with the
        # cluster, which does the actual work.
        self.level, self.callback, self.arg = level, callback, arg
        self.cluster = cluster
        cluster.monitor_log(level, callback, arg)
3697