]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/rados/rados.pyx
buildsys: auto-determine current version for makefile
[ceph.git] / ceph / src / pybind / rados / rados.pyx
CommitLineData
7c673cae
FG
1# cython: embedsignature=True
2"""
3This module is a thin wrapper around librados.
4
5Error codes from librados are turned into exceptions that subclass
6:class:`Error`. Almost all methods may raise :class:`Error`
7(the base class of all rados exceptions), :class:`PermissionError`
8and :class:`IOError`, in addition to those documented for the
9method.
10"""
11# Copyright 2011 Josh Durgin
12# Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13# Copyright 2015 Hector Martin <marcan@marcan.st>
14# Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16from cpython cimport PyObject, ref
17from cpython.pycapsule cimport *
18from libc cimport errno
19from libc.stdint cimport *
20from libc.stdlib cimport malloc, realloc, free
21
22import sys
23import threading
24import time
25
11fdf7f2
TL
26try:
27 from collections.abc import Callable
28except ImportError:
29 from collections import Callable
7c673cae
FG
30from datetime import datetime
31from functools import partial, wraps
32from itertools import chain
33
# Text type used for string arguments: ``basestring`` when running on
# Python 2.x, ``str`` otherwise.
str_type = basestring if sys.version_info[0] < 3 else str
39
40
41cdef extern from "Python.h":
42 # These are in cpython/string.pxd, but use "object" types instead of
43 # PyObject*, which invokes assumptions in cpython that we need to
44 # legitimately break to implement zero-copy string buffers in Ioctx.read().
45 # This is valid use of the Python API and documented as a special case.
46 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
47 char* PyBytes_AsString(PyObject *string) except NULL
48 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
49 void PyEval_InitThreads()
50
51
52cdef extern from "time.h":
53 ctypedef long int time_t
54 ctypedef long int suseconds_t
55
56
57cdef extern from "sys/time.h":
58 cdef struct timeval:
59 time_t tv_sec
60 suseconds_t tv_usec
61
62
63cdef extern from "rados/rados_types.h" nogil:
64 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
65
66
67cdef extern from "rados/librados.h" nogil:
68 enum:
69 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
70 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
71 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
72 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
73 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
74 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
75 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76
77
78 enum:
79 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
80 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
81 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
82 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
83 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
84 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
85 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
86 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
87 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
88
89 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
90
7c673cae
FG
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
224ce89b 101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
31f18b77 102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
7c673cae
FG
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
11fdf7f2 133 uint64_t rados_get_instance_id(rados_t cluster)
7c673cae
FG
134 int rados_conf_read_file(rados_t cluster, const char *path)
135 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
136 int rados_conf_parse_env(rados_t cluster, const char *var)
137 int rados_conf_set(rados_t cluster, char *option, const char *value)
138 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139
140 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
141 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
142 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
143 int rados_pool_create(rados_t cluster, const char *pool_name)
7c673cae 144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
7c673cae
FG
145 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
146 int rados_pool_list(rados_t cluster, char *buf, size_t len)
147 int rados_pool_delete(rados_t cluster, const char *pool_name)
148 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
149
150 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
151 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
152 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
c07f9fc5
FG
153 int rados_application_enable(rados_ioctx_t io, const char *app_name,
154 int force)
11fdf7f2
TL
155 void rados_set_osdmap_full_try(rados_ioctx_t io)
156 void rados_unset_osdmap_full_try(rados_ioctx_t io)
c07f9fc5
FG
157 int rados_application_list(rados_ioctx_t io, char *values,
158 size_t *values_len)
159 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
160 const char *key, char *value,
161 size_t *value_len)
162 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
163 const char *key, const char *value)
164 int rados_application_metadata_remove(rados_ioctx_t io,
165 const char *app_name, const char *key)
166 int rados_application_metadata_list(rados_ioctx_t io,
167 const char *app_name, char *keys,
168 size_t *key_len, char *values,
169 size_t *value_len)
7c673cae
FG
170 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
171 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
172 const char *inbuf, size_t inbuflen,
173 char **outbuf, size_t *outbuflen,
174 char **outs, size_t *outslen)
175 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
176 const char *inbuf, size_t inbuflen,
177 char **outbuf, size_t *outbuflen,
178 char **outs, size_t *outslen)
179 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
180 const char *inbuf, size_t inbuflen,
181 char **outbuf, size_t *outbuflen,
182 char **outs, size_t *outslen)
183 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
184 const char *inbuf, size_t inbuflen,
185 char **outbuf, size_t *outbuflen,
186 char **outs, size_t *outslen)
187 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
188 const char *inbuf, size_t inbuflen,
189 char **outbuf, size_t *outbuflen,
190 char **outs, size_t *outslen)
191 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
31f18b77 192 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
7c673cae
FG
193
194 int rados_wait_for_latest_osdmap(rados_t cluster)
195
11fdf7f2
TL
196 int rados_service_register(rados_t cluster, const char *service, const char *daemon, const char *metadata_dict)
197 int rados_service_update_status(rados_t cluster, const char *status_dict)
198
7c673cae 199 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
11fdf7f2 200 int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
7c673cae 201 void rados_ioctx_destroy(rados_ioctx_t io)
7c673cae
FG
202 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
203 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
204
205 uint64_t rados_get_last_version(rados_ioctx_t io)
206 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
207 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
208 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
209 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
210 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
211 int rados_remove(rados_ioctx_t io, const char *oid)
212 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
213 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
214 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
215 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
216 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
217 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
218 void rados_getxattrs_end(rados_xattrs_iter_t iter)
219
220 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
221 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
222 void rados_nobjects_list_close(rados_list_ctx_t ctx)
223
11fdf7f2
TL
224 int rados_ioctx_pool_requires_alignment2(rados_ioctx_t io, int * requires)
225 int rados_ioctx_pool_required_alignment2(rados_ioctx_t io, uint64_t * alignment)
226
7c673cae
FG
227 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
228 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
229 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
230 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
231 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
232 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
233 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
234 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
235
28e407b8
AA
236 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
237 rados_snap_t *snapid)
238 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
239 rados_snap_t snapid)
240 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
241 rados_snap_t snap_seq,
242 rados_snap_t *snap,
243 int num_snaps)
244 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
245 rados_snap_t snapid)
246
7c673cae
FG
247 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
248 const char * cookie, const char * desc,
249 timeval * duration, uint8_t flags)
250 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
251 const char * cookie, const char * tag, const char * desc,
252 timeval * duration, uint8_t flags)
253 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
254
255 rados_write_op_t rados_create_write_op()
256 void rados_release_write_op(rados_write_op_t write_op)
257
258 rados_read_op_t rados_create_read_op()
259 void rados_release_read_op(rados_read_op_t read_op)
260
261 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
262 void rados_aio_release(rados_completion_t c)
263 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
264 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
265 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
266 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
267 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
268 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
269 int rados_aio_flush(rados_ioctx_t io)
270
271 int rados_aio_get_return_value(rados_completion_t c)
272 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
273 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
274 int rados_aio_wait_for_complete(rados_completion_t c)
275 int rados_aio_wait_for_safe(rados_completion_t c)
276 int rados_aio_is_complete(rados_completion_t c)
277 int rados_aio_is_safe(rados_completion_t c)
278
279 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
280 const char * in_buf, size_t in_len, char * buf, size_t out_len)
281 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
282 const char * in_buf, size_t in_len, char * buf, size_t out_len)
283
284 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
285 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
286 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
287 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
288 void rados_write_op_omap_clear(rados_write_op_t write_op)
289 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
290
291 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
292 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
293 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
91327a77 294 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
7c673cae
FG
295 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
296 void rados_write_op_remove(rados_write_op_t write_op)
297 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
298 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
299
d2e6a577
FG
300 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
301 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
7c673cae
FG
302 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
303 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
304 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
305 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
306 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
307 void rados_omap_get_end(rados_omap_iter_t iter)
11fdf7f2 308 int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
7c673cae
FG
309
310
# Module-level Python names for the values declared (with a leading
# underscore) in the ``cdef extern`` blocks above.

# Per-operation flags (LIBRADOS_OP_FLAG_*).
LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE

LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD

# Flags for whole read/write operations (LIBRADOS_OPERATION_*).
LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY

# The C string constant is decoded so that Python code sees a text string.
LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')

LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT

# AUID values defined directly here: the all-ones 64-bit value and zero.
ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0
336
337
class Error(Exception):
    """
    `Error` class, derived from `Exception`; base class of all rados
    exceptions.

    :param message: human-readable description of the failure
    :param errno: optional error number; when set it is stored on the
                  instance and prefixed to the string form as
                  ``[errno N] message``
    """
    def __init__(self, message, errno=None):
        super(Error, self).__init__(message)
        self.errno = errno

    def __str__(self):
        msg = super(Error, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        # Rebuild from ``args`` instead of ``self.message``: exceptions have
        # no ``message`` attribute on Python 3, which made pickling fail
        # with AttributeError.
        return (self.__class__, (self.args[0], self.errno))
7c673cae 352
1adf2230
AA
class InvalidArgumentError(Error):
    """ `InvalidArgumentError` class, derived from `Error`; mapped from EINVAL """
    pass
355
class OSError(Error):
    """
    `OSError` class, derived from `Error`

    Note that this shadows the builtin of the same name inside this module.
    It is also what make_ex() returns for an errno without a dedicated class.
    """
    pass
359
7c673cae
FG
class InterruptedOrTimeoutError(OSError):
    """ `InterruptedOrTimeoutError` class, derived from `OSError`; mapped from EINTR """
    pass
363
364
class PermissionError(OSError):
    """ `PermissionError` class, derived from `OSError`; mapped from EPERM """
    pass
368
369
class PermissionDeniedError(OSError):
    """ `PermissionDeniedError` class, derived from `OSError`; mapped from EACCES """
    pass
373
374
class ObjectNotFound(OSError):
    """ `ObjectNotFound` class, derived from `OSError`; mapped from ENOENT """
    pass
378
379
class NoData(OSError):
    """ `NoData` class, derived from `OSError`; mapped from ENODATA (ENOATTR on FreeBSD) """
    pass
383
384
class ObjectExists(OSError):
    """ `ObjectExists` class, derived from `OSError`; mapped from EEXIST """
    pass
388
389
class ObjectBusy(OSError):
    """ `ObjectBusy` class, derived from `OSError`; mapped from EBUSY """
    pass
393
394
class IOError(OSError):
    """
    `IOError` class, derived from `OSError`; mapped from EIO

    Note that this shadows the builtin of the same name inside this module.
    """
    pass
398
399
class NoSpace(OSError):
    """ `NoSpace` class, derived from `OSError`; mapped from ENOSPC """
    pass
403
404
class RadosStateError(Error):
    """
    `RadosStateError` class, derived from `Error`

    Raised by Rados.require_state() when the object is not in one of the
    states the requested operation accepts.
    """
    pass
408
409
class IoctxStateError(Error):
    """ `IoctxStateError` class, derived from `Error` (not errno-mapped) """
    pass
413
414
class ObjectStateError(Error):
    """ `ObjectStateError` class, derived from `Error` (not errno-mapped) """
    pass
418
419
class LogicError(Error):
    """ `LogicError` class, derived from `Error` """
    pass
423
424
class TimedOut(OSError):
    """ `TimedOut` class, derived from `OSError`; mapped from ETIMEDOUT """
    pass
428
429
430IF UNAME_SYSNAME == "FreeBSD":
431 cdef errno_to_exception = {
432 errno.EPERM : PermissionError,
433 errno.ENOENT : ObjectNotFound,
434 errno.EIO : IOError,
435 errno.ENOSPC : NoSpace,
436 errno.EEXIST : ObjectExists,
437 errno.EBUSY : ObjectBusy,
438 errno.ENOATTR : NoData,
439 errno.EINTR : InterruptedOrTimeoutError,
440 errno.ETIMEDOUT : TimedOut,
441 errno.EACCES : PermissionDeniedError,
442 errno.EINVAL : InvalidArgumentError,
443 }
444ELSE:
445 cdef errno_to_exception = {
446 errno.EPERM : PermissionError,
447 errno.ENOENT : ObjectNotFound,
448 errno.EIO : IOError,
449 errno.ENOSPC : NoSpace,
450 errno.EEXIST : ObjectExists,
451 errno.EBUSY : ObjectBusy,
452 errno.ENODATA : NoData,
453 errno.EINTR : InterruptedOrTimeoutError,
454 errno.ETIMEDOUT : TimedOut,
455 errno.EACCES : PermissionDeniedError,
456 errno.EINVAL : InvalidArgumentError,
457 }
458
459
460cdef make_ex(ret, msg):
461 """
462 Translate a librados return code into an exception.
463
464 :param ret: the return code
465 :type ret: int
466 :param msg: the error message to use
467 :type msg: str
468 :returns: a subclass of :class:`Error`
469 """
470 ret = abs(ret)
471 if ret in errno_to_exception:
224ce89b 472 return errno_to_exception[ret](msg, errno=ret)
7c673cae 473 else:
224ce89b 474 return OSError(msg, errno=ret)
7c673cae
FG
475
476
def opt(cls):
    """
    Build the type spec for an optional argument.

    The returned pair tells the `requires` checker that either an instance
    of `cls` or the value `None` is acceptable.
    """
    return cls, None
481
482
def requires(*types):
    """
    Decorator factory that validates the argument types of an instance
    method before calling it.

    kwargs is an un-ordered dict, so positional arguments are matched
    against ``types`` by position and keyword arguments by name.

    :param types: ``(arg_name, arg_type)`` pairs, listed in the positional
                  order of the method's arguments after ``self``.
                  ``arg_type`` is either a class or a tuple of alternatives
                  in which ``None`` stands for "the value None is allowed"
    :returns: a decorator; the decorated method raises :class:`TypeError`
              (``"<name> must be <type names>"``) on a mismatch
    """
    def is_type_of(v, t):
        if t is None:
            return v is None
        return isinstance(v, t)

    def check_type(val, arg_name, arg_type):
        if isinstance(arg_type, tuple):
            if any(is_type_of(val, t) for t in arg_type):
                return
            type_names = ' or '.join('None' if t is None else t.__name__
                                     for t in arg_type)
            raise TypeError('%s must be %s' % (arg_name, type_names))
        if is_type_of(val, arg_type):
            return
        assert(arg_type is not None)
        raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))

    def wrapper(f):
        def validate_func(*args, **kwargs):
            # ignore the `self` arg
            pos_args = zip(args[1:], types)
            named_args = ((kwargs[name], (name, spec)) for name, spec in types
                          if name in kwargs)
            for arg_val, (arg_name, arg_type) in chain(pos_args, named_args):
                check_type(arg_val, arg_name, arg_type)
            return f(*args, **kwargs)

        # functools.wraps(f) was reported to fail here with
        # AttributeError: 'method_descriptor' object has no attribute
        # '__module__', so copy the name and docstring one by one and skip
        # whatever the wrapped callable does not provide.  This is
        # deliberately best-effort: missing metadata must not break the
        # decorated method.
        for attr in ('__name__', '__doc__'):
            try:
                setattr(validate_func, attr, getattr(f, attr))
            except (AttributeError, TypeError):
                pass
        return validate_func
    return wrapper
519
520
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Turn a Python string into a byte string.

    Byte strings are returned unchanged; unicode strings are encoded.

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed (and returned as None)
    :rtype: bytes
    :raises: :class:`TypeError` if `val` is neither kind of string
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if isinstance(val, unicode):
        return val.encode(encoding)
    raise TypeError('%s must be a string' % name)
540
541
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Convert every string of an iterable to a byte string with cstr().

    :param list_str: iterable of Python strings
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use; forwarded to cstr() (it used to
                         be silently ignored)
    :rtype: list of bytes
    :raises: :class:`TypeError` if an element is not a string
    """
    return [cstr(s, name, encoding) for s in list_str]
544
545
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string back into a Python string; None passes through.

    :param bytes val: byte string
    :param str encoding: Encoding to use
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
557
558
cdef char* opt_str(s) except? NULL:
    # Convert an optional byte string to a C string: None becomes NULL,
    # anything else is coerced to char*.
    # NULL is also a regular return value here, hence `except? NULL`.
    # NOTE(review): the returned pointer presumably refers to the buffer of
    # `s`, so `s` has to outlive it -- confirm.
    if s is None:
        return NULL
    return s
563
564
cdef void* realloc_chk(void* ptr, size_t size) except NULL:
    # realloc() wrapper that raises MemoryError instead of returning NULL.
    # On failure `ptr` is not freed here.
    cdef void *ret = realloc(ptr, size)
    if ret == NULL:
        raise MemoryError("realloc failed")
    return ret
570
571
cdef size_t * to_csize_t_array(list_int):
    # Copy a sequence of Python integers into a freshly malloc()ed size_t
    # array.  The array is not freed here; the caller has to free() it.
    # NOTE(review): no `except` clause is declared, so whether the
    # MemoryError below reaches the caller depends on the Cython version --
    # confirm.
    cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
    if ret == NULL:
        raise MemoryError("malloc failed")
    for i in xrange(len(list_int)):
        ret[i] = <size_t>list_int[i]
    return ret
579
580
cdef char ** to_bytes_array(list_bytes):
    # Build a freshly malloc()ed char* array with one entry per element of
    # `list_bytes`.  Only the pointer array is allocated; the caller has to
    # free() it.
    # NOTE(review): the entries presumably point into the byte strings
    # themselves, so `list_bytes` must stay alive while the array is in use
    # -- confirm.
    # NOTE(review): no `except` clause is declared, so whether the
    # MemoryError below reaches the caller depends on the Cython version --
    # confirm.
    cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
    if ret == NULL:
        raise MemoryError("malloc failed")
    for i in xrange(len(list_bytes)):
        ret[i] = <char *>list_bytes[i]
    return ret
588
589
590
# C-level log callback trampoline; it acquires the GIL (`with gil`) before
# touching Python objects.
# `arg` is cast back to a Python object and used as an indexable pair:
# item 0 is the Python callable, item 1 is passed to it as first argument,
# followed by the log fields in the order received.  Always returns 0.
# NOTE(review): declared as returning int, while the rados_log_callback_t
# typedef above returns void -- confirm this mismatch is intended.
cdef int __monitor_callback(void *arg, const char *line, const char *who,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    cdef object cb_info = <object>arg
    cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
    return 0
597
224ce89b
WB
# C-level log callback trampoline taking the additional `channel` and `name`
# fields; it acquires the GIL (`with gil`) before touching Python objects.
# `arg` is cast back to a Python object and used as an indexable pair:
# item 0 is the Python callable, item 1 is passed to it as first argument.
# Always returns 0.
cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
                             const char *who,
                             const char *name,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    cdef object cb_info = <object>arg
    # NOTE(review): `name` is handed to the Python callable before `who`,
    # i.e. swapped relative to the C parameter order -- presumably what the
    # Python-side callback signature expects; confirm.
    cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
    return 0
606
7c673cae
FG
607
class Version(object):
    """ Version number split into its `major`, `minor` and `extra` parts """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        # dotted form, e.g. "1.2.3"
        parts = (self.major, self.minor, self.extra)
        return ".".join("%d" % (part,) for part in parts)
617
618
619cdef class Rados(object):
620 """This class wraps librados functions"""
621 # NOTE(sileht): attributes declared in .pxd
622
def __init__(self, *args, **kwargs):
    # Calls PyEval_InitThreads() first, then forwards every argument to
    # __setup(), which validates them and creates the cluster handle.
    # NOTE(review): the thread initialisation is presumably needed for the
    # `with gil` log callbacks -- confirm.
    PyEval_InitThreads()
    self.__setup(*args, **kwargs)
626
# NOTE(review): @requires matches positional arguments by position, but the
# spec list below skips `conf_defaults`, so a 4th positional argument would
# be type-checked as `conffile` -- confirm callers only pass these by keyword.
@requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
          ('conffile', opt(str_type)))
def __setup(self, rados_id=None, name=None, clustername=None,
            conf_defaults=None, conffile=None, conf=None, flags=0,
            context=None):
    """
    Create the cluster handle and apply the initial configuration.

    :param rados_id: client id; the entity name becomes 'client.<rados_id>'
    :param name: full entity name; defaults to 'client.admin' when neither
                 `rados_id` nor `name` is given
    :param clustername: cluster name; None is treated as ''
    :param conf_defaults: mapping of options applied first via conf_set()
    :param conffile: config file passed to conf_read_file(); '' is turned
                     into None, None skips reading a file altogether
    :param conf: mapping of options applied last via conf_set()
    :param flags: passed through to rados_create2()
    :param context: capsule holding a rados_config_t; when given, the handle
                    is created with rados_create_with_context() instead

    :raises: :class:`Error` if both `rados_id` and `name` are supplied or
             if creating the handle fails
    """
    self.monitor_callback = None
    self.monitor_callback2 = None
    self.parsed_args = []
    self.conf_defaults = conf_defaults
    self.conffile = conffile
    self.rados_id = rados_id

    if rados_id and name:
        raise Error("Rados(): can't supply both rados_id and name")
    elif rados_id:
        name = 'client.' + rados_id
    elif name is None:
        name = 'client.admin'
    if clustername is None:
        clustername = ''

    name = cstr(name, 'name')
    clustername = cstr(clustername, 'clustername')
    # C views of the arguments, so the create call can run without the GIL
    cdef:
        char *_name = name
        char *_clustername = clustername
        int _flags = flags
        int ret

    if context:
        # Unpack void* (aka rados_config_t) from capsule
        rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
        with nogil:
            ret = rados_create_with_context(&self.cluster, rados_config)
    else:
        with nogil:
            ret = rados_create2(&self.cluster, _clustername, _name, _flags)
    if ret != 0:
        raise Error("rados_initialize failed with error code: %d" % ret)

    # the conf_* calls below require this state
    self.state = "configuring"
    # order is important: conf_defaults, then conffile, then conf
    if conf_defaults:
        for key, value in conf_defaults.items():
            self.conf_set(key, value)
    if conffile is not None:
        # read the default conf file when '' is given
        if conffile == '':
            conffile = None
        self.conf_read_file(conffile)
    if conf:
        for key, value in conf.items():
            self.conf_set(key, value)
680
def require_state(self, *args):
    """
    Ensure the Rados object is currently in one of the given states.

    :param args: names of the acceptable states
    :raises: :class:`RadosStateError` if the current state is not listed
    """
    if self.state not in args:
        raise RadosStateError("You cannot perform that operation on a "
                              "Rados object in state %s." % self.state)
691
def shutdown(self):
    """
    Disconnects from the cluster.  Call this explicitly when a
    Rados.connect()ed object is no longer used.

    Does nothing when the object is already in the "shutdown" state.
    """
    if self.state != "shutdown":
        # the GIL is released for the duration of the librados call
        with nogil:
            rados_shutdown(self.cluster)
        self.state = "shutdown"
701
def __enter__(self):
    # Context-manager entry: connect to the cluster and hand back the
    # object itself as the `with ... as` target.
    self.connect()
    return self
705
def __exit__(self, type_, value, traceback):
    # Context-manager exit: always shut down; returning False lets an
    # exception raised inside the `with` body propagate.
    self.shutdown()
    return False
709
def version(self):
    """
    Get the version number of the ``librados`` C library.

    :returns: a :class:`Version` object holding the ``major``, ``minor``
              and ``extra`` components of the librados version
    """
    cdef int major = 0
    cdef int minor = 0
    cdef int extra = 0
    # rados_version() fills in the three components through the pointers
    with nogil:
        rados_version(&major, &minor, &extra)
    return Version(major, minor, extra)
723
@requires(('path', opt(str_type)))
def conf_read_file(self, path=None):
    """
    Configure the cluster handle using a Ceph config file.

    :param path: path to the config file; None is passed to librados as a
                 NULL path
    :type path: str

    :raises: :class:`TypeError`, :class:`RadosStateError`, or the
             :class:`Error` subclass matching the librados return code
    """
    self.require_state("configuring", "connected")
    path = cstr(path, 'path', opt=True)
    cdef:
        char *_path = opt_str(path)
    with nogil:
        ret = rados_conf_read_file(self.cluster, _path)
    if ret != 0:
        raise make_ex(ret, "error calling conf_read_file")
740
741 def conf_parse_argv(self, args):
742 """
743 Parse known arguments from args, and remove; returned
744 args contain only those unknown to ceph
745 """
746 self.require_state("configuring", "connected")
747 if not args:
748 return
749
750 cargs = cstr_list(args, 'args')
751 cdef:
752 int _argc = len(args)
753 char **_argv = to_bytes_array(cargs)
754 char **_remargv = NULL
755
756 try:
757 _remargv = <char **>malloc(_argc * sizeof(char *))
758 with nogil:
759 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
760 <const char**>_argv,
761 <const char**>_remargv)
762 if ret:
763 raise make_ex(ret, "error calling conf_parse_argv_remainder")
764
765 # _remargv was allocated with fixed argc; collapse return
766 # list to eliminate any missing args
767 retargs = [decode_cstr(a) for a in _remargv[:_argc]
768 if a != NULL]
769 self.parsed_args = args
770 return retargs
771 finally:
772 free(_argv)
773 free(_remargv)
774
def conf_parse_env(self, var='CEPH_ARGS'):
    """
    Parse known arguments from an environment variable, normally
    CEPH_ARGS.

    :param var: name of the environment variable; a false value (None or
                '') makes the call a no-op
    :type var: str

    :raises: :class:`TypeError`, :class:`RadosStateError`, or the
             :class:`Error` subclass matching the librados return code
    """
    self.require_state("configuring", "connected")
    if not var:
        return

    var = cstr(var, 'var')
    cdef:
        char *_var = var
    with nogil:
        ret = rados_conf_parse_env(self.cluster, _var)
    if ret != 0:
        raise make_ex(ret, "error calling conf_parse_env")
791
@requires(('option', str_type))
def conf_get(self, option):
    """
    Get the value of a configuration option

    :param option: which option to read
    :type option: str

    :returns: str - value of the option or None
    :raises: :class:`TypeError`
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    cdef:
        char *_option = option
        size_t length = 20
        char *ret_buf = NULL

    try:
        # Start with a 20 byte buffer and double it for as long as
        # librados reports ENAMETOOLONG.
        while True:
            ret_buf = <char *>realloc_chk(ret_buf, length)
            with nogil:
                ret = rados_conf_get(self.cluster, _option, ret_buf, length)
            if ret == 0:
                return decode_cstr(ret_buf)
            elif ret == -errno.ENAMETOOLONG:
                length = length * 2
            elif ret == -errno.ENOENT:
                # ENOENT is reported to the caller as None, not raised
                return None
            else:
                raise make_ex(ret, "error calling conf_get")
    finally:
        free(ret_buf)
825
@requires(('option', str_type), ('val', str_type))
def conf_set(self, option, val):
    """
    Set the value of a configuration option

    :param option: which option to set
    :type option: str
    :param val: value of the option
    :type val: str

    :raises: :class:`TypeError`, :class:`ObjectNotFound`
    """
    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    val = cstr(val, 'val')
    cdef:
        char *_option = option
        char *_val = val

    with nogil:
        ret = rados_conf_set(self.cluster, _option, _val)
    if ret != 0:
        raise make_ex(ret, "error calling conf_set")
849
850 def ping_monitor(self, mon_id):
851 """
852 Ping a monitor to assess liveness
853
854 May be used as a simply way to assess liveness, or to obtain
855 information about the monitor in a simple way even in the
856 absence of quorum.
857
858 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
859 :type mon_id: str
860 :returns: the string reply from the monitor
861 """
862
863 self.require_state("configuring", "connected")
864
865 mon_id = cstr(mon_id, 'mon_id')
866 cdef:
867 char *_mon_id = mon_id
31f18b77 868 size_t outstrlen = 0
7c673cae
FG
869 char *outstr
870
871 with nogil:
872 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
873
874 if ret != 0:
875 raise make_ex(ret, "error calling ping_monitor")
876
877 if outstrlen:
878 my_outstr = outstr[:outstrlen]
879 rados_buffer_free(outstr)
880 return decode_cstr(my_outstr)
881
882 def connect(self, timeout=0):
883 """
884 Connect to the cluster. Use shutdown() to release resources.
885 """
886 self.require_state("configuring")
887 # NOTE(sileht): timeout was supported by old python API,
888 # but this is not something available in C API, so ignore
889 # for now and remove it later
890 with nogil:
891 ret = rados_connect(self.cluster)
892 if ret != 0:
893 raise make_ex(ret, "error connecting to the cluster")
894 self.state = "connected"
895
11fdf7f2
TL
896 def get_instance_id(self):
897 """
898 Get a global id for current instance
899 """
900 self.require_state("connected")
901 with nogil:
902 ret = rados_get_instance_id(self.cluster)
903 return ret;
904
7c673cae
FG
905 def get_cluster_stats(self):
906 """
907 Read usage info about the cluster
908
909 This tells you total space, space used, space available, and number
910 of objects. These are not updated immediately when data is written,
911 they are eventually consistent.
912
913 :returns: dict - contains the following keys:
914
915 - ``kb`` (int) - total space
916
917 - ``kb_used`` (int) - space used
918
919 - ``kb_avail`` (int) - free space available
920
921 - ``num_objects`` (int) - number of objects
922
923 """
924 cdef:
925 rados_cluster_stat_t stats
926
927 with nogil:
928 ret = rados_cluster_stat(self.cluster, &stats)
929
930 if ret < 0:
931 raise make_ex(
932 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
933 return {'kb': stats.kb,
934 'kb_used': stats.kb_used,
935 'kb_avail': stats.kb_avail,
936 'num_objects': stats.num_objects}
937
938 @requires(('pool_name', str_type))
939 def pool_exists(self, pool_name):
940 """
941 Checks if a given pool exists.
942
943 :param pool_name: name of the pool to check
944 :type pool_name: str
945
946 :raises: :class:`TypeError`, :class:`Error`
947 :returns: bool - whether the pool exists, false otherwise.
948 """
949 self.require_state("connected")
950
951 pool_name = cstr(pool_name, 'pool_name')
952 cdef:
953 char *_pool_name = pool_name
954
955 with nogil:
956 ret = rados_pool_lookup(self.cluster, _pool_name)
957 if ret >= 0:
958 return True
959 elif ret == -errno.ENOENT:
960 return False
961 else:
962 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
963
964 @requires(('pool_name', str_type))
965 def pool_lookup(self, pool_name):
966 """
967 Returns a pool's ID based on its name.
968
969 :param pool_name: name of the pool to look up
970 :type pool_name: str
971
972 :raises: :class:`TypeError`, :class:`Error`
973 :returns: int - pool ID, or None if it doesn't exist
974 """
975 self.require_state("connected")
976 pool_name = cstr(pool_name, 'pool_name')
977 cdef:
978 char *_pool_name = pool_name
979
980 with nogil:
981 ret = rados_pool_lookup(self.cluster, _pool_name)
982 if ret >= 0:
983 return int(ret)
984 elif ret == -errno.ENOENT:
985 return None
986 else:
987 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
988
989 @requires(('pool_id', int))
990 def pool_reverse_lookup(self, pool_id):
991 """
992 Returns a pool's name based on its ID.
993
994 :param pool_id: ID of the pool to look up
995 :type pool_id: int
996
997 :raises: :class:`TypeError`, :class:`Error`
998 :returns: string - pool name, or None if it doesn't exist
999 """
1000 self.require_state("connected")
1001 cdef:
1002 int64_t _pool_id = pool_id
1003 size_t size = 512
1004 char *name = NULL
1005
1006 try:
1007 while True:
1008 name = <char *>realloc_chk(name, size)
1009 with nogil:
1010 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
1011 if ret >= 0:
1012 break
1013 elif ret != -errno.ERANGE and size <= 4096:
1014 size *= 2
1015 elif ret == -errno.ENOENT:
1016 return None
1017 elif ret < 0:
1018 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
1019
1020 return decode_cstr(name)
1021
1022 finally:
1023 free(name)
1024
11fdf7f2
TL
1025 @requires(('pool_name', str_type), ('crush_rule', opt(int)))
1026 def create_pool(self, pool_name, crush_rule=None):
7c673cae
FG
1027 """
1028 Create a pool:
11fdf7f2
TL
1029 - with default settings: if crush_rule=None
1030 - with a specific CRUSH rule: crush_rule given
7c673cae
FG
1031
1032 :param pool_name: name of the pool to create
1033 :type pool_name: str
7c673cae
FG
1034 :param crush_rule: rule to use for placement in the new pool
1035 :type crush_rule: int
1036
1037 :raises: :class:`TypeError`, :class:`Error`
1038 """
1039 self.require_state("connected")
1040
1041 pool_name = cstr(pool_name, 'pool_name')
1042 cdef:
1043 char *_pool_name = pool_name
1044 uint8_t _crush_rule
7c673cae 1045
11fdf7f2 1046 if crush_rule is None:
7c673cae
FG
1047 with nogil:
1048 ret = rados_pool_create(self.cluster, _pool_name)
7c673cae 1049 else:
7c673cae
FG
1050 _crush_rule = crush_rule
1051 with nogil:
11fdf7f2 1052 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
7c673cae
FG
1053 if ret < 0:
1054 raise make_ex(ret, "error creating pool '%s'" % pool_name)
1055
1056 @requires(('pool_id', int))
1057 def get_pool_base_tier(self, pool_id):
1058 """
1059 Get base pool
1060
1061 :returns: base pool, or pool_id if tiering is not configured for the pool
1062 """
1063 self.require_state("connected")
1064 cdef:
1065 int64_t base_tier = 0
1066 int64_t _pool_id = pool_id
1067
1068 with nogil:
1069 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
1070 if ret < 0:
1071 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
1072 return int(base_tier)
1073
1074 @requires(('pool_name', str_type))
1075 def delete_pool(self, pool_name):
1076 """
1077 Delete a pool and all data inside it.
1078
1079 The pool is removed from the cluster immediately,
1080 but the actual data is deleted in the background.
1081
1082 :param pool_name: name of the pool to delete
1083 :type pool_name: str
1084
1085 :raises: :class:`TypeError`, :class:`Error`
1086 """
1087 self.require_state("connected")
1088
1089 pool_name = cstr(pool_name, 'pool_name')
1090 cdef:
1091 char *_pool_name = pool_name
1092
1093 with nogil:
1094 ret = rados_pool_delete(self.cluster, _pool_name)
1095 if ret < 0:
1096 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
1097
1098 @requires(('pool_id', int))
1099 def get_inconsistent_pgs(self, pool_id):
1100 """
1101 List inconsistent placement groups in the given pool
1102
1103 :param pool_id: ID of the pool in which PGs are listed
1104 :type pool_id: int
1105 :returns: list - inconsistent placement groups
1106 """
1107 self.require_state("connected")
1108 cdef:
1109 int64_t pool = pool_id
1110 size_t size = 512
1111 char *pgs = NULL
1112
1113 try:
1114 while True:
1115 pgs = <char *>realloc_chk(pgs, size);
1116 with nogil:
1117 ret = rados_inconsistent_pg_list(self.cluster, pool,
1118 pgs, size)
1119 if ret > <int>size:
1120 size *= 2
1121 elif ret >= 0:
1122 break
1123 else:
1124 raise make_ex(ret, "error calling inconsistent_pg_list")
1125 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
1126 finally:
1127 free(pgs)
1128
1129 def list_pools(self):
1130 """
1131 Gets a list of pool names.
1132
1133 :returns: list - of pool names.
1134 """
1135 self.require_state("connected")
1136 cdef:
1137 size_t size = 512
1138 char *c_names = NULL
1139
1140 try:
1141 while True:
1142 c_names = <char *>realloc_chk(c_names, size)
1143 with nogil:
1144 ret = rados_pool_list(self.cluster, c_names, size)
1145 if ret > <int>size:
1146 size *= 2
1147 elif ret >= 0:
1148 break
1149 return [name for name in decode_cstr(c_names[:ret]).split('\0')
1150 if name]
1151 finally:
1152 free(c_names)
1153
1154 def get_fsid(self):
1155 """
1156 Get the fsid of the cluster as a hexadecimal string.
1157
1158 :raises: :class:`Error`
1159 :returns: str - cluster fsid
1160 """
1161 self.require_state("connected")
1162 cdef:
81eedcae
TL
1163 char *ret_buf = NULL
1164 size_t buf_len = 64
7c673cae 1165
7c673cae 1166 try:
81eedcae
TL
1167 while True:
1168 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
1169 with nogil:
1170 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
1171 if ret == -errno.ERANGE:
1172 buf_len = buf_len * 2
1173 elif ret < 0:
1174 raise make_ex(ret, "error getting cluster fsid")
1175 else:
1176 break
1177 return decode_cstr(ret_buf)
7c673cae 1178 finally:
81eedcae 1179 free(ret_buf)
7c673cae
FG
1180
1181 @requires(('ioctx_name', str_type))
1182 def open_ioctx(self, ioctx_name):
1183 """
1184 Create an io context
1185
1186 The io context allows you to perform operations within a particular
1187 pool.
1188
1189 :param ioctx_name: name of the pool
1190 :type ioctx_name: str
1191
1192 :raises: :class:`TypeError`, :class:`Error`
1193 :returns: Ioctx - Rados Ioctx object
1194 """
1195 self.require_state("connected")
1196 ioctx_name = cstr(ioctx_name, 'ioctx_name')
1197 cdef:
1198 rados_ioctx_t ioctx
1199 char *_ioctx_name = ioctx_name
1200 with nogil:
1201 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
1202 if ret < 0:
1203 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
1204 io = Ioctx(ioctx_name)
1205 io.io = ioctx
1206 return io
1207
11fdf7f2
TL
1208 @requires(('pool_id', int))
1209 def open_ioctx2(self, pool_id):
1210 """
1211 Create an io context
1212
1213 The io context allows you to perform operations within a particular
1214 pool.
1215
1216 :param pool_id: ID of the pool
1217 :type pool_id: int
1218
1219 :raises: :class:`TypeError`, :class:`Error`
1220 :returns: Ioctx - Rados Ioctx object
1221 """
1222 self.require_state("connected")
1223 cdef:
1224 rados_ioctx_t ioctx
1225 int64_t _pool_id = pool_id
1226 with nogil:
1227 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1228 if ret < 0:
1229 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1230 io = Ioctx(str(pool_id))
1231 io.io = ioctx
1232 return io
1233
7c673cae
FG
1234 def mon_command(self, cmd, inbuf, timeout=0, target=None):
1235 """
1236 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1237 returns (int ret, string outbuf, string outs)
1238 """
1239 # NOTE(sileht): timeout is ignored because C API doesn't provide
1240 # timeout argument, but we keep it for backward compat with old python binding
1241
1242 self.require_state("connected")
1243 cmd = cstr_list(cmd, 'c')
1244
1245 if isinstance(target, int):
1246 # NOTE(sileht): looks weird but test_monmap_dump pass int
1247 target = str(target)
1248
1249 target = cstr(target, 'target', opt=True)
1250 inbuf = cstr(inbuf, 'inbuf')
1251
1252 cdef:
1253 char *_target = opt_str(target)
1254 char **_cmd = to_bytes_array(cmd)
1255 size_t _cmdlen = len(cmd)
1256
1257 char *_inbuf = inbuf
1258 size_t _inbuf_len = len(inbuf)
1259
1260 char *_outbuf
1261 size_t _outbuf_len
1262 char *_outs
1263 size_t _outs_len
1264
1265 try:
1266 if target:
1267 with nogil:
1268 ret = rados_mon_command_target(self.cluster, _target,
1269 <const char **>_cmd, _cmdlen,
1270 <const char*>_inbuf, _inbuf_len,
1271 &_outbuf, &_outbuf_len,
1272 &_outs, &_outs_len)
1273 else:
1274 with nogil:
1275 ret = rados_mon_command(self.cluster,
1276 <const char **>_cmd, _cmdlen,
1277 <const char*>_inbuf, _inbuf_len,
1278 &_outbuf, &_outbuf_len,
1279 &_outs, &_outs_len)
1280
1281 my_outs = decode_cstr(_outs[:_outs_len])
1282 my_outbuf = _outbuf[:_outbuf_len]
1283 if _outs_len:
1284 rados_buffer_free(_outs)
1285 if _outbuf_len:
1286 rados_buffer_free(_outbuf)
1287 return (ret, my_outbuf, my_outs)
1288 finally:
1289 free(_cmd)
1290
1291 def osd_command(self, osdid, cmd, inbuf, timeout=0):
1292 """
1293 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1294 returns (int ret, string outbuf, string outs)
1295 """
1296 # NOTE(sileht): timeout is ignored because C API doesn't provide
1297 # timeout argument, but we keep it for backward compat with old python binding
1298 self.require_state("connected")
1299
1300 cmd = cstr_list(cmd, 'cmd')
1301 inbuf = cstr(inbuf, 'inbuf')
1302
1303 cdef:
1304 int _osdid = osdid
1305 char **_cmd = to_bytes_array(cmd)
1306 size_t _cmdlen = len(cmd)
1307
1308 char *_inbuf = inbuf
1309 size_t _inbuf_len = len(inbuf)
1310
1311 char *_outbuf
1312 size_t _outbuf_len
1313 char *_outs
1314 size_t _outs_len
1315
1316 try:
1317 with nogil:
1318 ret = rados_osd_command(self.cluster, _osdid,
1319 <const char **>_cmd, _cmdlen,
1320 <const char*>_inbuf, _inbuf_len,
1321 &_outbuf, &_outbuf_len,
1322 &_outs, &_outs_len)
1323
1324 my_outs = decode_cstr(_outs[:_outs_len])
1325 my_outbuf = _outbuf[:_outbuf_len]
1326 if _outs_len:
1327 rados_buffer_free(_outs)
1328 if _outbuf_len:
1329 rados_buffer_free(_outbuf)
1330 return (ret, my_outbuf, my_outs)
1331 finally:
1332 free(_cmd)
1333
1334 def mgr_command(self, cmd, inbuf, timeout=0):
1335 """
1336 returns (int ret, string outbuf, string outs)
1337 """
1338 # NOTE(sileht): timeout is ignored because C API doesn't provide
1339 # timeout argument, but we keep it for backward compat with old python binding
1340 self.require_state("connected")
1341
1342 cmd = cstr_list(cmd, 'cmd')
1343 inbuf = cstr(inbuf, 'inbuf')
1344
1345 cdef:
1346 char **_cmd = to_bytes_array(cmd)
1347 size_t _cmdlen = len(cmd)
1348
1349 char *_inbuf = inbuf
1350 size_t _inbuf_len = len(inbuf)
1351
1352 char *_outbuf
1353 size_t _outbuf_len
1354 char *_outs
1355 size_t _outs_len
1356
1357 try:
1358 with nogil:
1359 ret = rados_mgr_command(self.cluster,
1360 <const char **>_cmd, _cmdlen,
1361 <const char*>_inbuf, _inbuf_len,
1362 &_outbuf, &_outbuf_len,
1363 &_outs, &_outs_len)
1364
1365 my_outs = decode_cstr(_outs[:_outs_len])
1366 my_outbuf = _outbuf[:_outbuf_len]
1367 if _outs_len:
1368 rados_buffer_free(_outs)
1369 if _outbuf_len:
1370 rados_buffer_free(_outbuf)
1371 return (ret, my_outbuf, my_outs)
1372 finally:
1373 free(_cmd)
1374
1375 def pg_command(self, pgid, cmd, inbuf, timeout=0):
1376 """
1377 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1378 returns (int ret, string outbuf, string outs)
1379 """
1380 # NOTE(sileht): timeout is ignored because C API doesn't provide
1381 # timeout argument, but we keep it for backward compat with old python binding
1382 self.require_state("connected")
1383
1384 pgid = cstr(pgid, 'pgid')
1385 cmd = cstr_list(cmd, 'cmd')
1386 inbuf = cstr(inbuf, 'inbuf')
1387
1388 cdef:
1389 char *_pgid = pgid
1390 char **_cmd = to_bytes_array(cmd)
1391 size_t _cmdlen = len(cmd)
1392
1393 char *_inbuf = inbuf
1394 size_t _inbuf_len = len(inbuf)
1395
1396 char *_outbuf
1397 size_t _outbuf_len
1398 char *_outs
1399 size_t _outs_len
1400
1401 try:
1402 with nogil:
1403 ret = rados_pg_command(self.cluster, _pgid,
1404 <const char **>_cmd, _cmdlen,
1405 <const char *>_inbuf, _inbuf_len,
1406 &_outbuf, &_outbuf_len,
1407 &_outs, &_outs_len)
1408
1409 my_outs = decode_cstr(_outs[:_outs_len])
1410 my_outbuf = _outbuf[:_outbuf_len]
1411 if _outs_len:
1412 rados_buffer_free(_outs)
1413 if _outbuf_len:
1414 rados_buffer_free(_outbuf)
1415 return (ret, my_outbuf, my_outs)
1416 finally:
1417 free(_cmd)
1418
1419 def wait_for_latest_osdmap(self):
1420 self.require_state("connected")
1421 with nogil:
1422 ret = rados_wait_for_latest_osdmap(self.cluster)
1423 return ret
1424
1425 def blacklist_add(self, client_address, expire_seconds=0):
1426 """
1427 Blacklist a client from the OSDs
1428
1429 :param client_address: client address
1430 :type client_address: str
1431 :param expire_seconds: number of seconds to blacklist
1432 :type expire_seconds: int
1433
1434 :raises: :class:`Error`
1435 """
1436 self.require_state("connected")
1437 client_address = cstr(client_address, 'client_address')
1438 cdef:
1439 uint32_t _expire_seconds = expire_seconds
1440 char *_client_address = client_address
1441
1442 with nogil:
1443 ret = rados_blacklist_add(self.cluster, _client_address, _expire_seconds)
1444 if ret < 0:
1445 raise make_ex(ret, "error blacklisting client '%s'" % client_address)
1446
1447 def monitor_log(self, level, callback, arg):
1448 if level not in MONITOR_LEVELS:
1449 raise LogicError("invalid monitor level " + level)
1450 if callback is not None and not callable(callback):
1451 raise LogicError("callback must be a callable function or None")
1452
1453 level = cstr(level, 'level')
1454 cdef char *_level = level
1455
1456 if callback is None:
1457 with nogil:
1458 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1459 self.monitor_callback = None
31f18b77 1460 self.monitor_callback2 = None
7c673cae
FG
1461 return
1462
1463 cb = (callback, arg)
1464 cdef PyObject* _arg = <PyObject*>cb
1465 with nogil:
1466 r = rados_monitor_log(self.cluster, <const char*>_level,
1467 <rados_log_callback_t>&__monitor_callback, _arg)
1468
1469 if r:
1470 raise make_ex(r, 'error calling rados_monitor_log')
1471 # NOTE(sileht): Prevents the callback method from being garbage collected
1472 self.monitor_callback = cb
31f18b77
FG
1473 self.monitor_callback2 = None
1474
1475 def monitor_log2(self, level, callback, arg):
1476 if level not in MONITOR_LEVELS:
1477 raise LogicError("invalid monitor level " + level)
1478 if callback is not None and not callable(callback):
1479 raise LogicError("callback must be a callable function or None")
1480
1481 level = cstr(level, 'level')
1482 cdef char *_level = level
1483
1484 if callback is None:
1485 with nogil:
1486 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1487 self.monitor_callback = None
1488 self.monitor_callback2 = None
1489 return
1490
1491 cb = (callback, arg)
1492 cdef PyObject* _arg = <PyObject*>cb
1493 with nogil:
1494 r = rados_monitor_log2(self.cluster, <const char*>_level,
1495 <rados_log_callback2_t>&__monitor_callback2, _arg)
1496
1497 if r:
1498 raise make_ex(r, 'error calling rados_monitor_log')
1499 # NOTE(sileht): Prevents the callback method from being garbage collected
1500 self.monitor_callback = None
1501 self.monitor_callback2 = cb
7c673cae 1502
11fdf7f2
TL
1503 @requires(('service', str_type), ('daemon', str_type), ('metadata', dict))
1504 def service_daemon_register(self, service, daemon, metadata):
1505 """
1506 :param str service: service name (e.g. "rgw")
1507 :param str daemon: daemon name (e.g. "gwfoo")
1508 :param dict metadata: static metadata about the register daemon
1509 (e.g., the version of Ceph, the kernel version.)
1510 """
1511 service = cstr(service, 'service')
1512 daemon = cstr(daemon, 'daemon')
1513 metadata_dict = '\0'.join(chain.from_iterable(metadata.items()))
1514 metadata_dict += '\0'
1515 cdef:
1516 char *_service = service
1517 char *_daemon = daemon
1518 char *_metadata = metadata_dict
1519
1520 with nogil:
1521 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1522 if ret != 0:
1523 raise make_ex(ret, "error calling service_register()")
1524
1525 @requires(('metadata', dict))
1526 def service_daemon_update(self, status):
1527 status_dict = '\0'.join(chain.from_iterable(status.items()))
1528 status_dict += '\0'
1529 cdef:
1530 char *_status = status_dict
1531
1532 with nogil:
1533 ret = rados_service_update_status(self.cluster, _status)
1534 if ret != 0:
1535 raise make_ex(ret, "error calling service_daemon_update()")
1536
7c673cae
FG
1537
cdef class OmapIterator(object):
    """Iterator over the omap key/value pairs of an object"""

    cdef public Ioctx ioctx
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next key-value pair of the object

        :raises: :class:`Error`, StopIteration
        :returns: next rados.OmapItem
        """
        cdef:
            char *c_key = NULL
            char *c_val = NULL
            size_t c_val_len

        with nogil:
            rc = rados_omap_get_next(self.ctx, &c_key, &c_val, &c_val_len)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        # A NULL key marks the end of the iteration.
        if c_key == NULL:
            raise StopIteration()
        key = decode_cstr(c_key)
        if c_val != NULL:
            val = c_val[:c_val_len]
        else:
            val = None
        return (key, val)

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1576
1577
cdef class ObjectIterator(object):
    """Iterator over the objects of a rados.Ioctx"""

    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            rc = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if rc < 0:
            raise make_ex(rc, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the name, locator and namespace of the next object

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *c_key = NULL
            const char *c_locator = NULL
            const char *c_nspace = NULL

        with nogil:
            rc = rados_nobjects_list_next(self.ctx, &c_key, &c_locator, &c_nspace)

        # Any negative return ends the iteration.
        if rc < 0:
            raise StopIteration()

        key = decode_cstr(c_key)
        if c_locator != NULL:
            locator = decode_cstr(c_locator)
        else:
            locator = None
        if c_nspace != NULL:
            nspace = decode_cstr(c_nspace)
        else:
            nspace = None
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1623
1624
cdef class XattrIterator(object):
    """Iterator over the extended attributes of an object"""

    cdef rados_xattrs_iter_t it
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        # self.oid owns the bytes that self._oid points into.
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            rc = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if rc != 0:
            raise make_ex(rc, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Fetch the next xattr of the object

        :raises: :class:`Error`, StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *c_name = NULL
            const char *c_val = NULL
            size_t c_val_len = 0

        with nogil:
            rc = rados_getxattrs_next(self.it, &c_name, &c_val, &c_val_len)
        if rc != 0:
            raise make_ex(
                rc,
                "error iterating over the extended attributes in '%s'" % self.oid)
        # A NULL name marks the end of the iteration.
        if c_name == NULL:
            raise StopIteration()
        name = decode_cstr(c_name)
        value = c_val[:c_val_len]
        return (name, value)

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1673
1674
cdef class SnapIterator(object):
    """Snapshot iterator

    Loads the list of snapshot ids of an Ioctx once at construction
    time and yields one :class:`Snap` per id.
    """

    cdef public Ioctx ioctx

    # Heap buffer of snapshot ids, owned by this object.
    cdef rados_snap_t *snaps
    cdef int max_snap
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, "error calling rados_snap_list for \
ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # Release the id buffer allocated in __cinit__; without this it
        # was leaked for every iterator. free(NULL) is a no-op, so this
        # is safe even if allocation never happened.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    # Name did not fit: double the buffer and retry.
                    name_len = name_len * 2

            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1740
1741
cdef class Snap(object):
    """A single snapshot of a pool"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Look up the creation time of this snapshot

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t created_at

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &created_at)
        if rc != 0:
            raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(created_at)
1774
1775
cdef class Completion(object):
    """Handle on an asynchronous operation and its callbacks"""

    cdef public:
        Ioctx ioctx
        object oncomplete
        object onsafe

    cdef:
        rados_callback_t complete_cb
        rados_callback_t safe_cb
        rados_completion_t rados_comp
        PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.oncomplete = oncomplete
        self.onsafe = onsafe
        self.ioctx = ioctx

    def is_safe(self):
        """
        Has the asynchronous operation been marked safe?

        A True result does not mean the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            status = rados_aio_is_safe(self.rados_comp)
        return status == 1

    def is_complete(self):
        """
        Has the asynchronous operation completed?

        A True result does not mean the complete callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            status = rados_aio_is_complete(self.rados_comp)
        return status == 1

    def wait_for_safe(self):
        """
        Block until the asynchronous operation is marked safe

        The safe callback may still be running when this returns.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Block until the asynchronous operation completes

        The complete callback may still be running when this returns.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Block until the asynchronous operation is marked safe and the
        safe callback has returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Block until the asynchronous operation completes and the
        complete callback has returned

        :returns: whether the operation is completed
        """
        with nogil:
            rc = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return rc

    def get_return_value(self):
        """
        Fetch the return value of the asynchronous operation

        The value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            rc = rados_aio_get_return_value(self.rados_comp)
        return rc

    def __dealloc__(self):
        """
        Release a completion

        Runs when the completion is no longer needed. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # Drop the reference held on the read buffer, if any.
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback, then drop this completion from the
        # ioctx's pending list.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Same as _complete(), for the "safe" callback and list.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Remove this completion from every pending list it was put on,
        # without invoking any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1901
1902
class OpCtx(object):
    """Mixin adding ``with``-statement support to operation classes.

    The class it is mixed into must provide ``create()`` and
    ``release()``.
    """

    def __enter__(self):
        # Whatever create() returns becomes the target of "with ... as".
        created = self.create()
        return created

    def __exit__(self, type, msg, traceback):
        # Always release; the exception (if any) is not suppressed.
        self.release()
1909
1910
1911cdef class WriteOp(object):
1912 cdef rados_write_op_t write_op
1913
1914 def create(self):
1915 with nogil:
1916 self.write_op = rados_create_write_op()
1917 return self
1918
1919 def release(self):
1920 with nogil:
1921 rados_release_write_op(self.write_op)
1922
1923 @requires(('exclusive', opt(int)))
1924 def new(self, exclusive=None):
1925 """
1926 Create the object.
1927 """
1928
1929 cdef:
1930 int _exclusive = exclusive
1931
1932 with nogil:
1933 rados_write_op_create(self.write_op, _exclusive, NULL)
1934
1935
1936 def remove(self):
1937 """
1938 Remove object.
1939 """
1940 with nogil:
1941 rados_write_op_remove(self.write_op)
1942
1943 @requires(('flags', int))
1944 def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
1945 """
1946 Set flags for the last operation added to this write_op.
1947 :para flags: flags to apply to the last operation
1948 :type flags: int
1949 """
1950
1951 cdef:
1952 int _flags = flags
1953
1954 with nogil:
1955 rados_write_op_set_flags(self.write_op, _flags)
1956
1957 @requires(('to_write', bytes))
1958 def append(self, to_write):
1959 """
1960 Append data to an object synchronously
1961 :param to_write: data to write
1962 :type to_write: bytes
1963 """
1964
1965 cdef:
1966 char *_to_write = to_write
1967 size_t length = len(to_write)
1968
1969 with nogil:
1970 rados_write_op_append(self.write_op, _to_write, length)
1971
1972 @requires(('to_write', bytes))
1973 def write_full(self, to_write):
1974 """
1975 Write whole object, atomically replacing it.
1976 :param to_write: data to write
1977 :type to_write: bytes
1978 """
1979
1980 cdef:
1981 char *_to_write = to_write
1982 size_t length = len(to_write)
1983
1984 with nogil:
1985 rados_write_op_write_full(self.write_op, _to_write, length)
1986
@requires(('to_write', bytes), ('offset', int))
def write(self, to_write, offset=0):
    """
    Write data at the given offset.

    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    """
    cdef:
        uint64_t start = offset
        size_t nbytes = len(to_write)
        char *payload = to_write

    with nogil:
        rados_write_op_write(self.write_op, payload, nbytes, start)
2004
91327a77
AA
@requires(('version', int))
def assert_version(self, version):
    """
    Check that the object's version is the expected one.

    :param version: expected version of the object
    :type version: int
    """
    cdef uint64_t expected = version

    with nogil:
        rados_write_op_assert_version(self.write_op, expected)
2017
7c673cae
FG
@requires(('offset', int), ('length', int))
def zero(self, offset, length):
    """
    Zero part of an object.

    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param length: number of zero bytes to write
    :type length: int
    """
    cdef:
        uint64_t _offset = offset
        size_t _length = length

    with nogil:
        # librados expects (write_op, offset, len); the arguments used to
        # be passed swapped, zeroing the wrong range.
        rados_write_op_zero(self.write_op, _offset, _length)
2034
@requires(('offset', int))
def truncate(self, offset):
    """
    Truncate an object.

    :param offset: byte offset in the object to begin truncating at
    :type offset: int
    """
    cdef uint64_t cut_at = offset

    with nogil:
        rados_write_op_truncate(self.write_op, cut_at)
2048
2049
class WriteOpCtx(WriteOp, OpCtx):
    """Context manager for a write operation (WriteOp combined with OpCtx)."""
2052
2053
cdef class ReadOp(object):
    # Handle of the underlying librados read operation.
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the librados read operation and return this object.
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the librados read operation held by this object.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.

        :param flags: flags to apply to the last operation
        :type flags: int
        """
        cdef int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
2079
2080
class ReadOpCtx(ReadOp, OpCtx):
    """Context manager for a read operation (ReadOp combined with OpCtx)."""
2083
2084
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to onsafe() for asynchronous operations

    ``args`` is the Python object registered with the completion; its
    ``_safe()`` method is invoked with the GIL held.
    """
    cdef object target = <object>args
    target._safe()
    return 0
2092
2093
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    Callback to oncomplete() for asynchronous operations

    ``args`` is the Python object registered with the completion; its
    ``_complete()`` method is invoked with the GIL held.
    """
    cdef object target = <object>args
    target._complete()
    return 0
2101
2102
2103cdef class Ioctx(object):
2104 """rados.Ioctx object"""
2105 # NOTE(sileht): attributes declared in .pyd
2106
def __init__(self, name):
    """
    Initialise the bookkeeping state of a freshly opened io context.

    :param name: name stored on the context and used in error messages
    """
    self.name = name
    self.state = "open"

    # No locator key / namespace override until one is set explicitly.
    self.nspace = ""
    self.locator_key = ""

    # Completions that still expect a callback, guarded by ``lock``.
    self.lock = threading.Lock()
    self.complete_completions = []
    self.safe_completions = []
2116
def __enter__(self):
    """Enter the ``with`` block; the context itself is the managed value."""
    return self
2119
def __exit__(self, type_, value, traceback):
    """Close the context on leaving the ``with`` block; never swallow errors."""
    self.close()
    # Returning False lets any in-flight exception propagate.
    return False
2123
def __dealloc__(self):
    # Make sure the io context is closed when the object is deallocated.
    self.close()
2126
def __track_completion(self, completion_obj):
    """
    Record a completion in the list matching each callback it carries.

    :param completion_obj: completion whose ``oncomplete`` / ``onsafe``
                           callbacks decide which lists it is added to
    """
    if completion_obj.oncomplete:
        self.lock.acquire()
        try:
            self.complete_completions.append(completion_obj)
        finally:
            self.lock.release()
    if completion_obj.onsafe:
        self.lock.acquire()
        try:
            self.safe_completions.append(completion_obj)
        finally:
            self.lock.release()
2134
def __get_completion(self, oncomplete, onsafe):
    """
    Constructs a completion to use with asynchronous operations

    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    cdef:
        rados_completion_t c_completion
        rados_callback_t on_complete_ptr = NULL
        rados_callback_t on_safe_ptr = NULL
        PyObject* raw_obj

    completion_obj = Completion(self, oncomplete, onsafe)
    # Borrowed pointer handed to librados as the callback argument.
    raw_obj = <PyObject*>completion_obj

    # Only register the C trampolines for callbacks that were supplied.
    if oncomplete:
        on_complete_ptr = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        on_safe_ptr = <rados_callback_t>&__aio_safe_cb

    with nogil:
        ret = rados_aio_create_completion(raw_obj, on_complete_ptr,
                                          on_safe_ptr, &c_completion)
    if ret < 0:
        raise make_ex(ret, "error getting a completion")

    completion_obj.rados_comp = c_completion
    return completion_obj
2171
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    oncomplete will be called with the returned size and mtime
    as well as the completion:

    oncomplete(completion, size, mtime)

    On failure ``size`` and ``mtime`` are both passed as ``None``.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *_object_name = object_name
        # Filled in by librados; read from the callback below.
        uint64_t psize
        time_t pmtime

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        if _completion_v.get_return_value() < 0:
            return oncomplete(_completion_v, None, None)
        return oncomplete(_completion_v, psize, time.localtime(pmtime))

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
                             &psize, &pmtime)

    if ret >= 0:
        return completion
    completion._cleanup()
    raise make_ex(ret, "error stating %s" % object_name)
2217
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_name = object_name
        char* payload = to_write
        size_t nbytes = len(to_write)
        uint64_t start = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, c_name, completion.rados_comp,
                              payload, nbytes, start)
    if ret >= 0:
        return completion
    completion._cleanup()
    raise make_ex(ret, "error writing object %s" % object_name)
2262
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_name = object_name
        char* payload = to_write
        size_t nbytes = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, c_name,
                                   completion.rados_comp,
                                   payload, nbytes)
    if ret >= 0:
        return completion
    completion._cleanup()
    raise make_ex(ret, "error writing object %s" % object_name)
2307
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: what to do when the write is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the write is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_name = object_name
        char* payload = to_append
        size_t nbytes = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, c_name,
                               completion.rados_comp,
                               payload, nbytes)
    if ret >= 0:
        return completion
    completion._cleanup()
    raise make_ex(ret, "error appending object %s" % object_name)
2350
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    :raises: :class:`Error`
    """
    with nogil:
        ret = rados_aio_flush(self.io)
    if ret >= 0:
        return
    raise make_ex(ret, "error flushing")
2361
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    ``data_read`` is ``None`` when the read failed.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        # Was declared as ``ref_buf`` (typo) while ``ret_buf`` was the
        # name actually used; declare the real variable explicitly.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the pre-allocated buffer to the number of bytes read.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # librados writes directly into the storage of this bytes object.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2414
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    ``data`` is ``None`` when the execution failed.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Was declared as ``ref_buf`` (typo) while ``ret_buf`` was the
        # name actually used; declare the real variable explicitly.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the pre-allocated buffer to the size of the output.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only wrap the callbacks the caller actually supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    # librados writes the method output into the storage of this bytes object.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2485
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: what to do when the remove is safe and complete in memory
        on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the remove is safe and complete on storage
        on all replicas
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* c_name = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, c_name,
                               completion.rados_comp)
    if ret >= 0:
        return completion
    completion._cleanup()
    raise make_ex(ret, "error removing %s" % object_name)
2518
def require_ioctx_open(self):
    """
    Checks if the rados.Ioctx object state is 'open'

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2527
7c673cae
FG
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_key)
    # Remember the caller's (unencoded) value for get_locator_key().
    self.locator_key = loc_key
2550
def get_locator_key(self):
    """
    Get the locator key currently recorded on this context

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2558
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Set the snapshot for reading objects.

    To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap)
2575
@requires(('nspace', str_type))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    # NOTE(review): the decorator requires a str_type, so None presumably
    # never reaches this point - confirm against requires()/opt().
    if nspace is None:
        nspace = ""
    encoded_ns = cstr(nspace, 'nspace')
    cdef char *c_ns = encoded_ns
    with nogil:
        rados_ioctx_set_namespace(self.io, c_ns)
    # Remember the caller's (unencoded) value for get_namespace().
    self.nspace = nspace
2599
def get_namespace(self):
    """
    Get the namespace currently recorded on this context

    :returns: namespace
    """
    current_ns = self.nspace
    return current_ns
2607
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling it on a context that is not open does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2622
2623
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Write data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t nbytes = len(data)
        uint64_t start = offset

    with nogil:
        ret = rados_write(self.io, c_key, payload, nbytes, start)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is outside the documented contract.
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2659
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Write an entire object synchronously.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t nbytes = len(data)

    with nogil:
        ret = rados_write_full(self.io, c_key, payload, nbytes)

    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is outside the documented contract.
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2694
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Append data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *payload = data
        size_t nbytes = len(data)

    with nogil:
        ret = rados_append(self.io, c_key, payload, nbytes)

    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # A positive return is outside the documented contract.
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2726
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        char *out_ptr
        uint64_t start = offset
        size_t want = length
        PyObject* out_bytes = NULL

    # Allocate an uninitialised bytes object that librados fills in place.
    out_bytes = PyBytes_FromStringAndSize(NULL, length)
    try:
        out_ptr = PyBytes_AsString(out_bytes)
        with nogil:
            ret = rados_read(self.io, c_key, out_ptr, want, start)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        # Short read: trim the buffer to what was actually returned.
        if ret != length:
            _PyBytes_Resize(&out_bytes, ret)

        return <object>out_bytes
    finally:
        # Always drop our own reference: the <object> cast above took a
        # new one for the return value. On error paths, including a
        # failed _PyBytes_Resize (which frees the bytes and NULLs the
        # pointer), XDECREF copes with NULL.
        ref.Py_XDECREF(out_bytes)
2770
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Was declared as ``ref_buf`` (typo) while ``ret_buf`` was the
        # name actually used; declare the real variable explicitly.
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    # Allocate an uninitialised bytes object that librados fills in place.
    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # The message used to be copy-pasted from read() and named
            # the wrong operation.
            raise make_ex(ret, "Ioctx.execute(%s): failed to exec %s:%s on %s" %
                          (self.name, cls, method, key))

        # Trim the buffer to the size of the method output.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the bytes
        # object itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2826
def get_stats(self):
    """
    Get pool usage statistics

    :returns: dict - contains the following keys:

        - ``num_bytes`` (int) - size of pool in bytes

        - ``num_kb`` (int) - size of pool in kbytes

        - ``num_objects`` (int) - number of objects in the pool

        - ``num_object_clones`` (int) - number of object clones

        - ``num_object_copies`` (int) - number of object copies

        - ``num_objects_missing_on_primary`` (int) - number of objects
            missing on primary

        - ``num_objects_unfound`` (int) - number of unfound objects

        - ``num_objects_degraded`` (int) - number of degraded objects

        - ``num_rd`` (int) - bytes read

        - ``num_rd_kb`` (int) - kbytes read

        - ``num_wr`` (int) - bytes written

        - ``num_wr_kb`` (int) - kbytes written
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &pool_stat)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)

    # Copy the C struct field by field into a plain dict.
    result = {}
    result['num_bytes'] = pool_stat.num_bytes
    result['num_kb'] = pool_stat.num_kb
    result['num_objects'] = pool_stat.num_objects
    result['num_object_clones'] = pool_stat.num_object_clones
    result['num_object_copies'] = pool_stat.num_object_copies
    result["num_objects_missing_on_primary"] = pool_stat.num_objects_missing_on_primary
    result["num_objects_unfound"] = pool_stat.num_objects_unfound
    result["num_objects_degraded"] = pool_stat.num_objects_degraded
    result["num_rd"] = pool_stat.num_rd
    result["num_rd_kb"] = pool_stat.num_rd_kb
    result["num_wr"] = pool_stat.num_wr
    result["num_wr_kb"] = pool_stat.num_wr_kb
    return result
2876
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef char *c_key = key

    with nogil:
        ret = rados_remove(self.io, c_key)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to remove '%s'" % key)
2901
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success, otherwise raises error
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        uint64_t new_size = size
        char *c_key = key

    with nogil:
        ret = rados_trunc(self.io, c_key, new_size)
    if ret >= 0:
        return ret
    raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2931
@requires(('key', str_type))
def stat(self, key):
    """
    Get object stats (size/mtime)

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size,timestamp) - timestamp is a ``time.localtime()`` struct
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        ret = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if ret < 0:
        raise make_ex(ret, "Failed to stat %r" % key)

    mtime_struct = time.localtime(obj_mtime)
    return obj_size, mtime_struct
2957
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        # Retry with a doubled buffer for as long as librados reports
        # that the value does not fit.
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # Size cap reached with ret still -ERANGE: report the error
            # instead of slicing the buffer with a negative length.
            raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2996
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Start iterating over xattrs on an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
3011
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t value_len = len(xattr_value)

    with nogil:
        ret = rados_setxattr(self.io, c_key, c_name,
                             c_value, value_len)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3044
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Removes an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        ret = rados_rmxattr(self.io, c_key, c_name)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to delete key %r xattr %r" %
                  (key, xattr_name))
3073
11fdf7f2
TL
@requires(('obj', str_type), ('msg', str_type), ('timeout_ms', int))
def notify(self, obj, msg='', timeout_ms=5000):
    """
    Send a rados notification to an object.

    :param obj: the name of the object to notify
    :type obj: str
    :param msg: optional message to send in the notification
    :type msg: str
    :param timeout_ms: notify timeout (in ms)
    :type timeout_ms: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    obj_raw = cstr(obj, 'obj')
    msg_raw = cstr(msg, 'msg')
    cdef:
        char *_obj = obj_raw
        char *_msg = msg_raw
        # The payload length must be measured on the encoded bytes: for
        # non-ASCII text the character count is smaller than the byte
        # count and the notification would be truncated.
        int _msglen = len(msg_raw)
        uint64_t _timeout_ms = timeout_ms

    with nogil:
        ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
                            NULL, NULL)
    if ret < 0:
        raise make_ex(ret, "Failed to notify %r" % (obj_raw,))
    return True
3107
7c673cae
FG
def list_objects(self):
    """
    Build an ObjectIterator bound to this rados.Ioctx.

    :returns: ObjectIterator
    """
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
3116
def list_snaps(self):
    """
    Build a SnapIterator bound to this rados.Ioctx.

    :returns: SnapIterator
    """
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3125
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Take a pool-wide snapshot.

    :param snap_name: name to give the new snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_raw = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = name_raw

    with nogil:
        ret = rados_ioctx_snap_create(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to create snap %s" % name_raw)
3145
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Delete a pool-wide snapshot.

    :param snap_name: name of the snapshot to delete
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    name_raw = cstr(snap_name, 'snap_name')
    cdef char *c_snap_name = name_raw

    with nogil:
        ret = rados_ioctx_snap_remove(self.io, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove snap %s" % name_raw)
3165
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Resolve a pool snapshot name to its id.

    :param snap_name: name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    name_raw = cstr(snap_name, 'snap_name')
    cdef:
        char *c_snap_name = name_raw
        rados_snap_t c_snap_id

    with nogil:
        ret = rados_ioctx_snap_lookup(self.io, c_snap_name, &c_snap_id)
    if ret != 0:
        raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
    # The Snap is built from the caller's original (un-encoded) name.
    return Snap(self, snap_name, int(c_snap_id))
3189
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Roll a single object back to a pool snapshot.

    :param oid: name of the object to roll back
    :type oid: str
    :param snap_name: name of the snapshot to roll back to
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_raw = cstr(oid, 'oid')
    name_raw = cstr(snap_name, 'snap_name')
    cdef:
        char *c_oid = oid_raw
        char *c_snap_name = name_raw

    with nogil:
        ret = rados_ioctx_snap_rollback(self.io, c_oid, c_snap_name)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid_raw)
3214
28e407b8
AA
def create_self_managed_snap(self):
    """
    Create a self-managed snapshot.

    :returns: id of the new snapshot, as an int

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id

    with nogil:
        ret = rados_ioctx_selfmanaged_snap_create(self.io, &c_snap_id)
    if ret == 0:
        return int(c_snap_id)
    raise make_ex(ret, "Failed to create self-managed snapshot")
3231
@requires(('snap_id', int))
def remove_self_managed_snap(self, snap_id):
    """
    Delete a self-managed snapshot.

    :param snap_id: id of the snapshot to delete
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id

    with nogil:
        ret = rados_ioctx_selfmanaged_snap_remove(self.io, c_snap_id)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to remove self-managed snapshot")
3250
def set_self_managed_snap_write(self, snaps):
    """
    Updates the write context to the specified self-managed
    snapshot ids.

    The ids are converted to int and sorted in descending order; the
    largest id is used as the snapshot sequence (0 when ``snaps`` is
    empty).

    :param snaps: all associated self-managed snapshot ids
    :type snaps: list

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :raises: :class:`MemoryError` if the id array cannot be allocated
    """
    self.require_ioctx_open()
    sorted_snaps = []
    snap_seq = 0
    if snaps:
        sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
        snap_seq = sorted_snaps[0]

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)
    try:
        # Only allocate when there is something to copy, and never write
        # through an unchecked malloc() result.
        if _num_snaps > 0:
            _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
            if _snaps == NULL:
                raise MemoryError("malloc failed")
        for i, snap in enumerate(sorted_snaps):
            _snaps[i] = snap
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_snaps)
3286
@requires(('oid', str_type), ('snap_id', int))
def rollback_self_managed_snap(self, oid, snap_id):
    """
    Roll one object back to a self-managed snapshot revision.

    :param oid: name of the object to roll back
    :type oid: str
    :param snap_id: id of the self-managed snapshot
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid_raw = cstr(oid, 'oid')
    cdef:
        char *c_oid = oid_raw
        rados_snap_t c_snap_id = snap_id

    with nogil:
        ret = rados_ioctx_selfmanaged_snap_rollback(self.io, c_oid, c_snap_id)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to rollback %s" % oid_raw)
3309
7c673cae
FG
def get_last_version(self):
    """
    Report the version of the object most recently read or written.

    The value is the internal version number of the last object that
    was read from or written to through this io context.

    :returns: version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3323
def create_write_op(self):
    """
    Create a write operation object.

    Call release_write_op on the result once it is no longer needed.
    """
    new_op = WriteOp()
    return new_op.create()
3330
def create_read_op(self):
    """
    Create a read operation object.

    Call release_read_op on the result once it is no longer needed.
    """
    new_op = ReadOp()
    return new_op.create()
3337
def release_write_op(self, write_op):
    """
    Release a write operation obtained from create_write_op.

    :param write_op: the write operation object to release
    """
    write_op.release()
3343
def release_read_op(self, read_op):
    """
    Release a read operation obtained from create_read_op.

    :param read_op: the read operation object to release
    """
    read_op.release()
3351
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Queue key/value pairs to be set by a write operation.

    :para write_op: write_operation object
    :type write_op: WriteOp
    :para keys: a tuple of keys
    :type keys: tuple
    :para values: a tuple of values
    :type values: tuple

    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try so that, should a later conversion
        # raise, the arrays already built are still released below.
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array([len(v) for v in values])
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        # free(NULL) is a no-op, so partially-initialized state is fine.
        free(_keys)
        free(_values)
        free(_lens)
3385
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Run a prepared write operation against an object.

    :para write_op: write operation object
    :type write_op: WriteOp
    :para oid: object name
    :type oid: str
    :para mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :para flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    """

    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp c_op = write_op
        char *c_oid = oid_raw
        time_t c_mtime = mtime
        int c_flags = flags

    with nogil:
        ret = rados_write_op_operate(c_op.write_op, self.io, c_oid, &c_mtime, c_flags)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to operate write op for oid %s" % oid_raw)
3411
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Run a prepared write operation against an object asynchronously.

    :para write_op: write operation object
    :type write_op: WriteOp
    :para oid: object name
    :type oid: str
    :param oncomplete: optional callable handed to the completion as its
                       "complete" callback
    :type oncomplete: completion
    :param onsafe: optional callable handed to the completion as its
                   "safe" callback
    :type onsafe: completion
    :para mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :para flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """

    oid_raw = cstr(oid, 'oid')
    cdef:
        WriteOp c_op = write_op
        char *c_oid = oid_raw
        Completion completion
        time_t c_mtime = mtime
        int c_flags = flags

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_write_op_operate(c_op.write_op, self.io, completion.rados_comp, c_oid,
                                         &c_mtime, c_flags)
    if ret == 0:
        return completion
    # Submission failed: clean the completion up before reporting.
    completion._cleanup()
    raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid_raw)
3453
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Run a prepared read operation against an object.

    :para read_op: read operation object
    :type read_op: ReadOp
    :para oid: object name
    :type oid: str
    :para flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    """
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp c_op = read_op
        char *c_oid = oid_raw
        int c_flag = flag

    with nogil:
        ret = rados_read_op_operate(c_op.read_op, self.io, c_oid, c_flag)
    if ret == 0:
        return
    raise make_ex(ret, "Failed to operate read op for oid %s" % oid_raw)
3475
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Run a prepared read operation against an object asynchronously.

    :para read_op: read operation object
    :type read_op: ReadOp
    :para oid: object name
    :type oid: str
    :param oncomplete: optional callable handed to the completion as its
                       "complete" callback
    :type oncomplete: completion
    :param onsafe: optional callable handed to the completion as its
                   "safe" callback
    :type onsafe: completion
    :para flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    oid_raw = cstr(oid, 'oid')
    cdef:
        ReadOp c_op = read_op
        char *c_oid = oid_raw
        Completion completion
        int c_flag = flag

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)

    with nogil:
        ret = rados_aio_read_op_operate(c_op.read_op, self.io, completion.rados_comp, c_oid, c_flag)
    if ret == 0:
        return completion
    # Submission failed: clean the completion up before reporting.
    completion._cleanup()
    raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid_raw)
3509
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Queue an omap key/value listing on a read operation.

    :para read_op: read operation object
    :type read_op: ReadOp
    :para start_after: list keys starting after start_after
    :type start_after: str
    :para filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :para max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: tuple of (iterator over the requested omap values, 0); the
              second element carries no information
    """

    # Empty strings are passed to librados as NULL rather than "".
    start_raw = cstr(start_after, 'start_after') if start_after else None
    prefix_raw = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
    cdef:
        char *c_start = opt_str(start_raw)
        char *c_prefix = opt_str(prefix_raw)
        ReadOp c_op = read_op
        rados_omap_iter_t iter_addr = NULL
        int c_max = max_return

    with nogil:
        rados_read_op_omap_get_vals2(c_op.read_op, c_start, c_prefix,
                                     c_max, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3540
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Queue an omap key listing on a read operation.

    :para read_op: read operation object
    :type read_op: ReadOp
    :para start_after: list keys starting after start_after
    :type start_after: str
    :para max_return: list no more than max_return keys
    :type max_return: int
    :returns: tuple of (iterator over the requested omap keys, 0); the
              second element carries no information
    """
    # An empty string is passed to librados as NULL rather than "".
    start_raw = cstr(start_after, 'start_after') if start_after else None
    cdef:
        char *c_start = opt_str(start_raw)
        ReadOp c_op = read_op
        rados_omap_iter_t iter_addr = NULL
        int c_max = max_return

    with nogil:
        rados_read_op_omap_get_keys2(c_op.read_op, c_start,
                                     c_max, &iter_addr, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = iter_addr
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3566
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Queue a lookup of specific omap keys on a read operation.

    :para read_op: read operation object
    :type read_op: ReadOp
    :para keys: input key tuple
    :type keys: tuple
    :returns: tuple of (iterator over the requested omap values, 0); the
              second element carries no information
    """
    keys = cstr_list(keys, 'keys')
    cdef:
        ReadOp _read_op = read_op
        # Start from NULL, as get_omap_vals/get_omap_keys do, so the
        # iterator never receives an indeterminate pointer.
        rados_omap_iter_t iter_addr = NULL
        char **_keys = to_bytes_array(keys)
        size_t key_num = len(keys)

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
                                                <const char**>_keys,
                                                key_num, &iter_addr, NULL)
        it = OmapIterator(self)
        it.ctx = iter_addr
        return it, 0   # 0 is meaningless; there for backward-compat
    finally:
        free(_keys)
3594
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Queue removal of the given omap keys on a write operation.

    :para write_op: write operation object
    :type write_op: WriteOp
    :para keys: input key tuple
    :type keys: tuple
    """

    keys_raw = cstr_list(keys, 'keys')
    cdef:
        WriteOp c_op = write_op
        size_t c_key_count = len(keys_raw)
        char **c_keys = to_bytes_array(keys_raw)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(c_op.write_op, <const char**>c_keys, c_key_count)
    finally:
        # The pointer array is released as soon as the call returns.
        free(c_keys)
3616
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Queue removal of every omap key/value pair on a write operation.

    :para write_op: write operation object
    :type write_op: WriteOp
    """

    cdef WriteOp c_op = write_op

    with nogil:
        rados_write_op_omap_clear(c_op.write_op)
3630
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is a stack local: clear the microseconds field so
        # librados does not read an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                       &_duration, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3681
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration

    if duration is None:
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    NULL, _flags)
    else:
        _duration.tv_sec = duration
        # The struct is a stack local: clear the microseconds field so
        # librados does not read an indeterminate value.
        _duration.tv_usec = 0
        with nogil:
            ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                    &_duration, _flags)
    if ret < 0:
        # Name the call that actually failed (was "rados_lock_exclusive").
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3735
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Report the operation that failed; the previous text was copied
        # from lock_exclusive and claimed a lock could not be *set*.
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3767
11fdf7f2
TL
def set_osdmap_full_try(self):
    """
    Turn on the osdmap_full_try setting for this io context by calling
    rados_set_osdmap_full_try.
    """
    with nogil:
        rados_set_osdmap_full_try(self.io)
3774
def unset_osdmap_full_try(self):
    """
    Turn off the osdmap_full_try setting for this io context by calling
    rados_unset_osdmap_full_try.
    """
    with nogil:
        rados_unset_osdmap_full_try(self.io)
3781
c07f9fc5
FG
def application_enable(self, app_name, force=False):
    """
    Enable an application on an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param force: False if only a single app should exist per pool
    :type force: bool

    :raises: :class:`Error`
    """
    app_raw = cstr(app_name, 'app_name')
    cdef:
        char *c_app_name = app_raw
        int c_force = 1 if force else 0

    with nogil:
        ret = rados_application_enable(self.io, c_app_name, c_force)
    if ret >= 0:
        return
    raise make_ex(ret, "error enabling application")
3802
def application_list(self):
    """
    List the applications enabled on this pool.

    :returns: list of app name strings, or None when librados reports
              ENOENT
    """
    cdef:
        size_t length = 128
        char *apps = NULL

    try:
        while True:
            apps = <char *>realloc_chk(apps, length)
            with nogil:
                ret = rados_application_list(self.io, apps, &length)
            if ret == -errno.ERANGE:
                # Buffer too small: go round again with the value the
                # call left in ``length``.
                continue
            if ret == -errno.ENOENT:
                return None
            if ret != 0:
                raise make_ex(ret, "error listing applications")
            # Names are NUL-separated; drop empty fragments.
            return [decode_cstr(app) for app in
                    apps[:length].split(b'\0') if app]
    finally:
        free(apps)
3829
def application_metadata_set(self, app_name, key, value):
    """
    Set one application metadata entry on an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :param value: metadata value
    :type value: str

    :raises: :class:`Error`
    """
    app_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    value_raw = cstr(value, 'value')
    cdef:
        char *c_app_name = app_raw
        char *c_key = key_raw
        char *c_value = value_raw

    with nogil:
        ret = rados_application_metadata_set(self.io, c_app_name, c_key,
                                             c_value)
    if ret >= 0:
        return
    raise make_ex(ret, "error setting application metadata")
3856
def application_metadata_remove(self, app_name, key):
    """
    Remove one application metadata entry from an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str

    :raises: :class:`Error`
    """
    app_raw = cstr(app_name, 'app_name')
    key_raw = cstr(key, 'key')
    cdef:
        char *c_app_name = app_raw
        char *c_key = key_raw

    with nogil:
        ret = rados_application_metadata_remove(self.io, c_app_name, c_key)
    if ret >= 0:
        return
    raise make_ex(ret, "error removing application metadata")
3878
def application_metadata_list(self, app_name):
    """
    Returns the metadata key/value pairs of an application on this pool

    :param app_name: application name
    :type app_name: str

    :raises: :class:`Error`
    :returns: list of key/value tuples
    """
    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, _app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == 0:
                keys = [decode_cstr(key) for key in
                        c_keys[:key_length].split(b'\0')]
                vals = [decode_cstr(val) for val in
                        c_vals[:val_length].split(b'\0')]
                # zip() is a non-subscriptable iterator on Python 3, so
                # materialize it before slicing. The final pair comes from
                # the fragment after the last NUL and is discarded.
                return list(zip(keys, vals))[:-1]
            elif ret == -errno.ERANGE:
                # Buffers too small: retry with the sizes the call left
                # in key_length / val_length.
                pass
            else:
                raise make_ex(ret, "error listing application metadata")
    finally:
        free(c_keys)
        free(c_vals)
3916
11fdf7f2
TL
def alignment(self):
    """
    Report the alignment required by the pool.

    :raises: :class:`Error`
    :returns:
        Number of alignment bytes required by the current pool, or None if
        alignment is not required.
    """
    cdef:
        int needs_alignment = 0
        uint64_t c_alignment

    with nogil:
        ret = rados_ioctx_pool_requires_alignment2(self.io, &needs_alignment)
    if ret != 0:
        raise make_ex(ret, "error checking alignment")

    if not needs_alignment:
        return None

    with nogil:
        ret = rados_ioctx_pool_required_alignment2(self.io, &c_alignment)
    if ret != 0:
        raise make_ex(ret, "error querying alignment")
    return c_alignment
3942
7c673cae
FG
3943
def set_object_locator(func):
    """
    Decorator: run ``func`` with the ioctx locator key set to the
    object's own ``locator_key``.

    When ``self.locator_key`` is None the call goes straight through.
    Otherwise the ioctx's current locator key is saved, replaced for the
    duration of the call and restored afterwards, even if ``func``
    raises, so a failed operation cannot leave the shared ioctx pointing
    at the wrong locator.
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3955
3956
def set_object_namespace(func):
    """
    Decorator: run ``func`` with the ioctx namespace set to the object's
    own ``nspace``.

    The ioctx's current namespace is saved, replaced for the duration of
    the call and restored afterwards, even if ``func`` raises, so a
    failed operation cannot leave the shared ioctx in the wrong
    namespace.

    :raises: :class:`LogicError` if ``self.nspace`` is None
    """
    @wraps(func)
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3967
3968
class Object(object):
    """
    Rados object wrapper, makes the object look like a file.

    Reads and writes advance an internal offset. Methods decorated with
    set_object_locator / set_object_namespace go through those decorators
    before calling the ioctx.
    """
    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context every operation is delegated to
        :param key: name of the object
        :param locator_key: optional locator key for this object
        :param nspace: namespace of the object; None is stored as ""
        """
        self.key = key
        self.ioctx = ioctx
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity against a string literal depends on
        # interning and triggers a SyntaxWarning on newer Pythons.
        shown_nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, shown_nspace, self.locator_key)

    def require_object_exists(self):
        """Raise ObjectStateError unless the object state is "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """Read up to ``length`` bytes at the current offset and advance it."""
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write at the current offset; the offset advances only when the
        ioctx write returns 0. Returns the ioctx return value.
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the ioctx stat() result for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by the next read or write."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the value of one extended attribute."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the ioctx iterator over this object's xattrs."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set one extended attribute; returns the ioctx result."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove one extended attribute; returns the ioctx result."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
4045
# Level names a MonitorLog can be constructed with.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
4053
4054
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    Watch cluster log messages. Create an instance and keep it alive
    while the callback is invoked periodically. ``level`` selects which
    messages are monitored (one of MONITOR_LEVELS); ``arg`` is handed
    back to the callback.

    callback will be called with:
        arg        (given to __init__)
        line       (the full line, including timestamp, who, level, msg)
        who        (which entity issued the log message)
        timestamp_sec  (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq        (sequence number)
        level      (string representing the level of the log message)
        msg        (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Record the arguments, then register through the cluster.
        self.cluster = cluster
        self.level = level
        self.callback = callback
        self.arg = arg
        cluster.monitor_log(level, callback, arg)
4081