]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
import ceph 14.2.5
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21
22 import sys
23 import threading
24 import time
25
26 try:
27 from collections.abc import Callable
28 except ImportError:
29 from collections import Callable
30 from datetime import datetime
31 from functools import partial, wraps
32 from itertools import chain
33
34 # Are we running Python 2.x
35 if sys.version_info[0] < 3:
36 str_type = basestring
37 else:
38 str_type = str
39
40
41 cdef extern from "Python.h":
42 # These are in cpython/string.pxd, but use "object" types instead of
43 # PyObject*, which invokes assumptions in cpython that we need to
44 # legitimately break to implement zero-copy string buffers in Ioctx.read().
45 # This is valid use of the Python API and documented as a special case.
46 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
47 char* PyBytes_AsString(PyObject *string) except NULL
48 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
49 void PyEval_InitThreads()
50
51
52 cdef extern from "time.h":
53 ctypedef long int time_t
54 ctypedef long int suseconds_t
55
56
57 cdef extern from "sys/time.h":
58 cdef struct timeval:
59 time_t tv_sec
60 suseconds_t tv_usec
61
62
63 cdef extern from "rados/rados_types.h" nogil:
64 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
65
66
67 cdef extern from "rados/librados.h" nogil:
68 enum:
69 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
70 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
71 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
72 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
73 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
74 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
75 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76
77
78 enum:
79 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
80 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
81 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
82 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
83 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
84 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
85 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
86 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
87 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
88
89 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
90
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 uint64_t rados_get_instance_id(rados_t cluster)
134 int rados_conf_read_file(rados_t cluster, const char *path)
135 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
136 int rados_conf_parse_env(rados_t cluster, const char *var)
137 int rados_conf_set(rados_t cluster, char *option, const char *value)
138 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139
140 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
141 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
142 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
143 int rados_pool_create(rados_t cluster, const char *pool_name)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
146 int rados_pool_list(rados_t cluster, char *buf, size_t len)
147 int rados_pool_delete(rados_t cluster, const char *pool_name)
148 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
149
150 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
151 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
152 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
153 int rados_application_enable(rados_ioctx_t io, const char *app_name,
154 int force)
155 void rados_set_osdmap_full_try(rados_ioctx_t io)
156 void rados_unset_osdmap_full_try(rados_ioctx_t io)
157 int rados_application_list(rados_ioctx_t io, char *values,
158 size_t *values_len)
159 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
160 const char *key, char *value,
161 size_t *value_len)
162 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
163 const char *key, const char *value)
164 int rados_application_metadata_remove(rados_ioctx_t io,
165 const char *app_name, const char *key)
166 int rados_application_metadata_list(rados_ioctx_t io,
167 const char *app_name, char *keys,
168 size_t *key_len, char *values,
169 size_t *value_len)
170 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
171 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
172 const char *inbuf, size_t inbuflen,
173 char **outbuf, size_t *outbuflen,
174 char **outs, size_t *outslen)
175 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
176 const char *inbuf, size_t inbuflen,
177 char **outbuf, size_t *outbuflen,
178 char **outs, size_t *outslen)
179 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
180 const char *inbuf, size_t inbuflen,
181 char **outbuf, size_t *outbuflen,
182 char **outs, size_t *outslen)
183 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
184 const char *inbuf, size_t inbuflen,
185 char **outbuf, size_t *outbuflen,
186 char **outs, size_t *outslen)
187 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
188 const char *inbuf, size_t inbuflen,
189 char **outbuf, size_t *outbuflen,
190 char **outs, size_t *outslen)
191 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
192 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
193
194 int rados_wait_for_latest_osdmap(rados_t cluster)
195
196 int rados_service_register(rados_t cluster, const char *service, const char *daemon, const char *metadata_dict)
197 int rados_service_update_status(rados_t cluster, const char *status_dict)
198
199 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
200 int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
201 void rados_ioctx_destroy(rados_ioctx_t io)
202 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
203 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
204
205 uint64_t rados_get_last_version(rados_ioctx_t io)
206 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
207 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
208 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
209 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
210 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
211 int rados_remove(rados_ioctx_t io, const char *oid)
212 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
213 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
214 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
215 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
216 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
217 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
218 void rados_getxattrs_end(rados_xattrs_iter_t iter)
219
220 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
221 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
222 void rados_nobjects_list_close(rados_list_ctx_t ctx)
223
224 int rados_ioctx_pool_requires_alignment2(rados_ioctx_t io, int * requires)
225 int rados_ioctx_pool_required_alignment2(rados_ioctx_t io, uint64_t * alignment)
226
227 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
228 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
229 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
230 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
231 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
232 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
233 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
234 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
235
236 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
237 rados_snap_t *snapid)
238 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
239 rados_snap_t snapid)
240 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
241 rados_snap_t snap_seq,
242 rados_snap_t *snap,
243 int num_snaps)
244 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
245 rados_snap_t snapid)
246
247 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
248 const char * cookie, const char * desc,
249 timeval * duration, uint8_t flags)
250 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
251 const char * cookie, const char * tag, const char * desc,
252 timeval * duration, uint8_t flags)
253 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
254
255 rados_write_op_t rados_create_write_op()
256 void rados_release_write_op(rados_write_op_t write_op)
257
258 rados_read_op_t rados_create_read_op()
259 void rados_release_read_op(rados_read_op_t read_op)
260
261 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
262 void rados_aio_release(rados_completion_t c)
263 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
264 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
265 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
266 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
267 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
268 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
269 int rados_aio_flush(rados_ioctx_t io)
270
271 int rados_aio_get_return_value(rados_completion_t c)
272 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
273 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
274 int rados_aio_wait_for_complete(rados_completion_t c)
275 int rados_aio_wait_for_safe(rados_completion_t c)
276 int rados_aio_is_complete(rados_completion_t c)
277 int rados_aio_is_safe(rados_completion_t c)
278
279 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
280 const char * in_buf, size_t in_len, char * buf, size_t out_len)
281 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
282 const char * in_buf, size_t in_len, char * buf, size_t out_len)
283
284 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
285 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
286 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
287 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
288 void rados_write_op_omap_clear(rados_write_op_t write_op)
289 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
290
291 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
292 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
293 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
294 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
295 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
296 void rados_write_op_remove(rados_write_op_t write_op)
297 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
298 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
299
300 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
301 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
302 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
303 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
304 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
305 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
306 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
307 void rados_omap_get_end(rados_omap_iter_t iter)
308 int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
309
310
311 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
312 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
313 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
314 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
315 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
316 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
317 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
318
319 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
320
321 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
322 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
323 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
324 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
325 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
326 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
327 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
328
329 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
330
331 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
332 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
333
334 ANONYMOUS_AUID = 0xffffffffffffffff
335 ADMIN_AUID = 0
336
337
338 class Error(Exception):
339 """ `Error` class, derived from `Exception` """
340 def __init__(self, message, errno=None):
341 super(Exception, self).__init__(message)
342 self.errno = errno
343
344 def __str__(self):
345 msg = super(Exception, self).__str__()
346 if self.errno is None:
347 return msg
348 return '[errno {0}] {1}'.format(self.errno, msg)
349
350 def __reduce__(self):
351 return (self.__class__, (self.message, self.errno))
352
353 class InvalidArgumentError(Error):
354 pass
355
356 class OSError(Error):
357 """ `OSError` class, derived from `Error` """
358 pass
359
360 class InterruptedOrTimeoutError(OSError):
361 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
362 pass
363
364
365 class PermissionError(OSError):
366 """ `PermissionError` class, derived from `OSError` """
367 pass
368
369
370 class PermissionDeniedError(OSError):
371 """ deal with EACCES related. """
372 pass
373
374
375 class ObjectNotFound(OSError):
376 """ `ObjectNotFound` class, derived from `OSError` """
377 pass
378
379
380 class NoData(OSError):
381 """ `NoData` class, derived from `OSError` """
382 pass
383
384
385 class ObjectExists(OSError):
386 """ `ObjectExists` class, derived from `OSError` """
387 pass
388
389
390 class ObjectBusy(OSError):
391 """ `ObjectBusy` class, derived from `IOError` """
392 pass
393
394
395 class IOError(OSError):
396 """ `ObjectBusy` class, derived from `OSError` """
397 pass
398
399
400 class NoSpace(OSError):
401 """ `NoSpace` class, derived from `OSError` """
402 pass
403
404
405 class RadosStateError(Error):
406 """ `RadosStateError` class, derived from `Error` """
407 pass
408
409
410 class IoctxStateError(Error):
411 """ `IoctxStateError` class, derived from `Error` """
412 pass
413
414
415 class ObjectStateError(Error):
416 """ `ObjectStateError` class, derived from `Error` """
417 pass
418
419
420 class LogicError(Error):
421 """ `` class, derived from `Error` """
422 pass
423
424
425 class TimedOut(OSError):
426 """ `TimedOut` class, derived from `OSError` """
427 pass
428
429
430 IF UNAME_SYSNAME == "FreeBSD":
431 cdef errno_to_exception = {
432 errno.EPERM : PermissionError,
433 errno.ENOENT : ObjectNotFound,
434 errno.EIO : IOError,
435 errno.ENOSPC : NoSpace,
436 errno.EEXIST : ObjectExists,
437 errno.EBUSY : ObjectBusy,
438 errno.ENOATTR : NoData,
439 errno.EINTR : InterruptedOrTimeoutError,
440 errno.ETIMEDOUT : TimedOut,
441 errno.EACCES : PermissionDeniedError,
442 errno.EINVAL : InvalidArgumentError,
443 }
444 ELSE:
445 cdef errno_to_exception = {
446 errno.EPERM : PermissionError,
447 errno.ENOENT : ObjectNotFound,
448 errno.EIO : IOError,
449 errno.ENOSPC : NoSpace,
450 errno.EEXIST : ObjectExists,
451 errno.EBUSY : ObjectBusy,
452 errno.ENODATA : NoData,
453 errno.EINTR : InterruptedOrTimeoutError,
454 errno.ETIMEDOUT : TimedOut,
455 errno.EACCES : PermissionDeniedError,
456 errno.EINVAL : InvalidArgumentError,
457 }
458
459
460 cdef make_ex(ret, msg):
461 """
462 Translate a librados return code into an exception.
463
464 :param ret: the return code
465 :type ret: int
466 :param msg: the error message to use
467 :type msg: str
468 :returns: a subclass of :class:`Error`
469 """
470 ret = abs(ret)
471 if ret in errno_to_exception:
472 return errno_to_exception[ret](msg, errno=ret)
473 else:
474 return OSError(msg, errno=ret)
475
476
477 # helper to specify an optional argument, where in addition to `cls`, `None`
478 # is also acceptable
479 def opt(cls):
480 return (cls, None)
481
482
483 # validate argument types of an instance method
484 # kwargs is an un-ordered dict, so use args instead
485 def requires(*types):
486 def is_type_of(v, t):
487 if t is None:
488 return v is None
489 else:
490 return isinstance(v, t)
491
492 def check_type(val, arg_name, arg_type):
493 if isinstance(arg_type, tuple):
494 if any(is_type_of(val, t) for t in arg_type):
495 return
496 type_names = ' or '.join('None' if t is None else t.__name__
497 for t in arg_type)
498 raise TypeError('%s must be %s' % (arg_name, type_names))
499 else:
500 if is_type_of(val, arg_type):
501 return
502 assert(arg_type is not None)
503 raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
504
505 def wrapper(f):
506 # FIXME(sileht): this stop with
507 # AttributeError: 'method_descriptor' object has no attribute '__module__'
508 # @wraps(f)
509 def validate_func(*args, **kwargs):
510 # ignore the `self` arg
511 pos_args = zip(args[1:], types)
512 named_args = ((kwargs[name], (name, spec)) for name, spec in types
513 if name in kwargs)
514 for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
515 check_type(arg_val, arg_name, arg_type)
516 return f(*args, **kwargs)
517 return validate_func
518 return wrapper
519
520
521 def cstr(val, name, encoding="utf-8", opt=False):
522 """
523 Create a byte string from a Python string
524
525 :param basestring val: Python string
526 :param str name: Name of the string parameter, for exceptions
527 :param str encoding: Encoding to use
528 :param bool opt: If True, None is allowed
529 :rtype: bytes
530 :raises: :class:`InvalidArgument`
531 """
532 if opt and val is None:
533 return None
534 if isinstance(val, bytes):
535 return val
536 elif isinstance(val, unicode):
537 return val.encode(encoding)
538 else:
539 raise TypeError('%s must be a string' % name)
540
541
542 def cstr_list(list_str, name, encoding="utf-8"):
543 return [cstr(s, name) for s in list_str]
544
545
546 def decode_cstr(val, encoding="utf-8"):
547 """
548 Decode a byte string into a Python string.
549
550 :param bytes val: byte string
551 :rtype: unicode or None
552 """
553 if val is None:
554 return None
555
556 return val.decode(encoding)
557
558
559 cdef char* opt_str(s) except? NULL:
560 if s is None:
561 return NULL
562 return s
563
564
565 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
566 cdef void *ret = realloc(ptr, size)
567 if ret == NULL:
568 raise MemoryError("realloc failed")
569 return ret
570
571
572 cdef size_t * to_csize_t_array(list_int):
573 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
574 if ret == NULL:
575 raise MemoryError("malloc failed")
576 for i in xrange(len(list_int)):
577 ret[i] = <size_t>list_int[i]
578 return ret
579
580
581 cdef char ** to_bytes_array(list_bytes):
582 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
583 if ret == NULL:
584 raise MemoryError("malloc failed")
585 for i in xrange(len(list_bytes)):
586 ret[i] = <char *>list_bytes[i]
587 return ret
588
589
590
591 cdef int __monitor_callback(void *arg, const char *line, const char *who,
592 uint64_t sec, uint64_t nsec, uint64_t seq,
593 const char *level, const char *msg) with gil:
594 cdef object cb_info = <object>arg
595 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
596 return 0
597
598 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
599 const char *who,
600 const char *name,
601 uint64_t sec, uint64_t nsec, uint64_t seq,
602 const char *level, const char *msg) with gil:
603 cdef object cb_info = <object>arg
604 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
605 return 0
606
607
608 class Version(object):
609 """ Version information """
610 def __init__(self, major, minor, extra):
611 self.major = major
612 self.minor = minor
613 self.extra = extra
614
615 def __str__(self):
616 return "%d.%d.%d" % (self.major, self.minor, self.extra)
617
618
619 cdef class Rados(object):
620 """This class wraps librados functions"""
621 # NOTE(sileht): attributes declared in .pyd
622
623 def __init__(self, *args, **kwargs):
624 PyEval_InitThreads()
625 self.__setup(*args, **kwargs)
626
627 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
628 ('conffile', opt(str_type)))
629 def __setup(self, rados_id=None, name=None, clustername=None,
630 conf_defaults=None, conffile=None, conf=None, flags=0,
631 context=None):
632 self.monitor_callback = None
633 self.monitor_callback2 = None
634 self.parsed_args = []
635 self.conf_defaults = conf_defaults
636 self.conffile = conffile
637 self.rados_id = rados_id
638
639 if rados_id and name:
640 raise Error("Rados(): can't supply both rados_id and name")
641 elif rados_id:
642 name = 'client.' + rados_id
643 elif name is None:
644 name = 'client.admin'
645 if clustername is None:
646 clustername = ''
647
648 name = cstr(name, 'name')
649 clustername = cstr(clustername, 'clustername')
650 cdef:
651 char *_name = name
652 char *_clustername = clustername
653 int _flags = flags
654 int ret
655
656 if context:
657 # Unpack void* (aka rados_config_t) from capsule
658 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
659 with nogil:
660 ret = rados_create_with_context(&self.cluster, rados_config)
661 else:
662 with nogil:
663 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
664 if ret != 0:
665 raise Error("rados_initialize failed with error code: %d" % ret)
666
667 self.state = "configuring"
668 # order is important: conf_defaults, then conffile, then conf
669 if conf_defaults:
670 for key, value in conf_defaults.items():
671 self.conf_set(key, value)
672 if conffile is not None:
673 # read the default conf file when '' is given
674 if conffile == '':
675 conffile = None
676 self.conf_read_file(conffile)
677 if conf:
678 for key, value in conf.items():
679 self.conf_set(key, value)
680
681 def require_state(self, *args):
682 """
683 Checks if the Rados object is in a special state
684
685 :raises: :class:`RadosStateError`
686 """
687 if self.state in args:
688 return
689 raise RadosStateError("You cannot perform that operation on a \
690 Rados object in state %s." % self.state)
691
692 def shutdown(self):
693 """
694 Disconnects from the cluster. Call this explicitly when a
695 Rados.connect()ed object is no longer used.
696 """
697 if self.state != "shutdown":
698 with nogil:
699 rados_shutdown(self.cluster)
700 self.state = "shutdown"
701
702 def __enter__(self):
703 self.connect()
704 return self
705
706 def __exit__(self, type_, value, traceback):
707 self.shutdown()
708 return False
709
710 def version(self):
711 """
712 Get the version number of the ``librados`` C library.
713
714 :returns: a tuple of ``(major, minor, extra)`` components of the
715 librados version
716 """
717 cdef int major = 0
718 cdef int minor = 0
719 cdef int extra = 0
720 with nogil:
721 rados_version(&major, &minor, &extra)
722 return Version(major, minor, extra)
723
724 @requires(('path', opt(str_type)))
725 def conf_read_file(self, path=None):
726 """
727 Configure the cluster handle using a Ceph config file.
728
729 :param path: path to the config file
730 :type path: str
731 """
732 self.require_state("configuring", "connected")
733 path = cstr(path, 'path', opt=True)
734 cdef:
735 char *_path = opt_str(path)
736 with nogil:
737 ret = rados_conf_read_file(self.cluster, _path)
738 if ret != 0:
739 raise make_ex(ret, "error calling conf_read_file")
740
741 def conf_parse_argv(self, args):
742 """
743 Parse known arguments from args, and remove; returned
744 args contain only those unknown to ceph
745 """
746 self.require_state("configuring", "connected")
747 if not args:
748 return
749
750 cargs = cstr_list(args, 'args')
751 cdef:
752 int _argc = len(args)
753 char **_argv = to_bytes_array(cargs)
754 char **_remargv = NULL
755
756 try:
757 _remargv = <char **>malloc(_argc * sizeof(char *))
758 with nogil:
759 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
760 <const char**>_argv,
761 <const char**>_remargv)
762 if ret:
763 raise make_ex(ret, "error calling conf_parse_argv_remainder")
764
765 # _remargv was allocated with fixed argc; collapse return
766 # list to eliminate any missing args
767 retargs = [decode_cstr(a) for a in _remargv[:_argc]
768 if a != NULL]
769 self.parsed_args = args
770 return retargs
771 finally:
772 free(_argv)
773 free(_remargv)
774
775 def conf_parse_env(self, var='CEPH_ARGS'):
776 """
777 Parse known arguments from an environment variable, normally
778 CEPH_ARGS.
779 """
780 self.require_state("configuring", "connected")
781 if not var:
782 return
783
784 var = cstr(var, 'var')
785 cdef:
786 char *_var = var
787 with nogil:
788 ret = rados_conf_parse_env(self.cluster, _var)
789 if ret != 0:
790 raise make_ex(ret, "error calling conf_parse_env")
791
792 @requires(('option', str_type))
793 def conf_get(self, option):
794 """
795 Get the value of a configuration option
796
797 :param option: which option to read
798 :type option: str
799
800 :returns: str - value of the option or None
801 :raises: :class:`TypeError`
802 """
803 self.require_state("configuring", "connected")
804 option = cstr(option, 'option')
805 cdef:
806 char *_option = option
807 size_t length = 20
808 char *ret_buf = NULL
809
810 try:
811 while True:
812 ret_buf = <char *>realloc_chk(ret_buf, length)
813 with nogil:
814 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
815 if ret == 0:
816 return decode_cstr(ret_buf)
817 elif ret == -errno.ENAMETOOLONG:
818 length = length * 2
819 elif ret == -errno.ENOENT:
820 return None
821 else:
822 raise make_ex(ret, "error calling conf_get")
823 finally:
824 free(ret_buf)
825
826 @requires(('option', str_type), ('val', str_type))
827 def conf_set(self, option, val):
828 """
829 Set the value of a configuration option
830
831 :param option: which option to set
832 :type option: str
833 :param option: value of the option
834 :type option: str
835
836 :raises: :class:`TypeError`, :class:`ObjectNotFound`
837 """
838 self.require_state("configuring", "connected")
839 option = cstr(option, 'option')
840 val = cstr(val, 'val')
841 cdef:
842 char *_option = option
843 char *_val = val
844
845 with nogil:
846 ret = rados_conf_set(self.cluster, _option, _val)
847 if ret != 0:
848 raise make_ex(ret, "error calling conf_set")
849
850 def ping_monitor(self, mon_id):
851 """
852 Ping a monitor to assess liveness
853
854 May be used as a simply way to assess liveness, or to obtain
855 information about the monitor in a simple way even in the
856 absence of quorum.
857
858 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
859 :type mon_id: str
860 :returns: the string reply from the monitor
861 """
862
863 self.require_state("configuring", "connected")
864
865 mon_id = cstr(mon_id, 'mon_id')
866 cdef:
867 char *_mon_id = mon_id
868 size_t outstrlen = 0
869 char *outstr
870
871 with nogil:
872 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
873
874 if ret != 0:
875 raise make_ex(ret, "error calling ping_monitor")
876
877 if outstrlen:
878 my_outstr = outstr[:outstrlen]
879 rados_buffer_free(outstr)
880 return decode_cstr(my_outstr)
881
882 def connect(self, timeout=0):
883 """
884 Connect to the cluster. Use shutdown() to release resources.
885 """
886 self.require_state("configuring")
887 # NOTE(sileht): timeout was supported by old python API,
888 # but this is not something available in C API, so ignore
889 # for now and remove it later
890 with nogil:
891 ret = rados_connect(self.cluster)
892 if ret != 0:
893 raise make_ex(ret, "error connecting to the cluster")
894 self.state = "connected"
895
896 def get_instance_id(self):
897 """
898 Get a global id for current instance
899 """
900 self.require_state("connected")
901 with nogil:
902 ret = rados_get_instance_id(self.cluster)
903 return ret;
904
905 def get_cluster_stats(self):
906 """
907 Read usage info about the cluster
908
909 This tells you total space, space used, space available, and number
910 of objects. These are not updated immediately when data is written,
911 they are eventually consistent.
912
913 :returns: dict - contains the following keys:
914
915 - ``kb`` (int) - total space
916
917 - ``kb_used`` (int) - space used
918
919 - ``kb_avail`` (int) - free space available
920
921 - ``num_objects`` (int) - number of objects
922
923 """
924 cdef:
925 rados_cluster_stat_t stats
926
927 with nogil:
928 ret = rados_cluster_stat(self.cluster, &stats)
929
930 if ret < 0:
931 raise make_ex(
932 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
933 return {'kb': stats.kb,
934 'kb_used': stats.kb_used,
935 'kb_avail': stats.kb_avail,
936 'num_objects': stats.num_objects}
937
938 @requires(('pool_name', str_type))
939 def pool_exists(self, pool_name):
940 """
941 Checks if a given pool exists.
942
943 :param pool_name: name of the pool to check
944 :type pool_name: str
945
946 :raises: :class:`TypeError`, :class:`Error`
947 :returns: bool - whether the pool exists, false otherwise.
948 """
949 self.require_state("connected")
950
951 pool_name = cstr(pool_name, 'pool_name')
952 cdef:
953 char *_pool_name = pool_name
954
955 with nogil:
956 ret = rados_pool_lookup(self.cluster, _pool_name)
957 if ret >= 0:
958 return True
959 elif ret == -errno.ENOENT:
960 return False
961 else:
962 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
963
964 @requires(('pool_name', str_type))
965 def pool_lookup(self, pool_name):
966 """
967 Returns a pool's ID based on its name.
968
969 :param pool_name: name of the pool to look up
970 :type pool_name: str
971
972 :raises: :class:`TypeError`, :class:`Error`
973 :returns: int - pool ID, or None if it doesn't exist
974 """
975 self.require_state("connected")
976 pool_name = cstr(pool_name, 'pool_name')
977 cdef:
978 char *_pool_name = pool_name
979
980 with nogil:
981 ret = rados_pool_lookup(self.cluster, _pool_name)
982 if ret >= 0:
983 return int(ret)
984 elif ret == -errno.ENOENT:
985 return None
986 else:
987 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
988
989 @requires(('pool_id', int))
990 def pool_reverse_lookup(self, pool_id):
991 """
992 Returns a pool's name based on its ID.
993
994 :param pool_id: ID of the pool to look up
995 :type pool_id: int
996
997 :raises: :class:`TypeError`, :class:`Error`
998 :returns: string - pool name, or None if it doesn't exist
999 """
1000 self.require_state("connected")
1001 cdef:
1002 int64_t _pool_id = pool_id
1003 size_t size = 512
1004 char *name = NULL
1005
1006 try:
1007 while True:
1008 name = <char *>realloc_chk(name, size)
1009 with nogil:
1010 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
1011 if ret >= 0:
1012 break
1013 elif ret != -errno.ERANGE and size <= 4096:
1014 size *= 2
1015 elif ret == -errno.ENOENT:
1016 return None
1017 elif ret < 0:
1018 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1019
1020 return decode_cstr(name)
1021
1022 finally:
1023 free(name)
1024
1025 @requires(('pool_name', str_type), ('crush_rule', opt(int)))
1026 def create_pool(self, pool_name, crush_rule=None):
1027 """
1028 Create a pool:
1029 - with default settings: if crush_rule=None
1030 - with a specific CRUSH rule: crush_rule given
1031
1032 :param pool_name: name of the pool to create
1033 :type pool_name: str
1034 :param crush_rule: rule to use for placement in the new pool
1035 :type crush_rule: int
1036
1037 :raises: :class:`TypeError`, :class:`Error`
1038 """
1039 self.require_state("connected")
1040
1041 pool_name = cstr(pool_name, 'pool_name')
1042 cdef:
1043 char *_pool_name = pool_name
1044 uint8_t _crush_rule
1045
1046 if crush_rule is None:
1047 with nogil:
1048 ret = rados_pool_create(self.cluster, _pool_name)
1049 else:
1050 _crush_rule = crush_rule
1051 with nogil:
1052 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
1053 if ret < 0:
1054 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1055
1056 @requires(('pool_id', int))
1057 def get_pool_base_tier(self, pool_id):
1058 """
1059 Get base pool
1060
1061 :returns: base pool, or pool_id if tiering is not configured for the pool
1062 """
1063 self.require_state("connected")
1064 cdef:
1065 int64_t base_tier = 0
1066 int64_t _pool_id = pool_id
1067
1068 with nogil:
1069 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1070 if ret < 0:
1071 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1072 return int(base_tier)
1073
1074 @requires(('pool_name', str_type))
1075 def delete_pool(self, pool_name):
1076 """
1077 Delete a pool and all data inside it.
1078
1079 The pool is removed from the cluster immediately,
1080 but the actual data is deleted in the background.
1081
1082 :param pool_name: name of the pool to delete
1083 :type pool_name: str
1084
1085 :raises: :class:`TypeError`, :class:`Error`
1086 """
1087 self.require_state("connected")
1088
1089 pool_name = cstr(pool_name, 'pool_name')
1090 cdef:
1091 char *_pool_name = pool_name
1092
1093 with nogil:
1094 ret = rados_pool_delete(self.cluster, _pool_name)
1095 if ret < 0:
1096 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1097
1098 @requires(('pool_id', int))
1099 def get_inconsistent_pgs(self, pool_id):
1100 """
1101 List inconsistent placement groups in the given pool
1102
1103 :param pool_id: ID of the pool in which PGs are listed
1104 :type pool_id: int
1105 :returns: list - inconsistent placement groups
1106 """
1107 self.require_state("connected")
1108 cdef:
1109 int64_t pool = pool_id
1110 size_t size = 512
1111 char *pgs = NULL
1112
1113 try:
1114 while True:
1115 pgs = <char *>realloc_chk(pgs, size);
1116 with nogil:
1117 ret = rados_inconsistent_pg_list(self.cluster, pool,
1118 pgs, size)
1119 if ret > <int>size:
1120 size *= 2
1121 elif ret >= 0:
1122 break
1123 else:
1124 raise make_ex(ret, "error calling inconsistent_pg_list")
1125 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1126 finally:
1127 free(pgs)
1128
1129 def list_pools(self):
1130 """
1131 Gets a list of pool names.
1132
1133 :returns: list - of pool names.
1134 """
1135 self.require_state("connected")
1136 cdef:
1137 size_t size = 512
1138 char *c_names = NULL
1139
1140 try:
1141 while True:
1142 c_names = <char *>realloc_chk(c_names, size)
1143 with nogil:
1144 ret = rados_pool_list(self.cluster, c_names, size)
1145 if ret > <int>size:
1146 size *= 2
1147 elif ret >= 0:
1148 break
1149 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1150 if name]
1151 finally:
1152 free(c_names)
1153
1154 def get_fsid(self):
1155 """
1156 Get the fsid of the cluster as a hexadecimal string.
1157
1158 :raises: :class:`Error`
1159 :returns: str - cluster fsid
1160 """
1161 self.require_state("connected")
1162 cdef:
1163 char *ret_buf = NULL
1164 size_t buf_len = 64
1165
1166 try:
1167 while True:
1168 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
1169 with nogil:
1170 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1171 if ret == -errno.ERANGE:
1172 buf_len = buf_len * 2
1173 elif ret < 0:
1174 raise make_ex(ret, "error getting cluster fsid")
1175 else:
1176 break
1177 return decode_cstr(ret_buf)
1178 finally:
1179 free(ret_buf)
1180
1181 @requires(('ioctx_name', str_type))
1182 def open_ioctx(self, ioctx_name):
1183 """
1184 Create an io context
1185
1186 The io context allows you to perform operations within a particular
1187 pool.
1188
1189 :param ioctx_name: name of the pool
1190 :type ioctx_name: str
1191
1192 :raises: :class:`TypeError`, :class:`Error`
1193 :returns: Ioctx - Rados Ioctx object
1194 """
1195 self.require_state("connected")
1196 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1197 cdef:
1198 rados_ioctx_t ioctx
1199 char *_ioctx_name = ioctx_name
1200 with nogil:
1201 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1202 if ret < 0:
1203 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1204 io = Ioctx(ioctx_name)
1205 io.io = ioctx
1206 return io
1207
1208 @requires(('pool_id', int))
1209 def open_ioctx2(self, pool_id):
1210 """
1211 Create an io context
1212
1213 The io context allows you to perform operations within a particular
1214 pool.
1215
1216 :param pool_id: ID of the pool
1217 :type pool_id: int
1218
1219 :raises: :class:`TypeError`, :class:`Error`
1220 :returns: Ioctx - Rados Ioctx object
1221 """
1222 self.require_state("connected")
1223 cdef:
1224 rados_ioctx_t ioctx
1225 int64_t _pool_id = pool_id
1226 with nogil:
1227 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1228 if ret < 0:
1229 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1230 io = Ioctx(str(pool_id))
1231 io.io = ioctx
1232 return io
1233
1234 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1235 """
1236 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1237 returns (int ret, string outbuf, string outs)
1238 """
1239 # NOTE(sileht): timeout is ignored because C API doesn't provide
1240 # timeout argument, but we keep it for backward compat with old python binding
1241
1242 self.require_state("connected")
1243 cmd = cstr_list(cmd, 'c')
1244
1245 if isinstance(target, int):
1246 # NOTE(sileht): looks weird but test_monmap_dump pass int
1247 target = str(target)
1248
1249 target = cstr(target, 'target', opt=True)
1250 inbuf = cstr(inbuf, 'inbuf')
1251
1252 cdef:
1253 char *_target = opt_str(target)
1254 char **_cmd = to_bytes_array(cmd)
1255 size_t _cmdlen = len(cmd)
1256
1257 char *_inbuf = inbuf
1258 size_t _inbuf_len = len(inbuf)
1259
1260 char *_outbuf
1261 size_t _outbuf_len
1262 char *_outs
1263 size_t _outs_len
1264
1265 try:
1266 if target:
1267 with nogil:
1268 ret = rados_mon_command_target(self.cluster, _target,
1269 <const char **>_cmd, _cmdlen,
1270 <const char*>_inbuf, _inbuf_len,
1271 &_outbuf, &_outbuf_len,
1272 &_outs, &_outs_len)
1273 else:
1274 with nogil:
1275 ret = rados_mon_command(self.cluster,
1276 <const char **>_cmd, _cmdlen,
1277 <const char*>_inbuf, _inbuf_len,
1278 &_outbuf, &_outbuf_len,
1279 &_outs, &_outs_len)
1280
1281 my_outs = decode_cstr(_outs[:_outs_len])
1282 my_outbuf = _outbuf[:_outbuf_len]
1283 if _outs_len:
1284 rados_buffer_free(_outs)
1285 if _outbuf_len:
1286 rados_buffer_free(_outbuf)
1287 return (ret, my_outbuf, my_outs)
1288 finally:
1289 free(_cmd)
1290
1291 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1292 """
1293 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1294 returns (int ret, string outbuf, string outs)
1295 """
1296 # NOTE(sileht): timeout is ignored because C API doesn't provide
1297 # timeout argument, but we keep it for backward compat with old python binding
1298 self.require_state("connected")
1299
1300 cmd = cstr_list(cmd, 'cmd')
1301 inbuf = cstr(inbuf, 'inbuf')
1302
1303 cdef:
1304 int _osdid = osdid
1305 char **_cmd = to_bytes_array(cmd)
1306 size_t _cmdlen = len(cmd)
1307
1308 char *_inbuf = inbuf
1309 size_t _inbuf_len = len(inbuf)
1310
1311 char *_outbuf
1312 size_t _outbuf_len
1313 char *_outs
1314 size_t _outs_len
1315
1316 try:
1317 with nogil:
1318 ret = rados_osd_command(self.cluster, _osdid,
1319 <const char **>_cmd, _cmdlen,
1320 <const char*>_inbuf, _inbuf_len,
1321 &_outbuf, &_outbuf_len,
1322 &_outs, &_outs_len)
1323
1324 my_outs = decode_cstr(_outs[:_outs_len])
1325 my_outbuf = _outbuf[:_outbuf_len]
1326 if _outs_len:
1327 rados_buffer_free(_outs)
1328 if _outbuf_len:
1329 rados_buffer_free(_outbuf)
1330 return (ret, my_outbuf, my_outs)
1331 finally:
1332 free(_cmd)
1333
1334 def mgr_command(self, cmd, inbuf, timeout=0):
1335 """
1336 returns (int ret, string outbuf, string outs)
1337 """
1338 # NOTE(sileht): timeout is ignored because C API doesn't provide
1339 # timeout argument, but we keep it for backward compat with old python binding
1340 self.require_state("connected")
1341
1342 cmd = cstr_list(cmd, 'cmd')
1343 inbuf = cstr(inbuf, 'inbuf')
1344
1345 cdef:
1346 char **_cmd = to_bytes_array(cmd)
1347 size_t _cmdlen = len(cmd)
1348
1349 char *_inbuf = inbuf
1350 size_t _inbuf_len = len(inbuf)
1351
1352 char *_outbuf
1353 size_t _outbuf_len
1354 char *_outs
1355 size_t _outs_len
1356
1357 try:
1358 with nogil:
1359 ret = rados_mgr_command(self.cluster,
1360 <const char **>_cmd, _cmdlen,
1361 <const char*>_inbuf, _inbuf_len,
1362 &_outbuf, &_outbuf_len,
1363 &_outs, &_outs_len)
1364
1365 my_outs = decode_cstr(_outs[:_outs_len])
1366 my_outbuf = _outbuf[:_outbuf_len]
1367 if _outs_len:
1368 rados_buffer_free(_outs)
1369 if _outbuf_len:
1370 rados_buffer_free(_outbuf)
1371 return (ret, my_outbuf, my_outs)
1372 finally:
1373 free(_cmd)
1374
1375 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1376 """
1377 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1378 returns (int ret, string outbuf, string outs)
1379 """
1380 # NOTE(sileht): timeout is ignored because C API doesn't provide
1381 # timeout argument, but we keep it for backward compat with old python binding
1382 self.require_state("connected")
1383
1384 pgid = cstr(pgid, 'pgid')
1385 cmd = cstr_list(cmd, 'cmd')
1386 inbuf = cstr(inbuf, 'inbuf')
1387
1388 cdef:
1389 char *_pgid = pgid
1390 char **_cmd = to_bytes_array(cmd)
1391 size_t _cmdlen = len(cmd)
1392
1393 char *_inbuf = inbuf
1394 size_t _inbuf_len = len(inbuf)
1395
1396 char *_outbuf
1397 size_t _outbuf_len
1398 char *_outs
1399 size_t _outs_len
1400
1401 try:
1402 with nogil:
1403 ret = rados_pg_command(self.cluster, _pgid,
1404 <const char **>_cmd, _cmdlen,
1405 <const char *>_inbuf, _inbuf_len,
1406 &_outbuf, &_outbuf_len,
1407 &_outs, &_outs_len)
1408
1409 my_outs = decode_cstr(_outs[:_outs_len])
1410 my_outbuf = _outbuf[:_outbuf_len]
1411 if _outs_len:
1412 rados_buffer_free(_outs)
1413 if _outbuf_len:
1414 rados_buffer_free(_outbuf)
1415 return (ret, my_outbuf, my_outs)
1416 finally:
1417 free(_cmd)
1418
1419 def wait_for_latest_osdmap(self):
1420 self.require_state("connected")
1421 with nogil:
1422 ret = rados_wait_for_latest_osdmap(self.cluster)
1423 return ret
1424
1425 def blacklist_add(self, client_address, expire_seconds=0):
1426 """
1427 Blacklist a client from the OSDs
1428
1429 :param client_address: client address
1430 :type client_address: str
1431 :param expire_seconds: number of seconds to blacklist
1432 :type expire_seconds: int
1433
1434 :raises: :class:`Error`
1435 """
1436 self.require_state("connected")
1437 client_address = cstr(client_address, 'client_address')
1438 cdef:
1439 uint32_t _expire_seconds = expire_seconds
1440 char *_client_address = client_address
1441
1442 with nogil:
1443 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1444 if ret < 0:
1445 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1446
1447 def monitor_log(self, level, callback, arg):
1448 if level not in MONITOR_LEVELS:
1449 raise LogicError("invalid monitor level " + level)
1450 if callback is not None and not callable(callback):
1451 raise LogicError("callback must be a callable function or None")
1452
1453 level = cstr(level, 'level')
1454 cdef char *_level = level
1455
1456 if callback is None:
1457 with nogil:
1458 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1459 self.monitor_callback = None
1460 self.monitor_callback2 = None
1461 return
1462
1463 cb = (callback, arg)
1464 cdef PyObject* _arg = <PyObject*>cb
1465 with nogil:
1466 r = rados_monitor_log(self.cluster, <const char*>_level,
1467 <rados_log_callback_t>&__monitor_callback, _arg)
1468
1469 if r:
1470 raise make_ex(r, 'error calling rados_monitor_log')
1471 # NOTE(sileht): Prevents the callback method from being garbage collected
1472 self.monitor_callback = cb
1473 self.monitor_callback2 = None
1474
1475 def monitor_log2(self, level, callback, arg):
1476 if level not in MONITOR_LEVELS:
1477 raise LogicError("invalid monitor level " + level)
1478 if callback is not None and not callable(callback):
1479 raise LogicError("callback must be a callable function or None")
1480
1481 level = cstr(level, 'level')
1482 cdef char *_level = level
1483
1484 if callback is None:
1485 with nogil:
1486 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1487 self.monitor_callback = None
1488 self.monitor_callback2 = None
1489 return
1490
1491 cb = (callback, arg)
1492 cdef PyObject* _arg = <PyObject*>cb
1493 with nogil:
1494 r = rados_monitor_log2(self.cluster, <const char*>_level,
1495 <rados_log_callback2_t>&__monitor_callback2, _arg)
1496
1497 if r:
1498 raise make_ex(r, 'error calling rados_monitor_log')
1499 # NOTE(sileht): Prevents the callback method from being garbage collected
1500 self.monitor_callback = None
1501 self.monitor_callback2 = cb
1502
1503 @requires(('service', str_type), ('daemon', str_type), ('metadata', dict))
1504 def service_daemon_register(self, service, daemon, metadata):
1505 """
1506 :param str service: service name (e.g. "rgw")
1507 :param str daemon: daemon name (e.g. "gwfoo")
1508 :param dict metadata: static metadata about the register daemon
1509 (e.g., the version of Ceph, the kernel version.)
1510 """
1511 service = cstr(service, 'service')
1512 daemon = cstr(daemon, 'daemon')
1513 metadata_dict = '\0'.join(chain.from_iterable(metadata.items()))
1514 metadata_dict += '\0'
1515 cdef:
1516 char *_service = service
1517 char *_daemon = daemon
1518 char *_metadata = metadata_dict
1519
1520 with nogil:
1521 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1522 if ret != 0:
1523 raise make_ex(ret, "error calling service_register()")
1524
1525 @requires(('metadata', dict))
1526 def service_daemon_update(self, status):
1527 status_dict = '\0'.join(chain.from_iterable(status.items()))
1528 status_dict += '\0'
1529 cdef:
1530 char *_status = status_dict
1531
1532 with nogil:
1533 ret = rados_service_update_status(self.cluster, _status)
1534 if ret != 0:
1535 raise make_ex(ret, "error calling service_daemon_update()")
1536
1537
1538 cdef class OmapIterator(object):
1539 """Omap iterator"""
1540
1541 cdef public Ioctx ioctx
1542 cdef rados_omap_iter_t ctx
1543
1544 def __cinit__(self, Ioctx ioctx):
1545 self.ioctx = ioctx
1546
1547 def __iter__(self):
1548 return self
1549
1550 def __next__(self):
1551 """
1552 Get the next key-value pair in the object
1553 :returns: next rados.OmapItem
1554 """
1555 cdef:
1556 char *key_ = NULL
1557 char *val_ = NULL
1558 size_t len_
1559
1560 with nogil:
1561 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1562
1563 if ret != 0:
1564 raise make_ex(ret, "error iterating over the omap")
1565 if key_ == NULL:
1566 raise StopIteration()
1567 key = decode_cstr(key_)
1568 val = None
1569 if val_ != NULL:
1570 val = val_[:len_]
1571 return (key, val)
1572
1573 def __dealloc__(self):
1574 with nogil:
1575 rados_omap_get_end(self.ctx)
1576
1577
1578 cdef class ObjectIterator(object):
1579 """rados.Ioctx Object iterator"""
1580
1581 cdef rados_list_ctx_t ctx
1582
1583 cdef public object ioctx
1584
1585 def __cinit__(self, Ioctx ioctx):
1586 self.ioctx = ioctx
1587
1588 with nogil:
1589 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1590 if ret < 0:
1591 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1592 % self.ioctx.name)
1593
1594 def __iter__(self):
1595 return self
1596
1597 def __next__(self):
1598 """
1599 Get the next object name and locator in the pool
1600
1601 :raises: StopIteration
1602 :returns: next rados.Ioctx Object
1603 """
1604 cdef:
1605 const char *key_ = NULL
1606 const char *locator_ = NULL
1607 const char *nspace_ = NULL
1608
1609 with nogil:
1610 ret = rados_nobjects_list_next(self.ctx, &key_, &locator_, &nspace_)
1611
1612 if ret < 0:
1613 raise StopIteration()
1614
1615 key = decode_cstr(key_)
1616 locator = decode_cstr(locator_) if locator_ != NULL else None
1617 nspace = decode_cstr(nspace_) if nspace_ != NULL else None
1618 return Object(self.ioctx, key, locator, nspace)
1619
1620 def __dealloc__(self):
1621 with nogil:
1622 rados_nobjects_list_close(self.ctx)
1623
1624
1625 cdef class XattrIterator(object):
1626 """Extended attribute iterator"""
1627
1628 cdef rados_xattrs_iter_t it
1629 cdef char* _oid
1630
1631 cdef public Ioctx ioctx
1632 cdef public object oid
1633
1634 def __cinit__(self, Ioctx ioctx, oid):
1635 self.ioctx = ioctx
1636 self.oid = cstr(oid, 'oid')
1637 self._oid = self.oid
1638
1639 with nogil:
1640 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1641 if ret != 0:
1642 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1643
1644 def __iter__(self):
1645 return self
1646
1647 def __next__(self):
1648 """
1649 Get the next xattr on the object
1650
1651 :raises: StopIteration
1652 :returns: pair - of name and value of the next Xattr
1653 """
1654 cdef:
1655 const char *name_ = NULL
1656 const char *val_ = NULL
1657 size_t len_ = 0
1658
1659 with nogil:
1660 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1661 if ret != 0:
1662 raise make_ex(ret, "error iterating over the extended attributes \
1663 in '%s'" % self.oid)
1664 if name_ == NULL:
1665 raise StopIteration()
1666 name = decode_cstr(name_)
1667 val = val_[:len_]
1668 return (name, val)
1669
1670 def __dealloc__(self):
1671 with nogil:
1672 rados_getxattrs_end(self.it)
1673
1674
1675 cdef class SnapIterator(object):
1676 """Snapshot iterator"""
1677
1678 cdef public Ioctx ioctx
1679
1680 cdef rados_snap_t *snaps
1681 cdef int max_snap
1682 cdef int cur_snap
1683
1684 def __cinit__(self, Ioctx ioctx):
1685 self.ioctx = ioctx
1686 # We don't know how big a buffer we need until we've called the
1687 # function. So use the exponential doubling strategy.
1688 cdef int num_snaps = 10
1689 while True:
1690 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1691 num_snaps *
1692 sizeof(rados_snap_t))
1693
1694 with nogil:
1695 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1696 if ret >= 0:
1697 self.max_snap = ret
1698 break
1699 elif ret != -errno.ERANGE:
1700 raise make_ex(ret, "error calling rados_snap_list for \
1701 ioctx '%s'" % self.ioctx.name)
1702 num_snaps = num_snaps * 2
1703 self.cur_snap = 0
1704
1705 def __iter__(self):
1706 return self
1707
1708 def __next__(self):
1709 """
1710 Get the next Snapshot
1711
1712 :raises: :class:`Error`, StopIteration
1713 :returns: Snap - next snapshot
1714 """
1715 if self.cur_snap >= self.max_snap:
1716 raise StopIteration
1717
1718 cdef:
1719 rados_snap_t snap_id = self.snaps[self.cur_snap]
1720 int name_len = 10
1721 char *name = NULL
1722
1723 try:
1724 while True:
1725 name = <char *>realloc_chk(name, name_len)
1726 with nogil:
1727 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1728 if ret == 0:
1729 break
1730 elif ret != -errno.ERANGE:
1731 raise make_ex(ret, "rados_snap_get_name error")
1732 else:
1733 name_len = name_len * 2
1734
1735 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1736 self.cur_snap = self.cur_snap + 1
1737 return snap
1738 finally:
1739 free(name)
1740
1741
1742 cdef class Snap(object):
1743 """Snapshot object"""
1744 cdef public Ioctx ioctx
1745 cdef public object name
1746
1747 # NOTE(sileht): old API was storing the ctypes object
1748 # instead of the value ....
1749 cdef public rados_snap_t snap_id
1750
1751 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1752 self.ioctx = ioctx
1753 self.name = name
1754 self.snap_id = snap_id
1755
1756 def __str__(self):
1757 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1758 % (str(self.ioctx), self.name, self.snap_id)
1759
1760 def get_timestamp(self):
1761 """
1762 Find when a snapshot in the current pool occurred
1763
1764 :raises: :class:`Error`
1765 :returns: datetime - the data and time the snapshot was created
1766 """
1767 cdef time_t snap_time
1768
1769 with nogil:
1770 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1771 if ret != 0:
1772 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1773 return datetime.fromtimestamp(snap_time)
1774
1775
1776 cdef class Completion(object):
1777 """completion object"""
1778
1779 cdef public:
1780 Ioctx ioctx
1781 object oncomplete
1782 object onsafe
1783
1784 cdef:
1785 rados_callback_t complete_cb
1786 rados_callback_t safe_cb
1787 rados_completion_t rados_comp
1788 PyObject* buf
1789
1790 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1791 self.oncomplete = oncomplete
1792 self.onsafe = onsafe
1793 self.ioctx = ioctx
1794
1795 def is_safe(self):
1796 """
1797 Is an asynchronous operation safe?
1798
1799 This does not imply that the safe callback has finished.
1800
1801 :returns: True if the operation is safe
1802 """
1803 with nogil:
1804 ret = rados_aio_is_safe(self.rados_comp)
1805 return ret == 1
1806
1807 def is_complete(self):
1808 """
1809 Has an asynchronous operation completed?
1810
1811 This does not imply that the safe callback has finished.
1812
1813 :returns: True if the operation is completed
1814 """
1815 with nogil:
1816 ret = rados_aio_is_complete(self.rados_comp)
1817 return ret == 1
1818
1819 def wait_for_safe(self):
1820 """
1821 Wait for an asynchronous operation to be marked safe
1822
1823 This does not imply that the safe callback has finished.
1824 """
1825 with nogil:
1826 rados_aio_wait_for_safe(self.rados_comp)
1827
1828 def wait_for_complete(self):
1829 """
1830 Wait for an asynchronous operation to complete
1831
1832 This does not imply that the complete callback has finished.
1833 """
1834 with nogil:
1835 rados_aio_wait_for_complete(self.rados_comp)
1836
1837 def wait_for_safe_and_cb(self):
1838 """
1839 Wait for an asynchronous operation to be marked safe and for
1840 the safe callback to have returned
1841 """
1842 with nogil:
1843 rados_aio_wait_for_safe_and_cb(self.rados_comp)
1844
1845 def wait_for_complete_and_cb(self):
1846 """
1847 Wait for an asynchronous operation to complete and for the
1848 complete callback to have returned
1849
1850 :returns: whether the operation is completed
1851 """
1852 with nogil:
1853 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1854 return ret
1855
1856 def get_return_value(self):
1857 """
1858 Get the return value of an asychronous operation
1859
1860 The return value is set when the operation is complete or safe,
1861 whichever comes first.
1862
1863 :returns: int - return value of the operation
1864 """
1865 with nogil:
1866 ret = rados_aio_get_return_value(self.rados_comp)
1867 return ret
1868
1869 def __dealloc__(self):
1870 """
1871 Release a completion
1872
1873 Call this when you no longer need the completion. It may not be
1874 freed immediately if the operation is not acked and committed.
1875 """
1876 ref.Py_XDECREF(self.buf)
1877 self.buf = NULL
1878 if self.rados_comp != NULL:
1879 with nogil:
1880 rados_aio_release(self.rados_comp)
1881 self.rados_comp = NULL
1882
1883 def _complete(self):
1884 self.oncomplete(self)
1885 with self.ioctx.lock:
1886 if self.oncomplete:
1887 self.ioctx.complete_completions.remove(self)
1888
1889 def _safe(self):
1890 self.onsafe(self)
1891 with self.ioctx.lock:
1892 if self.onsafe:
1893 self.ioctx.safe_completions.remove(self)
1894
1895 def _cleanup(self):
1896 with self.ioctx.lock:
1897 if self.oncomplete:
1898 self.ioctx.complete_completions.remove(self)
1899 if self.onsafe:
1900 self.ioctx.safe_completions.remove(self)
1901
1902
1903 class OpCtx(object):
1904 def __enter__(self):
1905 return self.create()
1906
1907 def __exit__(self, type, msg, traceback):
1908 self.release()
1909
1910
1911 cdef class WriteOp(object):
1912 cdef rados_write_op_t write_op
1913
1914 def create(self):
1915 with nogil:
1916 self.write_op = rados_create_write_op()
1917 return self
1918
1919 def release(self):
1920 with nogil:
1921 rados_release_write_op(self.write_op)
1922
1923 @requires(('exclusive', opt(int)))
1924 def new(self, exclusive=None):
1925 """
1926 Create the object.
1927 """
1928
1929 cdef:
1930 int _exclusive = exclusive
1931
1932 with nogil:
1933 rados_write_op_create(self.write_op, _exclusive, NULL)
1934
1935
1936 def remove(self):
1937 """
1938 Remove object.
1939 """
1940 with nogil:
1941 rados_write_op_remove(self.write_op)
1942
1943 @requires(('flags', int))
1944 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1945 """
1946 Set flags for the last operation added to this write_op.
1947 :para flags: flags to apply to the last operation
1948 :type flags: int
1949 """
1950
1951 cdef:
1952 int _flags = flags
1953
1954 with nogil:
1955 rados_write_op_set_flags(self.write_op, _flags)
1956
1957 @requires(('to_write', bytes))
1958 def append(self, to_write):
1959 """
1960 Append data to an object synchronously
1961 :param to_write: data to write
1962 :type to_write: bytes
1963 """
1964
1965 cdef:
1966 char *_to_write = to_write
1967 size_t length = len(to_write)
1968
1969 with nogil:
1970 rados_write_op_append(self.write_op, _to_write, length)
1971
1972 @requires(('to_write', bytes))
1973 def write_full(self, to_write):
1974 """
1975 Write whole object, atomically replacing it.
1976 :param to_write: data to write
1977 :type to_write: bytes
1978 """
1979
1980 cdef:
1981 char *_to_write = to_write
1982 size_t length = len(to_write)
1983
1984 with nogil:
1985 rados_write_op_write_full(self.write_op, _to_write, length)
1986
1987 @requires(('to_write', bytes), ('offset', int))
1988 def write(self, to_write, offset=0):
1989 """
1990 Write to offset.
1991 :param to_write: data to write
1992 :type to_write: bytes
1993 :param offset: byte offset in the object to begin writing at
1994 :type offset: int
1995 """
1996
1997 cdef:
1998 char *_to_write = to_write
1999 size_t length = len(to_write)
2000 uint64_t _offset = offset
2001
2002 with nogil:
2003 rados_write_op_write(self.write_op, _to_write, length, _offset)
2004
2005 @requires(('version', int))
2006 def assert_version(self, version):
2007 """
2008 Check if object's version is the expected one.
2009 :param version: expected version of the object
2010 :param type: int
2011 """
2012 cdef:
2013 uint64_t _version = version
2014
2015 with nogil:
2016 rados_write_op_assert_version(self.write_op, _version)
2017
2018 @requires(('offset', int), ('length', int))
2019 def zero(self, offset, length):
2020 """
2021 Zero part of an object.
2022 :param offset: byte offset in the object to begin writing at
2023 :type offset: int
2024 :param offset: number of zero to write
2025 :type offset: int
2026 """
2027
2028 cdef:
2029 size_t _length = length
2030 uint64_t _offset = offset
2031
2032 with nogil:
2033 rados_write_op_zero(self.write_op, _length, _offset)
2034
2035 @requires(('offset', int))
2036 def truncate(self, offset):
2037 """
2038 Truncate an object.
2039 :param offset: byte offset in the object to begin truncating at
2040 :type offset: int
2041 """
2042
2043 cdef:
2044 uint64_t _offset = offset
2045
2046 with nogil:
2047 rados_write_op_truncate(self.write_op, _offset)
2048
2049
2050 class WriteOpCtx(WriteOp, OpCtx):
2051 """write operation context manager"""
2052
2053
2054 cdef class ReadOp(object):
2055 cdef rados_read_op_t read_op
2056
2057 def create(self):
2058 with nogil:
2059 self.read_op = rados_create_read_op()
2060 return self
2061
2062 def release(self):
2063 with nogil:
2064 rados_release_read_op(self.read_op)
2065
2066 @requires(('flags', int))
2067 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
2068 """
2069 Set flags for the last operation added to this read_op.
2070 :para flags: flags to apply to the last operation
2071 :type flags: int
2072 """
2073
2074 cdef:
2075 int _flags = flags
2076
2077 with nogil:
2078 rados_read_op_set_flags(self.read_op, _flags)
2079
2080
2081 class ReadOpCtx(ReadOp, OpCtx):
2082 """read operation context manager"""
2083
2084
2085 cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
2086 """
2087 Callback to onsafe() for asynchronous operations
2088 """
2089 cdef object cb = <object>args
2090 cb._safe()
2091 return 0
2092
2093
2094 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2095 """
2096 Callback to oncomplete() for asynchronous operations
2097 """
2098 cdef object cb = <object>args
2099 cb._complete()
2100 return 0
2101
2102
2103 cdef class Ioctx(object):
2104 """rados.Ioctx object"""
2105 # NOTE(sileht): attributes declared in .pyd
2106
2107 def __init__(self, name):
2108 self.name = name
2109 self.state = "open"
2110
2111 self.locator_key = ""
2112 self.nspace = ""
2113 self.lock = threading.Lock()
2114 self.safe_completions = []
2115 self.complete_completions = []
2116
2117 def __enter__(self):
2118 return self
2119
2120 def __exit__(self, type_, value, traceback):
2121 self.close()
2122 return False
2123
2124 def __dealloc__(self):
2125 self.close()
2126
2127 def __track_completion(self, completion_obj):
2128 if completion_obj.oncomplete:
2129 with self.lock:
2130 self.complete_completions.append(completion_obj)
2131 if completion_obj.onsafe:
2132 with self.lock:
2133 self.safe_completions.append(completion_obj)
2134
2135 def __get_completion(self, oncomplete, onsafe):
2136 """
2137 Constructs a completion to use with asynchronous operations
2138
2139 :param oncomplete: what to do when the write is safe and complete in memory
2140 on all replicas
2141 :type oncomplete: completion
2142 :param onsafe: what to do when the write is safe and complete on storage
2143 on all replicas
2144 :type onsafe: completion
2145
2146 :raises: :class:`Error`
2147 :returns: completion object
2148 """
2149
2150 completion_obj = Completion(self, oncomplete, onsafe)
2151
2152 cdef:
2153 rados_callback_t complete_cb = NULL
2154 rados_callback_t safe_cb = NULL
2155 rados_completion_t completion
2156 PyObject* p_completion_obj= <PyObject*>completion_obj
2157
2158 if oncomplete:
2159 complete_cb = <rados_callback_t>&__aio_complete_cb
2160 if onsafe:
2161 safe_cb = <rados_callback_t>&__aio_safe_cb
2162
2163 with nogil:
2164 ret = rados_aio_create_completion(p_completion_obj, complete_cb, safe_cb,
2165 &completion)
2166 if ret < 0:
2167 raise make_ex(ret, "error getting a completion")
2168
2169 completion_obj.rados_comp = completion
2170 return completion_obj
2171
2172 @requires(('object_name', str_type), ('oncomplete', opt(Callable)))
2173 def aio_stat(self, object_name, oncomplete):
2174 """
2175 Asynchronously get object stats (size/mtime)
2176
2177 oncomplete will be called with the returned size and mtime
2178 as well as the completion:
2179
2180 oncomplete(completion, size, mtime)
2181
2182 :param object_name: the name of the object to get stats from
2183 :type object_name: str
2184 :param oncomplete: what to do when the stat is complete
2185 :type oncomplete: completion
2186
2187 :raises: :class:`Error`
2188 :returns: completion object
2189 """
2190
2191 object_name = cstr(object_name, 'object_name')
2192
2193 cdef:
2194 Completion completion
2195 char *_object_name = object_name
2196 uint64_t psize
2197 time_t pmtime
2198
2199 def oncomplete_(completion_v):
2200 cdef Completion _completion_v = completion_v
2201 return_value = _completion_v.get_return_value()
2202 if return_value >= 0:
2203 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2204 else:
2205 return oncomplete(_completion_v, None, None)
2206
2207 completion = self.__get_completion(oncomplete_, None)
2208 self.__track_completion(completion)
2209 with nogil:
2210 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2211 &psize, &pmtime)
2212
2213 if ret < 0:
2214 completion._cleanup()
2215 raise make_ex(ret, "error stating %s" % object_name)
2216 return completion
2217
2218 @requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
2219 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2220 def aio_write(self, object_name, to_write, offset=0,
2221 oncomplete=None, onsafe=None):
2222 """
2223 Write data to an object asynchronously
2224
2225 Queues the write and returns.
2226
2227 :param object_name: name of the object
2228 :type object_name: str
2229 :param to_write: data to write
2230 :type to_write: bytes
2231 :param offset: byte offset in the object to begin writing at
2232 :type offset: int
2233 :param oncomplete: what to do when the write is safe and complete in memory
2234 on all replicas
2235 :type oncomplete: completion
2236 :param onsafe: what to do when the write is safe and complete on storage
2237 on all replicas
2238 :type onsafe: completion
2239
2240 :raises: :class:`Error`
2241 :returns: completion object
2242 """
2243
2244 object_name = cstr(object_name, 'object_name')
2245
2246 cdef:
2247 Completion completion
2248 char* _object_name = object_name
2249 char* _to_write = to_write
2250 size_t size = len(to_write)
2251 uint64_t _offset = offset
2252
2253 completion = self.__get_completion(oncomplete, onsafe)
2254 self.__track_completion(completion)
2255 with nogil:
2256 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2257 _to_write, size, _offset)
2258 if ret < 0:
2259 completion._cleanup()
2260 raise make_ex(ret, "error writing object %s" % object_name)
2261 return completion
2262
2263 @requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
2264 ('onsafe', opt(Callable)))
2265 def aio_write_full(self, object_name, to_write,
2266 oncomplete=None, onsafe=None):
2267 """
2268 Asynchronously write an entire object
2269
2270 The object is filled with the provided data. If the object exists,
2271 it is atomically truncated and then written.
2272 Queues the write and returns.
2273
2274 :param object_name: name of the object
2275 :type object_name: str
2276 :param to_write: data to write
2277 :type to_write: str
2278 :param oncomplete: what to do when the write is safe and complete in memory
2279 on all replicas
2280 :type oncomplete: completion
2281 :param onsafe: what to do when the write is safe and complete on storage
2282 on all replicas
2283 :type onsafe: completion
2284
2285 :raises: :class:`Error`
2286 :returns: completion object
2287 """
2288
2289 object_name = cstr(object_name, 'object_name')
2290
2291 cdef:
2292 Completion completion
2293 char* _object_name = object_name
2294 char* _to_write = to_write
2295 size_t size = len(to_write)
2296
2297 completion = self.__get_completion(oncomplete, onsafe)
2298 self.__track_completion(completion)
2299 with nogil:
2300 ret = rados_aio_write_full(self.io, _object_name,
2301 completion.rados_comp,
2302 _to_write, size)
2303 if ret < 0:
2304 completion._cleanup()
2305 raise make_ex(ret, "error writing object %s" % object_name)
2306 return completion
2307
2308 @requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
2309 ('onsafe', opt(Callable)))
2310 def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
2311 """
2312 Asynchronously append data to an object
2313
2314 Queues the write and returns.
2315
2316 :param object_name: name of the object
2317 :type object_name: str
2318 :param to_append: data to append
2319 :type to_append: str
2320 :param offset: byte offset in the object to begin writing at
2321 :type offset: int
2322 :param oncomplete: what to do when the write is safe and complete in memory
2323 on all replicas
2324 :type oncomplete: completion
2325 :param onsafe: what to do when the write is safe and complete on storage
2326 on all replicas
2327 :type onsafe: completion
2328
2329 :raises: :class:`Error`
2330 :returns: completion object
2331 """
2332 object_name = cstr(object_name, 'object_name')
2333
2334 cdef:
2335 Completion completion
2336 char* _object_name = object_name
2337 char* _to_append = to_append
2338 size_t size = len(to_append)
2339
2340 completion = self.__get_completion(oncomplete, onsafe)
2341 self.__track_completion(completion)
2342 with nogil:
2343 ret = rados_aio_append(self.io, _object_name,
2344 completion.rados_comp,
2345 _to_append, size)
2346 if ret < 0:
2347 completion._cleanup()
2348 raise make_ex(ret, "error appending object %s" % object_name)
2349 return completion
2350
2351 def aio_flush(self):
2352 """
2353 Block until all pending writes in an io context are safe
2354
2355 :raises: :class:`Error`
2356 """
2357 with nogil:
2358 ret = rados_aio_flush(self.io)
2359 if ret < 0:
2360 raise make_ex(ret, "error flushing")
2361
2362 @requires(('object_name', str_type), ('length', int), ('offset', int),
2363 ('oncomplete', opt(Callable)))
2364 def aio_read(self, object_name, length, offset, oncomplete):
2365 """
2366 Asynchronously read data from an object
2367
2368 oncomplete will be called with the returned read value as
2369 well as the completion:
2370
2371 oncomplete(completion, data_read)
2372
2373 :param object_name: name of the object to read from
2374 :type object_name: str
2375 :param length: the number of bytes to read
2376 :type length: int
2377 :param offset: byte offset in the object to begin reading from
2378 :type offset: int
2379 :param oncomplete: what to do when the read is complete
2380 :type oncomplete: completion
2381
2382 :raises: :class:`Error`
2383 :returns: completion object
2384 """
2385
2386 object_name = cstr(object_name, 'object_name')
2387
2388 cdef:
2389 Completion completion
2390 char* _object_name = object_name
2391 uint64_t _offset = offset
2392
2393 char *ref_buf
2394 size_t _length = length
2395
2396 def oncomplete_(completion_v):
2397 cdef Completion _completion_v = completion_v
2398 return_value = _completion_v.get_return_value()
2399 if return_value > 0 and return_value != length:
2400 _PyBytes_Resize(&_completion_v.buf, return_value)
2401 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2402
2403 completion = self.__get_completion(oncomplete_, None)
2404 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2405 ret_buf = PyBytes_AsString(completion.buf)
2406 self.__track_completion(completion)
2407 with nogil:
2408 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2409 ret_buf, _length, _offset)
2410 if ret < 0:
2411 completion._cleanup()
2412 raise make_ex(ret, "error reading %s" % object_name)
2413 return completion
2414
2415 @requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
2416 ('data', bytes), ('length', int),
2417 ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2418 def aio_execute(self, object_name, cls, method, data,
2419 length=8192, oncomplete=None, onsafe=None):
2420 """
2421 Asynchronously execute an OSD class method on an object.
2422
2423 oncomplete and onsafe will be called with the data returned from
2424 the plugin as well as the completion:
2425
2426 oncomplete(completion, data)
2427 onsafe(completion, data)
2428
2429 :param object_name: name of the object
2430 :type object_name: str
2431 :param cls: name of the object class
2432 :type cls: str
2433 :param method: name of the method
2434 :type method: str
2435 :param data: input data
2436 :type data: bytes
2437 :param length: size of output buffer in bytes (default=8192)
2438 :type length: int
2439 :param oncomplete: what to do when the execution is complete
2440 :type oncomplete: completion
2441 :param onsafe: what to do when the execution is safe and complete
2442 :type onsafe: completion
2443
2444 :raises: :class:`Error`
2445 :returns: completion object
2446 """
2447
2448 object_name = cstr(object_name, 'object_name')
2449 cls = cstr(cls, 'cls')
2450 method = cstr(method, 'method')
2451 cdef:
2452 Completion completion
2453 char *_object_name = object_name
2454 char *_cls = cls
2455 char *_method = method
2456 char *_data = data
2457 size_t _data_len = len(data)
2458
2459 char *ref_buf
2460 size_t _length = length
2461
2462 def oncomplete_(completion_v):
2463 cdef Completion _completion_v = completion_v
2464 return_value = _completion_v.get_return_value()
2465 if return_value > 0 and return_value != length:
2466 _PyBytes_Resize(&_completion_v.buf, return_value)
2467 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2468
2469 def onsafe_(completion_v):
2470 cdef Completion _completion_v = completion_v
2471 return_value = _completion_v.get_return_value()
2472 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2473
2474 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2475 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2476 ret_buf = PyBytes_AsString(completion.buf)
2477 self.__track_completion(completion)
2478 with nogil:
2479 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2480 _cls, _method, _data, _data_len, ret_buf, _length)
2481 if ret < 0:
2482 completion._cleanup()
2483 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2484 return completion
2485
2486 @requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
2487 def aio_remove(self, object_name, oncomplete=None, onsafe=None):
2488 """
2489 Asynchronously remove an object
2490
2491 :param object_name: name of the object to remove
2492 :type object_name: str
2493 :param oncomplete: what to do when the remove is safe and complete in memory
2494 on all replicas
2495 :type oncomplete: completion
2496 :param onsafe: what to do when the remove is safe and complete on storage
2497 on all replicas
2498 :type onsafe: completion
2499
2500 :raises: :class:`Error`
2501 :returns: completion object
2502 """
2503 object_name = cstr(object_name, 'object_name')
2504
2505 cdef:
2506 Completion completion
2507 char* _object_name = object_name
2508
2509 completion = self.__get_completion(oncomplete, onsafe)
2510 self.__track_completion(completion)
2511 with nogil:
2512 ret = rados_aio_remove(self.io, _object_name,
2513 completion.rados_comp)
2514 if ret < 0:
2515 completion._cleanup()
2516 raise make_ex(ret, "error removing %s" % object_name)
2517 return completion
2518
2519 def require_ioctx_open(self):
2520 """
2521 Checks if the rados.Ioctx object state is 'open'
2522
2523 :raises: IoctxStateError
2524 """
2525 if self.state != "open":
2526 raise IoctxStateError("The pool is %s" % self.state)
2527
2528 @requires(('loc_key', str_type))
2529 def set_locator_key(self, loc_key):
2530 """
2531 Set the key for mapping objects to pgs within an io context.
2532
2533 The key is used instead of the object name to determine which
2534 placement groups an object is put in. This affects all subsequent
2535 operations of the io context - until a different locator key is
2536 set, all objects in this io context will be placed in the same pg.
2537
2538 :param loc_key: the key to use as the object locator, or NULL to discard
2539 any previously set key
2540 :type loc_key: str
2541
2542 :raises: :class:`TypeError`
2543 """
2544 self.require_ioctx_open()
2545 cloc_key = cstr(loc_key, 'loc_key')
2546 cdef char *_loc_key = cloc_key
2547 with nogil:
2548 rados_ioctx_locator_set_key(self.io, _loc_key)
2549 self.locator_key = loc_key
2550
2551 def get_locator_key(self):
2552 """
2553 Get the locator_key of context
2554
2555 :returns: locator_key
2556 """
2557 return self.locator_key
2558
2559 @requires(('snap_id', long))
2560 def set_read(self, snap_id):
2561 """
2562 Set the snapshot for reading objects.
2563
2564 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2565
2566 :param snap_id: the snapshot Id
2567 :type snap_id: int
2568
2569 :raises: :class:`TypeError`
2570 """
2571 self.require_ioctx_open()
2572 cdef rados_snap_t _snap_id = snap_id
2573 with nogil:
2574 rados_ioctx_snap_set_read(self.io, _snap_id)
2575
2576 @requires(('nspace', str_type))
2577 def set_namespace(self, nspace):
2578 """
2579 Set the namespace for objects within an io context.
2580
2581 The namespace in addition to the object name fully identifies
2582 an object. This affects all subsequent operations of the io context
2583 - until a different namespace is set, all objects in this io context
2584 will be placed in the same namespace.
2585
2586 :param nspace: the namespace to use, or None/"" for the default namespace
2587 :type nspace: str
2588
2589 :raises: :class:`TypeError`
2590 """
2591 self.require_ioctx_open()
2592 if nspace is None:
2593 nspace = ""
2594 cnspace = cstr(nspace, 'nspace')
2595 cdef char *_nspace = cnspace
2596 with nogil:
2597 rados_ioctx_set_namespace(self.io, _nspace)
2598 self.nspace = nspace
2599
2600 def get_namespace(self):
2601 """
2602 Get the namespace of context
2603
2604 :returns: namespace
2605 """
2606 return self.nspace
2607
2608 def close(self):
2609 """
2610 Close a rados.Ioctx object.
2611
2612 This just tells librados that you no longer need to use the io context.
2613 It may not be freed immediately if there are pending asynchronous
2614 requests on it, but you should not use an io context again after
2615 calling this function on it.
2616 """
2617 if self.state == "open":
2618 self.require_ioctx_open()
2619 with nogil:
2620 rados_ioctx_destroy(self.io)
2621 self.state = "closed"
2622
2623
2624 @requires(('key', str_type), ('data', bytes))
2625 def write(self, key, data, offset=0):
2626 """
2627 Write data to an object synchronously
2628
2629 :param key: name of the object
2630 :type key: str
2631 :param data: data to write
2632 :type data: bytes
2633 :param offset: byte offset in the object to begin writing at
2634 :type offset: int
2635
2636 :raises: :class:`TypeError`
2637 :raises: :class:`LogicError`
2638 :returns: int - 0 on success
2639 """
2640 self.require_ioctx_open()
2641
2642 key = cstr(key, 'key')
2643 cdef:
2644 char *_key = key
2645 char *_data = data
2646 size_t length = len(data)
2647 uint64_t _offset = offset
2648
2649 with nogil:
2650 ret = rados_write(self.io, _key, _data, length, _offset)
2651 if ret == 0:
2652 return ret
2653 elif ret < 0:
2654 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2655 % (self.name, key))
2656 else:
2657 raise LogicError("Ioctx.write(%s): rados_write \
2658 returned %d, but should return zero on success." % (self.name, ret))
2659
2660 @requires(('key', str_type), ('data', bytes))
2661 def write_full(self, key, data):
2662 """
2663 Write an entire object synchronously.
2664
2665 The object is filled with the provided data. If the object exists,
2666 it is atomically truncated and then written.
2667
2668 :param key: name of the object
2669 :type key: str
2670 :param data: data to write
2671 :type data: bytes
2672
2673 :raises: :class:`TypeError`
2674 :raises: :class:`Error`
2675 :returns: int - 0 on success
2676 """
2677 self.require_ioctx_open()
2678 key = cstr(key, 'key')
2679 cdef:
2680 char *_key = key
2681 char *_data = data
2682 size_t length = len(data)
2683
2684 with nogil:
2685 ret = rados_write_full(self.io, _key, _data, length)
2686 if ret == 0:
2687 return ret
2688 elif ret < 0:
2689 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2690 % (self.name, key))
2691 else:
2692 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2693 returned %d, but should return zero on success." % (self.name, ret))
2694
2695 @requires(('key', str_type), ('data', bytes))
2696 def append(self, key, data):
2697 """
2698 Append data to an object synchronously
2699
2700 :param key: name of the object
2701 :type key: str
2702 :param data: data to write
2703 :type data: bytes
2704
2705 :raises: :class:`TypeError`
2706 :raises: :class:`LogicError`
2707 :returns: int - 0 on success
2708 """
2709 self.require_ioctx_open()
2710 key = cstr(key, 'key')
2711 cdef:
2712 char *_key = key
2713 char *_data = data
2714 size_t length = len(data)
2715
2716 with nogil:
2717 ret = rados_append(self.io, _key, _data, length)
2718 if ret == 0:
2719 return ret
2720 elif ret < 0:
2721 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2722 % (self.name, key))
2723 else:
2724 raise LogicError("Ioctx.append(%s): rados_append \
2725 returned %d, but should return zero on success." % (self.name, ret))
2726
2727 @requires(('key', str_type))
2728 def read(self, key, length=8192, offset=0):
2729 """
2730 Read data from an object synchronously
2731
2732 :param key: name of the object
2733 :type key: str
2734 :param length: the number of bytes to read (default=8192)
2735 :type length: int
2736 :param offset: byte offset in the object to begin reading at
2737 :type offset: int
2738
2739 :raises: :class:`TypeError`
2740 :raises: :class:`Error`
2741 :returns: str - data read from object
2742 """
2743 self.require_ioctx_open()
2744 key = cstr(key, 'key')
2745 cdef:
2746 char *_key = key
2747 char *ret_buf
2748 uint64_t _offset = offset
2749 size_t _length = length
2750 PyObject* ret_s = NULL
2751
2752 ret_s = PyBytes_FromStringAndSize(NULL, length)
2753 try:
2754 ret_buf = PyBytes_AsString(ret_s)
2755 with nogil:
2756 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2757 if ret < 0:
2758 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2759
2760 if ret != length:
2761 _PyBytes_Resize(&ret_s, ret)
2762
2763 return <object>ret_s
2764 finally:
2765 # We DECREF unconditionally: the cast to object above will have
2766 # INCREFed if necessary. This also takes care of exceptions,
2767 # including if _PyString_Resize fails (that will free the string
2768 # itself and set ret_s to NULL, hence XDECREF).
2769 ref.Py_XDECREF(ret_s)
2770
2771 @requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
2772 def execute(self, key, cls, method, data, length=8192):
2773 """
2774 Execute an OSD class method on an object.
2775
2776 :param key: name of the object
2777 :type key: str
2778 :param cls: name of the object class
2779 :type cls: str
2780 :param method: name of the method
2781 :type method: str
2782 :param data: input data
2783 :type data: bytes
2784 :param length: size of output buffer in bytes (default=8192)
2785 :type length: int
2786
2787 :raises: :class:`TypeError`
2788 :raises: :class:`Error`
2789 :returns: (ret, method output)
2790 """
2791 self.require_ioctx_open()
2792
2793 key = cstr(key, 'key')
2794 cls = cstr(cls, 'cls')
2795 method = cstr(method, 'method')
2796 cdef:
2797 char *_key = key
2798 char *_cls = cls
2799 char *_method = method
2800 char *_data = data
2801 size_t _data_len = len(data)
2802
2803 char *ref_buf
2804 size_t _length = length
2805 PyObject* ret_s = NULL
2806
2807 ret_s = PyBytes_FromStringAndSize(NULL, length)
2808 try:
2809 ret_buf = PyBytes_AsString(ret_s)
2810 with nogil:
2811 ret = rados_exec(self.io, _key, _cls, _method, _data,
2812 _data_len, ret_buf, _length)
2813 if ret < 0:
2814 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2815
2816 if ret != length:
2817 _PyBytes_Resize(&ret_s, ret)
2818
2819 return ret, <object>ret_s
2820 finally:
2821 # We DECREF unconditionally: the cast to object above will have
2822 # INCREFed if necessary. This also takes care of exceptions,
2823 # including if _PyString_Resize fails (that will free the string
2824 # itself and set ret_s to NULL, hence XDECREF).
2825 ref.Py_XDECREF(ret_s)
2826
2827 def get_stats(self):
2828 """
2829 Get pool usage statistics
2830
2831 :returns: dict - contains the following keys:
2832
2833 - ``num_bytes`` (int) - size of pool in bytes
2834
2835 - ``num_kb`` (int) - size of pool in kbytes
2836
2837 - ``num_objects`` (int) - number of objects in the pool
2838
2839 - ``num_object_clones`` (int) - number of object clones
2840
2841 - ``num_object_copies`` (int) - number of object copies
2842
2843 - ``num_objects_missing_on_primary`` (int) - number of objets
2844 missing on primary
2845
2846 - ``num_objects_unfound`` (int) - number of unfound objects
2847
2848 - ``num_objects_degraded`` (int) - number of degraded objects
2849
2850 - ``num_rd`` (int) - bytes read
2851
2852 - ``num_rd_kb`` (int) - kbytes read
2853
2854 - ``num_wr`` (int) - bytes written
2855
2856 - ``num_wr_kb`` (int) - kbytes written
2857 """
2858 self.require_ioctx_open()
2859 cdef rados_pool_stat_t stats
2860 with nogil:
2861 ret = rados_ioctx_pool_stat(self.io, &stats)
2862 if ret < 0:
2863 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
2864 return {'num_bytes': stats.num_bytes,
2865 'num_kb': stats.num_kb,
2866 'num_objects': stats.num_objects,
2867 'num_object_clones': stats.num_object_clones,
2868 'num_object_copies': stats.num_object_copies,
2869 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
2870 "num_objects_unfound": stats.num_objects_unfound,
2871 "num_objects_degraded": stats.num_objects_degraded,
2872 "num_rd": stats.num_rd,
2873 "num_rd_kb": stats.num_rd_kb,
2874 "num_wr": stats.num_wr,
2875 "num_wr_kb": stats.num_wr_kb}
2876
2877 @requires(('key', str_type))
2878 def remove_object(self, key):
2879 """
2880 Delete an object
2881
2882 This does not delete any snapshots of the object.
2883
2884 :param key: the name of the object to delete
2885 :type key: str
2886
2887 :raises: :class:`TypeError`
2888 :raises: :class:`Error`
2889 :returns: bool - True on success
2890 """
2891 self.require_ioctx_open()
2892 key = cstr(key, 'key')
2893 cdef:
2894 char *_key = key
2895
2896 with nogil:
2897 ret = rados_remove(self.io, _key)
2898 if ret < 0:
2899 raise make_ex(ret, "Failed to remove '%s'" % key)
2900 return True
2901
2902 @requires(('key', str_type))
2903 def trunc(self, key, size):
2904 """
2905 Resize an object
2906
2907 If this enlarges the object, the new area is logically filled with
2908 zeroes. If this shrinks the object, the excess data is removed.
2909
2910 :param key: the name of the object to resize
2911 :type key: str
2912 :param size: the new size of the object in bytes
2913 :type size: int
2914
2915 :raises: :class:`TypeError`
2916 :raises: :class:`Error`
2917 :returns: int - 0 on success, otherwise raises error
2918 """
2919
2920 self.require_ioctx_open()
2921 key = cstr(key, 'key')
2922 cdef:
2923 char *_key = key
2924 uint64_t _size = size
2925
2926 with nogil:
2927 ret = rados_trunc(self.io, _key, _size)
2928 if ret < 0:
2929 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2930 return ret
2931
2932 @requires(('key', str_type))
2933 def stat(self, key):
2934 """
2935 Get object stats (size/mtime)
2936
2937 :param key: the name of the object to get stats from
2938 :type key: str
2939
2940 :raises: :class:`TypeError`
2941 :raises: :class:`Error`
2942 :returns: (size,timestamp)
2943 """
2944 self.require_ioctx_open()
2945
2946 key = cstr(key, 'key')
2947 cdef:
2948 char *_key = key
2949 uint64_t psize
2950 time_t pmtime
2951
2952 with nogil:
2953 ret = rados_stat(self.io, _key, &psize, &pmtime)
2954 if ret < 0:
2955 raise make_ex(ret, "Failed to stat %r" % key)
2956 return psize, time.localtime(pmtime)
2957
2958 @requires(('key', str_type), ('xattr_name', str_type))
2959 def get_xattr(self, key, xattr_name):
2960 """
2961 Get the value of an extended attribute on an object.
2962
2963 :param key: the name of the object to get xattr from
2964 :type key: str
2965 :param xattr_name: which extended attribute to read
2966 :type xattr_name: str
2967
2968 :raises: :class:`TypeError`
2969 :raises: :class:`Error`
2970 :returns: str - value of the xattr
2971 """
2972 self.require_ioctx_open()
2973
2974 key = cstr(key, 'key')
2975 xattr_name = cstr(xattr_name, 'xattr_name')
2976 cdef:
2977 char *_key = key
2978 char *_xattr_name = xattr_name
2979 size_t ret_length = 4096
2980 char *ret_buf = NULL
2981
2982 try:
2983 while ret_length < 4096 * 1024 * 1024:
2984 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
2985 with nogil:
2986 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
2987 if ret == -errno.ERANGE:
2988 ret_length *= 2
2989 elif ret < 0:
2990 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
2991 else:
2992 break
2993 return ret_buf[:ret]
2994 finally:
2995 free(ret_buf)
2996
2997 @requires(('oid', str_type))
2998 def get_xattrs(self, oid):
2999 """
3000 Start iterating over xattrs on an object.
3001
3002 :param oid: the name of the object to get xattrs from
3003 :type oid: str
3004
3005 :raises: :class:`TypeError`
3006 :raises: :class:`Error`
3007 :returns: XattrIterator
3008 """
3009 self.require_ioctx_open()
3010 return XattrIterator(self, oid)
3011
3012 @requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
3013 def set_xattr(self, key, xattr_name, xattr_value):
3014 """
3015 Set an extended attribute on an object.
3016
3017 :param key: the name of the object to set xattr to
3018 :type key: str
3019 :param xattr_name: which extended attribute to set
3020 :type xattr_name: str
3021 :param xattr_value: the value of the extended attribute
3022 :type xattr_value: bytes
3023
3024 :raises: :class:`TypeError`
3025 :raises: :class:`Error`
3026 :returns: bool - True on success, otherwise raise an error
3027 """
3028 self.require_ioctx_open()
3029
3030 key = cstr(key, 'key')
3031 xattr_name = cstr(xattr_name, 'xattr_name')
3032 cdef:
3033 char *_key = key
3034 char *_xattr_name = xattr_name
3035 char *_xattr_value = xattr_value
3036 size_t _xattr_value_len = len(xattr_value)
3037
3038 with nogil:
3039 ret = rados_setxattr(self.io, _key, _xattr_name,
3040 _xattr_value, _xattr_value_len)
3041 if ret < 0:
3042 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3043 return True
3044
3045 @requires(('key', str_type), ('xattr_name', str_type))
3046 def rm_xattr(self, key, xattr_name):
3047 """
3048 Removes an extended attribute on from an object.
3049
3050 :param key: the name of the object to remove xattr from
3051 :type key: str
3052 :param xattr_name: which extended attribute to remove
3053 :type xattr_name: str
3054
3055 :raises: :class:`TypeError`
3056 :raises: :class:`Error`
3057 :returns: bool - True on success, otherwise raise an error
3058 """
3059 self.require_ioctx_open()
3060
3061 key = cstr(key, 'key')
3062 xattr_name = cstr(xattr_name, 'xattr_name')
3063 cdef:
3064 char *_key = key
3065 char *_xattr_name = xattr_name
3066
3067 with nogil:
3068 ret = rados_rmxattr(self.io, _key, _xattr_name)
3069 if ret < 0:
3070 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3071 (key, xattr_name))
3072 return True
3073
3074 @requires(('obj', str_type), ('msg', str_type), ('timeout_ms', int))
3075 def notify(self, obj, msg='', timeout_ms=5000):
3076 """
3077 Send a rados notification to an object.
3078
3079 :param obj: the name of the object to notify
3080 :type obj: str
3081 :param msg: optional message to send in the notification
3082 :type msg: str
3083 :param timeout_ms: notify timeout (in ms)
3084 :type timeout_ms: int
3085
3086 :raises: :class:`TypeError`
3087 :raises: :class:`Error`
3088 :returns: bool - True on success, otherwise raise an error
3089 """
3090 self.require_ioctx_open()
3091
3092 msglen = len(msg)
3093 obj = cstr(obj, 'obj')
3094 msg = cstr(msg, 'msg')
3095 cdef:
3096 char *_obj = obj
3097 char *_msg = msg
3098 int _msglen = msglen
3099 uint64_t _timeout_ms = timeout_ms
3100
3101 with nogil:
3102 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3103 NULL, NULL)
3104 if ret < 0:
3105 raise make_ex(ret, "Failed to notify %r" % (obj))
3106 return True
3107
3108 def list_objects(self):
3109 """
3110 Get ObjectIterator on rados.Ioctx object.
3111
3112 :returns: ObjectIterator
3113 """
3114 self.require_ioctx_open()
3115 return ObjectIterator(self)
3116
3117 def list_snaps(self):
3118 """
3119 Get SnapIterator on rados.Ioctx object.
3120
3121 :returns: SnapIterator
3122 """
3123 self.require_ioctx_open()
3124 return SnapIterator(self)
3125
3126 @requires(('snap_name', str_type))
3127 def create_snap(self, snap_name):
3128 """
3129 Create a pool-wide snapshot
3130
3131 :param snap_name: the name of the snapshot
3132 :type snap_name: str
3133
3134 :raises: :class:`TypeError`
3135 :raises: :class:`Error`
3136 """
3137 self.require_ioctx_open()
3138 snap_name = cstr(snap_name, 'snap_name')
3139 cdef char *_snap_name = snap_name
3140
3141 with nogil:
3142 ret = rados_ioctx_snap_create(self.io, _snap_name)
3143 if ret != 0:
3144 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3145
3146 @requires(('snap_name', str_type))
3147 def remove_snap(self, snap_name):
3148 """
3149 Removes a pool-wide snapshot
3150
3151 :param snap_name: the name of the snapshot
3152 :type snap_name: str
3153
3154 :raises: :class:`TypeError`
3155 :raises: :class:`Error`
3156 """
3157 self.require_ioctx_open()
3158 snap_name = cstr(snap_name, 'snap_name')
3159 cdef char *_snap_name = snap_name
3160
3161 with nogil:
3162 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3163 if ret != 0:
3164 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3165
3166 @requires(('snap_name', str_type))
3167 def lookup_snap(self, snap_name):
3168 """
3169 Get the id of a pool snapshot
3170
3171 :param snap_name: the name of the snapshot to lookop
3172 :type snap_name: str
3173
3174 :raises: :class:`TypeError`
3175 :raises: :class:`Error`
3176 :returns: Snap - on success
3177 """
3178 self.require_ioctx_open()
3179 csnap_name = cstr(snap_name, 'snap_name')
3180 cdef:
3181 char *_snap_name = csnap_name
3182 rados_snap_t snap_id
3183
3184 with nogil:
3185 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3186 if ret != 0:
3187 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3188 return Snap(self, snap_name, int(snap_id))
3189
3190 @requires(('oid', str_type), ('snap_name', str_type))
3191 def snap_rollback(self, oid, snap_name):
3192 """
3193 Rollback an object to a snapshot
3194
3195 :param oid: the name of the object
3196 :type oid: str
3197 :param snap_name: the name of the snapshot
3198 :type snap_name: str
3199
3200 :raises: :class:`TypeError`
3201 :raises: :class:`Error`
3202 """
3203 self.require_ioctx_open()
3204 oid = cstr(oid, 'oid')
3205 snap_name = cstr(snap_name, 'snap_name')
3206 cdef:
3207 char *_snap_name = snap_name
3208 char *_oid = oid
3209
3210 with nogil:
3211 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3212 if ret != 0:
3213 raise make_ex(ret, "Failed to rollback %s" % oid)
3214
3215 def create_self_managed_snap(self):
3216 """
3217 Creates a self-managed snapshot
3218
3219 :returns: snap id on success
3220
3221 :raises: :class:`Error`
3222 """
3223 self.require_ioctx_open()
3224 cdef:
3225 rados_snap_t _snap_id
3226 with nogil:
3227 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3228 if ret != 0:
3229 raise make_ex(ret, "Failed to create self-managed snapshot")
3230 return int(_snap_id)
3231
3232 @requires(('snap_id', int))
3233 def remove_self_managed_snap(self, snap_id):
3234 """
3235 Removes a self-managed snapshot
3236
3237 :param snap_id: the name of the snapshot
3238 :type snap_id: int
3239
3240 :raises: :class:`TypeError`
3241 :raises: :class:`Error`
3242 """
3243 self.require_ioctx_open()
3244 cdef:
3245 rados_snap_t _snap_id = snap_id
3246 with nogil:
3247 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3248 if ret != 0:
3249 raise make_ex(ret, "Failed to remove self-managed snapshot")
3250
3251 def set_self_managed_snap_write(self, snaps):
3252 """
3253 Updates the write context to the specified self-managed
3254 snapshot ids.
3255
3256 :param snaps: all associated self-managed snapshot ids
3257 :type snaps: list
3258
3259 :raises: :class:`TypeError`
3260 :raises: :class:`Error`
3261 """
3262 self.require_ioctx_open()
3263 sorted_snaps = []
3264 snap_seq = 0
3265 if snaps:
3266 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3267 snap_seq = sorted_snaps[0]
3268
3269 cdef:
3270 rados_snap_t _snap_seq = snap_seq
3271 rados_snap_t *_snaps = NULL
3272 int _num_snaps = len(sorted_snaps)
3273 try:
3274 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3275 for i in range(len(sorted_snaps)):
3276 _snaps[i] = sorted_snaps[i]
3277 with nogil:
3278 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3279 _snap_seq,
3280 _snaps,
3281 _num_snaps)
3282 if ret != 0:
3283 raise make_ex(ret, "Failed to update snapshot write context")
3284 finally:
3285 free(_snaps)
3286
3287 @requires(('oid', str_type), ('snap_id', int))
3288 def rollback_self_managed_snap(self, oid, snap_id):
3289 """
3290 Rolls an specific object back to a self-managed snapshot revision
3291
3292 :param oid: the name of the object
3293 :type oid: str
3294 :param snap_id: the name of the snapshot
3295 :type snap_id: int
3296
3297 :raises: :class:`TypeError`
3298 :raises: :class:`Error`
3299 """
3300 self.require_ioctx_open()
3301 oid = cstr(oid, 'oid')
3302 cdef:
3303 char *_oid = oid
3304 rados_snap_t _snap_id = snap_id
3305 with nogil:
3306 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3307 if ret != 0:
3308 raise make_ex(ret, "Failed to rollback %s" % oid)
3309
3310 def get_last_version(self):
3311 """
3312 Return the version of the last object read or written to.
3313
3314 This exposes the internal version number of the last object read or
3315 written via this io context
3316
3317 :returns: version of the last object used
3318 """
3319 self.require_ioctx_open()
3320 with nogil:
3321 ret = rados_get_last_version(self.io)
3322 return int(ret)
3323
3324 def create_write_op(self):
3325 """
3326 create write operation object.
3327 need call release_write_op after use
3328 """
3329 return WriteOp().create()
3330
3331 def create_read_op(self):
3332 """
3333 create read operation object.
3334 need call release_read_op after use
3335 """
3336 return ReadOp().create()
3337
3338 def release_write_op(self, write_op):
3339 """
3340 release memory alloc by create_write_op
3341 """
3342 write_op.release()
3343
3344 def release_read_op(self, read_op):
3345 """
3346 release memory alloc by create_read_op
3347 :para read_op: read_op object
3348 :type: int
3349 """
3350 read_op.release()
3351
3352 @requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
3353 def set_omap(self, write_op, keys, values):
3354 """
3355 set keys values to write_op
3356 :para write_op: write_operation object
3357 :type write_op: WriteOp
3358 :para keys: a tuple of keys
3359 :type keys: tuple
3360 :para values: a tuple of values
3361 :type values: tuple
3362 """
3363
3364 if len(keys) != len(values):
3365 raise Error("Rados(): keys and values must have the same number of items")
3366
3367 keys = cstr_list(keys, 'keys')
3368 values = cstr_list(values, 'values')
3369 lens = [len(v) for v in values]
3370 cdef:
3371 WriteOp _write_op = write_op
3372 size_t key_num = len(keys)
3373 char **_keys = to_bytes_array(keys)
3374 char **_values = to_bytes_array(values)
3375 size_t *_lens = to_csize_t_array(lens)
3376
3377 try:
3378 with nogil:
3379 rados_write_op_omap_set(_write_op.write_op,
3380 <const char**>_keys,
3381 <const char**>_values,
3382 <const size_t*>_lens, key_num)
3383 finally:
3384 free(_keys)
3385 free(_values)
3386 free(_lens)
3387
3388 @requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
3389 def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3390 """
3391 execute the real write operation
3392 :para write_op: write operation object
3393 :type write_op: WriteOp
3394 :para oid: object name
3395 :type oid: str
3396 :para mtime: the time to set the mtime to, 0 for the current time
3397 :type mtime: int
3398 :para flags: flags to apply to the entire operation
3399 :type flags: int
3400 """
3401
3402 oid = cstr(oid, 'oid')
3403 cdef:
3404 WriteOp _write_op = write_op
3405 char *_oid = oid
3406 time_t _mtime = mtime
3407 int _flags = flags
3408
3409 with nogil:
3410 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3411 if ret != 0:
3412 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3413
3414 @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
3415 def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
3416 """
3417 execute the real write operation asynchronously
3418 :para write_op: write operation object
3419 :type write_op: WriteOp
3420 :para oid: object name
3421 :type oid: str
3422 :param oncomplete: what to do when the remove is safe and complete in memory
3423 on all replicas
3424 :type oncomplete: completion
3425 :param onsafe: what to do when the remove is safe and complete on storage
3426 on all replicas
3427 :type onsafe: completion
3428 :para mtime: the time to set the mtime to, 0 for the current time
3429 :type mtime: int
3430 :para flags: flags to apply to the entire operation
3431 :type flags: int
3432
3433 :raises: :class:`Error`
3434 :returns: completion object
3435 """
3436
3437 oid = cstr(oid, 'oid')
3438 cdef:
3439 WriteOp _write_op = write_op
3440 char *_oid = oid
3441 Completion completion
3442 time_t _mtime = mtime
3443 int _flags = flags
3444
3445 completion = self.__get_completion(oncomplete, onsafe)
3446 self.__track_completion(completion)
3447
3448 with nogil:
3449 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3450 &_mtime, _flags)
3451 if ret != 0:
3452 completion._cleanup()
3453 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3454 return completion
3455
3456 @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
3457 def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
3458 """
3459 execute the real read operation
3460 :para read_op: read operation object
3461 :type read_op: ReadOp
3462 :para oid: object name
3463 :type oid: str
3464 :para flag: flags to apply to the entire operation
3465 :type flag: int
3466 """
3467 oid = cstr(oid, 'oid')
3468 cdef:
3469 ReadOp _read_op = read_op
3470 char *_oid = oid
3471 int _flag = flag
3472
3473 with nogil:
3474 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3475 if ret != 0:
3476 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3477
3478 @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
3479 def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
3480 """
3481 execute the real read operation
3482 :para read_op: read operation object
3483 :type read_op: ReadOp
3484 :para oid: object name
3485 :type oid: str
3486 :param oncomplete: what to do when the remove is safe and complete in memory
3487 on all replicas
3488 :type oncomplete: completion
3489 :param onsafe: what to do when the remove is safe and complete on storage
3490 on all replicas
3491 :type onsafe: completion
3492 :para flag: flags to apply to the entire operation
3493 :type flag: int
3494 """
3495 oid = cstr(oid, 'oid')
3496 cdef:
3497 ReadOp _read_op = read_op
3498 char *_oid = oid
3499 Completion completion
3500 int _flag = flag
3501
3502 completion = self.__get_completion(oncomplete, onsafe)
3503 self.__track_completion(completion)
3504
3505 with nogil:
3506 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3507 if ret != 0:
3508 completion._cleanup()
3509 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3510 return completion
3511
3512 @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
3513 def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
3514 """
3515 get the omap values
3516 :para read_op: read operation object
3517 :type read_op: ReadOp
3518 :para start_after: list keys starting after start_after
3519 :type start_after: str
3520 :para filter_prefix: list only keys beginning with filter_prefix
3521 :type filter_prefix: str
3522 :para max_return: list no more than max_return key/value pairs
3523 :type max_return: int
3524 :returns: an iterator over the requested omap values, return value from this action
3525 """
3526
3527 start_after = cstr(start_after, 'start_after') if start_after else None
3528 filter_prefix = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3529 cdef:
3530 char *_start_after = opt_str(start_after)
3531 char *_filter_prefix = opt_str(filter_prefix)
3532 ReadOp _read_op = read_op
3533 rados_omap_iter_t iter_addr = NULL
3534 int _max_return = max_return
3535
3536 with nogil:
3537 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3538 _max_return, &iter_addr, NULL, NULL)
3539 it = OmapIterator(self)
3540 it.ctx = iter_addr
3541 return it, 0 # 0 is meaningless; there for backward-compat
3542
3543 @requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
3544 def get_omap_keys(self, read_op, start_after, max_return):
3545 """
3546 get the omap keys
3547 :para read_op: read operation object
3548 :type read_op: ReadOp
3549 :para start_after: list keys starting after start_after
3550 :type start_after: str
3551 :para max_return: list no more than max_return key/value pairs
3552 :type max_return: int
3553 :returns: an iterator over the requested omap values, return value from this action
3554 """
3555 start_after = cstr(start_after, 'start_after') if start_after else None
3556 cdef:
3557 char *_start_after = opt_str(start_after)
3558 ReadOp _read_op = read_op
3559 rados_omap_iter_t iter_addr = NULL
3560 int _max_return = max_return
3561
3562 with nogil:
3563 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3564 _max_return, &iter_addr, NULL, NULL)
3565 it = OmapIterator(self)
3566 it.ctx = iter_addr
3567 return it, 0 # 0 is meaningless; there for backward-compat
3568
3569 @requires(('read_op', ReadOp), ('keys', tuple))
3570 def get_omap_vals_by_keys(self, read_op, keys):
3571 """
3572 get the omap values by keys
3573 :para read_op: read operation object
3574 :type read_op: ReadOp
3575 :para keys: input key tuple
3576 :type keys: tuple
3577 :returns: an iterator over the requested omap values, return value from this action
3578 """
3579 keys = cstr_list(keys, 'keys')
3580 cdef:
3581 ReadOp _read_op = read_op
3582 rados_omap_iter_t iter_addr
3583 char **_keys = to_bytes_array(keys)
3584 size_t key_num = len(keys)
3585
3586 try:
3587 with nogil:
3588 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3589 <const char**>_keys,
3590 key_num, &iter_addr, NULL)
3591 it = OmapIterator(self)
3592 it.ctx = iter_addr
3593 return it, 0 # 0 is meaningless; there for backward-compat
3594 finally:
3595 free(_keys)
3596
3597 @requires(('write_op', WriteOp), ('keys', tuple))
3598 def remove_omap_keys(self, write_op, keys):
3599 """
3600 remove omap keys specifiled
3601 :para write_op: write operation object
3602 :type write_op: WriteOp
3603 :para keys: input key tuple
3604 :type keys: tuple
3605 """
3606
3607 keys = cstr_list(keys, 'keys')
3608 cdef:
3609 WriteOp _write_op = write_op
3610 size_t key_num = len(keys)
3611 char **_keys = to_bytes_array(keys)
3612
3613 try:
3614 with nogil:
3615 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3616 finally:
3617 free(_keys)
3618
3619 @requires(('write_op', WriteOp))
3620 def clear_omap(self, write_op):
3621 """
3622 Remove all key/value pairs from an object
3623 :para write_op: write operation object
3624 :type write_op: WriteOp
3625 """
3626
3627 cdef:
3628 WriteOp _write_op = write_op
3629
3630 with nogil:
3631 rados_write_op_omap_clear(_write_op.write_op)
3632
3633 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
3634 ('duration', opt(int)), ('flags', int))
3635 def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):
3636
3637 """
3638 Take an exclusive lock on an object
3639
3640 :param key: name of the object
3641 :type key: str
3642 :param name: name of the lock
3643 :type name: str
3644 :param cookie: cookie of the lock
3645 :type cookie: str
3646 :param desc: description of the lock
3647 :type desc: str
3648 :param duration: duration of the lock in seconds
3649 :type duration: int
3650 :param flags: flags
3651 :type flags: int
3652
3653 :raises: :class:`TypeError`
3654 :raises: :class:`Error`
3655 """
3656 self.require_ioctx_open()
3657
3658 key = cstr(key, 'key')
3659 name = cstr(name, 'name')
3660 cookie = cstr(cookie, 'cookie')
3661 desc = cstr(desc, 'desc')
3662
3663 cdef:
3664 char* _key = key
3665 char* _name = name
3666 char* _cookie = cookie
3667 char* _desc = desc
3668 uint8_t _flags = flags
3669 timeval _duration
3670
3671 if duration is None:
3672 with nogil:
3673 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3674 NULL, _flags)
3675 else:
3676 _duration.tv_sec = duration
3677 with nogil:
3678 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3679 &_duration, _flags)
3680
3681 if ret < 0:
3682 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3683
3684 @requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
3685 ('desc', str_type), ('duration', opt(int)), ('flags', int))
3686 def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):
3687
3688 """
3689 Take a shared lock on an object
3690
3691 :param key: name of the object
3692 :type key: str
3693 :param name: name of the lock
3694 :type name: str
3695 :param cookie: cookie of the lock
3696 :type cookie: str
3697 :param tag: tag of the lock
3698 :type tag: str
3699 :param desc: description of the lock
3700 :type desc: str
3701 :param duration: duration of the lock in seconds
3702 :type duration: int
3703 :param flags: flags
3704 :type flags: int
3705
3706 :raises: :class:`TypeError`
3707 :raises: :class:`Error`
3708 """
3709 self.require_ioctx_open()
3710
3711 key = cstr(key, 'key')
3712 tag = cstr(tag, 'tag')
3713 name = cstr(name, 'name')
3714 cookie = cstr(cookie, 'cookie')
3715 desc = cstr(desc, 'desc')
3716
3717 cdef:
3718 char* _key = key
3719 char* _tag = tag
3720 char* _name = name
3721 char* _cookie = cookie
3722 char* _desc = desc
3723 uint8_t _flags = flags
3724 timeval _duration
3725
3726 if duration is None:
3727 with nogil:
3728 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3729 NULL, _flags)
3730 else:
3731 _duration.tv_sec = duration
3732 with nogil:
3733 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3734 &_duration, _flags)
3735 if ret < 0:
3736 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3737
3738 @requires(('key', str_type), ('name', str_type), ('cookie', str_type))
3739 def unlock(self, key, name, cookie):
3740
3741 """
3742 Release a shared or exclusive lock on an object
3743
3744 :param key: name of the object
3745 :type key: str
3746 :param name: name of the lock
3747 :type name: str
3748 :param cookie: cookie of the lock
3749 :type cookie: str
3750
3751 :raises: :class:`TypeError`
3752 :raises: :class:`Error`
3753 """
3754 self.require_ioctx_open()
3755
3756 key = cstr(key, 'key')
3757 name = cstr(name, 'name')
3758 cookie = cstr(cookie, 'cookie')
3759
3760 cdef:
3761 char* _key = key
3762 char* _name = name
3763 char* _cookie = cookie
3764
3765 with nogil:
3766 ret = rados_unlock(self.io, _key, _name, _cookie)
3767 if ret < 0:
3768 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3769
3770 def set_osdmap_full_try(self):
3771 """
3772 Set global osdmap_full_try label to true
3773 """
3774 with nogil:
3775 rados_set_osdmap_full_try(self.io)
3776
3777 def unset_osdmap_full_try(self):
3778 """
3779 Unset
3780 """
3781 with nogil:
3782 rados_unset_osdmap_full_try(self.io)
3783
3784 def application_enable(self, app_name, force=False):
3785 """
3786 Enable an application on an OSD pool
3787
3788 :param app_name: application name
3789 :type app_name: str
3790 :param force: False if only a single app should exist per pool
3791 :type expire_seconds: boool
3792
3793 :raises: :class:`Error`
3794 """
3795 app_name = cstr(app_name, 'app_name')
3796 cdef:
3797 char *_app_name = app_name
3798 int _force = (1 if force else 0)
3799
3800 with nogil:
3801 ret = rados_application_enable(self.io, _app_name, _force)
3802 if ret < 0:
3803 raise make_ex(ret, "error enabling application")
3804
3805 def application_list(self):
3806 """
3807 Returns a list of enabled applications
3808
3809 :returns: list of app name string
3810 """
3811 cdef:
3812 size_t length = 128
3813 char *apps = NULL
3814
3815 try:
3816 while True:
3817 apps = <char *>realloc_chk(apps, length)
3818 with nogil:
3819 ret = rados_application_list(self.io, apps, &length)
3820 if ret == 0:
3821 return [decode_cstr(app) for app in
3822 apps[:length].split(b'\0') if app]
3823 elif ret == -errno.ENOENT:
3824 return None
3825 elif ret == -errno.ERANGE:
3826 pass
3827 else:
3828 raise make_ex(ret, "error listing applications")
3829 finally:
3830 free(apps)
3831
3832 def application_metadata_set(self, app_name, key, value):
3833 """
3834 Sets application metadata on an OSD pool
3835
3836 :param app_name: application name
3837 :type app_name: str
3838 :param key: metadata key
3839 :type key: str
3840 :param value: metadata value
3841 :type value: str
3842
3843 :raises: :class:`Error`
3844 """
3845 app_name = cstr(app_name, 'app_name')
3846 key = cstr(key, 'key')
3847 value = cstr(value, 'value')
3848 cdef:
3849 char *_app_name = app_name
3850 char *_key = key
3851 char *_value = value
3852
3853 with nogil:
3854 ret = rados_application_metadata_set(self.io, _app_name, _key,
3855 _value)
3856 if ret < 0:
3857 raise make_ex(ret, "error setting application metadata")
3858
3859 def application_metadata_remove(self, app_name, key):
3860 """
3861 Remove application metadata from an OSD pool
3862
3863 :param app_name: application name
3864 :type app_name: str
3865 :param key: metadata key
3866 :type key: str
3867
3868 :raises: :class:`Error`
3869 """
3870 app_name = cstr(app_name, 'app_name')
3871 key = cstr(key, 'key')
3872 cdef:
3873 char *_app_name = app_name
3874 char *_key = key
3875
3876 with nogil:
3877 ret = rados_application_metadata_remove(self.io, _app_name, _key)
3878 if ret < 0:
3879 raise make_ex(ret, "error removing application metadata")
3880
3881 def application_metadata_list(self, app_name):
3882 """
3883 Returns a list of enabled applications
3884
3885 :param app_name: application name
3886 :type app_name: str
3887 :returns: list of key/value tuples
3888 """
3889 app_name = cstr(app_name, 'app_name')
3890 cdef:
3891 char *_app_name = app_name
3892 size_t key_length = 128
3893 size_t val_length = 128
3894 char *c_keys = NULL
3895 char *c_vals = NULL
3896
3897 try:
3898 while True:
3899 c_keys = <char *>realloc_chk(c_keys, key_length)
3900 c_vals = <char *>realloc_chk(c_vals, val_length)
3901 with nogil:
3902 ret = rados_application_metadata_list(self.io, _app_name,
3903 c_keys, &key_length,
3904 c_vals, &val_length)
3905 if ret == 0:
3906 keys = [decode_cstr(key) for key in
3907 c_keys[:key_length].split(b'\0')]
3908 vals = [decode_cstr(val) for val in
3909 c_vals[:val_length].split(b'\0')]
3910 return zip(keys, vals)[:-1]
3911 elif ret == -errno.ERANGE:
3912 pass
3913 else:
3914 raise make_ex(ret, "error listing application metadata")
3915 finally:
3916 free(c_keys)
3917 free(c_vals)
3918
3919 def alignment(self):
3920 """
3921 Returns pool alignment
3922
3923 :returns:
3924 Number of alignment bytes required by the current pool, or None if
3925 alignment is not required.
3926 """
3927 cdef:
3928 int requires = 0
3929 uint64_t _alignment
3930
3931 with nogil:
3932 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
3933 if ret != 0:
3934 raise make_ex(ret, "error checking alignment")
3935
3936 alignment = None
3937 if requires:
3938 with nogil:
3939 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
3940 if ret != 0:
3941 raise make_ex(ret, "error querying alignment")
3942 alignment = _alignment
3943 return alignment
3944
3945
3946 def set_object_locator(func):
3947 def retfunc(self, *args, **kwargs):
3948 if self.locator_key is not None:
3949 old_locator = self.ioctx.get_locator_key()
3950 self.ioctx.set_locator_key(self.locator_key)
3951 retval = func(self, *args, **kwargs)
3952 self.ioctx.set_locator_key(old_locator)
3953 return retval
3954 else:
3955 return func(self, *args, **kwargs)
3956 return retfunc
3957
3958
3959 def set_object_namespace(func):
3960 def retfunc(self, *args, **kwargs):
3961 if self.nspace is None:
3962 raise LogicError("Namespace not set properly in context")
3963 old_nspace = self.ioctx.get_namespace()
3964 self.ioctx.set_namespace(self.nspace)
3965 retval = func(self, *args, **kwargs)
3966 self.ioctx.set_namespace(old_nspace)
3967 return retval
3968 return retfunc
3969
3970
3971 class Object(object):
3972 """Rados object wrapper, makes the object look like a file"""
3973 def __init__(self, ioctx, key, locator_key=None, nspace=None):
3974 self.key = key
3975 self.ioctx = ioctx
3976 self.offset = 0
3977 self.state = "exists"
3978 self.locator_key = locator_key
3979 self.nspace = "" if nspace is None else nspace
3980
3981 def __str__(self):
3982 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
3983 (str(self.ioctx), self.key, "--default--"
3984 if self.nspace is "" else self.nspace, self.locator_key)
3985
3986 def require_object_exists(self):
3987 if self.state != "exists":
3988 raise ObjectStateError("The object is %s" % self.state)
3989
3990 @set_object_locator
3991 @set_object_namespace
3992 def read(self, length=1024 * 1024):
3993 self.require_object_exists()
3994 ret = self.ioctx.read(self.key, length, self.offset)
3995 self.offset += len(ret)
3996 return ret
3997
3998 @set_object_locator
3999 @set_object_namespace
4000 def write(self, string_to_write):
4001 self.require_object_exists()
4002 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4003 if ret == 0:
4004 self.offset += len(string_to_write)
4005 return ret
4006
4007 @set_object_locator
4008 @set_object_namespace
4009 def remove(self):
4010 self.require_object_exists()
4011 self.ioctx.remove_object(self.key)
4012 self.state = "removed"
4013
4014 @set_object_locator
4015 @set_object_namespace
4016 def stat(self):
4017 self.require_object_exists()
4018 return self.ioctx.stat(self.key)
4019
4020 def seek(self, position):
4021 self.require_object_exists()
4022 self.offset = position
4023
4024 @set_object_locator
4025 @set_object_namespace
4026 def get_xattr(self, xattr_name):
4027 self.require_object_exists()
4028 return self.ioctx.get_xattr(self.key, xattr_name)
4029
4030 @set_object_locator
4031 @set_object_namespace
4032 def get_xattrs(self):
4033 self.require_object_exists()
4034 return self.ioctx.get_xattrs(self.key)
4035
4036 @set_object_locator
4037 @set_object_namespace
4038 def set_xattr(self, xattr_name, xattr_value):
4039 self.require_object_exists()
4040 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4041
4042 @set_object_locator
4043 @set_object_namespace
4044 def rm_xattr(self, xattr_name):
4045 self.require_object_exists()
4046 return self.ioctx.rm_xattr(self.key, xattr_name)
4047
4048 MONITOR_LEVELS = [
4049 "debug",
4050 "info",
4051 "warn", "warning",
4052 "err", "error",
4053 "sec",
4054 ]
4055
4056
4057 class MonitorLog(object):
4058 # NOTE(sileht): Keep this class for backward compat
4059 # method moved to Rados.monitor_log()
4060 """
4061 For watching cluster log messages. Instantiate an object and keep
4062 it around while callback is periodically called. Construct with
4063 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4064 arg will be passed to the callback.
4065
4066 callback will be called with:
4067 arg (given to __init__)
4068 line (the full line, including timestamp, who, level, msg)
4069 who (which entity issued the log message)
4070 timestamp_sec (sec of a struct timespec)
4071 timestamp_nsec (sec of a struct timespec)
4072 seq (sequence number)
4073 level (string representing the level of the log message)
4074 msg (the message itself)
4075 callback's return value is ignored
4076 """
4077 def __init__(self, cluster, level, callback, arg):
4078 self.level = level
4079 self.callback = callback
4080 self.arg = arg
4081 self.cluster = cluster
4082 self.cluster.monitor_log(level, callback, arg)
4083