]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
update sources to 12.2.8
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 from collections import Callable
27 from datetime import datetime
28 from functools import partial, wraps
29 from itertools import chain
30
31 # Are we running Python 2.x
32 if sys.version_info[0] < 3:
33 str_type = basestring
34 else:
35 str_type = str
36
37
38 cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49 cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54 cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60 cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64 cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
154 int rados_application_enable(rados_ioctx_t io, const char *app_name,
155 int force)
156 int rados_application_list(rados_ioctx_t io, char *values,
157 size_t *values_len)
158 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159 const char *key, char *value,
160 size_t *value_len)
161 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162 const char *key, const char *value)
163 int rados_application_metadata_remove(rados_ioctx_t io,
164 const char *app_name, const char *key)
165 int rados_application_metadata_list(rados_ioctx_t io,
166 const char *app_name, char *keys,
167 size_t *key_len, char *values,
168 size_t *value_len)
169 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183 const char *inbuf, size_t inbuflen,
184 char **outbuf, size_t *outbuflen,
185 char **outs, size_t *outslen)
186 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187 const char *inbuf, size_t inbuflen,
188 char **outbuf, size_t *outbuflen,
189 char **outs, size_t *outslen)
190 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
191 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
192
193 int rados_wait_for_latest_osdmap(rados_t cluster)
194
195 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196 void rados_ioctx_destroy(rados_ioctx_t io)
197 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
200
201 uint64_t rados_get_last_version(rados_ioctx_t io)
202 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207 int rados_remove(rados_ioctx_t io, const char *oid)
208 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214 void rados_getxattrs_end(rados_xattrs_iter_t iter)
215
216 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218 void rados_nobjects_list_close(rados_list_ctx_t ctx)
219
220 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
228
229 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
230 rados_snap_t *snapid)
231 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
232 rados_snap_t snapid)
233 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
234 rados_snap_t snap_seq,
235 rados_snap_t *snap,
236 int num_snaps)
237 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
238 rados_snap_t snapid)
239
240 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
241 const char * cookie, const char * desc,
242 timeval * duration, uint8_t flags)
243 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
244 const char * cookie, const char * tag, const char * desc,
245 timeval * duration, uint8_t flags)
246 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
247
248 rados_write_op_t rados_create_write_op()
249 void rados_release_write_op(rados_write_op_t write_op)
250
251 rados_read_op_t rados_create_read_op()
252 void rados_release_read_op(rados_read_op_t read_op)
253
254 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
255 void rados_aio_release(rados_completion_t c)
256 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
257 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
258 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
259 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
260 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
261 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
262 int rados_aio_flush(rados_ioctx_t io)
263
264 int rados_aio_get_return_value(rados_completion_t c)
265 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
266 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
267 int rados_aio_wait_for_complete(rados_completion_t c)
268 int rados_aio_wait_for_safe(rados_completion_t c)
269 int rados_aio_is_complete(rados_completion_t c)
270 int rados_aio_is_safe(rados_completion_t c)
271
272 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
273 const char * in_buf, size_t in_len, char * buf, size_t out_len)
274 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
275 const char * in_buf, size_t in_len, char * buf, size_t out_len)
276
277 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
278 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
279 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
280 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
281 void rados_write_op_omap_clear(rados_write_op_t write_op)
282 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
283
284 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
285 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
286 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
287 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
288 void rados_write_op_remove(rados_write_op_t write_op)
289 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
290 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
291
292 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
293 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
294 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
295 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
296 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
297 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
298 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
299 void rados_omap_get_end(rados_omap_iter_t iter)
300
301
302 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
303 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
304 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
305 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
306 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
307 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
308 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
309
310 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
311
312 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
313 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
314 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
315 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
316 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
317 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
318 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
319
320 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
321
322 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
323 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
324
325 ANONYMOUS_AUID = 0xffffffffffffffff
326 ADMIN_AUID = 0
327
328
329 class Error(Exception):
330 """ `Error` class, derived from `Exception` """
331 def __init__(self, message, errno=None):
332 super(Exception, self).__init__(message)
333 self.errno = errno
334
335 def __str__(self):
336 msg = super(Exception, self).__str__()
337 if self.errno is None:
338 return msg
339 return '[errno {0}] {1}'.format(self.errno, msg)
340
341 def __reduce__(self):
342 return (self.__class__, (self.message, self.errno))
343
344 class InvalidArgumentError(Error):
345 pass
346
347 class OSError(Error):
348 """ `OSError` class, derived from `Error` """
349 pass
350
351 class InterruptedOrTimeoutError(OSError):
352 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
353 pass
354
355
356 class PermissionError(OSError):
357 """ `PermissionError` class, derived from `OSError` """
358 pass
359
360
361 class PermissionDeniedError(OSError):
362 """ deal with EACCES related. """
363 pass
364
365
366 class ObjectNotFound(OSError):
367 """ `ObjectNotFound` class, derived from `OSError` """
368 pass
369
370
371 class NoData(OSError):
372 """ `NoData` class, derived from `OSError` """
373 pass
374
375
376 class ObjectExists(OSError):
377 """ `ObjectExists` class, derived from `OSError` """
378 pass
379
380
381 class ObjectBusy(OSError):
382 """ `ObjectBusy` class, derived from `IOError` """
383 pass
384
385
386 class IOError(OSError):
387 """ `ObjectBusy` class, derived from `OSError` """
388 pass
389
390
391 class NoSpace(OSError):
392 """ `NoSpace` class, derived from `OSError` """
393 pass
394
395
396 class RadosStateError(Error):
397 """ `RadosStateError` class, derived from `Error` """
398 pass
399
400
401 class IoctxStateError(Error):
402 """ `IoctxStateError` class, derived from `Error` """
403 pass
404
405
406 class ObjectStateError(Error):
407 """ `ObjectStateError` class, derived from `Error` """
408 pass
409
410
411 class LogicError(Error):
412 """ `` class, derived from `Error` """
413 pass
414
415
416 class TimedOut(OSError):
417 """ `TimedOut` class, derived from `OSError` """
418 pass
419
420
421 IF UNAME_SYSNAME == "FreeBSD":
422 cdef errno_to_exception = {
423 errno.EPERM : PermissionError,
424 errno.ENOENT : ObjectNotFound,
425 errno.EIO : IOError,
426 errno.ENOSPC : NoSpace,
427 errno.EEXIST : ObjectExists,
428 errno.EBUSY : ObjectBusy,
429 errno.ENOATTR : NoData,
430 errno.EINTR : InterruptedOrTimeoutError,
431 errno.ETIMEDOUT : TimedOut,
432 errno.EACCES : PermissionDeniedError,
433 errno.EINVAL : InvalidArgumentError,
434 }
435 ELSE:
436 cdef errno_to_exception = {
437 errno.EPERM : PermissionError,
438 errno.ENOENT : ObjectNotFound,
439 errno.EIO : IOError,
440 errno.ENOSPC : NoSpace,
441 errno.EEXIST : ObjectExists,
442 errno.EBUSY : ObjectBusy,
443 errno.ENODATA : NoData,
444 errno.EINTR : InterruptedOrTimeoutError,
445 errno.ETIMEDOUT : TimedOut,
446 errno.EACCES : PermissionDeniedError,
447 errno.EINVAL : InvalidArgumentError,
448 }
449
450
451 cdef make_ex(ret, msg):
452 """
453 Translate a librados return code into an exception.
454
455 :param ret: the return code
456 :type ret: int
457 :param msg: the error message to use
458 :type msg: str
459 :returns: a subclass of :class:`Error`
460 """
461 ret = abs(ret)
462 if ret in errno_to_exception:
463 return errno_to_exception[ret](msg, errno=ret)
464 else:
465 return OSError(msg, errno=ret)
466
467
468 # helper to specify an optional argument, where in addition to `cls`, `None`
469 # is also acceptable
470 def opt(cls):
471 return (cls, None)
472
473
474 # validate argument types of an instance method
475 # kwargs is an un-ordered dict, so use args instead
476 def requires(*types):
477 def is_type_of(v, t):
478 if t is None:
479 return v is None
480 else:
481 return isinstance(v, t)
482
483 def check_type(val, arg_name, arg_type):
484 if isinstance(arg_type, tuple):
485 if any(is_type_of(val, t) for t in arg_type):
486 return
487 type_names = ' or '.join('None' if t is None else t.__name__
488 for t in arg_type)
489 raise TypeError('%s must be %s' % (arg_name, type_names))
490 else:
491 if is_type_of(val, arg_type):
492 return
493 assert(arg_type is not None)
494 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
495
496 def wrapper(f):
497 # FIXME(sileht): this stop with
498 # AttributeError: 'method_descriptor' object has no attribute '__module__'
499 # @wraps(f)
500 def validate_func(*args, **kwargs):
501 # ignore the `self` arg
502 pos_args = zip(args[1:], types)
503 named_args = ((kwargs[name], (name, spec)) for name, spec in types
504 if name in kwargs)
505 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
506 check_type(arg_val, arg_name, arg_type)
507 return f(*args, **kwargs)
508 return validate_func
509 return wrapper
510
511
512 def cstr(val, name, encoding="utf-8", opt=False):
513 """
514 Create a byte string from a Python string
515
516 :param basestring val: Python string
517 :param str name: Name of the string parameter, for exceptions
518 :param str encoding: Encoding to use
519 :param bool opt: If True, None is allowed
520 :rtype: bytes
521 :raises: :class:`InvalidArgument`
522 """
523 if opt and val is None:
524 return None
525 if isinstance(val, bytes):
526 return val
527 elif isinstance(val, unicode):
528 return val.encode(encoding)
529 else:
530 raise TypeError('%s must be a string' % name)
531
532
533 def cstr_list(list_str, name, encoding="utf-8"):
534 return [cstr(s, name) for s in list_str]
535
536
537 def decode_cstr(val, encoding="utf-8"):
538 """
539 Decode a byte string into a Python string.
540
541 :param bytes val: byte string
542 :rtype: unicode or None
543 """
544 if val is None:
545 return None
546
547 return val.decode(encoding)
548
549
550 cdef char* opt_str(s) except? NULL:
551 if s is None:
552 return NULL
553 return s
554
555
556 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
557 cdef void *ret = realloc(ptr, size)
558 if ret == NULL:
559 raise MemoryError("realloc failed")
560 return ret
561
562
563 cdef size_t * to_csize_t_array(list_int):
564 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
565 if ret == NULL:
566 raise MemoryError("malloc failed")
567 for i in xrange(len(list_int)):
568 ret[i] = <size_t>list_int[i]
569 return ret
570
571
572 cdef char ** to_bytes_array(list_bytes):
573 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
574 if ret == NULL:
575 raise MemoryError("malloc failed")
576 for i in xrange(len(list_bytes)):
577 ret[i] = <char *>list_bytes[i]
578 return ret
579
580
581
582 cdef int __monitor_callback(void *arg, const char *line, const char *who,
583 uint64_t sec, uint64_t nsec, uint64_t seq,
584 const char *level, const char *msg) with gil:
585 cdef object cb_info = <object>arg
586 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
587 return 0
588
589 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
590 const char *who,
591 const char *name,
592 uint64_t sec, uint64_t nsec, uint64_t seq,
593 const char *level, const char *msg) with gil:
594 cdef object cb_info = <object>arg
595 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
596 return 0
597
598
599 class Version(object):
600 """ Version information """
601 def __init__(self, major, minor, extra):
602 self.major = major
603 self.minor = minor
604 self.extra = extra
605
606 def __str__(self):
607 return "%d.%d.%d" % (self.major, self.minor, self.extra)
608
609
610 cdef class Rados(object):
611 """This class wraps librados functions"""
612 # NOTE(sileht): attributes declared in .pyd
613
614 def __init__(self, *args, **kwargs):
615 PyEval_InitThreads()
616 self.__setup(*args, **kwargs)
617
618 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
619 ('conffile', opt(str_type)))
620 def __setup(self, rados_id=None, name=None, clustername=None,
621 conf_defaults=None, conffile=None, conf=None, flags=0,
622 context=None):
623 self.monitor_callback = None
624 self.monitor_callback2 = None
625 self.parsed_args = []
626 self.conf_defaults = conf_defaults
627 self.conffile = conffile
628 self.rados_id = rados_id
629
630 if rados_id and name:
631 raise Error("Rados(): can't supply both rados_id and name")
632 elif rados_id:
633 name = 'client.' + rados_id
634 elif name is None:
635 name = 'client.admin'
636 if clustername is None:
637 clustername = ''
638
639 name = cstr(name, 'name')
640 clustername = cstr(clustername, 'clustername')
641 cdef:
642 char *_name = name
643 char *_clustername = clustername
644 int _flags = flags
645 int ret
646
647 if context:
648 # Unpack void* (aka rados_config_t) from capsule
649 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
650 with nogil:
651 ret = rados_create_with_context(&self.cluster, rados_config)
652 else:
653 with nogil:
654 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
655 if ret != 0:
656 raise Error("rados_initialize failed with error code: %d" % ret)
657
658 self.state = "configuring"
659 # order is important: conf_defaults, then conffile, then conf
660 if conf_defaults:
661 for key, value in conf_defaults.items():
662 self.conf_set(key, value)
663 if conffile is not None:
664 # read the default conf file when '' is given
665 if conffile == '':
666 conffile = None
667 self.conf_read_file(conffile)
668 if conf:
669 for key, value in conf.items():
670 self.conf_set(key, value)
671
672 def require_state(self, *args):
673 """
674 Checks if the Rados object is in a special state
675
676 :raises: RadosStateError
677 """
678 if self.state in args:
679 return
680 raise RadosStateError("You cannot perform that operation on a \
681 Rados object in state %s." % self.state)
682
683 def shutdown(self):
684 """
685 Disconnects from the cluster. Call this explicitly when a
686 Rados.connect()ed object is no longer used.
687 """
688 if self.state != "shutdown":
689 with nogil:
690 rados_shutdown(self.cluster)
691 self.state = "shutdown"
692
693 def __enter__(self):
694 self.connect()
695 return self
696
697 def __exit__(self, type_, value, traceback):
698 self.shutdown()
699 return False
700
701 def version(self):
702 """
703 Get the version number of the ``librados`` C library.
704
705 :returns: a tuple of ``(major, minor, extra)`` components of the
706 librados version
707 """
708 cdef int major = 0
709 cdef int minor = 0
710 cdef int extra = 0
711 with nogil:
712 rados_version(&major, &minor, &extra)
713 return Version(major, minor, extra)
714
715 @requires(('path', opt(str_type)))
716 def conf_read_file(self, path=None):
717 """
718 Configure the cluster handle using a Ceph config file.
719
720 :param path: path to the config file
721 :type path: str
722 """
723 self.require_state("configuring", "connected")
724 path = cstr(path, 'path', opt=True)
725 cdef:
726 char *_path = opt_str(path)
727 with nogil:
728 ret = rados_conf_read_file(self.cluster, _path)
729 if ret != 0:
730 raise make_ex(ret, "error calling conf_read_file")
731
732 def conf_parse_argv(self, args):
733 """
734 Parse known arguments from args, and remove; returned
735 args contain only those unknown to ceph
736 """
737 self.require_state("configuring", "connected")
738 if not args:
739 return
740
741 cargs = cstr_list(args, 'args')
742 cdef:
743 int _argc = len(args)
744 char **_argv = to_bytes_array(cargs)
745 char **_remargv = NULL
746
747 try:
748 _remargv = <char **>malloc(_argc * sizeof(char *))
749 with nogil:
750 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
751 <const char**>_argv,
752 <const char**>_remargv)
753 if ret:
754 raise make_ex(ret, "error calling conf_parse_argv_remainder")
755
756 # _remargv was allocated with fixed argc; collapse return
757 # list to eliminate any missing args
758 retargs = [decode_cstr(a) for a in _remargv[:_argc]
759 if a != NULL]
760 self.parsed_args = args
761 return retargs
762 finally:
763 free(_argv)
764 free(_remargv)
765
766 def conf_parse_env(self, var='CEPH_ARGS'):
767 """
768 Parse known arguments from an environment variable, normally
769 CEPH_ARGS.
770 """
771 self.require_state("configuring", "connected")
772 if not var:
773 return
774
775 var = cstr(var, 'var')
776 cdef:
777 char *_var = var
778 with nogil:
779 ret = rados_conf_parse_env(self.cluster, _var)
780 if ret != 0:
781 raise make_ex(ret, "error calling conf_parse_env")
782
783 @requires(('option', str_type))
784 def conf_get(self, option):
785 """
786 Get the value of a configuration option
787
788 :param option: which option to read
789 :type option: str
790
791 :returns: str - value of the option or None
792 :raises: :class:`TypeError`
793 """
794 self.require_state("configuring", "connected")
795 option = cstr(option, 'option')
796 cdef:
797 char *_option = option
798 size_t length = 20
799 char *ret_buf = NULL
800
801 try:
802 while True:
803 ret_buf = <char *>realloc_chk(ret_buf, length)
804 with nogil:
805 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
806 if ret == 0:
807 return decode_cstr(ret_buf)
808 elif ret == -errno.ENAMETOOLONG:
809 length = length * 2
810 elif ret == -errno.ENOENT:
811 return None
812 else:
813 raise make_ex(ret, "error calling conf_get")
814 finally:
815 free(ret_buf)
816
817 @requires(('option', str_type), ('val', str_type))
818 def conf_set(self, option, val):
819 """
820 Set the value of a configuration option
821
822 :param option: which option to set
823 :type option: str
824 :param option: value of the option
825 :type option: str
826
827 :raises: :class:`TypeError`, :class:`ObjectNotFound`
828 """
829 self.require_state("configuring", "connected")
830 option = cstr(option, 'option')
831 val = cstr(val, 'val')
832 cdef:
833 char *_option = option
834 char *_val = val
835
836 with nogil:
837 ret = rados_conf_set(self.cluster, _option, _val)
838 if ret != 0:
839 raise make_ex(ret, "error calling conf_set")
840
841 def ping_monitor(self, mon_id):
842 """
843 Ping a monitor to assess liveness
844
845 May be used as a simply way to assess liveness, or to obtain
846 information about the monitor in a simple way even in the
847 absence of quorum.
848
849 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
850 :type mon_id: str
851 :returns: the string reply from the monitor
852 """
853
854 self.require_state("configuring", "connected")
855
856 mon_id = cstr(mon_id, 'mon_id')
857 cdef:
858 char *_mon_id = mon_id
859 size_t outstrlen = 0
860 char *outstr
861
862 with nogil:
863 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
864
865 if ret != 0:
866 raise make_ex(ret, "error calling ping_monitor")
867
868 if outstrlen:
869 my_outstr = outstr[:outstrlen]
870 rados_buffer_free(outstr)
871 return decode_cstr(my_outstr)
872
873 def connect(self, timeout=0):
874 """
875 Connect to the cluster. Use shutdown() to release resources.
876 """
877 self.require_state("configuring")
878 # NOTE(sileht): timeout was supported by old python API,
879 # but this is not something available in C API, so ignore
880 # for now and remove it later
881 with nogil:
882 ret = rados_connect(self.cluster)
883 if ret != 0:
884 raise make_ex(ret, "error connecting to the cluster")
885 self.state = "connected"
886
887 def get_cluster_stats(self):
888 """
889 Read usage info about the cluster
890
891 This tells you total space, space used, space available, and number
892 of objects. These are not updated immediately when data is written,
893 they are eventually consistent.
894
895 :returns: dict - contains the following keys:
896
897 - ``kb`` (int) - total space
898
899 - ``kb_used`` (int) - space used
900
901 - ``kb_avail`` (int) - free space available
902
903 - ``num_objects`` (int) - number of objects
904
905 """
906 cdef:
907 rados_cluster_stat_t stats
908
909 with nogil:
910 ret = rados_cluster_stat(self.cluster, &stats)
911
912 if ret < 0:
913 raise make_ex(
914 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
915 return {'kb': stats.kb,
916 'kb_used': stats.kb_used,
917 'kb_avail': stats.kb_avail,
918 'num_objects': stats.num_objects}
919
920 @requires(('pool_name', str_type))
921 def pool_exists(self, pool_name):
922 """
923 Checks if a given pool exists.
924
925 :param pool_name: name of the pool to check
926 :type pool_name: str
927
928 :raises: :class:`TypeError`, :class:`Error`
929 :returns: bool - whether the pool exists, false otherwise.
930 """
931 self.require_state("connected")
932
933 pool_name = cstr(pool_name, 'pool_name')
934 cdef:
935 char *_pool_name = pool_name
936
937 with nogil:
938 ret = rados_pool_lookup(self.cluster, _pool_name)
939 if ret >= 0:
940 return True
941 elif ret == -errno.ENOENT:
942 return False
943 else:
944 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
945
946 @requires(('pool_name', str_type))
947 def pool_lookup(self, pool_name):
948 """
949 Returns a pool's ID based on its name.
950
951 :param pool_name: name of the pool to look up
952 :type pool_name: str
953
954 :raises: :class:`TypeError`, :class:`Error`
955 :returns: int - pool ID, or None if it doesn't exist
956 """
957 self.require_state("connected")
958 pool_name = cstr(pool_name, 'pool_name')
959 cdef:
960 char *_pool_name = pool_name
961
962 with nogil:
963 ret = rados_pool_lookup(self.cluster, _pool_name)
964 if ret >= 0:
965 return int(ret)
966 elif ret == -errno.ENOENT:
967 return None
968 else:
969 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
970
971 @requires(('pool_id', int))
972 def pool_reverse_lookup(self, pool_id):
973 """
974 Returns a pool's name based on its ID.
975
976 :param pool_id: ID of the pool to look up
977 :type pool_id: int
978
979 :raises: :class:`TypeError`, :class:`Error`
980 :returns: string - pool name, or None if it doesn't exist
981 """
982 self.require_state("connected")
983 cdef:
984 int64_t _pool_id = pool_id
985 size_t size = 512
986 char *name = NULL
987
988 try:
989 while True:
990 name = <char *>realloc_chk(name, size)
991 with nogil:
992 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
993 if ret >= 0:
994 break
995 elif ret != -errno.ERANGE and size <= 4096:
996 size *= 2
997 elif ret == -errno.ENOENT:
998 return None
999 elif ret < 0:
1000 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1001
1002 return decode_cstr(name)
1003
1004 finally:
1005 free(name)
1006
1007 @requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
1008 def create_pool(self, pool_name, auid=None, crush_rule=None):
1009 """
1010 Create a pool:
1011 - with default settings: if auid=None and crush_rule=None
1012 - owned by a specific auid: auid given and crush_rule=None
1013 - with a specific CRUSH rule: if auid=None and crush_rule given
1014 - with a specific CRUSH rule and auid: if auid and crush_rule given
1015
1016 :param pool_name: name of the pool to create
1017 :type pool_name: str
1018 :param auid: the id of the owner of the new pool
1019 :type auid: int
1020 :param crush_rule: rule to use for placement in the new pool
1021 :type crush_rule: int
1022
1023 :raises: :class:`TypeError`, :class:`Error`
1024 """
1025 self.require_state("connected")
1026
1027 pool_name = cstr(pool_name, 'pool_name')
1028 cdef:
1029 char *_pool_name = pool_name
1030 uint8_t _crush_rule
1031 uint64_t _auid
1032
1033 if auid is None and crush_rule is None:
1034 with nogil:
1035 ret = rados_pool_create(self.cluster, _pool_name)
1036 elif auid is None:
1037 _crush_rule = crush_rule
1038 with nogil:
1039 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1040 elif crush_rule is None:
1041 _auid = auid
1042 with nogil:
1043 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
1044 else:
1045 _auid = auid
1046 _crush_rule = crush_rule
1047 with nogil:
1048 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
1049 if ret < 0:
1050 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1051
1052 @requires(('pool_id', int))
1053 def get_pool_base_tier(self, pool_id):
1054 """
1055 Get base pool
1056
1057 :returns: base pool, or pool_id if tiering is not configured for the pool
1058 """
1059 self.require_state("connected")
1060 cdef:
1061 int64_t base_tier = 0
1062 int64_t _pool_id = pool_id
1063
1064 with nogil:
1065 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1066 if ret < 0:
1067 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1068 return int(base_tier)
1069
1070 @requires(('pool_name', str_type))
1071 def delete_pool(self, pool_name):
1072 """
1073 Delete a pool and all data inside it.
1074
1075 The pool is removed from the cluster immediately,
1076 but the actual data is deleted in the background.
1077
1078 :param pool_name: name of the pool to delete
1079 :type pool_name: str
1080
1081 :raises: :class:`TypeError`, :class:`Error`
1082 """
1083 self.require_state("connected")
1084
1085 pool_name = cstr(pool_name, 'pool_name')
1086 cdef:
1087 char *_pool_name = pool_name
1088
1089 with nogil:
1090 ret = rados_pool_delete(self.cluster, _pool_name)
1091 if ret < 0:
1092 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1093
1094 @requires(('pool_id', int))
1095 def get_inconsistent_pgs(self, pool_id):
1096 """
1097 List inconsistent placement groups in the given pool
1098
1099 :param pool_id: ID of the pool in which PGs are listed
1100 :type pool_id: int
1101 :returns: list - inconsistent placement groups
1102 """
1103 self.require_state("connected")
1104 cdef:
1105 int64_t pool = pool_id
1106 size_t size = 512
1107 char *pgs = NULL
1108
1109 try:
1110 while True:
1111 pgs = <char *>realloc_chk(pgs, size);
1112 with nogil:
1113 ret = rados_inconsistent_pg_list(self.cluster, pool,
1114 pgs, size)
1115 if ret > <int>size:
1116 size *= 2
1117 elif ret >= 0:
1118 break
1119 else:
1120 raise make_ex(ret, "error calling inconsistent_pg_list")
1121 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1122 finally:
1123 free(pgs)
1124
1125 def list_pools(self):
1126 """
1127 Gets a list of pool names.
1128
1129 :returns: list - of pool names.
1130 """
1131 self.require_state("connected")
1132 cdef:
1133 size_t size = 512
1134 char *c_names = NULL
1135
1136 try:
1137 while True:
1138 c_names = <char *>realloc_chk(c_names, size)
1139 with nogil:
1140 ret = rados_pool_list(self.cluster, c_names, size)
1141 if ret > <int>size:
1142 size *= 2
1143 elif ret >= 0:
1144 break
1145 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1146 if name]
1147 finally:
1148 free(c_names)
1149
1150 def get_fsid(self):
1151 """
1152 Get the fsid of the cluster as a hexadecimal string.
1153
1154 :raises: :class:`Error`
1155 :returns: str - cluster fsid
1156 """
1157 self.require_state("connected")
1158 cdef:
1159 char *ret_buf
1160 size_t buf_len = 37
1161 PyObject* ret_s = NULL
1162
1163 ret_s = PyBytes_FromStringAndSize(NULL, buf_len)
1164 try:
1165 ret_buf = PyBytes_AsString(ret_s)
1166 with nogil:
1167 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1168 if ret < 0:
1169 raise make_ex(ret, "error getting cluster fsid")
1170 if ret != <int>buf_len:
1171 _PyBytes_Resize(&ret_s, ret)
1172 return <object>ret_s
1173 finally:
1174 # We DECREF unconditionally: the cast to object above will have
1175 # INCREFed if necessary. This also takes care of exceptions,
1176 # including if _PyString_Resize fails (that will free the string
1177 # itself and set ret_s to NULL, hence XDECREF).
1178 ref.Py_XDECREF(ret_s)
1179
1180 @requires(('ioctx_name', str_type))
1181 def open_ioctx(self, ioctx_name):
1182 """
1183 Create an io context
1184
1185 The io context allows you to perform operations within a particular
1186 pool.
1187
1188 :param ioctx_name: name of the pool
1189 :type ioctx_name: str
1190
1191 :raises: :class:`TypeError`, :class:`Error`
1192 :returns: Ioctx - Rados Ioctx object
1193 """
1194 self.require_state("connected")
1195 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1196 cdef:
1197 rados_ioctx_t ioctx
1198 char *_ioctx_name = ioctx_name
1199 with nogil:
1200 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1201 if ret < 0:
1202 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1203 io = Ioctx(ioctx_name)
1204 io.io = ioctx
1205 return io
1206
1207 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1208 """
1209 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1210 returns (int ret, string outbuf, string outs)
1211 """
1212 # NOTE(sileht): timeout is ignored because C API doesn't provide
1213 # timeout argument, but we keep it for backward compat with old python binding
1214
1215 self.require_state("connected")
1216 cmd = cstr_list(cmd, 'c')
1217
1218 if isinstance(target, int):
1219 # NOTE(sileht): looks weird but test_monmap_dump pass int
1220 target = str(target)
1221
1222 target = cstr(target, 'target', opt=True)
1223 inbuf = cstr(inbuf, 'inbuf')
1224
1225 cdef:
1226 char *_target = opt_str(target)
1227 char **_cmd = to_bytes_array(cmd)
1228 size_t _cmdlen = len(cmd)
1229
1230 char *_inbuf = inbuf
1231 size_t _inbuf_len = len(inbuf)
1232
1233 char *_outbuf
1234 size_t _outbuf_len
1235 char *_outs
1236 size_t _outs_len
1237
1238 try:
1239 if target:
1240 with nogil:
1241 ret = rados_mon_command_target(self.cluster, _target,
1242 <const char **>_cmd, _cmdlen,
1243 <const char*>_inbuf, _inbuf_len,
1244 &_outbuf, &_outbuf_len,
1245 &_outs, &_outs_len)
1246 else:
1247 with nogil:
1248 ret = rados_mon_command(self.cluster,
1249 <const char **>_cmd, _cmdlen,
1250 <const char*>_inbuf, _inbuf_len,
1251 &_outbuf, &_outbuf_len,
1252 &_outs, &_outs_len)
1253
1254 my_outs = decode_cstr(_outs[:_outs_len])
1255 my_outbuf = _outbuf[:_outbuf_len]
1256 if _outs_len:
1257 rados_buffer_free(_outs)
1258 if _outbuf_len:
1259 rados_buffer_free(_outbuf)
1260 return (ret, my_outbuf, my_outs)
1261 finally:
1262 free(_cmd)
1263
1264 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1265 """
1266 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1267 returns (int ret, string outbuf, string outs)
1268 """
1269 # NOTE(sileht): timeout is ignored because C API doesn't provide
1270 # timeout argument, but we keep it for backward compat with old python binding
1271 self.require_state("connected")
1272
1273 cmd = cstr_list(cmd, 'cmd')
1274 inbuf = cstr(inbuf, 'inbuf')
1275
1276 cdef:
1277 int _osdid = osdid
1278 char **_cmd = to_bytes_array(cmd)
1279 size_t _cmdlen = len(cmd)
1280
1281 char *_inbuf = inbuf
1282 size_t _inbuf_len = len(inbuf)
1283
1284 char *_outbuf
1285 size_t _outbuf_len
1286 char *_outs
1287 size_t _outs_len
1288
1289 try:
1290 with nogil:
1291 ret = rados_osd_command(self.cluster, _osdid,
1292 <const char **>_cmd, _cmdlen,
1293 <const char*>_inbuf, _inbuf_len,
1294 &_outbuf, &_outbuf_len,
1295 &_outs, &_outs_len)
1296
1297 my_outs = decode_cstr(_outs[:_outs_len])
1298 my_outbuf = _outbuf[:_outbuf_len]
1299 if _outs_len:
1300 rados_buffer_free(_outs)
1301 if _outbuf_len:
1302 rados_buffer_free(_outbuf)
1303 return (ret, my_outbuf, my_outs)
1304 finally:
1305 free(_cmd)
1306
1307 def mgr_command(self, cmd, inbuf, timeout=0):
1308 """
1309 returns (int ret, string outbuf, string outs)
1310 """
1311 # NOTE(sileht): timeout is ignored because C API doesn't provide
1312 # timeout argument, but we keep it for backward compat with old python binding
1313 self.require_state("connected")
1314
1315 cmd = cstr_list(cmd, 'cmd')
1316 inbuf = cstr(inbuf, 'inbuf')
1317
1318 cdef:
1319 char **_cmd = to_bytes_array(cmd)
1320 size_t _cmdlen = len(cmd)
1321
1322 char *_inbuf = inbuf
1323 size_t _inbuf_len = len(inbuf)
1324
1325 char *_outbuf
1326 size_t _outbuf_len
1327 char *_outs
1328 size_t _outs_len
1329
1330 try:
1331 with nogil:
1332 ret = rados_mgr_command(self.cluster,
1333 <const char **>_cmd, _cmdlen,
1334 <const char*>_inbuf, _inbuf_len,
1335 &_outbuf, &_outbuf_len,
1336 &_outs, &_outs_len)
1337
1338 my_outs = decode_cstr(_outs[:_outs_len])
1339 my_outbuf = _outbuf[:_outbuf_len]
1340 if _outs_len:
1341 rados_buffer_free(_outs)
1342 if _outbuf_len:
1343 rados_buffer_free(_outbuf)
1344 return (ret, my_outbuf, my_outs)
1345 finally:
1346 free(_cmd)
1347
1348 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1349 """
1350 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1351 returns (int ret, string outbuf, string outs)
1352 """
1353 # NOTE(sileht): timeout is ignored because C API doesn't provide
1354 # timeout argument, but we keep it for backward compat with old python binding
1355 self.require_state("connected")
1356
1357 pgid = cstr(pgid, 'pgid')
1358 cmd = cstr_list(cmd, 'cmd')
1359 inbuf = cstr(inbuf, 'inbuf')
1360
1361 cdef:
1362 char *_pgid = pgid
1363 char **_cmd = to_bytes_array(cmd)
1364 size_t _cmdlen = len(cmd)
1365
1366 char *_inbuf = inbuf
1367 size_t _inbuf_len = len(inbuf)
1368
1369 char *_outbuf
1370 size_t _outbuf_len
1371 char *_outs
1372 size_t _outs_len
1373
1374 try:
1375 with nogil:
1376 ret = rados_pg_command(self.cluster, _pgid,
1377 <const char **>_cmd, _cmdlen,
1378 <const char *>_inbuf, _inbuf_len,
1379 &_outbuf, &_outbuf_len,
1380 &_outs, &_outs_len)
1381
1382 my_outs = decode_cstr(_outs[:_outs_len])
1383 my_outbuf = _outbuf[:_outbuf_len]
1384 if _outs_len:
1385 rados_buffer_free(_outs)
1386 if _outbuf_len:
1387 rados_buffer_free(_outbuf)
1388 return (ret, my_outbuf, my_outs)
1389 finally:
1390 free(_cmd)
1391
1392 def wait_for_latest_osdmap(self):
1393 self.require_state("connected")
1394 with nogil:
1395 ret = rados_wait_for_latest_osdmap(self.cluster)
1396 return ret
1397
1398 def blacklist_add(self, client_address, expire_seconds=0):
1399 """
1400 Blacklist a client from the OSDs
1401
1402 :param client_address: client address
1403 :type client_address: str
1404 :param expire_seconds: number of seconds to blacklist
1405 :type expire_seconds: int
1406
1407 :raises: :class:`Error`
1408 """
1409 self.require_state("connected")
1410 client_address = cstr(client_address, 'client_address')
1411 cdef:
1412 uint32_t _expire_seconds = expire_seconds
1413 char *_client_address = client_address
1414
1415 with nogil:
1416 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1417 if ret < 0:
1418 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1419
1420 def monitor_log(self, level, callback, arg):
1421 if level not in MONITOR_LEVELS:
1422 raise LogicError("invalid monitor level " + level)
1423 if callback is not None and not callable(callback):
1424 raise LogicError("callback must be a callable function or None")
1425
1426 level = cstr(level, 'level')
1427 cdef char *_level = level
1428
1429 if callback is None:
1430 with nogil:
1431 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1432 self.monitor_callback = None
1433 self.monitor_callback2 = None
1434 return
1435
1436 cb = (callback, arg)
1437 cdef PyObject* _arg = <PyObject*>cb
1438 with nogil:
1439 r = rados_monitor_log(self.cluster, <const char*>_level,
1440 <rados_log_callback_t>&__monitor_callback, _arg)
1441
1442 if r:
1443 raise make_ex(r, 'error calling rados_monitor_log')
1444 # NOTE(sileht): Prevents the callback method from being garbage collected
1445 self.monitor_callback = cb
1446 self.monitor_callback2 = None
1447
1448 def monitor_log2(self, level, callback, arg):
1449 if level not in MONITOR_LEVELS:
1450 raise LogicError("invalid monitor level " + level)
1451 if callback is not None and not callable(callback):
1452 raise LogicError("callback must be a callable function or None")
1453
1454 level = cstr(level, 'level')
1455 cdef char *_level = level
1456
1457 if callback is None:
1458 with nogil:
1459 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1460 self.monitor_callback = None
1461 self.monitor_callback2 = None
1462 return
1463
1464 cb = (callback, arg)
1465 cdef PyObject* _arg = <PyObject*>cb
1466 with nogil:
1467 r = rados_monitor_log2(self.cluster, <const char*>_level,
1468 <rados_log_callback2_t>&__monitor_callback2, _arg)
1469
1470 if r:
1471 raise make_ex(r, 'error calling rados_monitor_log')
1472 # NOTE(sileht): Prevents the callback method from being garbage collected
1473 self.monitor_callback = None
1474 self.monitor_callback2 = cb
1475
1476
1477 cdef class OmapIterator(object):
1478 """Omap iterator"""
1479
1480 cdef public Ioctx ioctx
1481 cdef rados_omap_iter_t ctx
1482
1483 def __cinit__(self, Ioctx ioctx):
1484 self.ioctx = ioctx
1485
1486 def __iter__(self):
1487 return self
1488
1489 def __next__(self):
1490 """
1491 Get the next key-value pair in the object
1492 :returns: next rados.OmapItem
1493 """
1494 cdef:
1495 char *key_ = NULL
1496 char *val_ = NULL
1497 size_t len_
1498
1499 with nogil:
1500 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1501
1502 if ret != 0:
1503 raise make_ex(ret, "error iterating over the omap")
1504 if key_ == NULL:
1505 raise StopIteration()
1506 key = decode_cstr(key_)
1507 val = None
1508 if val_ != NULL:
1509 val = val_[:len_]
1510 return (key, val)
1511
1512 def __dealloc__(self):
1513 with nogil:
1514 rados_omap_get_end(self.ctx)
1515
1516
1517 cdef class ObjectIterator(object):
1518 """rados.Ioctx Object iterator"""
1519
1520 cdef rados_list_ctx_t ctx
1521
1522 cdef public object ioctx
1523
1524 def __cinit__(self, Ioctx ioctx):
1525 self.ioctx = ioctx
1526
1527 with nogil:
1528 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1529 if ret < 0:
1530 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1531 % self.ioctx.name)
1532
1533 def __iter__(self):
1534 return self
1535
1536 def __next__(self):
1537 """
1538 Get the next object name and locator in the pool
1539
1540 :raises: StopIteration
1541 :returns: next rados.Ioctx Object
1542 """
1543 cdef:
1544 const char *key_ = NULL
1545 const char *locator_ = NULL
1546 const char *nspace_ = NULL
1547
1548 with nogil:
1549 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1550
1551 if ret < 0:
1552 raise StopIteration()
1553
1554 key = decode_cstr(key_)
1555 locator = decode_cstr(locator_) if locator_ != NULL else None
1556 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1557 return Object(self.ioctx, key, locator, nspace)
1558
1559 def __dealloc__(self):
1560 with nogil:
1561 rados_nobjects_list_close(self.ctx)
1562
1563
1564 cdef class XattrIterator(object):
1565 """Extended attribute iterator"""
1566
1567 cdef rados_xattrs_iter_t it
1568 cdef char* _oid
1569
1570 cdef public Ioctx ioctx
1571 cdef public object oid
1572
1573 def __cinit__(self, Ioctx ioctx, oid):
1574 self.ioctx = ioctx
1575 self.oid = cstr(oid, 'oid')
1576 self._oid = self.oid
1577
1578 with nogil:
1579 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1580 if ret != 0:
1581 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1582
1583 def __iter__(self):
1584 return self
1585
1586 def __next__(self):
1587 """
1588 Get the next xattr on the object
1589
1590 :raises: StopIteration
1591 :returns: pair - of name and value of the next Xattr
1592 """
1593 cdef:
1594 const char *name_ = NULL
1595 const char *val_ = NULL
1596 size_t len_ = 0
1597
1598 with nogil:
1599 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1600 if ret != 0:
1601 raise make_ex(ret, "error iterating over the extended attributes \
1602 in '%s'" % self.oid)
1603 if name_ == NULL:
1604 raise StopIteration()
1605 name = decode_cstr(name_)
1606 val = val_[:len_]
1607 return (name, val)
1608
1609 def __dealloc__(self):
1610 with nogil:
1611 rados_getxattrs_end(self.it)
1612
1613
1614 cdef class SnapIterator(object):
1615 """Snapshot iterator"""
1616
1617 cdef public Ioctx ioctx
1618
1619 cdef rados_snap_t *snaps
1620 cdef int max_snap
1621 cdef int cur_snap
1622
1623 def __cinit__(self, Ioctx ioctx):
1624 self.ioctx = ioctx
1625 # We don't know how big a buffer we need until we've called the
1626 # function. So use the exponential doubling strategy.
1627 cdef int num_snaps = 10
1628 while True:
1629 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1630 num_snaps *
1631 sizeof(rados_snap_t))
1632
1633 with nogil:
1634 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1635 if ret >= 0:
1636 self.max_snap = ret
1637 break
1638 elif ret != -errno.ERANGE:
1639 raise make_ex(ret, "error calling rados_snap_list for \
1640 ioctx '%s'" % self.ioctx.name)
1641 num_snaps = num_snaps * 2
1642 self.cur_snap = 0
1643
1644 def __iter__(self):
1645 return self
1646
1647 def __next__(self):
1648 """
1649 Get the next Snapshot
1650
1651 :raises: :class:`Error`, StopIteration
1652 :returns: Snap - next snapshot
1653 """
1654 if self.cur_snap >= self.max_snap:
1655 raise StopIteration
1656
1657 cdef:
1658 rados_snap_t snap_id = self.snaps[self.cur_snap]
1659 int name_len = 10
1660 char *name = NULL
1661
1662 try:
1663 while True:
1664 name = <char *>realloc_chk(name, name_len)
1665 with nogil:
1666 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1667 if ret == 0:
1668 break
1669 elif ret != -errno.ERANGE:
1670 raise make_ex(ret, "rados_snap_get_name error")
1671 else:
1672 name_len = name_len * 2
1673
1674 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1675 self.cur_snap = self.cur_snap + 1
1676 return snap
1677 finally:
1678 free(name)
1679
1680
1681 cdef class Snap(object):
1682 """Snapshot object"""
1683 cdef public Ioctx ioctx
1684 cdef public object name
1685
1686 # NOTE(sileht): old API was storing the ctypes object
1687 # instead of the value ....
1688 cdef public rados_snap_t snap_id
1689
1690 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1691 self.ioctx = ioctx
1692 self.name = name
1693 self.snap_id = snap_id
1694
1695 def __str__(self):
1696 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1697 % (str(self.ioctx), self.name, self.snap_id)
1698
1699 def get_timestamp(self):
1700 """
1701 Find when a snapshot in the current pool occurred
1702
1703 :raises: :class:`Error`
1704 :returns: datetime - the data and time the snapshot was created
1705 """
1706 cdef time_t snap_time
1707
1708 with nogil:
1709 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1710 if ret != 0:
1711 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1712 return datetime.fromtimestamp(snap_time)
1713
1714
1715 cdef class Completion(object):
1716 """completion object"""
1717
1718 cdef public:
1719 Ioctx ioctx
1720 object oncomplete
1721 object onsafe
1722
1723 cdef:
1724 rados_callback_t complete_cb
1725 rados_callback_t safe_cb
1726 rados_completion_t rados_comp
1727 PyObject* buf
1728
1729 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1730 self.oncomplete = oncomplete
1731 self.onsafe = onsafe
1732 self.ioctx = ioctx
1733
1734 def is_safe(self):
1735 """
1736 Is an asynchronous operation safe?
1737
1738 This does not imply that the safe callback has finished.
1739
1740 :returns: True if the operation is safe
1741 """
1742 with nogil:
1743 ret = rados_aio_is_safe(self.rados_comp)
1744 return ret == 1
1745
1746 def is_complete(self):
1747 """
1748 Has an asynchronous operation completed?
1749
1750 This does not imply that the safe callback has finished.
1751
1752 :returns: True if the operation is completed
1753 """
1754 with nogil:
1755 ret = rados_aio_is_complete(self.rados_comp)
1756 return ret == 1
1757
1758 def wait_for_safe(self):
1759 """
1760 Wait for an asynchronous operation to be marked safe
1761
1762 This does not imply that the safe callback has finished.
1763 """
1764 with nogil:
1765 rados_aio_wait_for_safe(self.rados_comp)
1766
1767 def wait_for_complete(self):
1768 """
1769 Wait for an asynchronous operation to complete
1770
1771 This does not imply that the complete callback has finished.
1772 """
1773 with nogil:
1774 rados_aio_wait_for_complete(self.rados_comp)
1775
1776 def wait_for_safe_and_cb(self):
1777 """
1778 Wait for an asynchronous operation to be marked safe and for
1779 the safe callback to have returned
1780 """
1781 with nogil:
1782 rados_aio_wait_for_safe_and_cb(self.rados_comp)
1783
1784 def wait_for_complete_and_cb(self):
1785 """
1786 Wait for an asynchronous operation to complete and for the
1787 complete callback to have returned
1788
1789 :returns: whether the operation is completed
1790 """
1791 with nogil:
1792 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1793 return ret
1794
1795 def get_return_value(self):
1796 """
1797 Get the return value of an asychronous operation
1798
1799 The return value is set when the operation is complete or safe,
1800 whichever comes first.
1801
1802 :returns: int - return value of the operation
1803 """
1804 with nogil:
1805 ret = rados_aio_get_return_value(self.rados_comp)
1806 return ret
1807
1808 def __dealloc__(self):
1809 """
1810 Release a completion
1811
1812 Call this when you no longer need the completion. It may not be
1813 freed immediately if the operation is not acked and committed.
1814 """
1815 ref.Py_XDECREF(self.buf)
1816 self.buf = NULL
1817 if self.rados_comp != NULL:
1818 with nogil:
1819 rados_aio_release(self.rados_comp)
1820 self.rados_comp = NULL
1821
1822 def _complete(self):
1823 self.oncomplete(self)
1824 with self.ioctx.lock:
1825 if self.oncomplete:
1826 self.ioctx.complete_completions.remove(self)
1827
1828 def _safe(self):
1829 self.onsafe(self)
1830 with self.ioctx.lock:
1831 if self.onsafe:
1832 self.ioctx.safe_completions.remove(self)
1833
1834 def _cleanup(self):
1835 with self.ioctx.lock:
1836 if self.oncomplete:
1837 self.ioctx.complete_completions.remove(self)
1838 if self.onsafe:
1839 self.ioctx.safe_completions.remove(self)
1840
1841
1842 class OpCtx(object):
1843 def __enter__(self):
1844 return self.create()
1845
1846 def __exit__(self, type, msg, traceback):
1847 self.release()
1848
1849
1850 cdef class WriteOp(object):
1851 cdef rados_write_op_t write_op
1852
1853 def create(self):
1854 with nogil:
1855 self.write_op = rados_create_write_op()
1856 return self
1857
1858 def release(self):
1859 with nogil:
1860 rados_release_write_op(self.write_op)
1861
1862 @requires(('exclusive', opt(int)))
1863 def new(self, exclusive=None):
1864 """
1865 Create the object.
1866 """
1867
1868 cdef:
1869 int _exclusive = exclusive
1870
1871 with nogil:
1872 rados_write_op_create(self.write_op, _exclusive, NULL)
1873
1874
1875 def remove(self):
1876 """
1877 Remove object.
1878 """
1879 with nogil:
1880 rados_write_op_remove(self.write_op)
1881
1882 @requires(('flags', int))
1883 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1884 """
1885 Set flags for the last operation added to this write_op.
1886 :para flags: flags to apply to the last operation
1887 :type flags: int
1888 """
1889
1890 cdef:
1891 int _flags = flags
1892
1893 with nogil:
1894 rados_write_op_set_flags(self.write_op, _flags)
1895
1896 @requires(('to_write', bytes))
1897 def append(self, to_write):
1898 """
1899 Append data to an object synchronously
1900 :param to_write: data to write
1901 :type to_write: bytes
1902 """
1903
1904 cdef:
1905 char *_to_write = to_write
1906 size_t length = len(to_write)
1907
1908 with nogil:
1909 rados_write_op_append(self.write_op, _to_write, length)
1910
1911 @requires(('to_write', bytes))
1912 def write_full(self, to_write):
1913 """
1914 Write whole object, atomically replacing it.
1915 :param to_write: data to write
1916 :type to_write: bytes
1917 """
1918
1919 cdef:
1920 char *_to_write = to_write
1921 size_t length = len(to_write)
1922
1923 with nogil:
1924 rados_write_op_write_full(self.write_op, _to_write, length)
1925
1926 @requires(('to_write', bytes), ('offset', int))
1927 def write(self, to_write, offset=0):
1928 """
1929 Write to offset.
1930 :param to_write: data to write
1931 :type to_write: bytes
1932 :param offset: byte offset in the object to begin writing at
1933 :type offset: int
1934 """
1935
1936 cdef:
1937 char *_to_write = to_write
1938 size_t length = len(to_write)
1939 uint64_t _offset = offset
1940
1941 with nogil:
1942 rados_write_op_write(self.write_op, _to_write, length, _offset)
1943
1944 @requires(('offset', int), ('length', int))
1945 def zero(self, offset, length):
1946 """
1947 Zero part of an object.
1948 :param offset: byte offset in the object to begin writing at
1949 :type offset: int
1950 :param offset: number of zero to write
1951 :type offset: int
1952 """
1953
1954 cdef:
1955 size_t _length = length
1956 uint64_t _offset = offset
1957
1958 with nogil:
1959 rados_write_op_zero(self.write_op, _length, _offset)
1960
1961 @requires(('offset', int))
1962 def truncate(self, offset):
1963 """
1964 Truncate an object.
1965 :param offset: byte offset in the object to begin truncating at
1966 :type offset: int
1967 """
1968
1969 cdef:
1970 uint64_t _offset = offset
1971
1972 with nogil:
1973 rados_write_op_truncate(self.write_op, _offset)
1974
1975
1976 class WriteOpCtx(WriteOp, OpCtx):
1977 """write operation context manager"""
1978
1979
1980 cdef class ReadOp(object):
1981 cdef rados_read_op_t read_op
1982
1983 def create(self):
1984 with nogil:
1985 self.read_op = rados_create_read_op()
1986 return self
1987
1988 def release(self):
1989 with nogil:
1990 rados_release_read_op(self.read_op)
1991
1992 @requires(('flags', int))
1993 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1994 """
1995 Set flags for the last operation added to this read_op.
1996 :para flags: flags to apply to the last operation
1997 :type flags: int
1998 """
1999
2000 cdef:
2001 int _flags = flags
2002
2003 with nogil:
2004 rados_read_op_set_flags(self.read_op, _flags)
2005
2006
2007 class ReadOpCtx(ReadOp, OpCtx):
2008 """read operation context manager"""
2009
2010
2011 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2012 """
2013 Callback to onsafe() for asynchronous operations
2014 """
2015 cdef object cb = <object>args
2016 cb._safe()
2017 return 0
2018
2019
2020 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2021 """
2022 Callback to oncomplete() for asynchronous operations
2023 """
2024 cdef object cb = <object>args
2025 cb._complete()
2026 return 0
2027
2028
2029 cdef class Ioctx(object):
2030 """rados.Ioctx object"""
2031 # NOTE(sileht): attributes declared in .pyd
2032
2033 def __init__(self, name):
2034 self.name = name
2035 self.state = "open"
2036
2037 self.locator_key = ""
2038 self.nspace = ""
2039 self.lock = threading.Lock()
2040 self.safe_completions = []
2041 self.complete_completions = []
2042
2043 def __enter__(self):
2044 return self
2045
2046 def __exit__(self, type_, value, traceback):
2047 self.close()
2048 return False
2049
2050 def __dealloc__(self):
2051 self.close()
2052
2053 def __track_completion(self, completion_obj):
2054 if completion_obj.oncomplete:
2055 with self.lock:
2056 self.complete_completions.append(completion_obj)
2057 if completion_obj.onsafe:
2058 with self.lock:
2059 self.safe_completions.append(completion_obj)
2060
2061 def __get_completion(self, oncomplete, onsafe):
2062 """
2063 Constructs a completion to use with asynchronous operations
2064
2065 :param oncomplete: what to do when the write is safe and complete in memory
2066 on all replicas
2067 :type oncomplete: completion
2068 :param onsafe: what to do when the write is safe and complete on storage
2069 on all replicas
2070 :type onsafe: completion
2071
2072 :raises: :class:`Error`
2073 :returns: completion object
2074 """
2075
2076 completion_obj = Completion(self, oncomplete, onsafe)
2077
2078 cdef:
2079 rados_callback_t complete_cb = NULL
2080 rados_callback_t safe_cb = NULL
2081 rados_completion_t completion
2082 PyObject* p_completion_obj= <PyObject*>completion_obj
2083
2084 if oncomplete:
2085 complete_cb = <rados_callback_t>&__aio_complete_cb
2086 if onsafe:
2087 safe_cb = <rados_callback_t>&__aio_safe_cb
2088
2089 with nogil:
2090 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2091 &completion)
2092 if ret < 0:
2093 raise make_ex(ret, "error getting a completion")
2094
2095 completion_obj.rados_comp = completion
2096 return completion_obj
2097
2098 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2099 def aio_stat(self, object_name, oncomplete):
2100 """
2101 Asynchronously get object stats (size/mtime)
2102
2103 oncomplete will be called with the returned size and mtime
2104 as well as the completion:
2105
2106 oncomplete(completion, size, mtime)
2107
2108 :param object_name: the name of the object to get stats from
2109 :type object_name: str
2110 :param oncomplete: what to do when the stat is complete
2111 :type oncomplete: completion
2112
2113 :raises: :class:`Error`
2114 :returns: completion object
2115 """
2116
2117 object_name = cstr(object_name, 'object_name')
2118
2119 cdef:
2120 Completion completion
2121 char *_object_name = object_name
2122 uint64_t psize
2123 time_t pmtime
2124
2125 def oncomplete_(completion_v):
2126 cdef Completion _completion_v = completion_v
2127 return_value = _completion_v.get_return_value()
2128 if return_value >= 0:
2129 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2130 else:
2131 return oncomplete(_completion_v, None, None)
2132
2133 completion = self.__get_completion(oncomplete_, None)
2134 self.__track_completion(completion)
2135 with nogil:
2136 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2137 &psize, &pmtime)
2138
2139 if ret < 0:
2140 completion._cleanup()
2141 raise make_ex(ret, "error stating %s" % object_name)
2142 return completion
2143
2144 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2145 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2146 def aio_write(self, object_name, to_write, offset=0,
2147 oncomplete=None, onsafe=None):
2148 """
2149 Write data to an object asynchronously
2150
2151 Queues the write and returns.
2152
2153 :param object_name: name of the object
2154 :type object_name: str
2155 :param to_write: data to write
2156 :type to_write: bytes
2157 :param offset: byte offset in the object to begin writing at
2158 :type offset: int
2159 :param oncomplete: what to do when the write is safe and complete in memory
2160 on all replicas
2161 :type oncomplete: completion
2162 :param onsafe: what to do when the write is safe and complete on storage
2163 on all replicas
2164 :type onsafe: completion
2165
2166 :raises: :class:`Error`
2167 :returns: completion object
2168 """
2169
2170 object_name = cstr(object_name, 'object_name')
2171
2172 cdef:
2173 Completion completion
2174 char* _object_name = object_name
2175 char* _to_write = to_write
2176 size_t size = len(to_write)
2177 uint64_t _offset = offset
2178
2179 completion = self.__get_completion(oncomplete, onsafe)
2180 self.__track_completion(completion)
2181 with nogil:
2182 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2183 _to_write, size, _offset)
2184 if ret < 0:
2185 completion._cleanup()
2186 raise make_ex(ret, "error writing object %s" % object_name)
2187 return completion
2188
2189 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2190 ('onsafe', opt(Callable)))
2191 def aio_write_full(self, object_name, to_write,
2192 oncomplete=None, onsafe=None):
2193 """
2194 Asychronously write an entire object
2195
2196 The object is filled with the provided data. If the object exists,
2197 it is atomically truncated and then written.
2198 Queues the write and returns.
2199
2200 :param object_name: name of the object
2201 :type object_name: str
2202 :param to_write: data to write
2203 :type to_write: str
2204 :param oncomplete: what to do when the write is safe and complete in memory
2205 on all replicas
2206 :type oncomplete: completion
2207 :param onsafe: what to do when the write is safe and complete on storage
2208 on all replicas
2209 :type onsafe: completion
2210
2211 :raises: :class:`Error`
2212 :returns: completion object
2213 """
2214
2215 object_name = cstr(object_name, 'object_name')
2216
2217 cdef:
2218 Completion completion
2219 char* _object_name = object_name
2220 char* _to_write = to_write
2221 size_t size = len(to_write)
2222
2223 completion = self.__get_completion(oncomplete, onsafe)
2224 self.__track_completion(completion)
2225 with nogil:
2226 ret = rados_aio_write_full(self.io, _object_name,
2227 completion.rados_comp,
2228 _to_write, size)
2229 if ret < 0:
2230 completion._cleanup()
2231 raise make_ex(ret, "error writing object %s" % object_name)
2232 return completion
2233
2234 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2235 ('onsafe', opt(Callable)))
2236 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2237 """
2238 Asychronously append data to an object
2239
2240 Queues the write and returns.
2241
2242 :param object_name: name of the object
2243 :type object_name: str
2244 :param to_append: data to append
2245 :type to_append: str
2246 :param offset: byte offset in the object to begin writing at
2247 :type offset: int
2248 :param oncomplete: what to do when the write is safe and complete in memory
2249 on all replicas
2250 :type oncomplete: completion
2251 :param onsafe: what to do when the write is safe and complete on storage
2252 on all replicas
2253 :type onsafe: completion
2254
2255 :raises: :class:`Error`
2256 :returns: completion object
2257 """
2258 object_name = cstr(object_name, 'object_name')
2259
2260 cdef:
2261 Completion completion
2262 char* _object_name = object_name
2263 char* _to_append = to_append
2264 size_t size = len(to_append)
2265
2266 completion = self.__get_completion(oncomplete, onsafe)
2267 self.__track_completion(completion)
2268 with nogil:
2269 ret = rados_aio_append(self.io, _object_name,
2270 completion.rados_comp,
2271 _to_append, size)
2272 if ret < 0:
2273 completion._cleanup()
2274 raise make_ex(ret, "error appending object %s" % object_name)
2275 return completion
2276
2277 def aio_flush(self):
2278 """
2279 Block until all pending writes in an io context are safe
2280
2281 :raises: :class:`Error`
2282 """
2283 with nogil:
2284 ret = rados_aio_flush(self.io)
2285 if ret < 0:
2286 raise make_ex(ret, "error flushing")
2287
2288 @requires(('object_name', str_type), ('length', int), ('offset', int),
2289 ('oncomplete', opt(Callable)))
2290 def aio_read(self, object_name, length, offset, oncomplete):
2291 """
2292 Asychronously read data from an object
2293
2294 oncomplete will be called with the returned read value as
2295 well as the completion:
2296
2297 oncomplete(completion, data_read)
2298
2299 :param object_name: name of the object to read from
2300 :type object_name: str
2301 :param length: the number of bytes to read
2302 :type length: int
2303 :param offset: byte offset in the object to begin reading from
2304 :type offset: int
2305 :param oncomplete: what to do when the read is complete
2306 :type oncomplete: completion
2307
2308 :raises: :class:`Error`
2309 :returns: completion object
2310 """
2311
2312 object_name = cstr(object_name, 'object_name')
2313
2314 cdef:
2315 Completion completion
2316 char* _object_name = object_name
2317 uint64_t _offset = offset
2318
2319 char *ref_buf
2320 size_t _length = length
2321
2322 def oncomplete_(completion_v):
2323 cdef Completion _completion_v = completion_v
2324 return_value = _completion_v.get_return_value()
2325 if return_value > 0 and return_value != length:
2326 _PyBytes_Resize(&_completion_v.buf, return_value)
2327 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2328
2329 completion = self.__get_completion(oncomplete_, None)
2330 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2331 ret_buf = PyBytes_AsString(completion.buf)
2332 self.__track_completion(completion)
2333 with nogil:
2334 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2335 ret_buf, _length, _offset)
2336 if ret < 0:
2337 completion._cleanup()
2338 raise make_ex(ret, "error reading %s" % object_name)
2339 return completion
2340
2341 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2342 ('data', bytes), ('length', int),
2343 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2344 def aio_execute(self, object_name, cls, method, data,
2345 length=8192, oncomplete=None, onsafe=None):
2346 """
2347 Asynchronously execute an OSD class method on an object.
2348
2349 oncomplete and onsafe will be called with the data returned from
2350 the plugin as well as the completion:
2351
2352 oncomplete(completion, data)
2353 onsafe(completion, data)
2354
2355 :param object_name: name of the object
2356 :type object_name: str
2357 :param cls: name of the object class
2358 :type cls: str
2359 :param method: name of the method
2360 :type method: str
2361 :param data: input data
2362 :type data: bytes
2363 :param length: size of output buffer in bytes (default=8192)
2364 :type length: int
2365 :param oncomplete: what to do when the execution is complete
2366 :type oncomplete: completion
2367 :param onsafe: what to do when the execution is safe and complete
2368 :type onsafe: completion
2369
2370 :raises: :class:`Error`
2371 :returns: completion object
2372 """
2373
2374 object_name = cstr(object_name, 'object_name')
2375 cls = cstr(cls, 'cls')
2376 method = cstr(method, 'method')
2377 cdef:
2378 Completion completion
2379 char *_object_name = object_name
2380 char *_cls = cls
2381 char *_method = method
2382 char *_data = data
2383 size_t _data_len = len(data)
2384
2385 char *ref_buf
2386 size_t _length = length
2387
2388 def oncomplete_(completion_v):
2389 cdef Completion _completion_v = completion_v
2390 return_value = _completion_v.get_return_value()
2391 if return_value > 0 and return_value != length:
2392 _PyBytes_Resize(&_completion_v.buf, return_value)
2393 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2394
2395 def onsafe_(completion_v):
2396 cdef Completion _completion_v = completion_v
2397 return_value = _completion_v.get_return_value()
2398 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2399
2400 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2401 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2402 ret_buf = PyBytes_AsString(completion.buf)
2403 self.__track_completion(completion)
2404 with nogil:
2405 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2406 _cls, _method, _data, _data_len, ret_buf, _length)
2407 if ret < 0:
2408 completion._cleanup()
2409 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2410 return completion
2411
2412 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2413 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2414 """
2415 Asychronously remove an object
2416
2417 :param object_name: name of the object to remove
2418 :type object_name: str
2419 :param oncomplete: what to do when the remove is safe and complete in memory
2420 on all replicas
2421 :type oncomplete: completion
2422 :param onsafe: what to do when the remove is safe and complete on storage
2423 on all replicas
2424 :type onsafe: completion
2425
2426 :raises: :class:`Error`
2427 :returns: completion object
2428 """
2429 object_name = cstr(object_name, 'object_name')
2430
2431 cdef:
2432 Completion completion
2433 char* _object_name = object_name
2434
2435 completion = self.__get_completion(oncomplete, onsafe)
2436 self.__track_completion(completion)
2437 with nogil:
2438 ret = rados_aio_remove(self.io, _object_name,
2439 completion.rados_comp)
2440 if ret < 0:
2441 completion._cleanup()
2442 raise make_ex(ret, "error removing %s" % object_name)
2443 return completion
2444
2445 def require_ioctx_open(self):
2446 """
2447 Checks if the rados.Ioctx object state is 'open'
2448
2449 :raises: IoctxStateError
2450 """
2451 if self.state != "open":
2452 raise IoctxStateError("The pool is %s" % self.state)
2453
2454 def change_auid(self, auid):
2455 """
2456 Attempt to change an io context's associated auid "owner."
2457
2458 Requires that you have write permission on both the current and new
2459 auid.
2460
2461 :raises: :class:`Error`
2462 """
2463 self.require_ioctx_open()
2464
2465 cdef:
2466 uint64_t _auid = auid
2467
2468 with nogil:
2469 ret = rados_ioctx_pool_set_auid(self.io, _auid)
2470 if ret < 0:
2471 raise make_ex(ret, "error changing auid of '%s' to %d"
2472 % (self.name, auid))
2473
2474 @requires(('loc_key', str_type))
2475 def set_locator_key(self, loc_key):
2476 """
2477 Set the key for mapping objects to pgs within an io context.
2478
2479 The key is used instead of the object name to determine which
2480 placement groups an object is put in. This affects all subsequent
2481 operations of the io context - until a different locator key is
2482 set, all objects in this io context will be placed in the same pg.
2483
2484 :param loc_key: the key to use as the object locator, or NULL to discard
2485 any previously set key
2486 :type loc_key: str
2487
2488 :raises: :class:`TypeError`
2489 """
2490 self.require_ioctx_open()
2491 cloc_key = cstr(loc_key, 'loc_key')
2492 cdef char *_loc_key = cloc_key
2493 with nogil:
2494 rados_ioctx_locator_set_key(self.io, _loc_key)
2495 self.locator_key = loc_key
2496
2497 def get_locator_key(self):
2498 """
2499 Get the locator_key of context
2500
2501 :returns: locator_key
2502 """
2503 return self.locator_key
2504
2505 @requires(('snap_id', long))
2506 def set_read(self, snap_id):
2507 """
2508 Set the snapshot for reading objects.
2509
2510 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2511
2512 :param snap_id: the snapshot Id
2513 :type snap_id: int
2514
2515 :raises: :class:`TypeError`
2516 """
2517 self.require_ioctx_open()
2518 cdef rados_snap_t _snap_id = snap_id
2519 with nogil:
2520 rados_ioctx_snap_set_read(self.io, _snap_id)
2521
2522 @requires(('nspace', str_type))
2523 def set_namespace(self, nspace):
2524 """
2525 Set the namespace for objects within an io context.
2526
2527 The namespace in addition to the object name fully identifies
2528 an object. This affects all subsequent operations of the io context
2529 - until a different namespace is set, all objects in this io context
2530 will be placed in the same namespace.
2531
2532 :param nspace: the namespace to use, or None/"" for the default namespace
2533 :type nspace: str
2534
2535 :raises: :class:`TypeError`
2536 """
2537 self.require_ioctx_open()
2538 if nspace is None:
2539 nspace = ""
2540 cnspace = cstr(nspace, 'nspace')
2541 cdef char *_nspace = cnspace
2542 with nogil:
2543 rados_ioctx_set_namespace(self.io, _nspace)
2544 self.nspace = nspace
2545
2546 def get_namespace(self):
2547 """
2548 Get the namespace of context
2549
2550 :returns: namespace
2551 """
2552 return self.nspace
2553
2554 def close(self):
2555 """
2556 Close a rados.Ioctx object.
2557
2558 This just tells librados that you no longer need to use the io context.
2559 It may not be freed immediately if there are pending asynchronous
2560 requests on it, but you should not use an io context again after
2561 calling this function on it.
2562 """
2563 if self.state == "open":
2564 self.require_ioctx_open()
2565 with nogil:
2566 rados_ioctx_destroy(self.io)
2567 self.state = "closed"
2568
2569
2570 @requires(('key', str_type), ('data', bytes))
2571 def write(self, key, data, offset=0):
2572 """
2573 Write data to an object synchronously
2574
2575 :param key: name of the object
2576 :type key: str
2577 :param data: data to write
2578 :type data: bytes
2579 :param offset: byte offset in the object to begin writing at
2580 :type offset: int
2581
2582 :raises: :class:`TypeError`
2583 :raises: :class:`LogicError`
2584 :returns: int - 0 on success
2585 """
2586 self.require_ioctx_open()
2587
2588 key = cstr(key, 'key')
2589 cdef:
2590 char *_key = key
2591 char *_data = data
2592 size_t length = len(data)
2593 uint64_t _offset = offset
2594
2595 with nogil:
2596 ret = rados_write(self.io, _key, _data, length, _offset)
2597 if ret == 0:
2598 return ret
2599 elif ret < 0:
2600 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2601 % (self.name, key))
2602 else:
2603 raise LogicError("Ioctx.write(%s): rados_write \
2604 returned %d, but should return zero on success." % (self.name, ret))
2605
2606 @requires(('key', str_type), ('data', bytes))
2607 def write_full(self, key, data):
2608 """
2609 Write an entire object synchronously.
2610
2611 The object is filled with the provided data. If the object exists,
2612 it is atomically truncated and then written.
2613
2614 :param key: name of the object
2615 :type key: str
2616 :param data: data to write
2617 :type data: bytes
2618
2619 :raises: :class:`TypeError`
2620 :raises: :class:`Error`
2621 :returns: int - 0 on success
2622 """
2623 self.require_ioctx_open()
2624 key = cstr(key, 'key')
2625 cdef:
2626 char *_key = key
2627 char *_data = data
2628 size_t length = len(data)
2629
2630 with nogil:
2631 ret = rados_write_full(self.io, _key, _data, length)
2632 if ret == 0:
2633 return ret
2634 elif ret < 0:
2635 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2636 % (self.name, key))
2637 else:
2638 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2639 returned %d, but should return zero on success." % (self.name, ret))
2640
2641 @requires(('key', str_type), ('data', bytes))
2642 def append(self, key, data):
2643 """
2644 Append data to an object synchronously
2645
2646 :param key: name of the object
2647 :type key: str
2648 :param data: data to write
2649 :type data: bytes
2650
2651 :raises: :class:`TypeError`
2652 :raises: :class:`LogicError`
2653 :returns: int - 0 on success
2654 """
2655 self.require_ioctx_open()
2656 key = cstr(key, 'key')
2657 cdef:
2658 char *_key = key
2659 char *_data = data
2660 size_t length = len(data)
2661
2662 with nogil:
2663 ret = rados_append(self.io, _key, _data, length)
2664 if ret == 0:
2665 return ret
2666 elif ret < 0:
2667 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2668 % (self.name, key))
2669 else:
2670 raise LogicError("Ioctx.append(%s): rados_append \
2671 returned %d, but should return zero on success." % (self.name, ret))
2672
2673 @requires(('key', str_type))
2674 def read(self, key, length=8192, offset=0):
2675 """
2676 Read data from an object synchronously
2677
2678 :param key: name of the object
2679 :type key: str
2680 :param length: the number of bytes to read (default=8192)
2681 :type length: int
2682 :param offset: byte offset in the object to begin reading at
2683 :type offset: int
2684
2685 :raises: :class:`TypeError`
2686 :raises: :class:`Error`
2687 :returns: str - data read from object
2688 """
2689 self.require_ioctx_open()
2690 key = cstr(key, 'key')
2691 cdef:
2692 char *_key = key
2693 char *ret_buf
2694 uint64_t _offset = offset
2695 size_t _length = length
2696 PyObject* ret_s = NULL
2697
2698 ret_s = PyBytes_FromStringAndSize(NULL, length)
2699 try:
2700 ret_buf = PyBytes_AsString(ret_s)
2701 with nogil:
2702 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2703 if ret < 0:
2704 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2705
2706 if ret != length:
2707 _PyBytes_Resize(&ret_s, ret)
2708
2709 return <object>ret_s
2710 finally:
2711 # We DECREF unconditionally: the cast to object above will have
2712 # INCREFed if necessary. This also takes care of exceptions,
2713 # including if _PyString_Resize fails (that will free the string
2714 # itself and set ret_s to NULL, hence XDECREF).
2715 ref.Py_XDECREF(ret_s)
2716
2717 @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
2718 def execute(self, key, cls, method, data, length=8192):
2719 """
2720 Execute an OSD class method on an object.
2721
2722 :param key: name of the object
2723 :type key: str
2724 :param cls: name of the object class
2725 :type cls: str
2726 :param method: name of the method
2727 :type method: str
2728 :param data: input data
2729 :type data: bytes
2730 :param length: size of output buffer in bytes (default=8192)
2731 :type length: int
2732
2733 :raises: :class:`TypeError`
2734 :raises: :class:`Error`
2735 :returns: (ret, method output)
2736 """
2737 self.require_ioctx_open()
2738
2739 key = cstr(key, 'key')
2740 cls = cstr(cls, 'cls')
2741 method = cstr(method, 'method')
2742 cdef:
2743 char *_key = key
2744 char *_cls = cls
2745 char *_method = method
2746 char *_data = data
2747 size_t _data_len = len(data)
2748
2749 char *ref_buf
2750 size_t _length = length
2751 PyObject* ret_s = NULL
2752
2753 ret_s = PyBytes_FromStringAndSize(NULL, length)
2754 try:
2755 ret_buf = PyBytes_AsString(ret_s)
2756 with nogil:
2757 ret = rados_exec(self.io, _key, _cls, _method, _data,
2758 _data_len, ret_buf, _length)
2759 if ret < 0:
2760 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2761
2762 if ret != length:
2763 _PyBytes_Resize(&ret_s, ret)
2764
2765 return ret, <object>ret_s
2766 finally:
2767 # We DECREF unconditionally: the cast to object above will have
2768 # INCREFed if necessary. This also takes care of exceptions,
2769 # including if _PyString_Resize fails (that will free the string
2770 # itself and set ret_s to NULL, hence XDECREF).
2771 ref.Py_XDECREF(ret_s)
2772
2773 def get_stats(self):
2774 """
2775 Get pool usage statistics
2776
2777 :returns: dict - contains the following keys:
2778
2779 - ``num_bytes`` (int) - size of pool in bytes
2780
2781 - ``num_kb`` (int) - size of pool in kbytes
2782
2783 - ``num_objects`` (int) - number of objects in the pool
2784
2785 - ``num_object_clones`` (int) - number of object clones
2786
2787 - ``num_object_copies`` (int) - number of object copies
2788
2789 - ``num_objects_missing_on_primary`` (int) - number of objets
2790 missing on primary
2791
2792 - ``num_objects_unfound`` (int) - number of unfound objects
2793
2794 - ``num_objects_degraded`` (int) - number of degraded objects
2795
2796 - ``num_rd`` (int) - bytes read
2797
2798 - ``num_rd_kb`` (int) - kbytes read
2799
2800 - ``num_wr`` (int) - bytes written
2801
2802 - ``num_wr_kb`` (int) - kbytes written
2803 """
2804 self.require_ioctx_open()
2805 cdef rados_pool_stat_t stats
2806 with nogil:
2807 ret = rados_ioctx_pool_stat(self.io, &stats)
2808 if ret < 0:
2809 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
2810 return {'num_bytes': stats.num_bytes,
2811 'num_kb': stats.num_kb,
2812 'num_objects': stats.num_objects,
2813 'num_object_clones': stats.num_object_clones,
2814 'num_object_copies': stats.num_object_copies,
2815 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
2816 "num_objects_unfound": stats.num_objects_unfound,
2817 "num_objects_degraded": stats.num_objects_degraded,
2818 "num_rd": stats.num_rd,
2819 "num_rd_kb": stats.num_rd_kb,
2820 "num_wr": stats.num_wr,
2821 "num_wr_kb": stats.num_wr_kb}
2822
2823 @requires(('key', str_type))
2824 def remove_object(self, key):
2825 """
2826 Delete an object
2827
2828 This does not delete any snapshots of the object.
2829
2830 :param key: the name of the object to delete
2831 :type key: str
2832
2833 :raises: :class:`TypeError`
2834 :raises: :class:`Error`
2835 :returns: bool - True on success
2836 """
2837 self.require_ioctx_open()
2838 key = cstr(key, 'key')
2839 cdef:
2840 char *_key = key
2841
2842 with nogil:
2843 ret = rados_remove(self.io, _key)
2844 if ret < 0:
2845 raise make_ex(ret, "Failed to remove '%s'" % key)
2846 return True
2847
2848 @requires(('key', str_type))
2849 def trunc(self, key, size):
2850 """
2851 Resize an object
2852
2853 If this enlarges the object, the new area is logically filled with
2854 zeroes. If this shrinks the object, the excess data is removed.
2855
2856 :param key: the name of the object to resize
2857 :type key: str
2858 :param size: the new size of the object in bytes
2859 :type size: int
2860
2861 :raises: :class:`TypeError`
2862 :raises: :class:`Error`
2863 :returns: int - 0 on success, otherwise raises error
2864 """
2865
2866 self.require_ioctx_open()
2867 key = cstr(key, 'key')
2868 cdef:
2869 char *_key = key
2870 uint64_t _size = size
2871
2872 with nogil:
2873 ret = rados_trunc(self.io, _key, _size)
2874 if ret < 0:
2875 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2876 return ret
2877
2878 @requires(('key', str_type))
2879 def stat(self, key):
2880 """
2881 Get object stats (size/mtime)
2882
2883 :param key: the name of the object to get stats from
2884 :type key: str
2885
2886 :raises: :class:`TypeError`
2887 :raises: :class:`Error`
2888 :returns: (size,timestamp)
2889 """
2890 self.require_ioctx_open()
2891
2892 key = cstr(key, 'key')
2893 cdef:
2894 char *_key = key
2895 uint64_t psize
2896 time_t pmtime
2897
2898 with nogil:
2899 ret = rados_stat(self.io, _key, &psize, &pmtime)
2900 if ret < 0:
2901 raise make_ex(ret, "Failed to stat %r" % key)
2902 return psize, time.localtime(pmtime)
2903
2904 @requires(('key', str_type), ('xattr_name', str_type))
2905 def get_xattr(self, key, xattr_name):
2906 """
2907 Get the value of an extended attribute on an object.
2908
2909 :param key: the name of the object to get xattr from
2910 :type key: str
2911 :param xattr_name: which extended attribute to read
2912 :type xattr_name: str
2913
2914 :raises: :class:`TypeError`
2915 :raises: :class:`Error`
2916 :returns: str - value of the xattr
2917 """
2918 self.require_ioctx_open()
2919
2920 key = cstr(key, 'key')
2921 xattr_name = cstr(xattr_name, 'xattr_name')
2922 cdef:
2923 char *_key = key
2924 char *_xattr_name = xattr_name
2925 size_t ret_length = 4096
2926 char *ret_buf = NULL
2927
2928 try:
2929 while ret_length < 4096 * 1024 * 1024:
2930 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2931 with nogil:
2932 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2933 if ret == -errno.ERANGE:
2934 ret_length *= 2
2935 elif ret < 0:
2936 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2937 else:
2938 break
2939 return ret_buf[:ret]
2940 finally:
2941 free(ret_buf)
2942
2943 @requires(('oid', str_type))
2944 def get_xattrs(self, oid):
2945 """
2946 Start iterating over xattrs on an object.
2947
2948 :param oid: the name of the object to get xattrs from
2949 :type oid: str
2950
2951 :raises: :class:`TypeError`
2952 :raises: :class:`Error`
2953 :returns: XattrIterator
2954 """
2955 self.require_ioctx_open()
2956 return XattrIterator(self, oid)
2957
2958 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
2959 def set_xattr(self, key, xattr_name, xattr_value):
2960 """
2961 Set an extended attribute on an object.
2962
2963 :param key: the name of the object to set xattr to
2964 :type key: str
2965 :param xattr_name: which extended attribute to set
2966 :type xattr_name: str
2967 :param xattr_value: the value of the extended attribute
2968 :type xattr_value: bytes
2969
2970 :raises: :class:`TypeError`
2971 :raises: :class:`Error`
2972 :returns: bool - True on success, otherwise raise an error
2973 """
2974 self.require_ioctx_open()
2975
2976 key = cstr(key, 'key')
2977 xattr_name = cstr(xattr_name, 'xattr_name')
2978 cdef:
2979 char *_key = key
2980 char *_xattr_name = xattr_name
2981 char *_xattr_value = xattr_value
2982 size_t _xattr_value_len = len(xattr_value)
2983
2984 with nogil:
2985 ret = rados_setxattr(self.io, _key, _xattr_name,
2986 _xattr_value, _xattr_value_len)
2987 if ret < 0:
2988 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2989 return True
2990
2991 @requires(('key', str_type), ('xattr_name', str_type))
2992 def rm_xattr(self, key, xattr_name):
2993 """
2994 Removes an extended attribute on from an object.
2995
2996 :param key: the name of the object to remove xattr from
2997 :type key: str
2998 :param xattr_name: which extended attribute to remove
2999 :type xattr_name: str
3000
3001 :raises: :class:`TypeError`
3002 :raises: :class:`Error`
3003 :returns: bool - True on success, otherwise raise an error
3004 """
3005 self.require_ioctx_open()
3006
3007 key = cstr(key, 'key')
3008 xattr_name = cstr(xattr_name, 'xattr_name')
3009 cdef:
3010 char *_key = key
3011 char *_xattr_name = xattr_name
3012
3013 with nogil:
3014 ret = rados_rmxattr(self.io, _key, _xattr_name)
3015 if ret < 0:
3016 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3017 (key, xattr_name))
3018 return True
3019
3020 def list_objects(self):
3021 """
3022 Get ObjectIterator on rados.Ioctx object.
3023
3024 :returns: ObjectIterator
3025 """
3026 self.require_ioctx_open()
3027 return ObjectIterator(self)
3028
3029 def list_snaps(self):
3030 """
3031 Get SnapIterator on rados.Ioctx object.
3032
3033 :returns: SnapIterator
3034 """
3035 self.require_ioctx_open()
3036 return SnapIterator(self)
3037
3038 @requires(('snap_name', str_type))
3039 def create_snap(self, snap_name):
3040 """
3041 Create a pool-wide snapshot
3042
3043 :param snap_name: the name of the snapshot
3044 :type snap_name: str
3045
3046 :raises: :class:`TypeError`
3047 :raises: :class:`Error`
3048 """
3049 self.require_ioctx_open()
3050 snap_name = cstr(snap_name, 'snap_name')
3051 cdef char *_snap_name = snap_name
3052
3053 with nogil:
3054 ret = rados_ioctx_snap_create(self.io, _snap_name)
3055 if ret != 0:
3056 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3057
3058 @requires(('snap_name', str_type))
3059 def remove_snap(self, snap_name):
3060 """
3061 Removes a pool-wide snapshot
3062
3063 :param snap_name: the name of the snapshot
3064 :type snap_name: str
3065
3066 :raises: :class:`TypeError`
3067 :raises: :class:`Error`
3068 """
3069 self.require_ioctx_open()
3070 snap_name = cstr(snap_name, 'snap_name')
3071 cdef char *_snap_name = snap_name
3072
3073 with nogil:
3074 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3075 if ret != 0:
3076 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3077
3078 @requires(('snap_name', str_type))
3079 def lookup_snap(self, snap_name):
3080 """
3081 Get the id of a pool snapshot
3082
3083 :param snap_name: the name of the snapshot to lookop
3084 :type snap_name: str
3085
3086 :raises: :class:`TypeError`
3087 :raises: :class:`Error`
3088 :returns: Snap - on success
3089 """
3090 self.require_ioctx_open()
3091 csnap_name = cstr(snap_name, 'snap_name')
3092 cdef:
3093 char *_snap_name = csnap_name
3094 rados_snap_t snap_id
3095
3096 with nogil:
3097 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3098 if ret != 0:
3099 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3100 return Snap(self, snap_name, int(snap_id))
3101
3102 @requires(('oid', str_type), ('snap_name', str_type))
3103 def snap_rollback(self, oid, snap_name):
3104 """
3105 Rollback an object to a snapshot
3106
3107 :param oid: the name of the object
3108 :type oid: str
3109 :param snap_name: the name of the snapshot
3110 :type snap_name: str
3111
3112 :raises: :class:`TypeError`
3113 :raises: :class:`Error`
3114 """
3115 self.require_ioctx_open()
3116 oid = cstr(oid, 'oid')
3117 snap_name = cstr(snap_name, 'snap_name')
3118 cdef:
3119 char *_snap_name = snap_name
3120 char *_oid = oid
3121
3122 with nogil:
3123 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3124 if ret != 0:
3125 raise make_ex(ret, "Failed to rollback %s" % oid)
3126
3127 def create_self_managed_snap(self):
3128 """
3129 Creates a self-managed snapshot
3130
3131 :returns: snap id on success
3132
3133 :raises: :class:`Error`
3134 """
3135 self.require_ioctx_open()
3136 cdef:
3137 rados_snap_t _snap_id
3138 with nogil:
3139 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3140 if ret != 0:
3141 raise make_ex(ret, "Failed to create self-managed snapshot")
3142 return int(_snap_id)
3143
3144 @requires(('snap_id', int))
3145 def remove_self_managed_snap(self, snap_id):
3146 """
3147 Removes a self-managed snapshot
3148
3149 :param snap_id: the name of the snapshot
3150 :type snap_id: int
3151
3152 :raises: :class:`TypeError`
3153 :raises: :class:`Error`
3154 """
3155 self.require_ioctx_open()
3156 cdef:
3157 rados_snap_t _snap_id = snap_id
3158 with nogil:
3159 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3160 if ret != 0:
3161 raise make_ex(ret, "Failed to remove self-managed snapshot")
3162
3163 def set_self_managed_snap_write(self, snaps):
3164 """
3165 Updates the write context to the specified self-managed
3166 snapshot ids.
3167
3168 :param snaps: all associated self-managed snapshot ids
3169 :type snaps: list
3170
3171 :raises: :class:`TypeError`
3172 :raises: :class:`Error`
3173 """
3174 self.require_ioctx_open()
3175 sorted_snaps = []
3176 snap_seq = 0
3177 if snaps:
3178 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3179 snap_seq = sorted_snaps[0]
3180
3181 cdef:
3182 rados_snap_t _snap_seq = snap_seq
3183 rados_snap_t *_snaps = NULL
3184 int _num_snaps = len(sorted_snaps)
3185 try:
3186 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3187 for i in range(len(sorted_snaps)):
3188 _snaps[i] = sorted_snaps[i]
3189 with nogil:
3190 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3191 _snap_seq,
3192 _snaps,
3193 _num_snaps)
3194 if ret != 0:
3195 raise make_ex(ret, "Failed to update snapshot write context")
3196 finally:
3197 free(_snaps)
3198
3199 @requires(('oid', str_type), ('snap_id', int))
3200 def rollback_self_managed_snap(self, oid, snap_id):
3201 """
3202 Rolls an specific object back to a self-managed snapshot revision
3203
3204 :param oid: the name of the object
3205 :type oid: str
3206 :param snap_id: the name of the snapshot
3207 :type snap_id: int
3208
3209 :raises: :class:`TypeError`
3210 :raises: :class:`Error`
3211 """
3212 self.require_ioctx_open()
3213 oid = cstr(oid, 'oid')
3214 cdef:
3215 char *_oid = oid
3216 rados_snap_t _snap_id = snap_id
3217 with nogil:
3218 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3219 if ret != 0:
3220 raise make_ex(ret, "Failed to rollback %s" % oid)
3221
3222 def get_last_version(self):
3223 """
3224 Return the version of the last object read or written to.
3225
3226 This exposes the internal version number of the last object read or
3227 written via this io context
3228
3229 :returns: version of the last object used
3230 """
3231 self.require_ioctx_open()
3232 with nogil:
3233 ret = rados_get_last_version(self.io)
3234 return int(ret)
3235
3236 def create_write_op(self):
3237 """
3238 create write operation object.
3239 need call release_write_op after use
3240 """
3241 return WriteOp().create()
3242
3243 def create_read_op(self):
3244 """
3245 create read operation object.
3246 need call release_read_op after use
3247 """
3248 return ReadOp().create()
3249
3250 def release_write_op(self, write_op):
3251 """
3252 release memory alloc by create_write_op
3253 """
3254 write_op.release()
3255
3256 def release_read_op(self, read_op):
3257 """
3258 release memory alloc by create_read_op
3259 :para read_op: read_op object
3260 :type: int
3261 """
3262 read_op.release()
3263
3264 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3265 def set_omap(self, write_op, keys, values):
3266 """
3267 set keys values to write_op
3268 :para write_op: write_operation object
3269 :type write_op: WriteOp
3270 :para keys: a tuple of keys
3271 :type keys: tuple
3272 :para values: a tuple of values
3273 :type values: tuple
3274 """
3275
3276 if len(keys) != len(values):
3277 raise Error("Rados(): keys and values must have the same number of items")
3278
3279 keys = cstr_list(keys, 'keys')
3280 cdef:
3281 WriteOp _write_op = write_op
3282 size_t key_num = len(keys)
3283 char **_keys = to_bytes_array(keys)
3284 char **_values = to_bytes_array(values)
3285 size_t *_lens = to_csize_t_array([len(v) for v in values])
3286
3287 try:
3288 with nogil:
3289 rados_write_op_omap_set(_write_op.write_op,
3290 <const char**>_keys,
3291 <const char**>_values,
3292 <const size_t*>_lens, key_num)
3293 finally:
3294 free(_keys)
3295 free(_values)
3296 free(_lens)
3297
3298 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3299 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3300 """
3301 excute the real write operation
3302 :para write_op: write operation object
3303 :type write_op: WriteOp
3304 :para oid: object name
3305 :type oid: str
3306 :para mtime: the time to set the mtime to, 0 for the current time
3307 :type mtime: int
3308 :para flags: flags to apply to the entire operation
3309 :type flags: int
3310 """
3311
3312 oid = cstr(oid, 'oid')
3313 cdef:
3314 WriteOp _write_op = write_op
3315 char *_oid = oid
3316 time_t _mtime = mtime
3317 int _flags = flags
3318
3319 with nogil:
3320 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3321 if ret != 0:
3322 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3323
3324 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3325 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3326 """
3327 excute the real write operation asynchronously
3328 :para write_op: write operation object
3329 :type write_op: WriteOp
3330 :para oid: object name
3331 :type oid: str
3332 :param oncomplete: what to do when the remove is safe and complete in memory
3333 on all replicas
3334 :type oncomplete: completion
3335 :param onsafe: what to do when the remove is safe and complete on storage
3336 on all replicas
3337 :type onsafe: completion
3338 :para mtime: the time to set the mtime to, 0 for the current time
3339 :type mtime: int
3340 :para flags: flags to apply to the entire operation
3341 :type flags: int
3342
3343 :raises: :class:`Error`
3344 :returns: completion object
3345 """
3346
3347 oid = cstr(oid, 'oid')
3348 cdef:
3349 WriteOp _write_op = write_op
3350 char *_oid = oid
3351 Completion completion
3352 time_t _mtime = mtime
3353 int _flags = flags
3354
3355 completion = self.__get_completion(oncomplete, onsafe)
3356 self.__track_completion(completion)
3357
3358 with nogil:
3359 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3360 &_mtime, _flags)
3361 if ret != 0:
3362 completion._cleanup()
3363 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3364 return completion
3365
3366 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3367 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3368 """
3369 excute the real read operation
3370 :para read_op: read operation object
3371 :type read_op: ReadOp
3372 :para oid: object name
3373 :type oid: str
3374 :para flag: flags to apply to the entire operation
3375 :type flag: int
3376 """
3377 oid = cstr(oid, 'oid')
3378 cdef:
3379 ReadOp _read_op = read_op
3380 char *_oid = oid
3381 int _flag = flag
3382
3383 with nogil:
3384 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3385 if ret != 0:
3386 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3387
3388 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3389 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3390 """
3391 excute the real read operation
3392 :para read_op: read operation object
3393 :type read_op: ReadOp
3394 :para oid: object name
3395 :type oid: str
3396 :param oncomplete: what to do when the remove is safe and complete in memory
3397 on all replicas
3398 :type oncomplete: completion
3399 :param onsafe: what to do when the remove is safe and complete on storage
3400 on all replicas
3401 :type onsafe: completion
3402 :para flag: flags to apply to the entire operation
3403 :type flag: int
3404 """
3405 oid = cstr(oid, 'oid')
3406 cdef:
3407 ReadOp _read_op = read_op
3408 char *_oid = oid
3409 Completion completion
3410 int _flag = flag
3411
3412 completion = self.__get_completion(oncomplete, onsafe)
3413 self.__track_completion(completion)
3414
3415 with nogil:
3416 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3417 if ret != 0:
3418 completion._cleanup()
3419 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3420 return completion
3421
3422 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3423 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3424 """
3425 get the omap values
3426 :para read_op: read operation object
3427 :type read_op: ReadOp
3428 :para start_after: list keys starting after start_after
3429 :type start_after: str
3430 :para filter_prefix: list only keys beginning with filter_prefix
3431 :type filter_prefix: str
3432 :para max_return: list no more than max_return key/value pairs
3433 :type max_return: int
3434 :returns: an iterator over the requested omap values, return value from this action
3435 """
3436
3437 start_after = cstr(start_after, 'start_after') if start_after else None
3438 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3439 cdef:
3440 char *_start_after = opt_str(start_after)
3441 char *_filter_prefix = opt_str(filter_prefix)
3442 ReadOp _read_op = read_op
3443 rados_omap_iter_t iter_addr = NULL
3444 int _max_return = max_return
3445 int prval = 0
3446
3447 with nogil:
3448 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3449 _max_return, &iter_addr, NULL, &prval)
3450 it = OmapIterator(self)
3451 it.ctx = iter_addr
3452 return it, int(prval)
3453
3454 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3455 def get_omap_keys(self, read_op, start_after, max_return):
3456 """
3457 get the omap keys
3458 :para read_op: read operation object
3459 :type read_op: ReadOp
3460 :para start_after: list keys starting after start_after
3461 :type start_after: str
3462 :para max_return: list no more than max_return key/value pairs
3463 :type max_return: int
3464 :returns: an iterator over the requested omap values, return value from this action
3465 """
3466 start_after = cstr(start_after, 'start_after') if start_after else None
3467 cdef:
3468 char *_start_after = opt_str(start_after)
3469 ReadOp _read_op = read_op
3470 rados_omap_iter_t iter_addr = NULL
3471 int _max_return = max_return
3472 int prval = 0
3473
3474 with nogil:
3475 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3476 _max_return, &iter_addr, NULL, &prval)
3477 it = OmapIterator(self)
3478 it.ctx = iter_addr
3479 return it, int(prval)
3480
3481 @requires(('read_op', ReadOp), ('keys', tuple))
3482 def get_omap_vals_by_keys(self, read_op, keys):
3483 """
3484 get the omap values by keys
3485 :para read_op: read operation object
3486 :type read_op: ReadOp
3487 :para keys: input key tuple
3488 :type keys: tuple
3489 :returns: an iterator over the requested omap values, return value from this action
3490 """
3491 keys = cstr_list(keys, 'keys')
3492 cdef:
3493 ReadOp _read_op = read_op
3494 rados_omap_iter_t iter_addr
3495 char **_keys = to_bytes_array(keys)
3496 size_t key_num = len(keys)
3497 int prval = 0
3498
3499 try:
3500 with nogil:
3501 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3502 <const char**>_keys,
3503 key_num, &iter_addr, &prval)
3504 it = OmapIterator(self)
3505 it.ctx = iter_addr
3506 return it, int(prval)
3507 finally:
3508 free(_keys)
3509
3510 @requires(('write_op', WriteOp), ('keys', tuple))
3511 def remove_omap_keys(self, write_op, keys):
3512 """
3513 remove omap keys specifiled
3514 :para write_op: write operation object
3515 :type write_op: WriteOp
3516 :para keys: input key tuple
3517 :type keys: tuple
3518 """
3519
3520 keys = cstr_list(keys, 'keys')
3521 cdef:
3522 WriteOp _write_op = write_op
3523 size_t key_num = len(keys)
3524 char **_keys = to_bytes_array(keys)
3525
3526 try:
3527 with nogil:
3528 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3529 finally:
3530 free(_keys)
3531
3532 @requires(('write_op', WriteOp))
3533 def clear_omap(self, write_op):
3534 """
3535 Remove all key/value pairs from an object
3536 :para write_op: write operation object
3537 :type write_op: WriteOp
3538 """
3539
3540 cdef:
3541 WriteOp _write_op = write_op
3542
3543 with nogil:
3544 rados_write_op_omap_clear(_write_op.write_op)
3545
3546 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3547 ('duration', opt(int)), ('flags', int))
3548 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3549
3550 """
3551 Take an exclusive lock on an object
3552
3553 :param key: name of the object
3554 :type key: str
3555 :param name: name of the lock
3556 :type name: str
3557 :param cookie: cookie of the lock
3558 :type cookie: str
3559 :param desc: description of the lock
3560 :type desc: str
3561 :param duration: duration of the lock in seconds
3562 :type duration: int
3563 :param flags: flags
3564 :type flags: int
3565
3566 :raises: :class:`TypeError`
3567 :raises: :class:`Error`
3568 """
3569 self.require_ioctx_open()
3570
3571 key = cstr(key, 'key')
3572 name = cstr(name, 'name')
3573 cookie = cstr(cookie, 'cookie')
3574 desc = cstr(desc, 'desc')
3575
3576 cdef:
3577 char* _key = key
3578 char* _name = name
3579 char* _cookie = cookie
3580 char* _desc = desc
3581 uint8_t _flags = flags
3582 timeval _duration
3583
3584 if duration is None:
3585 with nogil:
3586 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3587 NULL, _flags)
3588 else:
3589 _duration.tv_sec = duration
3590 with nogil:
3591 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3592 &_duration, _flags)
3593
3594 if ret < 0:
3595 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3596
3597 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3598 ('desc', str_type), ('duration', opt(int)), ('flags', int))
3599 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3600
3601 """
3602 Take a shared lock on an object
3603
3604 :param key: name of the object
3605 :type key: str
3606 :param name: name of the lock
3607 :type name: str
3608 :param cookie: cookie of the lock
3609 :type cookie: str
3610 :param tag: tag of the lock
3611 :type tag: str
3612 :param desc: description of the lock
3613 :type desc: str
3614 :param duration: duration of the lock in seconds
3615 :type duration: int
3616 :param flags: flags
3617 :type flags: int
3618
3619 :raises: :class:`TypeError`
3620 :raises: :class:`Error`
3621 """
3622 self.require_ioctx_open()
3623
3624 key = cstr(key, 'key')
3625 tag = cstr(tag, 'tag')
3626 name = cstr(name, 'name')
3627 cookie = cstr(cookie, 'cookie')
3628 desc = cstr(desc, 'desc')
3629
3630 cdef:
3631 char* _key = key
3632 char* _tag = tag
3633 char* _name = name
3634 char* _cookie = cookie
3635 char* _desc = desc
3636 uint8_t _flags = flags
3637 timeval _duration
3638
3639 if duration is None:
3640 with nogil:
3641 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3642 NULL, _flags)
3643 else:
3644 _duration.tv_sec = duration
3645 with nogil:
3646 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3647 &_duration, _flags)
3648 if ret < 0:
3649 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3650
3651 @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
3652 def unlock(self, key, name, cookie):
3653
3654 """
3655 Release a shared or exclusive lock on an object
3656
3657 :param key: name of the object
3658 :type key: str
3659 :param name: name of the lock
3660 :type name: str
3661 :param cookie: cookie of the lock
3662 :type cookie: str
3663
3664 :raises: :class:`TypeError`
3665 :raises: :class:`Error`
3666 """
3667 self.require_ioctx_open()
3668
3669 key = cstr(key, 'key')
3670 name = cstr(name, 'name')
3671 cookie = cstr(cookie, 'cookie')
3672
3673 cdef:
3674 char* _key = key
3675 char* _name = name
3676 char* _cookie = cookie
3677
3678 with nogil:
3679 ret = rados_unlock(self.io, _key, _name, _cookie)
3680 if ret < 0:
3681 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3682
3683 def application_enable(self, app_name, force=False):
3684 """
3685 Enable an application on an OSD pool
3686
3687 :param app_name: application name
3688 :type app_name: str
3689 :param force: False if only a single app should exist per pool
3690 :type expire_seconds: boool
3691
3692 :raises: :class:`Error`
3693 """
3694 app_name = cstr(app_name, 'app_name')
3695 cdef:
3696 char *_app_name = app_name
3697 int _force = (1 if force else 0)
3698
3699 with nogil:
3700 ret = rados_application_enable(self.io, _app_name, _force)
3701 if ret < 0:
3702 raise make_ex(ret, "error enabling application")
3703
3704 def application_list(self):
3705 """
3706 Returns a list of enabled applications
3707
3708 :returns: list of app name string
3709 """
3710 cdef:
3711 size_t length = 128
3712 char *apps = NULL
3713
3714 try:
3715 while True:
3716 apps = <char *>realloc_chk(apps, length)
3717 with nogil:
3718 ret = rados_application_list(self.io, apps, &length)
3719 if ret == 0:
3720 return [decode_cstr(app) for app in
3721 apps[:length].split(b'\0') if app]
3722 elif ret == -errno.ENOENT:
3723 return None
3724 elif ret == -errno.ERANGE:
3725 pass
3726 else:
3727 raise make_ex(ret, "error listing applications")
3728 finally:
3729 free(apps)
3730
3731 def application_metadata_set(self, app_name, key, value):
3732 """
3733 Sets application metadata on an OSD pool
3734
3735 :param app_name: application name
3736 :type app_name: str
3737 :param key: metadata key
3738 :type key: str
3739 :param value: metadata value
3740 :type value: str
3741
3742 :raises: :class:`Error`
3743 """
3744 app_name = cstr(app_name, 'app_name')
3745 key = cstr(key, 'key')
3746 value = cstr(value, 'value')
3747 cdef:
3748 char *_app_name = app_name
3749 char *_key = key
3750 char *_value = value
3751
3752 with nogil:
3753 ret = rados_application_metadata_set(self.io, _app_name, _key,
3754 _value)
3755 if ret < 0:
3756 raise make_ex(ret, "error setting application metadata")
3757
3758 def application_metadata_remove(self, app_name, key):
3759 """
3760 Remove application metadata from an OSD pool
3761
3762 :param app_name: application name
3763 :type app_name: str
3764 :param key: metadata key
3765 :type key: str
3766
3767 :raises: :class:`Error`
3768 """
3769 app_name = cstr(app_name, 'app_name')
3770 key = cstr(key, 'key')
3771 cdef:
3772 char *_app_name = app_name
3773 char *_key = key
3774
3775 with nogil:
3776 ret = rados_application_metadata_remove(self.io, _app_name, _key)
3777 if ret < 0:
3778 raise make_ex(ret, "error removing application metadata")
3779
3780 def application_metadata_list(self, app_name):
3781 """
3782 Returns a list of enabled applications
3783
3784 :param app_name: application name
3785 :type app_name: str
3786 :returns: list of key/value tuples
3787 """
3788 app_name = cstr(app_name, 'app_name')
3789 cdef:
3790 char *_app_name = app_name
3791 size_t key_length = 128
3792 size_t val_length = 128
3793 char *c_keys = NULL
3794 char *c_vals = NULL
3795
3796 try:
3797 while True:
3798 c_keys = <char *>realloc_chk(c_keys, key_length)
3799 c_vals = <char *>realloc_chk(c_vals, val_length)
3800 with nogil:
3801 ret = rados_application_metadata_list(self.io, _app_name,
3802 c_keys, &key_length,
3803 c_vals, &val_length)
3804 if ret == 0:
3805 keys = [decode_cstr(key) for key in
3806 c_keys[:key_length].split(b'\0') if key]
3807 vals = [decode_cstr(val) for val in
3808 c_vals[:val_length].split(b'\0') if val]
3809 return zip(keys, vals)
3810 elif ret == -errno.ERANGE:
3811 pass
3812 else:
3813 raise make_ex(ret, "error listing application metadata")
3814 finally:
3815 free(c_keys)
3816 free(c_vals)
3817
3818
3819 def set_object_locator(func):
3820 def retfunc(self, *args, **kwargs):
3821 if self.locator_key is not None:
3822 old_locator = self.ioctx.get_locator_key()
3823 self.ioctx.set_locator_key(self.locator_key)
3824 retval = func(self, *args, **kwargs)
3825 self.ioctx.set_locator_key(old_locator)
3826 return retval
3827 else:
3828 return func(self, *args, **kwargs)
3829 return retfunc
3830
3831
3832 def set_object_namespace(func):
3833 def retfunc(self, *args, **kwargs):
3834 if self.nspace is None:
3835 raise LogicError("Namespace not set properly in context")
3836 old_nspace = self.ioctx.get_namespace()
3837 self.ioctx.set_namespace(self.nspace)
3838 retval = func(self, *args, **kwargs)
3839 self.ioctx.set_namespace(old_nspace)
3840 return retval
3841 return retfunc
3842
3843
3844 class Object(object):
3845 """Rados object wrapper, makes the object look like a file"""
3846 def __init__(self, ioctx, key, locator_key=None, nspace=None):
3847 self.key = key
3848 self.ioctx = ioctx
3849 self.offset = 0
3850 self.state = "exists"
3851 self.locator_key = locator_key
3852 self.nspace = "" if nspace is None else nspace
3853
3854 def __str__(self):
3855 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
3856 (str(self.ioctx), self.key, "--default--"
3857 if self.nspace is "" else self.nspace, self.locator_key)
3858
3859 def require_object_exists(self):
3860 if self.state != "exists":
3861 raise ObjectStateError("The object is %s" % self.state)
3862
3863 @set_object_locator
3864 @set_object_namespace
3865 def read(self, length=1024 * 1024):
3866 self.require_object_exists()
3867 ret = self.ioctx.read(self.key, length, self.offset)
3868 self.offset += len(ret)
3869 return ret
3870
3871 @set_object_locator
3872 @set_object_namespace
3873 def write(self, string_to_write):
3874 self.require_object_exists()
3875 ret = self.ioctx.write(self.key, string_to_write, self.offset)
3876 if ret == 0:
3877 self.offset += len(string_to_write)
3878 return ret
3879
3880 @set_object_locator
3881 @set_object_namespace
3882 def remove(self):
3883 self.require_object_exists()
3884 self.ioctx.remove_object(self.key)
3885 self.state = "removed"
3886
3887 @set_object_locator
3888 @set_object_namespace
3889 def stat(self):
3890 self.require_object_exists()
3891 return self.ioctx.stat(self.key)
3892
3893 def seek(self, position):
3894 self.require_object_exists()
3895 self.offset = position
3896
3897 @set_object_locator
3898 @set_object_namespace
3899 def get_xattr(self, xattr_name):
3900 self.require_object_exists()
3901 return self.ioctx.get_xattr(self.key, xattr_name)
3902
3903 @set_object_locator
3904 @set_object_namespace
3905 def get_xattrs(self):
3906 self.require_object_exists()
3907 return self.ioctx.get_xattrs(self.key)
3908
3909 @set_object_locator
3910 @set_object_namespace
3911 def set_xattr(self, xattr_name, xattr_value):
3912 self.require_object_exists()
3913 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
3914
3915 @set_object_locator
3916 @set_object_namespace
3917 def rm_xattr(self, xattr_name):
3918 self.require_object_exists()
3919 return self.ioctx.rm_xattr(self.key, xattr_name)
3920
3921 MONITOR_LEVELS = [
3922 "debug",
3923 "info",
3924 "warn", "warning",
3925 "err", "error",
3926 "sec",
3927 ]
3928
3929
3930 class MonitorLog(object):
3931 # NOTE(sileht): Keep this class for backward compat
3932 # method moved to Rados.monitor_log()
3933 """
3934 For watching cluster log messages. Instantiate an object and keep
3935 it around while callback is periodically called. Construct with
3936 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
3937 arg will be passed to the callback.
3938
3939 callback will be called with:
3940 arg (given to __init__)
3941 line (the full line, including timestamp, who, level, msg)
3942 who (which entity issued the log message)
3943 timestamp_sec (sec of a struct timespec)
3944 timestamp_nsec (sec of a struct timespec)
3945 seq (sequence number)
3946 level (string representing the level of the log message)
3947 msg (the message itself)
3948 callback's return value is ignored
3949 """
3950 def __init__(self, cluster, level, callback, arg):
3951 self.level = level
3952 self.callback = callback
3953 self.arg = arg
3954 self.cluster = cluster
3955 self.cluster.monitor_log(level, callback, arg)
3956