]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
import sys
import threading
import time

try:
    # ``Callable`` lives in ``collections.abc``; the alias that used to be
    # re-exported from ``collections`` was removed in Python 3.10.
    from collections.abc import Callable
except ImportError:
    from collections import Callable
from datetime import datetime
from functools import partial, wraps
from itertools import chain
30
# Base string type accepted by the argument validators: Python 2 has
# ``basestring``, Python 3 only has ``str``.
str_type = basestring if sys.version_info[0] < 3 else str
36
37
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154 int rados_application_enable(rados_ioctx_t io, const char *app_name,
155 int force)
156 int rados_application_list(rados_ioctx_t io, char *values,
157 size_t *values_len)
158 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159 const char *key, char *value,
160 size_t *value_len)
161 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162 const char *key, const char *value)
163 int rados_application_metadata_remove(rados_ioctx_t io,
164 const char *app_name, const char *key)
165 int rados_application_metadata_list(rados_ioctx_t io,
166 const char *app_name, char *keys,
167 size_t *key_len, char *values,
168 size_t *value_len)
169 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183 const char *inbuf, size_t inbuflen,
184 char **outbuf, size_t *outbuflen,
185 char **outs, size_t *outslen)
186 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187 const char *inbuf, size_t inbuflen,
188 char **outbuf, size_t *outbuflen,
189 char **outs, size_t *outslen)
190 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
191 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
192
193 int rados_wait_for_latest_osdmap(rados_t cluster)
194
195 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196 void rados_ioctx_destroy(rados_ioctx_t io)
197 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
200
201 uint64_t rados_get_last_version(rados_ioctx_t io)
202 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207 int rados_remove(rados_ioctx_t io, const char *oid)
208 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214 void rados_getxattrs_end(rados_xattrs_iter_t iter)
215
216 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218 void rados_nobjects_list_close(rados_list_ctx_t ctx)
219
220 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
228
229 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
230 rados_snap_t *snapid)
231 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
232 rados_snap_t snapid)
233 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
234 rados_snap_t snap_seq,
235 rados_snap_t *snap,
236 int num_snaps)
237 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
238 rados_snap_t snapid)
239
240 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
241 const char * cookie, const char * desc,
242 timeval * duration, uint8_t flags)
243 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
244 const char * cookie, const char * tag, const char * desc,
245 timeval * duration, uint8_t flags)
246 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
247
248 rados_write_op_t rados_create_write_op()
249 void rados_release_write_op(rados_write_op_t write_op)
250
251 rados_read_op_t rados_create_read_op()
252 void rados_release_read_op(rados_read_op_t read_op)
253
254 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
255 void rados_aio_release(rados_completion_t c)
256 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
257 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
258 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
259 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
260 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
261 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
262 int rados_aio_flush(rados_ioctx_t io)
263
264 int rados_aio_get_return_value(rados_completion_t c)
265 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
266 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
267 int rados_aio_wait_for_complete(rados_completion_t c)
268 int rados_aio_wait_for_safe(rados_completion_t c)
269 int rados_aio_is_complete(rados_completion_t c)
270 int rados_aio_is_safe(rados_completion_t c)
271
272 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
273 const char * in_buf, size_t in_len, char * buf, size_t out_len)
274 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
275 const char * in_buf, size_t in_len, char * buf, size_t out_len)
276
277 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
278 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
279 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
280 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
281 void rados_write_op_omap_clear(rados_write_op_t write_op)
282 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
283
284 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
285 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
286 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
287 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
288 void rados_write_op_remove(rados_write_op_t write_op)
289 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
290 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
291
292 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
293 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
294 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
295 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
296 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
297 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
298 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
299 void rados_omap_get_end(rados_omap_iter_t iter)
300
301
# Python-visible copies of the C constants declared in the ``cdef extern``
# blocks.  The underscore-prefixed names are the C-level declarations; the
# plain names below are what Python callers import from this module.

# Snapshot id constant.
LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD

# The C constant is a char*; expose it to Python as text.
LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')

# LIBRADOS_OP_FLAG_* values.
LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE

# LIBRADOS_OPERATION_* values.
LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY

# LIBRADOS_CREATE_* values.
LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT

# Well-known auid values defined by this module.
ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0
327
328
class Error(Exception):
    """ `Error` class, derived from `Exception` """
    pass


class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error` """
    def __init__(self, *args, **kwargs):
        # The errno table maps EINVAL to this class and the factory builds
        # every mapped exception with an ``errno=`` keyword.  A plain
        # ``Exception`` subclass rejects keyword arguments, so accept the
        # keyword here and keep it as an attribute.
        self.errno = kwargs.pop('errno', None)
        super(InvalidArgumentError, self).__init__(*args, **kwargs)


class OSError(Error):
    """ `OSError` class, derived from `Error` """
    def __init__(self, message, errno=None):
        super(OSError, self).__init__(message)
        self.errno = errno

    def __str__(self):
        msg = super(OSError, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # ``BaseException.message`` does not exist on Python 3; the message
        # handed to ``__init__`` is always available as ``args[0]``.
        return (self.__class__, (self.args[0], self.errno))


class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError` """
    pass


class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError` """
    pass


class PermissionDeniedError(OSError):
    """ `PermissionDeniedError` class, derived from `OSError`; deals with EACCES """
    pass


class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError` """
    pass


class NoData(OSError):
    """ `NoData` class, derived from `OSError` """
    pass


class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError` """
    pass


class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError` """
    pass


class IOError(OSError):
    """ `IOError` class, derived from `OSError` """
    pass


class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError` """
    pass


class RadosStateError(Error):
    """ `RadosStateError` class, derived from `Error` """
    pass


class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` """
    pass


class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` """
    pass


class LogicError(Error):
    """ `LogicError` class, derived from `Error` """
    pass


class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError` """
    pass
421
422
# Lookup table from (positive) errno value to the exception class raised for
# it.  Entries shared by every platform are listed once; only the errno that
# maps to NoData depends on the platform.
cdef errno_to_exception = {
    errno.EPERM     : PermissionError,
    errno.ENOENT    : ObjectNotFound,
    errno.EIO       : IOError,
    errno.ENOSPC    : NoSpace,
    errno.EEXIST    : ObjectExists,
    errno.EBUSY     : ObjectBusy,
    errno.EINTR     : InterruptedOrTimeoutError,
    errno.ETIMEDOUT : TimedOut,
    errno.EACCES    : PermissionDeniedError,
    errno.EINVAL    : InvalidArgumentError,
}
IF UNAME_SYSNAME == "FreeBSD":
    errno_to_exception[errno.ENOATTR] = NoData
ELSE:
    errno_to_exception[errno.ENODATA] = NoData
451
452
cdef make_ex(ret, msg):
    """
    Build the exception object that corresponds to a librados return code.

    The sign of ``ret`` is ignored.  Codes without an entry in
    ``errno_to_exception`` fall back to :class:`OSError`.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: an instance of a subclass of :class:`Error` (not raised here)
    """
    code = abs(ret)
    exc_type = errno_to_exception.get(code, OSError)
    return exc_type(msg, errno=code)
468
469
def opt(cls):
    """
    Build the type spec for an optional argument.

    :param cls: the type accepted for the argument
    :returns: tuple ``(cls, None)``, meaning "an instance of ``cls`` or
              ``None``"
    """
    return cls, None
474
475
def requires(*types):
    """
    Decorator factory that validates argument types of an instance method.

    Each spec is a ``(name, type_spec)`` pair, where ``type_spec`` is a type,
    ``None`` (meaning the value ``None``), or a tuple of those.  Positional
    arguments (after ``self``) are matched to the specs by position, keyword
    arguments by name; kwargs is an un-ordered dict, so the specs themselves
    are positional.

    :raises: :class:`TypeError` from the wrapped call when a value does not
             match its spec
    """
    def matches(value, expected):
        # a ``None`` entry stands for "the value None", not for a type
        return value is None if expected is None else isinstance(value, expected)

    def check_type(val, arg_name, arg_type):
        if not isinstance(arg_type, tuple):
            if matches(val, arg_type):
                return
            assert(arg_type is not None)
            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))

        for candidate in arg_type:
            if matches(val, candidate):
                return
        type_names = ' or '.join('None' if t is None else t.__name__
                                 for t in arg_type)
        raise TypeError('%s must be %s' % (arg_name, type_names))

    def wrapper(f):
        # FIXME(sileht): this stop with
        # AttributeError: 'method_descriptor' object has no attribute '__module__'
        # @wraps(f)
        def validate_func(*args, **kwargs):
            # positional values first, skipping the `self` arg
            for value, (arg_name, arg_type) in zip(args[1:], types):
                check_type(value, arg_name, arg_type)
            # then every spec that was supplied by keyword
            for arg_name, arg_type in types:
                if arg_name in kwargs:
                    check_type(kwargs[arg_name], arg_name, arg_type)
            return f(*args, **kwargs)
        return validate_func
    return wrapper
512
513
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Turn a Python string into a byte string.

    Byte strings are returned unchanged; unicode strings are encoded.

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed (and returned as None)
    :rtype: bytes
    :raises: :class:`TypeError` if ``val`` is not a string
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if isinstance(val, unicode):
        return val.encode(encoding)
    raise TypeError('%s must be a string' % name)
533
534
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every string of an iterable to a byte string with ``cstr``.

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use (now forwarded to ``cstr``; it used
                         to be accepted but silently ignored)
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element is not a string
    """
    return [cstr(s, name, encoding) for s in list_str]
537
538
def decode_cstr(val, encoding="utf-8"):
    """
    Decode a byte string into a Python string; ``None`` passes through.

    :param bytes val: byte string
    :param str encoding: Encoding to use
    :rtype: unicode or None
    """
    return val.decode(encoding) if val is not None else None
550
551
cdef char* opt_str(s) except? NULL:
    """
    Convert an optional byte string to a C string pointer.

    ``None`` becomes NULL; anything else goes through the normal
    object-to-``char*`` conversion.  NULL is therefore a valid result, which
    is why the exception clause is ``except?`` rather than ``except``.
    """
    if s is not None:
        return s
    return NULL
556
557
cdef void* realloc_chk(void* ptr, size_t size) except NULL:
    """
    ``realloc`` wrapper that reports failure as a Python exception.

    :raises: :class:`MemoryError` when ``realloc`` returns NULL
    """
    cdef void *resized = realloc(ptr, size)
    if resized != NULL:
        return resized
    raise MemoryError("realloc failed")
563
564
cdef size_t * to_csize_t_array(list_int) except? NULL:
    """
    Copy a Python sequence of integers into a freshly malloc'ed ``size_t``
    array.  The caller owns the result and must ``free`` it.

    The ``except? NULL`` clause lets exceptions raised here reach the caller;
    without an exception clause a pointer-returning ``cdef`` function cannot
    propagate them.

    :raises: :class:`MemoryError` if the allocation fails
    """
    cdef:
        size_t count = len(list_int)
        size_t *ret = <size_t *>malloc(count * sizeof(size_t))
        size_t i
    # malloc(0) may legitimately return NULL, so only a non-empty request
    # that comes back NULL is an allocation failure.
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for i in range(count):
            ret[i] = <size_t>list_int[i]
    except:
        # an element could not be converted: release the buffer, then
        # re-raise the original error unchanged
        free(ret)
        raise
    return ret
572
573
cdef char ** to_bytes_array(list_bytes) except? NULL:
    """
    Build a malloc'ed ``char *`` array pointing at the buffers of the given
    byte strings.  The caller owns the array and must ``free`` it.

    Only the pointers are copied, so ``list_bytes`` has to stay alive for as
    long as the array is in use.

    The ``except? NULL`` clause lets exceptions raised here reach the caller;
    without an exception clause a pointer-returning ``cdef`` function cannot
    propagate them.

    :raises: :class:`MemoryError` if the allocation fails
    """
    cdef:
        size_t count = len(list_bytes)
        char **ret = <char **>malloc(count * sizeof(char *))
        size_t i
    # malloc(0) may legitimately return NULL, so only a non-empty request
    # that comes back NULL is an allocation failure.
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for i in range(count):
            ret[i] = <char *>list_bytes[i]
    except:
        # an element was not usable as a C string: release the array, then
        # re-raise the original error unchanged
        free(ret)
        raise
    return ret
581
582
583
cdef int __monitor_callback(void *arg, const char *line, const char *who,
                            uint64_t sec, uint64_t nsec, uint64_t seq,
                            const char *level, const char *msg) with gil:
    # C-level trampoline for log callbacks: ``arg`` is cast back to a Python
    # object that is indexed as (callable, user argument), and the callable
    # is invoked with the user argument followed by the log fields.
    # NOTE(review): this returns int although rados_log_callback_t is
    # declared as returning void -- confirm that is intended.
    cdef object handler_info = <object>arg
    handler = handler_info[0]
    user_arg = handler_info[1]
    handler(user_arg, line, who, sec, nsec, seq, level, msg)
    return 0
590
cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
                             const char *who,
                             const char *name,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    # C-level trampoline for the extended log callback: ``arg`` is cast back
    # to a Python object that is indexed as (callable, user argument).
    # The Python callable receives ``name`` before ``who``, i.e. the reverse
    # of the order in this C signature.
    cdef object handler_info = <object>arg
    handler = handler_info[0]
    user_arg = handler_info[1]
    handler(user_arg, line, channel, name, who, sec, nsec, seq, level, msg)
    return 0
599
600
class Version(object):
    """ Version information: ``major``, ``minor`` and ``extra`` components """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        # rendered as "major.minor.extra"
        components = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % components
610
611
612 cdef class Rados(object):
613 """This class wraps librados functions"""
614 # NOTE(sileht): attributes declared in .pyd
615
def __init__(self, *args, **kwargs):
    """
    Create a cluster handle; all arguments are forwarded to ``__setup``.
    """
    PyEval_InitThreads()
    self.__setup(*args, **kwargs)
619
@requires(('rados_id', opt(str_type)), ('name', opt(str_type)),
          ('clustername', opt(str_type)),
          # Specs are matched to positional arguments by position, so every
          # parameter up to `conffile` needs an entry.  `object` accepts any
          # value and only keeps `conffile` aligned with its own spec.
          ('conf_defaults', object),
          ('conffile', opt(str_type)))
def __setup(self, rados_id=None, name=None, clustername=None,
            conf_defaults=None, conffile=None, conf=None, flags=0,
            context=None):
    """
    Create the cluster handle and apply the initial configuration.

    :param rados_id: client id; the entity name becomes ``client.<rados_id>``
    :param name: full entity name; defaults to ``client.admin`` when neither
                 ``rados_id`` nor ``name`` is given
    :param clustername: cluster name; ``None`` is passed on as ``''``
    :param conf_defaults: mapping of options applied first
    :param conffile: config file read second; ``''`` reads the default file,
                     ``None`` reads none
    :param conf: mapping of options applied last
    :param flags: flags passed to ``rados_create2``
    :param context: capsule wrapping a ``rados_config_t``; when given, the
                    handle is created from it instead of from the names
    :raises: :class:`Error` if both ``rados_id`` and ``name`` are given or
             if creating the handle fails
    """
    self.monitor_callback = None
    self.monitor_callback2 = None
    self.parsed_args = []
    self.conf_defaults = conf_defaults
    self.conffile = conffile
    self.rados_id = rados_id

    if rados_id and name:
        raise Error("Rados(): can't supply both rados_id and name")
    elif rados_id:
        name = 'client.' + rados_id
    elif name is None:
        name = 'client.admin'
    if clustername is None:
        clustername = ''

    name = cstr(name, 'name')
    clustername = cstr(clustername, 'clustername')
    cdef:
        char *_name = name
        char *_clustername = clustername
        int _flags = flags
        int ret

    if context:
        # Unpack void* (aka rados_config_t) from capsule
        rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
        with nogil:
            ret = rados_create_with_context(&self.cluster, rados_config)
    else:
        with nogil:
            ret = rados_create2(&self.cluster, _clustername, _name, _flags)
    if ret != 0:
        raise Error("rados_initialize failed with error code: %d" % ret)

    self.state = "configuring"
    # order is important: conf_defaults, then conffile, then conf
    if conf_defaults:
        for key, value in conf_defaults.items():
            self.conf_set(key, value)
    if conffile is not None:
        # read the default conf file when '' is given
        if conffile == '':
            conffile = None
        self.conf_read_file(conffile)
    if conf:
        for key, value in conf.items():
            self.conf_set(key, value)
673
def require_state(self, *args):
    """
    Ensure the Rados object is currently in one of the given states.

    :param args: the acceptable state names
    :raises: :class:`RadosStateError` if ``self.state`` is not among them
    """
    if self.state not in args:
        raise RadosStateError("You cannot perform that operation on a \
Rados object in state %s." % self.state)
684
def shutdown(self):
    """
    Disconnects from the cluster.  Call this explicitly when a
    Rados.connect()ed object is no longer used.  Calling it again on an
    object that is already shut down does nothing.
    """
    if self.state == "shutdown":
        return
    with nogil:
        rados_shutdown(self.cluster)
    self.state = "shutdown"
694
def __enter__(self):
    """
    Context-manager entry: connect to the cluster and return this object.
    """
    self.connect()
    return self
698
def __exit__(self, type_, value, traceback):
    """
    Context-manager exit: shut the cluster handle down.
    """
    self.shutdown()
    # returning False means an exception from the ``with`` body is not
    # suppressed
    return False
702
def version(self):
    """
    Get the version number of the ``librados`` C library.

    :returns: :class:`Version` - object holding the ``major``, ``minor``
              and ``extra`` components of the librados version
    """
    cdef:
        int major = 0
        int minor = 0
        int extra = 0
    with nogil:
        rados_version(&major, &minor, &extra)
    return Version(major, minor, extra)
716
@requires(('path', opt(str_type)))
def conf_read_file(self, path=None):
    """
    Configure the cluster handle using a Ceph config file.

    :param path: path to the config file; ``None`` is passed to librados
                 as a NULL path
    :type path: str
    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("configuring", "connected")
    encoded_path = cstr(path, 'path', opt=True)
    cdef:
        char *_path = opt_str(encoded_path)
        int ret
    with nogil:
        ret = rados_conf_read_file(self.cluster, _path)
    if ret != 0:
        raise make_ex(ret, "error calling conf_read_file")
733
def conf_parse_argv(self, args):
    """
    Parse known arguments from args, and remove; returned
    args contain only those unknown to ceph

    :param args: sequence of argument strings
    :returns: list of the arguments librados did not consume, or ``None``
              when ``args`` is empty
    :raises: :class:`MemoryError` if a temporary C array cannot be
             allocated, :class:`Error` if librados rejects the arguments
    """
    self.require_state("configuring", "connected")
    if not args:
        return

    cargs = cstr_list(args, 'args')
    cdef:
        int _argc = len(args)
        char **_argv = to_bytes_array(cargs)
        char **_remargv = NULL

    try:
        # `args` is non-empty here, so a NULL array means the allocation
        # failed; never hand a NULL array to librados.
        if _argv == NULL:
            raise MemoryError("malloc failed")
        _remargv = <char **>malloc(_argc * sizeof(char *))
        if _remargv == NULL:
            raise MemoryError("malloc failed")
        with nogil:
            ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
                                                  <const char**>_argv,
                                                  <const char**>_remargv)
        if ret:
            raise make_ex(ret, "error calling conf_parse_argv_remainder")

        # _remargv was allocated with fixed argc; collapse return
        # list to eliminate any missing args
        retargs = [decode_cstr(a) for a in _remargv[:_argc]
                   if a != NULL]
        self.parsed_args = args
        return retargs
    finally:
        # free(NULL) is a no-op, so this is safe on every path
        free(_argv)
        free(_remargv)
767
def conf_parse_env(self, var='CEPH_ARGS'):
    """
    Parse known arguments from an environment variable, normally
    CEPH_ARGS.  A false ``var`` (``None`` or ``''``) makes this a no-op.

    :param var: name of the environment variable
    :type var: str
    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("configuring", "connected")
    if not var:
        return

    encoded_var = cstr(var, 'var')
    cdef:
        char *_var = encoded_var
        int ret
    with nogil:
        ret = rados_conf_parse_env(self.cluster, _var)
    if ret != 0:
        raise make_ex(ret, "error calling conf_parse_env")
784
@requires(('option', str_type))
def conf_get(self, option):
    """
    Get the value of a configuration option

    :param option: which option to read
    :type option: str

    :returns: str - value of the option or None
    :raises: :class:`TypeError`
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    cdef:
        char *_option = option
        size_t length = 20
        char *ret_buf = NULL
        int ret

    try:
        while True:
            ret_buf = <char *>realloc_chk(ret_buf, length)
            with nogil:
                ret = rados_conf_get(self.cluster, _option, ret_buf, length)
            if ret == -errno.ENAMETOOLONG:
                # value did not fit: retry with a buffer twice as large
                length *= 2
                continue
            if ret == 0:
                return decode_cstr(ret_buf)
            if ret == -errno.ENOENT:
                return None
            raise make_ex(ret, "error calling conf_get")
    finally:
        free(ret_buf)
818
@requires(('option', str_type), ('val', str_type))
def conf_set(self, option, val):
    """
    Set the value of a configuration option

    :param option: which option to set
    :type option: str
    :param val: value of the option
    :type val: str

    :raises: :class:`TypeError`, :class:`ObjectNotFound`
    """
    self.require_state("configuring", "connected")
    encoded_option = cstr(option, 'option')
    encoded_val = cstr(val, 'val')
    cdef:
        char *_option = encoded_option
        char *_val = encoded_val
        int ret

    with nogil:
        ret = rados_conf_set(self.cluster, _option, _val)
    if ret != 0:
        raise make_ex(ret, "error calling conf_set")
842
def ping_monitor(self, mon_id):
    """
    Ping a monitor to assess liveness

    May be used as a simple way to assess liveness, or to obtain
    information about the monitor in a simple way even in the
    absence of quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor, or None when the reply
              is empty
    """

    self.require_state("configuring", "connected")

    mon_id = cstr(mon_id, 'mon_id')
    cdef:
        char *_mon_id = mon_id
        size_t outstrlen = 0
        char *outstr
        int ret

    with nogil:
        ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)

    if ret != 0:
        raise make_ex(ret, "error calling ping_monitor")

    if not outstrlen:
        return None

    # copy the reply into a Python object before releasing the C buffer
    reply = outstr[:outstrlen]
    rados_buffer_free(outstr)
    return decode_cstr(reply)
874
def connect(self, timeout=0):
    """
    Open the connection to the cluster.

    Only valid in the "configuring" state; on success the handle switches
    to the "connected" state. Use shutdown() to release resources.

    :param timeout: accepted for compatibility with the old API, ignored
    :raises: :class:`Error`
    """
    self.require_state("configuring")
    # NOTE(sileht): timeout was supported by old python API,
    # but this is not something available in C API, so ignore
    # for now and remove it later
    with nogil:
        rc = rados_connect(self.cluster)
    if rc == 0:
        self.state = "connected"
        return
    raise make_ex(rc, "error connecting to the cluster")
888
def get_cluster_stats(self):
    """
    Fetch usage figures for the whole cluster.

    The numbers (total, used and available space plus the object count)
    are not refreshed immediately after a write; they are eventually
    consistent.

    :raises: :class:`Error`
    :returns: dict - contains the following keys:

        - ``kb`` (int) - total space

        - ``kb_used`` (int) - space used

        - ``kb_avail`` (int) - free space available

        - ``num_objects`` (int) - number of objects

    """
    cdef rados_cluster_stat_t cluster_stat

    with nogil:
        rc = rados_cluster_stat(self.cluster, &cluster_stat)

    if rc < 0:
        raise make_ex(
            rc, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)

    # Copy the C struct fields into a plain dictionary.
    usage = {}
    usage['kb'] = cluster_stat.kb
    usage['kb_used'] = cluster_stat.kb_used
    usage['kb_avail'] = cluster_stat.kb_avail
    usage['num_objects'] = cluster_stat.num_objects
    return usage
921
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Tell whether a pool with the given name exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True if the pool exists, False otherwise.
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool)

    # A missing pool is an ordinary answer; every other failure is an error.
    if rc == -errno.ENOENT:
        return False
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return True
947
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Translate a pool name into its numeric ID.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if it doesn't exist
    """
    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, c_pool)

    # A missing pool is reported as None rather than as an exception.
    if rc == -errno.ENOENT:
        return None
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return int(rc)
972
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    The name buffer starts at 512 bytes and is doubled each time librados
    reports -ERANGE (buffer too small), as long as it has not grown past
    4096 bytes; after that the -ERANGE error is raised.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # Buffer too small: retry with a larger one. (The previous
                # test was "!=", which raised on -ERANGE and needlessly
                # retried every other error.)
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            else:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        # Runs on every exit path, so the C buffer is never leaked.
        free(name)
1008
@requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
def create_pool(self, pool_name, auid=None, crush_rule=None):
    """
    Create a pool. Which librados call is used depends on the optional
    arguments:
    - neither auid nor crush_rule: default settings
    - auid only: pool owned by that auid
    - crush_rule only: pool placed with that CRUSH rule
    - both: pool with that owner and that CRUSH rule

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param auid: the id of the owner of the new pool
    :type auid: int
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_pool = pool_name
        uint8_t c_rule
        uint64_t c_auid

    if auid is not None and crush_rule is not None:
        c_auid = auid
        c_rule = crush_rule
        with nogil:
            rc = rados_pool_create_with_all(self.cluster, c_pool, c_auid, c_rule)
    elif auid is not None:
        c_auid = auid
        with nogil:
            rc = rados_pool_create_with_auid(self.cluster, c_pool, c_auid)
    elif crush_rule is not None:
        c_rule = crush_rule
        with nogil:
            rc = rados_pool_create_with_crush_rule(self.cluster, c_pool, c_rule)
    else:
        with nogil:
            rc = rados_pool_create(self.cluster, c_pool)

    if rc < 0:
        raise make_ex(rc, "error creating pool '%s'" % pool_name)
1053
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Look up the base pool of a pool.

    :param pool_id: ID of the pool to query
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    self.require_state("connected")
    cdef:
        int64_t c_pool_id = pool_id
        int64_t c_base = 0

    with nogil:
        rc = rados_pool_get_base_tier(self.cluster, c_pool_id, &c_base)
    if rc >= 0:
        return int(c_base)
    raise make_ex(rc, "get_pool_base_tier(%d)" % pool_id)
1071
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Remove a pool together with everything stored in it.

    The pool disappears from the cluster immediately, while the data
    itself is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef char *c_pool = pool_name

    with nogil:
        rc = rados_pool_delete(self.cluster, c_pool)
    if rc >= 0:
        return
    raise make_ex(rc, "error deleting pool '%s'" % pool_name)
1095
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List inconsistent placement groups in the given pool

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int
    :raises: :class:`TypeError`, :class:`Error`
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t pool = pool_id
        size_t size = 512
        char *pgs = NULL

    try:
        while True:
            pgs = <char *>realloc_chk(pgs, size);
            with nogil:
                ret = rados_inconsistent_pg_list(self.cluster, pool,
                                                 pgs, size)
            # A result larger than the buffer means it was too small:
            # double it and ask again.
            if ret > <int>size:
                size *= 2
            elif ret >= 0:
                break
            else:
                raise make_ex(ret, "error calling inconsistent_pg_list")
        # The buffer holds NUL-separated entries; empty pieces are dropped.
        return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
    finally:
        free(pgs)
1126
def list_pools(self):
    """
    Gets a list of pool names.

    :raises: :class:`Error`
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            # A result larger than the buffer means it was too small:
            # double it and ask again.
            if ret > <int>size:
                size *= 2
            elif ret >= 0:
                break
            else:
                # A negative value is an error code. Without this branch
                # the loop would repeat the failing call forever.
                raise make_ex(ret, "error calling rados_pool_list")
        # The buffer holds NUL-separated names; empty pieces are dropped.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1151
def get_fsid(self):
    """
    Get the fsid of the cluster as a hexadecimal string.

    The value is written by librados directly into a bytes object, which
    is then trimmed to the reported length and returned.

    :raises: :class:`Error`
    :returns: str - cluster fsid
    """
    self.require_state("connected")
    cdef:
        char *ret_buf
        # NOTE(review): 37 presumably covers a 36 character UUID plus the
        # terminating NUL - confirm against librados.
        size_t buf_len = 37
        PyObject* ret_s = NULL

    # Passing NULL allocates a bytes object whose content is filled in later.
    ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
        if ret < 0:
            raise make_ex(ret, "error getting cluster fsid")
        # Shrink the object to the number of bytes librados reported.
        if ret != <int>buf_len:
            _PyBytes_Resize(&ret_s, ret)
        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
1181
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Open an io context on a pool.

    All operations performed through the returned object act on that
    particular pool.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    cdef:
        rados_ioctx_t c_ioctx
        char *c_pool = ioctx_name
    with nogil:
        rc = rados_ioctx_create(self.cluster, c_pool, &c_ioctx)
    if rc < 0:
        raise make_ex(rc, "error opening pool '%s'" % ioctx_name)

    # Wrap the raw librados handle in the Python level object.
    wrapper = Ioctx(ioctx_name)
    wrapper.io = c_ioctx
    return wrapper
1208
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    Send a command to the monitors, or to one specific monitor when
    ``target`` is given.

    mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)

    The librados return code is handed back to the caller instead of
    being turned into an exception.

    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :param target: optional monitor to address; an int is converted to
                   its string form
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # NOTE(review): the sibling *_command methods pass 'cmd' as the label
    # here; 'c' looks like an oversight - confirm before changing.
    cmd = cstr_list(cmd, 'c')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy both result buffers into Python objects before releasing
        # the librados owned memory.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The pointer array built by to_bytes_array() is released here.
        free(_cmd)
1265
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    Send a command to one OSD.

    osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    The librados return code is handed back to the caller instead of
    being turned into an exception.

    :param osdid: numeric ID of the OSD to address
    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        int _osdid = osdid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            ret = rados_osd_command(self.cluster, _osdid,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        # Copy both result buffers into Python objects before releasing
        # the librados owned memory.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The pointer array built by to_bytes_array() is released here.
        free(_cmd)
1308
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Send a command to the manager via rados_mgr_command().

    The librados return code is handed back to the caller instead of
    being turned into an exception.

    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            ret = rados_mgr_command(self.cluster,
                                    <const char **>_cmd, _cmdlen,
                                    <const char*>_inbuf, _inbuf_len,
                                    &_outbuf, &_outbuf_len,
                                    &_outs, &_outs_len)

        # Copy both result buffers into Python objects before releasing
        # the librados owned memory.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The pointer array built by to_bytes_array() is released here.
        free(_cmd)
1349
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    Send a command addressed to one placement group.

    pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    The librados return code is handed back to the caller instead of
    being turned into an exception.

    :param pgid: identifier of the placement group, as a string
    :param cmd: command to send, converted with cstr_list()
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_pgid = pgid
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        with nogil:
            ret = rados_pg_command(self.cluster, _pgid,
                                   <const char **>_cmd, _cmdlen,
                                   <const char *>_inbuf, _inbuf_len,
                                   &_outbuf, &_outbuf_len,
                                   &_outs, &_outs_len)

        # Copy both result buffers into Python objects before releasing
        # the librados owned memory.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # The pointer array built by to_bytes_array() is released here.
        free(_cmd)
1393
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap() on the connected cluster.

    :returns: the integer result of the librados call, passed through
              unchanged (it is not converted into an exception)
    """
    self.require_state("connected")
    with nogil:
        ret = rados_wait_for_latest_osdmap(self.cluster)
    return ret
1399
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Add a client to the OSD blacklist.

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        uint32_t c_expire = expire_seconds
        char *c_address = client_address

    with nogil:
        rc = rados_blacklist_add(self.cluster, c_address, c_expire)
    if rc >= 0:
        return
    raise make_ex(rc, "error blacklisting client '%s'" % client_address)
1421
def monitor_log(self, level, callback, arg):
    """
    Register, or with ``callback=None`` unregister, a monitor log callback
    through rados_monitor_log().

    :param level: log level, must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to unregister
    :param arg: extra value stored next to the callback
    :raises: :class:`LogicError` for an unknown level or a non-callable
             callback; an exception built by make_ex() when registration
             fails
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Unregister: the result of the call is not checked on this path.
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    # The (callback, arg) pair travels to librados as a raw pointer.
    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>_level,
                              <rados_log_callback_t>&__monitor_callback, _arg)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = cb
    self.monitor_callback2 = None
1449
def monitor_log2(self, level, callback, arg):
    """
    Register, or with ``callback=None`` unregister, a monitor log callback
    through rados_monitor_log2().

    :param level: log level, must be one of MONITOR_LEVELS
    :param callback: callable to register, or None to unregister
    :param arg: extra value stored next to the callback
    :raises: :class:`LogicError` for an unknown level or a non-callable
             callback; an exception built by make_ex() when registration
             fails
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # Unregister: the result of the call is not checked on this path.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    # The (callback, arg) pair travels to librados as a raw pointer.
    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
1477
1478
cdef class OmapIterator(object):
    """Omap iterator"""

    # Io context the iterator belongs to.
    cdef public Ioctx ioctx
    # librados iterator handle. It is not assigned in this class;
    # presumably the code creating the iterator sets it - verify.
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next key-value pair in the object

        :raises: StopIteration when librados reports no further key
        :returns: tuple - (key, value); value is None when librados
                  returned no value pointer
        """
        cdef:
            char *key_ = NULL
            char *val_ = NULL
            size_t len_

        with nogil:
            ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)

        if ret != 0:
            raise make_ex(ret, "error iterating over the omap")
        # A NULL key marks the end of the iteration.
        if key_ == NULL:
            raise StopIteration()
        key = decode_cstr(key_)
        val = None
        if val_ != NULL:
            # Slice by length: the value is copied as len_ raw bytes.
            val = val_[:len_]
        return (key, val)

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1517
1518
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    # librados listing handle, opened in __cinit__ and closed in __dealloc__.
    cdef rados_list_ctx_t ctx

    # Io context whose objects are listed.
    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if ret < 0:
            raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *key_ = NULL
            const char *locator_ = NULL
            const char *nspace_ = NULL

        with nogil:
            ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)

        # Every negative result ends the iteration; error codes are not
        # told apart from the end of the listing here.
        if ret < 0:
            raise StopIteration()

        key = decode_cstr(key_)
        # Locator and namespace may be absent (NULL), which maps to None.
        locator = decode_cstr(locator_) if locator_ != NULL else None
        nspace = decode_cstr(nspace_) if nspace_ != NULL else None
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1564
1565
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    # librados iterator handle, opened in __cinit__ and ended in __dealloc__.
    cdef rados_xattrs_iter_t it
    # C view of ``oid`` used for the librados call.
    cdef char* _oid

    cdef public Ioctx ioctx
    # Object name after conversion by cstr().
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        self.oid = cstr(oid, 'oid')
        # NOTE(review): _oid presumably points into the buffer of self.oid,
        # which is why the converted name is kept as an attribute - confirm.
        self._oid = self.oid

        with nogil:
            ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if ret != 0:
            raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *name_ = NULL
            const char *val_ = NULL
            size_t len_ = 0

        with nogil:
            ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
        if ret != 0:
            raise make_ex(ret, "error iterating over the extended attributes \
in '%s'" % self.oid)
        # A NULL name marks the end of the iteration.
        if name_ == NULL:
            raise StopIteration()
        name = decode_cstr(name_)
        # Slice by length: the value is copied as len_ raw bytes.
        val = val_[:len_]
        return (name, val)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1614
1615
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # Snapshot ids fetched in __cinit__; released in __dealloc__.
    cdef rados_snap_t *snaps
    # Number of valid entries in ``snaps``.
    cdef int max_snap
    # Index of the entry the next call to __next__ will return.
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, "error calling rados_snap_list for \
ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id array was never released before, leaking one buffer per
        # iterator. Cython zero-initialises C attributes, so this is also
        # safe when __cinit__ failed before or during allocation.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Grow the name buffer until librados stops reporting -ERANGE.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1681
1682
cdef class Snap(object):
    """A single pool snapshot: its io context, name and numeric id."""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.snap_id = snap_id
        self.name = name
        self.ioctx = ioctx

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Report when this snapshot was taken.

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t created

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &created)
        if rc == 0:
            return datetime.fromtimestamp(created)
        raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
1715
1716
cdef class Completion(object):
    """completion object"""

    cdef public:
        # Io context that tracks this completion in its completion lists.
        Ioctx ioctx
        # User callbacks; either may be a false value when unused.
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        # librados completion handle, released in __dealloc__.
        rados_completion_t rados_comp
        # Optional Python buffer owned by this completion; it is
        # DECREFed in __dealloc__.
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            ret = rados_aio_is_safe(self.rados_comp)
        return ret == 1

    def is_complete(self):
        """
        Has an asynchronous operation completed?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            ret = rados_aio_is_complete(self.rados_comp)
        return ret == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return ret

    def get_return_value(self):
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            ret = rados_aio_get_return_value(self.rados_comp)
        return ret

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # XDECREF tolerates a NULL pointer, so an unused buf is fine.
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback first, then drop this completion from the
        # io context's list of pending "complete" completions.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Run the user callback first, then drop this completion from the
        # io context's list of pending "safe" completions.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Remove this completion from both tracking lists without
        # invoking any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1842
1843
class OpCtx(object):
    """
    Context manager mixin for operation objects.

    The class it is mixed into must provide create() and release().
    """

    def __enter__(self):
        """Create the operation and hand back whatever create() returns."""
        created = self.create()
        return created

    def __exit__(self, type, msg, traceback):
        """Release the operation; exceptions are not suppressed."""
        self.release()
1850
1851
cdef class WriteOp(object):
    """Wrapper around a librados write operation handle."""
    # Handle obtained in create() and given back in release().
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying librados write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """
        Release the underlying librados write operation.
        """
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: flag forwarded to rados_write_op_create(); None
                          (the default) is forwarded as 0
        :type exclusive: int
        """

        cdef:
            int _exclusive = 0

        # None cannot be converted to a C int, so the documented default
        # used to fail with a TypeError; map it to 0 instead.
        # NOTE(review): 0 is presumably LIBRADOS_CREATE_IDEMPOTENT - confirm
        # against librados.h.
        if exclusive is not None:
            _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)


    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of zero bytes to write
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        # librados expects (write_op, offset, len); the two values used to
        # be passed the other way round.
        with nogil:
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1976
1977
class WriteOpCtx(WriteOp, OpCtx):
    """write operation context manager

    Combines WriteOp with OpCtx, so entering a ``with`` block calls
    create() and leaving it calls release().
    """
1980
1981
cdef class ReadOp(object):
    """Wrapper around a librados read operation handle."""
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying librados read operation.

        :returns: self
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the underlying librados read operation.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Apply flags to the operation most recently added to this read_op.

        :param flags: flags to apply to the last operation
        :type flags: int
        """
        cdef int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
2007
2008
class ReadOpCtx(ReadOp, OpCtx):
    """read operation context manager

    Combines ReadOp with OpCtx, so entering a ``with`` block calls
    create() and leaving it calls release().
    """
2011
2012
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to onsafe() for asynchronous operations

    ``args`` is the pointer registered together with the callback; it is
    cast back to a Python object whose _safe() method is invoked. The
    GIL is acquired for the duration of the call (``with gil``).
    """
    cdef object cb = <object>args
    cb._safe()
    return 0
2020
2021
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    ``args`` is the pointer registered together with the callback; it is
    cast back to a Python object whose _complete() method is invoked. The
    GIL is acquired for the duration of the call (``with gil``).
    """
    cdef object cb = <object>args
    cb._complete()
    return 0
2029
2030
2031 cdef class Ioctx(object):
2032 """rados.Ioctx object"""
2033 # NOTE(sileht): attributes declared in .pyd
2034
def __init__(self, name):
    """
    Set up the Python side state of an io context.

    :param name: name stored on the io context
    """
    self.name = name
    self.state = "open"

    self.locator_key = ""
    self.nspace = ""
    # The lock guards the two lists of pending completions below.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
2044
def __enter__(self):
    """Enter a ``with`` block; the io context itself is the target."""
    return self
2047
def __exit__(self, type_, value, traceback):
    """Close the io context on leaving a ``with`` block."""
    self.close()
    # Returning False lets an exception raised in the block propagate.
    return False
2051
def __dealloc__(self):
    # Close the io context when the object is deallocated.
    self.close()
2054
def __track_completion(self, completion_obj):
    """
    Remember a completion for as long as it has callbacks pending.

    The completion is appended to ``complete_completions`` when it has
    an ``oncomplete`` callback and to ``safe_completions`` when it has an
    ``onsafe`` callback. Each append is done under ``self.lock``.

    :param completion_obj: the completion to keep track of
    """
    for callback_attr, tracked in (('oncomplete', self.complete_completions),
                                   ('onsafe', self.safe_completions)):
        if getattr(completion_obj, callback_attr):
            with self.lock:
                tracked.append(completion_obj)
2062
def __get_completion(self, oncomplete, onsafe):
    """
    Build a completion to use with asynchronous operations.

    A Python :class:`Completion` is created and a librados completion is
    allocated for it. The C callbacks are only registered for the
    notifications that actually have a Python callback.

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    wrapper = Completion(self, oncomplete, onsafe)

    cdef:
        rados_completion_t c_completion
        # Raw pointer handed to librados as the callback argument.
        PyObject* cb_arg = <PyObject*>wrapper
        rados_callback_t on_complete_c = NULL
        rados_callback_t on_safe_c = NULL

    if onsafe:
        on_safe_c = <rados_callback_t>&__aio_safe_cb
    if oncomplete:
        on_complete_c = <rados_callback_t>&__aio_complete_cb

    with nogil:
        rc = rados_aio_create_completion(cb_arg, on_complete_c, on_safe_c,
                                         &c_completion)
    if rc < 0:
        raise make_ex(rc, "error getting a completion")

    wrapper.rados_comp = c_completion
    return wrapper
2099
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    oncomplete will be called with the returned size and mtime
    as well as the completion:

    oncomplete(completion, size, mtime)

    When the operation failed, size and mtime are both passed as None.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *c_oid = object_name
        # Filled in by librados; read by the callback below.
        uint64_t obj_size
        time_t obj_mtime

    def _stat_done(completion_v):
        cdef Completion finished = completion_v
        if finished.get_return_value() < 0:
            return oncomplete(finished, None, None)
        return oncomplete(finished, obj_size, time.localtime(obj_mtime))

    completion = self.__get_completion(_stat_done, None)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_stat(self.io, c_oid, completion.rados_comp,
                            &obj_size, &obj_mtime)

    if rc < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error stating %s" % object_name)
    return completion
2145
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* payload = to_write
        size_t payload_len = len(to_write)
        uint64_t write_at = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_write(self.io, c_oid, completion.rados_comp,
                             payload, payload_len, write_at)
    if rc < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error writing object %s" % object_name)
    return completion
2190
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* payload = to_write
        size_t payload_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_write_full(self.io, c_oid,
                                  completion.rados_comp,
                                  payload, payload_len)
    if rc < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error writing object %s" % object_name)
    return completion
2235
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* payload = to_append
        size_t payload_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_append(self.io, c_oid,
                              completion.rados_comp,
                              payload, payload_len)
    if rc < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error appending object %s" % object_name)
    return completion
2278
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    The GIL is released while waiting.

    :raises: :class:`Error`
    """
    with nogil:
        rc = rados_aio_flush(self.io)
    if rc < 0:
        raise make_ex(rc, "error flushing")
2289
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    data_read is a bytes object holding exactly the bytes that were read
    (empty when nothing was read), or None when the read failed.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to what was actually read. This
        # includes a read of zero bytes: without the resize the callback
        # would receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # Allocate the result bytes object up front and let librados write
    # straight into its storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2342
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    data is None when the execution failed.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to the size of the output. This
        # includes an empty output: without the resize the callback
        # would receive `length` bytes of uninitialised memory.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # NOTE(review): the buffer is only resized in oncomplete_; with an
        # onsafe callback alone it is passed on at its allocated size.
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only install the wrappers for callbacks the caller supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    # Allocate the result bytes object up front and let librados write
    # straight into its storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2413
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: what to do when the remove is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the remove is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        rc = rados_aio_remove(self.io, c_oid,
                              completion.rados_comp)
    if rc < 0:
        # The request was never queued, so stop tracking the completion.
        completion._cleanup()
        raise make_ex(rc, "error removing %s" % object_name)
    return completion
2446
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx object is in the 'open' state.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2455
def change_auid(self, auid):
    """
    Attempt to change an io context's associated auid "owner."

    Requires that you have write permission on both the current and new
    auid.

    :param auid: the new auid; it is converted to an unsigned 64-bit integer

    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    cdef uint64_t new_auid = auid

    with nogil:
        rc = rados_ioctx_pool_set_auid(self.io, new_auid)
    if rc < 0:
        message = "error changing auid of '%s' to %d" % (self.name, auid)
        raise make_ex(rc, message)
2475
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # Keep the encoded key in its own variable: the C pointer below
    # points into it, and the caller's original value is what gets
    # remembered on the context.
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    self.locator_key = loc_key
2498
def get_locator_key(self):
    """
    Return the locator key currently recorded on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2506
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Set the snapshot for reading objects.

    To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t read_snap = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, read_snap)
2523
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is accepted as an alias for the default namespace; the type
    # check above is optional so that this branch can actually be reached.
    if nspace is None:
        nspace = ""
    # Keep the encoded value in its own variable: the C pointer below
    # points into it.
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    self.nspace = nspace
2547
def get_namespace(self):
    """
    Return the namespace currently recorded on this io context.

    :returns: namespace
    """
    current_namespace = self.nspace
    return current_namespace
2555
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling close() on a context that is not open does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2570
2571
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Write data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)
        uint64_t write_at = offset

    with nogil:
        rc = rados_write(self.io, c_key, payload, payload_len, write_at)
    if rc < 0:
        raise make_ex(rc, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # librados is documented here to return zero on success, so a
        # positive value is reported as a logic error.
        raise LogicError(("Ioctx.write(%s): rados_write "
                          "returned %d, but should return zero on success.")
                         % (self.name, rc))
    return rc
2607
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Write an entire object synchronously.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)

    with nogil:
        rc = rados_write_full(self.io, c_key, payload, payload_len)
    if rc < 0:
        raise make_ex(rc, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if rc > 0:
        # A positive return value is unexpected and reported as a
        # logic error.
        raise LogicError(("Ioctx.write_full(%s): rados_write_full "
                          "returned %d, but should return zero on success.")
                         % (self.name, rc))
    return rc
2642
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Append data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t payload_len = len(data)

    with nogil:
        rc = rados_append(self.io, c_key, payload, payload_len)
    if rc < 0:
        raise make_ex(rc, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if rc > 0:
        # A positive return value is unexpected and reported as a
        # logic error.
        raise LogicError(("Ioctx.append(%s): rados_append "
                          "returned %d, but should return zero on success.")
                         % (self.name, rc))
    return rc
2674
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    The result buffer is allocated at ``length`` bytes, filled in place
    by librados and then shrunk to the number of bytes actually read.

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *ret_buf
        uint64_t _offset = offset
        size_t _length = length
        PyObject* ret_s = NULL

    # Uninitialised bytes object of the requested size; librados writes
    # directly into its storage.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_read(self.io, _key, ret_buf, _length, _offset)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read (including zero bytes): trim the buffer to what
        # was actually read.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the bytes
        # object itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2718
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    The output buffer is allocated at ``length`` bytes, filled in place
    by librados and then shrunk to the size of the returned output.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Uninitialised bytes object of the requested size; librados writes
    # directly into its storage.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # Report the failing class method rather than a "read".
            raise make_ex(ret, "Ioctx.execute(%s): failed to exec %s:%s on %s" %
                          (self.name, cls, method, key))

        # Trim the buffer to the size of the output that was produced.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the bytes
        # object itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2774
def get_stats(self):
    """
    Get pool usage statistics

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t stats
    with nogil:
        rc = rados_ioctx_pool_stat(self.io, &stats)
    if rc < 0:
        raise make_ex(rc, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    # Copy the C struct field by field into a plain dictionary.
    pool_stats = {
        'num_bytes': stats.num_bytes,
        'num_kb': stats.num_kb,
        'num_objects': stats.num_objects,
        'num_object_clones': stats.num_object_clones,
        'num_object_copies': stats.num_object_copies,
        "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
        "num_objects_unfound": stats.num_objects_unfound,
        "num_objects_degraded": stats.num_objects_degraded,
        "num_rd": stats.num_rd,
        "num_rd_kb": stats.num_rd_kb,
        "num_wr": stats.num_wr,
        "num_wr_kb": stats.num_wr_kb,
    }
    return pool_stats
2824
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef char *c_key = key

    with nogil:
        rc = rados_remove(self.io, c_key)
    if rc < 0:
        raise make_ex(rc, "Failed to remove '%s'" % key)
    return True
2849
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success, otherwise raises error
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t new_size = size

    with nogil:
        rc = rados_trunc(self.io, c_key, new_size)
    if rc < 0:
        message = "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key)
        raise make_ex(rc, message)
    return rc
2879
@requires(('key', str_type))
def stat(self, key):
    """
    Get object stats (size/mtime)

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp) - the timestamp is the mtime converted
        with time.localtime()
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        # Output parameters filled in by librados.
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        rc = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if rc < 0:
        raise make_ex(rc, "Failed to stat %r" % key)
    timestamp = time.localtime(obj_mtime)
    return obj_size, timestamp
2905
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    doubled each time librados reports that it is too small.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with twice the space.
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # The size cap was reached without a successful read. Raise
            # instead of falling through and slicing the buffer with a
            # negative length.
            raise make_ex(-errno.ERANGE,
                          "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2944
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Start iterating over xattrs on an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2959
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t c_value_len = len(xattr_value)

    with nogil:
        rc = rados_setxattr(self.io, c_key, c_name,
                            c_value, c_value_len)
    if rc < 0:
        raise make_ex(rc, "Failed to set xattr %r" % xattr_name)
    return True
2992
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        rc = rados_rmxattr(self.io, c_key, c_name)
    if rc < 0:
        message = "Failed to delete key %r xattr %r" % (key, xattr_name)
        raise make_ex(rc, message)
    return True
3021
def list_objects(self):
    """
    Get ObjectIterator on rados.Ioctx object.

    The io context must be open.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
3030
def list_snaps(self):
    """
    Get SnapIterator on rados.Ioctx object.

    The io context must be open.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3039
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Create a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_create(self.io, c_snap_name)
    # Any non-zero result is treated as a failure.
    if rc != 0:
        raise make_ex(rc, "Failed to create snap %s" % snap_name)
3059
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Removes a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_remove(self.io, c_snap_name)
    # Any non-zero result is treated as a failure.
    if rc != 0:
        raise make_ex(rc, "Failed to remove snap %s" % snap_name)
3079
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Get the id of a pool snapshot

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    # The encoded name is only used for the C call; the Snap object and
    # the error message use the name as given by the caller.
    encoded_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = encoded_name
        rados_snap_t found_id

    with nogil:
        rc = rados_ioctx_snap_lookup(self.io, c_snap_name, &found_id)
    if rc != 0:
        raise make_ex(rc, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(found_id))
3103
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Roll an object back to a pool snapshot

    :param oid: the name of the object
    :type oid: str
    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # Both names are rebound to their cstr() results so the char* values
    # below stay backed by live objects.
    oid = cstr(oid, 'oid')
    snap_name = cstr(snap_name, 'snap_name')
    cdef:
        char *c_oid = oid
        char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_rollback(self.io, c_oid, c_snap_name)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to rollback %s" % oid)
3128
def create_self_managed_snap(self):
    """
    Create a self-managed snapshot

    :returns: snap id on success

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef:
        rados_snap_t new_snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_create(self.io, &new_snap_id)
    if rc == 0:
        return int(new_snap_id)
    raise make_ex(rc, "Failed to create self-managed snapshot")
3145
@requires(('snap_id', int))
def remove_self_managed_snap(self, snap_id):
    """
    Remove a self-managed snapshot

    :param snap_id: the id of the snapshot
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef:
        rados_snap_t c_snap_id = snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_remove(self.io, c_snap_id)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to remove self-managed snapshot")
3164
def set_self_managed_snap_write(self, snaps):
    """
    Updates the write context to the specified self-managed
    snapshot ids.

    The ids are passed to librados sorted in descending order, and the
    largest id is used as the snapshot sequence number. An empty or
    missing list results in a sequence number of 0 and no snapshot ids.

    :param snaps: all associated self-managed snapshot ids
    :type snaps: list

    :raises: :class:`TypeError`
    :raises: :class:`MemoryError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    sorted_snaps = []
    snap_seq = 0
    if snaps:
        sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
        snap_seq = sorted_snaps[0]

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)
    try:
        # Only allocate when there is something to copy; with no snapshots
        # a NULL array together with a count of 0 is passed instead.
        if _num_snaps > 0:
            _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
            if _snaps == NULL:
                # Without this check the fill loop below would write
                # through a NULL pointer.
                raise MemoryError("malloc failed")
            for i, snap in enumerate(sorted_snaps):
                _snaps[i] = snap
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_snaps)
3200
@requires(('oid', str_type), ('snap_id', int))
def rollback_self_managed_snap(self, oid, snap_id):
    """
    Roll a specific object back to a self-managed snapshot revision

    :param oid: the name of the object
    :type oid: str
    :param snap_id: the id of the snapshot
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # Rebind to the cstr() result: the char* below points into that object.
    oid = cstr(oid, 'oid')
    cdef:
        rados_snap_t c_snap_id = snap_id
        char *c_oid = oid
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_rollback(self.io, c_oid, c_snap_id)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to rollback %s" % oid)
3223
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3237
def create_write_op(self):
    """
    Create a write operation object.

    release_write_op() needs to be called on it after use.

    :returns: the result of WriteOp.create()
    """
    new_op = WriteOp()
    return new_op.create()
3244
def create_read_op(self):
    """
    Create a read operation object.

    release_read_op() needs to be called on it after use.

    :returns: the result of ReadOp.create()
    """
    new_op = ReadOp()
    return new_op.create()
3251
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op

    :param write_op: write operation object to release
    """
    do_release = write_op.release
    do_release()
3257
def release_read_op(self, read_op):
    """
    Release the memory allocated by create_read_op

    :param read_op: read operation object to release
    """
    do_release = read_op.release
    do_release()
3265
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Add a "set omap key/value pairs" step to write_op

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values, one per key
    :type values: tuple

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    # Values get the same conversion as keys, and the lengths handed to
    # librados are measured on the converted values.
    values = cstr_list(values, 'values')
    lens = [len(v) for v in values]
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try block: if a later conversion raises, the
        # arrays that were already allocated are still freed below.
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array(lens)
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        # free(NULL) is a no-op, so partially initialised state is fine.
        free(_keys)
        free(_values)
        free(_lens)
3299
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    oid = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid
        time_t _mtime = mtime
        int _flags = flags

    with nogil:
        ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
    if ret != 0:
        raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3325
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation asynchronously

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the write operation is safe and
                       complete in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write operation is safe and
                   complete on storage on all replicas
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: completion object
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    oid = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
                                         &_mtime, _flags)
    if ret != 0:
        # The operation was never queued, so undo the tracking set up above.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
    return completion
3367
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    oid = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
    if ret != 0:
        raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3389
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation asynchronously

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: callback passed to the completion as "oncomplete"
    :type oncomplete: completion
    :param onsafe: callback passed to the completion as "onsafe"
    :type onsafe: completion
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: completion object
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    oid = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
    if ret != 0:
        # The operation was never queued, so undo the tracking set up above.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
    return completion
3423
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Add a "get omap values" step to read_op

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """

    # Empty strings are treated like "not given" and reach librados as
    # whatever opt_str() produces for None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None
    if filter_prefix:
        filter_prefix = cstr(filter_prefix, 'filter_prefix')
    else:
        filter_prefix = None
    cdef:
        ReadOp op = read_op
        char *c_start_after = opt_str(start_after)
        char *c_filter_prefix = opt_str(filter_prefix)
        int c_max_return = max_return
        rados_omap_iter_t iter_handle = NULL
        int prval = 0

    # NOTE(review): prval is a local and is read back immediately below;
    # confirm librados does not keep &prval beyond this call.
    with nogil:
        rados_read_op_omap_get_vals2(op.read_op, c_start_after, c_filter_prefix,
                                     c_max_return, &iter_handle, NULL, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_handle
    return omap_iter, int(prval)
3455
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Add a "get omap keys" step to read_op

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """
    # An empty string is treated like "not given" and reaches librados as
    # whatever opt_str() produces for None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None
    cdef:
        ReadOp op = read_op
        char *c_start_after = opt_str(start_after)
        int c_max_return = max_return
        rados_omap_iter_t iter_handle = NULL
        int prval = 0

    # NOTE(review): prval is a local and is read back immediately below;
    # confirm librados does not keep &prval beyond this call.
    with nogil:
        rados_read_op_omap_get_keys2(op.read_op, c_start_after,
                                     c_max_return, &iter_handle, NULL, &prval)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_handle
    return omap_iter, int(prval)
3482
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Add a "get omap values by keys" step to read_op

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: an iterator over the requested omap values, return value from this action
    """
    keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        # Start from NULL, like get_omap_vals/get_omap_keys do, so the
        # iterator never receives an indeterminate handle.
        rados_omap_iter_t iter_addr = NULL
        char **_keys = to_bytes_array(keys)
        size_t key_num = len(keys)
        int prval = 0

    try:
        # NOTE(review): prval is a local and is read back immediately
        # below; confirm librados does not keep &prval beyond this call.
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, &prval)
        it = OmapIterator(self)
        it.ctx = iter_addr
        return it, int(prval)
    finally:
        # Only the pointer array is owned here.
        free(_keys)
3511
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Add a "remove the specified omap keys" step to write_op

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: input key tuple
    :type keys: tuple
    """

    keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp op = write_op
        size_t n_keys = len(keys)
        char **c_keys = to_bytes_array(keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(op.write_op, <const char**>c_keys, n_keys)
    finally:
        # Only the pointer array is owned here.
        free(c_keys)
3533
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Add a "remove all omap key/value pairs" step to write_op

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    cdef:
        WriteOp op = write_op

    with nogil:
        rados_write_op_omap_clear(op.write_op)
3547
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes no
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct is a plain C local: without this the microseconds
        # field would be passed to librados uninitialised.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                   _duration_ptr, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3598
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes no
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct is a plain C local: without this the microseconds
        # field would be passed to librados uninitialised.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                _duration_ptr, _flags)
    if ret < 0:
        # The message names this call; it previously said rados_lock_exclusive.
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3652
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # The message describes the failing unlock; it previously claimed
        # that rados_lock_exclusive failed to set the lock.
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3684
def application_enable(self, app_name, force=False):
    """
    Enable an application on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param force: False if only a single app should exist per pool
    :type force: bool

    :raises: :class:`Error`
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        int _force = (1 if force else 0)

    with nogil:
        ret = rados_application_enable(self.io, _app_name, _force)
    if ret < 0:
        raise make_ex(ret, "error enabling application")
3705
def application_list(self):
    """
    Returns a list of enabled applications

    :raises: :class:`Error`
    :returns: list of app name strings, or None when librados reports
              ENOENT
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    cdef:
        size_t length = 128
        char *apps = NULL

    try:
        while True:
            apps = <char *>realloc_chk(apps, length)
            with nogil:
                ret = rados_application_list(self.io, apps, &length)
            if ret == 0:
                # Names are NUL-separated inside the buffer; empty pieces
                # are dropped.
                return [decode_cstr(app) for app in
                        apps[:length].split(b'\0') if app]
            elif ret == -errno.ENOENT:
                return None
            elif ret == -errno.ERANGE:
                # Buffer too small: go around again and reallocate with
                # the current value of length.
                pass
            else:
                raise make_ex(ret, "error listing applications")
    finally:
        free(apps)
3732
def application_metadata_set(self, app_name, key, value):
    """
    Sets application metadata on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :param value: metadata value
    :type value: str

    :raises: :class:`Error`
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    app_name = cstr(app_name, 'app_name')
    key = cstr(key, 'key')
    value = cstr(value, 'value')
    cdef:
        char *_app_name = app_name
        char *_key = key
        char *_value = value

    with nogil:
        ret = rados_application_metadata_set(self.io, _app_name, _key,
                                             _value)
    if ret < 0:
        raise make_ex(ret, "error setting application metadata")
3759
def application_metadata_remove(self, app_name, key):
    """
    Remove application metadata from an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str

    :raises: :class:`Error`
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    app_name = cstr(app_name, 'app_name')
    key = cstr(key, 'key')
    cdef:
        char *_app_name = app_name
        char *_key = key

    with nogil:
        ret = rados_application_metadata_remove(self.io, _app_name, _key)
    if ret < 0:
        raise make_ex(ret, "error removing application metadata")
3781
def application_metadata_list(self, app_name):
    """
    Returns the metadata key/value pairs of an application on an OSD pool

    :param app_name: application name
    :type app_name: str

    :raises: :class:`Error`
    :returns: list of key/value tuples
    """
    # self.io is handed to librados below; apply the same open-ioctx guard
    # the other self.io users in this class call first.
    self.require_ioctx_open()

    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, _app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == 0:
                # Split without filtering: dropping empty pieces would
                # shift every value after an empty one onto the wrong key.
                keys = [decode_cstr(key) for key in
                        c_keys[:key_length].split(b'\0')]
                vals = [decode_cstr(val) for val in
                        c_vals[:val_length].split(b'\0')]
                # The split leaves one extra trailing piece after the last
                # NUL terminator; drop that pair. Build a real list so the
                # documented return type also holds on Python 3.
                return list(zip(keys, vals))[:-1]
            elif ret == -errno.ERANGE:
                # Buffers too small: go around again and reallocate with
                # the current values of key_length / val_length.
                pass
            else:
                raise make_ex(ret, "error listing application metadata")
    finally:
        free(c_keys)
        free(c_vals)
3819
3820
def set_object_locator(func):
    """
    Decorator for methods of objects that carry ``ioctx`` and
    ``locator_key`` attributes.

    When ``self.locator_key`` is not None, the ioctx locator key is switched
    to it for the duration of the wrapped call and the previous key is put
    back afterwards, including when the wrapped call raises.

    :param func: the method to wrap
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # shared ioctx pointing at this object's locator key.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3832
3833
def set_object_namespace(func):
    """
    Decorator for methods of objects that carry ``ioctx`` and ``nspace``
    attributes.

    The ioctx namespace is switched to ``self.nspace`` for the duration of
    the wrapped call and the previous namespace is put back afterwards,
    including when the wrapped call raises.

    :param func: the method to wrap
    :raises: :class:`LogicError` from the wrapper if ``self.nspace`` is None
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # shared ioctx in this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3844
3845
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context that the operations are forwarded to
        :param key: name of the object
        :param locator_key: locator key applied around each operation,
                            None to leave the ioctx locator untouched
        :param nspace: namespace applied around each operation, None is
                       stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read()/write(), moved by seek().
        self.offset = 0
        # Becomes "removed" after remove(); checked before every operation.
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: an identity test against a string literal only
        # matches when the interpreter happens to share the object.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise ObjectStateError unless the state is still "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to length bytes at the current offset and advance it."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """Write at the current offset; the offset advances when the
        underlying write returns 0."""
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the result of the ioctx stat() call for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by subsequent read()/write() calls."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the value of the extended attribute xattr_name."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the result of the ioctx get_xattrs() call for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set the extended attribute xattr_name to xattr_value."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove the extended attribute xattr_name."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
3922
# Level names accepted for cluster log monitoring (see MonitorLog).
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
3930
3931
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    For watching cluster log messages.  Instantiate an object and keep
    it around while callback is periodically called.  Construct with
    'level' to monitor 'level' messages (one of MONITOR_LEVELS).
    arg will be passed to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Remember the arguments, then hand the registration to the cluster.
        self.cluster = cluster
        self.level, self.callback, self.arg = level, callback, arg
        cluster.monitor_log(level, callback, arg)
3958