]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
update sources to v12.1.1
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
30
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
33 str_type = basestring
34 else:
35 str_type = str
36
37
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154
155 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
156 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
157 const char *inbuf, size_t inbuflen,
158 char **outbuf, size_t *outbuflen,
159 char **outs, size_t *outslen)
160 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
161 const char *inbuf, size_t inbuflen,
162 char **outbuf, size_t *outbuflen,
163 char **outs, size_t *outslen)
164 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
165 const char *inbuf, size_t inbuflen,
166 char **outbuf, size_t *outbuflen,
167 char **outs, size_t *outslen)
168 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
169 const char *inbuf, size_t inbuflen,
170 char **outbuf, size_t *outbuflen,
171 char **outs, size_t *outslen)
172 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
173 const char *inbuf, size_t inbuflen,
174 char **outbuf, size_t *outbuflen,
175 char **outs, size_t *outslen)
176 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
177 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
178
179 int rados_wait_for_latest_osdmap(rados_t cluster)
180
181 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
182 void rados_ioctx_destroy(rados_ioctx_t io)
183 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
184 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
185 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
186
187 uint64_t rados_get_last_version(rados_ioctx_t io)
188 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
189 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
190 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
191 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
192 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
193 int rados_remove(rados_ioctx_t io, const char *oid)
194 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
195 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
196 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
197 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
198 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
199 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
200 void rados_getxattrs_end(rados_xattrs_iter_t iter)
201
202 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
203 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
204 void rados_nobjects_list_close(rados_list_ctx_t ctx)
205
206 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
207 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
208 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
209 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
210 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
211 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
212 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
213 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
214
215 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
216 const char * cookie, const char * desc,
217 timeval * duration, uint8_t flags)
218 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
219 const char * cookie, const char * tag, const char * desc,
220 timeval * duration, uint8_t flags)
221 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
222
223 rados_write_op_t rados_create_write_op()
224 void rados_release_write_op(rados_write_op_t write_op)
225
226 rados_read_op_t rados_create_read_op()
227 void rados_release_read_op(rados_read_op_t read_op)
228
229 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
230 void rados_aio_release(rados_completion_t c)
231 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
232 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
233 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
234 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
235 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
236 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
237 int rados_aio_flush(rados_ioctx_t io)
238
239 int rados_aio_get_return_value(rados_completion_t c)
240 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
241 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
242 int rados_aio_wait_for_complete(rados_completion_t c)
243 int rados_aio_wait_for_safe(rados_completion_t c)
244 int rados_aio_is_complete(rados_completion_t c)
245 int rados_aio_is_safe(rados_completion_t c)
246
247 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
248 const char * in_buf, size_t in_len, char * buf, size_t out_len)
249 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
250 const char * in_buf, size_t in_len, char * buf, size_t out_len)
251
252 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
253 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
254 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
255 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
256 void rados_write_op_omap_clear(rados_write_op_t write_op)
257 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
258
259 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
260 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
261 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
262 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
263 void rados_write_op_remove(rados_write_op_t write_op)
264 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
265 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
266
267 void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
268 void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
269 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
270 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
271 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
272 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
273 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
274 void rados_omap_get_end(rados_omap_iter_t iter)
275
276
# Python-level aliases for the C enum values / constants declared in the
# ``cdef extern`` blocks of this module.  The underscore-prefixed names are
# C-only; these assignments make the values available as module attributes.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE

# Declared as a uint64_t in the extern block.
LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD

# Whole-operation flags (LIBRADOS_OPERATION_*).
LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY

# The C constant is a ``char*``; it is decoded so Python callers get a text
# string rather than bytes.
LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')

# Object-creation modes (LIBRADOS_CREATE_*).
LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT

# Special auid values: all 64 bits set, and zero.
ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0
302
303
class Error(Exception):
    """ `Error` class, derived from `Exception` """
    pass


class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error`

    Accepts an optional ``errno`` so that it can be constructed the same
    way as the :class:`OSError` subclasses (the errno lookup table maps
    ``EINVAL`` to this class and passes ``errno=`` when instantiating).
    """
    def __init__(self, message, errno=None):
        super(InvalidArgumentError, self).__init__(message)
        self.errno = errno

    def __reduce__(self):
        # Keep errno across pickling; the default reduce only keeps args.
        return (self.__class__, (self.args[0], self.errno))


class OSError(Error):
    """ `OSError` class, derived from `Error`

    :param message: human readable description of the failure
    :param errno: positive errno value, or None when not applicable
    """
    def __init__(self, message, errno=None):
        super(OSError, self).__init__(message)
        self.errno = errno

    def __str__(self):
        msg = super(OSError, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # `BaseException.message` does not exist on Python 3; the message
        # passed to __init__ is always available as args[0].
        return (self.__class__, (self.args[0], self.errno))


class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError` """
    pass


class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError` """
    pass


class PermissionDeniedError(OSError):
    """ deal with EACCES related. """
    pass


class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError` """
    pass


class NoData(OSError):
    """ `NoData` class, derived from `OSError` """
    pass


class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError` """
    pass


class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError` """
    pass


class IOError(OSError):
    """ `IOError` class, derived from `OSError` """
    pass


class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError` """
    pass


class RadosStateError(Error):
    """ `RadosStateError` class, derived from `Error` """
    pass


class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` """
    pass


class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` """
    pass


class LogicError(Error):
    """ `LogicError` class, derived from `Error` """
    pass


class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError` """
    pass
396
397
# Lookup table from positive errno values to the exception class raised for
# them (make_ex() takes abs() of the librados return code before looking it
# up here).  The table is selected at Cython compile time; the two variants
# differ only in the key mapped to NoData: ENOATTR on FreeBSD, ENODATA
# everywhere else.
IF UNAME_SYSNAME == "FreeBSD":
    cdef errno_to_exception = {
        errno.EPERM : PermissionError,
        errno.ENOENT : ObjectNotFound,
        errno.EIO : IOError,
        errno.ENOSPC : NoSpace,
        errno.EEXIST : ObjectExists,
        errno.EBUSY : ObjectBusy,
        errno.ENOATTR : NoData,
        errno.EINTR : InterruptedOrTimeoutError,
        errno.ETIMEDOUT : TimedOut,
        errno.EACCES : PermissionDeniedError,
        errno.EINVAL : InvalidArgumentError,
    }
ELSE:
    cdef errno_to_exception = {
        errno.EPERM : PermissionError,
        errno.ENOENT : ObjectNotFound,
        errno.EIO : IOError,
        errno.ENOSPC : NoSpace,
        errno.EEXIST : ObjectExists,
        errno.EBUSY : ObjectBusy,
        errno.ENODATA : NoData,
        errno.EINTR : InterruptedOrTimeoutError,
        errno.ETIMEDOUT : TimedOut,
        errno.EACCES : PermissionDeniedError,
        errno.EINVAL : InvalidArgumentError,
    }
426
427
cdef make_ex(ret, msg):
    """
    Build the exception object matching a librados return code.

    The sign of `ret` is ignored.  Codes present in `errno_to_exception`
    produce the mapped class; every other code produces a plain
    :class:`OSError`.  The exception is returned, not raised.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: a subclass of :class:`Error`
    """
    code = abs(ret)
    exc_class = errno_to_exception.get(code, OSError)
    return exc_class(msg, errno=code)
443
444
def opt(cls):
    """
    Build a type spec for an optional argument.

    :param cls: the type accepted in addition to ``None``
    :returns: the tuple ``(cls, None)``
    """
    spec = (cls, None)
    return spec
449
450
def requires(*types):
    """
    Decorator factory validating the argument types of an instance method.

    Each entry of `types` is a ``(name, spec)`` pair where `spec` is a type,
    ``None`` (meaning "must be None"), or a tuple of those alternatives.
    Positional arguments (after ``self``) are matched to the pairs by
    position, keyword arguments by name; kwargs is an un-ordered dict, which
    is why the specs are given as ordered positional pairs.

    :raises: :class:`TypeError` from the wrapped call when a value does not
             match its spec
    """
    def accepts(value, expected):
        # a spec of None only matches the value None
        if expected is not None:
            return isinstance(value, expected)
        return value is None

    def check_type(val, arg_name, arg_type):
        if not isinstance(arg_type, tuple):
            if accepts(val, arg_type):
                return
            assert(arg_type is not None)
            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))

        for candidate in arg_type:
            if accepts(val, candidate):
                return
        labels = ['None' if t is None else t.__name__ for t in arg_type]
        raise TypeError('%s must be %s' % (arg_name, ' or '.join(labels)))

    def wrapper(f):
        # FIXME(sileht): decorating with @wraps(f) stops with
        # AttributeError: 'method_descriptor' object has no attribute '__module__'
        def validate_func(*args, **kwargs):
            # args[0] is `self`, which is never validated
            by_position = zip(args[1:], types)
            by_name = ((kwargs[key], (key, spec)) for key, spec in types
                       if key in kwargs)
            for value, (label, spec) in chain(by_position, by_name):
                check_type(value, label, spec)
            return f(*args, **kwargs)
        return validate_func

    return wrapper
487
488
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Convert a Python string into a byte string.

    Byte strings are returned unchanged; unicode strings are encoded with
    `encoding`.

    :param basestring val: Python string
    :param str name: Name of the string parameter, used in the error message
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed and returned as None
    :rtype: bytes
    :raises: :class:`TypeError` if `val` is neither bytes nor unicode
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if isinstance(val, unicode):
        return val.encode(encoding)
    raise TypeError('%s must be a string' % name)
508
509
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every element of `list_str` to a byte string with cstr().

    :param list_str: iterable of Python strings
    :param str name: parameter name used in error messages
    :param str encoding: encoding applied to unicode elements
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element is not a string
    """
    # forward `encoding`; it used to be accepted but silently ignored
    return [cstr(s, name, encoding) for s in list_str]
512
513
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string into a Python string, passing ``None`` through.

    :param bytes val: byte string
    :param str encoding: encoding used to decode `val`
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
525
526
cdef char* opt_str(s) except? NULL:
    # Convert an optional Python byte string into a C string: None becomes
    # NULL, anything else is coerced to ``char*``.  NULL is therefore a
    # legitimate return value, hence ``except? NULL`` rather than
    # ``except NULL``.
    # NOTE(review): the returned pointer presumably refers to memory owned by
    # `s`, so callers must keep `s` alive while using it -- confirm.
    if s is None:
        return NULL
    return s
531
532
cdef void* realloc_chk(void* ptr, size_t size) except NULL:
    # realloc() wrapper that reports allocation failure as MemoryError
    # instead of handing a NULL pointer back to the caller.
    cdef void *resized = realloc(ptr, size)
    if resized != NULL:
        return resized
    raise MemoryError("realloc failed")
538
539
cdef size_t * to_csize_t_array(list_int):
    # Copy a Python list of integers into a freshly malloc()ed size_t array
    # with one slot per element.  The caller owns the returned buffer and
    # must free() it.
    cdef size_t *sizes = <size_t *>malloc(len(list_int) * sizeof(size_t))
    if sizes == NULL:
        raise MemoryError("malloc failed")
    for idx, value in enumerate(list_int):
        sizes[idx] = <size_t>value
    return sizes
547
548
cdef char ** to_bytes_array(list_bytes):
    # Build a malloc()ed array of ``char*``, one entry per element of
    # `list_bytes`.  Only the pointer array is allocated here; the caller
    # must free() it.
    # NOTE(review): each entry presumably points into the corresponding
    # bytes object rather than a copy, so `list_bytes` has to outlive the
    # returned array -- confirm.
    # NOTE(review): no ``except`` clause is declared on this function, so the
    # MemoryError below may not propagate to callers under Cython's default
    # cdef error handling -- confirm.
    cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
    if ret == NULL:
        raise MemoryError("malloc failed")
    for i in xrange(len(list_bytes)):
        ret[i] = <char *>list_bytes[i]
    return ret
556
557
558
cdef int __monitor_callback(void *arg, const char *line, const char *who,
                            uint64_t sec, uint64_t nsec, uint64_t seq,
                            const char *level, const char *msg) with gil:
    # C-level trampoline for log callbacks: acquires the GIL (``with gil``)
    # and forwards the log entry to a Python callable.
    # `arg` is a Python object indexed as a pair: element 0 is the callable,
    # element 1 is passed back to it as its first argument.
    cdef object cb_info = <object>arg
    cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
    # always reports success to the caller
    return 0
565
cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
                             const char *who,
                             const char *name,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    # Variant of the log trampoline that also carries `channel` and `name`.
    # `arg` is a Python object indexed as a pair: element 0 is the callable,
    # element 1 is passed back to it as its first argument.
    cdef object cb_info = <object>arg
    # NOTE: the Python callable receives `name` before `who`, i.e. the
    # opposite order of this function's C parameters.
    cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
    # always reports success to the caller
    return 0
574
575
class Version(object):
    """ Three-part version number: major, minor and extra """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        # rendered as dotted decimal, e.g. "12.1.1"
        components = (self.major, self.minor, self.extra)
        return ".".join("%d" % part for part in components)
585
586
587 cdef class Rados(object):
588 """This class wraps librados functions"""
589 # NOTE(sileht): attributes declared in .pxd
590
def __init__(self, *args, **kwargs):
    # All arguments are forwarded unchanged to __setup(), which does the
    # actual (type-checked) initialisation.
    # NOTE(review): PyEval_InitThreads() is presumably called so that the
    # ``with gil`` log callbacks can run on librados threads -- confirm.
    PyEval_InitThreads()
    self.__setup(*args, **kwargs)
594
# The spec list must follow the positional parameter order, because
# `requires` matches positional arguments to specs by position.
# `conf_defaults` gets the catch-all `object` spec so a positional
# `conf_defaults` is no longer validated as `conffile`.
@requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
          ('conf_defaults', object), ('conffile', opt(str_type)))
def __setup(self, rados_id=None, name=None, clustername=None,
            conf_defaults=None, conffile=None, conf=None, flags=0,
            context=None):
    """
    Create the cluster handle and apply the initial configuration.

    :param rados_id: client id; the entity name becomes ``client.<rados_id>``
    :param name: full entity name (defaults to ``client.admin``); mutually
                 exclusive with `rados_id`
    :param clustername: cluster name (defaults to ``''``)
    :param conf_defaults: mapping of option -> value, applied first
    :param conffile: config file path, applied second; ``''`` reads the
                     default config file, None skips reading
    :param conf: mapping of option -> value, applied last
    :param flags: flags passed to ``rados_create2``
    :param context: capsule wrapping a ``rados_config_t``; when given the
                    handle is created with ``rados_create_with_context``

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.monitor_callback = None
    self.monitor_callback2 = None
    self.parsed_args = []
    self.conf_defaults = conf_defaults
    self.conffile = conffile
    self.rados_id = rados_id

    if rados_id and name:
        raise Error("Rados(): can't supply both rados_id and name")
    elif rados_id:
        name = 'client.' + rados_id
    elif name is None:
        name = 'client.admin'
    if clustername is None:
        clustername = ''

    name = cstr(name, 'name')
    clustername = cstr(clustername, 'clustername')
    cdef:
        char *_name = name
        char *_clustername = clustername
        int _flags = flags
        int ret

    if context:
        # Unpack void* (aka rados_config_t) from capsule
        rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
        with nogil:
            ret = rados_create_with_context(&self.cluster, rados_config)
    else:
        with nogil:
            ret = rados_create2(&self.cluster, _clustername, _name, _flags)
    if ret != 0:
        raise Error("rados_initialize failed with error code: %d" % ret)

    self.state = "configuring"
    # order is important: conf_defaults, then conffile, then conf
    if conf_defaults:
        for key, value in conf_defaults.items():
            self.conf_set(key, value)
    if conffile is not None:
        # read the default conf file when '' is given
        if conffile == '':
            conffile = None
        self.conf_read_file(conffile)
    if conf:
        for key, value in conf.items():
            self.conf_set(key, value)
648
def require_state(self, *args):
    """
    Verify that the Rados object is in one of the given states.

    :param args: the acceptable state names
    :raises: RadosStateError if the current state is not among `args`
    """
    if self.state not in args:
        raise RadosStateError("You cannot perform that operation on a "
                              "Rados object in state %s." % self.state)
659
def shutdown(self):
    """
    Disconnects from the cluster. Call this explicitly when a
    Rados.connect()ed object is no longer used.

    Calling it again once the state is "shutdown" does nothing.
    """
    if self.state == "shutdown":
        return
    # the GIL is released while librados tears the handle down
    with nogil:
        rados_shutdown(self.cluster)
    self.state = "shutdown"
669
def __enter__(self):
    # Context-manager entry: connect to the cluster and hand back this
    # object as the ``with ... as`` target.
    self.connect()
    return self
673
def __exit__(self, type_, value, traceback):
    # Context-manager exit: always shut the handle down.  Returning False
    # means an exception raised inside the ``with`` body is not suppressed.
    self.shutdown()
    return False
677
def version(self):
    """
    Get the version number of the ``librados`` C library.

    :returns: a :class:`Version` object holding the ``major``, ``minor``
              and ``extra`` components of the librados version
    """
    cdef int major = 0
    cdef int minor = 0
    cdef int extra = 0
    # librados fills the three integers in place
    with nogil:
        rados_version(&major, &minor, &extra)
    return Version(major, minor, extra)
691
@requires(('path', opt(str_type)))
def conf_read_file(self, path=None):
    """
    Configure the cluster handle using a Ceph config file.

    :param path: path to the config file; None is passed to librados as a
                 NULL path
    :type path: str

    :raises: :class:`TypeError`, :class:`RadosStateError`, :class:`Error`
    """
    self.require_state("configuring", "connected")
    path = cstr(path, 'path', opt=True)
    cdef:
        # NULL when path is None
        char *_path = opt_str(path)
    with nogil:
        ret = rados_conf_read_file(self.cluster, _path)
    if ret != 0:
        raise make_ex(ret, "error calling conf_read_file")
708
def conf_parse_argv(self, args):
    """
    Parse known arguments from args, and remove; returned
    args contain only those unknown to ceph

    :param args: list of argument strings
    :returns: list of the arguments ceph did not recognise, or None when
              `args` is empty
    :raises: :class:`TypeError`, :class:`MemoryError`,
             :class:`RadosStateError`, :class:`Error`
    """
    self.require_state("configuring", "connected")
    if not args:
        return

    cargs = cstr_list(args, 'args')
    cdef:
        int _argc = len(args)
        char **_argv = to_bytes_array(cargs)
        char **_remargv = NULL

    try:
        _remargv = <char **>malloc(_argc * sizeof(char *))
        # librados writes into this array; never hand it a NULL buffer
        if _remargv == NULL:
            raise MemoryError("malloc failed")
        with nogil:
            ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
                                                  <const char**>_argv,
                                                  <const char**>_remargv)
        if ret:
            raise make_ex(ret, "error calling conf_parse_argv_remainder")

        # _remargv was allocated with fixed argc; collapse return
        # list to eliminate any missing args
        retargs = [decode_cstr(a) for a in _remargv[:_argc]
                   if a != NULL]
        self.parsed_args = args
        return retargs
    finally:
        # free(NULL) is a no-op, so this is safe on every path
        free(_argv)
        free(_remargv)
742
def conf_parse_env(self, var='CEPH_ARGS'):
    """
    Parse known arguments from an environment variable, normally
    CEPH_ARGS.

    :param var: name of the environment variable; a false value (empty
                string, None) makes this call a no-op
    :type var: str

    :raises: :class:`TypeError`, :class:`RadosStateError`, :class:`Error`
    """
    self.require_state("configuring", "connected")
    if not var:
        return

    var = cstr(var, 'var')
    cdef:
        char *_var = var
    with nogil:
        ret = rados_conf_parse_env(self.cluster, _var)
    if ret != 0:
        raise make_ex(ret, "error calling conf_parse_env")
759
@requires(('option', str_type))
def conf_get(self, option):
    """
    Get the value of a configuration option

    :param option: which option to read
    :type option: str

    :returns: str - value of the option or None
    :raises: :class:`TypeError`
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    cdef:
        char *_option = option
        # initial buffer size; doubled each time librados reports the
        # value does not fit
        size_t length = 20
        char *ret_buf = NULL

    try:
        while True:
            # realloc_chk raises MemoryError on failure; the finally
            # clause still frees the previous buffer in that case
            ret_buf = <char *>realloc_chk(ret_buf, length)
            with nogil:
                ret = rados_conf_get(self.cluster, _option, ret_buf, length)
            if ret == 0:
                return decode_cstr(ret_buf)
            elif ret == -errno.ENAMETOOLONG:
                # buffer too small: retry with twice the size
                length = length * 2
            elif ret == -errno.ENOENT:
                # unknown option
                return None
            else:
                raise make_ex(ret, "error calling conf_get")
    finally:
        free(ret_buf)
793
@requires(('option', str_type), ('val', str_type))
def conf_set(self, option, val):
    """
    Set the value of a configuration option

    :param option: which option to set
    :type option: str
    :param val: value of the option
    :type val: str

    :raises: :class:`TypeError`, :class:`ObjectNotFound`
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    val = cstr(val, 'val')
    cdef:
        char *_option = option
        char *_val = val

    with nogil:
        ret = rados_conf_set(self.cluster, _option, _val)
    if ret != 0:
        raise make_ex(ret, "error calling conf_set")
817
def ping_monitor(self, mon_id):
    """
    Ping a monitor to assess liveness

    May be used as a simple way to assess liveness, or to obtain
    information about the monitor in a simple way even in the
    absence of quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor, or None when the reply
              is empty
    """

    self.require_state("configuring", "connected")

    mon_id = cstr(mon_id, 'mon_id')
    cdef:
        char *_mon_id = mon_id
        size_t outstrlen = 0
        char *outstr

    with nogil:
        ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)

    if ret != 0:
        raise make_ex(ret, "error calling ping_monitor")

    if outstrlen:
        # copy the reply into a Python bytes object before releasing the
        # librados-owned buffer
        my_outstr = outstr[:outstrlen]
        rados_buffer_free(outstr)
        return decode_cstr(my_outstr)
849
def connect(self, timeout=0):
    """
    Connect to the cluster. Use shutdown() to release resources.

    :param timeout: accepted for backward compatibility only; the value
                    is ignored
    :raises: :class:`RadosStateError` unless the object is in the
             "configuring" state, :class:`Error` if the connection fails
    """
    self.require_state("configuring")
    # NOTE(sileht): timeout was supported by old python API,
    # but this is not something available in C API, so ignore
    # for now and remove it later
    with nogil:
        ret = rados_connect(self.cluster)
    if ret != 0:
        raise make_ex(ret, "error connecting to the cluster")
    self.state = "connected"
863
def get_cluster_stats(self):
    """
    Read usage info about the cluster

    This tells you total space, space used, space available, and number
    of objects. These are not updated immediately when data is written,
    they are eventually consistent.

    :returns: dict - contains the following keys:

        - ``kb`` (int) - total space

        - ``kb_used`` (int) - space used

        - ``kb_avail`` (int) - free space available

        - ``num_objects`` (int) - number of objects

    :raises: :class:`Error` if librados returns a negative code
    """
    # NOTE(review): unlike the neighbouring methods there is no
    # require_state() check here -- confirm this is intentional.
    cdef:
        rados_cluster_stat_t stats

    with nogil:
        ret = rados_cluster_stat(self.cluster, &stats)

    if ret < 0:
        raise make_ex(
            ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
    # copy the C struct fields into a plain dict
    return {'kb': stats.kb,
            'kb_used': stats.kb_used,
            'kb_avail': stats.kb_avail,
            'num_objects': stats.num_objects}
896
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Tell whether a pool with the given name exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True if the pool exists, False otherwise.
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, _pool_name)

    # A non-negative result means the lookup succeeded, so the pool exists.
    if rc >= 0:
        return True
    if rc == -errno.ENOENT:
        return False
    raise make_ex(rc, "error looking up pool '%s'" % pool_name)
922
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Translate a pool name into its numeric ID.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if it doesn't exist
    """
    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, _pool_name)

    if rc == -errno.ENOENT:
        return None
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return int(rc)
947
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    The name is read into a buffer that starts at 512 bytes and is
    doubled each time librados reports it as too small (ERANGE), as
    long as the buffer has not grown beyond 4096 bytes.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # Buffer too small: grow it and retry. Only ERANGE is
                # retried; every other error is reported below.
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            elif ret < 0:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        # Runs on every exit path (return, None, exception).
        free(name)
983
@requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
def create_pool(self, pool_name, auid=None, crush_rule=None):
    """
    Create a pool. Which librados call is used depends on the
    optional arguments:

    - neither auid nor crush_rule: pool with default settings
    - auid only: pool owned by that auid
    - crush_rule only: pool using that CRUSH rule
    - both: pool with that owner and that CRUSH rule

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param auid: the id of the owner of the new pool
    :type auid: int
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name
        uint8_t _crush_rule
        uint64_t _auid

    has_auid = auid is not None
    has_rule = crush_rule is not None

    # Convert only the arguments that were supplied to their C types.
    if has_auid:
        _auid = auid
    if has_rule:
        _crush_rule = crush_rule

    if has_auid and has_rule:
        with nogil:
            rc = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
    elif has_auid:
        with nogil:
            rc = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
    elif has_rule:
        with nogil:
            rc = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
    else:
        with nogil:
            rc = rados_pool_create(self.cluster, _pool_name)

    if rc < 0:
        raise make_ex(rc, "error creating pool '%s'" % pool_name)
1028
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Look up the base tier of a pool.

    :param pool_id: ID of the pool to query
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        int64_t tier = 0

    with nogil:
        rc = rados_pool_get_base_tier(self.cluster, _pool_id, &tier)
    if rc < 0:
        raise make_ex(rc, "get_pool_base_tier(%d)" % pool_id)
    return int(tier)
1046
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Delete a pool together with all the data it holds.

    The pool disappears from the cluster immediately, while the
    actual data is removed in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *_pool_name = pool_name

    with nogil:
        rc = rados_pool_delete(self.cluster, _pool_name)
    if rc >= 0:
        return
    raise make_ex(rc, "error deleting pool '%s'" % pool_name)
1070
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List the inconsistent placement groups of a pool.

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int
    :raises: :class:`TypeError`, :class:`Error`
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t pool = pool_id
        size_t size = 512
        char *pgs = NULL

    try:
        while True:
            pgs = <char *>realloc_chk(pgs, size)
            with nogil:
                rc = rados_inconsistent_pg_list(self.cluster, pool,
                                                pgs, size)
            if rc > <int>size:
                # Result larger than the buffer: double it and retry.
                size *= 2
                continue
            if rc < 0:
                raise make_ex(rc, "error calling inconsistent_pg_list")
            break

        # The buffer holds NUL-separated names; drop the empty pieces.
        entries = decode_cstr(pgs[:rc]).split('\0')
        return [entry for entry in entries if entry]
    finally:
        free(pgs)
1101
def list_pools(self):
    """
    Gets a list of pool names.

    The names are read into a buffer that starts at 512 bytes and is
    doubled until the result fits.

    :raises: :class:`Error`
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                # Result larger than the buffer: grow and retry.
                size *= 2
            elif ret >= 0:
                break
            else:
                # A negative return is an error; without this branch
                # the loop would retry forever.
                raise make_ex(ret, "error calling rados_pool_list")
        # The buffer holds NUL-separated names; drop the empty pieces.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1126
def get_fsid(self):
    """
    Get the fsid of the cluster as a hexadecimal string.

    The value returned is the bytes object that rados_cluster_fsid()
    filled in, resized to the length that call reported.

    :raises: :class:`Error`
    :returns: str - cluster fsid
    """
    self.require_state("connected")
    cdef:
        char *ret_buf
        # NOTE(review): 37 presumably covers a 36-character UUID plus
        # a terminating NUL - confirm against librados.
        size_t buf_len = 37
        PyObject* ret_s = NULL

    # Allocate an uninitialized bytes object and let librados write
    # straight into its storage.
    ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
        if ret < 0:
            raise make_ex(ret, "error getting cluster fsid")
        if ret != <int>buf_len:
            # Shrink the bytes object to the length actually written.
            _PyBytes_Resize(&ret_s, ret)
        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
1156
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Create an io context.

    An io context is the handle through which operations on one
    particular pool are performed.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    cdef:
        rados_ioctx_t ioctx
        char *_ioctx_name = ioctx_name

    with nogil:
        rc = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
    if rc < 0:
        raise make_ex(rc, "error opening pool '%s'" % ioctx_name)

    # Wrap the raw librados handle in the Python-level object.
    io = Ioctx(ioctx_name)
    io.io = ioctx
    return io
1183
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    Send a command to the monitors, or to one specific monitor.

    mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param cmd: list of command strings
    :param inbuf: input buffer passed along with the command
    :param timeout: accepted for backward compatibility only; ignored
    :param target: name of a specific monitor to send to; an int is
                   converted to its string form
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # Use the real parameter name so type errors point at 'cmd',
    # as the other *_command methods do.
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy both outputs into Python objects before releasing the
        # librados-owned buffers.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The command pointer array is released on every path.
        free(_cmd)
1240
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    Send a command to one OSD.

    osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param osdid: numeric id of the OSD
    :param cmd: list of command strings
    :param inbuf: input buffer passed along with the command
    :param timeout: accepted for backward compatibility only; ignored
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so it is
    # ignored; it is kept for backward compat with the old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        int _osdid = osdid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            rc = rados_osd_command(self.cluster, _osdid,
                                   <const char **>_cmd, _cmdlen,
                                   <const char*>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)

        # Take Python copies first, then give the buffers back.
        status_text = decode_cstr(_outs[:_outs_len])
        payload = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (rc, payload, status_text)
    finally:
        free(_cmd)
1283
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Send a command to the manager.

    :param cmd: list of command strings
    :param inbuf: input buffer passed along with the command
    :param timeout: accepted for backward compatibility only; ignored
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so it is
    # ignored; it is kept for backward compat with the old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            rc = rados_mgr_command(self.cluster,
                                   <const char **>_cmd, _cmdlen,
                                   <const char*>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)

        # Take Python copies first, then give the buffers back.
        status_text = decode_cstr(_outs[:_outs_len])
        payload = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (rc, payload, status_text)
    finally:
        free(_cmd)
1324
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    Send a command addressed to one placement group.

    pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param pgid: placement group id, as a string
    :param cmd: list of command strings
    :param inbuf: input buffer passed along with the command
    :param timeout: accepted for backward compatibility only; ignored
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so it is
    # ignored; it is kept for backward compat with the old python binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_pgid = pgid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            rc = rados_pg_command(self.cluster, _pgid,
                                  <const char **>_cmd, _cmdlen,
                                  <const char *>_inbuf, _inbuf_len,
                                  &_outbuf, &_outbuf_len,
                                  &_outs, &_outs_len)

        # Take Python copies first, then give the buffers back.
        status_text = decode_cstr(_outs[:_outs_len])
        payload = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (rc, payload, status_text)
    finally:
        free(_cmd)
1368
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap() on this cluster handle.

    :returns: the integer result of the librados call, unmodified
              (no exception is raised here)
    """
    self.require_state("connected")
    with nogil:
        rc = rados_wait_for_latest_osdmap(self.cluster)
    return rc
1374
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Put a client on the OSD blacklist.

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        char *_client_address = client_address
        uint32_t _expire_seconds = expire_seconds

    with nogil:
        rc = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
    if rc >= 0:
        return
    raise make_ex(rc, "error blacklisting client '%s'" % client_address)
1396
def monitor_log(self, level, callback, arg):
    """
    Register (or, with callback=None, clear) a monitor log callback.

    :param level: one of MONITOR_LEVELS
    :param callback: callable to register, or None to unregister
    :param arg: passed to librados together with the callback
    :raises: :class:`LogicError`, :class:`Error`
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if not (callback is None or callable(callback)):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Unregister; the result of this call is not checked.
        with nogil:
            rc = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    holder = (callback, arg)
    cdef PyObject* _arg = <PyObject*>holder
    with nogil:
        rc = rados_monitor_log(self.cluster, <const char*>_level,
                               <rados_log_callback_t>&__monitor_callback, _arg)

    if rc:
        raise make_ex(rc, 'error calling rados_monitor_log')
    # NOTE(sileht): keep a reference so the callback tuple handed to
    # librados is not garbage collected
    self.monitor_callback = holder
    self.monitor_callback2 = None
1424
def monitor_log2(self, level, callback, arg):
    """
    Register (or, with callback=None, clear) a monitor log callback
    through rados_monitor_log2().

    :param level: one of MONITOR_LEVELS
    :param callback: callable to register, or None to unregister
    :param arg: passed to librados together with the callback
    :raises: :class:`LogicError`, :class:`Error`
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if not (callback is None or callable(callback)):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Unregister; the result of this call is not checked.
        with nogil:
            rc = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    holder = (callback, arg)
    cdef PyObject* _arg = <PyObject*>holder
    with nogil:
        rc = rados_monitor_log2(self.cluster, <const char*>_level,
                                <rados_log_callback2_t>&__monitor_callback2, _arg)

    if rc:
        raise make_ex(rc, 'error calling rados_monitor_log')
    # NOTE(sileht): keep a reference so the callback tuple handed to
    # librados is not garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = holder
1452
1453
cdef class OmapIterator(object):
    """Iterator over the omap key/value pairs behind a librados omap iterator handle."""

    cdef public Ioctx ioctx
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next key-value pair.

        :raises: :class:`Error`, StopIteration
        :returns: tuple (key, value); value is None when librados
                  returned no value pointer
        """
        cdef:
            char *key_ = NULL
            char *val_ = NULL
            size_t len_

        with nogil:
            rc = rados_omap_get_next(self.ctx, &key_, &val_, &len_)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        # A NULL key marks the end of the iteration.
        if key_ == NULL:
            raise StopIteration()

        key = decode_cstr(key_)
        if val_ != NULL:
            val = val_[:len_]
        else:
            val = None
        return (key, val)

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1492
1493
cdef class ObjectIterator(object):
    """Iterator over the objects of a rados.Ioctx"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            rc = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if rc < 0:
            raise make_ex(rc, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next object (name, locator and namespace) in the pool.

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *key_ = NULL
            const char *locator_ = NULL
            const char *nspace_ = NULL

        with nogil:
            rc = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)

        # Any negative result ends the iteration.
        if rc < 0:
            raise StopIteration()

        key = decode_cstr(key_)

        locator = None
        if locator_ != NULL:
            locator = decode_cstr(locator_)

        nspace = None
        if nspace_ != NULL:
            nspace = decode_cstr(nspace_)

        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1539
1540
cdef class XattrIterator(object):
    """Iterator over the extended attributes of one object"""

    cdef rados_xattrs_iter_t it
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        # Keep the encoded name on the instance so the C pointer taken
        # from it stays backed by a live object.
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            rc = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if rc != 0:
            raise make_ex(rc, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next xattr of the object.

        :raises: :class:`Error`, StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *name_ = NULL
            const char *val_ = NULL
            size_t len_ = 0

        with nogil:
            rc = rados_getxattrs_next(self.it, &name_, &val_, &len_)
        if rc != 0:
            raise make_ex(rc, ("error iterating over the extended attributes "
                               "in '%s'") % self.oid)
        # A NULL name marks the end of the iteration.
        if name_ == NULL:
            raise StopIteration()

        value = val_[:len_]
        return (decode_cstr(name_), value)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1589
1590
cdef class SnapIterator(object):
    """Snapshot iterator

    The snapshot ids of the pool are fetched once, in __cinit__, into a
    C array owned by this object; __next__ resolves one name per call.
    """

    cdef public Ioctx ioctx

    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, ("error calling rados_snap_list for "
                                    "ioctx '%s'") % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id array is allocated with realloc in __cinit__ and was
        # previously never released. free(NULL) is a no-op, so this is
        # also safe when __cinit__ failed before allocating.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Grow the name buffer until librados stops reporting ERANGE.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1656
1657
cdef class Snap(object):
    """A single pool snapshot: its io context, name and numeric id"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): the old API stored the ctypes object here
    # rather than the plain value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.snap_id = snap_id
        self.name = name
        self.ioctx = ioctx

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Find out when this snapshot was taken.

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t snap_time

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
        if rc != 0:
            raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(snap_time)
1690
1691
cdef class Completion(object):
    """Handle on one asynchronous librados operation"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.ioctx = ioctx
        self.oncomplete = oncomplete
        self.onsafe = onsafe

    def is_safe(self):
        """
        Has the asynchronous operation been marked safe?

        A True result does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            state = rados_aio_is_safe(self.rados_comp)
        return state == 1

    def is_complete(self):
        """
        Has the asynchronous operation completed?

        A True result does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            state = rados_aio_is_complete(self.rados_comp)
        return state == 1

    def wait_for_safe(self):
        """
        Block until the asynchronous operation is marked safe.

        Returning does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Block until the asynchronous operation completes.

        Returning does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Block until the asynchronous operation is marked safe and the
        safe callback has returned.
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Block until the asynchronous operation completes and the
        complete callback has returned.

        :returns: whether the operation is completed
        """
        with nogil:
            rc = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return rc

    def get_return_value(self):
        """
        Fetch the return value of the asynchronous operation.

        The value is set once the operation is complete or safe,
        whichever happens first.

        :returns: int - return value of the operation
        """
        with nogil:
            rc = rados_aio_get_return_value(self.rados_comp)
        return rc

    def __dealloc__(self):
        """
        Release the completion.

        Runs when the completion is no longer needed. The underlying
        completion may not be freed immediately if the operation is
        not acked and committed.
        """
        # Drop the reference held on the buffer (if any) first.
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback, then stop tracking this completion.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Run the user callback, then stop tracking this completion.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Untrack this completion from both lists without invoking
        # any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1817
1818
class OpCtx(object):
    """Context-manager mixin: create() on entry, release() on exit.

    Both create() and release() are expected to be supplied by the
    class this is mixed into.
    """

    def __enter__(self):
        created = self.create()
        return created

    def __exit__(self, type, msg, traceback):
        # Returns None, so an exception raised in the body propagates.
        self.release()
1825
1826
cdef class WriteOp(object):
    """Wrapper around a librados write operation handle.

    create() allocates the handle, the other methods queue actions on
    it, and release() frees it.
    """
    cdef rados_write_op_t write_op

    def create(self):
        """Allocate the underlying write op and return self."""
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """Free the underlying write op."""
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: creation flag passed to librados; when left
                          as None, 0 (LIBRADOS_CREATE_IDEMPOTENT in
                          librados.h) is used
        :type exclusive: int
        """

        # None cannot be coerced to a C int, so the documented default
        # has to be mapped explicitly instead of raising TypeError.
        cdef int _exclusive = 0
        if exclusive is not None:
            _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)


    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Queue an append of data to the object.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin zeroing at
        :type offset: int
        :param length: number of bytes to zero
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        # librados expects (write_op, offset, len); the arguments were
        # previously passed in the opposite order.
        with nogil:
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1951
1952
class WriteOpCtx(WriteOp, OpCtx):
    """write operation context manager

    Combines WriteOp with OpCtx and adds nothing of its own.
    """
1955
1956
cdef class ReadOp(object):
    """Wrapper around a librados read operation handle."""
    cdef rados_read_op_t read_op

    def create(self):
        """Allocate the underlying read op and return self."""
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """Free the underlying read op."""
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Apply flags to the operation most recently added to this read_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef int _flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, _flags)
1982
1983
class ReadOpCtx(ReadOp, OpCtx):
    """read operation context manager

    Combines ReadOp with OpCtx and adds nothing of its own.
    """
1986
1987
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level trampoline: acquires the GIL and forwards the "safe"
    notification to the _safe() method of the object passed as args.
    """
    (<object>args)._safe()
    return 0
1995
1996
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level trampoline: acquires the GIL and forwards the "complete"
    notification to the _complete() method of the object passed as args.
    """
    (<object>args)._complete()
    return 0
2004
2005
2006 cdef class Ioctx(object):
2007 """rados.Ioctx object"""
# NOTE(sileht): attributes declared in .pxd
2009
def __init__(self, name):
    """
    Set up the Python-side state of an io context.

    :param name: name stored on the context
    """
    self.name = name
    self.state = "open"

    # No locator key or namespace is selected initially.
    self.nspace = ""
    self.locator_key = ""

    # Completions still in flight, guarded by the lock.
    self.lock = threading.Lock()
    self.complete_completions = []
    self.safe_completions = []
2019
def __enter__(self):
    """Context-manager entry: the io context itself is the target."""
    return self
2022
def __exit__(self, type_, value, traceback):
    """Context-manager exit: close the io context.

    Returns False so an exception raised in the body propagates.
    """
    self.close()
    return False
2026
def __dealloc__(self):
    # Close the io context when the object is deallocated.
    self.close()
2029
def __track_completion(self, completion_obj):
    # Remember the completion in each list for which it has a
    # callback; each list is appended to under the lock.
    wants_complete = completion_obj.oncomplete
    if wants_complete:
        with self.lock:
            self.complete_completions.append(completion_obj)

    wants_safe = completion_obj.onsafe
    if wants_safe:
        with self.lock:
            self.safe_completions.append(completion_obj)
2037
def __get_completion(self, oncomplete, onsafe):
    """
    Build a Completion for use with an asynchronous operation.

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    completion_obj = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t safe_cb = NULL
        rados_callback_t complete_cb = NULL
        rados_completion_t completion
        PyObject* p_completion_obj= <PyObject*>completion_obj

    # Install a C trampoline only for the callbacks that were supplied.
    if oncomplete:
        complete_cb = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        safe_cb = <rados_callback_t>&__aio_safe_cb

    with nogil:
        rc = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
                                         &completion)
    if rc < 0:
        raise make_ex(rc, "error getting a completion")

    completion_obj.rados_comp = completion
    return completion_obj
2074
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously fetch an object's size and modification time

    When the operation finishes, oncomplete is invoked with the
    completion followed by the results:

    oncomplete(completion, size, mtime)

    mtime is converted with ``time.localtime``. If the operation failed
    (negative return value) size and mtime are both None.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *c_oid = object_name
        uint64_t obj_size
        time_t obj_mtime

    def oncomplete_(completion_v):
        # Adapts the one-argument completion callback to the
        # three-argument signature promised to the caller.
        cdef Completion _completion_v = completion_v
        if _completion_v.get_return_value() < 0:
            return oncomplete(_completion_v, None, None)
        return oncomplete(_completion_v, obj_size, time.localtime(obj_mtime))

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, c_oid, completion.rados_comp,
                             &obj_size, &obj_mtime)

    if ret < 0:
        # The operation was never queued, so drop the tracking again.
        completion._cleanup()
        raise make_ex(ret, "error stating %s" % object_name)
    return completion
2120
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Queue an asynchronous write to an object

    The call returns once the write has been queued; use the returned
    completion (or the callbacks) to learn the outcome.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe:  what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_write
        size_t nbytes = len(to_write)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, c_oid, completion.rados_comp,
                              c_data, nbytes, c_offset)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2165
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously replace the whole content of an object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    The call returns once the write has been queued.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe:  what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_write
        size_t nbytes = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, c_oid,
                                   completion.rados_comp,
                                   c_data, nbytes)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2210
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to the end of an object

    The call returns once the append has been queued.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe:  what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_data = to_append
        size_t nbytes = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, c_oid,
                               completion.rados_comp,
                               c_data, nbytes)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error appending object %s" % object_name)
    return completion
2253
def aio_flush(self):
    """
    Wait until every pending write on this io context is safe

    The GIL is released while waiting.

    :raises: :class:`Error`
    """
    with nogil:
        ret = rados_aio_flush(self.io)
    if ret >= 0:
        return
    raise make_ex(ret, "error flushing")
2264
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asychronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    data_read is a bytes object trimmed to the number of bytes that were
    actually read (empty when nothing was read), or None if the read
    failed.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        if return_value < 0:
            return oncomplete(_completion_v, None)
        # Trim the preallocated buffer to what was really read. This
        # includes a 0-byte read: without the resize the caller would
        # receive `length` bytes of uninitialised memory.
        if return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf)

    completion = self.__get_completion(oncomplete_, None)
    # The completion owns the output buffer; librados writes straight
    # into the bytes object's storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2317
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    data is None when the operation failed (negative return value).

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe:  what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Pointer into the completion's output buffer (was misspelled
        # `ref_buf`, leaving the variable actually used undeclared).
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim the preallocated buffer to the size of the method output.
        # NOTE(review): a return value of 0 leaves the buffer at `length`
        # bytes, unlike the synchronous execute() - confirm intended.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        # NOTE(review): no resize happens here, so the buffer is only
        # trimmed if oncomplete_ has already run - confirm intended.
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only wrap the callbacks the caller actually supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2388
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Queue the asynchronous removal of an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: what to do when the remove is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe:  what to do when the remove is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, c_oid,
                               completion.rados_comp)
    if ret < 0:
        # Queuing failed: undo the tracking before reporting the error.
        completion._cleanup()
        raise make_ex(ret, "error removing %s" % object_name)
    return completion
2421
def require_ioctx_open(self):
    """
    Guard used by operations that need a usable io context

    Does nothing when ``self.state`` is 'open'.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2430
def change_auid(self, auid):
    """
    Try to set a new auid "owner" for this io context's pool.

    Requires that you have write permission on both the current and new
    auid.

    :param auid: the new auid
    :type auid: int

    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    cdef:
        uint64_t new_auid = auid

    with nogil:
        ret = rados_ioctx_pool_set_auid(self.io, new_auid)
    if ret >= 0:
        return
    raise make_ex(ret, "error changing auid of '%s' to %d"
                  % (self.name, auid))
2450
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Choose the key used to map objects to pgs within this io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    The key is also remembered on the Python side and can be read back
    with get_locator_key().

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # Keep the encoded key in a named object so the char* stays valid
    # while the GIL is released.
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    self.locator_key = loc_key
2473
def get_locator_key(self):
    """
    Return the locator key currently recorded for this context

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2481
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Select the snapshot that subsequent reads are served from.

    To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap_id)
2498
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is accepted (hence opt() in the type check above) and is
    # treated as the default namespace.
    if nspace is None:
        nspace = ""
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    # Remember the selection so get_namespace() can report it.
    self.nspace = nspace
2522
def get_namespace(self):
    """
    Return the namespace currently recorded for this context

    :returns: namespace
    """
    current_nspace = self.nspace
    return current_nspace
2530
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling close() on a context that is not open does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2545
2546
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Synchronously write data into an object at a given offset

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t nbytes = len(data)
        uint64_t c_offset = offset

    with nogil:
        ret = rados_write(self.io, c_key, c_data, nbytes, c_offset)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive value is not part of the expected contract.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2582
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Synchronously replace the whole content of an object.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t nbytes = len(data)

    with nogil:
        ret = rados_write_full(self.io, c_key, c_data, nbytes)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive value is not part of the expected contract.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2617
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Synchronously append data to the end of an object

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t nbytes = len(data)

    with nogil:
        ret = rados_append(self.io, c_key, c_data, nbytes)
    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # A positive value is not part of the expected contract.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2649
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    The result buffer is allocated up front with ``length`` bytes,
    filled in place by librados and then shrunk to the number of bytes
    actually read, so no extra copy of the data is made.

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *ret_buf
        uint64_t _offset = offset
        size_t _length = length
        PyObject* ret_s = NULL

    # Uninitialised bytes object that librados writes into directly.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_read(self.io, _key, ret_buf, _length, _offset)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: trim the buffer to what was really returned.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2693
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    The output buffer is allocated up front with ``length`` bytes,
    filled in place by librados and then shrunk to the size of the
    method's output.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Pointer into the output bytes object (was misspelled
        # `ref_buf`, leaving the variable actually used undeclared).
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Uninitialised bytes object that librados writes into directly.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the failing class/method rather than the message
            # that had been copied over from read().
            raise make_ex(ret, "Ioctx.execute(%s): failed to exec %s:%s on %s" %
                          (self.name, cls, method, key))

        # Trim the buffer to the size of the method output.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2749
def get_stats(self):
    """
    Query the usage statistics of this io context's pool

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stats
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &pool_stats)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    # Copy the C struct field by field into a plain dict.
    usage = {}
    usage['num_bytes'] = pool_stats.num_bytes
    usage['num_kb'] = pool_stats.num_kb
    usage['num_objects'] = pool_stats.num_objects
    usage['num_object_clones'] = pool_stats.num_object_clones
    usage['num_object_copies'] = pool_stats.num_object_copies
    usage['num_objects_missing_on_primary'] = pool_stats.num_objects_missing_on_primary
    usage['num_objects_unfound'] = pool_stats.num_objects_unfound
    usage['num_objects_degraded'] = pool_stats.num_objects_degraded
    usage['num_rd'] = pool_stats.num_rd
    usage['num_rd_kb'] = pool_stats.num_rd_kb
    usage['num_wr'] = pool_stats.num_wr
    usage['num_wr_kb'] = pool_stats.num_wr_kb
    return usage
2799
@requires(('key', str_type))
def remove_object(self, key):
    """
    Synchronously delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key

    with nogil:
        ret = rados_remove(self.io, c_key)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to remove '%s'" % key)
2824
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Change the size of an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success, otherwise raises error
    """

    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t new_size = size

    with nogil:
        ret = rados_trunc(self.io, c_key, new_size)
    if ret >= 0:
        return ret
    raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2854
@requires(('key', str_type))
def stat(self, key):
    """
    Synchronously fetch an object's size and modification time

    The modification time is converted with ``time.localtime``.

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        ret = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if ret < 0:
        raise make_ex(ret, "Failed to stat %r" % key)
    timestamp = time.localtime(obj_mtime)
    return obj_size, timestamp
2880
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    doubled each time librados reports ERANGE, up to a fixed cap.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with twice the space.
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        if ret < 0:
            # The size cap was reached while librados still reported
            # ERANGE; slicing with a negative length would be wrong.
            raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2919
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Return an iterator over the extended attributes of an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2934
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Store an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the  extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t c_value_len = len(xattr_value)

    with nogil:
        ret = rados_setxattr(self.io, c_key, c_name,
                             c_value, c_value_len)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2967
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        ret = rados_rmxattr(self.io, c_key, c_name)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to delete key %r xattr %r" %
                  (key, xattr_name))
2996
def list_objects(self):
    """
    Create an ObjectIterator for this rados.Ioctx object.

    :raises: IoctxStateError if the io context is not open
    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
3005
def list_snaps(self):
    """
    Create a SnapIterator for this rados.Ioctx object.

    :raises: IoctxStateError if the io context is not open
    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3014
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Take a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        ret = rados_ioctx_snap_create(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to create snap %s" % snap_name)
3034
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Delete a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        ret = rados_ioctx_snap_remove(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3054
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Resolve a pool snapshot name to a Snap object carrying its id

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    # Encode into a separate object: the original name is still needed
    # for the error message and the returned Snap.
    encoded_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = encoded_name
        rados_snap_t found_id

    with nogil:
        ret = rados_ioctx_snap_lookup(self.io, c_snap_name, &found_id)
    if ret != 0:
        raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(found_id))
3078
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Roll an object back to the state it had in a snapshot

    :param oid: the name of the object
    :type oid: str
    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid = cstr(oid, 'oid')
    snap_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = snap_name
        char *c_oid = oid

    with nogil:
        ret = rados_ioctx_snap_rollback(self.io, c_oid, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid)
3103
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context

    :raises: IoctxStateError if the io context is not open
    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        ret = rados_get_last_version(self.io)
    last_version = int(ret)
    return last_version
3117
def create_write_op(self):
    """
    Create a write operation object.

    Call release_write_op on it after use.

    :returns: the result of ``WriteOp().create()``
    """
    new_op = WriteOp()
    return new_op.create()
3124
def create_read_op(self):
    """
    Build a new read operation object.

    The caller must hand the result to release_read_op() once done.

    :returns: whatever ReadOp.create() returns for a fresh ReadOp
    """
    new_op = ReadOp()
    return new_op.create()
3131
def release_write_op(self, write_op):
    """
    Release a write operation obtained from create_write_op

    :param write_op: the write operation object; its release() method
                     is called
    """
    release = write_op.release
    release()
3137
def release_read_op(self, read_op):
    """
    Release a read operation obtained from create_read_op

    :param read_op: the read operation object; its release() method
                    is called
    """
    release = read_op.release
    release()
3145
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Add a "set omap key/value pairs" step to a write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values, one per key
    :type values: tuple

    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    # Encode the values the same way as the keys: text values are then
    # accepted, and the lengths computed below are the lengths of the
    # byte strings actually handed to librados.
    values = cstr_list(values, 'values')
    value_lens = [len(v) for v in values]
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = to_bytes_array(keys)
        char **_values = to_bytes_array(values)
        size_t *_lens = to_csize_t_array(value_lens)

    try:
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        # The temporary C arrays are only needed for the call above.
        free(_keys)
        free(_values)
        free(_lens)
3179
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a write operation against an object

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error` if librados returns a non-zero result
    """

    # The encoded name lives in its own variable so that the error
    # message reports the name as given by the caller.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        time_t _mtime = mtime
        int _flags = flags

    with nogil:
        ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
    if ret != 0:
        raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3205
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a write operation against an object asynchronously

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: optional callable used to build the completion
                       (presumably invoked when the operation is
                       complete in memory on all replicas)
    :type oncomplete: completion
    :param onsafe: optional callable used to build the completion
                   (presumably invoked when the operation is safe on
                   storage on all replicas)
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """

    # The encoded name lives in its own variable so that the error
    # message reports the name as given by the caller.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
                                         &_mtime, _flags)
    if ret != 0:
        # The operation was not submitted; undo the tracking done above.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
    return completion
3247
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a read operation against an object

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error` if librados returns a non-zero result
    """
    # The encoded name lives in its own variable so that the error
    # message reports the name as given by the caller.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
    if ret != 0:
        raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3269
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a read operation against an object asynchronously

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: optional callable used to build the completion
                       (presumably invoked when the operation is
                       complete — confirm against Completion)
    :type oncomplete: completion
    :param onsafe: optional callable used to build the completion
                   (presumably invoked when the operation is safe —
                   confirm against Completion)
    :type onsafe: completion
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    # The encoded name lives in its own variable so that the error
    # message reports the name as given by the caller.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
    if ret != 0:
        # The operation was not submitted; undo the tracking done above.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
    return completion
3303
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Add an omap key/value listing step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: tuple of (iterator over the requested omap values,
              return value from this action)
    """

    # Empty strings are not encoded; None is handed to opt_str() instead
    # (presumably yielding a NULL pointer — confirm against opt_str).
    after_raw = None
    if start_after:
        after_raw = cstr(start_after, 'start_after')
    prefix_raw = None
    if filter_prefix:
        prefix_raw = cstr(filter_prefix, 'filter_prefix')
    cdef:
        ReadOp _read_op = read_op
        char *_start_after = opt_str(after_raw)
        char *_filter_prefix = opt_str(prefix_raw)
        int _max_return = max_return
        rados_omap_iter_t iter_addr = NULL
        int prval = 0

    with nogil:
        rados_read_op_omap_get_vals(_read_op.read_op, _start_after, _filter_prefix,
                                    _max_return, &iter_addr, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, int(prval)
3335
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Add an omap key listing step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return keys
    :type max_return: int
    :returns: tuple of (iterator over the requested omap keys,
              return value from this action)
    """
    # An empty string is not encoded; None is handed to opt_str() instead
    # (presumably yielding a NULL pointer — confirm against opt_str).
    after_raw = None
    if start_after:
        after_raw = cstr(start_after, 'start_after')
    cdef:
        ReadOp _read_op = read_op
        char *_start_after = opt_str(after_raw)
        int _max_return = max_return
        rados_omap_iter_t iter_addr = NULL
        int prval = 0

    with nogil:
        rados_read_op_omap_get_keys(_read_op.read_op, _start_after,
                                    _max_return, &iter_addr, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, int(prval)
3362
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Add a "fetch omap values for these keys" step to a read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: tuple of (iterator over the requested omap values,
              return value from this action)
    """
    encoded_keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        rados_omap_iter_t iter_addr
        char **_keys = to_bytes_array(encoded_keys)
        size_t key_num = len(encoded_keys)
        int prval = 0

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, &prval)
        omap_iter = OmapIterator(self)
        omap_iter.ctx = iter_addr
        return omap_iter, int(prval)
    finally:
        # The temporary C array of key pointers is freed on every exit path.
        free(_keys)
3391
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Add a "remove the given omap keys" step to a write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: input key tuple
    :type keys: tuple
    """

    encoded_keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp _write_op = write_op
        char **_keys = to_bytes_array(encoded_keys)
        size_t key_num = len(encoded_keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
    finally:
        # The temporary C array of key pointers is only needed for the call.
        free(_keys)
3413
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Add a "remove all omap key/value pairs" step to a write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    cdef WriteOp _op = write_op

    with nogil:
        rados_write_op_omap_clear(_op.write_op)
3427
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes no
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies get their own names so that the error message can
    # show the lock and object names as the caller passed them.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is not zero-initialised; without this the
        # microsecond field would be passed with an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3478
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes no
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies get their own names so that the error message can
    # show the lock and object names as the caller passed them.
    key_raw = cstr(key, 'key')
    tag_raw = cstr(tag, 'tag')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _tag = tag_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is not zero-initialised; without this the
        # microsecond field would be passed with an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that actually failed (this is the shared variant).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3532
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies get their own names so that the error message can
    # show the lock and object names as the caller passed them.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Describe the operation that failed: releasing, not taking, a lock.
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3564
3565
def set_object_locator(func):
    """
    Decorator that runs a method with the object's locator key applied.

    When ``self.locator_key`` is not None, the io context's current
    locator key is saved, replaced by ``self.locator_key`` for the
    duration of the call, and put back afterwards, including when the
    wrapped method raises. When it is None the method is called as is.

    :param func: method taking ``self`` (with ``locator_key`` and
                 ``ioctx`` attributes) as its first argument
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on failure so the shared io context is not
            # left pointing at this object's locator key.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3577
3578
def set_object_namespace(func):
    """
    Decorator that runs a method with the object's namespace applied.

    The io context's current namespace is saved, replaced by
    ``self.nspace`` for the duration of the call, and put back
    afterwards, including when the wrapped method raises.

    :param func: method taking ``self`` (with ``nspace`` and ``ioctx``
                 attributes) as its first argument
    :raises: :class:`LogicError` from the wrapper if ``self.nspace`` is None
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on failure so the shared io context is not
            # left in this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3589
3590
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context used for every operation on the object
        :param key: name of the object
        :param locator_key: locator key applied around each operation,
                            or None to leave the context's key alone
        :param nspace: namespace applied around each operation; None is
                       stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read() and write(); moved by seek().
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal is not a
        # reliable way to detect the empty namespace.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise ObjectStateError unless the object state is "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to length bytes at the current offset and advance it
        by the amount of data returned."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """Write at the current offset; the offset advances only when
        the io context reports 0."""
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the io context's stat() result for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by subsequent read() and write() calls."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the value of the named extended attribute."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the io context's get_xattrs() result for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set the named extended attribute to xattr_value."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove the named extended attribute."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
3667
# Level names that may be given as 'level' when watching the cluster log.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
3675
3676
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    Watches cluster log messages. Create an instance and keep it alive
    for as long as the callback should keep being called. 'level'
    selects which messages are monitored (one of MONITOR_LEVELS), and
    'arg' is handed back to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Remember every argument, then register through the cluster.
        self.cluster = cluster
        self.level = level
        self.callback = callback
        self.arg = arg
        cluster.monitor_log(level, callback, arg)
3703