]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/rados/rados.pyx
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / pybind / rados / rados.pyx
CommitLineData
7c673cae
FG
1# cython: embedsignature=True
2"""
3This module is a thin wrapper around librados.
4
5Error codes from librados are turned into exceptions that subclass
6:class:`Error`. Almost all methods may raise :class:`Error`
7(the base class of all rados exceptions), :class:`PermissionError`
8and :class:`IOError`, in addition to those documented for the
9method.
10"""
11# Copyright 2011 Josh Durgin
12# Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13# Copyright 2015 Hector Martin <marcan@marcan.st>
14# Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16from cpython cimport PyObject, ref
17from cpython.pycapsule cimport *
18from libc cimport errno
19from libc.stdint cimport *
20from libc.stdlib cimport malloc, realloc, free
21
22import sys
23import threading
24import time
25
# The abstract base classes moved to `collections.abc` in Python 3.3 and the
# old aliases in `collections` were removed in Python 3.10; keep the legacy
# import only as a fallback for Python 2.
try:
    from collections.abc import Callable
except ImportError:
    from collections import Callable
27from datetime import datetime
28from functools import partial, wraps
29from itertools import chain
30
31# Are we running Python 2.x? Pick the string base type used by the
# @requires() argument checks below: `basestring` there, `str` otherwise.
32if sys.version_info[0] < 3:
33 str_type = basestring
34else:
35 str_type = str
36
37
38cdef extern from "Python.h":
39 # These are in cpython/string.pxd, but use "object" types instead of
40 # PyObject*, which invokes assumptions in cpython that we need to
41 # legitimately break to implement zero-copy string buffers in Ioctx.read().
42 # This is valid use of the Python API and documented as a special case.
43 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
44 char* PyBytes_AsString(PyObject *string) except NULL
45 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
46 void PyEval_InitThreads()
47
48
49cdef extern from "time.h":
50 ctypedef long int time_t
51 ctypedef long int suseconds_t
52
53
54cdef extern from "sys/time.h":
55 cdef struct timeval:
56 time_t tv_sec
57 suseconds_t tv_usec
58
59
60cdef extern from "rados/rados_types.h" nogil:
61 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
62
63
64cdef extern from "rados/librados.h" nogil:
65 enum:
66 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
67 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
68 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
69 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
70 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
71 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
72 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
73
74
75 enum:
76 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
77 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
78 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
79 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
80 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
81 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
82 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
83 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
84 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
85
86 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
87
88 ctypedef void* rados_t
89 ctypedef void* rados_config_t
90 ctypedef void* rados_ioctx_t
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
224ce89b 101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
31f18b77 102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
7c673cae
FG
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
133 int rados_conf_read_file(rados_t cluster, const char *path)
134 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
135 int rados_conf_parse_env(rados_t cluster, const char *var)
136 int rados_conf_set(rados_t cluster, char *option, const char *value)
137 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
138
139 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
140 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
141 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
142 int rados_pool_create(rados_t cluster, const char *pool_name)
143 int rados_pool_create_with_auid(rados_t cluster, const char *pool_name, uint64_t auid)
144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
145 int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t auid, uint8_t crush_rule_num)
146 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
147 int rados_pool_list(rados_t cluster, char *buf, size_t len)
148 int rados_pool_delete(rados_t cluster, const char *pool_name)
149 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
150
151 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
152 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
153 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
c07f9fc5
FG
154 int rados_application_enable(rados_ioctx_t io, const char *app_name,
155 int force)
156 int rados_application_list(rados_ioctx_t io, char *values,
157 size_t *values_len)
158 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
159 const char *key, char *value,
160 size_t *value_len)
161 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
162 const char *key, const char *value)
163 int rados_application_metadata_remove(rados_ioctx_t io,
164 const char *app_name, const char *key)
165 int rados_application_metadata_list(rados_ioctx_t io,
166 const char *app_name, char *keys,
167 size_t *key_len, char *values,
168 size_t *value_len)
7c673cae
FG
169 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
170 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
171 const char *inbuf, size_t inbuflen,
172 char **outbuf, size_t *outbuflen,
173 char **outs, size_t *outslen)
174 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
175 const char *inbuf, size_t inbuflen,
176 char **outbuf, size_t *outbuflen,
177 char **outs, size_t *outslen)
178 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
179 const char *inbuf, size_t inbuflen,
180 char **outbuf, size_t *outbuflen,
181 char **outs, size_t *outslen)
182 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
183 const char *inbuf, size_t inbuflen,
184 char **outbuf, size_t *outbuflen,
185 char **outs, size_t *outslen)
186 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
187 const char *inbuf, size_t inbuflen,
188 char **outbuf, size_t *outbuflen,
189 char **outs, size_t *outslen)
190 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
31f18b77 191 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
7c673cae
FG
192
193 int rados_wait_for_latest_osdmap(rados_t cluster)
194
195 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
196 void rados_ioctx_destroy(rados_ioctx_t io)
197 int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
198 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
199 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
200
201 uint64_t rados_get_last_version(rados_ioctx_t io)
202 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
203 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
204 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
205 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
206 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
207 int rados_remove(rados_ioctx_t io, const char *oid)
208 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
209 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
210 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
211 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
212 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
213 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
214 void rados_getxattrs_end(rados_xattrs_iter_t iter)
215
216 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
217 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
218 void rados_nobjects_list_close(rados_list_ctx_t ctx)
219
220 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
221 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
222 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
223 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
224 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
225 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
226 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
227 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
228
28e407b8
AA
229 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
230 rados_snap_t *snapid)
231 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
232 rados_snap_t snapid)
233 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
234 rados_snap_t snap_seq,
235 rados_snap_t *snap,
236 int num_snaps)
237 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
238 rados_snap_t snapid)
239
7c673cae
FG
240 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
241 const char * cookie, const char * desc,
242 timeval * duration, uint8_t flags)
243 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
244 const char * cookie, const char * tag, const char * desc,
245 timeval * duration, uint8_t flags)
246 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
247
248 rados_write_op_t rados_create_write_op()
249 void rados_release_write_op(rados_write_op_t write_op)
250
251 rados_read_op_t rados_create_read_op()
252 void rados_release_read_op(rados_read_op_t read_op)
253
254 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
255 void rados_aio_release(rados_completion_t c)
256 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
257 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
258 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
259 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
260 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
261 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
262 int rados_aio_flush(rados_ioctx_t io)
263
264 int rados_aio_get_return_value(rados_completion_t c)
265 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
266 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
267 int rados_aio_wait_for_complete(rados_completion_t c)
268 int rados_aio_wait_for_safe(rados_completion_t c)
269 int rados_aio_is_complete(rados_completion_t c)
270 int rados_aio_is_safe(rados_completion_t c)
271
272 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
273 const char * in_buf, size_t in_len, char * buf, size_t out_len)
274 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
275 const char * in_buf, size_t in_len, char * buf, size_t out_len)
276
277 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
278 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
279 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
280 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
281 void rados_write_op_omap_clear(rados_write_op_t write_op)
282 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
283
284 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
285 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
286 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
91327a77 287 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
7c673cae
FG
288 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
289 void rados_write_op_remove(rados_write_op_t write_op)
290 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
291 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
292
d2e6a577
FG
293 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
294 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
7c673cae
FG
295 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
296 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
297 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
298 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
299 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
300 void rados_omap_get_end(rados_omap_iter_t iter)
301
302
# Python-level copies of the C constants declared in the `cdef extern`
# blocks above (the underscore-prefixed names), so callers can import them
# from this module.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
303LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
304LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
305LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
306LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
307LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
308LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
309LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
310
311LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
312
# Flags for whole operations (LIBRADOS_OPERATION_*).
313LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
314LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
315LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
316LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
317LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
318LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
319LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
320
# The C constant is a `char*`; expose it as a decoded text string.
321LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
322
323LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
324LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
325
# Well-known auid values, defined here as plain Python integers.
326ANONYMOUS_AUID = 0xffffffffffffffff
327ADMIN_AUID = 0
328
329
class Error(Exception):
    """
    Base class of all rados exceptions.

    :param message: human readable description of the failure
    :param errno: optional positive errno value associated with the failure;
                  when set it is prefixed to the string form of the exception
    """
    def __init__(self, message, errno=None):
        super(Error, self).__init__(message)
        # BaseException.message does not exist on Python 3; keep our own copy
        # so that __reduce__ (pickling / copying) works on every version.
        self.message = message
        self.errno = errno

    def __str__(self):
        msg = super(Error, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # Rebuild with both constructor arguments so errno survives pickling.
        return (self.__class__, (self.message, self.errno))
7c673cae 344
1adf2230
AA
345class InvalidArgumentError(Error):
 """ `InvalidArgumentError` class, derived from `Error`; make_ex() returns it for EINVAL """
346 pass
347
348class OSError(Error):
349 """ `OSError` class, derived from `Error` """
350 pass
351
7c673cae
FG
352class InterruptedOrTimeoutError(OSError):
353 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
354 pass
355
356
357class PermissionError(OSError):
358 """ `PermissionError` class, derived from `OSError` """
359 pass
360
361
362class PermissionDeniedError(OSError):
363 """ `PermissionDeniedError` class, derived from `OSError`; make_ex() returns it for EACCES """
364 pass
365
366
367class ObjectNotFound(OSError):
368 """ `ObjectNotFound` class, derived from `OSError` """
369 pass
370
371
372class NoData(OSError):
373 """ `NoData` class, derived from `OSError` """
374 pass
375
376
377class ObjectExists(OSError):
378 """ `ObjectExists` class, derived from `OSError` """
379 pass
380
381
382class ObjectBusy(OSError):
383 """ `ObjectBusy` class, derived from `OSError` """
384 pass
385
386
387class IOError(OSError):
388 """ `IOError` class, derived from `OSError` """
389 pass
390
391
392class NoSpace(OSError):
393 """ `NoSpace` class, derived from `OSError` """
394 pass
395
396
397class RadosStateError(Error):
398 """ `RadosStateError` class, derived from `Error` """
399 pass
400
401
402class IoctxStateError(Error):
403 """ `IoctxStateError` class, derived from `Error` """
404 pass
405
406
407class ObjectStateError(Error):
408 """ `ObjectStateError` class, derived from `Error` """
409 pass
410
411
412class LogicError(Error):
413 """ `LogicError` class, derived from `Error` """
414 pass
415
416
417class TimedOut(OSError):
418 """ `TimedOut` class, derived from `OSError` """
419 pass
420
421
# Lookup table used by make_ex(): errno value -> exception class.
# Entries shared by every platform come first; the "no data" errno is the
# only one that differs between FreeBSD and the other systems.
cdef errno_to_exception = {
    errno.EPERM     : PermissionError,
    errno.ENOENT    : ObjectNotFound,
    errno.EIO       : IOError,
    errno.ENOSPC    : NoSpace,
    errno.EEXIST    : ObjectExists,
    errno.EBUSY     : ObjectBusy,
    errno.EINTR     : InterruptedOrTimeoutError,
    errno.ETIMEDOUT : TimedOut,
    errno.EACCES    : PermissionDeniedError,
    errno.EINVAL    : InvalidArgumentError,
}
IF UNAME_SYSNAME == "FreeBSD":
    errno_to_exception[errno.ENOATTR] = NoData
ELSE:
    errno_to_exception[errno.ENODATA] = NoData
450
451
cdef make_ex(ret, msg):
    """
    Build the exception matching a librados return code.

    The sign of the code is ignored. Codes without a dedicated entry in
    ``errno_to_exception`` fall back to :class:`OSError`.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: an instance of a subclass of :class:`Error`, carrying the
              absolute value of ``ret`` as its ``errno``
    """
    code = abs(ret)
    exc_class = errno_to_exception.get(code, OSError)
    return exc_class(msg, errno=code)
7c673cae
FG
467
468
469# helper to specify an optional argument, where in addition to `cls`, `None`
470# is also acceptable
def opt(cls):
    """Return the type spec ``(cls, None)``: `cls` or ``None`` is accepted."""
    optional_spec = cls, None
    return optional_spec
473
474
475# validate argument types of an instance method
476# kwargs is an un-ordered dict, so use args instead
def requires(*types):
    """
    Decorator factory validating the argument types of an instance method.

    :param types: one ``(arg_name, type_spec)`` pair per checked argument,
                  in the positional order of the decorated method (``self``
                  excluded). A ``type_spec`` is a class, ``None`` (meaning
                  the value must be ``None``) or a tuple of those.
    :returns: a decorator; the decorated method raises :class:`TypeError`
              when a checked argument does not match its spec
    """
    def type_name(t):
        return 'None' if t is None else t.__name__

    def is_type_of(v, t):
        if t is None:
            return v is None
        return isinstance(v, t)

    def check_type(val, arg_name, arg_type):
        # Normalise a single spec to a 1-tuple so both forms share one path.
        accepted = arg_type if isinstance(arg_type, tuple) else (arg_type,)
        if any(is_type_of(val, t) for t in accepted):
            return
        type_names = ' or '.join(type_name(t) for t in accepted)
        raise TypeError('%s must be %s' % (arg_name, type_names))

    def wrapper(f):
        def validate_func(*args, **kwargs):
            # ignore the `self` arg
            pos_args = zip(args[1:], types)
            named_args = ((kwargs[name], (name, spec)) for name, spec in types
                          if name in kwargs)
            for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
                check_type(arg_val, arg_name, arg_type)
            return f(*args, **kwargs)

        # functools.wraps() cannot be used here: on Python 2 it fails with
        # "'method_descriptor' object has no attribute '__module__'".
        # Copy the attributes that matter for introspection by hand instead,
        # skipping any that the wrapped object does not provide.
        for attr in ('__name__', '__doc__'):
            try:
                setattr(validate_func, attr, getattr(f, attr))
            except (AttributeError, TypeError):
                # best effort only: validation must work even without them
                pass
        return validate_func
    return wrapper
511
512
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Convert a Python string into a byte string.

    Byte strings are returned unchanged; unicode strings are encoded.

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed (and returned as None)
    :rtype: bytes
    :raises: :class:`TypeError` if `val` is not a string
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if isinstance(val, unicode):
        return val.encode(encoding)
    raise TypeError('%s must be a string' % name)
532
533
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every string of an iterable into a byte string.

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for each element
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element is not a string
    """
    # Forward `encoding`: it used to be accepted but silently ignored.
    return [cstr(s, name, encoding) for s in list_str]
536
537
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string into a Python string; ``None`` passes through.

    :param bytes val: byte string
    :param str encoding: Encoding to use
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
549
550
# Convert an optional Python byte string to a C string: None maps to NULL,
# anything else is coerced to `char*`.
# `except? NULL` is needed because NULL is also a legitimate return value.
# NOTE(review): the returned pointer presumably refers to the buffer of `s`,
# so `s` must stay alive while the pointer is used -- confirm.
551cdef char* opt_str(s) except? NULL:
552 if s is None:
553 return NULL
554 return s
555
556
# realloc() wrapper that raises MemoryError instead of returning NULL.
# On failure `ptr` is left untouched (it is not freed here), so callers
# remain responsible for releasing it.
557cdef void* realloc_chk(void* ptr, size_t size) except NULL:
558 cdef void *ret = realloc(ptr, size)
559 if ret == NULL:
560 raise MemoryError("realloc failed")
561 return ret
562
563
cdef size_t * to_csize_t_array(list_int) except? NULL:
    """
    Copy a Python sequence of integers into a malloc()ed `size_t` array.

    The caller owns the returned buffer and must free() it.

    :param list_int: sequence of non-negative integers
    :returns: pointer to ``len(list_int)`` elements; may be NULL when the
              sequence is empty
    :raises: :class:`MemoryError` if the allocation fails, or the conversion
             error of an element that does not fit into a `size_t`
    """
    # `except? NULL` lets the exceptions below propagate to the caller;
    # without an exception value Cython can only print and swallow them.
    cdef size_t count = len(list_int)
    cdef size_t i
    cdef size_t *ret = <size_t *>malloc(count * sizeof(size_t))
    # malloc(0) may legitimately return NULL, so only a non-empty request
    # that yields NULL is an allocation failure.
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for i in range(count):
            ret[i] = <size_t>list_int[i]
    except:
        # do not leak the buffer when an element cannot be converted
        free(ret)
        raise
    return ret
571
572
cdef char ** to_bytes_array(list_bytes) except? NULL:
    """
    Build a malloc()ed array of `char*` from a sequence of byte strings.

    Only the pointer array is allocated; each entry is the `char*` obtained
    from the corresponding element of `list_bytes`. The caller owns the
    array and must free() it.
    NOTE(review): the entries presumably point into the Python byte strings,
    so `list_bytes` must outlive the array -- confirm.

    :param list_bytes: sequence of byte strings
    :returns: pointer to ``len(list_bytes)`` entries; may be NULL when the
              sequence is empty
    :raises: :class:`MemoryError` if the allocation fails, or the conversion
             error of an element that is not a byte string
    """
    # `except? NULL` lets the exceptions below propagate to the caller;
    # without an exception value Cython can only print and swallow them.
    cdef size_t count = len(list_bytes)
    cdef size_t i
    cdef char **ret = <char **>malloc(count * sizeof(char *))
    # malloc(0) may legitimately return NULL, so only a non-empty request
    # that yields NULL is an allocation failure.
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for i in range(count):
            ret[i] = <char *>list_bytes[i]
    except:
        # do not leak the array when an element cannot be converted
        free(ret)
        raise
    return ret
580
581
582
# C-level trampoline for monitor log callbacks. `arg` is cast back to a
# Python object that is indexed as [0] = the Python callable and
# [1] = the value handed to it as first argument; the remaining C arguments
# are forwarded unchanged. Declared `with gil` so the GIL is acquired
# before any Python object is touched. Always returns 0.
583cdef int __monitor_callback(void *arg, const char *line, const char *who,
584 uint64_t sec, uint64_t nsec, uint64_t seq,
585 const char *level, const char *msg) with gil:
586 cdef object cb_info = <object>arg
587 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
588 return 0
589
224ce89b
WB
# C-level trampoline for the second monitor log callback variant, which
# additionally carries `channel` and `name`. `arg` is cast back to a Python
# object indexed as [0] = the Python callable and [1] = the value handed to
# it as first argument. Declared `with gil`; always returns 0.
# NOTE: the Python callable receives `name` before `who`, i.e. the reverse
# of the order in which this function receives them.
590cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
591 const char *who,
31f18b77
FG
592 const char *name,
593 uint64_t sec, uint64_t nsec, uint64_t seq,
594 const char *level, const char *msg) with gil:
595 cdef object cb_info = <object>arg
224ce89b 596 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
31f18b77
FG
597 return 0
598
7c673cae
FG
599
class Version(object):
    """
    Version information, split into its three numeric components.

    ``str()`` renders them as ``major.minor.extra``.
    """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        components = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % components
609
610
611cdef class Rados(object):
612 """This class wraps librados functions"""
613 # NOTE(sileht): attributes declared in .pyd
614
615 def __init__(self, *args, **kwargs):
 # Calls PyEval_InitThreads() first, then hands every argument over to
 # __setup(), which performs the (type checked) initialisation.
616 PyEval_InitThreads()
617 self.__setup(*args, **kwargs)
618
619 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
620 ('conffile', opt(str_type)))
621 def __setup(self, rados_id=None, name=None, clustername=None,
622 conf_defaults=None, conffile=None, conf=None, flags=0,
623 context=None):
624 self.monitor_callback = None
31f18b77 625 self.monitor_callback2 = None
7c673cae
FG
626 self.parsed_args = []
627 self.conf_defaults = conf_defaults
628 self.conffile = conffile
629 self.rados_id = rados_id
630
631 if rados_id and name:
632 raise Error("Rados(): can't supply both rados_id and name")
633 elif rados_id:
634 name = 'client.' + rados_id
635 elif name is None:
636 name = 'client.admin'
637 if clustername is None:
638 clustername = ''
639
640 name = cstr(name, 'name')
641 clustername = cstr(clustername, 'clustername')
642 cdef:
643 char *_name = name
644 char *_clustername = clustername
645 int _flags = flags
646 int ret
647
648 if context:
649 # Unpack void* (aka rados_config_t) from capsule
650 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
651 with nogil:
652 ret = rados_create_with_context(&self.cluster, rados_config)
653 else:
654 with nogil:
655 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
656 if ret != 0:
657 raise Error("rados_initialize failed with error code: %d" % ret)
658
659 self.state = "configuring"
660 # order is important: conf_defaults, then conffile, then conf
661 if conf_defaults:
662 for key, value in conf_defaults.items():
663 self.conf_set(key, value)
664 if conffile is not None:
665 # read the default conf file when '' is given
666 if conffile == '':
667 conffile = None
668 self.conf_read_file(conffile)
669 if conf:
670 for key, value in conf.items():
671 self.conf_set(key, value)
672
673 def require_state(self, *args):
674 """
675 Checks if the Rados object is in a special state
676
677 :raises: RadosStateError
678 """
679 if self.state in args:
680 return
681 raise RadosStateError("You cannot perform that operation on a \
682Rados object in state %s." % self.state)
683
684 def shutdown(self):
685 """
686 Disconnects from the cluster. Call this explicitly when a
687 Rados.connect()ed object is no longer used.
688 """
689 if self.state != "shutdown":
690 with nogil:
691 rados_shutdown(self.cluster)
692 self.state = "shutdown"
693
694 def __enter__(self):
 # Context-manager entry: connect to the cluster and hand back this
 # object as the `with ... as` target.
695 self.connect()
696 return self
697
698 def __exit__(self, type_, value, traceback):
 # Context-manager exit: always shut the handle down. Returning False
 # means an exception raised inside the `with` body is not suppressed.
699 self.shutdown()
700 return False
701
702 def version(self):
703 """
704 Get the version number of the ``librados`` C library.
705
706 :returns: a :class:`Version` object whose ``major``, ``minor`` and
707 ``extra`` attributes hold the librados version components
708 """
709 cdef int major = 0
710 cdef int minor = 0
711 cdef int extra = 0
712 with nogil:
713 rados_version(&major, &minor, &extra)
714 return Version(major, minor, extra)
715
716 @requires(('path', opt(str_type)))
717 def conf_read_file(self, path=None):
718 """
719 Configure the cluster handle using a Ceph config file.
720
721 :param path: path to the config file; None is handed to librados
 as a NULL path
722 :type path: str
 :raises: :class:`RadosStateError` unless the object is in state
 "configuring" or "connected"; a subclass of :class:`Error`
 if librados returns a non-zero code
723 """
724 self.require_state("configuring", "connected")
725 path = cstr(path, 'path', opt=True)
726 cdef:
727 char *_path = opt_str(path)
728 with nogil:
729 ret = rados_conf_read_file(self.cluster, _path)
730 if ret != 0:
731 raise make_ex(ret, "error calling conf_read_file")
732
733 def conf_parse_argv(self, args):
734 """
735 Parse known arguments from args, and remove; returned
736 args contain only those unknown to ceph
737 """
738 self.require_state("configuring", "connected")
739 if not args:
740 return
741
742 cargs = cstr_list(args, 'args')
743 cdef:
744 int _argc = len(args)
745 char **_argv = to_bytes_array(cargs)
746 char **_remargv = NULL
747
748 try:
749 _remargv = <char **>malloc(_argc * sizeof(char *))
750 with nogil:
751 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
752 <const char**>_argv,
753 <const char**>_remargv)
754 if ret:
755 raise make_ex(ret, "error calling conf_parse_argv_remainder")
756
757 # _remargv was allocated with fixed argc; collapse return
758 # list to eliminate any missing args
759 retargs = [decode_cstr(a) for a in _remargv[:_argc]
760 if a != NULL]
761 self.parsed_args = args
762 return retargs
763 finally:
764 free(_argv)
765 free(_remargv)
766
767 def conf_parse_env(self, var='CEPH_ARGS'):
768 """
769 Parse known arguments from an environment variable, normally
770 CEPH_ARGS.

 :param var: name of the environment variable; nothing is parsed
 when it is empty or None
 :raises: :class:`RadosStateError` unless the object is in state
 "configuring" or "connected"; a subclass of :class:`Error`
 if librados returns a non-zero code
771 """
772 self.require_state("configuring", "connected")
773 if not var:
774 return
775
776 var = cstr(var, 'var')
777 cdef:
778 char *_var = var
779 with nogil:
780 ret = rados_conf_parse_env(self.cluster, _var)
781 if ret != 0:
782 raise make_ex(ret, "error calling conf_parse_env")
783
784 @requires(('option', str_type))
785 def conf_get(self, option):
786 """
787 Get the value of a configuration option
788
789 :param option: which option to read
790 :type option: str
791
792 :returns: str - value of the option or None
793 :raises: :class:`TypeError`
794 """
795 self.require_state("configuring", "connected")
796 option = cstr(option, 'option')
797 cdef:
798 char *_option = option
799 size_t length = 20
800 char *ret_buf = NULL
801
802 try:
 # Retry with a buffer twice as large each time librados reports
 # ENAMETOOLONG, starting from 20 bytes.
803 while True:
804 ret_buf = <char *>realloc_chk(ret_buf, length)
805 with nogil:
806 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
807 if ret == 0:
808 return decode_cstr(ret_buf)
809 elif ret == -errno.ENAMETOOLONG:
810 length = length * 2
811 elif ret == -errno.ENOENT:
 # ENOENT is reported to the caller as None, not as an error
812 return None
813 else:
814 raise make_ex(ret, "error calling conf_get")
815 finally:
 # runs on every exit path, including the returns above
816 free(ret_buf)
817
818 @requires(('option', str_type), ('val', str_type))
819 def conf_set(self, option, val):
820 """
821 Set the value of a configuration option
822
823 :param option: which option to set
824 :type option: str
825 :param val: value of the option
826 :type val: str
827
828 :raises: :class:`TypeError`, :class:`ObjectNotFound`
829 """
830 self.require_state("configuring", "connected")
831 option = cstr(option, 'option')
832 val = cstr(val, 'val')
833 cdef:
834 char *_option = option
835 char *_val = val
836
837 with nogil:
838 ret = rados_conf_set(self.cluster, _option, _val)
839 if ret != 0:
840 raise make_ex(ret, "error calling conf_set")
841
def ping_monitor(self, mon_id):
    """
    Ping a single monitor and hand back its reply

    Useful as a simple liveness probe, or to get information from a
    monitor even when there is no quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor, or None when the reply
              is empty
    """

    self.require_state("configuring", "connected")

    mon_id = cstr(mon_id, 'mon_id')
    cdef:
        char *c_mon_id = mon_id
        size_t reply_len = 0
        char *reply

    with nogil:
        ret = rados_ping_monitor(self.cluster, c_mon_id, &reply, &reply_len)

    if ret != 0:
        raise make_ex(ret, "error calling ping_monitor")

    if not reply_len:
        return None

    # Copy the reply into a Python object before giving the C buffer back.
    py_reply = reply[:reply_len]
    rados_buffer_free(reply)
    return decode_cstr(py_reply)
873
def connect(self, timeout=0):
    """
    Connect to the cluster. Call shutdown() afterwards to release resources.

    :param timeout: accepted for backward compatibility only; not used
    :raises: the exception built by make_ex() when rados_connect() fails
    """
    self.require_state("configuring")
    # NOTE(sileht): the old python API took a timeout, but the C API has
    # no equivalent, so the argument is ignored for now and is meant to
    # be removed later.
    with nogil:
        status = rados_connect(self.cluster)
    if status:
        raise make_ex(status, "error connecting to the cluster")
    self.state = "connected"
887
def get_cluster_stats(self):
    """
    Fetch usage information about the cluster

    Reports total space, space used, space available, and number of
    objects. The figures are eventually consistent: they are not
    refreshed immediately when data is written.

    :returns: dict - contains the following keys:

        - ``kb`` (int) - total space

        - ``kb_used`` (int) - space used

        - ``kb_avail`` (int) - free space available

        - ``num_objects`` (int) - number of objects

    """
    cdef:
        rados_cluster_stat_t cluster_stat

    with nogil:
        ret = rados_cluster_stat(self.cluster, &cluster_stat)

    if ret < 0:
        message = "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id
        raise make_ex(ret, message)

    result = {}
    result['kb'] = cluster_stat.kb
    result['kb_used'] = cluster_stat.kb_used
    result['kb_avail'] = cluster_stat.kb_avail
    result['num_objects'] = cluster_stat.num_objects
    return result
920
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Tell whether a pool with the given name exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True if the lookup succeeds, False if librados
              answers ENOENT
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_name = pool_name

    with nogil:
        lookup = rados_pool_lookup(self.cluster, c_name)
    if lookup == -errno.ENOENT:
        return False
    if lookup < 0:
        raise make_ex(lookup, "error looking up pool '%s'" % pool_name)
    return True
946
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Translate a pool name into its numeric ID.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if librados answers ENOENT
    """
    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_name = pool_name

    with nogil:
        lookup = rados_pool_lookup(self.cluster, c_name)
    if lookup == -errno.ENOENT:
        return None
    if lookup < 0:
        raise make_ex(lookup, "error looking up pool '%s'" % pool_name)
    return int(lookup)
971
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # The buffer was too small for the name: retry with a
                # larger one. The cap keeps the retries bounded; once it
                # is exceeded, ERANGE is reported like any other error.
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            else:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        # Covers the early return and the raise as well as the normal exit.
        free(name)
1007
@requires(('pool_name', str_type), ('auid', opt(int)), ('crush_rule', opt(int)))
def create_pool(self, pool_name, auid=None, crush_rule=None):
    """
    Create a pool. Which librados call is used depends on the
    optional arguments:
    - neither auid nor crush_rule: pool with default settings
    - auid only: pool owned by that auid
    - crush_rule only: pool using that CRUSH rule
    - both: pool with that owner and that CRUSH rule

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param auid: the id of the owner of the new pool
    :type auid: int
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_name = pool_name
        uint8_t c_rule
        uint64_t c_auid

    has_auid = auid is not None
    has_rule = crush_rule is not None

    if has_auid and has_rule:
        c_auid = auid
        c_rule = crush_rule
        with nogil:
            ret = rados_pool_create_with_all(self.cluster, c_name, c_auid, c_rule)
    elif has_auid:
        c_auid = auid
        with nogil:
            ret = rados_pool_create_with_auid(self.cluster, c_name, c_auid)
    elif has_rule:
        c_rule = crush_rule
        with nogil:
            ret = rados_pool_create_with_crush_rule(self.cluster, c_name, c_rule)
    else:
        with nogil:
            ret = rados_pool_create(self.cluster, c_name)

    if ret < 0:
        raise make_ex(ret, "error creating pool '%s'" % pool_name)
1052
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Look up the base pool of a pool

    :param pool_id: ID of the pool to query
    :type pool_id: int
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    self.require_state("connected")
    cdef:
        int64_t c_base = 0
        int64_t c_pool = pool_id

    with nogil:
        status = rados_pool_get_base_tier(self.cluster, c_pool, &c_base)
    if status < 0:
        raise make_ex(status, "get_pool_base_tier(%d)" % pool_id)
    return int(c_base)
1070
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Remove a pool together with all the data it holds.

    The pool disappears from the cluster immediately, while the data
    itself is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    cdef:
        char *c_name = pool_name

    with nogil:
        status = rados_pool_delete(self.cluster, c_name)
    if status >= 0:
        return
    raise make_ex(status, "error deleting pool '%s'" % pool_name)
1094
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    Enumerate the inconsistent placement groups of a pool

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int
    :returns: list - inconsistent placement groups
    """
    self.require_state("connected")
    cdef:
        int64_t c_pool = pool_id
        size_t buf_size = 512
        char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_size)
            with nogil:
                ret = rados_inconsistent_pg_list(self.cluster, c_pool,
                                                 buf, buf_size)
            if ret < 0:
                raise make_ex(ret, "error calling inconsistent_pg_list")
            if ret <= <int>buf_size:
                break
            # A result larger than the buffer means it did not fit:
            # grow the buffer and ask again.
            buf_size *= 2
        # The buffer holds NUL-separated entries; empty pieces are dropped.
        decoded = decode_cstr(buf[:ret])
        return [entry for entry in decoded.split('\0') if entry]
    finally:
        free(buf)
1125
def list_pools(self):
    """
    Gets a list of pool names.

    :raises: :class:`Error`
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                # The list did not fit: grow the buffer and ask again.
                size *= 2
            elif ret >= 0:
                break
            else:
                # Without this branch a negative return code would match
                # neither case above and the loop would never terminate.
                raise make_ex(ret, "error calling pool_list")
        # The buffer holds NUL-separated names; empty pieces are dropped.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1150
def get_fsid(self):
    """
    Fetch the fsid of the cluster as a hexadecimal string.

    The value is written by librados directly into a freshly allocated
    Python bytes object, which is then returned.

    :raises: :class:`Error`
    :returns: the cluster fsid, as the bytes object filled in by librados
    """
    self.require_state("connected")
    cdef:
        char *fsid_buf
        size_t fsid_cap = 37
        PyObject* py_fsid = NULL

    py_fsid = PyBytes_FromStringAndSize(NULL, fsid_cap)
    try:
        fsid_buf = PyBytes_AsString(py_fsid)
        with nogil:
            ret = rados_cluster_fsid(self.cluster, fsid_buf, fsid_cap)
        if ret < 0:
            raise make_ex(ret, "error getting cluster fsid")
        if ret != <int>fsid_cap:
            # Shrink the bytes object to the length librados reported.
            _PyBytes_Resize(&py_fsid, ret)
        return <object>py_fsid
    finally:
        # Dropped unconditionally: the cast to object in the return
        # statement takes its own reference when one is needed. This also
        # covers the exception paths, including a failed _PyBytes_Resize
        # (which frees the string and resets the pointer to NULL, hence
        # XDECREF rather than DECREF).
        ref.Py_XDECREF(py_fsid)
1180
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Open an io context on a pool

    An io context is what operations within a particular pool are
    performed through.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    cdef:
        rados_ioctx_t handle
        char *c_pool = ioctx_name
    with nogil:
        status = rados_ioctx_create(self.cluster, c_pool, &handle)
    if status < 0:
        raise make_ex(status, "error opening pool '%s'" % ioctx_name)
    # Wrap the raw librados handle in the Python-level object.
    wrapper = Ioctx(ioctx_name)
    wrapper.io = handle
    return wrapper
1207
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)

    Calls rados_mon_command(), or rados_mon_command_target() when a
    target is given.

    :param cmd: the command arguments, each converted with cstr()
    :param inbuf: input data handed to librados with the command
    :param timeout: ignored, kept for backward compatibility
    :param target: optional monitor to address; an int is converted to
                   its string form first
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # Use the real parameter name so a conversion error names 'cmd',
    # as osd_command/mgr_command/pg_command already do.
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        # Start from a defined "no output" state so the slicing below is
        # well defined even if librados leaves an output untouched.
        char *_outbuf = NULL
        size_t _outbuf_len = 0
        char *_outs = NULL
        size_t _outs_len = 0

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy both outputs into Python objects before releasing the
        # buffers librados allocated for them.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        free(_cmd)
1264
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    Hands the command to rados_osd_command() for the given OSD id.

    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so the value is
    # ignored; the parameter stays for compatibility with the old binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        int c_osd = osdid
        char **c_argv = to_bytes_array(cmd)
        size_t c_argc = len(cmd)

        char *c_in = inbuf
        size_t c_in_len = len(inbuf)

        char *c_out
        size_t c_out_len
        char *c_status
        size_t c_status_len

    try:
        with nogil:
            ret = rados_osd_command(self.cluster, c_osd,
                                    <const char **>c_argv, c_argc,
                                    <const char*>c_in, c_in_len,
                                    &c_out, &c_out_len,
                                    &c_status, &c_status_len)

        # Copy the outputs into Python objects, then release the buffers.
        status_text = decode_cstr(c_status[:c_status_len])
        out_bytes = c_out[:c_out_len]
        if c_status_len:
            rados_buffer_free(c_status)
        if c_out_len:
            rados_buffer_free(c_out)
        return (ret, out_bytes, status_text)
    finally:
        free(c_argv)
1307
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Hands the command to rados_mgr_command().

    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so the value is
    # ignored; the parameter stays for compatibility with the old binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char **c_argv = to_bytes_array(cmd)
        size_t c_argc = len(cmd)

        char *c_in = inbuf
        size_t c_in_len = len(inbuf)

        char *c_out
        size_t c_out_len
        char *c_status
        size_t c_status_len

    try:
        with nogil:
            ret = rados_mgr_command(self.cluster,
                                    <const char **>c_argv, c_argc,
                                    <const char*>c_in, c_in_len,
                                    &c_out, &c_out_len,
                                    &c_status, &c_status_len)

        # Copy the outputs into Python objects, then release the buffers.
        status_text = decode_cstr(c_status[:c_status_len])
        out_bytes = c_out[:c_out_len]
        if c_status_len:
            rados_buffer_free(c_status)
        if c_out_len:
            rados_buffer_free(c_out)
        return (ret, out_bytes, status_text)
    finally:
        free(c_argv)
1348
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    Hands the command to rados_pg_command() for the given placement
    group id.

    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): the C API has no timeout argument, so the value is
    # ignored; the parameter stays for compatibility with the old binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *c_pgid = pgid
        char **c_argv = to_bytes_array(cmd)
        size_t c_argc = len(cmd)

        char *c_in = inbuf
        size_t c_in_len = len(inbuf)

        char *c_out
        size_t c_out_len
        char *c_status
        size_t c_status_len

    try:
        with nogil:
            ret = rados_pg_command(self.cluster, c_pgid,
                                   <const char **>c_argv, c_argc,
                                   <const char *>c_in, c_in_len,
                                   &c_out, &c_out_len,
                                   &c_status, &c_status_len)

        # Copy the outputs into Python objects, then release the buffers.
        status_text = decode_cstr(c_status[:c_status_len])
        out_bytes = c_out[:c_out_len]
        if c_status_len:
            rados_buffer_free(c_status)
        if c_out_len:
            rados_buffer_free(c_out)
        return (ret, out_bytes, status_text)
    finally:
        free(c_argv)
1392
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap() on this cluster handle.

    :returns: the librados return code, passed through unchanged
              (no exception is raised here for a non-zero code)
    """
    self.require_state("connected")
    with nogil:
        status = rados_wait_for_latest_osdmap(self.cluster)
    return status
1398
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Put a client on the OSD blacklist

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    cdef:
        uint32_t c_expire = expire_seconds
        char *c_address = client_address

    with nogil:
        status = rados_blacklist_add(self.cluster, c_address, c_expire)
    if status >= 0:
        return
    raise make_ex(status, "error blacklisting client '%s'" % client_address)
1420
def monitor_log(self, level, callback, arg):
    """
    Register, or with callback=None unregister, a monitor log callback
    through rados_monitor_log().

    :param level: log level, must be one of MONITOR_LEVELS
    :param callback: callable to register, or None
    :param arg: value stored alongside the callback and handed to
                librados with it
    :raises: :class:`LogicError` for an invalid level or callback
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if not (callback is None or callable(callback)):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *c_level = level

    if callback is None:
        # The return code of the unregistering call is not inspected.
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>c_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb_pair = (callback, arg)
    cdef PyObject* c_pair = <PyObject*>cb_pair
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>c_level,
                              <rados_log_callback_t>&__monitor_callback, c_pair)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): holding the tuple on self keeps the callback from
    # being garbage collected while librados still points at it
    self.monitor_callback = cb_pair
    self.monitor_callback2 = None
1448
def monitor_log2(self, level, callback, arg):
    """
    Register, or with callback=None unregister, a monitor log callback
    through rados_monitor_log2().

    :param level: log level, must be one of MONITOR_LEVELS
    :param callback: callable to register, or None
    :param arg: value stored alongside the callback and handed to
                librados with it
    :raises: :class:`LogicError` for an invalid level or callback
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        # The return code of the unregistering call is not inspected.
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed.
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
7c673cae
FG
1476
1477
cdef class OmapIterator(object):
    """Iterator over the omap key/value pairs of an object"""

    cdef public Ioctx ioctx
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Advance to the next key-value pair in the object

        :raises: StopIteration when librados hands back no key
        :returns: tuple (key, value); value is None when librados
                  hands back no value
        """
        cdef:
            char *c_key = NULL
            char *c_val = NULL
            size_t val_len

        with nogil:
            ret = rados_omap_get_next(self.ctx, &c_key, &c_val, &val_len)

        if ret != 0:
            raise make_ex(ret, "error iterating over the omap")
        if c_key == NULL:
            raise StopIteration()

        key = decode_cstr(c_key)
        if c_val == NULL:
            return (key, None)
        # The value is returned as a raw slice of val_len bytes.
        return (key, c_val[:val_len])

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1516
1517
cdef class ObjectIterator(object):
    """Iterator over the objects of a rados.Ioctx"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            status = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if status < 0:
            message = ("error iterating over the objects in ioctx '%s'"
                       % self.ioctx.name)
            raise make_ex(status, message)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next object name, locator and namespace in the pool

        Any negative return code from librados ends the iteration.

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *c_key = NULL
            const char *c_locator = NULL
            const char *c_nspace = NULL

        with nogil:
            ret = rados_nobjects_list_next(self.ctx, &c_key, &c_locator, &c_nspace)

        if ret < 0:
            raise StopIteration()

        key = decode_cstr(c_key)
        # Locator and namespace are optional: NULL becomes None.
        locator = None
        if c_locator != NULL:
            locator = decode_cstr(c_locator)
        nspace = None
        if c_nspace != NULL:
            nspace = decode_cstr(c_nspace)
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1563
1564
cdef class XattrIterator(object):
    """Iterator over the extended attributes of one object"""

    cdef rados_xattrs_iter_t it
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        # The converted object id is kept on self; _oid points into it.
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            status = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if status != 0:
            raise make_ex(status, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next xattr on the object

        :raises: StopIteration when librados hands back no name
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *c_name = NULL
            const char *c_val = NULL
            size_t val_len = 0

        with nogil:
            status = rados_getxattrs_next(self.it, &c_name, &c_val, &val_len)
        if status != 0:
            message = ("error iterating over the extended attributes in '%s'"
                       % self.oid)
            raise make_ex(status, message)
        if c_name == NULL:
            raise StopIteration()
        attr_name = decode_cstr(c_name)
        # The value is returned as a raw slice of val_len bytes.
        attr_val = c_val[:val_len]
        return (attr_name, attr_val)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1613
1614
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # C array of snapshot ids, allocated in __cinit__ and released in
    # __dealloc__; max_snap is how many entries it holds and cur_snap
    # the index of the next one to hand out.
    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, "error calling rados_snap_list for \
ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # Release the id array allocated in __cinit__; without this it
        # would be leaked every time an iterator is dropped. free(NULL)
        # is a no-op, so a failed allocation is handled too.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            # Same doubling strategy for the name buffer.
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1680
1681
cdef class Snap(object):
    """A single pool snapshot: its io context, name and id"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): the old API kept the ctypes object here
    # rather than the plain value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Look up when this snapshot was taken

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t stamp

        with nogil:
            status = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &stamp)
        if status != 0:
            raise make_ex(status, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(stamp)
1714
1715
cdef class Completion(object):
    """Handle on one asynchronous librados operation"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Has the asynchronous operation been marked safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            flag = rados_aio_is_safe(self.rados_comp)
        return flag == 1

    def is_complete(self):
        """
        Has the asynchronous operation completed?

        This does not imply that the complete callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            flag = rados_aio_is_complete(self.rados_comp)
        return flag == 1

    def wait_for_safe(self):
        """
        Block until the asynchronous operation is marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Block until the asynchronous operation completes

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Block until the asynchronous operation is marked safe and the
        safe callback has returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Block until the asynchronous operation completes and the
        complete callback has returned

        :returns: the return code of
                  rados_aio_wait_for_complete_and_cb(), unchanged
        """
        with nogil:
            status = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return status

    def get_return_value(self):
        """
        Fetch the return value of the asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            value = rados_aio_get_return_value(self.rados_comp)
        return value

    def __dealloc__(self):
        """
        Release a completion

        Drops the reference held on the buffer, if any, and releases the
        librados completion. It may not be freed immediately if the
        operation is not acked and committed.
        """
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp == NULL:
            return
        with nogil:
            rados_aio_release(self.rados_comp)
            self.rados_comp = NULL

    def _complete(self):
        # Run the user callback first, then stop tracking this
        # completion on the io context.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Run the user callback first, then stop tracking this
        # completion on the io context.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Stop tracking this completion without running any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1841
1842
class OpCtx(object):
    """
    Context-manager mixin for operation objects: entering the ``with``
    block calls create(), leaving it calls release().
    """

    def __enter__(self):
        created = self.create()
        return created

    def __exit__(self, type, msg, traceback):
        # Nothing is returned, so an exception raised inside the block
        # is not suppressed.
        self.release()
1849
1850
cdef class WriteOp(object):
    """Builder for a librados write operation"""
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying librados write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """
        Release the underlying librados write operation.
        """
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: flag value handed to rados_write_op_create();
                          None (the default) is sent as 0
        :type exclusive: int
        """

        # The default of None cannot be converted to a C int, so map it
        # explicitly. 0 is LIBRADOS_CREATE_IDEMPOTENT in librados.h.
        cdef int _exclusive = 0
        if exclusive is not None:
            _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('version', int))
    def assert_version(self, version):
        """
        Check if object's version is the expected one.
        :param version: expected version of the object
        :type version: int
        """
        cdef:
            uint64_t _version = version

        with nogil:
            rados_write_op_assert_version(self.write_op, _version)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of zero bytes to write
        :type length: int
        """

        cdef:
            uint64_t _offset = offset
            uint64_t _length = length

        # The C prototype is rados_write_op_zero(write_op, offset, len):
        # offset comes first.
        with nogil:
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
1988
1989
class WriteOpCtx(WriteOp, OpCtx):
    """Context manager around a write operation (WriteOp combined with OpCtx)"""
1992
1993
cdef class ReadOp(object):
    """Builder for a librados read operation"""
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying librados read operation.

        :returns: self
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the underlying librados read operation.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Apply flags to the last operation added to this read_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
2019
2020
class ReadOpCtx(ReadOp, OpCtx):
    """Context manager around a read operation (ReadOp combined with OpCtx)"""
2023
2024
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level trampoline for the "safe" notification of an asynchronous
    operation: args is the Python object whose _safe() method is invoked.
    """
    cdef object completion_obj = <object>args
    completion_obj._safe()
    return 0
2032
2033
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level trampoline for the oncomplete() notification of an asynchronous
    operation.

    ``args`` is the Python object registered with the completion; its
    ``_complete()`` method is invoked with the GIL held. Always returns 0.
    """
    (<object>args)._complete()
    return 0
2041
2042
2043cdef class Ioctx(object):
2044 """rados.Ioctx object"""
2045 # NOTE(sileht): attributes declared in .pxd
2046
def __init__(self, name):
    """
    Initialise the Python-visible state of an io context.

    :param name: name stored on the context and used in error messages
    """
    self.name = name
    self.state = "open"

    # No locator key or namespace is selected initially.
    self.locator_key = ""
    self.nspace = ""

    # Completions tracked by __track_completion(); the lock guards both
    # lists.
    self.lock = threading.Lock()
    self.safe_completions = []
    self.complete_completions = []
2056
def __enter__(self):
    """Enter a ``with`` block; the io context itself is the bound value."""
    return self
2059
def __exit__(self, type_, value, traceback):
    """
    Leave a ``with`` block by closing the io context.

    Returns False so that an exception raised inside the block propagates.
    """
    self.close()
    return False
2063
def __dealloc__(self):
    # Make sure the io context is closed when the object is deallocated;
    # close() does nothing unless the state is still "open".
    self.close()
2066
def __track_completion(self, completion_obj):
    """
    Record a completion on the io context.

    The completion is appended to ``complete_completions`` when it has an
    ``oncomplete`` callback and to ``safe_completions`` when it has an
    ``onsafe`` callback. Each append happens while holding ``self.lock``.

    :param completion_obj: the completion to record
    """
    has_complete_cb = completion_obj.oncomplete
    if has_complete_cb:
        with self.lock:
            self.complete_completions.append(completion_obj)

    has_safe_cb = completion_obj.onsafe
    if has_safe_cb:
        with self.lock:
            self.safe_completions.append(completion_obj)
2074
def __get_completion(self, oncomplete, onsafe):
    """
    Build a Completion for use with an asynchronous operation.

    A C callback is only registered with librados for the notifications
    that actually have a Python callable attached.

    :param oncomplete: what to do when the write is safe and complete in
        memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
        storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    py_completion = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t c_on_complete = NULL
        rados_callback_t c_on_safe = NULL
        rados_completion_t c_completion
        # Raw pointer to the Python object; handed to librados as the
        # callback argument.
        PyObject* py_handle = <PyObject*>py_completion

    if oncomplete:
        c_on_complete = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        c_on_safe = <rados_callback_t>&__aio_safe_cb

    with nogil:
        ret = rados_aio_create_completion(py_handle, c_on_complete,
                                          c_on_safe, &c_completion)
    if ret < 0:
        raise make_ex(ret, "error getting a completion")

    py_completion.rados_comp = c_completion
    return py_completion
2111
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    The user callback is invoked as::

        oncomplete(completion, size, mtime)

    where ``size`` and ``mtime`` are both None when the operation's return
    value is negative.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *c_oid = object_name
        # Output slots filled in by librados; read by the closure below.
        uint64_t obj_size
        time_t obj_mtime

    def on_stat_done(completion_v):
        cdef Completion done = completion_v
        if done.get_return_value() < 0:
            return oncomplete(done, None, None)
        return oncomplete(done, obj_size, time.localtime(obj_mtime))

    completion = self.__get_completion(on_stat_done, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, c_oid, completion.rados_comp,
                             &obj_size, &obj_mtime)

    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error stating %s" % object_name)
    return completion
2157
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    The write is queued and the call returns immediately.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in
        memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
        storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_payload = to_write
        size_t payload_len = len(to_write)
        uint64_t c_offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, c_oid, completion.rados_comp,
                              c_payload, payload_len, c_offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2202
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    The write is queued and the call returns immediately.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: what to do when the write is safe and complete in
        memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
        storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_payload = to_write
        size_t payload_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, c_oid,
                                   completion.rados_comp,
                                   c_payload, payload_len)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2247
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    The write is queued and the call returns immediately.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in
        memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on
        storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name
        char* c_payload = to_append
        size_t payload_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, c_oid,
                               completion.rados_comp,
                               c_payload, payload_len)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error appending object %s" % object_name)
    return completion
2290
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    :raises: :class:`Error`
    """
    with nogil:
        rc = rados_aio_flush(self.io)
    if rc >= 0:
        return
    raise make_ex(rc, "error flushing")
2301
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    ``data_read`` is None when the operation's return value is negative.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        # Declared explicitly: the previous code declared an unused
        # ``ref_buf`` and left ``ret_buf`` to Cython's type inference.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to the number of bytes read.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # The bytes object is allocated up front and owned by the completion;
    # librados writes directly into its storage.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2354
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    ``data`` is None when the operation's return value is negative.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Declared explicitly: the previous code declared an unused
        # ``ref_buf`` and left ``ret_buf`` to Cython's type inference.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated buffer to the size of the returned data.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only wrap the callbacks the caller actually supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2425
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: what to do when the remove is safe and complete in
        memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the remove is safe and complete on
        storage on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_oid = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, c_oid,
                               completion.rados_comp)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error removing %s" % object_name)
    return completion
2458
def require_ioctx_open(self):
    """
    Verify that this rados.Ioctx is in the 'open' state.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2467
def change_auid(self, auid):
    """
    Attempt to change an io context's associated auid "owner."

    Requires that you have write permission on both the current and new
    auid.

    :param auid: the new auid
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    cdef uint64_t c_auid = auid

    with nogil:
        rc = rados_ioctx_pool_set_auid(self.io, c_auid)
    if rc < 0:
        raise make_ex(rc, "error changing auid of '%s' to %d"
                      % (self.name, auid))
2487
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    # Remember the caller's original (un-encoded) value for
    # get_locator_key().
    self.locator_key = loc_key
2510
def get_locator_key(self):
    """
    Return the locator key currently stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2518
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Set the snapshot for reading objects.

    To stop reading from a snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap_id)
2535
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # None is documented as "default namespace". The argument is declared
    # optional so the type check lets None reach this branch; previously
    # the check rejected None and the branch could never run.
    if nspace is None:
        nspace = ""
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    self.nspace = nspace
2559
def get_namespace(self):
    """
    Return the namespace currently stored on this io context.

    :returns: namespace
    """
    current_nspace = self.nspace
    return current_nspace
2567
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling close() on a context that is not open does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2582
2583
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Write data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t data_len = len(data)
        uint64_t c_offset = offset

    with nogil:
        ret = rados_write(self.io, c_key, c_data, data_len, c_offset)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return value is not expected from librados here.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2619
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Write an entire object synchronously.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t data_len = len(data)

    with nogil:
        ret = rados_write_full(self.io, c_key, c_data, data_len)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return value is not expected from librados here.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2654
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Append data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *c_data = data
        size_t data_len = len(data)

    with nogil:
        ret = rados_append(self.io, c_key, c_data, data_len)
    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return value is not expected from librados here.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2686
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *raw
        uint64_t c_offset = offset
        size_t c_length = length
        PyObject* py_buf = NULL

    # Allocate the result up front so librados can fill it in place.
    py_buf = PyBytes_FromStringAndSize(NULL, length)
    try:
        raw = PyBytes_AsString(py_buf)
        with nogil:
            ret = rados_read(self.io, c_key, raw, c_length, c_offset)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: trim the buffer to what was actually returned.
        if ret != length:
            _PyBytes_Resize(&py_buf, ret)

        return <object>py_buf
    finally:
        # Drop our reference unconditionally: the cast to object above
        # takes its own reference when the value is returned. If
        # _PyBytes_Resize fails it frees the bytes object and sets the
        # pointer to NULL, which is why XDECREF is used.
        ref.Py_XDECREF(py_buf)
2730
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Declared explicitly: the previous code declared an unused
        # ``ref_buf`` and left ``ret_buf`` to Cython's type inference.
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate the output buffer up front so librados can fill it in place.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # The message used to be copy-pasted from read() and reported
            # a failed read; report the failed class-method call instead.
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s:%s on %s"
                          % (self.name, cls, method, key))

        # Trim the buffer to the size of the data actually returned.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2786
def get_stats(self):
    """
    Get pool usage statistics

    :raises: :class:`Error`
    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        rc = rados_ioctx_pool_stat(self.io, &pool_stat)
    if rc < 0:
        raise make_ex(rc, "Ioctx.get_stats(%s): get_stats failed" % self.name)
    # Copy each field of the C struct into a plain dict.
    return dict(
        num_bytes=pool_stat.num_bytes,
        num_kb=pool_stat.num_kb,
        num_objects=pool_stat.num_objects,
        num_object_clones=pool_stat.num_object_clones,
        num_object_copies=pool_stat.num_object_copies,
        num_objects_missing_on_primary=pool_stat.num_objects_missing_on_primary,
        num_objects_unfound=pool_stat.num_objects_unfound,
        num_objects_degraded=pool_stat.num_objects_degraded,
        num_rd=pool_stat.num_rd,
        num_rd_kb=pool_stat.num_rd_kb,
        num_wr=pool_stat.num_wr,
        num_wr_kb=pool_stat.num_wr_kb,
    )
2836
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef char *c_key = key

    with nogil:
        rc = rados_remove(self.io, c_key)
    if rc < 0:
        raise make_ex(rc, "Failed to remove '%s'" % key)
    return True
2861
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success, otherwise raises error
    """

    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t new_size = size

    with nogil:
        rc = rados_trunc(self.io, c_key, new_size)
    if rc < 0:
        raise make_ex(rc, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
    return rc
2891
@requires(('key', str_type))
def stat(self, key):
    """
    Get object stats (size/mtime)

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp) - the timestamp is the mtime converted
        with ``time.localtime``
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        # Output slots filled in by librados.
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        rc = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if rc < 0:
        raise make_ex(rc, "Failed to stat %r" % key)
    mtime_struct = time.localtime(obj_mtime)
    return obj_size, mtime_struct
2917
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    doubled each time librados reports ERANGE.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                # Buffer too small: retry with twice the space.
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # The size cap was reached while librados still reported
            # ERANGE. Raise instead of falling through and slicing the
            # buffer with a negative length.
            raise make_ex(-errno.ERANGE,
                          "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2956
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Start iterating over xattrs on an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
2971
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t value_len = len(xattr_value)

    with nogil:
        rc = rados_setxattr(self.io, c_key, c_name,
                            c_value, value_len)
    if rc < 0:
        raise make_ex(rc, "Failed to set xattr %r" % xattr_name)
    return True
3004
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        rc = rados_rmxattr(self.io, c_key, c_name)
    if rc < 0:
        raise make_ex(rc, "Failed to delete key %r xattr %r" %
                      (key, xattr_name))
    return True
3033
def list_objects(self):
    """
    Create an ObjectIterator for this rados.Ioctx object.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
3042
def list_snaps(self):
    """
    Create a SnapIterator for this rados.Ioctx object.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3051
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Create a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_create(self.io, c_snap_name)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to create snap %s" % snap_name)
3071
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Removes a pool-wide snapshot

    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = snap_name

    with nogil:
        rc = rados_ioctx_snap_remove(self.io, c_snap_name)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to remove snap %s" % snap_name)
3091
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Get the id of a pool snapshot

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    # The encoded name only backs the char* below; the caller's string is
    # what ends up in the error message and in the returned Snap.
    encoded_name = cstr(snap_name, 'snap_name')
    cdef:
        rados_snap_t _found_id
        char *_name = encoded_name

    with nogil:
        ret = rados_ioctx_snap_lookup(self.io, _name, &_found_id)
    if ret != 0:
        raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(_found_id))
3115
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Rollback an object to a snapshot

    :param oid: the name of the object
    :type oid: str
    :param snap_name: the name of the snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # Encoded copies back the char* arguments; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    snap_name_raw = cstr(snap_name, 'snap_name')
    cdef:
        char *_snap_name = snap_name_raw
        char *_oid = oid_raw

    with nogil:
        ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
    if ret != 0:
        # Format the original oid, not the bytes object, so Python 3
        # does not render it as b'...'.
        raise make_ex(ret, "Failed to rollback %s" % oid)
3140
28e407b8
AA
def create_self_managed_snap(self):
    """
    Creates a self-managed snapshot

    :returns: snap id on success

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # Filled in by librados through the pointer passed below.
    cdef rados_snap_t _new_snap_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_create(self.io, &_new_snap_id)
    if ret != 0:
        raise make_ex(ret, "Failed to create self-managed snapshot")
    return int(_new_snap_id)
3157
@requires(('snap_id', int))
def remove_self_managed_snap(self, snap_id):
    """
    Removes a self-managed snapshot

    :param snap_id: the id of the snapshot
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # Convert to the C snapshot id type before dropping the GIL.
    cdef rados_snap_t _target_id = snap_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_remove(self.io, _target_id)
    if ret != 0:
        raise make_ex(ret, "Failed to remove self-managed snapshot")
3176
def set_self_managed_snap_write(self, snaps):
    """
    Updates the write context to the specified self-managed
    snapshot ids.

    The ids are converted with ``int()`` and sorted in descending order;
    the largest id is passed to librados as the snapshot sequence number
    (0 when ``snaps`` is empty or None).

    :param snaps: all associated self-managed snapshot ids
    :type snaps: list

    :raises: :class:`TypeError`
    :raises: :class:`MemoryError` if the id array cannot be allocated
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    sorted_snaps = []
    snap_seq = 0
    if snaps:
        sorted_snaps = sorted((int(x) for x in snaps), reverse=True)
        snap_seq = sorted_snaps[0]

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)
    try:
        if _num_snaps > 0:
            _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
            # An unchecked NULL here would be written through just below.
            if _snaps == NULL:
                raise MemoryError("malloc failed")
            for i, snap_id in enumerate(sorted_snaps):
                _snaps[i] = snap_id
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_snaps)
3212
@requires(('oid', str_type), ('snap_id', int))
def rollback_self_managed_snap(self, oid, snap_id):
    """
    Rolls a specific object back to a self-managed snapshot revision

    :param oid: the name of the object
    :type oid: str
    :param snap_id: the id of the snapshot
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    # The encoded copy backs the char* argument; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    cdef:
        char *_oid = oid_raw
        rados_snap_t _snap_id = snap_id
    with nogil:
        ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
    if ret != 0:
        # Format the original oid, not the bytes object, so Python 3
        # does not render it as b'...'.
        raise make_ex(ret, "Failed to rollback %s" % oid)
3235
7c673cae
FG
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read or
    written via this io context

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3249
def create_write_op(self):
    """
    Create a write operation object.

    The result must be handed to release_write_op after use.

    :returns: whatever ``WriteOp.create()`` returns
    """
    new_op = WriteOp()
    return new_op.create()
3256
def create_read_op(self):
    """
    Create a read operation object.

    The result must be handed to release_read_op after use.

    :returns: whatever ``ReadOp.create()`` returns
    """
    new_op = ReadOp()
    return new_op.create()
3263
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op.

    :param write_op: write operation object to release
    """
    release = write_op.release
    release()
3269
def release_read_op(self, read_op):
    """
    Release the memory allocated by create_read_op.

    :param read_op: read operation object to release
    """
    release = read_op.release
    release()
3277
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Queue a set of omap key/value pairs on a write operation.

    Nothing is written here; the pairs are only attached to ``write_op``.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values, one per key
    :type values: tuple

    :raises: :class:`TypeError`
    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    # Encode values the same way as keys so that text values work and the
    # lengths below are byte counts rather than character counts.
    values = cstr_list(values, 'values')
    lens = [len(v) for v in values]
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try block: if a later conversion raises, the
        # arrays already allocated are still released by the finally clause.
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array(lens)
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        # free(NULL) is a no-op, so partially completed setups are fine.
        free(_keys)
        free(_values)
        free(_lens)
3311
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """

    # The encoded copy backs the char* argument; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        time_t _mtime = mtime
        int _flags = flags

    with nogil:
        ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
    if ret != 0:
        # Format the original oid, not the bytes object, so Python 3
        # does not render it as b'...'.
        raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3337
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real write operation asynchronously

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """

    # The encoded copy backs the char* argument; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp _write_op = write_op
        char *_oid = oid_raw
        Completion completion
        time_t _mtime = mtime
        int _flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
                                         &_mtime, _flags)
    if ret != 0:
        # Submission failed: undo the tracking done above before raising.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
    return completion
3379
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    # The encoded copy backs the char* argument; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        int _flag = flag

    with nogil:
        ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
    if ret != 0:
        # Format the original oid, not the bytes object, so Python 3
        # does not render it as b'...'.
        raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3401
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute the real read operation asynchronously

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: callback handed to the completion as ``oncomplete``
    :type oncomplete: completion
    :param onsafe: callback handed to the completion as ``onsafe``
    :type onsafe: completion
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    # The encoded copy backs the char* argument; the original oid is kept
    # for the error message.
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp _read_op = read_op
        char *_oid = oid_raw
        Completion completion
        int _flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
    if ret != 0:
        # Submission failed: undo the tracking done above before raising.
        completion._cleanup()
        raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
    return completion
3435
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Queue an omap key/value listing on a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """

    # Empty strings are treated as "not given" and mapped to None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None
    if filter_prefix:
        filter_prefix = cstr(filter_prefix, 'filter_prefix')
    else:
        filter_prefix = None

    cdef:
        ReadOp _read_op = read_op
        int _max_return = max_return
        rados_omap_iter_t iter_addr = NULL
        char *_start_after = opt_str(start_after)
        char *_filter_prefix = opt_str(filter_prefix)

    with nogil:
        rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
                                     _max_return, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3466
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Queue an omap key listing on a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: an iterator over the requested omap values, return value from this action
    """
    # An empty string is treated as "not given" and mapped to None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None

    cdef:
        ReadOp _read_op = read_op
        int _max_return = max_return
        rados_omap_iter_t iter_addr = NULL
        char *_start_after = opt_str(start_after)

    with nogil:
        rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
                                     _max_return, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3492
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Queue a lookup of omap values for specific keys on a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: an iterator over the requested omap values, return value from this action
    """
    keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        # Start from NULL, as get_omap_vals/get_omap_keys do, so the
        # iterator never carries an indeterminate pointer.
        rados_omap_iter_t iter_addr = NULL
        char **_keys = to_bytes_array(keys)
        size_t key_num = len(keys)

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, NULL)
        it = OmapIterator(self)
        it.ctx = iter_addr
        return it, 0   # 0 is meaningless; there for backward-compat
    finally:
        # The pointer array is only needed for the call above.
        free(_keys)
3520
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Queue the removal of the specified omap keys on a write operation.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: input key tuple
    :type keys: tuple
    """

    encoded_keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp _op = write_op
        size_t _count = len(encoded_keys)
        char **_key_array = to_bytes_array(encoded_keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(_op.write_op, <const char**>_key_array, _count)
    finally:
        # The pointer array is only needed for the call above.
        free(_key_array)
3542
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Remove all key/value pairs from an object

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    # Typed local so the C-level handle is reachable without the GIL.
    cdef WriteOp _op = write_op

    with nogil:
        rados_write_op_omap_clear(_op.write_op)
3556
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
        duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies back the char* arguments; the original strings are
    # kept for the error message.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct is stack-allocated; without this the microseconds
        # field would hold whatever happened to be in memory.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                   _duration_ptr, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3607
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
        duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies back the char* arguments; the original strings are
    # kept for the error message.
    key_raw = cstr(key, 'key')
    tag_raw = cstr(tag, 'tag')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')
    desc_raw = cstr(desc, 'desc')

    cdef:
        char* _key = key_raw
        char* _tag = tag_raw
        char* _name = name_raw
        char* _cookie = cookie_raw
        char* _desc = desc_raw
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct is stack-allocated; without this the microseconds
        # field would hold whatever happened to be in memory.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                _duration_ptr, _flags)
    if ret < 0:
        # The message previously named rados_lock_exclusive (copy/paste).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3661
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    # Encoded copies back the char* arguments; the original strings are
    # kept for the error message.
    key_raw = cstr(key, 'key')
    name_raw = cstr(name, 'name')
    cookie_raw = cstr(cookie, 'cookie')

    cdef:
        char* _key = key_raw
        char* _name = name_raw
        char* _cookie = cookie_raw

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # The message previously claimed rados_lock_exclusive had failed
        # to *set* a lock (copy/paste).
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3693
c07f9fc5
FG
def application_enable(self, app_name, force=False):
    """
    Enable an application on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param force: False if only a single app should exist per pool
    :type force: bool

    :raises: :class:`Error`
    """
    encoded_app = cstr(app_name, 'app_name')
    cdef:
        # librados takes the flag as a C int; any truthy value becomes 1.
        int _force = 1 if force else 0
        char *_app_name = encoded_app

    with nogil:
        ret = rados_application_enable(self.io, _app_name, _force)
    if ret < 0:
        raise make_ex(ret, "error enabling application")
3714
def application_list(self):
    """
    Returns a list of enabled applications

    :returns: list of app name strings, or None when librados reports
        ENOENT
    :raises: :class:`Error`
    """
    cdef:
        char *apps = NULL
        size_t length = 128

    try:
        while True:
            apps = <char *>realloc_chk(apps, length)
            with nogil:
                ret = rados_application_list(self.io, apps, &length)
            if ret == -errno.ERANGE:
                # Retry with the current value of `length`; presumably
                # librados updated it to the required size -- TODO confirm.
                continue
            if ret == -errno.ENOENT:
                return None
            if ret != 0:
                raise make_ex(ret, "error listing applications")
            # The names arrive NUL-separated; empty pieces are dropped.
            pieces = apps[:length].split(b'\0')
            return [decode_cstr(app) for app in pieces if app]
    finally:
        free(apps)
3741
def application_metadata_set(self, app_name, key, value):
    """
    Sets application metadata on an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :param value: metadata value
    :type value: str

    :raises: :class:`Error`
    """
    # The encoded objects must stay referenced while the char* pointers
    # below are in use.
    encoded_app = cstr(app_name, 'app_name')
    encoded_key = cstr(key, 'key')
    encoded_value = cstr(value, 'value')
    cdef:
        char *_app_name = encoded_app
        char *_key = encoded_key
        char *_value = encoded_value

    with nogil:
        ret = rados_application_metadata_set(self.io, _app_name, _key,
                                             _value)
    if ret < 0:
        raise make_ex(ret, "error setting application metadata")
3768
def application_metadata_remove(self, app_name, key):
    """
    Remove application metadata from an OSD pool

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str

    :raises: :class:`Error`
    """
    # The encoded objects must stay referenced while the char* pointers
    # below are in use.
    encoded_app = cstr(app_name, 'app_name')
    encoded_key = cstr(key, 'key')
    cdef:
        char *_app_name = encoded_app
        char *_key = encoded_key

    with nogil:
        ret = rados_application_metadata_remove(self.io, _app_name, _key)
    if ret < 0:
        raise make_ex(ret, "error removing application metadata")
3790
def application_metadata_list(self, app_name):
    """
    Returns the metadata key/value pairs of an application on an OSD pool

    :param app_name: application name
    :type app_name: str
    :returns: list of key/value tuples
    :raises: :class:`Error`
    """
    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, _app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == 0:
                # Do not filter out empty pieces: an empty metadata value
                # is legitimate, and dropping it would pair every later
                # key with the wrong value.
                keys = [decode_cstr(key) for key in
                        c_keys[:key_length].split(b'\0')]
                vals = [decode_cstr(val) for val in
                        c_vals[:val_length].split(b'\0')]
                # Each entry is NUL-terminated, so splitting leaves one
                # trailing empty piece in both lists; drop that last pair.
                # Materialise a list so the documented return type also
                # holds on Python 3, where zip() is lazy.
                return list(zip(keys, vals))[:-1]
            elif ret == -errno.ERANGE:
                # Retry with the current lengths; presumably librados
                # updated them to the required sizes -- TODO confirm.
                pass
            else:
                raise make_ex(ret, "error listing application metadata")
    finally:
        free(c_keys)
        free(c_vals)
3828
7c673cae
FG
3829
def set_object_locator(func):
    """
    Decorator: run a method with the object's locator key set on its ioctx.

    When ``self.locator_key`` is not None, the ioctx's current locator key
    is saved, replaced by ``self.locator_key`` for the duration of the
    call, and restored afterwards -- also when the wrapped call raises.
    When it is None the method is called unchanged.

    :param func: method taking ``self`` as first argument; ``self`` must
        provide ``locator_key`` and ``ioctx``
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on every exit path so a failing call cannot leave
            # the shared ioctx pointing at this object's locator.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3841
3842
def set_object_namespace(func):
    """
    Decorator: run a method with the object's namespace set on its ioctx.

    The ioctx's current namespace is saved, replaced by ``self.nspace``
    for the duration of the call, and restored afterwards -- also when
    the wrapped call raises.

    :param func: method taking ``self`` as first argument; ``self`` must
        provide ``nspace`` and ``ioctx``
    :raises: :class:`LogicError` if ``self.nspace`` is None
    :returns: the wrapping function
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore on every exit path so a failing call cannot leave
            # the shared ioctx in this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3853
3854
class Object(object):
    """Rados object wrapper, makes the object look like a file"""
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context all operations are forwarded to
        :param key: name of the object
        :param locator_key: locator key applied around each operation,
            or None to leave the ioctx's locator untouched
        :param nspace: namespace applied around each operation; None is
            stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # Current position used by read() and write(); moved by seek().
        self.offset = 0
        # "exists" until remove() succeeds, then "removed".
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal is not
        # guaranteed to hold for equal strings.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """
        Check that the object has not been removed through this wrapper.

        :raises: :class:`ObjectStateError` if the state is not "exists"
        """
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """
        Read up to ``length`` bytes at the current offset and advance the
        offset by the amount of data returned.
        """
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write ``string_to_write`` at the current offset.

        The offset advances by ``len(string_to_write)`` only when the
        ioctx write returns 0.

        :returns: the return value of the ioctx write
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the ioctx's stat result for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by subsequent read() and write() calls."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the extended attribute ``xattr_name`` of this object."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the ioctx's extended-attribute listing for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set the extended attribute ``xattr_name`` to ``xattr_value``."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove the extended attribute ``xattr_name`` from this object."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
3931
# Level names accepted when watching cluster log messages.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
3939
3940
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    For watching cluster log messages.  Instantiate an object and keep
    it around while callback is periodically called.  Construct with
    'level' to monitor 'level' messages (one of MONITOR_LEVELS).
    arg will be passed to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Remember the arguments, then register through the cluster's
        # own monitor_log(), which does the actual work.
        self.cluster = cluster
        self.level = level
        self.callback = callback
        self.arg = arg
        cluster.monitor_log(level, callback, arg)
3967