]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/rados/rados.pyx
buildsys: use download.ceph.com to download source tar ball
[ceph.git] / ceph / src / pybind / rados / rados.pyx
CommitLineData
7c673cae
FG
1# cython: embedsignature=True
2"""
3This module is a thin wrapper around librados.
4
5Error codes from librados are turned into exceptions that subclass
6:class:`Error`. Almost all methods may raise :class:`Error`
7(the base class of all rados exceptions), :class:`PermissionError`
8and :class:`IOError`, in addition to those documented for the
9method.
10"""
11# Copyright 2011 Josh Durgin
12# Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13# Copyright 2015 Hector Martin <marcan@marcan.st>
14# Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16from cpython cimport PyObject, ref
17from cpython.pycapsule cimport *
18from libc cimport errno
19from libc.stdint cimport *
20from libc.stdlib cimport malloc, realloc, free
21
22import sys
23import threading
24import time
25
11fdf7f2
TL
26try:
27 from collections.abc import Callable
28except ImportError:
29 from collections import Callable
7c673cae
FG
30from datetime import datetime
31from functools import partial, wraps
32from itertools import chain
33
# Type accepted wherever a "string" argument is validated: ``basestring`` when
# running under Python 2.x, plain ``str`` otherwise.
str_type = basestring if sys.version_info[0] < 3 else str
39
40
41cdef extern from "Python.h":
42 # These are in cpython/string.pxd, but use "object" types instead of
43 # PyObject*, which invokes assumptions in cpython that we need to
44 # legitimately break to implement zero-copy string buffers in Ioctx.read().
45 # This is valid use of the Python API and documented as a special case.
46 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
47 char* PyBytes_AsString(PyObject *string) except NULL
48 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
49 void PyEval_InitThreads()
50
51
52cdef extern from "time.h":
53 ctypedef long int time_t
54 ctypedef long int suseconds_t
55
56
57cdef extern from "sys/time.h":
58 cdef struct timeval:
59 time_t tv_sec
60 suseconds_t tv_usec
61
62
63cdef extern from "rados/rados_types.h" nogil:
64 cdef char* _LIBRADOS_ALL_NSPACES "LIBRADOS_ALL_NSPACES"
65
66
67cdef extern from "rados/librados.h" nogil:
68 enum:
69 _LIBRADOS_OP_FLAG_EXCL "LIBRADOS_OP_FLAG_EXCL"
70 _LIBRADOS_OP_FLAG_FAILOK "LIBRADOS_OP_FLAG_FAILOK"
71 _LIBRADOS_OP_FLAG_FADVISE_RANDOM "LIBRADOS_OP_FLAG_FADVISE_RANDOM"
72 _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL "LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL"
73 _LIBRADOS_OP_FLAG_FADVISE_WILLNEED "LIBRADOS_OP_FLAG_FADVISE_WILLNEED"
74 _LIBRADOS_OP_FLAG_FADVISE_DONTNEED "LIBRADOS_OP_FLAG_FADVISE_DONTNEED"
75 _LIBRADOS_OP_FLAG_FADVISE_NOCACHE "LIBRADOS_OP_FLAG_FADVISE_NOCACHE"
76
77
78 enum:
79 _LIBRADOS_OPERATION_NOFLAG "LIBRADOS_OPERATION_NOFLAG"
80 _LIBRADOS_OPERATION_BALANCE_READS "LIBRADOS_OPERATION_BALANCE_READS"
81 _LIBRADOS_OPERATION_LOCALIZE_READS "LIBRADOS_OPERATION_LOCALIZE_READS"
82 _LIBRADOS_OPERATION_ORDER_READS_WRITES "LIBRADOS_OPERATION_ORDER_READS_WRITES"
83 _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
84 _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
85 _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
86 _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
87 _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"
88
89 cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"
90
7c673cae
FG
91 ctypedef void* rados_xattrs_iter_t
92 ctypedef void* rados_omap_iter_t
93 ctypedef void* rados_list_ctx_t
94 ctypedef uint64_t rados_snap_t
95 ctypedef void *rados_write_op_t
96 ctypedef void *rados_read_op_t
97 ctypedef void *rados_completion_t
98 ctypedef void (*rados_callback_t)(rados_completion_t cb, void *arg)
99 ctypedef void (*rados_log_callback_t)(void *arg, const char *line, const char *who,
100 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
224ce89b 101 ctypedef void (*rados_log_callback2_t)(void *arg, const char *line, const char *channel, const char *who, const char *name,
31f18b77 102 uint64_t sec, uint64_t nsec, uint64_t seq, const char *level, const char *msg)
7c673cae
FG
103
104
105 cdef struct rados_cluster_stat_t:
106 uint64_t kb
107 uint64_t kb_used
108 uint64_t kb_avail
109 uint64_t num_objects
110
111 cdef struct rados_pool_stat_t:
112 uint64_t num_bytes
113 uint64_t num_kb
114 uint64_t num_objects
115 uint64_t num_object_clones
116 uint64_t num_object_copies
117 uint64_t num_objects_missing_on_primary
118 uint64_t num_objects_unfound
119 uint64_t num_objects_degraded
120 uint64_t num_rd
121 uint64_t num_rd_kb
122 uint64_t num_wr
123 uint64_t num_wr_kb
124
125 void rados_buffer_free(char *buf)
126
127 void rados_version(int *major, int *minor, int *extra)
128 int rados_create2(rados_t *pcluster, const char *const clustername,
129 const char * const name, uint64_t flags)
130 int rados_create_with_context(rados_t *cluster, rados_config_t cct)
131 int rados_connect(rados_t cluster)
132 void rados_shutdown(rados_t cluster)
11fdf7f2 133 uint64_t rados_get_instance_id(rados_t cluster)
7c673cae
FG
134 int rados_conf_read_file(rados_t cluster, const char *path)
135 int rados_conf_parse_argv_remainder(rados_t cluster, int argc, const char **argv, const char **remargv)
136 int rados_conf_parse_env(rados_t cluster, const char *var)
137 int rados_conf_set(rados_t cluster, char *option, const char *value)
138 int rados_conf_get(rados_t cluster, char *option, char *buf, size_t len)
139
140 int rados_ioctx_pool_stat(rados_ioctx_t io, rados_pool_stat_t *stats)
141 int64_t rados_pool_lookup(rados_t cluster, const char *pool_name)
142 int rados_pool_reverse_lookup(rados_t cluster, int64_t id, char *buf, size_t maxlen)
143 int rados_pool_create(rados_t cluster, const char *pool_name)
7c673cae 144 int rados_pool_create_with_crush_rule(rados_t cluster, const char *pool_name, uint8_t crush_rule_num)
7c673cae
FG
145 int rados_pool_get_base_tier(rados_t cluster, int64_t pool, int64_t *base_tier)
146 int rados_pool_list(rados_t cluster, char *buf, size_t len)
147 int rados_pool_delete(rados_t cluster, const char *pool_name)
148 int rados_inconsistent_pg_list(rados_t cluster, int64_t pool, char *buf, size_t len)
149
150 int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
151 int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
152 int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
c07f9fc5
FG
153 int rados_application_enable(rados_ioctx_t io, const char *app_name,
154 int force)
11fdf7f2
TL
155 void rados_set_osdmap_full_try(rados_ioctx_t io)
156 void rados_unset_osdmap_full_try(rados_ioctx_t io)
c07f9fc5
FG
157 int rados_application_list(rados_ioctx_t io, char *values,
158 size_t *values_len)
159 int rados_application_metadata_get(rados_ioctx_t io, const char *app_name,
160 const char *key, char *value,
161 size_t *value_len)
162 int rados_application_metadata_set(rados_ioctx_t io, const char *app_name,
163 const char *key, const char *value)
164 int rados_application_metadata_remove(rados_ioctx_t io,
165 const char *app_name, const char *key)
166 int rados_application_metadata_list(rados_ioctx_t io,
167 const char *app_name, char *keys,
168 size_t *key_len, char *values,
169 size_t *value_len)
7c673cae
FG
170 int rados_ping_monitor(rados_t cluster, const char *mon_id, char **outstr, size_t *outstrlen)
171 int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen,
172 const char *inbuf, size_t inbuflen,
173 char **outbuf, size_t *outbuflen,
174 char **outs, size_t *outslen)
175 int rados_mgr_command(rados_t cluster, const char **cmd, size_t cmdlen,
176 const char *inbuf, size_t inbuflen,
177 char **outbuf, size_t *outbuflen,
178 char **outs, size_t *outslen)
179 int rados_mon_command_target(rados_t cluster, const char *name, const char **cmd, size_t cmdlen,
180 const char *inbuf, size_t inbuflen,
181 char **outbuf, size_t *outbuflen,
182 char **outs, size_t *outslen)
183 int rados_osd_command(rados_t cluster, int osdid, const char **cmd, size_t cmdlen,
184 const char *inbuf, size_t inbuflen,
185 char **outbuf, size_t *outbuflen,
186 char **outs, size_t *outslen)
187 int rados_pg_command(rados_t cluster, const char *pgstr, const char **cmd, size_t cmdlen,
188 const char *inbuf, size_t inbuflen,
189 char **outbuf, size_t *outbuflen,
190 char **outs, size_t *outslen)
191 int rados_monitor_log(rados_t cluster, const char *level, rados_log_callback_t cb, void *arg)
31f18b77 192 int rados_monitor_log2(rados_t cluster, const char *level, rados_log_callback2_t cb, void *arg)
7c673cae
FG
193
194 int rados_wait_for_latest_osdmap(rados_t cluster)
195
11fdf7f2
TL
196 int rados_service_register(rados_t cluster, const char *service, const char *daemon, const char *metadata_dict)
197 int rados_service_update_status(rados_t cluster, const char *status_dict)
198
7c673cae 199 int rados_ioctx_create(rados_t cluster, const char *pool_name, rados_ioctx_t *ioctx)
11fdf7f2 200 int rados_ioctx_create2(rados_t cluster, int64_t pool_id, rados_ioctx_t *ioctx)
7c673cae 201 void rados_ioctx_destroy(rados_ioctx_t io)
7c673cae
FG
202 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
203 void rados_ioctx_set_namespace(rados_ioctx_t io, const char * nspace)
204
205 uint64_t rados_get_last_version(rados_ioctx_t io)
206 int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
207 int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
208 int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
209 int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
210 int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
211 int rados_remove(rados_ioctx_t io, const char *oid)
212 int rados_trunc(rados_ioctx_t io, const char *oid, uint64_t size)
213 int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, char *buf, size_t len)
214 int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len)
215 int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name)
216 int rados_getxattrs(rados_ioctx_t io, const char *oid, rados_xattrs_iter_t *iter)
217 int rados_getxattrs_next(rados_xattrs_iter_t iter, const char **name, const char **val, size_t *len)
218 void rados_getxattrs_end(rados_xattrs_iter_t iter)
219
220 int rados_nobjects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx)
221 int rados_nobjects_list_next(rados_list_ctx_t ctx, const char **entry, const char **key, const char **nspace)
222 void rados_nobjects_list_close(rados_list_ctx_t ctx)
223
11fdf7f2
TL
224 int rados_ioctx_pool_requires_alignment2(rados_ioctx_t io, int * requires)
225 int rados_ioctx_pool_required_alignment2(rados_ioctx_t io, uint64_t * alignment)
226
7c673cae
FG
227 int rados_ioctx_snap_rollback(rados_ioctx_t io, const char * oid, const char * snapname)
228 int rados_ioctx_snap_create(rados_ioctx_t io, const char * snapname)
229 int rados_ioctx_snap_remove(rados_ioctx_t io, const char * snapname)
230 int rados_ioctx_snap_lookup(rados_ioctx_t io, const char * name, rados_snap_t * id)
231 int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, char * name, int maxlen)
232 void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t snap)
233 int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t * snaps, int maxlen)
234 int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t * t)
235
28e407b8
AA
236 int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io,
237 rados_snap_t *snapid)
238 int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io,
239 rados_snap_t snapid)
240 int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io,
241 rados_snap_t snap_seq,
242 rados_snap_t *snap,
243 int num_snaps)
244 int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, const char *oid,
245 rados_snap_t snapid)
246
7c673cae
FG
247 int rados_lock_exclusive(rados_ioctx_t io, const char * oid, const char * name,
248 const char * cookie, const char * desc,
249 timeval * duration, uint8_t flags)
250 int rados_lock_shared(rados_ioctx_t io, const char * o, const char * name,
251 const char * cookie, const char * tag, const char * desc,
252 timeval * duration, uint8_t flags)
253 int rados_unlock(rados_ioctx_t io, const char * o, const char * name, const char * cookie)
254
255 rados_write_op_t rados_create_write_op()
256 void rados_release_write_op(rados_write_op_t write_op)
257
258 rados_read_op_t rados_create_read_op()
259 void rados_release_read_op(rados_read_op_t read_op)
260
261 int rados_aio_create_completion(void * cb_arg, rados_callback_t cb_complete, rados_callback_t cb_safe, rados_completion_t * pc)
262 void rados_aio_release(rados_completion_t c)
263 int rados_aio_stat(rados_ioctx_t io, const char *oid, rados_completion_t completion, uint64_t *psize, time_t *pmtime)
264 int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
265 int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
266 int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
267 int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
268 int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
269 int rados_aio_flush(rados_ioctx_t io)
270
271 int rados_aio_get_return_value(rados_completion_t c)
272 int rados_aio_wait_for_complete_and_cb(rados_completion_t c)
273 int rados_aio_wait_for_safe_and_cb(rados_completion_t c)
274 int rados_aio_wait_for_complete(rados_completion_t c)
275 int rados_aio_wait_for_safe(rados_completion_t c)
276 int rados_aio_is_complete(rados_completion_t c)
277 int rados_aio_is_safe(rados_completion_t c)
278
279 int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
280 const char * in_buf, size_t in_len, char * buf, size_t out_len)
281 int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
282 const char * in_buf, size_t in_len, char * buf, size_t out_len)
283
284 int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
285 int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
286 void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
287 void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
288 void rados_write_op_omap_clear(rados_write_op_t write_op)
289 void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
290
291 void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
292 void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
293 void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
91327a77 294 void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver)
7c673cae
FG
295 void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
296 void rados_write_op_remove(rados_write_op_t write_op)
297 void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
298 void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
299
d2e6a577
FG
300 void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
301 void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
7c673cae
FG
302 void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
303 int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
304 int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
305 void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
306 int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
307 void rados_omap_get_end(rados_omap_iter_t iter)
11fdf7f2 308 int rados_notify2(rados_ioctx_t io, const char * o, const char *buf, int buf_len, uint64_t timeout_ms, char **reply_buffer, size_t *reply_buffer_len)
7c673cae
FG
309
310
# Python-visible copies of the C constants declared in the extern blocks
# (the underscore-prefixed names are only usable from Cython code).

# LIBRADOS_OP_FLAG_* values
311LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
312LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
313LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
314LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
315LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
316LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
317LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
318
319LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
320
# LIBRADOS_OPERATION_* values
321LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
322LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
323LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
324LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
325LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
326LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
327LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
328
# The C constant is a `char *`; it is decoded so that Python code gets text.
329LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
330
331LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
332LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
333
# ANONYMOUS_AUID has all 64 bits set.
# NOTE(review): neither AUID constant is used in the visible part of the
# file -- confirm whether they are still needed.
334ANONYMOUS_AUID = 0xffffffffffffffff
335ADMIN_AUID = 0
336
337
class Error(Exception):
    """
    `Error` class, derived from `Exception`; the base class of all rados
    exceptions.

    :param message: description of the failure
    :type message: str
    :param errno: error number attached to the failure, or None when the
                  error does not come with one (make_ex() passes the
                  absolute value of the librados return code)
    :type errno: int or None
    """
    def __init__(self, message, errno=None):
        super(Error, self).__init__(message)
        # Python 3 exceptions have no implicit `message` attribute; keep our
        # own copy so that __reduce__() can rebuild the exception.
        self.message = message
        self.errno = errno

    def __str__(self):
        """Return the message, prefixed with ``[errno N]`` when errno is set."""
        msg = super(Error, self).__str__()
        if self.errno is None:
            return msg
        return '[errno {0}] {1}'.format(self.errno, msg)

    def __reduce__(self):
        """Support pickling/copying: rebuild from ``(message, errno)``."""
        return (self.__class__, (self.message, self.errno))
7c673cae 352
1adf2230
AA
353class InvalidArgumentError(Error):
 """ `InvalidArgumentError` class, derived from `Error`; mapped from EINVAL by make_ex() """
354 pass
355
# NOTE: within this module the name shadows the builtin OSError.
356class OSError(Error):
357 """ `OSError` class, derived from `Error`; make_ex() falls back to it for error codes without a dedicated class """
358 pass
359
7c673cae
FG
360class InterruptedOrTimeoutError(OSError):
361 """ `InterruptedOrTimeoutError` class, derived from `OSError`; mapped from EINTR by make_ex() """
362 pass
363
364
365class PermissionError(OSError):
366 """ `PermissionError` class, derived from `OSError`; mapped from EPERM by make_ex() """
367 pass
368
369
370class PermissionDeniedError(OSError):
371 """ `PermissionDeniedError` class, derived from `OSError`; mapped from EACCES by make_ex() """
372 pass
373
374
375class ObjectNotFound(OSError):
376 """ `ObjectNotFound` class, derived from `OSError`; mapped from ENOENT by make_ex() """
377 pass
378
379
380class NoData(OSError):
381 """ `NoData` class, derived from `OSError`; mapped from ENODATA (ENOATTR on FreeBSD) by make_ex() """
382 pass
383
384
385class ObjectExists(OSError):
386 """ `ObjectExists` class, derived from `OSError`; mapped from EEXIST by make_ex() """
387 pass
388
389
390class ObjectBusy(OSError):
391 """ `ObjectBusy` class, derived from `OSError`; mapped from EBUSY by make_ex() """
392 pass
393
394
# NOTE: within this module the name shadows the builtin IOError.
395class IOError(OSError):
396 """ `IOError` class, derived from `OSError`; mapped from EIO by make_ex() """
397 pass
398
399
400class NoSpace(OSError):
401 """ `NoSpace` class, derived from `OSError`; mapped from ENOSPC by make_ex() """
402 pass
403
404
405class RadosStateError(Error):
406 """ `RadosStateError` class, derived from `Error`; raised by Rados.require_state() """
407 pass
408
409
# NOTE(review): presumably the Ioctx counterpart of RadosStateError; the code
# raising it is not in the visible part of the file -- confirm.
410class IoctxStateError(Error):
411 """ `IoctxStateError` class, derived from `Error` """
412 pass
413
414
# NOTE(review): the code raising this is not in the visible part of the
# file -- confirm where it is used before documenting it further.
415class ObjectStateError(Error):
416 """ `ObjectStateError` class, derived from `Error` """
417 pass
418
419
420class LogicError(Error):
421 """ `LogicError` class, derived from `Error` """
422 pass
423
424
425class TimedOut(OSError):
426 """ `TimedOut` class, derived from `OSError`; mapped from ETIMEDOUT by make_ex() """
427 pass
428
429
# Lookup table used by make_ex(): errno value -> exception class.
# Every entry but the "no data" one is shared by all platforms, so only that
# entry is selected by the compile-time IF: FreeBSD uses ENOATTR where other
# systems use ENODATA.
cdef errno_to_exception = {
    errno.EPERM     : PermissionError,
    errno.ENOENT    : ObjectNotFound,
    errno.EIO       : IOError,
    errno.ENOSPC    : NoSpace,
    errno.EEXIST    : ObjectExists,
    errno.EBUSY     : ObjectBusy,
    errno.EINTR     : InterruptedOrTimeoutError,
    errno.ETIMEDOUT : TimedOut,
    errno.EACCES    : PermissionDeniedError,
    errno.EINVAL    : InvalidArgumentError,
}
IF UNAME_SYSNAME == "FreeBSD":
    errno_to_exception[errno.ENOATTR] = NoData
ELSE:
    errno_to_exception[errno.ENODATA] = NoData
458
459
cdef make_ex(ret, msg):
    """
    Build (but do not raise) the exception matching a librados return code.

    The sign of `ret` is ignored.  Codes without an entry in
    `errno_to_exception` produce a generic :class:`OSError`.

    :param ret: the return code
    :type ret: int
    :param msg: the error message to use
    :type msg: str
    :returns: a subclass of :class:`Error`
    """
    code = abs(ret)
    exc_class = errno_to_exception.get(code, OSError)
    return exc_class(msg, errno=code)
7c673cae
FG
475
476
def opt(cls):
    """
    Build the type spec of an optional argument for :func:`requires`: the
    argument may be an instance of `cls`, and ``None`` is acceptable as well.

    :returns: the tuple ``(cls, None)``
    """
    return cls, None
481
482
def requires(*types):
    """
    Decorator factory validating the argument types of an instance method.

    Every entry of `types` is an ``(arg_name, arg_type)`` pair, listed in the
    order of the method's positional parameters (``self`` excluded).
    `arg_type` is either a single type or a tuple of alternatives; a ``None``
    alternative means that the value ``None`` is accepted.  Positional
    arguments are matched by position (kwargs is an un-ordered dict, so it
    cannot be used for that), keyword arguments are matched by name.

    The decorated method raises :class:`TypeError` when an argument does not
    match its spec.
    """
    def matches(value, expected):
        # a None spec stands for "the value None", not for a type
        if expected is None:
            return value is None
        return isinstance(value, expected)

    def check_type(val, arg_name, arg_type):
        if not isinstance(arg_type, tuple):
            if matches(val, arg_type):
                return
            assert(arg_type is not None)
            raise TypeError('%s must be %s' % (arg_name, arg_type.__name__))
        for candidate in arg_type:
            if matches(val, candidate):
                return
        labels = ['None' if candidate is None else candidate.__name__
                  for candidate in arg_type]
        raise TypeError('%s must be %s' % (arg_name, ' or '.join(labels)))

    def wrapper(f):
        # FIXME(sileht): functools.wraps cannot be used here, it stops with
        # AttributeError: 'method_descriptor' object has no attribute '__module__'
        # @wraps(f)
        def validate_func(*args, **kwargs):
            # positional arguments first; args[0] is `self` and is ignored
            for arg_val, (arg_name, arg_type) in zip(args[1:], types):
                check_type(arg_val, arg_name, arg_type)
            # then the declared arguments that were passed by keyword
            for arg_name, arg_type in types:
                if arg_name in kwargs:
                    check_type(kwargs[arg_name], arg_name, arg_type)
            return f(*args, **kwargs)
        return validate_func
    return wrapper
519
520
def cstr(val, name, encoding="utf-8", opt=False):
    """
    Turn a Python string into a byte string.

    Byte strings are returned unchanged, unicode strings are encoded.

    :param basestring val: Python string
    :param str name: Name of the string parameter, for exceptions
    :param str encoding: Encoding to use
    :param bool opt: If True, None is allowed (and returned as is)
    :rtype: bytes
    :raises: :class:`TypeError`
    """
    if val is None and opt:
        return None
    if isinstance(val, bytes):
        return val
    if not isinstance(val, unicode):
        raise TypeError('%s must be a string' % name)
    return val.encode(encoding)
540
541
def cstr_list(list_str, name, encoding="utf-8"):
    """
    Create a list of byte strings from an iterable of Python strings.

    :param list_str: the strings to convert, each one handled by :func:`cstr`
    :param str name: Name of the parameter, for exceptions
    :param str encoding: Encoding to use for every item
    :rtype: list of bytes
    :raises: :class:`TypeError` if an item is not a string
    """
    # `encoding` has to be forwarded, otherwise cstr() silently falls back to
    # its own default whatever the caller asked for
    return [cstr(s, name, encoding) for s in list_str]
544
545
def decode_cstr(val, encoding="utf-8"):
    """
    Turn a byte string back into a Python string; ``None`` is passed through.

    :param bytes val: byte string
    :param str encoding: Encoding to use
    :rtype: unicode or None
    """
    return None if val is None else val.decode(encoding)
557
558
cdef char* opt_str(s) except? NULL:
    # Convert an optional byte string to a C string: None becomes NULL.
    # NULL is therefore a legitimate result, hence `except?` rather than
    # `except` in the signature.
    if s is not None:
        return s
    return NULL
563
564
cdef void* realloc_chk(void* ptr, size_t size) except NULL:
    # realloc() wrapper reporting a NULL result as MemoryError instead of
    # handing it back to the caller.
    cdef void *resized = realloc(ptr, size)
    if resized != NULL:
        return resized
    raise MemoryError("realloc failed")
570
571
cdef size_t * to_csize_t_array(list_int) except? NULL:
    """
    Copy a sequence of Python integers into a newly malloc()ed C array.

    The caller owns the returned array and must free() it.

    :param list_int: sequence of non-negative integers
    :returns: pointer to ``len(list_int)`` values (may be NULL when the
              sequence is empty)
    :raises: :class:`MemoryError` if the allocation fails; conversion errors
             of individual items are propagated as well
    """
    # `except? NULL` in the signature is what lets the exceptions raised
    # below reach the caller; without it a cdef function returning a pointer
    # cannot report them.
    cdef:
        size_t count = len(list_int)
        size_t idx
        size_t *ret = <size_t *>malloc(count * sizeof(size_t))
    # malloc(0) is allowed to return NULL, that is not a failure
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for idx in range(count):
            ret[idx] = <size_t>list_int[idx]
    except BaseException:
        # do not leak the array when an item cannot be converted
        free(ret)
        raise
    return ret
579
580
cdef char ** to_bytes_array(list_bytes) except? NULL:
    """
    Build a newly malloc()ed array of C strings from a sequence of byte
    strings.

    Only the array is allocated: each entry points into the corresponding
    bytes object, so `list_bytes` has to stay alive for as long as the array
    is used.  The caller owns the array and must free() it.

    :param list_bytes: sequence of byte strings
    :returns: pointer to ``len(list_bytes)`` C strings (may be NULL when the
              sequence is empty)
    :raises: :class:`MemoryError` if the allocation fails; conversion errors
             of individual items are propagated as well
    """
    # `except? NULL` in the signature is what lets the exceptions raised
    # below reach the caller; without it a cdef function returning a pointer
    # cannot report them.
    cdef:
        size_t count = len(list_bytes)
        size_t idx
        char **ret = <char **>malloc(count * sizeof(char *))
    # malloc(0) is allowed to return NULL, that is not a failure
    if ret == NULL and count > 0:
        raise MemoryError("malloc failed")
    try:
        for idx in range(count):
            ret[idx] = <char *>list_bytes[idx]
    except BaseException:
        # do not leak the array when an item is not a byte string
        free(ret)
        raise
    return ret
588
589
590
cdef int __monitor_callback(void *arg, const char *line, const char *who,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    # C-level log callback: acquires the GIL and forwards the log entry to a
    # Python callable.  `arg` is a Python object whose item 0 is the callable
    # and whose item 1 is passed back to it as first argument.
    cdef object cb_info = <object>arg
    callback = cb_info[0]
    user_arg = cb_info[1]
    callback(user_arg, line, who, sec, nsec, seq, level, msg)
    return 0
597
224ce89b
WB
cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
                             const char *who,
                             const char *name,
                             uint64_t sec, uint64_t nsec, uint64_t seq,
                             const char *level, const char *msg) with gil:
    # C-level log callback (variant with `channel` and `name`): acquires the
    # GIL and forwards the log entry to a Python callable.  `arg` is a Python
    # object whose item 0 is the callable and whose item 1 is passed back to
    # it as first argument.
    # Note that the callable receives `name` before `who`, i.e. not in the
    # order of this function's own parameters.
    cdef object cb_info = <object>arg
    callback = cb_info[0]
    user_arg = cb_info[1]
    callback(user_arg, line, channel, name, who, sec, nsec, seq, level, msg)
    return 0
606
7c673cae
FG
607
class Version(object):
    """
    Version information, as three numeric components.

    ``str()`` renders it as ``major.minor.extra``.
    """
    def __init__(self, major, minor, extra):
        self.major, self.minor, self.extra = major, minor, extra

    def __str__(self):
        parts = (self.major, self.minor, self.extra)
        return "%d.%d.%d" % parts
617
618
619cdef class Rados(object):
620 """This class wraps librados functions"""
621 # NOTE(sileht): attributes declared in .pxd
622
623 def __init__(self, *args, **kwargs):
 # All arguments are forwarded unchanged to __setup(), which validates
 # and documents them.
 # NOTE(review): PyEval_InitThreads() is presumably needed because the
 # log callbacks of this module re-acquire the GIL (`with gil`) -- confirm.
624 PyEval_InitThreads()
625 self.__setup(*args, **kwargs)
626
627 @requires(('rados_id', opt(str_type)), ('name', opt(str_type)), ('clustername', opt(str_type)),
628 ('conffile', opt(str_type)))
629 def __setup(self, rados_id=None, name=None, clustername=None,
630 conf_defaults=None, conffile=None, conf=None, flags=0,
631 context=None):
632 self.monitor_callback = None
31f18b77 633 self.monitor_callback2 = None
7c673cae
FG
634 self.parsed_args = []
635 self.conf_defaults = conf_defaults
636 self.conffile = conffile
637 self.rados_id = rados_id
638
639 if rados_id and name:
640 raise Error("Rados(): can't supply both rados_id and name")
641 elif rados_id:
642 name = 'client.' + rados_id
643 elif name is None:
644 name = 'client.admin'
645 if clustername is None:
646 clustername = ''
647
648 name = cstr(name, 'name')
649 clustername = cstr(clustername, 'clustername')
650 cdef:
651 char *_name = name
652 char *_clustername = clustername
653 int _flags = flags
654 int ret
655
656 if context:
657 # Unpack void* (aka rados_config_t) from capsule
658 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
659 with nogil:
660 ret = rados_create_with_context(&self.cluster, rados_config)
661 else:
662 with nogil:
663 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
664 if ret != 0:
665 raise Error("rados_initialize failed with error code: %d" % ret)
666
667 self.state = "configuring"
668 # order is important: conf_defaults, then conffile, then conf
669 if conf_defaults:
670 for key, value in conf_defaults.items():
671 self.conf_set(key, value)
672 if conffile is not None:
673 # read the default conf file when '' is given
674 if conffile == '':
675 conffile = None
676 self.conf_read_file(conffile)
677 if conf:
678 for key, value in conf.items():
679 self.conf_set(key, value)
680
681 def require_state(self, *args):
682 """
683 Checks if the Rados object is in a special state
684
11fdf7f2 685 :raises: :class:`RadosStateError`
7c673cae
FG
686 """
687 if self.state in args:
688 return
689 raise RadosStateError("You cannot perform that operation on a \
690Rados object in state %s." % self.state)
691
692 def shutdown(self):
693 """
694 Disconnects from the cluster. Call this explicitly when a
695 Rados.connect()ed object is no longer used.
696 """
697 if self.state != "shutdown":
698 with nogil:
699 rados_shutdown(self.cluster)
700 self.state = "shutdown"
701
702 def __enter__(self):
 # Context-manager entry: connect to the cluster and hand the object
 # itself to the `with` statement.
703 self.connect()
704 return self
705
706 def __exit__(self, type_, value, traceback):
 # Context-manager exit: always shut down; returning False means an
 # exception raised inside the `with` block is not suppressed.
707 self.shutdown()
708 return False
709
710 def version(self):
711 """
712 Get the version number of the ``librados`` C library.
713
714 :returns: a tuple of ``(major, minor, extra)`` components of the
715 librados version
716 """
717 cdef int major = 0
718 cdef int minor = 0
719 cdef int extra = 0
720 with nogil:
721 rados_version(&major, &minor, &extra)
722 return Version(major, minor, extra)
723
724 @requires(('path', opt(str_type)))
725 def conf_read_file(self, path=None):
726 """
727 Configure the cluster handle using a Ceph config file.
728
729 :param path: path to the config file
730 :type path: str
731 """
732 self.require_state("configuring", "connected")
733 path = cstr(path, 'path', opt=True)
734 cdef:
735 char *_path = opt_str(path)
736 with nogil:
737 ret = rados_conf_read_file(self.cluster, _path)
738 if ret != 0:
739 raise make_ex(ret, "error calling conf_read_file")
740
741 def conf_parse_argv(self, args):
742 """
743 Parse known arguments from args, and remove; returned
744 args contain only those unknown to ceph
745 """
746 self.require_state("configuring", "connected")
747 if not args:
748 return
749
750 cargs = cstr_list(args, 'args')
751 cdef:
752 int _argc = len(args)
753 char **_argv = to_bytes_array(cargs)
754 char **_remargv = NULL
755
756 try:
757 _remargv = <char **>malloc(_argc * sizeof(char *))
758 with nogil:
759 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
760 <const char**>_argv,
761 <const char**>_remargv)
762 if ret:
763 raise make_ex(ret, "error calling conf_parse_argv_remainder")
764
765 # _remargv was allocated with fixed argc; collapse return
766 # list to eliminate any missing args
767 retargs = [decode_cstr(a) for a in _remargv[:_argc]
768 if a != NULL]
769 self.parsed_args = args
770 return retargs
771 finally:
772 free(_argv)
773 free(_remargv)
774
775 def conf_parse_env(self, var='CEPH_ARGS'):
776 """
777 Parse known arguments from an environment variable, normally
778 CEPH_ARGS.
779 """
780 self.require_state("configuring", "connected")
781 if not var:
782 return
783
784 var = cstr(var, 'var')
785 cdef:
786 char *_var = var
787 with nogil:
788 ret = rados_conf_parse_env(self.cluster, _var)
789 if ret != 0:
790 raise make_ex(ret, "error calling conf_parse_env")
791
792 @requires(('option', str_type))
793 def conf_get(self, option):
794 """
795 Get the value of a configuration option
796
797 :param option: which option to read
798 :type option: str
799
800 :returns: str - value of the option or None
801 :raises: :class:`TypeError`
802 """
803 self.require_state("configuring", "connected")
804 option = cstr(option, 'option')
805 cdef:
806 char *_option = option
807 size_t length = 20
808 char *ret_buf = NULL
809
810 try:
811 while True:
812 ret_buf = <char *>realloc_chk(ret_buf, length)
813 with nogil:
814 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
815 if ret == 0:
816 return decode_cstr(ret_buf)
817 elif ret == -errno.ENAMETOOLONG:
818 length = length * 2
819 elif ret == -errno.ENOENT:
820 return None
821 else:
822 raise make_ex(ret, "error calling conf_get")
823 finally:
824 free(ret_buf)
825
@requires(('option', str_type), ('val', str_type))
def conf_set(self, option, val):
    """
    Assign a value to a configuration option

    :param option: which option to set
    :type option: str
    :param val: value of the option
    :type val: str

    :raises: :class:`TypeError`, :class:`ObjectNotFound`
    """
    cdef:
        char *opt_name
        char *opt_value

    self.require_state("configuring", "connected")
    option = cstr(option, 'option')
    val = cstr(val, 'val')
    opt_name = option
    opt_value = val

    with nogil:
        rc = rados_conf_set(self.cluster, opt_name, opt_value)
    if rc:
        raise make_ex(rc, "error calling conf_set")
849
def ping_monitor(self, mon_id):
    """
    Ping a monitor to assess liveness

    May be used as a simple way to assess liveness, or to obtain
    information about the monitor in a simple way even in the
    absence of quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor, or None when the
              reply length reported by librados is zero
    :raises: :class:`Error` when librados returns a non-zero code
    """
    cdef:
        char *target_id
        size_t reply_len = 0
        char *reply

    self.require_state("configuring", "connected")

    mon_id = cstr(mon_id, 'mon_id')
    target_id = mon_id

    with nogil:
        rc = rados_ping_monitor(self.cluster, target_id, &reply, &reply_len)

    if rc != 0:
        raise make_ex(rc, "error calling ping_monitor")

    if not reply_len:
        return None

    # Copy the reply into a Python object before handing the C buffer
    # back to librados.
    reply_bytes = reply[:reply_len]
    rados_buffer_free(reply)
    return decode_cstr(reply_bytes)
881
def connect(self, timeout=0):
    """
    Connect to the cluster. Use shutdown() to release resources.

    :param timeout: accepted but not used by this method
    :raises: :class:`Error` when rados_connect returns a non-zero code
    """
    self.require_state("configuring")
    # NOTE(sileht): timeout was supported by old python API,
    # but this is not something available in C API, so ignore
    # for now and remove it later
    with nogil:
        rc = rados_connect(self.cluster)
    if rc:
        raise make_ex(rc, "error connecting to the cluster")
    self.state = "connected"
895
11fdf7f2
TL
def get_instance_id(self):
    """
    Get a global id for current instance

    :returns: the value returned by rados_get_instance_id
    """
    self.require_state("connected")
    with nogil:
        instance_id = rados_get_instance_id(self.cluster)
    return instance_id
904
7c673cae
FG
def get_cluster_stats(self):
    """
    Read usage info about the cluster

    This tells you total space, space used, space available, and number
    of objects. These are not updated immediately when data is written,
    they are eventually consistent.

    :returns: dict - contains the following keys:

        - ``kb`` (int) - total space

        - ``kb_used`` (int) - space used

        - ``kb_avail`` (int) - free space available

        - ``num_objects`` (int) - number of objects

    :raises: :class:`Error` when rados_cluster_stat returns a negative code
    """
    cdef rados_cluster_stat_t usage

    with nogil:
        rc = rados_cluster_stat(self.cluster, &usage)

    if rc < 0:
        raise make_ex(
            rc, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)

    # Copy the C struct fields into a plain dict, one key per field.
    report = {}
    report['kb'] = usage.kb
    report['kb_used'] = usage.kb_used
    report['kb_avail'] = usage.kb_avail
    report['num_objects'] = usage.num_objects
    return report
937
@requires(('pool_name', str_type))
def pool_exists(self, pool_name):
    """
    Checks if a given pool exists.

    :param pool_name: name of the pool to check
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: bool - True when the lookup succeeds, False when librados
              answers ENOENT
    """
    cdef char *name_ptr

    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    name_ptr = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, name_ptr)
    if rc == -errno.ENOENT:
        return False
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    return True
963
@requires(('pool_name', str_type))
def pool_lookup(self, pool_name):
    """
    Returns a pool's ID based on its name.

    :param pool_name: name of the pool to look up
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: int - pool ID, or None if it doesn't exist
    """
    cdef char *name_ptr

    self.require_state("connected")
    pool_name = cstr(pool_name, 'pool_name')
    name_ptr = pool_name

    with nogil:
        rc = rados_pool_lookup(self.cluster, name_ptr)
    if rc == -errno.ENOENT:
        return None
    if rc < 0:
        raise make_ex(rc, "error looking up pool '%s'" % pool_name)
    # Non-negative return values are the pool ID itself.
    return int(rc)
988
@requires(('pool_id', int))
def pool_reverse_lookup(self, pool_id):
    """
    Returns a pool's name based on its ID.

    :param pool_id: ID of the pool to look up
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: string - pool name, or None if it doesn't exist
    """
    self.require_state("connected")
    cdef:
        int64_t _pool_id = pool_id
        size_t size = 512
        char *name = NULL

    try:
        while True:
            name = <char *>realloc_chk(name, size)
            with nogil:
                ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
            if ret >= 0:
                break
            elif ret == -errno.ERANGE and size <= 4096:
                # Only ERANGE warrants a retry with a larger buffer; the
                # previous test was inverted (``!=``), so ENOENT looped
                # through useless reallocations and ERANGE was raised
                # instead of retried. Growth stops once size exceeds
                # 4096, after which ERANGE is reported as an error.
                size *= 2
            elif ret == -errno.ENOENT:
                return None
            else:
                raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)

        return decode_cstr(name)

    finally:
        free(name)
1024
11fdf7f2
TL
@requires(('pool_name', str_type), ('crush_rule', opt(int)))
def create_pool(self, pool_name, crush_rule=None):
    """
    Create a pool:
    - with default settings: if crush_rule=None
    - with a specific CRUSH rule: crush_rule given

    :param pool_name: name of the pool to create
    :type pool_name: str
    :param crush_rule: rule to use for placement in the new pool
    :type crush_rule: int

    :raises: :class:`TypeError`, :class:`Error`
    """
    cdef:
        char *name_ptr
        uint8_t rule_id

    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    name_ptr = pool_name

    if crush_rule is not None:
        # The C entry point takes the rule as a uint8_t.
        rule_id = crush_rule
        with nogil:
            rc = rados_pool_create_with_crush_rule(self.cluster, name_ptr, rule_id)
    else:
        with nogil:
            rc = rados_pool_create(self.cluster, name_ptr)
    if rc < 0:
        raise make_ex(rc, "error creating pool '%s'" % pool_name)
1055
@requires(('pool_id', int))
def get_pool_base_tier(self, pool_id):
    """
    Get base pool

    :param pool_id: ID of the pool to query
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: base pool, or pool_id if tiering is not configured for the pool
    """
    cdef:
        int64_t tier_id = 0
        int64_t queried_id

    self.require_state("connected")
    queried_id = pool_id

    with nogil:
        rc = rados_pool_get_base_tier(self.cluster, queried_id, &tier_id)
    if rc < 0:
        raise make_ex(rc, "get_pool_base_tier(%d)" % pool_id)
    return int(tier_id)
1073
@requires(('pool_name', str_type))
def delete_pool(self, pool_name):
    """
    Delete a pool and all data inside it.

    The pool is removed from the cluster immediately,
    but the actual data is deleted in the background.

    :param pool_name: name of the pool to delete
    :type pool_name: str

    :raises: :class:`TypeError`, :class:`Error`
    """
    cdef char *name_ptr

    self.require_state("connected")

    pool_name = cstr(pool_name, 'pool_name')
    name_ptr = pool_name

    with nogil:
        rc = rados_pool_delete(self.cluster, name_ptr)
    if rc < 0:
        raise make_ex(rc, "error deleting pool '%s'" % pool_name)
1097
@requires(('pool_id', int))
def get_inconsistent_pgs(self, pool_id):
    """
    List inconsistent placement groups in the given pool

    :param pool_id: ID of the pool in which PGs are listed
    :type pool_id: int
    :raises: :class:`TypeError`, :class:`Error`
    :returns: list - inconsistent placement groups
    """
    cdef:
        int64_t pool_num
        size_t capacity = 512
        char *listing = NULL

    self.require_state("connected")
    pool_num = pool_id

    try:
        while True:
            listing = <char *>realloc_chk(listing, capacity)
            with nogil:
                rc = rados_inconsistent_pg_list(self.cluster, pool_num,
                                                listing, capacity)
            if rc < 0:
                raise make_ex(rc, "error calling inconsistent_pg_list")
            if rc <= <int>capacity:
                break
            # Return value larger than the buffer: grow and retry.
            capacity *= 2
        # Entries are NUL-separated; drop the empty pieces.
        entries = decode_cstr(listing[:rc]).split('\0')
        return [pg for pg in entries if pg]
    finally:
        free(listing)
1128
def list_pools(self):
    """
    Gets a list of pool names.

    :raises: :class:`Error` when rados_pool_list returns a negative code
    :returns: list - of pool names.
    """
    self.require_state("connected")
    cdef:
        size_t size = 512
        char *c_names = NULL

    try:
        while True:
            c_names = <char *>realloc_chk(c_names, size)
            with nogil:
                ret = rados_pool_list(self.cluster, c_names, size)
            if ret > <int>size:
                # Return value larger than the buffer: grow and retry.
                size *= 2
            elif ret >= 0:
                break
            else:
                # A negative return used to match neither branch, so the
                # loop never terminated; surface it as an exception like
                # get_inconsistent_pgs() does.
                raise make_ex(ret, "error calling rados_pool_list")
        # Names are NUL-separated; drop the empty pieces.
        return [name for name in decode_cstr(c_names[:ret]).split('\0')
                if name]
    finally:
        free(c_names)
1153
def get_fsid(self):
    """
    Get the fsid of the cluster as a hexadecimal string.

    :raises: :class:`Error`
    :returns: cluster fsid, as the bytes object filled in by librados
    """
    cdef:
        char *raw
        size_t capacity = 37
        PyObject* fsid_obj = NULL

    self.require_state("connected")

    # Allocate an uninitialised bytes object and let librados write
    # straight into its storage.
    fsid_obj = PyBytes_FromStringAndSize(NULL, capacity)
    try:
        raw = PyBytes_AsString(fsid_obj)
        with nogil:
            written = rados_cluster_fsid(self.cluster, raw, capacity)
        if written < 0:
            raise make_ex(written, "error getting cluster fsid")
        if written != <int>capacity:
            # Trim the object down to the length librados reported.
            _PyBytes_Resize(&fsid_obj, written)
        return <object>fsid_obj
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set fsid_obj to NULL, hence XDECREF).
        ref.Py_XDECREF(fsid_obj)
1183
@requires(('ioctx_name', str_type))
def open_ioctx(self, ioctx_name):
    """
    Create an io context

    The io context allows you to perform operations within a particular
    pool.

    :param ioctx_name: name of the pool
    :type ioctx_name: str

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    cdef:
        rados_ioctx_t handle
        char *name_ptr

    self.require_state("connected")
    ioctx_name = cstr(ioctx_name, 'ioctx_name')
    name_ptr = ioctx_name
    with nogil:
        rc = rados_ioctx_create(self.cluster, name_ptr, &handle)
    if rc < 0:
        raise make_ex(rc, "error opening pool '%s'" % ioctx_name)
    # Wrap the raw handle in the Python-level Ioctx object.
    wrapper = Ioctx(ioctx_name)
    wrapper.io = handle
    return wrapper
1210
11fdf7f2
TL
@requires(('pool_id', int))
def open_ioctx2(self, pool_id):
    """
    Create an io context

    The io context allows you to perform operations within a particular
    pool.

    :param pool_id: ID of the pool
    :type pool_id: int

    :raises: :class:`TypeError`, :class:`Error`
    :returns: Ioctx - Rados Ioctx object
    """
    cdef:
        rados_ioctx_t handle
        int64_t pool_num

    self.require_state("connected")
    pool_num = pool_id
    with nogil:
        rc = rados_ioctx_create2(self.cluster, pool_num, &handle)
    if rc < 0:
        raise make_ex(rc, "error opening pool id '%s'" % pool_id)
    # The Ioctx is labelled with the stringified pool ID.
    wrapper = Ioctx(str(pool_id))
    wrapper.io = handle
    return wrapper
1236
7c673cae
FG
def mon_command(self, cmd, inbuf, timeout=0, target=None):
    """
    mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param cmd: command strings, converted with cstr_list and passed to
                librados as a C array
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :param target: optional monitor to address; when given,
                   rados_mon_command_target is used (ints are converted
                   with str())
    :returns: (int ret, string outbuf, string outs)
    """
    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding

    self.require_state("connected")
    # Use 'cmd' as the argument name (it was 'c') so a TypeError raised
    # for a bad element names the real parameter, like the sibling
    # *_command methods do.
    cmd = cstr_list(cmd, 'cmd')

    if isinstance(target, int):
        # NOTE(sileht): looks weird but test_monmap_dump pass int
        target = str(target)

    target = cstr(target, 'target', opt=True)
    inbuf = cstr(inbuf, 'inbuf')

    cdef:
        char *_target = opt_str(target)
        char **_cmd = to_bytes_array(cmd)
        size_t _cmdlen = len(cmd)

        char *_inbuf = inbuf
        size_t _inbuf_len = len(inbuf)

        char *_outbuf
        size_t _outbuf_len
        char *_outs
        size_t _outs_len

    try:
        if target:
            with nogil:
                ret = rados_mon_command_target(self.cluster, _target,
                                               <const char **>_cmd, _cmdlen,
                                               <const char*>_inbuf, _inbuf_len,
                                               &_outbuf, &_outbuf_len,
                                               &_outs, &_outs_len)
        else:
            with nogil:
                ret = rados_mon_command(self.cluster,
                                        <const char **>_cmd, _cmdlen,
                                        <const char*>_inbuf, _inbuf_len,
                                        &_outbuf, &_outbuf_len,
                                        &_outs, &_outs_len)

        # Copy both outputs into Python objects before releasing the
        # librados-owned buffers.
        my_outs = decode_cstr(_outs[:_outs_len])
        my_outbuf = _outbuf[:_outbuf_len]
        if _outs_len:
            rados_buffer_free(_outs)
        if _outbuf_len:
            rados_buffer_free(_outbuf)
        return (ret, my_outbuf, my_outs)
    finally:
        # the pointer array built by to_bytes_array is released here
        free(_cmd)
1293
def osd_command(self, osdid, cmd, inbuf, timeout=0):
    """
    osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param osdid: numeric OSD id the command is sent to
    :param cmd: command strings, converted with cstr_list
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    cdef:
        int osd_num
        char **argv
        size_t argc

        char *payload
        size_t payload_len

        char *reply_data
        size_t reply_data_len
        char *status_text
        size_t status_text_len

    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    osd_num = osdid
    argv = to_bytes_array(cmd)
    argc = len(cmd)
    payload = inbuf
    payload_len = len(inbuf)

    try:
        with nogil:
            rc = rados_osd_command(self.cluster, osd_num,
                                   <const char **>argv, argc,
                                   <const char*>payload, payload_len,
                                   &reply_data, &reply_data_len,
                                   &status_text, &status_text_len)

        # Copy into Python objects first, then give the C buffers back.
        status = decode_cstr(status_text[:status_text_len])
        data = reply_data[:reply_data_len]
        if status_text_len:
            rados_buffer_free(status_text)
        if reply_data_len:
            rados_buffer_free(reply_data)
        return (rc, data, status)
    finally:
        free(argv)
1336
def mgr_command(self, cmd, inbuf, timeout=0):
    """
    Send a command through rados_mgr_command.

    :param cmd: command strings, converted with cstr_list
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    cdef:
        char **argv
        size_t argc

        char *payload
        size_t payload_len

        char *reply_data
        size_t reply_data_len
        char *status_text
        size_t status_text_len

    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    argv = to_bytes_array(cmd)
    argc = len(cmd)
    payload = inbuf
    payload_len = len(inbuf)

    try:
        with nogil:
            rc = rados_mgr_command(self.cluster,
                                   <const char **>argv, argc,
                                   <const char*>payload, payload_len,
                                   &reply_data, &reply_data_len,
                                   &status_text, &status_text_len)

        # Copy into Python objects first, then give the C buffers back.
        status = decode_cstr(status_text[:status_text_len])
        data = reply_data[:reply_data_len]
        if status_text_len:
            rados_buffer_free(status_text)
        if reply_data_len:
            rados_buffer_free(reply_data)
        return (rc, data, status)
    finally:
        free(argv)
1377
def pg_command(self, pgid, cmd, inbuf, timeout=0):
    """
    pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

    :param pgid: placement group id, as a string
    :param cmd: command strings, converted with cstr_list
    :param inbuf: input buffer passed along with the command
    :param timeout: ignored, kept for backward compatibility
    :returns: (int ret, string outbuf, string outs)
    """
    cdef:
        char *pg_name
        char **argv
        size_t argc

        char *payload
        size_t payload_len

        char *reply_data
        size_t reply_data_len
        char *status_text
        size_t status_text_len

    # NOTE(sileht): timeout is ignored because C API doesn't provide
    # timeout argument, but we keep it for backward compat with old python binding
    self.require_state("connected")

    pgid = cstr(pgid, 'pgid')
    cmd = cstr_list(cmd, 'cmd')
    inbuf = cstr(inbuf, 'inbuf')

    pg_name = pgid
    argv = to_bytes_array(cmd)
    argc = len(cmd)
    payload = inbuf
    payload_len = len(inbuf)

    try:
        with nogil:
            rc = rados_pg_command(self.cluster, pg_name,
                                  <const char **>argv, argc,
                                  <const char *>payload, payload_len,
                                  &reply_data, &reply_data_len,
                                  &status_text, &status_text_len)

        # Copy into Python objects first, then give the C buffers back.
        status = decode_cstr(status_text[:status_text_len])
        data = reply_data[:reply_data_len]
        if status_text_len:
            rados_buffer_free(status_text)
        if reply_data_len:
            rados_buffer_free(reply_data)
        return (rc, data, status)
    finally:
        free(argv)
1421
def wait_for_latest_osdmap(self):
    """
    Call rados_wait_for_latest_osdmap on this cluster handle.

    :returns: the integer code returned by librados (not converted
              into an exception)
    """
    self.require_state("connected")
    with nogil:
        rc = rados_wait_for_latest_osdmap(self.cluster)
    return rc
1427
def blacklist_add(self, client_address, expire_seconds=0):
    """
    Blacklist a client from the OSDs

    :param client_address: client address
    :type client_address: str
    :param expire_seconds: number of seconds to blacklist
    :type expire_seconds: int

    :raises: :class:`Error`
    """
    cdef:
        uint32_t ttl
        char *addr_ptr

    self.require_state("connected")
    client_address = cstr(client_address, 'client_address')
    ttl = expire_seconds
    addr_ptr = client_address

    with nogil:
        rc = rados_blacklist_add(self.cluster, addr_ptr, ttl)
    if rc < 0:
        raise make_ex(rc, "error blacklisting client '%s'" % client_address)
1449
def monitor_log(self, level, callback, arg):
    """
    Install or clear the callback registered through rados_monitor_log.

    :param level: one of MONITOR_LEVELS
    :param callback: callable to register, or None to register NULL and
                     drop the stored callback references
    :param arg: object stored alongside ``callback`` in the tuple handed
                to librados
    :raises: :class:`LogicError` for an unknown level or a non-callable
             callback; :class:`Error` when registering a callback fails
    """
    cdef:
        char *level_ptr
        PyObject *cb_ptr

    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if not (callback is None or callable(callback)):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    level_ptr = level

    if callback is None:
        with nogil:
            r = rados_monitor_log(self.cluster, <const char*>level_ptr, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cb_ptr = <PyObject*>cb
    with nogil:
        r = rados_monitor_log(self.cluster, <const char*>level_ptr,
                              <rados_log_callback_t>&__monitor_callback, cb_ptr)

    if r:
        raise make_ex(r, 'error calling rados_monitor_log')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = cb
    self.monitor_callback2 = None
1477
def monitor_log2(self, level, callback, arg):
    """
    Install or clear the callback registered through rados_monitor_log2.

    :param level: one of MONITOR_LEVELS
    :param callback: callable to register, or None to register NULL and
                     drop the stored callback references
    :param arg: object stored alongside ``callback`` in the tuple handed
                to librados
    :raises: :class:`LogicError` for an unknown level or a non-callable
             callback; :class:`Error` when registering a callback fails
    """
    if level not in MONITOR_LEVELS:
        raise LogicError("invalid monitor level " + level)
    if callback is not None and not callable(callback):
        raise LogicError("callback must be a callable function or None")

    level = cstr(level, 'level')
    cdef char *_level = level

    if callback is None:
        with nogil:
            r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
        self.monitor_callback = None
        self.monitor_callback2 = None
        return

    cb = (callback, arg)
    cdef PyObject* _arg = <PyObject*>cb
    with nogil:
        r = rados_monitor_log2(self.cluster, <const char*>_level,
                               <rados_log_callback2_t>&__monitor_callback2, _arg)

    if r:
        # Name the function that actually failed (the message used to
        # say rados_monitor_log).
        raise make_ex(r, 'error calling rados_monitor_log2')
    # NOTE(sileht): Prevents the callback method from being garbage collected
    self.monitor_callback = None
    self.monitor_callback2 = cb
7c673cae 1505
11fdf7f2
TL
@requires(('service', str_type), ('daemon', str_type), ('metadata', dict))
def service_daemon_register(self, service, daemon, metadata):
    """
    Register this client as a service daemon.

    :param str service: service name (e.g. "rgw")
    :param str daemon: daemon name (e.g. "gwfoo")
    :param dict metadata: static metadata about the register daemon
                          (e.g., the version of Ceph, the kernel version.)
    :raises: :class:`TypeError`, :class:`Error`
    """
    service = cstr(service, 'service')
    daemon = cstr(daemon, 'daemon')
    # Flatten the dict into "key\0value\0key\0value\0".
    metadata_dict = '\0'.join(chain.from_iterable(metadata.items()))
    metadata_dict += '\0'
    # A ``char *`` can only be taken from a bytes object; on Python 3 the
    # joined value is a str, so encode it like every other argument.
    metadata_dict = cstr(metadata_dict, 'metadata')
    cdef:
        char *_service = service
        char *_daemon = daemon
        char *_metadata = metadata_dict

    with nogil:
        ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
    if ret != 0:
        raise make_ex(ret, "error calling service_register()")
1527
@requires(('status', dict))
def service_daemon_update(self, status):
    """
    Send a status update through rados_service_update_status.

    :param dict status: status entries; keys and values are joined into
                        a NUL-separated string, so both must be strings
    :raises: :class:`TypeError`, :class:`Error`
    """
    # The decorator used to name a non-existent 'metadata' argument, so a
    # keyword ``status=`` was never type-checked and positional failures
    # were reported under the wrong name.
    status_dict = '\0'.join(chain.from_iterable(status.items()))
    status_dict += '\0'
    # A ``char *`` can only be taken from a bytes object; on Python 3 the
    # joined value is a str, so encode it first.
    status_dict = cstr(status_dict, 'status')
    cdef:
        char *_status = status_dict

    with nogil:
        ret = rados_service_update_status(self.cluster, _status)
    if ret != 0:
        raise make_ex(ret, "error calling service_daemon_update()")
1539
7c673cae
FG
1540
cdef class OmapIterator(object):
    """Iterator over omap key/value pairs"""

    # Ioctx the iteration belongs to.
    cdef public Ioctx ioctx
    # librados iterator handle; nothing in this class assigns it.
    cdef rados_omap_iter_t ctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next key-value pair in the object

        :raises: :class:`Error`, StopIteration
        :returns: tuple (key, value); value is None when librados hands
                  back a NULL value pointer
        """
        cdef:
            char *raw_key = NULL
            char *raw_val = NULL
            size_t val_len

        with nogil:
            rc = rados_omap_get_next(self.ctx, &raw_key, &raw_val, &val_len)

        if rc != 0:
            raise make_ex(rc, "error iterating over the omap")
        # A NULL key is how the end of the iteration is recognised here.
        if raw_key == NULL:
            raise StopIteration()
        key = decode_cstr(raw_key)
        if raw_val == NULL:
            value = None
        else:
            value = raw_val[:val_len]
        return (key, value)

    def __dealloc__(self):
        with nogil:
            rados_omap_get_end(self.ctx)
1579
1580
cdef class ObjectIterator(object):
    """rados.Ioctx Object iterator"""

    # librados listing handle, opened in __cinit__ and closed in __dealloc__.
    cdef rados_list_ctx_t ctx

    cdef public object ioctx

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx

        with nogil:
            rc = rados_nobjects_list_open(ioctx.io, &self.ctx)
        if rc < 0:
            raise make_ex(rc, "error iterating over the objects in ioctx '%s'"
                          % self.ioctx.name)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next object name and locator in the pool

        :raises: StopIteration
        :returns: next rados.Ioctx Object
        """
        cdef:
            const char *raw_key = NULL
            const char *raw_locator = NULL
            const char *raw_nspace = NULL

        with nogil:
            rc = rados_nobjects_list_next(self.ctx, &raw_key, &raw_locator, &raw_nspace)

        # NOTE(review): every negative return code ends the iteration,
        # not just an end-of-listing code.
        if rc < 0:
            raise StopIteration()

        key = decode_cstr(raw_key)
        # locator and namespace are optional: NULL maps to None
        if raw_locator != NULL:
            locator = decode_cstr(raw_locator)
        else:
            locator = None
        if raw_nspace != NULL:
            nspace = decode_cstr(raw_nspace)
        else:
            nspace = None
        return Object(self.ioctx, key, locator, nspace)

    def __dealloc__(self):
        with nogil:
            rados_nobjects_list_close(self.ctx)
1626
1627
cdef class XattrIterator(object):
    """Extended attribute iterator"""

    # librados xattr iterator handle, obtained in __cinit__.
    cdef rados_xattrs_iter_t it
    # C view of ``oid``; valid as long as ``self.oid`` is alive.
    cdef char* _oid

    cdef public Ioctx ioctx
    cdef public object oid

    def __cinit__(self, Ioctx ioctx, oid):
        self.ioctx = ioctx
        self.oid = cstr(oid, 'oid')
        self._oid = self.oid

        with nogil:
            rc = rados_getxattrs(ioctx.io, self._oid, &self.it)
        if rc != 0:
            raise make_ex(rc, "Failed to get rados xattrs for object %r" % oid)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next xattr on the object

        :raises: :class:`Error`, StopIteration
        :returns: pair - of name and value of the next Xattr
        """
        cdef:
            const char *raw_name = NULL
            const char *raw_val = NULL
            size_t val_len = 0

        with nogil:
            rc = rados_getxattrs_next(self.it, &raw_name, &raw_val, &val_len)
        if rc != 0:
            raise make_ex(rc, "error iterating over the extended attributes "
                              "in '%s'" % self.oid)
        # A NULL name is how the end of the iteration is recognised here.
        if raw_name == NULL:
            raise StopIteration()
        return (decode_cstr(raw_name), raw_val[:val_len])

    def __dealloc__(self):
        with nogil:
            rados_getxattrs_end(self.it)
1676
1677
cdef class SnapIterator(object):
    """Snapshot iterator"""

    cdef public Ioctx ioctx

    # Heap buffer of snapshot ids filled in by __cinit__ and released in
    # __dealloc__.
    cdef rados_snap_t *snaps
    # Number of valid entries in ``snaps``.
    cdef int max_snap
    # Index of the next entry __next__ will return.
    cdef int cur_snap

    def __cinit__(self, Ioctx ioctx):
        self.ioctx = ioctx
        # We don't know how big a buffer we need until we've called the
        # function. So use the exponential doubling strategy.
        cdef int num_snaps = 10
        while True:
            self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
                                                    num_snaps *
                                                    sizeof(rados_snap_t))

            with nogil:
                ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
            if ret >= 0:
                self.max_snap = ret
                break
            elif ret != -errno.ERANGE:
                raise make_ex(ret, "error calling rados_snap_list for \
ioctx '%s'" % self.ioctx.name)
            num_snaps = num_snaps * 2
        self.cur_snap = 0

    def __dealloc__(self):
        # The id buffer was never released before, leaking one allocation
        # per iterator. free(NULL) is a no-op, so this is also safe when
        # __cinit__ failed before allocating anything.
        free(self.snaps)
        self.snaps = NULL

    def __iter__(self):
        return self

    def __next__(self):
        """
        Get the next Snapshot

        :raises: :class:`Error`, StopIteration
        :returns: Snap - next snapshot
        """
        if self.cur_snap >= self.max_snap:
            raise StopIteration

        cdef:
            rados_snap_t snap_id = self.snaps[self.cur_snap]
            int name_len = 10
            char *name = NULL

        try:
            while True:
                name = <char *>realloc_chk(name, name_len)
                with nogil:
                    ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
                if ret == 0:
                    break
                elif ret != -errno.ERANGE:
                    raise make_ex(ret, "rados_snap_get_name error")
                else:
                    # ERANGE: retry with a buffer twice as large
                    name_len = name_len * 2

            # The whole buffer is decoded, then trailing NULs are stripped.
            snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
            self.cur_snap = self.cur_snap + 1
            return snap
        finally:
            free(name)
1743
1744
cdef class Snap(object):
    """Snapshot object"""
    cdef public Ioctx ioctx
    cdef public object name

    # NOTE(sileht): old API was storing the ctypes object
    # instead of the value ....
    cdef public rados_snap_t snap_id

    def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
        self.ioctx = ioctx
        self.name = name
        self.snap_id = snap_id

    def __str__(self):
        fields = (str(self.ioctx), self.name, self.snap_id)
        return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" % fields

    def get_timestamp(self):
        """
        Find when a snapshot in the current pool occurred

        :raises: :class:`Error`
        :returns: datetime - the date and time the snapshot was created
        """
        cdef time_t created_at

        with nogil:
            rc = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &created_at)
        if rc != 0:
            raise make_ex(rc, "rados_ioctx_snap_get_stamp error")
        return datetime.fromtimestamp(created_at)
1777
1778
cdef class Completion(object):
    """completion object"""

    cdef public Ioctx ioctx
    cdef public object oncomplete
    cdef public object onsafe

    cdef rados_callback_t complete_cb
    cdef rados_callback_t safe_cb
    cdef rados_completion_t rados_comp
    # Optional owned reference, dropped in __dealloc__.
    cdef PyObject* buf

    def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
        self.ioctx = ioctx
        self.oncomplete = oncomplete
        self.onsafe = onsafe

    def is_safe(self):
        """
        Is an asynchronous operation safe?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is safe
        """
        with nogil:
            status = rados_aio_is_safe(self.rados_comp)
        return status == 1

    def is_complete(self):
        """
        Has an asynchronous operation completed?

        This does not imply that the safe callback has finished.

        :returns: True if the operation is completed
        """
        with nogil:
            status = rados_aio_is_complete(self.rados_comp)
        return status == 1

    def wait_for_safe(self):
        """
        Wait for an asynchronous operation to be marked safe

        This does not imply that the safe callback has finished.
        """
        with nogil:
            rados_aio_wait_for_safe(self.rados_comp)

    def wait_for_complete(self):
        """
        Wait for an asynchronous operation to complete

        This does not imply that the complete callback has finished.
        """
        with nogil:
            rados_aio_wait_for_complete(self.rados_comp)

    def wait_for_safe_and_cb(self):
        """
        Wait for an asynchronous operation to be marked safe and for
        the safe callback to have returned
        """
        with nogil:
            rados_aio_wait_for_safe_and_cb(self.rados_comp)

    def wait_for_complete_and_cb(self):
        """
        Wait for an asynchronous operation to complete and for the
        complete callback to have returned

        :returns: whether the operation is completed
        """
        with nogil:
            status = rados_aio_wait_for_complete_and_cb(self.rados_comp)
        return status

    def get_return_value(self):
        """
        Get the return value of an asynchronous operation

        The return value is set when the operation is complete or safe,
        whichever comes first.

        :returns: int - return value of the operation
        """
        with nogil:
            value = rados_aio_get_return_value(self.rados_comp)
        return value

    def __dealloc__(self):
        """
        Release a completion

        Call this when you no longer need the completion. It may not be
        freed immediately if the operation is not acked and committed.
        """
        # Drop the buffer reference first (XDECREF tolerates NULL) ...
        ref.Py_XDECREF(self.buf)
        self.buf = NULL
        # ... then hand the completion back to librados, if there is one.
        if self.rados_comp != NULL:
            with nogil:
                rados_aio_release(self.rados_comp)
                self.rados_comp = NULL

    def _complete(self):
        # Run the user callback, then drop this completion from the
        # ioctx's list of pending "complete" completions.
        self.oncomplete(self)
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)

    def _safe(self):
        # Same as _complete, for the "safe" callback and list.
        self.onsafe(self)
        with self.ioctx.lock:
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)

    def _cleanup(self):
        # Unregister from both lists without invoking any callback.
        with self.ioctx.lock:
            if self.oncomplete:
                self.ioctx.complete_completions.remove(self)
            if self.onsafe:
                self.ioctx.safe_completions.remove(self)
1904
1905
class OpCtx(object):
    """
    Mixin adding ``with`` support to operation classes.

    Subclasses provide ``create()`` and ``release()``; entering the
    context yields whatever ``create()`` returns and leaving it calls
    ``release()``.
    """

    def __enter__(self):
        opened = self.create()
        return opened

    def __exit__(self, type, msg, traceback):
        # Always release; exceptions from the body are not suppressed.
        self.release()
1912
1913
cdef class WriteOp(object):
    """Wrapper around a librados write operation (``rados_write_op_t``)"""
    cdef rados_write_op_t write_op

    def create(self):
        """
        Allocate the underlying write operation.

        :returns: self
        """
        with nogil:
            self.write_op = rados_create_write_op()
        return self

    def release(self):
        """
        Release the underlying write operation.
        """
        with nogil:
            rados_release_write_op(self.write_op)

    @requires(('exclusive', opt(int)))
    def new(self, exclusive=None):
        """
        Create the object.

        :param exclusive: value passed to librados as the ``exclusive``
                          argument; ``None`` (the default) is sent as 0
        :type exclusive: int
        """

        # ``None`` cannot be converted to a C int, so the documented
        # default used to raise TypeError; map it to 0 explicitly.
        cdef int _exclusive = 0
        if exclusive is not None:
            _exclusive = exclusive

        with nogil:
            rados_write_op_create(self.write_op, _exclusive, NULL)

    def remove(self):
        """
        Remove object.
        """
        with nogil:
            rados_write_op_remove(self.write_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this write_op.
        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef:
            int _flags = flags

        with nogil:
            rados_write_op_set_flags(self.write_op, _flags)

    @requires(('to_write', bytes))
    def append(self, to_write):
        """
        Append data to an object synchronously
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_append(self.write_op, _to_write, length)

    @requires(('to_write', bytes))
    def write_full(self, to_write):
        """
        Write whole object, atomically replacing it.
        :param to_write: data to write
        :type to_write: bytes
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)

        with nogil:
            rados_write_op_write_full(self.write_op, _to_write, length)

    @requires(('to_write', bytes), ('offset', int))
    def write(self, to_write, offset=0):
        """
        Write to offset.
        :param to_write: data to write
        :type to_write: bytes
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        """

        cdef:
            char *_to_write = to_write
            size_t length = len(to_write)
            uint64_t _offset = offset

        with nogil:
            rados_write_op_write(self.write_op, _to_write, length, _offset)

    @requires(('version', int))
    def assert_version(self, version):
        """
        Check if object's version is the expected one.
        :param version: expected version of the object
        :type version: int
        """
        cdef:
            uint64_t _version = version

        with nogil:
            rados_write_op_assert_version(self.write_op, _version)

    @requires(('offset', int), ('length', int))
    def zero(self, offset, length):
        """
        Zero part of an object.
        :param offset: byte offset in the object to begin writing at
        :type offset: int
        :param length: number of zero bytes to write
        :type length: int
        """

        cdef:
            size_t _length = length
            uint64_t _offset = offset

        # librados declares rados_write_op_zero(write_op, offset, len);
        # the two arguments used to be passed the other way round.
        with nogil:
            rados_write_op_zero(self.write_op, _offset, _length)

    @requires(('offset', int))
    def truncate(self, offset):
        """
        Truncate an object.
        :param offset: byte offset in the object to begin truncating at
        :type offset: int
        """

        cdef:
            uint64_t _offset = offset

        with nogil:
            rados_write_op_truncate(self.write_op, _offset)
2051
2052
class WriteOpCtx(WriteOp, OpCtx):
    """write operation context manager"""
    # No members of its own: everything comes from WriteOp and OpCtx.
2055
2056
cdef class ReadOp(object):
    """
    Holder for a librados read operation handle (``rados_read_op_t``).

    The handle lives in ``self.read_op`` and is only assigned by
    :meth:`create`.
    """
    cdef rados_read_op_t read_op

    def create(self):
        """
        Allocate the underlying librados read operation.

        :returns: this object
        """
        with nogil:
            self.read_op = rados_create_read_op()
        return self

    def release(self):
        """
        Release the underlying librados read operation.
        """
        with nogil:
            rados_release_read_op(self.read_op)

    @requires(('flags', int))
    def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
        """
        Set flags for the last operation added to this read_op.

        :param flags: flags to apply to the last operation
        :type flags: int
        """

        cdef int c_flags = flags

        with nogil:
            rados_read_op_set_flags(self.read_op, c_flags)
2082
2083
class ReadOpCtx(ReadOp, OpCtx):
    """read operation context manager"""
    # No members of its own: everything comes from ReadOp and OpCtx.
2086
2087
cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level "safe" callback for asynchronous operations.

    ``args`` is treated as a Python object whose ``_safe()`` method is
    invoked; the GIL is acquired for the duration of the call.
    """
    (<object>args)._safe()
    return 0
2095
2096
cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
    """
    C-level "complete" callback for asynchronous operations.

    ``args`` is treated as a Python object whose ``_complete()`` method is
    invoked; the GIL is acquired for the duration of the call.
    """
    (<object>args)._complete()
    return 0
2104
2105
2106cdef class Ioctx(object):
2107 """rados.Ioctx object"""
2108 # NOTE(sileht): attributes declared in .pyd
2109
def __init__(self, name):
    """
    Initialise the Python-side state of an io context.

    :param name: stored as ``self.name``; used in error messages
    """
    # Completion lists are appended to under self.lock by
    # __track_completion().
    self.lock = threading.Lock()
    self.complete_completions = []
    self.safe_completions = []

    self.nspace = ""
    self.locator_key = ""

    self.name = name
    self.state = "open"
2119
def __enter__(self):
    """Enter the ``with`` block; the io context itself is the managed value."""
    return self
2122
def __exit__(self, type_, value, traceback):
    """
    Close the io context when the ``with`` block ends.

    :returns: False, so an exception raised inside the block is not
              suppressed
    """
    self.close()
    return False
2126
def __dealloc__(self):
    """Close the io context when the object is deallocated."""
    # close() does nothing unless the state is still "open".
    self.close()
2129
def __track_completion(self, completion_obj):
    """
    Record a completion in the per-callback tracking lists.

    The completion is appended to ``complete_completions`` when it has an
    ``oncomplete`` callback and to ``safe_completions`` when it has an
    ``onsafe`` callback; each append happens under ``self.lock``.

    :param completion_obj: completion to track
    """
    guard = self.lock
    if completion_obj.oncomplete:
        with guard:
            self.complete_completions.append(completion_obj)
    if completion_obj.onsafe:
        with guard:
            self.safe_completions.append(completion_obj)
2137
def __get_completion(self, oncomplete, onsafe):
    """
    Build a Completion and its librados counterpart.

    A C callback is registered only for the Python callbacks that are
    actually given; the Completion object itself is handed to librados as
    the callback argument.

    :param oncomplete: callback for the "complete" notification, or None
    :type oncomplete: completion
    :param onsafe: callback for the "safe" notification, or None
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    py_completion = Completion(self, oncomplete, onsafe)

    cdef:
        rados_callback_t c_complete_cb = NULL
        rados_callback_t c_safe_cb = NULL
        rados_completion_t c_completion
        PyObject* cb_arg = <PyObject*>py_completion

    if oncomplete:
        c_complete_cb = <rados_callback_t>&__aio_complete_cb
    if onsafe:
        c_safe_cb = <rados_callback_t>&__aio_safe_cb

    with nogil:
        ret = rados_aio_create_completion(cb_arg, c_complete_cb, c_safe_cb,
                                          &c_completion)
    if ret < 0:
        raise make_ex(ret, "error getting a completion")

    py_completion.rados_comp = c_completion
    return py_completion
2174
@requires(('object_name', str_type), ('oncomplete', opt(Callable)))
def aio_stat(self, object_name, oncomplete):
    """
    Asynchronously get object stats (size/mtime)

    oncomplete is invoked as::

        oncomplete(completion, size, mtime)

    where size and mtime are both None when the operation's return value
    is negative.

    :param object_name: the name of the object to get stats from
    :type object_name: str
    :param oncomplete: what to do when the stat is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char *_object_name = object_name
        # written by librados, read later by the wrapper callback below
        uint64_t psize
        time_t pmtime

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        if _completion_v.get_return_value() < 0:
            return oncomplete(_completion_v, None, None)
        return oncomplete(_completion_v, psize, time.localtime(pmtime))

    completion = self.__get_completion(oncomplete_, None)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
                             &psize, &pmtime)

    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error stating %s" % object_name)
    return completion
2220
@requires(('object_name', str_type), ('to_write', bytes), ('offset', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_write(self, object_name, to_write, offset=0,
              oncomplete=None, onsafe=None):
    """
    Write data to an object asynchronously

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int
    :param oncomplete: callback for the "complete" notification
    :type oncomplete: completion
    :param onsafe: callback for the "safe" notification
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        char* _payload = to_write
        size_t _payload_len = len(to_write)
        uint64_t _offset = offset

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
                              _payload, _payload_len, _offset)
    if ret < 0:
        # the request was never queued: undo the tracking before raising
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2265
@requires(('object_name', str_type), ('to_write', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_write_full(self, object_name, to_write,
                   oncomplete=None, onsafe=None):
    """
    Asynchronously write an entire object

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.
    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_write: data to write
    :type to_write: bytes
    :param oncomplete: callback for the "complete" notification
    :type oncomplete: completion
    :param onsafe: callback for the "safe" notification
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        char* _payload = to_write
        size_t _payload_len = len(to_write)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_write_full(self.io, _object_name,
                                   completion.rados_comp,
                                   _payload, _payload_len)
    if ret < 0:
        # the request was never queued: undo the tracking before raising
        completion._cleanup()
        raise make_ex(ret, "error writing object %s" % object_name)
    return completion
2310
@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
          ('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
    """
    Asynchronously append data to an object

    Queues the write and returns.

    :param object_name: name of the object
    :type object_name: str
    :param to_append: data to append
    :type to_append: bytes
    :param oncomplete: callback for the "complete" notification
    :type oncomplete: completion
    :param onsafe: callback for the "safe" notification
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        char* _payload = to_append
        size_t _payload_len = len(to_append)

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_append(self.io, _object_name,
                               completion.rados_comp,
                               _payload, _payload_len)
    if ret < 0:
        # the request was never queued: undo the tracking before raising
        completion._cleanup()
        raise make_ex(ret, "error appending object %s" % object_name)
    return completion
2353
def aio_flush(self):
    """
    Block until all pending writes in an io context are safe

    The GIL is released while librados flushes.

    :raises: :class:`Error`
    """
    with nogil:
        ret = rados_aio_flush(self.io)
    if ret >= 0:
        return
    raise make_ex(ret, "error flushing")
2364
@requires(('object_name', str_type), ('length', int), ('offset', int),
          ('oncomplete', opt(Callable)))
def aio_read(self, object_name, length, offset, oncomplete):
    """
    Asynchronously read data from an object

    oncomplete will be called with the returned read value as
    well as the completion:

    oncomplete(completion, data_read)

    ``data_read`` is None when the operation's return value is negative.

    :param object_name: name of the object to read from
    :type object_name: str
    :param length: the number of bytes to read
    :type length: int
    :param offset: byte offset in the object to begin reading from
    :type offset: int
    :param oncomplete: what to do when the read is complete
    :type oncomplete: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _object_name = object_name
        uint64_t _offset = offset

        # Previously declared as "ref_buf", which left the variable that
        # is actually used below without an explicit C type.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Trim the preallocated buffer to what was really read. This must
        # include a successful 0-byte read (as the synchronous read() does),
        # otherwise the caller receives `length` uninitialised bytes.
        if return_value >= 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    completion = self.__get_completion(oncomplete_, None)
    # Uninitialised bytes object that librados fills in place.
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
                             ret_buf, _length, _offset)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error reading %s" % object_name)
    return completion
2417
@requires(('object_name', str_type), ('cls', str_type), ('method', str_type),
          ('data', bytes), ('length', int),
          ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_execute(self, object_name, cls, method, data,
                length=8192, oncomplete=None, onsafe=None):
    """
    Asynchronously execute an OSD class method on an object.

    oncomplete and onsafe will be called with the data returned from
    the plugin as well as the completion:

    oncomplete(completion, data)
    onsafe(completion, data)

    ``data`` is None when the operation's return value is negative.

    :param object_name: name of the object
    :type object_name: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int
    :param oncomplete: what to do when the execution is complete
    :type oncomplete: completion
    :param onsafe: what to do when the execution is safe and complete
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """

    object_name = cstr(object_name, 'object_name')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        Completion completion
        char *_object_name = object_name
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Previously declared as "ref_buf", which left the variable that
        # is actually used below without an explicit C type.
        char *ret_buf
        size_t _length = length

    def oncomplete_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        # Shrink the preallocated output buffer to the reported size.
        # NOTE(review): a return value of 0 leaves the buffer at `length`
        # bytes, and onsafe_ never resizes - confirm this is intended.
        if return_value > 0 and return_value != length:
            _PyBytes_Resize(&_completion_v.buf, return_value)
        return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    def onsafe_(completion_v):
        cdef Completion _completion_v = completion_v
        return_value = _completion_v.get_return_value()
        return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

    # Only install a wrapper for the callbacks the caller supplied.
    completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
    completion.buf = PyBytes_FromStringAndSize(NULL, length)
    ret_buf = PyBytes_AsString(completion.buf)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
                             _cls, _method, _data, _data_len, ret_buf, _length)
    if ret < 0:
        completion._cleanup()
        raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
    return completion
2488
@requires(('object_name', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)))
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
    """
    Asynchronously remove an object

    :param object_name: name of the object to remove
    :type object_name: str
    :param oncomplete: callback for the "complete" notification
    :type oncomplete: completion
    :param onsafe: callback for the "safe" notification
    :type onsafe: completion

    :raises: :class:`Error`
    :returns: completion object
    """
    object_name = cstr(object_name, 'object_name')

    cdef:
        Completion completion
        char* _target = object_name

    completion = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(completion)
    with nogil:
        ret = rados_aio_remove(self.io, _target,
                               completion.rados_comp)
    if ret < 0:
        # the request was never queued: undo the tracking before raising
        completion._cleanup()
        raise make_ex(ret, "error removing %s" % object_name)
    return completion
2521
def require_ioctx_open(self):
    """
    Guard used by the other methods: succeed silently only while
    ``self.state`` is ``"open"``.

    :raises: IoctxStateError
    """
    if self.state == "open":
        return
    raise IoctxStateError("The pool is %s" % self.state)
2530
7c673cae
FG
@requires(('loc_key', str_type))
def set_locator_key(self, loc_key):
    """
    Set the key for mapping objects to pgs within an io context.

    The key is used instead of the object name to determine which
    placement groups an object is put in. This affects all subsequent
    operations of the io context - until a different locator key is
    set, all objects in this io context will be placed in the same pg.

    The original (unencoded) value is remembered and returned by
    :meth:`get_locator_key`.

    :param loc_key: the key to use as the object locator
    :type loc_key: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    encoded_key = cstr(loc_key, 'loc_key')
    cdef char *c_loc_key = encoded_key
    with nogil:
        rados_ioctx_locator_set_key(self.io, c_loc_key)
    self.locator_key = loc_key
2553
def get_locator_key(self):
    """
    Return the locator key currently stored on this io context.

    :returns: locator_key
    """
    current_key = self.locator_key
    return current_key
2561
@requires(('snap_id', long))
def set_read(self, snap_id):
    """
    Set the snapshot for reading objects.

    To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)

    :param snap_id: the snapshot Id
    :type snap_id: int

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rados_ioctx_snap_set_read(self.io, c_snap_id)
2578
# opt() lets the documented None value through; with a bare str_type check
# the "nspace is None" branch below could never be reached.
@requires(('nspace', opt(str_type)))
def set_namespace(self, nspace):
    """
    Set the namespace for objects within an io context.

    The namespace in addition to the object name fully identifies
    an object. This affects all subsequent operations of the io context
    - until a different namespace is set, all objects in this io context
    will be placed in the same namespace.

    :param nspace: the namespace to use, or None/"" for the default namespace
    :type nspace: str

    :raises: :class:`TypeError`
    """
    self.require_ioctx_open()
    if nspace is None:
        nspace = ""
    cnspace = cstr(nspace, 'nspace')
    cdef char *_nspace = cnspace
    with nogil:
        rados_ioctx_set_namespace(self.io, _nspace)
    # Remember the unencoded value for get_namespace().
    self.nspace = nspace
2602
def get_namespace(self):
    """
    Return the namespace currently stored on this io context.

    :returns: namespace
    """
    current_namespace = self.nspace
    return current_namespace
2610
def close(self):
    """
    Close a rados.Ioctx object.

    This just tells librados that you no longer need to use the io context.
    It may not be freed immediately if there are pending asynchronous
    requests on it, but you should not use an io context again after
    calling this function on it.

    Calling it when the state is not "open" does nothing.
    """
    if self.state != "open":
        return
    self.require_ioctx_open()
    with nogil:
        rados_ioctx_destroy(self.io)
    self.state = "closed"
2625
2626
@requires(('key', str_type), ('data', bytes))
def write(self, key, data, offset=0):
    """
    Write data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes
    :param offset: byte offset in the object to begin writing at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *_payload = data
        size_t _payload_len = len(data)
        uint64_t _offset = offset

    with nogil:
        ret = rados_write(self.io, _key, _payload, _payload_len, _offset)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # librados is only expected to return 0 or a negative error here
        raise LogicError("Ioctx.write(%s): rados_write "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2662
@requires(('key', str_type), ('data', bytes))
def write_full(self, key, data):
    """
    Write an entire object synchronously.

    The object is filled with the provided data. If the object exists,
    it is atomically truncated and then written.

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *_payload = data
        size_t _payload_len = len(data)

    with nogil:
        ret = rados_write_full(self.io, _key, _payload, _payload_len)
    if ret < 0:
        raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
                      % (self.name, key))
    if ret > 0:
        # librados is only expected to return 0 or a negative error here
        raise LogicError("Ioctx.write_full(%s): rados_write_full "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2697
@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
    """
    Append data to an object synchronously

    :param key: name of the object
    :type key: str
    :param data: data to write
    :type data: bytes

    :raises: :class:`TypeError`
    :raises: :class:`LogicError`
    :returns: int - 0 on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *_payload = data
        size_t _payload_len = len(data)

    with nogil:
        ret = rados_append(self.io, _key, _payload, _payload_len)
    if ret < 0:
        raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
                      % (self.name, key))
    if ret > 0:
        # librados is only expected to return 0 or a negative error here
        raise LogicError("Ioctx.append(%s): rados_append "
                         "returned %d, but should return zero on success."
                         % (self.name, ret))
    return ret
2729
@requires(('key', str_type))
def read(self, key, length=8192, offset=0):
    """
    Read data from an object synchronously

    A bytes object of ``length`` bytes is allocated up front, filled in
    place by librados and then shrunk to the number of bytes reported.

    :param key: name of the object
    :type key: str
    :param length: the number of bytes to read (default=8192)
    :type length: int
    :param offset: byte offset in the object to begin reading at
    :type offset: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - data read from object
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *_key = key
        char *raw
        uint64_t _offset = offset
        size_t _length = length
        PyObject* py_buf = NULL

    py_buf = PyBytes_FromStringAndSize(NULL, length)
    try:
        raw = PyBytes_AsString(py_buf)
        with nogil:
            ret = rados_read(self.io, _key, raw, _length, _offset)
        if ret < 0:
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))

        if ret != length:
            _PyBytes_Resize(&py_buf, ret)

        return <object>py_buf
    finally:
        # Always drop our own reference: the <object> cast above took a
        # new one for the caller. On a failed resize the string has been
        # freed and py_buf set to NULL, which XDECREF tolerates.
        ref.Py_XDECREF(py_buf)
2773
@requires(('key', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def execute(self, key, cls, method, data, length=8192):
    """
    Execute an OSD class method on an object.

    :param key: name of the object
    :type key: str
    :param cls: name of the object class
    :type cls: str
    :param method: name of the method
    :type method: str
    :param data: input data
    :type data: bytes
    :param length: size of output buffer in bytes (default=8192)
    :type length: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (ret, method output)
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cls = cstr(cls, 'cls')
    method = cstr(method, 'method')
    cdef:
        char *_key = key
        char *_cls = cls
        char *_method = method
        char *_data = data
        size_t _data_len = len(data)

        # Previously declared as "ref_buf", which left the variable that
        # is actually used below without an explicit C type.
        char *ret_buf
        size_t _length = length
        PyObject* ret_s = NULL

    ret_s = PyBytes_FromStringAndSize(NULL, length)
    try:
        ret_buf = PyBytes_AsString(ret_s)
        with nogil:
            ret = rados_exec(self.io, _key, _cls, _method, _data,
                             _data_len, ret_buf, _length)
        if ret < 0:
            # The message used to be a copy of the one in read().
            raise make_ex(ret, "Ioctx.execute(%s): failed to execute %s:%s on %s"
                          % (self.name, cls, method, key))

        # Shrink the preallocated buffer to the size reported by librados.
        if ret != length:
            _PyBytes_Resize(&ret_s, ret)

        return ret, <object>ret_s
    finally:
        # We DECREF unconditionally: the cast to object above will have
        # INCREFed if necessary. This also takes care of exceptions,
        # including if _PyBytes_Resize fails (that will free the string
        # itself and set ret_s to NULL, hence XDECREF).
        ref.Py_XDECREF(ret_s)
2829
def get_stats(self):
    """
    Get pool usage statistics

    :raises: :class:`Error`
    :returns: dict - each key maps to the field of the same name in the
              ``rados_pool_stat_t`` filled in by librados:

        - ``num_bytes``
        - ``num_kb``
        - ``num_objects``
        - ``num_object_clones``
        - ``num_object_copies``
        - ``num_objects_missing_on_primary``
        - ``num_objects_unfound``
        - ``num_objects_degraded``
        - ``num_rd``
        - ``num_rd_kb``
        - ``num_wr``
        - ``num_wr_kb``
    """
    self.require_ioctx_open()
    cdef rados_pool_stat_t pool_stat
    with nogil:
        ret = rados_ioctx_pool_stat(self.io, &pool_stat)
    if ret < 0:
        raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
    return {
        "num_bytes": pool_stat.num_bytes,
        "num_kb": pool_stat.num_kb,
        "num_objects": pool_stat.num_objects,
        "num_object_clones": pool_stat.num_object_clones,
        "num_object_copies": pool_stat.num_object_copies,
        "num_objects_missing_on_primary": pool_stat.num_objects_missing_on_primary,
        "num_objects_unfound": pool_stat.num_objects_unfound,
        "num_objects_degraded": pool_stat.num_objects_degraded,
        "num_rd": pool_stat.num_rd,
        "num_rd_kb": pool_stat.num_rd_kb,
        "num_wr": pool_stat.num_wr,
        "num_wr_kb": pool_stat.num_wr_kb,
    }
2879
@requires(('key', str_type))
def remove_object(self, key):
    """
    Delete an object

    This does not delete any snapshots of the object.

    :param key: the name of the object to delete
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success
    """
    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef char *c_key = key

    with nogil:
        ret = rados_remove(self.io, c_key)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to remove '%s'" % key)
2904
@requires(('key', str_type))
def trunc(self, key, size):
    """
    Resize an object

    If this enlarges the object, the new area is logically filled with
    zeroes. If this shrinks the object, the excess data is removed.

    :param key: the name of the object to resize
    :type key: str
    :param size: the new size of the object in bytes
    :type size: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: int - the non-negative value returned by librados
    """

    self.require_ioctx_open()
    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t new_size = size

    with nogil:
        ret = rados_trunc(self.io, c_key, new_size)
    if ret >= 0:
        return ret
    raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
2934
@requires(('key', str_type))
def stat(self, key):
    """
    Get object stats (size/mtime)

    :param key: the name of the object to get stats from
    :type key: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: (size, timestamp) - the timestamp is the mtime converted
              with ``time.localtime``
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    cdef:
        char *c_key = key
        uint64_t obj_size
        time_t obj_mtime

    with nogil:
        ret = rados_stat(self.io, c_key, &obj_size, &obj_mtime)
    if ret < 0:
        raise make_ex(ret, "Failed to stat %r" % key)
    local_mtime = time.localtime(obj_mtime)
    return obj_size, local_mtime
2960
@requires(('key', str_type), ('xattr_name', str_type))
def get_xattr(self, key, xattr_name):
    """
    Get the value of an extended attribute on an object.

    The value is read into a buffer that starts at 4096 bytes and is
    doubled each time librados answers -ERANGE, up to a 4 GiB limit.

    :param key: the name of the object to get xattr from
    :type key: str
    :param xattr_name: which extended attribute to read
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bytes - value of the xattr
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *_key = key
        char *_xattr_name = xattr_name
        size_t ret_length = 4096
        char *ret_buf = NULL

    try:
        while ret_length < 4096 * 1024 * 1024:
            ret_buf = <char *>realloc_chk(ret_buf, ret_length)
            with nogil:
                ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
            if ret == -errno.ERANGE:
                ret_length *= 2
            elif ret < 0:
                raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
            else:
                break
        else:
            # The size limit was reached while librados still reported
            # -ERANGE. Raise instead of slicing the buffer with a negative
            # length.
            raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
        return ret_buf[:ret]
    finally:
        free(ret_buf)
2999
@requires(('oid', str_type))
def get_xattrs(self, oid):
    """
    Start iterating over xattrs on an object.

    :param oid: the name of the object to get xattrs from
    :type oid: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: XattrIterator
    """
    self.require_ioctx_open()
    xattr_iter = XattrIterator(self, oid)
    return xattr_iter
3014
@requires(('key', str_type), ('xattr_name', str_type), ('xattr_value', bytes))
def set_xattr(self, key, xattr_name, xattr_value):
    """
    Set an extended attribute on an object.

    :param key: the name of the object to set xattr to
    :type key: str
    :param xattr_name: which extended attribute to set
    :type xattr_name: str
    :param xattr_value: the value of the extended attribute
    :type xattr_value: bytes

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name
        char *c_value = xattr_value
        size_t c_value_len = len(xattr_value)

    with nogil:
        ret = rados_setxattr(self.io, c_key, c_name,
                             c_value, c_value_len)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3047
@requires(('key', str_type), ('xattr_name', str_type))
def rm_xattr(self, key, xattr_name):
    """
    Remove an extended attribute from an object.

    :param key: the name of the object to remove xattr from
    :type key: str
    :param xattr_name: which extended attribute to remove
    :type xattr_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    xattr_name = cstr(xattr_name, 'xattr_name')
    cdef:
        char *c_key = key
        char *c_name = xattr_name

    with nogil:
        ret = rados_rmxattr(self.io, c_key, c_name)
    if ret >= 0:
        return True
    raise make_ex(ret, "Failed to delete key %r xattr %r" %
                  (key, xattr_name))
3076
11fdf7f2
TL
@requires(('obj', str_type), ('msg', str_type), ('timeout_ms', int))
def notify(self, obj, msg='', timeout_ms=5000):
    """
    Send a rados notification to an object.

    :param obj: the name of the object to notify
    :type obj: str
    :param msg: optional message to send in the notification
    :type msg: str
    :param timeout_ms: notify timeout (in ms)
    :type timeout_ms: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: bool - True on success, otherwise raise an error
    """
    self.require_ioctx_open()

    obj = cstr(obj, 'obj')
    msg = cstr(msg, 'msg')
    cdef:
        char *_obj = obj
        char *_msg = msg
        # Measure the payload AFTER conversion: the buffer handed to
        # librados is the converted object, and its length differs from
        # the character count of the input whenever the message contains
        # characters that encode to more than one byte.
        int _msglen = len(msg)
        uint64_t _timeout_ms = timeout_ms

    with nogil:
        ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
                            NULL, NULL)
    if ret < 0:
        raise make_ex(ret, "Failed to notify %r" % (obj))
    return True
3110
7c673cae
FG
def list_objects(self):
    """
    Build an ObjectIterator for this rados.Ioctx object.

    :returns: ObjectIterator
    """
    # An iterator is only handed out for an open io context.
    self.require_ioctx_open()
    object_iter = ObjectIterator(self)
    return object_iter
3119
def list_snaps(self):
    """
    Build a SnapIterator for this rados.Ioctx object.

    :returns: SnapIterator
    """
    # An iterator is only handed out for an open io context.
    self.require_ioctx_open()
    snap_iter = SnapIterator(self)
    return snap_iter
3128
@requires(('snap_name', str_type))
def create_snap(self, snap_name):
    """
    Create a pool-wide snapshot.

    :param snap_name: name to give the new snapshot
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap = snap_name

    with nogil:
        rc = rados_ioctx_snap_create(self.io, c_snap)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to create snap %s" % snap_name)
3148
@requires(('snap_name', str_type))
def remove_snap(self, snap_name):
    """
    Remove a pool-wide snapshot.

    :param snap_name: name of the snapshot to remove
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap = snap_name

    with nogil:
        rc = rados_ioctx_snap_remove(self.io, c_snap)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to remove snap %s" % snap_name)
3168
@requires(('snap_name', str_type))
def lookup_snap(self, snap_name):
    """
    Resolve the id of a pool snapshot from its name.

    :param snap_name: the name of the snapshot to look up
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    :returns: Snap - on success
    """
    self.require_ioctx_open()
    # The converted copy is only used for the C call; the caller's
    # original value is what ends up in the message and the Snap.
    encoded_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap = encoded_name
    cdef rados_snap_t c_snap_id

    with nogil:
        rc = rados_ioctx_snap_lookup(self.io, c_snap, &c_snap_id)
    if rc != 0:
        raise make_ex(rc, "Failed to lookup snap %s" % snap_name)
    return Snap(self, snap_name, int(c_snap_id))
3192
@requires(('oid', str_type), ('snap_name', str_type))
def snap_rollback(self, oid, snap_name):
    """
    Roll an object back to a pool snapshot.

    :param oid: name of the object to roll back
    :type oid: str
    :param snap_name: name of the snapshot to roll back to
    :type snap_name: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid = cstr(oid, 'oid')
    snap_name = cstr(snap_name, 'snap_name')
    cdef char *c_snap = snap_name
    cdef char *c_oid = oid

    with nogil:
        rc = rados_ioctx_snap_rollback(self.io, c_oid, c_snap)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to rollback %s" % oid)
3217
28e407b8
AA
def create_self_managed_snap(self):
    """
    Create a self-managed snapshot.

    :returns: int - id of the new snapshot

    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_create(self.io, &c_snap_id)
    if rc != 0:
        raise make_ex(rc, "Failed to create self-managed snapshot")
    new_id = int(c_snap_id)
    return new_id
3234
@requires(('snap_id', int))
def remove_self_managed_snap(self, snap_id):
    """
    Remove a self-managed snapshot.

    :param snap_id: the id of the snapshot to remove
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_remove(self.io, c_snap_id)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to remove self-managed snapshot")
3253
def set_self_managed_snap_write(self, snaps):
    """
    Updates the write context to the specified self-managed
    snapshot ids.

    The ids are converted with ``int()`` and sorted in descending order;
    the largest id is passed as the snapshot sequence number (0 when
    ``snaps`` is empty).

    :param snaps: all associated self-managed snapshot ids
    :type snaps: list

    :raises: :class:`TypeError`
    :raises: :class:`MemoryError` if the id array cannot be allocated
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    sorted_snaps = []
    snap_seq = 0
    if snaps:
        sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
        snap_seq = sorted_snaps[0]

    cdef:
        rados_snap_t _snap_seq = snap_seq
        rados_snap_t *_snaps = NULL
        int _num_snaps = len(sorted_snaps)
    try:
        # Only allocate when there is something to copy: malloc(0) has an
        # implementation-defined result, and a failed allocation must not
        # be written through.
        if _num_snaps > 0:
            _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
            if _snaps == NULL:
                raise MemoryError("malloc failed")
            for i in range(_num_snaps):
                _snaps[i] = sorted_snaps[i]
        with nogil:
            ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
                                                             _snap_seq,
                                                             _snaps,
                                                             _num_snaps)
        if ret != 0:
            raise make_ex(ret, "Failed to update snapshot write context")
    finally:
        # free(NULL) is a no-op, so this is safe on every path.
        free(_snaps)
3289
@requires(('oid', str_type), ('snap_id', int))
def rollback_self_managed_snap(self, oid, snap_id):
    """
    Roll a specific object back to a self-managed snapshot revision.

    :param oid: name of the object to roll back
    :type oid: str
    :param snap_id: the id of the snapshot to roll back to
    :type snap_id: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()
    oid = cstr(oid, 'oid')
    cdef char *c_oid = oid
    cdef rados_snap_t c_snap_id = snap_id
    with nogil:
        rc = rados_ioctx_selfmanaged_snap_rollback(self.io, c_oid, c_snap_id)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to rollback %s" % oid)
3312
7c673cae
FG
def get_last_version(self):
    """
    Return the version of the last object read or written to.

    This exposes the internal version number of the last object read
    or written through this io context.

    :returns: int - version of the last object used
    """
    self.require_ioctx_open()
    with nogil:
        last_version = rados_get_last_version(self.io)
    return int(last_version)
3326
def create_write_op(self):
    """
    Create a write operation object.

    The caller needs to call release_write_op after use.

    :returns: the result of ``WriteOp().create()``
    """
    new_op = WriteOp()
    return new_op.create()
3333
def create_read_op(self):
    """
    Create a read operation object.

    The caller needs to call release_read_op after use.

    :returns: the result of ``ReadOp().create()``
    """
    new_op = ReadOp()
    return new_op.create()
3340
def release_write_op(self, write_op):
    """
    Release the memory allocated by create_write_op.

    :param write_op: write operation object to release; its ``release()``
                     method is called
    """
    write_op.release()
3346
def release_read_op(self, read_op):
    """
    Release the memory allocated by create_read_op.

    :param read_op: read operation object to release; its ``release()``
                    method is called
    """
    read_op.release()
3354
@requires(('write_op', WriteOp), ('keys', tuple), ('values', tuple))
def set_omap(self, write_op, keys, values):
    """
    Queue key/value pairs to be set by a write operation.

    :param write_op: write operation object the pairs are added to
    :type write_op: WriteOp
    :param keys: a tuple of keys
    :type keys: tuple
    :param values: a tuple of values, one per key (passed on without
                   conversion, so presumably bytes - confirm against
                   to_bytes_array)
    :type values: tuple

    :raises: :class:`TypeError`
    :raises: :class:`Error` if keys and values differ in length
    """

    if len(keys) != len(values):
        raise Error("Rados(): keys and values must have the same number of items")

    keys = cstr_list(keys, 'keys')
    cdef:
        WriteOp _write_op = write_op
        size_t key_num = len(keys)
        char **_keys = NULL
        char **_values = NULL
        size_t *_lens = NULL

    try:
        # Allocate inside the try block: if a later conversion raises, the
        # arrays that were already allocated are still released below.
        _keys = to_bytes_array(keys)
        _values = to_bytes_array(values)
        _lens = to_csize_t_array([len(v) for v in values])
        with nogil:
            rados_write_op_omap_set(_write_op.write_op,
                                    <const char**>_keys,
                                    <const char**>_values,
                                    <const size_t*>_lens, key_num)
    finally:
        # free(NULL) is a no-op, so partially initialised state is fine.
        free(_keys)
        free(_values)
        free(_lens)
3388
@requires(('write_op', WriteOp), ('oid', str_type), ('mtime', opt(int)), ('flags', opt(int)))
def operate_write_op(self, write_op, oid, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a prepared write operation synchronously.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """

    oid = cstr(oid, 'oid')
    cdef WriteOp c_op = write_op
    cdef char *c_oid = oid
    cdef time_t c_mtime = mtime
    cdef int c_flags = flags

    with nogil:
        rc = rados_write_op_operate(c_op.write_op, self.io, c_oid, &c_mtime, c_flags)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to operate write op for oid %s" % oid)
3414
@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a prepared write operation asynchronously.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the operation is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the operation is safe and complete on
                   storage on all replicas
    :type onsafe: completion
    :param mtime: the time to set the mtime to, 0 for the current time
    :type mtime: int
    :param flags: flags to apply to the entire operation
    :type flags: int

    :raises: :class:`Error`
    :returns: completion object
    """

    oid = cstr(oid, 'oid')
    cdef WriteOp c_op = write_op
    cdef char *c_oid = oid
    cdef Completion aio_comp
    cdef time_t c_mtime = mtime
    cdef int c_flags = flags

    # The completion is tracked before the operation is submitted.
    aio_comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(aio_comp)

    with nogil:
        rc = rados_aio_write_op_operate(c_op.write_op, self.io, aio_comp.rados_comp, c_oid,
                                        &c_mtime, c_flags)
    if rc == 0:
        return aio_comp

    # Submission failed: clean the completion up before reporting.
    aio_comp._cleanup()
    raise make_ex(rc, "Failed to operate aio write op for oid %s" % oid)
3456
@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a prepared read operation synchronously.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    oid = cstr(oid, 'oid')
    cdef ReadOp c_op = read_op
    cdef char *c_oid = oid
    cdef int c_flag = flag

    with nogil:
        rc = rados_read_op_operate(c_op.read_op, self.io, c_oid, c_flag)
    if rc == 0:
        return
    raise make_ex(rc, "Failed to operate read op for oid %s" % oid)
3478
@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
    """
    Execute a prepared read operation asynchronously.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param oid: object name
    :type oid: str
    :param oncomplete: what to do when the operation is safe and complete
                       in memory on all replicas
    :type oncomplete: completion
    :param onsafe: what to do when the operation is safe and complete on
                   storage on all replicas
    :type onsafe: completion
    :param flag: flags to apply to the entire operation
    :type flag: int

    :raises: :class:`Error`
    :returns: completion object
    """
    oid = cstr(oid, 'oid')
    cdef ReadOp c_op = read_op
    cdef char *c_oid = oid
    cdef Completion aio_comp
    cdef int c_flag = flag

    # The completion is tracked before the operation is submitted.
    aio_comp = self.__get_completion(oncomplete, onsafe)
    self.__track_completion(aio_comp)

    with nogil:
        rc = rados_aio_read_op_operate(c_op.read_op, self.io, aio_comp.rados_comp, c_oid, c_flag)
    if rc == 0:
        return aio_comp

    # Submission failed: clean the completion up before reporting.
    aio_comp._cleanup()
    raise make_ex(rc, "Failed to operate aio read op for oid %s" % oid)
3512
@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
    """
    Add an omap key/value listing to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param filter_prefix: list only keys beginning with filter_prefix
    :type filter_prefix: str
    :param max_return: list no more than max_return key/value pairs
    :type max_return: int
    :returns: tuple of (OmapIterator over the requested omap values, 0)
    """

    # Empty strings are treated like "not given" and passed on as None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None
    if filter_prefix:
        filter_prefix = cstr(filter_prefix, 'filter_prefix')
    else:
        filter_prefix = None

    cdef char *c_start_after = opt_str(start_after)
    cdef char *c_filter_prefix = opt_str(filter_prefix)
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t c_iter = NULL
    cdef int c_max_return = max_return

    with nogil:
        rados_read_op_omap_get_vals2(c_op.read_op, c_start_after, c_filter_prefix,
                                     c_max_return, &c_iter, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = c_iter
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3543
@requires(('read_op', ReadOp), ('start_after', str_type), ('max_return', int))
def get_omap_keys(self, read_op, start_after, max_return):
    """
    Add an omap key listing to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param start_after: list keys starting after start_after
    :type start_after: str
    :param max_return: list no more than max_return keys
    :type max_return: int
    :returns: tuple of (OmapIterator over the requested omap keys, 0)
    """
    # An empty string is treated like "not given" and passed on as None.
    if start_after:
        start_after = cstr(start_after, 'start_after')
    else:
        start_after = None

    cdef char *c_start_after = opt_str(start_after)
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t c_iter = NULL
    cdef int c_max_return = max_return

    with nogil:
        rados_read_op_omap_get_keys2(c_op.read_op, c_start_after,
                                     c_max_return, &c_iter, NULL, NULL)
    omap_iter = OmapIterator(self)
    omap_iter.ctx = c_iter
    return omap_iter, 0   # 0 is meaningless; there for backward-compat
7c673cae
FG
3569
@requires(('read_op', ReadOp), ('keys', tuple))
def get_omap_vals_by_keys(self, read_op, keys):
    """
    Add a lookup of specific omap keys to a read operation.

    :param read_op: read operation object
    :type read_op: ReadOp
    :param keys: input key tuple
    :type keys: tuple
    :returns: tuple of (OmapIterator over the requested omap values, 0)
    """
    keys = cstr_list(keys, 'keys')
    cdef ReadOp c_op = read_op
    cdef rados_omap_iter_t c_iter
    cdef char **c_keys = to_bytes_array(keys)
    cdef size_t n_keys = len(keys)

    try:
        with nogil:
            rados_read_op_omap_get_vals_by_keys(c_op.read_op,
                                                <const char**>c_keys,
                                                n_keys, &c_iter, NULL)
        omap_iter = OmapIterator(self)
        omap_iter.ctx = c_iter
        return omap_iter, 0   # 0 is meaningless; there for backward-compat
    finally:
        # The pointer array is released on every path.
        free(c_keys)
3597
@requires(('write_op', WriteOp), ('keys', tuple))
def remove_omap_keys(self, write_op, keys):
    """
    Queue removal of the specified omap keys on a write operation.

    :param write_op: write operation object
    :type write_op: WriteOp
    :param keys: input key tuple
    :type keys: tuple
    """

    keys = cstr_list(keys, 'keys')
    cdef WriteOp c_op = write_op
    cdef size_t n_keys = len(keys)
    cdef char **c_keys = to_bytes_array(keys)

    try:
        with nogil:
            rados_write_op_omap_rm_keys(c_op.write_op, <const char**>c_keys, n_keys)
    finally:
        # The pointer array is released on every path.
        free(c_keys)
3619
@requires(('write_op', WriteOp))
def clear_omap(self, write_op):
    """
    Queue removal of all key/value pairs from an object.

    :param write_op: write operation object
    :type write_op: WriteOp
    """

    cdef WriteOp c_op = write_op

    with nogil:
        rados_write_op_omap_clear(c_op.write_op)
3633
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('desc', str_type),
          ('duration', opt(int)), ('flags', int))
def lock_exclusive(self, key, name, cookie, desc="", duration=None, flags=0):

    """
    Take an exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct lives on the stack and is otherwise uninitialised;
        # without this the sub-second part handed to librados is garbage.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
                                   _duration_ptr, _flags)

    if ret < 0:
        raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3684
@requires(('key', str_type), ('name', str_type), ('cookie', str_type), ('tag', str_type),
          ('desc', str_type), ('duration', opt(int)), ('flags', int))
def lock_shared(self, key, name, cookie, tag, desc="", duration=None, flags=0):

    """
    Take a shared lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str
    :param tag: tag of the lock
    :type tag: str
    :param desc: description of the lock
    :type desc: str
    :param duration: duration of the lock in seconds; None passes a NULL
                     duration to librados
    :type duration: int
    :param flags: flags
    :type flags: int

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    tag = cstr(tag, 'tag')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')
    desc = cstr(desc, 'desc')

    cdef:
        char* _key = key
        char* _tag = tag
        char* _name = name
        char* _cookie = cookie
        char* _desc = desc
        uint8_t _flags = flags
        timeval _duration
        timeval *_duration_ptr = NULL

    if duration is not None:
        _duration.tv_sec = duration
        # The struct lives on the stack and is otherwise uninitialised;
        # without this the sub-second part handed to librados is garbage.
        _duration.tv_usec = 0
        _duration_ptr = &_duration

    with nogil:
        ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
                                _duration_ptr, _flags)
    if ret < 0:
        # Name the call that actually failed (this is the shared lock path).
        raise make_ex(ret, "Ioctx.rados_lock_shared(%s): failed to set lock %s on %s" % (self.name, name, key))
3738
@requires(('key', str_type), ('name', str_type), ('cookie', str_type))
def unlock(self, key, name, cookie):

    """
    Release a shared or exclusive lock on an object

    :param key: name of the object
    :type key: str
    :param name: name of the lock
    :type name: str
    :param cookie: cookie of the lock
    :type cookie: str

    :raises: :class:`TypeError`
    :raises: :class:`Error`
    """
    self.require_ioctx_open()

    key = cstr(key, 'key')
    name = cstr(name, 'name')
    cookie = cstr(cookie, 'cookie')

    cdef:
        char* _key = key
        char* _name = name
        char* _cookie = cookie

    with nogil:
        ret = rados_unlock(self.io, _key, _name, _cookie)
    if ret < 0:
        # Report the operation that failed: this path releases a lock,
        # it does not take an exclusive one.
        raise make_ex(ret, "Ioctx.rados_unlock(%s): failed to release lock %s on %s" % (self.name, name, key))
3770
11fdf7f2
TL
def set_osdmap_full_try(self):
    """
    Turn the osdmap_full_try setting on for this io context by calling
    ``rados_set_osdmap_full_try``.
    """
    with nogil:
        rados_set_osdmap_full_try(self.io)
3777
def unset_osdmap_full_try(self):
    """
    Turn the osdmap_full_try setting off for this io context by calling
    ``rados_unset_osdmap_full_try``.
    """
    with nogil:
        rados_unset_osdmap_full_try(self.io)
3784
c07f9fc5
FG
def application_enable(self, app_name, force=False):
    """
    Enable an application on an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param force: False if only a single app should exist per pool
    :type force: bool

    :raises: :class:`Error`
    """
    app_name = cstr(app_name, 'app_name')
    cdef char *c_app_name = app_name
    # Collapse any truthy/falsy value to the 1/0 passed to the C call.
    cdef int c_force = 1 if force else 0

    with nogil:
        rc = rados_application_enable(self.io, c_app_name, c_force)
    if rc < 0:
        raise make_ex(rc, "error enabling application")
3805
def application_list(self):
    """
    Return the applications enabled on the pool.

    :raises: :class:`Error`
    :returns: list of app name strings, or None when librados reports
              ENOENT
    """
    cdef size_t buf_len = 128
    cdef char *buf = NULL

    try:
        while True:
            buf = <char *>realloc_chk(buf, buf_len)
            with nogil:
                rc = rados_application_list(self.io, buf, &buf_len)
            if rc == -errno.ERANGE:
                # Buffer too small: go around again with the length the
                # call left in buf_len.
                continue
            if rc == -errno.ENOENT:
                return None
            if rc != 0:
                raise make_ex(rc, "error listing applications")
            # Names come back NUL-separated; empty pieces are dropped.
            return [decode_cstr(app) for app in
                    buf[:buf_len].split(b'\0') if app]
    finally:
        free(buf)
3832
def application_metadata_set(self, app_name, key, value):
    """
    Set an application metadata entry on an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str
    :param value: metadata value
    :type value: str

    :raises: :class:`Error`
    """
    app_name = cstr(app_name, 'app_name')
    key = cstr(key, 'key')
    value = cstr(value, 'value')
    cdef char *c_app_name = app_name
    cdef char *c_key = key
    cdef char *c_value = value

    with nogil:
        rc = rados_application_metadata_set(self.io, c_app_name, c_key,
                                            c_value)
    if rc < 0:
        raise make_ex(rc, "error setting application metadata")
3859
def application_metadata_remove(self, app_name, key):
    """
    Remove an application metadata entry from an OSD pool.

    :param app_name: application name
    :type app_name: str
    :param key: metadata key
    :type key: str

    :raises: :class:`Error`
    """
    app_name = cstr(app_name, 'app_name')
    key = cstr(key, 'key')
    cdef char *c_app_name = app_name
    cdef char *c_key = key

    with nogil:
        rc = rados_application_metadata_remove(self.io, c_app_name, c_key)
    if rc < 0:
        raise make_ex(rc, "error removing application metadata")
3881
def application_metadata_list(self, app_name):
    """
    Returns the metadata key/value pairs of an application on the pool

    :param app_name: application name
    :type app_name: str

    :raises: :class:`Error`
    :returns: list of key/value tuples
    """
    app_name = cstr(app_name, 'app_name')
    cdef:
        char *_app_name = app_name
        size_t key_length = 128
        size_t val_length = 128
        char *c_keys = NULL
        char *c_vals = NULL

    try:
        while True:
            c_keys = <char *>realloc_chk(c_keys, key_length)
            c_vals = <char *>realloc_chk(c_vals, val_length)
            with nogil:
                ret = rados_application_metadata_list(self.io, _app_name,
                                                      c_keys, &key_length,
                                                      c_vals, &val_length)
            if ret == 0:
                keys = [decode_cstr(key) for key in
                        c_keys[:key_length].split(b'\0')]
                vals = [decode_cstr(val) for val in
                        c_vals[:val_length].split(b'\0')]
                # zip() returns an iterator on Python 3 and cannot be
                # sliced, so materialise it first. The last pair is the
                # empty piece that split() yields after the final NUL.
                return list(zip(keys, vals))[:-1]
            elif ret == -errno.ERANGE:
                # Buffers too small: retry with the lengths the call
                # left in key_length / val_length.
                pass
            else:
                raise make_ex(ret, "error listing application metadata")
    finally:
        free(c_keys)
        free(c_vals)
3919
11fdf7f2
TL
def alignment(self):
    """
    Return the pool alignment.

    :raises: :class:`Error`
    :returns:
        Number of alignment bytes required by the current pool, or None if
        alignment is not required.
    """
    cdef int needs_alignment = 0
    cdef uint64_t c_alignment

    with nogil:
        rc = rados_ioctx_pool_requires_alignment2(self.io, &needs_alignment)
    if rc != 0:
        raise make_ex(rc, "error checking alignment")

    # Only ask for the actual value when the pool says it needs one.
    if not needs_alignment:
        return None

    with nogil:
        rc = rados_ioctx_pool_required_alignment2(self.io, &c_alignment)
    if rc != 0:
        raise make_ex(rc, "error querying alignment")
    return c_alignment
3945
7c673cae
FG
3946
def set_object_locator(func):
    """
    Decorator that runs a method with the object's locator key applied.

    When ``self.locator_key`` is not None, the io context's locator key is
    switched to it for the duration of the call and the previous key is
    put back afterwards, including when the wrapped call raises. When it
    is None the method is called unchanged.

    :param func: method taking ``self`` as first argument; ``self`` must
                 provide ``locator_key`` and ``ioctx``
    :returns: the wrapping function
    """
    def retfunc(self, *args, **kwargs):
        if self.locator_key is None:
            return func(self, *args, **kwargs)
        old_locator = self.ioctx.get_locator_key()
        self.ioctx.set_locator_key(self.locator_key)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # io context pointing at this object's locator.
            self.ioctx.set_locator_key(old_locator)
    return retfunc
3958
3959
def set_object_namespace(func):
    """
    Decorator that runs a method with the object's namespace applied.

    The io context's namespace is switched to ``self.nspace`` for the
    duration of the call and the previous namespace is put back
    afterwards, including when the wrapped call raises.

    :param func: method taking ``self`` as first argument; ``self`` must
                 provide ``nspace`` and ``ioctx``
    :raises: :class:`LogicError` (at call time) if ``self.nspace`` is None
    :returns: the wrapping function
    """
    def retfunc(self, *args, **kwargs):
        if self.nspace is None:
            raise LogicError("Namespace not set properly in context")
        old_nspace = self.ioctx.get_namespace()
        self.ioctx.set_namespace(self.nspace)
        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore even on error so a failed call does not leave the
            # io context in this object's namespace.
            self.ioctx.set_namespace(old_nspace)
    return retfunc
3970
3971
class Object(object):
    """Rados object wrapper, makes the object look like a file"""

    def __init__(self, ioctx, key, locator_key=None, nspace=None):
        """
        :param ioctx: io context every operation is forwarded to
        :param key: name of the object
        :param locator_key: locator key applied around each operation,
                            or None to leave the io context's key alone
        :param nspace: namespace applied around each operation; None is
                       stored as the empty string
        """
        self.key = key
        self.ioctx = ioctx
        # Position used by read()/write(), moved by seek().
        self.offset = 0
        self.state = "exists"
        self.locator_key = locator_key
        self.nspace = "" if nspace is None else nspace

    def __str__(self):
        # Compare by value: identity comparison against a string literal
        # depends on interning and is flagged by newer Python versions.
        nspace = "--default--" if self.nspace == "" else self.nspace
        return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
            (str(self.ioctx), self.key, nspace, self.locator_key)

    def require_object_exists(self):
        """Raise ObjectStateError unless the state is still "exists"."""
        if self.state != "exists":
            raise ObjectStateError("The object is %s" % self.state)

    @set_object_locator
    @set_object_namespace
    def read(self, length=1024 * 1024):
        """
        Read up to ``length`` bytes at the current offset and advance the
        offset by the amount actually returned.
        """
        self.require_object_exists()
        ret = self.ioctx.read(self.key, length, self.offset)
        self.offset += len(ret)
        return ret

    @set_object_locator
    @set_object_namespace
    def write(self, string_to_write):
        """
        Write ``string_to_write`` at the current offset; the offset is
        advanced only when the io context reports 0.

        :returns: the io context's return value
        """
        self.require_object_exists()
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
        if ret == 0:
            self.offset += len(string_to_write)
        return ret

    @set_object_locator
    @set_object_namespace
    def remove(self):
        """Remove the object and mark this wrapper as "removed"."""
        self.require_object_exists()
        self.ioctx.remove_object(self.key)
        self.state = "removed"

    @set_object_locator
    @set_object_namespace
    def stat(self):
        """Return the io context's stat() result for this object."""
        self.require_object_exists()
        return self.ioctx.stat(self.key)

    def seek(self, position):
        """Set the offset used by subsequent read()/write() calls."""
        self.require_object_exists()
        self.offset = position

    @set_object_locator
    @set_object_namespace
    def get_xattr(self, xattr_name):
        """Return the value of extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.get_xattr(self.key, xattr_name)

    @set_object_locator
    @set_object_namespace
    def get_xattrs(self):
        """Return the io context's xattr iterator for this object."""
        self.require_object_exists()
        return self.ioctx.get_xattrs(self.key)

    @set_object_locator
    @set_object_namespace
    def set_xattr(self, xattr_name, xattr_value):
        """Set extended attribute ``xattr_name`` to ``xattr_value``."""
        self.require_object_exists()
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)

    @set_object_locator
    @set_object_namespace
    def rm_xattr(self, xattr_name):
        """Remove extended attribute ``xattr_name``."""
        self.require_object_exists()
        return self.ioctx.rm_xattr(self.key, xattr_name)
4048
# Level names accepted for cluster log monitoring; both the short and the
# long spelling are listed for warnings and errors.
MONITOR_LEVELS = [
    "debug",
    "info",
    "warn",
    "warning",
    "err",
    "error",
    "sec",
]
4056
4057
class MonitorLog(object):
    # NOTE(sileht): Keep this class for backward compat
    # method moved to Rados.monitor_log()
    """
    Watches cluster log messages. Create an instance and keep it alive
    for as long as the callback should be invoked. 'level' selects which
    messages are monitored (one of MONITOR_LEVELS); 'arg' is handed back
    to the callback.

    callback will be called with:
        arg (given to __init__)
        line (the full line, including timestamp, who, level, msg)
        who (which entity issued the log message)
        timestamp_sec (sec of a struct timespec)
        timestamp_nsec (nsec of a struct timespec)
        seq (sequence number)
        level (string representing the level of the log message)
        msg (the message itself)
    callback's return value is ignored
    """
    def __init__(self, cluster, level, callback, arg):
        # Remember the registration parameters on the instance ...
        self.cluster = cluster
        self.level = level
        self.callback = callback
        self.arg = arg
        # ... then delegate to the cluster, where the logic now lives.
        cluster.monitor_log(level, callback, arg)
4084