]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/rados/rados.pyx
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / pybind / rados / rados.pyx
CommitLineData
f67539c2 1# cython: embedsignature=True, binding=True
7c673cae
FG
2"""
3This module is a thin wrapper around librados.
4
5Error codes from librados are turned into exceptions that subclass
6:class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7(the base class of all rados exceptions), :class:`PermissionError`
8and :class:`IOError`, in addition to those documented for the
9method.
10"""
11# Copyright 2011 Josh Durgin
12# Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13# Copyright 2015 Hector Martin <marcan@marcan.st>
14# Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16from cpython cimport PyObject, ref
17from cpython.pycapsule cimport *
18from libc cimport errno
19from libc.stdint cimport *
20from libc.stdlib cimport malloc, realloc, free
f67539c2
TL
21IF BUILD_DOC:
22 include "mock_rados.pxi"
23ELSE:
24 from c_rados cimport *
7c673cae 25
7c673cae
FG
26import threading
27import time
28
cd265ab1 29from datetime import datetime, timedelta
7c673cae
FG
30from functools import partial, wraps
31from itertools import chain
f67539c2 32from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
7c673cae
FG
33
34cdef extern from "Python.h":
35 # These are in cpython/string.pxd, but use "object" types instead of
36 # PyObject*, which invokes assumptions in cpython that we need to
37 # legitimately break to implement zero-copy string buffers in Ioctx.read().
38 # This is valid use of the Python API and documented as a special case.
39 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
40 char* PyBytes_AsString(PyObject *string) except NULL
41 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
42 void PyEval_InitThreads()
43
7c673cae
FG
44LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
45LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
46LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
47LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
48LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
49LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
50LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
51
20effc67
TL
52LIBRADOS_CMPXATTR_OP_EQ = _LIBRADOS_CMPXATTR_OP_EQ
53LIBRADOS_CMPXATTR_OP_NE = _LIBRADOS_CMPXATTR_OP_NE
54LIBRADOS_CMPXATTR_OP_GT = _LIBRADOS_CMPXATTR_OP_GT
55LIBRADOS_CMPXATTR_OP_GTE = _LIBRADOS_CMPXATTR_OP_GTE
56LIBRADOS_CMPXATTR_OP_LT = _LIBRADOS_CMPXATTR_OP_LT
57LIBRADOS_CMPXATTR_OP_LTE = _LIBRADOS_CMPXATTR_OP_LTE
58
7c673cae
FG
59LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
60
61LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
62LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
63LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
64LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
65LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
66LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
67LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
68
69LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
70
71LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
72LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
73
f67539c2
TL
74MAX_ERRNO = _MAX_ERRNO
75
7c673cae
FG
76ANONYMOUS_AUID = 0xffffffffffffffff
77ADMIN_AUID = 0
78
05a536ef 79OMAP_KEY_TYPE = Union[str,bytes]
7c673cae
FG
80
81class Error(Exception):
82 """ `Error` class, derived from `Exception` """
224ce89b 83 def __init__(self, message, errno=None):
1adf2230 84 super(Exception, self).__init__(message)
7c673cae 85 self.errno = errno
7c673cae
FG
86
87 def __str__(self):
1adf2230 88 msg = super(Exception, self).__str__()
224ce89b
WB
89 if self.errno is None:
90 return msg
91 return '[errno {0}] {1}'.format(self.errno, msg)
7c673cae 92
224ce89b
WB
93 def __reduce__(self):
94 return (self.__class__, (self.message, self.errno))
7c673cae 95
1adf2230 96class InvalidArgumentError(Error):
9f95a23c
TL
97 def __init__(self, message, errno=None):
98 super(InvalidArgumentError, self).__init__(
99 "RADOS invalid argument (%s)" % message, errno)
100
f67539c2
TL
101class ExtendMismatch(Error):
102 def __init__(self, message, errno, offset):
103 super().__init__(
104 "object content does not match (%s)" % message, errno)
105 self.offset = offset
1adf2230
AA
106
107class OSError(Error):
108 """ `OSError` class, derived from `Error` """
109 pass
110
7c673cae
FG
111class InterruptedOrTimeoutError(OSError):
112 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
9f95a23c
TL
113 def __init__(self, message, errno=None):
114 super(InterruptedOrTimeoutError, self).__init__(
115 "RADOS interrupted or timeout (%s)" % message, errno)
7c673cae
FG
116
117
118class PermissionError(OSError):
119 """ `PermissionError` class, derived from `OSError` """
9f95a23c
TL
120 def __init__(self, message, errno=None):
121 super(PermissionError, self).__init__(
122 "RADOS permission error (%s)" % message, errno)
7c673cae
FG
123
124
125class PermissionDeniedError(OSError):
126 """ deal with EACCES related. """
9f95a23c
TL
127 def __init__(self, message, errno=None):
128 super(PermissionDeniedError, self).__init__(
129 "RADOS permission denied (%s)" % message, errno)
7c673cae
FG
130
131
132class ObjectNotFound(OSError):
133 """ `ObjectNotFound` class, derived from `OSError` """
9f95a23c
TL
134 def __init__(self, message, errno=None):
135 super(ObjectNotFound, self).__init__(
136 "RADOS object not found (%s)" % message, errno)
7c673cae
FG
137
138
139class NoData(OSError):
140 """ `NoData` class, derived from `OSError` """
9f95a23c
TL
141 def __init__(self, message, errno=None):
142 super(NoData, self).__init__(
143 "RADOS no data (%s)" % message, errno)
7c673cae
FG
144
145
146class ObjectExists(OSError):
147 """ `ObjectExists` class, derived from `OSError` """
9f95a23c
TL
148 def __init__(self, message, errno=None):
149 super(ObjectExists, self).__init__(
150 "RADOS object exists (%s)" % message, errno)
7c673cae
FG
151
152
153class ObjectBusy(OSError):
154 """ `ObjectBusy` class, derived from `IOError` """
9f95a23c
TL
155 def __init__(self, message, errno=None):
156 super(ObjectBusy, self).__init__(
157 "RADOS object busy (%s)" % message, errno)
7c673cae
FG
158
159
160class IOError(OSError):
161 """ `ObjectBusy` class, derived from `OSError` """
9f95a23c
TL
162 def __init__(self, message, errno=None):
163 super(IOError, self).__init__(
164 "RADOS I/O error (%s)" % message, errno)
7c673cae
FG
165
166
167class NoSpace(OSError):
168 """ `NoSpace` class, derived from `OSError` """
9f95a23c
TL
169 def __init__(self, message, errno=None):
170 super(NoSpace, self).__init__(
171 "RADOS no space (%s)" % message, errno)
7c673cae 172
cd265ab1
TL
173class NotConnected(OSError):
174 """ `NotConnected` class, derived from `OSError` """
175 def __init__(self, message, errno=None):
176 super(NotConnected, self).__init__(
177 "RADOS not connected (%s)" % message, errno)
7c673cae
FG
178
179class RadosStateError(Error):
180 """ `RadosStateError` class, derived from `Error` """
9f95a23c
TL
181 def __init__(self, message, errno=None):
182 super(RadosStateError, self).__init__(
183 "RADOS rados state (%s)" % message, errno)
7c673cae
FG
184
185
186class IoctxStateError(Error):
187 """ `IoctxStateError` class, derived from `Error` """
9f95a23c
TL
188 def __init__(self, message, errno=None):
189 super(IoctxStateError, self).__init__(
190 "RADOS Ioctx state error (%s)" % message, errno)
7c673cae
FG
191
192
193class ObjectStateError(Error):
194 """ `ObjectStateError` class, derived from `Error` """
9f95a23c
TL
195 def __init__(self, message, errno=None):
196 super(ObjectStateError, self).__init__(
197 "RADOS object state error (%s)" % message, errno)
7c673cae
FG
198
199
200class LogicError(Error):
201 """ `` class, derived from `Error` """
9f95a23c
TL
202 def __init__(self, message, errno=None):
203 super(LogicError, self).__init__(
204 "RADOS logic error (%s)" % message, errno)
7c673cae
FG
205
206
207class TimedOut(OSError):
208 """ `TimedOut` class, derived from `OSError` """
9f95a23c
TL
209 def __init__(self, message, errno=None):
210 super(TimedOut, self).__init__(
211 "RADOS timed out (%s)" % message, errno)
212
213
214class InProgress(Error):
215 """ `InProgress` class, derived from `Error` """
216 def __init__(self, message, errno=None):
217 super(InProgress, self).__init__(
218 "RADOS in progress error (%s)" % message, errno)
219
220
221class IsConnected(Error):
222 """ `IsConnected` class, derived from `Error` """
223 def __init__(self, message, errno=None):
224 super(IsConnected, self).__init__(
225 "RADOS is connected error (%s)" % message, errno)
7c673cae
FG
226
227
1e59de90
TL
228class ConnectionShutdown(OSError):
229 """ `ConnectionShutdown` class, derived from `OSError` """
230 def __init__(self, message, errno=None):
231 super(ConnectionShutdown, self).__init__(
232 "RADOS connection was shutdown (%s)" % message, errno)
233
234
7c673cae
FG
235IF UNAME_SYSNAME == "FreeBSD":
236 cdef errno_to_exception = {
237 errno.EPERM : PermissionError,
238 errno.ENOENT : ObjectNotFound,
239 errno.EIO : IOError,
240 errno.ENOSPC : NoSpace,
241 errno.EEXIST : ObjectExists,
242 errno.EBUSY : ObjectBusy,
243 errno.ENOATTR : NoData,
244 errno.EINTR : InterruptedOrTimeoutError,
245 errno.ETIMEDOUT : TimedOut,
246 errno.EACCES : PermissionDeniedError,
9f95a23c
TL
247 errno.EINPROGRESS : InProgress,
248 errno.EISCONN : IsConnected,
7c673cae 249 errno.EINVAL : InvalidArgumentError,
cd265ab1 250 errno.ENOTCONN : NotConnected,
1e59de90 251 errno.ESHUTDOWN : ConnectionShutdown,
7c673cae
FG
252 }
253ELSE:
254 cdef errno_to_exception = {
255 errno.EPERM : PermissionError,
256 errno.ENOENT : ObjectNotFound,
257 errno.EIO : IOError,
258 errno.ENOSPC : NoSpace,
259 errno.EEXIST : ObjectExists,
260 errno.EBUSY : ObjectBusy,
261 errno.ENODATA : NoData,
262 errno.EINTR : InterruptedOrTimeoutError,
263 errno.ETIMEDOUT : TimedOut,
264 errno.EACCES : PermissionDeniedError,
9f95a23c
TL
265 errno.EINPROGRESS : InProgress,
266 errno.EISCONN : IsConnected,
7c673cae 267 errno.EINVAL : InvalidArgumentError,
cd265ab1 268 errno.ENOTCONN : NotConnected,
1e59de90 269 errno.ESHUTDOWN : ConnectionShutdown,
7c673cae
FG
270 }
271
272
f67539c2 273cdef make_ex(ret: int, msg: str):
7c673cae
FG
274 """
275 Translate a librados return code into an exception.
276
277 :param ret: the return code
278 :type ret: int
279 :param msg: the error message to use
280 :type msg: str
281 :returns: a subclass of :class:`Error`
282 """
283 ret = abs(ret)
284 if ret in errno_to_exception:
224ce89b 285 return errno_to_exception[ret](msg, errno=ret)
f67539c2
TL
286 elif ret > MAX_ERRNO:
287 offset = ret - MAX_ERRNO
288 return ExtendMismatch(msg, ret, offset)
7c673cae 289 else:
224ce89b 290 return OSError(msg, errno=ret)
7c673cae
FG
291
292
20effc67 293def cstr(val, name, encoding="utf-8", opt=False) -> Optional[bytes]:
7c673cae
FG
294 """
295 Create a byte string from a Python string
296
297 :param basestring val: Python string
298 :param str name: Name of the string parameter, for exceptions
299 :param str encoding: Encoding to use
300 :param bool opt: If True, None is allowed
7c673cae
FG
301 :raises: :class:`InvalidArgument`
302 """
303 if opt and val is None:
304 return None
305 if isinstance(val, bytes):
306 return val
f67539c2 307 elif isinstance(val, str):
7c673cae
FG
308 return val.encode(encoding)
309 else:
310 raise TypeError('%s must be a string' % name)
311
312
313def cstr_list(list_str, name, encoding="utf-8"):
314 return [cstr(s, name) for s in list_str]
315
316
20effc67 317def decode_cstr(val, encoding="utf-8") -> Optional[str]:
7c673cae
FG
318 """
319 Decode a byte string into a Python string.
320
321 :param bytes val: byte string
7c673cae
FG
322 """
323 if val is None:
324 return None
325
326 return val.decode(encoding)
327
328
9f95a23c
TL
329def flatten_dict(d, name):
330 items = chain.from_iterable(d.items())
331 return cstr(''.join(i + '\0' for i in items), name)
332
333
7c673cae
FG
334cdef char* opt_str(s) except? NULL:
335 if s is None:
336 return NULL
337 return s
338
339
340cdef void* realloc_chk(void* ptr, size_t size) except NULL:
341 cdef void *ret = realloc(ptr, size)
342 if ret == NULL:
343 raise MemoryError("realloc failed")
344 return ret
345
346
347cdef size_t * to_csize_t_array(list_int):
348 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
349 if ret == NULL:
350 raise MemoryError("malloc failed")
e306af50 351 for i in range(len(list_int)):
7c673cae
FG
352 ret[i] = <size_t>list_int[i]
353 return ret
354
355
356cdef char ** to_bytes_array(list_bytes):
357 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
358 if ret == NULL:
359 raise MemoryError("malloc failed")
e306af50 360 for i in range(len(list_bytes)):
7c673cae
FG
361 ret[i] = <char *>list_bytes[i]
362 return ret
363
7c673cae
FG
364cdef int __monitor_callback(void *arg, const char *line, const char *who,
365 uint64_t sec, uint64_t nsec, uint64_t seq,
366 const char *level, const char *msg) with gil:
367 cdef object cb_info = <object>arg
368 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
369 return 0
370
224ce89b
WB
371cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
372 const char *who,
31f18b77
FG
373 const char *name,
374 uint64_t sec, uint64_t nsec, uint64_t seq,
375 const char *level, const char *msg) with gil:
376 cdef object cb_info = <object>arg
224ce89b 377 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
31f18b77
FG
378 return 0
379
7c673cae
FG
380
381class Version(object):
382 """ Version information """
383 def __init__(self, major, minor, extra):
384 self.major = major
385 self.minor = minor
386 self.extra = extra
387
388 def __str__(self):
389 return "%d.%d.%d" % (self.major, self.minor, self.extra)
390
391
392cdef class Rados(object):
393 """This class wraps librados functions"""
394 # NOTE(sileht): attributes declared in .pyd
395
396 def __init__(self, *args, **kwargs):
397 PyEval_InitThreads()
398 self.__setup(*args, **kwargs)
399
f91f0fd5
TL
400 NO_CONF_FILE = -1
401 "special value that indicates no conffile should be read when creating a mount handle"
402 DEFAULT_CONF_FILES = -2
403 "special value that indicates the default conffiles should be read when creating a mount handle"
404
f67539c2
TL
405 def __setup(self,
406 rados_id: Optional[str] = None,
407 name: Optional[str] = None,
408 clustername: Optional[str] = None,
409 conf_defaults: Optional[Dict[str, str]] = None,
410 conffile: Union[str, int, None] = NO_CONF_FILE,
411 conf: Optional[Dict[str, str]] = None,
412 flags: int = 0,
413 context: object = None):
7c673cae 414 self.monitor_callback = None
31f18b77 415 self.monitor_callback2 = None
7c673cae
FG
416 self.parsed_args = []
417 self.conf_defaults = conf_defaults
418 self.conffile = conffile
419 self.rados_id = rados_id
420
421 if rados_id and name:
422 raise Error("Rados(): can't supply both rados_id and name")
423 elif rados_id:
424 name = 'client.' + rados_id
425 elif name is None:
426 name = 'client.admin'
427 if clustername is None:
428 clustername = ''
429
f67539c2
TL
430 name_raw = cstr(name, 'name')
431 clustername_raw = cstr(clustername, 'clustername')
7c673cae 432 cdef:
f67539c2
TL
433 char *_name = name_raw
434 char *_clustername = clustername_raw
7c673cae
FG
435 int _flags = flags
436 int ret
437
438 if context:
439 # Unpack void* (aka rados_config_t) from capsule
440 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
441 with nogil:
442 ret = rados_create_with_context(&self.cluster, rados_config)
443 else:
444 with nogil:
445 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
446 if ret != 0:
447 raise Error("rados_initialize failed with error code: %d" % ret)
448
449 self.state = "configuring"
450 # order is important: conf_defaults, then conffile, then conf
451 if conf_defaults:
452 for key, value in conf_defaults.items():
453 self.conf_set(key, value)
f91f0fd5
TL
454 if conffile in (self.NO_CONF_FILE, None):
455 pass
456 elif conffile in (self.DEFAULT_CONF_FILES, ''):
457 self.conf_read_file(None)
458 else:
7c673cae
FG
459 self.conf_read_file(conffile)
460 if conf:
461 for key, value in conf.items():
462 self.conf_set(key, value)
463
9f95a23c
TL
464 def get_addrs(self):
465 """
466 Get associated client addresses with this RADOS session.
467 """
468 self.require_state("configuring", "connected")
469
470 cdef:
471 char* addrs = NULL
472
473 try:
474
475 with nogil:
476 ret = rados_getaddrs(self.cluster, &addrs)
477 if ret:
478 raise make_ex(ret, "error calling getaddrs")
479
480 return decode_cstr(addrs)
481 finally:
482 free(addrs)
483
7c673cae
FG
484 def require_state(self, *args):
485 """
486 Checks if the Rados object is in a special state
487
11fdf7f2 488 :raises: :class:`RadosStateError`
7c673cae
FG
489 """
490 if self.state in args:
491 return
492 raise RadosStateError("You cannot perform that operation on a \
493Rados object in state %s." % self.state)
494
495 def shutdown(self):
496 """
497 Disconnects from the cluster. Call this explicitly when a
498 Rados.connect()ed object is no longer used.
499 """
500 if self.state != "shutdown":
501 with nogil:
502 rados_shutdown(self.cluster)
503 self.state = "shutdown"
504
505 def __enter__(self):
506 self.connect()
507 return self
508
509 def __exit__(self, type_, value, traceback):
510 self.shutdown()
511 return False
512
f67539c2 513 def version(self) -> Version:
7c673cae
FG
514 """
515 Get the version number of the ``librados`` C library.
516
517 :returns: a tuple of ``(major, minor, extra)`` components of the
518 librados version
519 """
520 cdef int major = 0
521 cdef int minor = 0
522 cdef int extra = 0
523 with nogil:
524 rados_version(&major, &minor, &extra)
525 return Version(major, minor, extra)
526
f67539c2 527 def conf_read_file(self, path: Optional[str] = None):
7c673cae
FG
528 """
529 Configure the cluster handle using a Ceph config file.
530
531 :param path: path to the config file
7c673cae
FG
532 """
533 self.require_state("configuring", "connected")
f67539c2 534 path_raw = cstr(path, 'path', opt=True)
7c673cae 535 cdef:
f67539c2 536 char *_path = opt_str(path_raw)
7c673cae
FG
537 with nogil:
538 ret = rados_conf_read_file(self.cluster, _path)
539 if ret != 0:
540 raise make_ex(ret, "error calling conf_read_file")
541
f67539c2 542 def conf_parse_argv(self, args: Sequence[str]):
7c673cae
FG
543 """
544 Parse known arguments from args, and remove; returned
545 args contain only those unknown to ceph
546 """
547 self.require_state("configuring", "connected")
548 if not args:
549 return
550
551 cargs = cstr_list(args, 'args')
552 cdef:
553 int _argc = len(args)
554 char **_argv = to_bytes_array(cargs)
555 char **_remargv = NULL
556
557 try:
558 _remargv = <char **>malloc(_argc * sizeof(char *))
559 with nogil:
560 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
561 <const char**>_argv,
562 <const char**>_remargv)
563 if ret:
564 raise make_ex(ret, "error calling conf_parse_argv_remainder")
565
566 # _remargv was allocated with fixed argc; collapse return
567 # list to eliminate any missing args
568 retargs = [decode_cstr(a) for a in _remargv[:_argc]
569 if a != NULL]
570 self.parsed_args = args
571 return retargs
572 finally:
573 free(_argv)
574 free(_remargv)
575
f67539c2 576 def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
7c673cae
FG
577 """
578 Parse known arguments from an environment variable, normally
579 CEPH_ARGS.
580 """
581 self.require_state("configuring", "connected")
582 if not var:
583 return
584
f67539c2 585 var_raw = cstr(var, 'var')
7c673cae 586 cdef:
f67539c2 587 char *_var = var_raw
7c673cae
FG
588 with nogil:
589 ret = rados_conf_parse_env(self.cluster, _var)
590 if ret != 0:
591 raise make_ex(ret, "error calling conf_parse_env")
592
f67539c2 593 def conf_get(self, option: str) -> Optional[str]:
7c673cae
FG
594 """
595 Get the value of a configuration option
596
597 :param option: which option to read
7c673cae 598
20effc67 599 :returns: value of the option or None
7c673cae
FG
600 :raises: :class:`TypeError`
601 """
602 self.require_state("configuring", "connected")
f67539c2 603 option_raw = cstr(option, 'option')
7c673cae 604 cdef:
f67539c2 605 char *_option = option_raw
7c673cae
FG
606 size_t length = 20
607 char *ret_buf = NULL
608
609 try:
610 while True:
611 ret_buf = <char *>realloc_chk(ret_buf, length)
612 with nogil:
613 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
614 if ret == 0:
615 return decode_cstr(ret_buf)
616 elif ret == -errno.ENAMETOOLONG:
617 length = length * 2
618 elif ret == -errno.ENOENT:
619 return None
620 else:
621 raise make_ex(ret, "error calling conf_get")
622 finally:
623 free(ret_buf)
624
f67539c2 625 def conf_set(self, option: str, val: str):
7c673cae
FG
626 """
627 Set the value of a configuration option
628
629 :param option: which option to set
7c673cae 630 :param option: value of the option
7c673cae
FG
631
632 :raises: :class:`TypeError`, :class:`ObjectNotFound`
633 """
634 self.require_state("configuring", "connected")
f67539c2
TL
635 option_raw = cstr(option, 'option')
636 val_raw = cstr(val, 'val')
7c673cae 637 cdef:
f67539c2
TL
638 char *_option = option_raw
639 char *_val = val_raw
7c673cae
FG
640
641 with nogil:
642 ret = rados_conf_set(self.cluster, _option, _val)
643 if ret != 0:
644 raise make_ex(ret, "error calling conf_set")
645
f67539c2 646 def ping_monitor(self, mon_id: str):
7c673cae
FG
647 """
648 Ping a monitor to assess liveness
649
650 May be used as a simply way to assess liveness, or to obtain
651 information about the monitor in a simple way even in the
652 absence of quorum.
653
654 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
7c673cae
FG
655 :returns: the string reply from the monitor
656 """
657
658 self.require_state("configuring", "connected")
659
f67539c2 660 mon_id_raw = cstr(mon_id, 'mon_id')
7c673cae 661 cdef:
f67539c2 662 char *_mon_id = mon_id_raw
31f18b77 663 size_t outstrlen = 0
7c673cae
FG
664 char *outstr
665
666 with nogil:
667 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
668
669 if ret != 0:
670 raise make_ex(ret, "error calling ping_monitor")
671
672 if outstrlen:
673 my_outstr = outstr[:outstrlen]
674 rados_buffer_free(outstr)
675 return decode_cstr(my_outstr)
676
f67539c2 677 def connect(self, timeout: int = 0):
7c673cae
FG
678 """
679 Connect to the cluster. Use shutdown() to release resources.
20effc67
TL
680
681 :param timeout: Any supplied timeout value is currently ignored.
7c673cae
FG
682 """
683 self.require_state("configuring")
684 # NOTE(sileht): timeout was supported by old python API,
685 # but this is not something available in C API, so ignore
686 # for now and remove it later
687 with nogil:
688 ret = rados_connect(self.cluster)
689 if ret != 0:
690 raise make_ex(ret, "error connecting to the cluster")
691 self.state = "connected"
692
f67539c2 693 def get_instance_id(self) -> int:
11fdf7f2
TL
694 """
695 Get a global id for current instance
696 """
697 self.require_state("connected")
698 with nogil:
699 ret = rados_get_instance_id(self.cluster)
700 return ret;
701
f67539c2 702 def get_cluster_stats(self) -> Dict[str, int]:
7c673cae
FG
703 """
704 Read usage info about the cluster
705
706 This tells you total space, space used, space available, and number
707 of objects. These are not updated immediately when data is written,
708 they are eventually consistent.
20effc67 709 :returns: contains the following keys:
7c673cae
FG
710
711 - ``kb`` (int) - total space
712
713 - ``kb_used`` (int) - space used
714
715 - ``kb_avail`` (int) - free space available
716
717 - ``num_objects`` (int) - number of objects
718
719 """
720 cdef:
721 rados_cluster_stat_t stats
722
723 with nogil:
724 ret = rados_cluster_stat(self.cluster, &stats)
725
726 if ret < 0:
727 raise make_ex(
728 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
729 return {'kb': stats.kb,
730 'kb_used': stats.kb_used,
731 'kb_avail': stats.kb_avail,
732 'num_objects': stats.num_objects}
733
f67539c2 734 def pool_exists(self, pool_name: str) -> bool:
7c673cae
FG
735 """
736 Checks if a given pool exists.
737
738 :param pool_name: name of the pool to check
7c673cae
FG
739
740 :raises: :class:`TypeError`, :class:`Error`
20effc67 741 :returns: whether the pool exists, false otherwise.
7c673cae
FG
742 """
743 self.require_state("connected")
744
f67539c2 745 pool_name_raw = cstr(pool_name, 'pool_name')
7c673cae 746 cdef:
f67539c2 747 char *_pool_name = pool_name_raw
7c673cae
FG
748
749 with nogil:
750 ret = rados_pool_lookup(self.cluster, _pool_name)
751 if ret >= 0:
752 return True
753 elif ret == -errno.ENOENT:
754 return False
755 else:
756 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
757
f67539c2 758 def pool_lookup(self, pool_name: str) -> int:
7c673cae
FG
759 """
760 Returns a pool's ID based on its name.
761
762 :param pool_name: name of the pool to look up
7c673cae
FG
763
764 :raises: :class:`TypeError`, :class:`Error`
20effc67 765 :returns: pool ID, or None if it doesn't exist
7c673cae
FG
766 """
767 self.require_state("connected")
f67539c2 768 pool_name_raw = cstr(pool_name, 'pool_name')
7c673cae 769 cdef:
f67539c2 770 char *_pool_name = pool_name_raw
7c673cae
FG
771
772 with nogil:
773 ret = rados_pool_lookup(self.cluster, _pool_name)
774 if ret >= 0:
775 return int(ret)
776 elif ret == -errno.ENOENT:
777 return None
778 else:
779 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
780
20effc67 781 def pool_reverse_lookup(self, pool_id: int) -> Optional[str]:
7c673cae
FG
782 """
783 Returns a pool's name based on its ID.
784
785 :param pool_id: ID of the pool to look up
7c673cae
FG
786
787 :raises: :class:`TypeError`, :class:`Error`
20effc67 788 :returns: pool name, or None if it doesn't exist
7c673cae
FG
789 """
790 self.require_state("connected")
791 cdef:
792 int64_t _pool_id = pool_id
793 size_t size = 512
794 char *name = NULL
795
796 try:
797 while True:
798 name = <char *>realloc_chk(name, size)
799 with nogil:
800 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
801 if ret >= 0:
802 break
803 elif ret != -errno.ERANGE and size <= 4096:
804 size *= 2
805 elif ret == -errno.ENOENT:
806 return None
807 elif ret < 0:
808 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
809
810 return decode_cstr(name)
811
812 finally:
813 free(name)
814
f67539c2
TL
815 def create_pool(self, pool_name: str,
816 crush_rule: Optional[int] = None,
817 auid: Optional[int] = None):
7c673cae
FG
818 """
819 Create a pool:
9f95a23c 820 - with default settings: if crush_rule=None and auid=None
11fdf7f2 821 - with a specific CRUSH rule: crush_rule given
9f95a23c
TL
822 - with a specific auid: auid given
823 - with a specific CRUSH rule and auid: crush_rule and auid given
824
7c673cae 825 :param pool_name: name of the pool to create
7c673cae 826 :param crush_rule: rule to use for placement in the new pool
9f95a23c 827 :param auid: id of the owner of the new pool
7c673cae
FG
828
829 :raises: :class:`TypeError`, :class:`Error`
830 """
831 self.require_state("connected")
832
f67539c2 833 pool_name_raw = cstr(pool_name, 'pool_name')
7c673cae 834 cdef:
f67539c2 835 char *_pool_name = pool_name_raw
7c673cae 836 uint8_t _crush_rule
9f95a23c 837 uint64_t _auid
7c673cae 838
9f95a23c 839 if crush_rule is None and auid is None:
7c673cae
FG
840 with nogil:
841 ret = rados_pool_create(self.cluster, _pool_name)
9f95a23c 842 elif crush_rule is not None and auid is None:
7c673cae
FG
843 _crush_rule = crush_rule
844 with nogil:
11fdf7f2 845 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
9f95a23c
TL
846 elif crush_rule is None and auid is not None:
847 _auid = auid
848 with nogil:
849 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
850 else:
851 _crush_rule = crush_rule
852 _auid = auid
853 with nogil:
854 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
7c673cae
FG
855 if ret < 0:
856 raise make_ex(ret, "error creating pool '%s'" % pool_name)
857
f67539c2 858 def get_pool_base_tier(self, pool_id: int) -> int:
7c673cae
FG
859 """
860 Get base pool
861
862 :returns: base pool, or pool_id if tiering is not configured for the pool
863 """
864 self.require_state("connected")
865 cdef:
866 int64_t base_tier = 0
867 int64_t _pool_id = pool_id
868
869 with nogil:
870 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
871 if ret < 0:
872 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
873 return int(base_tier)
874
f67539c2 875 def delete_pool(self, pool_name: str):
7c673cae
FG
876 """
877 Delete a pool and all data inside it.
878
879 The pool is removed from the cluster immediately,
880 but the actual data is deleted in the background.
881
882 :param pool_name: name of the pool to delete
7c673cae
FG
883
884 :raises: :class:`TypeError`, :class:`Error`
885 """
886 self.require_state("connected")
887
f67539c2 888 pool_name_raw = cstr(pool_name, 'pool_name')
7c673cae 889 cdef:
f67539c2 890 char *_pool_name = pool_name_raw
7c673cae
FG
891
892 with nogil:
893 ret = rados_pool_delete(self.cluster, _pool_name)
894 if ret < 0:
895 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
896
f67539c2 897 def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
7c673cae
FG
898 """
899 List inconsistent placement groups in the given pool
900
901 :param pool_id: ID of the pool in which PGs are listed
20effc67 902 :returns: inconsistent placement groups
7c673cae
FG
903 """
904 self.require_state("connected")
905 cdef:
906 int64_t pool = pool_id
907 size_t size = 512
908 char *pgs = NULL
909
910 try:
911 while True:
912 pgs = <char *>realloc_chk(pgs, size);
913 with nogil:
914 ret = rados_inconsistent_pg_list(self.cluster, pool,
915 pgs, size)
916 if ret > <int>size:
917 size *= 2
918 elif ret >= 0:
919 break
920 else:
921 raise make_ex(ret, "error calling inconsistent_pg_list")
922 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
923 finally:
924 free(pgs)
925
f67539c2 926 def list_pools(self) -> List[str]:
7c673cae
FG
927 """
928 Gets a list of pool names.
929
20effc67 930 :returns: list of pool names.
7c673cae
FG
931 """
932 self.require_state("connected")
933 cdef:
934 size_t size = 512
935 char *c_names = NULL
936
937 try:
938 while True:
939 c_names = <char *>realloc_chk(c_names, size)
940 with nogil:
941 ret = rados_pool_list(self.cluster, c_names, size)
942 if ret > <int>size:
943 size *= 2
944 elif ret >= 0:
945 break
946 return [name for name in decode_cstr(c_names[:ret]).split('\0')
947 if name]
948 finally:
949 free(c_names)
950
f67539c2 951 def get_fsid(self) -> str:
7c673cae
FG
952 """
953 Get the fsid of the cluster as a hexadecimal string.
954
955 :raises: :class:`Error`
20effc67 956 :returns: cluster fsid
7c673cae
FG
957 """
958 self.require_state("connected")
959 cdef:
81eedcae
TL
960 char *ret_buf = NULL
961 size_t buf_len = 64
7c673cae 962
7c673cae 963 try:
81eedcae
TL
964 while True:
965 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
966 with nogil:
967 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
968 if ret == -errno.ERANGE:
969 buf_len = buf_len * 2
970 elif ret < 0:
971 raise make_ex(ret, "error getting cluster fsid")
972 else:
973 break
974 return decode_cstr(ret_buf)
7c673cae 975 finally:
81eedcae 976 free(ret_buf)
7c673cae 977
f67539c2 978 def open_ioctx(self, ioctx_name: str) -> Ioctx:
7c673cae
FG
979 """
980 Create an io context
981
982 The io context allows you to perform operations within a particular
983 pool.
984
985 :param ioctx_name: name of the pool
7c673cae
FG
986
987 :raises: :class:`TypeError`, :class:`Error`
20effc67 988 :returns: Rados Ioctx object
7c673cae
FG
989 """
990 self.require_state("connected")
f67539c2 991 ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
7c673cae
FG
992 cdef:
993 rados_ioctx_t ioctx
f67539c2 994 char *_ioctx_name = ioctx_name_raw
7c673cae
FG
995 with nogil:
996 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
997 if ret < 0:
998 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
cd265ab1 999 io = Ioctx(self, ioctx_name)
7c673cae
FG
1000 io.io = ioctx
1001 return io
1002
f67539c2 1003 def open_ioctx2(self, pool_id: int) -> Ioctx:
11fdf7f2
TL
1004 """
1005 Create an io context
1006
1007 The io context allows you to perform operations within a particular
1008 pool.
1009
1010 :param pool_id: ID of the pool
11fdf7f2
TL
1011
1012 :raises: :class:`TypeError`, :class:`Error`
20effc67 1013 :returns: Rados Ioctx object
11fdf7f2
TL
1014 """
1015 self.require_state("connected")
1016 cdef:
1017 rados_ioctx_t ioctx
1018 int64_t _pool_id = pool_id
1019 with nogil:
1020 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1021 if ret < 0:
1022 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
cd265ab1 1023 io = Ioctx(self, str(pool_id))
11fdf7f2
TL
1024 io.io = ioctx
1025 return io
1026
f67539c2
TL
1027 def mon_command(self,
1028 cmd: str,
1029 inbuf: bytes,
1030 timeout: int = 0,
1031 target: Optional[Union[str, int]] = None) -> Tuple[int, bytes, str]:
7c673cae 1032 """
9f95a23c
TL
1033 Send a command to the mon.
1034
7c673cae 1035 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
9f95a23c
TL
1036
1037 :param cmd: JSON formatted string.
1038 :param inbuf: optional string.
1039 :param timeout: This parameter is ignored.
f67539c2 1040 :param target: name or rank of a specific mon. Optional
9f95a23c
TL
1041 :return: (int ret, string outbuf, string outs)
1042
1043 Example:
1044
1045 >>> import json
1046 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1047 >>> c.connect()
1048 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1049 >>> c.mon_command(cmd, b'')
7c673cae
FG
1050 """
1051 # NOTE(sileht): timeout is ignored because C API doesn't provide
1052 # timeout argument, but we keep it for backward compat with old python binding
7c673cae 1053 self.require_state("connected")
f67539c2 1054 cmds = [cstr(cmd, 'cmd')]
7c673cae
FG
1055
1056 if isinstance(target, int):
1057 # NOTE(sileht): looks weird but test_monmap_dump pass int
1058 target = str(target)
1059
1060 target = cstr(target, 'target', opt=True)
7c673cae
FG
1061
1062 cdef:
1063 char *_target = opt_str(target)
f67539c2
TL
1064 char **_cmd = to_bytes_array(cmds)
1065 size_t _cmdlen = len(cmds)
7c673cae
FG
1066
1067 char *_inbuf = inbuf
1068 size_t _inbuf_len = len(inbuf)
1069
1070 char *_outbuf
1071 size_t _outbuf_len
1072 char *_outs
1073 size_t _outs_len
1074
1075 try:
1076 if target:
1077 with nogil:
1078 ret = rados_mon_command_target(self.cluster, _target,
1079 <const char **>_cmd, _cmdlen,
1080 <const char*>_inbuf, _inbuf_len,
1081 &_outbuf, &_outbuf_len,
1082 &_outs, &_outs_len)
1083 else:
1084 with nogil:
1085 ret = rados_mon_command(self.cluster,
1086 <const char **>_cmd, _cmdlen,
1087 <const char*>_inbuf, _inbuf_len,
1088 &_outbuf, &_outbuf_len,
1089 &_outs, &_outs_len)
1090
1091 my_outs = decode_cstr(_outs[:_outs_len])
1092 my_outbuf = _outbuf[:_outbuf_len]
1093 if _outs_len:
1094 rados_buffer_free(_outs)
1095 if _outbuf_len:
1096 rados_buffer_free(_outbuf)
1097 return (ret, my_outbuf, my_outs)
1098 finally:
1099 free(_cmd)
1100
f67539c2
TL
1101 def osd_command(self,
1102 osdid: int,
1103 cmd: str,
1104 inbuf: bytes,
1105 timeout: int = 0) -> Tuple[int, bytes, str]:
7c673cae
FG
1106 """
1107 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
9f95a23c
TL
1108
1109 :return: (int ret, string outbuf, string outs)
7c673cae
FG
1110 """
1111 # NOTE(sileht): timeout is ignored because C API doesn't provide
1112 # timeout argument, but we keep it for backward compat with old python binding
1113 self.require_state("connected")
1114
f67539c2 1115 cmds = [cstr(cmd, 'cmd')]
7c673cae
FG
1116
1117 cdef:
1118 int _osdid = osdid
f67539c2
TL
1119 char **_cmd = to_bytes_array(cmds)
1120 size_t _cmdlen = len(cmds)
7c673cae
FG
1121
1122 char *_inbuf = inbuf
1123 size_t _inbuf_len = len(inbuf)
1124
1125 char *_outbuf
1126 size_t _outbuf_len
1127 char *_outs
1128 size_t _outs_len
1129
1130 try:
1131 with nogil:
1132 ret = rados_osd_command(self.cluster, _osdid,
1133 <const char **>_cmd, _cmdlen,
1134 <const char*>_inbuf, _inbuf_len,
1135 &_outbuf, &_outbuf_len,
1136 &_outs, &_outs_len)
1137
1138 my_outs = decode_cstr(_outs[:_outs_len])
1139 my_outbuf = _outbuf[:_outbuf_len]
1140 if _outs_len:
1141 rados_buffer_free(_outs)
1142 if _outbuf_len:
1143 rados_buffer_free(_outbuf)
1144 return (ret, my_outbuf, my_outs)
1145 finally:
1146 free(_cmd)
1147
f67539c2
TL
1148 def mgr_command(self,
1149 cmd: str,
1150 inbuf: bytes,
1151 timeout: int = 0,
1152 target: Optional[str] = None) -> Tuple[int, str, bytes]:
7c673cae 1153 """
9f95a23c 1154 :return: (int ret, string outbuf, string outs)
7c673cae
FG
1155 """
1156 # NOTE(sileht): timeout is ignored because C API doesn't provide
1157 # timeout argument, but we keep it for backward compat with old python binding
1158 self.require_state("connected")
1159
f67539c2 1160 cmds = [cstr(cmd, 'cmd')]
9f95a23c 1161 target = cstr(target, 'target', opt=True)
7c673cae
FG
1162
1163 cdef:
9f95a23c
TL
1164 char *_target = opt_str(target)
1165
f67539c2
TL
1166 char **_cmd = to_bytes_array(cmds)
1167 size_t _cmdlen = len(cmds)
7c673cae
FG
1168
1169 char *_inbuf = inbuf
1170 size_t _inbuf_len = len(inbuf)
1171
1172 char *_outbuf
1173 size_t _outbuf_len
1174 char *_outs
1175 size_t _outs_len
1176
1177 try:
9f95a23c
TL
1178 if target is not None:
1179 with nogil:
1180 ret = rados_mgr_command_target(self.cluster,
1181 <const char*>_target,
1182 <const char **>_cmd, _cmdlen,
1183 <const char*>_inbuf, _inbuf_len,
1184 &_outbuf, &_outbuf_len,
1185 &_outs, &_outs_len)
1186 else:
1187 with nogil:
1188 ret = rados_mgr_command(self.cluster,
1189 <const char **>_cmd, _cmdlen,
1190 <const char*>_inbuf, _inbuf_len,
1191 &_outbuf, &_outbuf_len,
1192 &_outs, &_outs_len)
7c673cae
FG
1193
1194 my_outs = decode_cstr(_outs[:_outs_len])
1195 my_outbuf = _outbuf[:_outbuf_len]
1196 if _outs_len:
1197 rados_buffer_free(_outs)
1198 if _outbuf_len:
1199 rados_buffer_free(_outbuf)
1200 return (ret, my_outbuf, my_outs)
1201 finally:
1202 free(_cmd)
1203
f67539c2
TL
1204 def pg_command(self,
1205 pgid: str,
1206 cmd: str,
1207 inbuf: bytes,
1208 timeout: int = 0) -> Tuple[int, bytes, str]:
7c673cae
FG
1209 """
1210 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
9f95a23c
TL
1211
1212 :return: (int ret, string outbuf, string outs)
7c673cae
FG
1213 """
1214 # NOTE(sileht): timeout is ignored because C API doesn't provide
1215 # timeout argument, but we keep it for backward compat with old python binding
1216 self.require_state("connected")
1217
f67539c2
TL
1218 pgid_raw = cstr(pgid, 'pgid')
1219 cmds = [cstr(cmd, 'cmd')]
7c673cae
FG
1220
1221 cdef:
f67539c2
TL
1222 char *_pgid = pgid_raw
1223 char **_cmd = to_bytes_array(cmds)
1224 size_t _cmdlen = len(cmds)
7c673cae
FG
1225
1226 char *_inbuf = inbuf
1227 size_t _inbuf_len = len(inbuf)
1228
1229 char *_outbuf
1230 size_t _outbuf_len
1231 char *_outs
1232 size_t _outs_len
1233
1234 try:
1235 with nogil:
1236 ret = rados_pg_command(self.cluster, _pgid,
1237 <const char **>_cmd, _cmdlen,
1238 <const char *>_inbuf, _inbuf_len,
1239 &_outbuf, &_outbuf_len,
1240 &_outs, &_outs_len)
1241
1242 my_outs = decode_cstr(_outs[:_outs_len])
1243 my_outbuf = _outbuf[:_outbuf_len]
1244 if _outs_len:
1245 rados_buffer_free(_outs)
1246 if _outbuf_len:
1247 rados_buffer_free(_outbuf)
1248 return (ret, my_outbuf, my_outs)
1249 finally:
1250 free(_cmd)
1251
f67539c2 1252 def wait_for_latest_osdmap(self) -> int:
7c673cae
FG
1253 self.require_state("connected")
1254 with nogil:
1255 ret = rados_wait_for_latest_osdmap(self.cluster)
1256 return ret
1257
f67539c2 1258 def blocklist_add(self, client_address: str, expire_seconds: int = 0):
7c673cae 1259 """
f67539c2 1260 Blocklist a client from the OSDs
7c673cae
FG
1261
1262 :param client_address: client address
f67539c2 1263 :param expire_seconds: number of seconds to blocklist
7c673cae
FG
1264
1265 :raises: :class:`Error`
1266 """
1267 self.require_state("connected")
f67539c2 1268 client_address_raw = cstr(client_address, 'client_address')
7c673cae
FG
1269 cdef:
1270 uint32_t _expire_seconds = expire_seconds
f67539c2 1271 char *_client_address = client_address_raw
7c673cae
FG
1272
1273 with nogil:
f67539c2 1274 ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
7c673cae 1275 if ret < 0:
f67539c2 1276 raise make_ex(ret, "error blocklisting client '%s'" % client_address)
7c673cae 1277
f67539c2
TL
1278 def monitor_log(self, level: str,
1279 callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
1280 arg: Optional[object] = None):
7c673cae
FG
1281 if level not in MONITOR_LEVELS:
1282 raise LogicError("invalid monitor level " + level)
1283 if callback is not None and not callable(callback):
1284 raise LogicError("callback must be a callable function or None")
1285
f67539c2
TL
1286 level_raw = cstr(level, 'level')
1287 cdef char *_level = level_raw
7c673cae
FG
1288
1289 if callback is None:
1290 with nogil:
1291 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1292 self.monitor_callback = None
31f18b77 1293 self.monitor_callback2 = None
7c673cae
FG
1294 return
1295
1296 cb = (callback, arg)
1297 cdef PyObject* _arg = <PyObject*>cb
1298 with nogil:
1299 r = rados_monitor_log(self.cluster, <const char*>_level,
1300 <rados_log_callback_t>&__monitor_callback, _arg)
1301
1302 if r:
1303 raise make_ex(r, 'error calling rados_monitor_log')
1304 # NOTE(sileht): Prevents the callback method from being garbage collected
1305 self.monitor_callback = cb
31f18b77
FG
1306 self.monitor_callback2 = None
1307
f67539c2
TL
1308 def monitor_log2(self, level: str,
1309 callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
1310 arg: Optional[object] = None):
31f18b77
FG
1311 if level not in MONITOR_LEVELS:
1312 raise LogicError("invalid monitor level " + level)
1313 if callback is not None and not callable(callback):
1314 raise LogicError("callback must be a callable function or None")
1315
f67539c2
TL
1316 level_raw = cstr(level, 'level')
1317 cdef char *_level = level_raw
31f18b77
FG
1318
1319 if callback is None:
1320 with nogil:
1321 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1322 self.monitor_callback = None
1323 self.monitor_callback2 = None
1324 return
1325
1326 cb = (callback, arg)
1327 cdef PyObject* _arg = <PyObject*>cb
1328 with nogil:
1329 r = rados_monitor_log2(self.cluster, <const char*>_level,
1330 <rados_log_callback2_t>&__monitor_callback2, _arg)
1331
1332 if r:
1333 raise make_ex(r, 'error calling rados_monitor_log')
1334 # NOTE(sileht): Prevents the callback method from being garbage collected
1335 self.monitor_callback = None
1336 self.monitor_callback2 = cb
7c673cae 1337
f67539c2 1338 def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
11fdf7f2
TL
1339 """
1340 :param str service: service name (e.g. "rgw")
1341 :param str daemon: daemon name (e.g. "gwfoo")
1342 :param dict metadata: static metadata about the register daemon
1343 (e.g., the version of Ceph, the kernel version.)
1344 """
f67539c2
TL
1345 service_raw = cstr(service, 'service')
1346 daemon_raw = cstr(daemon, 'daemon')
9f95a23c 1347 metadata_dict = flatten_dict(metadata, 'metadata')
11fdf7f2 1348 cdef:
f67539c2
TL
1349 char *_service = service_raw
1350 char *_daemon = daemon_raw
11fdf7f2
TL
1351 char *_metadata = metadata_dict
1352
1353 with nogil:
1354 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1355 if ret != 0:
1356 raise make_ex(ret, "error calling service_register()")
1357
f67539c2 1358 def service_daemon_update(self, status: Dict[str, str]):
9f95a23c 1359 status_dict = flatten_dict(status, 'status')
11fdf7f2
TL
1360 cdef:
1361 char *_status = status_dict
1362
1363 with nogil:
1364 ret = rados_service_update_status(self.cluster, _status)
1365 if ret != 0:
1366 raise make_ex(ret, "error calling service_daemon_update()")
1367
7c673cae
FG
1368
1369cdef class OmapIterator(object):
1370 """Omap iterator"""
1371
1372 cdef public Ioctx ioctx
05a536ef 1373 cdef public object omap_key_type
7c673cae
FG
1374 cdef rados_omap_iter_t ctx
1375
05a536ef 1376 def __cinit__(self, Ioctx ioctx, omap_key_type):
7c673cae 1377 self.ioctx = ioctx
05a536ef 1378 self.omap_key_type = omap_key_type
7c673cae
FG
1379
1380 def __iter__(self):
1381 return self
1382
1383 def __next__(self):
1384 """
1385 Get the next key-value pair in the object
1386 :returns: next rados.OmapItem
1387 """
1388 cdef:
1389 char *key_ = NULL
1390 char *val_ = NULL
1391 size_t len_
1392
1393 with nogil:
1394 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1395
1396 if ret != 0:
1397 raise make_ex(ret, "error iterating over the omap")
1398 if key_ == NULL:
1399 raise StopIteration()
05a536ef 1400 key = self.omap_key_type(key_)
7c673cae
FG
1401 val = None
1402 if val_ != NULL:
1403 val = val_[:len_]
1404 return (key, val)
1405
1406 def __dealloc__(self):
1407 with nogil:
1408 rados_omap_get_end(self.ctx)
1409
1410
1411cdef class ObjectIterator(object):
1412 """rados.Ioctx Object iterator"""
1413
1414 cdef rados_list_ctx_t ctx
1415
1416 cdef public object ioctx
1417
1418 def __cinit__(self, Ioctx ioctx):
1419 self.ioctx = ioctx
1420
1421 with nogil:
1422 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1423 if ret < 0:
1424 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1425 % self.ioctx.name)
1426
1427 def __iter__(self):
1428 return self
1429
1430 def __next__(self):
1431 """
1432 Get the next object name and locator in the pool
1433
1434 :raises: StopIteration
1435 :returns: next rados.Ioctx Object
1436 """
1437 cdef:
1438 const char *key_ = NULL
1439 const char *locator_ = NULL
1440 const char *nspace_ = NULL
20effc67
TL
1441 size_t key_size_ = 0
1442 size_t locator_size_ = 0
1443 size_t nspace_size_ = 0
7c673cae
FG
1444
1445 with nogil:
20effc67
TL
1446 ret = rados_nobjects_list_next2(self.ctx, &key_, &locator_, &nspace_,
1447 &key_size_, &locator_size_, &nspace_size_)
7c673cae
FG
1448
1449 if ret < 0:
1450 raise StopIteration()
1451
20effc67
TL
1452 key = decode_cstr(key_[:key_size_])
1453 locator = decode_cstr(locator_[:locator_size_]) if locator_ != NULL else None
1454 nspace = decode_cstr(nspace_[:nspace_size_]) if nspace_ != NULL else None
7c673cae
FG
1455 return Object(self.ioctx, key, locator, nspace)
1456
1457 def __dealloc__(self):
1458 with nogil:
1459 rados_nobjects_list_close(self.ctx)
1460
1461
1462cdef class XattrIterator(object):
1463 """Extended attribute iterator"""
1464
1465 cdef rados_xattrs_iter_t it
1466 cdef char* _oid
1467
1468 cdef public Ioctx ioctx
1469 cdef public object oid
1470
1471 def __cinit__(self, Ioctx ioctx, oid):
1472 self.ioctx = ioctx
1473 self.oid = cstr(oid, 'oid')
1474 self._oid = self.oid
1475
1476 with nogil:
1477 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1478 if ret != 0:
1479 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1480
1481 def __iter__(self):
1482 return self
1483
1484 def __next__(self):
1485 """
1486 Get the next xattr on the object
1487
1488 :raises: StopIteration
1489 :returns: pair - of name and value of the next Xattr
1490 """
1491 cdef:
1492 const char *name_ = NULL
1493 const char *val_ = NULL
1494 size_t len_ = 0
1495
1496 with nogil:
1497 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1498 if ret != 0:
1499 raise make_ex(ret, "error iterating over the extended attributes \
1500in '%s'" % self.oid)
1501 if name_ == NULL:
1502 raise StopIteration()
1503 name = decode_cstr(name_)
1504 val = val_[:len_]
1505 return (name, val)
1506
1507 def __dealloc__(self):
1508 with nogil:
1509 rados_getxattrs_end(self.it)
1510
1511
1512cdef class SnapIterator(object):
1513 """Snapshot iterator"""
1514
1515 cdef public Ioctx ioctx
1516
1517 cdef rados_snap_t *snaps
1518 cdef int max_snap
1519 cdef int cur_snap
1520
1521 def __cinit__(self, Ioctx ioctx):
1522 self.ioctx = ioctx
1523 # We don't know how big a buffer we need until we've called the
1524 # function. So use the exponential doubling strategy.
1525 cdef int num_snaps = 10
1526 while True:
1527 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1528 num_snaps *
1529 sizeof(rados_snap_t))
1530
1531 with nogil:
1532 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1533 if ret >= 0:
1534 self.max_snap = ret
1535 break
1536 elif ret != -errno.ERANGE:
1537 raise make_ex(ret, "error calling rados_snap_list for \
1538ioctx '%s'" % self.ioctx.name)
1539 num_snaps = num_snaps * 2
1540 self.cur_snap = 0
1541
20effc67 1542 def __iter__(self) -> 'SnapIterator':
7c673cae
FG
1543 return self
1544
20effc67 1545 def __next__(self) -> 'Snap':
7c673cae
FG
1546 """
1547 Get the next Snapshot
1548
1549 :raises: :class:`Error`, StopIteration
20effc67 1550 :returns: next snapshot
7c673cae
FG
1551 """
1552 if self.cur_snap >= self.max_snap:
1553 raise StopIteration
1554
1555 cdef:
1556 rados_snap_t snap_id = self.snaps[self.cur_snap]
1557 int name_len = 10
1558 char *name = NULL
1559
1560 try:
1561 while True:
1562 name = <char *>realloc_chk(name, name_len)
1563 with nogil:
1564 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1565 if ret == 0:
1566 break
1567 elif ret != -errno.ERANGE:
1568 raise make_ex(ret, "rados_snap_get_name error")
1569 else:
1570 name_len = name_len * 2
1571
1572 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1573 self.cur_snap = self.cur_snap + 1
1574 return snap
1575 finally:
1576 free(name)
1577
1578
1579cdef class Snap(object):
1580 """Snapshot object"""
1581 cdef public Ioctx ioctx
1582 cdef public object name
1583
1584 # NOTE(sileht): old API was storing the ctypes object
1585 # instead of the value ....
1586 cdef public rados_snap_t snap_id
1587
1588 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1589 self.ioctx = ioctx
1590 self.name = name
1591 self.snap_id = snap_id
1592
1593 def __str__(self):
1594 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1595 % (str(self.ioctx), self.name, self.snap_id)
1596
f67539c2 1597 def get_timestamp(self) -> float:
7c673cae
FG
1598 """
1599 Find when a snapshot in the current pool occurred
1600
1601 :raises: :class:`Error`
20effc67 1602 :returns: the data and time the snapshot was created
7c673cae
FG
1603 """
1604 cdef time_t snap_time
1605
1606 with nogil:
1607 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1608 if ret != 0:
1609 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1610 return datetime.fromtimestamp(snap_time)
1611
f67539c2
TL
1612# https://github.com/cython/cython/issues/1370
1613unicode = str
7c673cae
FG
1614
1615cdef class Completion(object):
1616 """completion object"""
1617
1618 cdef public:
1619 Ioctx ioctx
1620 object oncomplete
1621 object onsafe
1622
1623 cdef:
1624 rados_callback_t complete_cb
1625 rados_callback_t safe_cb
1626 rados_completion_t rados_comp
1627 PyObject* buf
1628
1629 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1630 self.oncomplete = oncomplete
1631 self.onsafe = onsafe
1632 self.ioctx = ioctx
1633
f67539c2 1634 def is_safe(self) -> bool:
7c673cae
FG
1635 """
1636 Is an asynchronous operation safe?
1637
1638 This does not imply that the safe callback has finished.
1639
1640 :returns: True if the operation is safe
1641 """
9f95a23c 1642 return self.is_complete()
7c673cae 1643
f67539c2 1644 def is_complete(self) -> bool:
7c673cae
FG
1645 """
1646 Has an asynchronous operation completed?
1647
1648 This does not imply that the safe callback has finished.
1649
1650 :returns: True if the operation is completed
1651 """
1652 with nogil:
1653 ret = rados_aio_is_complete(self.rados_comp)
1654 return ret == 1
1655
1656 def wait_for_safe(self):
1657 """
1658 Wait for an asynchronous operation to be marked safe
1659
9f95a23c 1660 wait_for_safe() is an alias of wait_for_complete() since Luminous
7c673cae 1661 """
9f95a23c 1662 self.wait_for_complete()
7c673cae
FG
1663
1664 def wait_for_complete(self):
1665 """
1666 Wait for an asynchronous operation to complete
1667
1668 This does not imply that the complete callback has finished.
1669 """
1670 with nogil:
1671 rados_aio_wait_for_complete(self.rados_comp)
1672
1673 def wait_for_safe_and_cb(self):
1674 """
1675 Wait for an asynchronous operation to be marked safe and for
1676 the safe callback to have returned
1677 """
9f95a23c 1678 return self.wait_for_complete_and_cb()
7c673cae
FG
1679
1680 def wait_for_complete_and_cb(self):
1681 """
1682 Wait for an asynchronous operation to complete and for the
1683 complete callback to have returned
1684
1685 :returns: whether the operation is completed
1686 """
1687 with nogil:
1688 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1689 return ret
1690
f67539c2 1691 def get_return_value(self) -> int:
7c673cae
FG
1692 """
1693 Get the return value of an asychronous operation
1694
1695 The return value is set when the operation is complete or safe,
1696 whichever comes first.
1697
20effc67 1698 :returns: return value of the operation
7c673cae
FG
1699 """
1700 with nogil:
1701 ret = rados_aio_get_return_value(self.rados_comp)
1702 return ret
1703
1704 def __dealloc__(self):
1705 """
1706 Release a completion
1707
1708 Call this when you no longer need the completion. It may not be
1709 freed immediately if the operation is not acked and committed.
1710 """
1711 ref.Py_XDECREF(self.buf)
1712 self.buf = NULL
1713 if self.rados_comp != NULL:
1714 with nogil:
1715 rados_aio_release(self.rados_comp)
1716 self.rados_comp = NULL
1717
1718 def _complete(self):
1719 self.oncomplete(self)
9f95a23c
TL
1720 if self.onsafe:
1721 self.onsafe(self)
1722 self._cleanup()
7c673cae
FG
1723
1724 def _cleanup(self):
1725 with self.ioctx.lock:
1726 if self.oncomplete:
1727 self.ioctx.complete_completions.remove(self)
1728 if self.onsafe:
1729 self.ioctx.safe_completions.remove(self)
1730
1731
1732class OpCtx(object):
1733 def __enter__(self):
1734 return self.create()
1735
1736 def __exit__(self, type, msg, traceback):
1737 self.release()
1738
1739
1740cdef class WriteOp(object):
1741 cdef rados_write_op_t write_op
1742
1743 def create(self):
1744 with nogil:
1745 self.write_op = rados_create_write_op()
1746 return self
1747
1748 def release(self):
1749 with nogil:
1750 rados_release_write_op(self.write_op)
1751
f67539c2 1752 def new(self, exclusive: Optional[int] = None):
7c673cae
FG
1753 """
1754 Create the object.
1755 """
1756
1757 cdef:
1758 int _exclusive = exclusive
1759
1760 with nogil:
1761 rados_write_op_create(self.write_op, _exclusive, NULL)
1762
1763
1764 def remove(self):
1765 """
1766 Remove object.
1767 """
1768 with nogil:
1769 rados_write_op_remove(self.write_op)
1770
f67539c2 1771 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
7c673cae
FG
1772 """
1773 Set flags for the last operation added to this write_op.
1774 :para flags: flags to apply to the last operation
7c673cae
FG
1775 """
1776
1777 cdef:
1778 int _flags = flags
1779
1780 with nogil:
1781 rados_write_op_set_flags(self.write_op, _flags)
1782
f67539c2 1783 def set_xattr(self, xattr_name: str, xattr_value: bytes):
9f95a23c
TL
1784 """
1785 Set an extended attribute on an object.
1786 :param xattr_name: name of the xattr
9f95a23c 1787 :param xattr_value: buffer to set xattr to
9f95a23c 1788 """
f67539c2 1789 xattr_name_raw = cstr(xattr_name, 'xattr_name')
9f95a23c 1790 cdef:
f67539c2 1791 char *_xattr_name = xattr_name_raw
9f95a23c
TL
1792 char *_xattr_value = xattr_value
1793 size_t _xattr_value_len = len(xattr_value)
1794 with nogil:
1795 rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
1796
f67539c2 1797 def rm_xattr(self, xattr_name: str):
9f95a23c
TL
1798 """
1799 Removes an extended attribute on from an object.
1800 :param xattr_name: name of the xattr to remove
9f95a23c 1801 """
f67539c2 1802 xattr_name_raw = cstr(xattr_name, 'xattr_name')
9f95a23c 1803 cdef:
f67539c2 1804 char *_xattr_name = xattr_name_raw
9f95a23c
TL
1805 with nogil:
1806 rados_write_op_rmxattr(self.write_op, _xattr_name)
1807
f67539c2 1808 def append(self, to_write: bytes):
7c673cae
FG
1809 """
1810 Append data to an object synchronously
1811 :param to_write: data to write
7c673cae
FG
1812 """
1813
1814 cdef:
1815 char *_to_write = to_write
1816 size_t length = len(to_write)
1817
1818 with nogil:
1819 rados_write_op_append(self.write_op, _to_write, length)
1820
f67539c2 1821 def write_full(self, to_write: bytes):
7c673cae
FG
1822 """
1823 Write whole object, atomically replacing it.
1824 :param to_write: data to write
7c673cae
FG
1825 """
1826
1827 cdef:
1828 char *_to_write = to_write
1829 size_t length = len(to_write)
1830
1831 with nogil:
1832 rados_write_op_write_full(self.write_op, _to_write, length)
1833
f67539c2 1834 def write(self, to_write: bytes, offset: int = 0):
7c673cae
FG
1835 """
1836 Write to offset.
1837 :param to_write: data to write
7c673cae 1838 :param offset: byte offset in the object to begin writing at
7c673cae
FG
1839 """
1840
1841 cdef:
1842 char *_to_write = to_write
1843 size_t length = len(to_write)
1844 uint64_t _offset = offset
1845
1846 with nogil:
1847 rados_write_op_write(self.write_op, _to_write, length, _offset)
1848
f67539c2 1849 def assert_version(self, version: int):
91327a77
AA
1850 """
1851 Check if object's version is the expected one.
1852 :param version: expected version of the object
1853 :param type: int
1854 """
1855 cdef:
1856 uint64_t _version = version
1857
1858 with nogil:
1859 rados_write_op_assert_version(self.write_op, _version)
1860
f67539c2 1861 def zero(self, offset: int, length: int):
7c673cae
FG
1862 """
1863 Zero part of an object.
1864 :param offset: byte offset in the object to begin writing at
7c673cae 1865 :param offset: number of zero to write
7c673cae
FG
1866 """
1867
1868 cdef:
1869 size_t _length = length
1870 uint64_t _offset = offset
1871
1872 with nogil:
1873 rados_write_op_zero(self.write_op, _length, _offset)
1874
f67539c2 1875 def truncate(self, offset: int):
7c673cae
FG
1876 """
1877 Truncate an object.
1878 :param offset: byte offset in the object to begin truncating at
7c673cae
FG
1879 """
1880
1881 cdef:
1882 uint64_t _offset = offset
1883
1884 with nogil:
1885 rados_write_op_truncate(self.write_op, _offset)
1886
f67539c2 1887 def execute(self, cls: str, method: str, data: bytes):
9f95a23c
TL
1888 """
1889 Execute an OSD class method on an object
1890
1891 :param cls: name of the object class
9f95a23c 1892 :param method: name of the method
9f95a23c 1893 :param data: input data
9f95a23c
TL
1894 """
1895
f67539c2
TL
1896 cls_raw = cstr(cls, 'cls')
1897 method_raw = cstr(method, 'method')
9f95a23c 1898 cdef:
f67539c2
TL
1899 char *_cls = cls_raw
1900 char *_method = method_raw
9f95a23c
TL
1901 char *_data = data
1902 size_t _data_len = len(data)
1903
1904 with nogil:
1905 rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)
1906
f67539c2 1907 def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
9f95a23c
TL
1908 """
1909 Write the same buffer multiple times
1910 :param to_write: data to write
9f95a23c 1911 :param write_len: total number of bytes to write
9f95a23c 1912 :param offset: byte offset in the object to begin writing at
9f95a23c
TL
1913 """
1914 cdef:
1915 char *_to_write = to_write
1916 size_t _data_len = len(to_write)
1917 size_t _write_len = write_len
1918 uint64_t _offset = offset
1919 with nogil:
1920 rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)
7c673cae 1921
f67539c2
TL
1922 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1923 """
1924 Ensure that given object range (extent) satisfies comparison
1925 :param cmp_buf: buffer containing bytes to be compared with object contents
1926 :param offset: object byte offset at which to start the comparison
1927 """
1928 cdef:
1929 char *_cmp_buf = cmp_buf
1930 size_t _cmp_buf_len = len(cmp_buf)
1931 uint64_t _offset = offset
1932 with nogil:
1933 rados_write_op_cmpext(self.write_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1934
05a536ef 1935 def omap_cmp(self, key: OMAP_KEY_TYPE, val: OMAP_KEY_TYPE, cmp_op: int = LIBRADOS_CMPXATTR_OP_EQ):
20effc67
TL
1936 """
1937 Ensure that an omap key value satisfies comparison
1938 :param key: omap key whose associated value is evaluated for comparison
1939 :param val: value to compare with
1940 :param cmp_op: comparison operator, one of LIBRADOS_CMPXATTR_OP_EQ (1),
1941 LIBRADOS_CMPXATTR_OP_GT (3), or LIBRADOS_CMPXATTR_OP_LT (5).
1942 """
1943 key_raw = cstr(key, 'key')
1944 val_raw = cstr(val, 'val')
1945 cdef:
1946 char *_key = key_raw
1947 char *_val = val_raw
1948 size_t _val_len = len(val)
1949 uint8_t _comparison_operator = cmp_op
1950 with nogil:
1951 rados_write_op_omap_cmp(self.write_op, _key, _comparison_operator, _val, _val_len, NULL)
1952
7c673cae
FG
1953class WriteOpCtx(WriteOp, OpCtx):
1954 """write operation context manager"""
1955
1956
1957cdef class ReadOp(object):
1958 cdef rados_read_op_t read_op
1959
1960 def create(self):
1961 with nogil:
1962 self.read_op = rados_create_read_op()
1963 return self
1964
1965 def release(self):
1966 with nogil:
1967 rados_release_read_op(self.read_op)
1968
f67539c2
TL
1969 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1970 """
1971 Ensure that given object range (extent) satisfies comparison
1972 :param cmp_buf: buffer containing bytes to be compared with object contents
1973 :param offset: object byte offset at which to start the comparison
1974 """
1975 cdef:
1976 char *_cmp_buf = cmp_buf
1977 size_t _cmp_buf_len = len(cmp_buf)
1978 uint64_t _offset = offset
1979 with nogil:
1980 rados_read_op_cmpext(self.read_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1981
1982 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
7c673cae
FG
1983 """
1984 Set flags for the last operation added to this read_op.
1985 :para flags: flags to apply to the last operation
7c673cae
FG
1986 """
1987
1988 cdef:
1989 int _flags = flags
1990
1991 with nogil:
1992 rados_read_op_set_flags(self.read_op, _flags)
1993
1994
1995class ReadOpCtx(ReadOp, OpCtx):
1996 """read operation context manager"""
1997
1998
cd265ab1
TL
1999cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
2000 uint64_t _notifier_id, void *_data,
2001 size_t _data_len) with gil:
2002 """
2003 Watch callback
2004 """
2005 cdef object watch = <object>_arg
2006 data = None
2007 if _data != NULL:
2008 data = (<char *>_data)[:_data_len]
2009 watch._callback(_notify_id, _notifier_id, _cookie, data)
2010
2011cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
2012 int _error) with gil:
2013 """
2014 Watch error callback
2015 """
2016 cdef object watch = <object>_arg
2017 watch._error_callback(_cookie, _error)
2018
2019
2020cdef class Watch(object):
2021 """watch object"""
2022
2023 cdef:
2024 object id
2025 Ioctx ioctx
2026 object oid
2027 object callback
2028 object error_callback
2029
2030 def __cinit__(self, Ioctx ioctx, object oid, object callback,
2031 object error_callback, object timeout):
2032 self.id = 0
2033 self.ioctx = ioctx.dup()
2034 self.oid = cstr(oid, 'oid')
2035 self.callback = callback
2036 self.error_callback = error_callback
2037
2038 if timeout is None:
2039 timeout = 0
2040
2041 cdef:
2042 char *_oid = self.oid
2043 uint64_t _cookie;
2044 uint32_t _timeout = timeout;
2045 void *_args = <PyObject*>self
2046
2047 with nogil:
2048 ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
2049 <rados_watchcb2_t>&__watch_callback,
2050 <rados_watcherrcb_t>&__watch_error_callback,
2051 _timeout, _args)
2052 if ret < 0:
2053 raise make_ex(ret, "watch error")
2054
2055 self.id = int(_cookie);
2056
2057 def __enter__(self):
2058 return self
2059
2060 def __exit__(self, type_, value, traceback):
2061 self.close()
2062 return False
2063
2064 def __dealloc__(self):
20effc67
TL
2065 if self.id == 0:
2066 return
cd265ab1
TL
2067 self.ioctx.rados.require_state("connected")
2068 self.close()
2069
2070 def _callback(self, notify_id, notifier_id, watch_id, data):
2071 replay = self.callback(notify_id, notifier_id, watch_id, data)
2072
2073 cdef:
2074 rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
2075 char *_obj = self.oid
2076 int64_t _notify_id = notify_id
2077 uint64_t _cookie = watch_id
2078 char *_replay = NULL
2079 int _replay_len = 0
2080
2081 if replay is not None:
2082 replay = cstr(replay, 'replay')
2083 _replay = replay
2084 _replaylen = len(replay)
2085
2086 with nogil:
2087 rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
2088 _replaylen)
2089
2090 def _error_callback(self, watch_id, error):
2091 if self.error_callback is None:
2092 return
2093 self.error_callback(watch_id, error)
2094
f67539c2 2095 def get_id(self) -> int:
cd265ab1
TL
2096 return self.id
2097
2098 def check(self):
2099 """
2100 Check on watch validity.
2101
2102 :raises: :class:`Error`
2103 :returns: timedelta since last confirmed valid
2104 """
2105 self.ioctx.require_ioctx_open()
2106
2107 cdef:
2108 uint64_t _cookie = self.id
2109
2110 with nogil:
2111 ret = rados_watch_check(self.ioctx.io, _cookie)
2112 if ret < 0:
2113 raise make_ex(ret, "check error")
2114
2115 return timedelta(milliseconds=ret)
2116
2117 def close(self):
2118 """
2119 Unregister an interest in an object.
2120
2121 :raises: :class:`Error`
2122 """
2123 if self.id == 0:
2124 return
2125
2126 self.ioctx.require_ioctx_open()
2127
2128 cdef:
2129 uint64_t _cookie = self.id
2130
2131 with nogil:
2132 ret = rados_unwatch2(self.ioctx.io, _cookie)
2133 if ret < 0 and ret != -errno.ENOENT:
2134 raise make_ex(ret, "unwatch error")
2135 self.id = 0
2136
2137 with nogil:
2138 cluster = rados_ioctx_get_cluster(self.ioctx.io)
2139 ret = rados_watch_flush(cluster);
2140 if ret < 0:
2141 raise make_ex(ret, "watch_flush error")
2142
2143 self.ioctx.close()
2144
2145
7c673cae
FG
2146cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2147 """
2148 Callback to oncomplete() for asynchronous operations
2149 """
2150 cdef object cb = <object>args
2151 cb._complete()
2152 return 0
2153
7c673cae
FG
2154cdef class Ioctx(object):
2155 """rados.Ioctx object"""
2156 # NOTE(sileht): attributes declared in .pyd
2157
cd265ab1
TL
2158 def __init__(self, rados, name):
2159 self.rados = rados
7c673cae
FG
2160 self.name = name
2161 self.state = "open"
2162
2163 self.locator_key = ""
2164 self.nspace = ""
2165 self.lock = threading.Lock()
2166 self.safe_completions = []
2167 self.complete_completions = []
2168
2169 def __enter__(self):
2170 return self
2171
2172 def __exit__(self, type_, value, traceback):
2173 self.close()
2174 return False
2175
2176 def __dealloc__(self):
2177 self.close()
2178
2179 def __track_completion(self, completion_obj):
2180 if completion_obj.oncomplete:
2181 with self.lock:
2182 self.complete_completions.append(completion_obj)
2183 if completion_obj.onsafe:
2184 with self.lock:
2185 self.safe_completions.append(completion_obj)
2186
f67539c2
TL
2187 def __get_completion(self,
2188 oncomplete: Callable[[Completion], None],
2189 onsafe: Callable[[Completion], None]):
7c673cae
FG
2190 """
2191 Constructs a completion to use with asynchronous operations
2192
2193 :param oncomplete: what to do when the write is safe and complete in memory
2194 on all replicas
7c673cae
FG
2195 :param onsafe: what to do when the write is safe and complete on storage
2196 on all replicas
7c673cae
FG
2197
2198 :raises: :class:`Error`
2199 :returns: completion object
2200 """
2201
2202 completion_obj = Completion(self, oncomplete, onsafe)
2203
2204 cdef:
2205 rados_callback_t complete_cb = NULL
7c673cae
FG
2206 rados_completion_t completion
2207 PyObject* p_completion_obj= <PyObject*>completion_obj
2208
2209 if oncomplete:
2210 complete_cb = <rados_callback_t>&__aio_complete_cb
7c673cae
FG
2211
2212 with nogil:
9f95a23c 2213 ret = rados_aio_create_completion2(p_completion_obj, complete_cb,
7c673cae
FG
2214 &completion)
2215 if ret < 0:
2216 raise make_ex(ret, "error getting a completion")
2217
2218 completion_obj.rados_comp = completion
2219 return completion_obj
2220
cd265ab1
TL
2221 def dup(self):
2222 """
2223 Duplicate IoCtx
2224 """
2225
2226 ioctx = self.rados.open_ioctx2(self.get_pool_id())
2227 ioctx.set_namespace(self.get_namespace())
2228 return ioctx
2229
f67539c2
TL
2230 def aio_stat(self,
2231 object_name: str,
2232 oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
7c673cae
FG
2233 """
2234 Asynchronously get object stats (size/mtime)
2235
2236 oncomplete will be called with the returned size and mtime
2237 as well as the completion:
2238
2239 oncomplete(completion, size, mtime)
2240
2241 :param object_name: the name of the object to get stats from
7c673cae 2242 :param oncomplete: what to do when the stat is complete
7c673cae
FG
2243
2244 :raises: :class:`Error`
2245 :returns: completion object
2246 """
2247
f67539c2 2248 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2249
2250 cdef:
2251 Completion completion
f67539c2 2252 char *_object_name = object_name_raw
7c673cae
FG
2253 uint64_t psize
2254 time_t pmtime
2255
2256 def oncomplete_(completion_v):
2257 cdef Completion _completion_v = completion_v
2258 return_value = _completion_v.get_return_value()
2259 if return_value >= 0:
2260 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2261 else:
2262 return oncomplete(_completion_v, None, None)
2263
2264 completion = self.__get_completion(oncomplete_, None)
2265 self.__track_completion(completion)
2266 with nogil:
2267 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2268 &psize, &pmtime)
2269
2270 if ret < 0:
2271 completion._cleanup()
2272 raise make_ex(ret, "error stating %s" % object_name)
2273 return completion
2274
f67539c2
TL
2275 def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
2276 oncomplete: Optional[Callable[[Completion], None]] = None,
2277 onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
7c673cae
FG
2278 """
2279 Write data to an object asynchronously
2280
2281 Queues the write and returns.
2282
2283 :param object_name: name of the object
7c673cae 2284 :param to_write: data to write
7c673cae 2285 :param offset: byte offset in the object to begin writing at
7c673cae
FG
2286 :param oncomplete: what to do when the write is safe and complete in memory
2287 on all replicas
7c673cae
FG
2288 :param onsafe: what to do when the write is safe and complete on storage
2289 on all replicas
7c673cae
FG
2290
2291 :raises: :class:`Error`
2292 :returns: completion object
2293 """
2294
f67539c2 2295 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2296
2297 cdef:
2298 Completion completion
f67539c2 2299 char* _object_name = object_name_raw
7c673cae
FG
2300 char* _to_write = to_write
2301 size_t size = len(to_write)
2302 uint64_t _offset = offset
2303
2304 completion = self.__get_completion(oncomplete, onsafe)
2305 self.__track_completion(completion)
2306 with nogil:
2307 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2308 _to_write, size, _offset)
2309 if ret < 0:
2310 completion._cleanup()
2311 raise make_ex(ret, "error writing object %s" % object_name)
2312 return completion
2313
f67539c2
TL
2314 def aio_write_full(self, object_name: str, to_write: bytes,
2315 oncomplete: Optional[Callable] = None,
2316 onsafe: Optional[Callable] = None) -> Completion:
7c673cae 2317 """
11fdf7f2 2318 Asynchronously write an entire object
7c673cae
FG
2319
2320 The object is filled with the provided data. If the object exists,
2321 it is atomically truncated and then written.
2322 Queues the write and returns.
2323
2324 :param object_name: name of the object
7c673cae 2325 :param to_write: data to write
7c673cae
FG
2326 :param oncomplete: what to do when the write is safe and complete in memory
2327 on all replicas
7c673cae
FG
2328 :param onsafe: what to do when the write is safe and complete on storage
2329 on all replicas
7c673cae
FG
2330
2331 :raises: :class:`Error`
2332 :returns: completion object
2333 """
2334
f67539c2 2335 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2336
2337 cdef:
2338 Completion completion
f67539c2 2339 char* _object_name = object_name_raw
7c673cae
FG
2340 char* _to_write = to_write
2341 size_t size = len(to_write)
2342
2343 completion = self.__get_completion(oncomplete, onsafe)
2344 self.__track_completion(completion)
2345 with nogil:
2346 ret = rados_aio_write_full(self.io, _object_name,
2347 completion.rados_comp,
2348 _to_write, size)
2349 if ret < 0:
2350 completion._cleanup()
2351 raise make_ex(ret, "error writing object %s" % object_name)
2352 return completion
2353
f67539c2
TL
2354 def aio_writesame(self, object_name: str, to_write: bytes,
2355 write_len: int, offset: int = 0,
2356 oncomplete: Optional[Callable] = None) -> Completion:
9f95a23c
TL
2357 """
2358 Asynchronously write the same buffer multiple times
2359
2360 :param object_name: name of the object
9f95a23c 2361 :param to_write: data to write
9f95a23c 2362 :param write_len: total number of bytes to write
9f95a23c 2363 :param offset: byte offset in the object to begin writing at
9f95a23c
TL
2364 :param oncomplete: what to do when the writesame is safe and
2365 complete in memory on all replicas
9f95a23c
TL
2366 :raises: :class:`Error`
2367 :returns: completion object
2368 """
2369
f67539c2 2370 object_name_raw = cstr(object_name, 'object_name')
9f95a23c
TL
2371
2372 cdef:
2373 Completion completion
f67539c2 2374 char* _object_name = object_name_raw
9f95a23c
TL
2375 char* _to_write = to_write
2376 size_t _data_len = len(to_write)
2377 size_t _write_len = write_len
2378 uint64_t _offset = offset
2379
2380 completion = self.__get_completion(oncomplete, None)
2381 self.__track_completion(completion)
2382 with nogil:
2383 ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
2384 _to_write, _data_len, _write_len, _offset)
2385
2386 if ret < 0:
2387 completion._cleanup()
2388 raise make_ex(ret, "error writing object %s" % object_name)
2389 return completion
2390
f67539c2
TL
2391 def aio_append(self, object_name: str, to_append: bytes,
2392 oncomplete: Optional[Callable] = None,
2393 onsafe: Optional[Callable] = None) -> Completion:
7c673cae 2394 """
11fdf7f2 2395 Asynchronously append data to an object
7c673cae
FG
2396
2397 Queues the write and returns.
2398
2399 :param object_name: name of the object
7c673cae 2400 :param to_append: data to append
7c673cae 2401 :param offset: byte offset in the object to begin writing at
7c673cae
FG
2402 :param oncomplete: what to do when the write is safe and complete in memory
2403 on all replicas
7c673cae
FG
2404 :param onsafe: what to do when the write is safe and complete on storage
2405 on all replicas
7c673cae
FG
2406
2407 :raises: :class:`Error`
2408 :returns: completion object
2409 """
f67539c2 2410 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2411
2412 cdef:
2413 Completion completion
f67539c2 2414 char* _object_name = object_name_raw
7c673cae
FG
2415 char* _to_append = to_append
2416 size_t size = len(to_append)
2417
2418 completion = self.__get_completion(oncomplete, onsafe)
2419 self.__track_completion(completion)
2420 with nogil:
2421 ret = rados_aio_append(self.io, _object_name,
2422 completion.rados_comp,
2423 _to_append, size)
2424 if ret < 0:
2425 completion._cleanup()
2426 raise make_ex(ret, "error appending object %s" % object_name)
2427 return completion
2428
2429 def aio_flush(self):
2430 """
2431 Block until all pending writes in an io context are safe
2432
2433 :raises: :class:`Error`
2434 """
2435 with nogil:
2436 ret = rados_aio_flush(self.io)
2437 if ret < 0:
2438 raise make_ex(ret, "error flushing")
2439
f67539c2
TL
2440 def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
2441 oncomplete: Optional[Callable] = None) -> Completion:
2442 """
2443 Asynchronously compare an on-disk object range with a buffer
2444 :param object_name: the name of the object
2445 :param cmp_buf: buffer containing bytes to be compared with object contents
2446 :param offset: object byte offset at which to start the comparison
2447 :param oncomplete: what to do when the write is safe and complete in memory
2448 on all replicas
2449
2450 :raises: :class:`TypeError`
2451 returns: 0 - on success, negative error code on failure,
2452 (-MAX_ERRNO - mismatch_off) on mismatch
2453 """
2454 object_name_raw = cstr(object_name, 'object_name')
2455
2456 cdef:
2457 Completion completion
2458 char* _object_name = object_name_raw
2459 char* _cmp_buf = cmp_buf
2460 size_t _cmp_buf_len = len(cmp_buf)
2461 uint64_t _offset = offset
2462
2463 completion = self.__get_completion(oncomplete, None)
2464 self.__track_completion(completion)
2465
2466 with nogil:
2467 ret = rados_aio_cmpext(self.io, _object_name, completion.rados_comp,
2468 _cmp_buf, _cmp_buf_len, _offset)
2469
2470 if ret < 0:
2471 completion._cleanup()
2472 raise make_ex(ret, "failed to compare %s" % object_name)
2473 return completion
2474
2475 def aio_rmxattr(self, object_name: str, xattr_name: str,
2476 oncomplete: Optional[Callable] = None) -> Completion:
2477 """
2478 Asynchronously delete an extended attribute from an object
2479
2480 :param object_name: the name of the object to remove xattr from
2481 :param xattr_name: which extended attribute to remove
2482 :param oncomplete: what to do when the rmxattr completes
2483
2484 :raises: :class:`Error`
2485 :returns: completion object
2486 """
2487 object_name_raw = cstr(object_name, 'object_name')
2488 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2489
2490 cdef:
2491 Completion completion
2492 char* _object_name = object_name_raw
2493 char* _xattr_name = xattr_name_raw
2494
2495 completion = self.__get_completion(oncomplete, None)
2496 self.__track_completion(completion)
2497 with nogil:
2498 ret = rados_aio_rmxattr(self.io, _object_name,
2499 completion.rados_comp, _xattr_name)
2500
2501 if ret < 0:
2502 completion._cleanup()
2503 raise make_ex(ret, "Failed to remove xattr %r" % xattr_name)
2504 return completion
2505
2506 def aio_read(self, object_name: str, length: int, offset: int,
2507 oncomplete: Optional[Callable] = None) -> Completion:
7c673cae 2508 """
11fdf7f2 2509 Asynchronously read data from an object
7c673cae
FG
2510
2511 oncomplete will be called with the returned read value as
2512 well as the completion:
2513
2514 oncomplete(completion, data_read)
2515
2516 :param object_name: name of the object to read from
7c673cae 2517 :param length: the number of bytes to read
7c673cae 2518 :param offset: byte offset in the object to begin reading from
7c673cae 2519 :param oncomplete: what to do when the read is complete
7c673cae
FG
2520
2521 :raises: :class:`Error`
2522 :returns: completion object
2523 """
2524
f67539c2 2525 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2526
2527 cdef:
2528 Completion completion
f67539c2 2529 char* _object_name = object_name_raw
7c673cae
FG
2530 uint64_t _offset = offset
2531
2532 char *ref_buf
2533 size_t _length = length
2534
2535 def oncomplete_(completion_v):
2536 cdef Completion _completion_v = completion_v
2537 return_value = _completion_v.get_return_value()
2538 if return_value > 0 and return_value != length:
2539 _PyBytes_Resize(&_completion_v.buf, return_value)
2540 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2541
2542 completion = self.__get_completion(oncomplete_, None)
2543 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2544 ret_buf = PyBytes_AsString(completion.buf)
2545 self.__track_completion(completion)
2546 with nogil:
2547 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2548 ret_buf, _length, _offset)
2549 if ret < 0:
2550 completion._cleanup()
2551 raise make_ex(ret, "error reading %s" % object_name)
2552 return completion
2553
f67539c2
TL
2554 def aio_execute(self, object_name: str, cls: str, method: str,
2555 data: bytes, length: int = 8192,
2556 oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
2557 onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
7c673cae
FG
2558 """
2559 Asynchronously execute an OSD class method on an object.
2560
2561 oncomplete and onsafe will be called with the data returned from
2562 the plugin as well as the completion:
2563
2564 oncomplete(completion, data)
2565 onsafe(completion, data)
2566
2567 :param object_name: name of the object
7c673cae 2568 :param cls: name of the object class
7c673cae 2569 :param method: name of the method
7c673cae 2570 :param data: input data
7c673cae 2571 :param length: size of output buffer in bytes (default=8192)
7c673cae 2572 :param oncomplete: what to do when the execution is complete
7c673cae 2573 :param onsafe: what to do when the execution is safe and complete
7c673cae
FG
2574
2575 :raises: :class:`Error`
2576 :returns: completion object
2577 """
2578
f67539c2
TL
2579 object_name_raw = cstr(object_name, 'object_name')
2580 cls_raw = cstr(cls, 'cls')
2581 method_raw = cstr(method, 'method')
7c673cae
FG
2582 cdef:
2583 Completion completion
f67539c2
TL
2584 char *_object_name = object_name_raw
2585 char *_cls = cls_raw
2586 char *_method = method_raw
7c673cae
FG
2587 char *_data = data
2588 size_t _data_len = len(data)
2589
2590 char *ref_buf
2591 size_t _length = length
2592
2593 def oncomplete_(completion_v):
2594 cdef Completion _completion_v = completion_v
2595 return_value = _completion_v.get_return_value()
2596 if return_value > 0 and return_value != length:
2597 _PyBytes_Resize(&_completion_v.buf, return_value)
2598 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2599
2600 def onsafe_(completion_v):
2601 cdef Completion _completion_v = completion_v
2602 return_value = _completion_v.get_return_value()
2603 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2604
2605 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2606 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2607 ret_buf = PyBytes_AsString(completion.buf)
2608 self.__track_completion(completion)
2609 with nogil:
2610 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2611 _cls, _method, _data, _data_len, ret_buf, _length)
2612 if ret < 0:
2613 completion._cleanup()
2614 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2615 return completion
2616
f67539c2
TL
2617 def aio_setxattr(self, object_name: str, xattr_name: str, xattr_value: bytes,
2618 oncomplete: Optional[Callable] = None) -> Completion:
2619 """
2620 Asynchronously set an extended attribute on an object
2621
2622 :param object_name: the name of the object to set xattr to
2623 :param xattr_name: which extended attribute to set
2624 :param xattr_value: the value of the extended attribute
2625 :param oncomplete: what to do when the setxttr completes
2626
2627 :raises: :class:`Error`
2628 :returns: completion object
2629 """
2630 object_name_raw = cstr(object_name, 'object_name')
2631 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2632
2633 cdef:
2634 Completion completion
2635 char* _object_name = object_name_raw
2636 char* _xattr_name = xattr_name_raw
2637 char* _xattr_value = xattr_value
2638 size_t xattr_value_len = len(xattr_value)
2639
2640 completion = self.__get_completion(oncomplete, None)
2641 self.__track_completion(completion)
2642 with nogil:
2643 ret = rados_aio_setxattr(self.io, _object_name,
2644 completion.rados_comp,
2645 _xattr_name, _xattr_value, xattr_value_len)
2646
2647 if ret < 0:
2648 completion._cleanup()
2649 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2650 return completion
2651
2652 def aio_remove(self, object_name: str,
2653 oncomplete: Optional[Callable] = None,
2654 onsafe: Optional[Callable] = None) -> Completion:
7c673cae 2655 """
11fdf7f2 2656 Asynchronously remove an object
7c673cae
FG
2657
2658 :param object_name: name of the object to remove
7c673cae
FG
2659 :param oncomplete: what to do when the remove is safe and complete in memory
2660 on all replicas
7c673cae
FG
2661 :param onsafe: what to do when the remove is safe and complete on storage
2662 on all replicas
7c673cae
FG
2663
2664 :raises: :class:`Error`
2665 :returns: completion object
2666 """
f67539c2 2667 object_name_raw = cstr(object_name, 'object_name')
7c673cae
FG
2668
2669 cdef:
2670 Completion completion
f67539c2 2671 char* _object_name = object_name_raw
7c673cae
FG
2672
2673 completion = self.__get_completion(oncomplete, onsafe)
2674 self.__track_completion(completion)
2675 with nogil:
2676 ret = rados_aio_remove(self.io, _object_name,
2677 completion.rados_comp)
2678 if ret < 0:
2679 completion._cleanup()
2680 raise make_ex(ret, "error removing %s" % object_name)
2681 return completion
2682
2683 def require_ioctx_open(self):
2684 """
2685 Checks if the rados.Ioctx object state is 'open'
2686
2687 :raises: IoctxStateError
2688 """
2689 if self.state != "open":
2690 raise IoctxStateError("The pool is %s" % self.state)
2691
f67539c2 2692 def set_locator_key(self, loc_key: str):
7c673cae
FG
2693 """
2694 Set the key for mapping objects to pgs within an io context.
2695
2696 The key is used instead of the object name to determine which
2697 placement groups an object is put in. This affects all subsequent
2698 operations of the io context - until a different locator key is
2699 set, all objects in this io context will be placed in the same pg.
2700
2701 :param loc_key: the key to use as the object locator, or NULL to discard
2702 any previously set key
7c673cae
FG
2703
2704 :raises: :class:`TypeError`
2705 """
2706 self.require_ioctx_open()
2707 cloc_key = cstr(loc_key, 'loc_key')
2708 cdef char *_loc_key = cloc_key
2709 with nogil:
2710 rados_ioctx_locator_set_key(self.io, _loc_key)
2711 self.locator_key = loc_key
2712
f67539c2 2713 def get_locator_key(self) -> str:
7c673cae
FG
2714 """
2715 Get the locator_key of context
2716
2717 :returns: locator_key
2718 """
2719 return self.locator_key
2720
f67539c2 2721 def set_read(self, snap_id: int):
7c673cae
FG
2722 """
2723 Set the snapshot for reading objects.
2724
2725 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2726
2727 :param snap_id: the snapshot Id
7c673cae
FG
2728
2729 :raises: :class:`TypeError`
2730 """
2731 self.require_ioctx_open()
2732 cdef rados_snap_t _snap_id = snap_id
2733 with nogil:
2734 rados_ioctx_snap_set_read(self.io, _snap_id)
2735
f67539c2 2736 def set_namespace(self, nspace: str):
7c673cae
FG
2737 """
2738 Set the namespace for objects within an io context.
2739
2740 The namespace in addition to the object name fully identifies
2741 an object. This affects all subsequent operations of the io context
2742 - until a different namespace is set, all objects in this io context
2743 will be placed in the same namespace.
2744
2745 :param nspace: the namespace to use, or None/"" for the default namespace
7c673cae
FG
2746
2747 :raises: :class:`TypeError`
2748 """
2749 self.require_ioctx_open()
2750 if nspace is None:
2751 nspace = ""
2752 cnspace = cstr(nspace, 'nspace')
2753 cdef char *_nspace = cnspace
2754 with nogil:
2755 rados_ioctx_set_namespace(self.io, _nspace)
2756 self.nspace = nspace
2757
f67539c2 2758 def get_namespace(self) -> str:
7c673cae
FG
2759 """
2760 Get the namespace of context
2761
2762 :returns: namespace
2763 """
2764 return self.nspace
2765
2766 def close(self):
2767 """
2768 Close a rados.Ioctx object.
2769
2770 This just tells librados that you no longer need to use the io context.
2771 It may not be freed immediately if there are pending asynchronous
2772 requests on it, but you should not use an io context again after
2773 calling this function on it.
2774 """
2775 if self.state == "open":
2776 self.require_ioctx_open()
2777 with nogil:
2778 rados_ioctx_destroy(self.io)
2779 self.state = "closed"
2780
2781
f67539c2 2782 def write(self, key: str, data: bytes, offset: int = 0):
7c673cae
FG
2783 """
2784 Write data to an object synchronously
2785
2786 :param key: name of the object
7c673cae 2787 :param data: data to write
7c673cae 2788 :param offset: byte offset in the object to begin writing at
7c673cae
FG
2789
2790 :raises: :class:`TypeError`
2791 :raises: :class:`LogicError`
2792 :returns: int - 0 on success
2793 """
2794 self.require_ioctx_open()
2795
f67539c2 2796 key_raw = cstr(key, 'key')
7c673cae 2797 cdef:
f67539c2 2798 char *_key = key_raw
7c673cae
FG
2799 char *_data = data
2800 size_t length = len(data)
2801 uint64_t _offset = offset
2802
2803 with nogil:
2804 ret = rados_write(self.io, _key, _data, length, _offset)
2805 if ret == 0:
2806 return ret
2807 elif ret < 0:
2808 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2809 % (self.name, key))
2810 else:
2811 raise LogicError("Ioctx.write(%s): rados_write \
2812returned %d, but should return zero on success." % (self.name, ret))
2813
f67539c2 2814 def write_full(self, key: str, data: bytes):
7c673cae
FG
2815 """
2816 Write an entire object synchronously.
2817
2818 The object is filled with the provided data. If the object exists,
2819 it is atomically truncated and then written.
2820
2821 :param key: name of the object
7c673cae 2822 :param data: data to write
7c673cae
FG
2823
2824 :raises: :class:`TypeError`
2825 :raises: :class:`Error`
2826 :returns: int - 0 on success
2827 """
2828 self.require_ioctx_open()
f67539c2 2829 key_raw = cstr(key, 'key')
7c673cae 2830 cdef:
f67539c2 2831 char *_key = key_raw
7c673cae
FG
2832 char *_data = data
2833 size_t length = len(data)
2834
2835 with nogil:
2836 ret = rados_write_full(self.io, _key, _data, length)
2837 if ret == 0:
2838 return ret
2839 elif ret < 0:
2840 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2841 % (self.name, key))
2842 else:
2843 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2844returned %d, but should return zero on success." % (self.name, ret))
2845
f67539c2 2846 def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
9f95a23c
TL
2847 """
2848 Write the same buffer multiple times
2849 :param key: name of the object
9f95a23c 2850 :param data: data to write
9f95a23c 2851 :param write_len: total number of bytes to write
9f95a23c 2852 :param offset: byte offset in the object to begin writing at
9f95a23c
TL
2853
2854 :raises: :class:`TypeError`
2855 :raises: :class:`LogicError`
2856 """
2857 self.require_ioctx_open()
2858
f67539c2 2859 key_raw = cstr(key, 'key')
9f95a23c 2860 cdef:
f67539c2 2861 char *_key = key_raw
9f95a23c
TL
2862 char *_data = data
2863 size_t _data_len = len(data)
2864 size_t _write_len = write_len
2865 uint64_t _offset = offset
2866
2867 with nogil:
2868 ret = rados_writesame(self.io, _key, _data, _data_len, _write_len, _offset)
2869 if ret < 0:
2870 raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
2871 % (self.name, key))
2872 assert(ret == 0)
2873
f67539c2 2874 def append(self, key: str, data: bytes):
7c673cae
FG
2875 """
2876 Append data to an object synchronously
2877
2878 :param key: name of the object
7c673cae 2879 :param data: data to write
7c673cae
FG
2880
2881 :raises: :class:`TypeError`
2882 :raises: :class:`LogicError`
2883 :returns: int - 0 on success
2884 """
2885 self.require_ioctx_open()
f67539c2 2886 key_raw = cstr(key, 'key')
7c673cae 2887 cdef:
f67539c2 2888 char *_key = key_raw
7c673cae
FG
2889 char *_data = data
2890 size_t length = len(data)
2891
2892 with nogil:
2893 ret = rados_append(self.io, _key, _data, length)
2894 if ret == 0:
2895 return ret
2896 elif ret < 0:
2897 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2898 % (self.name, key))
2899 else:
2900 raise LogicError("Ioctx.append(%s): rados_append \
2901returned %d, but should return zero on success." % (self.name, ret))
2902
f67539c2 2903 def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
7c673cae
FG
2904 """
2905 Read data from an object synchronously
2906
2907 :param key: name of the object
7c673cae 2908 :param length: the number of bytes to read (default=8192)
7c673cae 2909 :param offset: byte offset in the object to begin reading at
7c673cae
FG
2910
2911 :raises: :class:`TypeError`
2912 :raises: :class:`Error`
20effc67 2913 :returns: data read from object
7c673cae
FG
2914 """
2915 self.require_ioctx_open()
f67539c2 2916 key_raw = cstr(key, 'key')
7c673cae 2917 cdef:
f67539c2 2918 char *_key = key_raw
7c673cae
FG
2919 char *ret_buf
2920 uint64_t _offset = offset
2921 size_t _length = length
2922 PyObject* ret_s = NULL
2923
2924 ret_s = PyBytes_FromStringAndSize(NULL, length)
2925 try:
2926 ret_buf = PyBytes_AsString(ret_s)
2927 with nogil:
2928 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2929 if ret < 0:
2930 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2931
2932 if ret != length:
2933 _PyBytes_Resize(&ret_s, ret)
2934
2935 return <object>ret_s
2936 finally:
2937 # We DECREF unconditionally: the cast to object above will have
2938 # INCREFed if necessary. This also takes care of exceptions,
2939 # including if _PyString_Resize fails (that will free the string
2940 # itself and set ret_s to NULL, hence XDECREF).
2941 ref.Py_XDECREF(ret_s)
2942
f67539c2 2943 def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
7c673cae
FG
2944 """
2945 Execute an OSD class method on an object.
2946
2947 :param key: name of the object
7c673cae 2948 :param cls: name of the object class
7c673cae 2949 :param method: name of the method
7c673cae 2950 :param data: input data
7c673cae 2951 :param length: size of output buffer in bytes (default=8192)
7c673cae
FG
2952
2953 :raises: :class:`TypeError`
2954 :raises: :class:`Error`
2955 :returns: (ret, method output)
2956 """
2957 self.require_ioctx_open()
2958
f67539c2
TL
2959 key_raw = cstr(key, 'key')
2960 cls_raw = cstr(cls, 'cls')
2961 method_raw = cstr(method, 'method')
7c673cae 2962 cdef:
f67539c2
TL
2963 char *_key = key_raw
2964 char *_cls = cls_raw
2965 char *_method = method_raw
7c673cae
FG
2966 char *_data = data
2967 size_t _data_len = len(data)
2968
2969 char *ref_buf
2970 size_t _length = length
2971 PyObject* ret_s = NULL
2972
2973 ret_s = PyBytes_FromStringAndSize(NULL, length)
2974 try:
2975 ret_buf = PyBytes_AsString(ret_s)
2976 with nogil:
2977 ret = rados_exec(self.io, _key, _cls, _method, _data,
2978 _data_len, ret_buf, _length)
2979 if ret < 0:
2980 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2981
2982 if ret != length:
2983 _PyBytes_Resize(&ret_s, ret)
2984
2985 return ret, <object>ret_s
2986 finally:
2987 # We DECREF unconditionally: the cast to object above will have
2988 # INCREFed if necessary. This also takes care of exceptions,
2989 # including if _PyString_Resize fails (that will free the string
2990 # itself and set ret_s to NULL, hence XDECREF).
2991 ref.Py_XDECREF(ret_s)
2992
f67539c2 2993 def get_stats(self) -> Dict[str, int]:
7c673cae
FG
2994 """
2995 Get pool usage statistics
2996
20effc67 2997 :returns: dict contains the following keys:
7c673cae
FG
2998
2999 - ``num_bytes`` (int) - size of pool in bytes
3000
3001 - ``num_kb`` (int) - size of pool in kbytes
3002
3003 - ``num_objects`` (int) - number of objects in the pool
3004
3005 - ``num_object_clones`` (int) - number of object clones
3006
3007 - ``num_object_copies`` (int) - number of object copies
3008
1e59de90 3009 - ``num_objects_missing_on_primary`` (int) - number of objects
7c673cae
FG
3010 missing on primary
3011
3012 - ``num_objects_unfound`` (int) - number of unfound objects
3013
3014 - ``num_objects_degraded`` (int) - number of degraded objects
3015
3016 - ``num_rd`` (int) - bytes read
3017
3018 - ``num_rd_kb`` (int) - kbytes read
3019
3020 - ``num_wr`` (int) - bytes written
3021
3022 - ``num_wr_kb`` (int) - kbytes written
3023 """
3024 self.require_ioctx_open()
3025 cdef rados_pool_stat_t stats
3026 with nogil:
3027 ret = rados_ioctx_pool_stat(self.io, &stats)
3028 if ret < 0:
3029 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
3030 return {'num_bytes': stats.num_bytes,
3031 'num_kb': stats.num_kb,
3032 'num_objects': stats.num_objects,
3033 'num_object_clones': stats.num_object_clones,
3034 'num_object_copies': stats.num_object_copies,
3035 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
3036 "num_objects_unfound": stats.num_objects_unfound,
3037 "num_objects_degraded": stats.num_objects_degraded,
3038 "num_rd": stats.num_rd,
3039 "num_rd_kb": stats.num_rd_kb,
3040 "num_wr": stats.num_wr,
3041 "num_wr_kb": stats.num_wr_kb}
3042
f67539c2 3043 def remove_object(self, key: str) -> bool:
7c673cae
FG
3044 """
3045 Delete an object
3046
3047 This does not delete any snapshots of the object.
3048
3049 :param key: the name of the object to delete
7c673cae
FG
3050
3051 :raises: :class:`TypeError`
3052 :raises: :class:`Error`
20effc67 3053 :returns: True on success
7c673cae
FG
3054 """
3055 self.require_ioctx_open()
f67539c2 3056 key_raw = cstr(key, 'key')
7c673cae 3057 cdef:
f67539c2 3058 char *_key = key_raw
7c673cae
FG
3059
3060 with nogil:
3061 ret = rados_remove(self.io, _key)
3062 if ret < 0:
3063 raise make_ex(ret, "Failed to remove '%s'" % key)
3064 return True
3065
20effc67 3066 def trunc(self, key: str, size: int) -> int:
7c673cae
FG
3067 """
3068 Resize an object
3069
3070 If this enlarges the object, the new area is logically filled with
3071 zeroes. If this shrinks the object, the excess data is removed.
3072
3073 :param key: the name of the object to resize
7c673cae 3074 :param size: the new size of the object in bytes
7c673cae
FG
3075
3076 :raises: :class:`TypeError`
3077 :raises: :class:`Error`
20effc67 3078 :returns: 0 on success, otherwise raises error
7c673cae
FG
3079 """
3080
3081 self.require_ioctx_open()
f67539c2 3082 key_raw = cstr(key, 'key')
7c673cae 3083 cdef:
f67539c2 3084 char *_key = key_raw
7c673cae
FG
3085 uint64_t _size = size
3086
3087 with nogil:
3088 ret = rados_trunc(self.io, _key, _size)
3089 if ret < 0:
3090 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
3091 return ret
f67539c2
TL
3092
3093 def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
3094 '''
3095 Compare an on-disk object range with a buffer
3096 :param key: the name of the object
3097 :param cmp_buf: buffer containing bytes to be compared with object contents
3098 :param offset: object byte offset at which to start the comparison
3099
3100 :raises: :class:`TypeError`
3101 :raises: :class:`Error`
3102 :returns: 0 - on success, negative error code on failure,
3103 (-MAX_ERRNO - mismatch_off) on mismatch
3104 '''
3105 self.require_ioctx_open()
3106 key_raw = cstr(key, 'key')
3107 cdef:
3108 char *_key = key_raw
3109 char *_cmp_buf = cmp_buf
3110 size_t _cmp_buf_len = len(cmp_buf)
3111 uint64_t _offset = offset
3112 with nogil:
3113 ret = rados_cmpext(self.io, _key, _cmp_buf, _cmp_buf_len, _offset)
3114 assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
3115 return ret
7c673cae 3116
f67539c2 3117 def stat(self, key: str) -> Tuple[int, time.struct_time]:
7c673cae
FG
3118 """
3119 Get object stats (size/mtime)
3120
3121 :param key: the name of the object to get stats from
7c673cae
FG
3122
3123 :raises: :class:`TypeError`
3124 :raises: :class:`Error`
3125 :returns: (size,timestamp)
3126 """
3127 self.require_ioctx_open()
3128
f67539c2 3129 key_raw = cstr(key, 'key')
7c673cae 3130 cdef:
f67539c2 3131 char *_key = key_raw
7c673cae
FG
3132 uint64_t psize
3133 time_t pmtime
3134
3135 with nogil:
3136 ret = rados_stat(self.io, _key, &psize, &pmtime)
3137 if ret < 0:
3138 raise make_ex(ret, "Failed to stat %r" % key)
3139 return psize, time.localtime(pmtime)
3140
f67539c2 3141 def get_xattr(self, key: str, xattr_name: str) -> bytes:
7c673cae
FG
3142 """
3143 Get the value of an extended attribute on an object.
3144
3145 :param key: the name of the object to get xattr from
7c673cae 3146 :param xattr_name: which extended attribute to read
7c673cae
FG
3147
3148 :raises: :class:`TypeError`
3149 :raises: :class:`Error`
20effc67 3150 :returns: value of the xattr
7c673cae
FG
3151 """
3152 self.require_ioctx_open()
3153
f67539c2
TL
3154 key_raw = cstr(key, 'key')
3155 xattr_name_raw = cstr(xattr_name, 'xattr_name')
7c673cae 3156 cdef:
f67539c2
TL
3157 char *_key = key_raw
3158 char *_xattr_name = xattr_name_raw
7c673cae
FG
3159 size_t ret_length = 4096
3160 char *ret_buf = NULL
3161
3162 try:
3163 while ret_length < 4096 * 1024 * 1024:
3164 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
3165 with nogil:
3166 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
3167 if ret == -errno.ERANGE:
3168 ret_length *= 2
3169 elif ret < 0:
3170 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
3171 else:
3172 break
3173 return ret_buf[:ret]
3174 finally:
3175 free(ret_buf)
3176
f67539c2 3177 def get_xattrs(self, oid: str) -> XattrIterator:
7c673cae
FG
3178 """
3179 Start iterating over xattrs on an object.
3180
3181 :param oid: the name of the object to get xattrs from
7c673cae
FG
3182
3183 :raises: :class:`TypeError`
3184 :raises: :class:`Error`
3185 :returns: XattrIterator
3186 """
3187 self.require_ioctx_open()
3188 return XattrIterator(self, oid)
3189
f67539c2 3190 def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
7c673cae
FG
3191 """
3192 Set an extended attribute on an object.
3193
3194 :param key: the name of the object to set xattr to
7c673cae 3195 :param xattr_name: which extended attribute to set
7c673cae 3196 :param xattr_value: the value of the extended attribute
7c673cae
FG
3197
3198 :raises: :class:`TypeError`
3199 :raises: :class:`Error`
20effc67 3200 :returns: True on success, otherwise raise an error
7c673cae
FG
3201 """
3202 self.require_ioctx_open()
3203
f67539c2
TL
3204 key_raw = cstr(key, 'key')
3205 xattr_name_raw = cstr(xattr_name, 'xattr_name')
7c673cae 3206 cdef:
f67539c2
TL
3207 char *_key = key_raw
3208 char *_xattr_name = xattr_name_raw
7c673cae
FG
3209 char *_xattr_value = xattr_value
3210 size_t _xattr_value_len = len(xattr_value)
3211
3212 with nogil:
3213 ret = rados_setxattr(self.io, _key, _xattr_name,
3214 _xattr_value, _xattr_value_len)
3215 if ret < 0:
3216 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3217 return True
3218
f67539c2 3219 def rm_xattr(self, key: str, xattr_name: str) -> bool:
7c673cae
FG
3220 """
3221 Removes an extended attribute on from an object.
3222
3223 :param key: the name of the object to remove xattr from
7c673cae 3224 :param xattr_name: which extended attribute to remove
7c673cae
FG
3225
3226 :raises: :class:`TypeError`
3227 :raises: :class:`Error`
20effc67 3228 :returns: True on success, otherwise raise an error
7c673cae
FG
3229 """
3230 self.require_ioctx_open()
3231
f67539c2
TL
3232 key_raw = cstr(key, 'key')
3233 xattr_name_raw = cstr(xattr_name, 'xattr_name')
7c673cae 3234 cdef:
f67539c2
TL
3235 char *_key = key_raw
3236 char *_xattr_name = xattr_name_raw
7c673cae
FG
3237
3238 with nogil:
3239 ret = rados_rmxattr(self.io, _key, _xattr_name)
3240 if ret < 0:
3241 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3242 (key, xattr_name))
3243 return True
3244
f67539c2 3245 def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
11fdf7f2
TL
3246 """
3247 Send a rados notification to an object.
3248
3249 :param obj: the name of the object to notify
11fdf7f2 3250 :param msg: optional message to send in the notification
11fdf7f2 3251 :param timeout_ms: notify timeout (in ms)
11fdf7f2
TL
3252
3253 :raises: :class:`TypeError`
3254 :raises: :class:`Error`
20effc67 3255 :returns: True on success, otherwise raise an error
11fdf7f2
TL
3256 """
3257 self.require_ioctx_open()
3258
3259 msglen = len(msg)
f67539c2
TL
3260 obj_raw = cstr(obj, 'obj')
3261 msg_raw = cstr(msg, 'msg')
11fdf7f2 3262 cdef:
f67539c2
TL
3263 char *_obj = obj_raw
3264 char *_msg = msg_raw
11fdf7f2
TL
3265 int _msglen = msglen
3266 uint64_t _timeout_ms = timeout_ms
3267
3268 with nogil:
3269 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3270 NULL, NULL)
3271 if ret < 0:
3272 raise make_ex(ret, "Failed to notify %r" % (obj))
3273 return True
3274
f67539c2
TL
3275 def aio_notify(self, obj: str,
3276 oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
3277 msg: str = '', timeout_ms: int = 5000) -> Completion:
3278 """
3279 Asynchronously send a rados notification to an object
3280 """
3281 self.require_ioctx_open()
3282
3283 msglen = len(msg)
3284 obj_raw = cstr(obj, 'obj')
3285 msg_raw = cstr(msg, 'msg')
3286
3287 cdef:
3288 Completion completion
3289 char *_obj = obj_raw
3290 char *_msg = msg_raw
3291 int _msglen = msglen
3292 uint64_t _timeout_ms = timeout_ms
3293 char *reply
3294 size_t replylen = 0
3295
3296 def oncomplete_(completion_v):
3297 cdef:
3298 Completion _completion_v = completion_v
3299 notify_ack_t *acks = NULL
3300 notify_timeout_t *timeouts = NULL
3301 size_t nr_acks
3302 size_t nr_timeouts
3303 return_value = _completion_v.get_return_value()
3304 if return_value == 0:
3305 return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
3306 rados_buffer_free(reply)
3307 if return_value == 0:
3308 ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
3309 else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
3310 timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
3311 rados_free_notify_response(acks, nr_acks, timeouts)
3312 return oncomplete(_completion_v, 0, ack_list, timeout_list)
3313 else:
3314 return oncomplete(_completion_v, return_value, None, None)
3315
3316 completion = self.__get_completion(oncomplete_, None)
3317 self.__track_completion(completion)
3318 with nogil:
3319 ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
3320 _msg, _msglen, _timeout_ms, &reply, &replylen)
3321 if ret < 0:
3322 completion._cleanup()
3323 raise make_ex(ret, "aio_notify error: %s" % obj)
3324 return completion
3325
3326 def watch(self, obj: str,
3327 callback: Callable[[int, str, int, bytes], None],
3328 error_callback: Optional[Callable[[int], None]] = None,
3329 timeout: Optional[int] = None) -> Watch:
cd265ab1
TL
3330 """
3331 Register an interest in an object.
3332
3333 :param obj: the name of the object to notify
cd265ab1 3334 :param callback: what to do when a notify is received on this object
cd265ab1 3335 :param error_callback: what to do when the watch session encounters an error
cd265ab1 3336 :param timeout: how many seconds the connection will keep after disconnection
cd265ab1
TL
3337
3338 :raises: :class:`TypeError`
3339 :raises: :class:`Error`
3340 :returns: watch_id - internal id assigned to this watch
3341 """
3342 self.require_ioctx_open()
3343
3344 return Watch(self, obj, callback, error_callback, timeout)
3345
f67539c2 3346 def list_objects(self) -> ObjectIterator:
7c673cae
FG
3347 """
3348 Get ObjectIterator on rados.Ioctx object.
3349
3350 :returns: ObjectIterator
3351 """
3352 self.require_ioctx_open()
3353 return ObjectIterator(self)
3354
f67539c2 3355 def list_snaps(self) -> SnapIterator:
7c673cae
FG
3356 """
3357 Get SnapIterator on rados.Ioctx object.
3358
3359 :returns: SnapIterator
3360 """
3361 self.require_ioctx_open()
3362 return SnapIterator(self)
3363
f67539c2 3364 def get_pool_id(self) -> int:
9f95a23c
TL
3365 """
3366 Get pool id
3367
3368 :returns: int - pool id
3369 """
3370 with nogil:
3371 ret = rados_ioctx_get_id(self.io)
3372 return ret;
3373
f67539c2 3374 def get_pool_name(self) -> str:
9f95a23c
TL
3375 """
3376 Get pool name
3377
20effc67 3378 :returns: pool name
9f95a23c
TL
3379 """
3380 cdef:
3381 int name_len = 10
3382 char *name = NULL
3383
3384 try:
3385 while True:
3386 name = <char *>realloc_chk(name, name_len)
3387 with nogil:
3388 ret = rados_ioctx_get_pool_name(self.io, name, name_len)
3389 if ret > 0:
3390 break
3391 elif ret != -errno.ERANGE:
3392 raise make_ex(ret, "get pool name error")
3393 else:
3394 name_len = name_len * 2
3395
3396 return decode_cstr(name)
3397 finally:
3398 free(name)
3399
3400
f67539c2 3401 def create_snap(self, snap_name: str):
7c673cae
FG
3402 """
3403 Create a pool-wide snapshot
3404
3405 :param snap_name: the name of the snapshot
7c673cae
FG
3406
3407 :raises: :class:`TypeError`
3408 :raises: :class:`Error`
3409 """
3410 self.require_ioctx_open()
f67539c2
TL
3411 snap_name_raw = cstr(snap_name, 'snap_name')
3412 cdef char *_snap_name = snap_name_raw
7c673cae
FG
3413
3414 with nogil:
3415 ret = rados_ioctx_snap_create(self.io, _snap_name)
3416 if ret != 0:
3417 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3418
f67539c2 3419 def remove_snap(self, snap_name: str):
7c673cae
FG
3420 """
3421 Removes a pool-wide snapshot
3422
3423 :param snap_name: the name of the snapshot
7c673cae
FG
3424
3425 :raises: :class:`TypeError`
3426 :raises: :class:`Error`
3427 """
3428 self.require_ioctx_open()
f67539c2
TL
3429 snap_name_raw = cstr(snap_name, 'snap_name')
3430 cdef char *_snap_name = snap_name_raw
7c673cae
FG
3431
3432 with nogil:
3433 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3434 if ret != 0:
3435 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3436
f67539c2 3437 def lookup_snap(self, snap_name: str) -> Snap:
7c673cae
FG
3438 """
3439 Get the id of a pool snapshot
3440
3441 :param snap_name: the name of the snapshot to lookop
7c673cae
FG
3442
3443 :raises: :class:`TypeError`
3444 :raises: :class:`Error`
3445 :returns: Snap - on success
3446 """
3447 self.require_ioctx_open()
3448 csnap_name = cstr(snap_name, 'snap_name')
3449 cdef:
3450 char *_snap_name = csnap_name
3451 rados_snap_t snap_id
3452
3453 with nogil:
3454 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3455 if ret != 0:
3456 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3457 return Snap(self, snap_name, int(snap_id))
3458
f67539c2 3459 def snap_rollback(self, oid: str, snap_name: str):
7c673cae
FG
3460 """
3461 Rollback an object to a snapshot
3462
3463 :param oid: the name of the object
7c673cae 3464 :param snap_name: the name of the snapshot
7c673cae
FG
3465
3466 :raises: :class:`TypeError`
3467 :raises: :class:`Error`
3468 """
3469 self.require_ioctx_open()
f67539c2
TL
3470 oid_raw = cstr(oid, 'oid')
3471 snap_name_raw = cstr(snap_name, 'snap_name')
7c673cae 3472 cdef:
f67539c2
TL
3473 char *_oid = oid_raw
3474 char *_snap_name = snap_name_raw
7c673cae
FG
3475
3476 with nogil:
3477 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3478 if ret != 0:
3479 raise make_ex(ret, "Failed to rollback %s" % oid)
3480
28e407b8
AA
3481 def create_self_managed_snap(self):
3482 """
3483 Creates a self-managed snapshot
3484
3485 :returns: snap id on success
3486
3487 :raises: :class:`Error`
3488 """
3489 self.require_ioctx_open()
3490 cdef:
3491 rados_snap_t _snap_id
3492 with nogil:
3493 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3494 if ret != 0:
3495 raise make_ex(ret, "Failed to create self-managed snapshot")
3496 return int(_snap_id)
3497
f67539c2 3498 def remove_self_managed_snap(self, snap_id: int):
28e407b8
AA
3499 """
3500 Removes a self-managed snapshot
3501
3502 :param snap_id: the name of the snapshot
28e407b8
AA
3503
3504 :raises: :class:`TypeError`
3505 :raises: :class:`Error`
3506 """
3507 self.require_ioctx_open()
3508 cdef:
3509 rados_snap_t _snap_id = snap_id
3510 with nogil:
3511 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3512 if ret != 0:
3513 raise make_ex(ret, "Failed to remove self-managed snapshot")
3514
f67539c2 3515 def set_self_managed_snap_write(self, snaps: Sequence[Union[int, str]]):
28e407b8
AA
3516 """
3517 Updates the write context to the specified self-managed
3518 snapshot ids.
3519
3520 :param snaps: all associated self-managed snapshot ids
28e407b8
AA
3521
3522 :raises: :class:`TypeError`
3523 :raises: :class:`Error`
3524 """
3525 self.require_ioctx_open()
3526 sorted_snaps = []
3527 snap_seq = 0
3528 if snaps:
3529 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3530 snap_seq = sorted_snaps[0]
3531
3532 cdef:
3533 rados_snap_t _snap_seq = snap_seq
3534 rados_snap_t *_snaps = NULL
3535 int _num_snaps = len(sorted_snaps)
3536 try:
3537 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3538 for i in range(len(sorted_snaps)):
3539 _snaps[i] = sorted_snaps[i]
3540 with nogil:
3541 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3542 _snap_seq,
3543 _snaps,
3544 _num_snaps)
3545 if ret != 0:
3546 raise make_ex(ret, "Failed to update snapshot write context")
3547 finally:
3548 free(_snaps)
3549
f67539c2 3550 def rollback_self_managed_snap(self, oid: str, snap_id: int):
28e407b8
AA
3551 """
3552 Rolls an specific object back to a self-managed snapshot revision
3553
3554 :param oid: the name of the object
28e407b8 3555 :param snap_id: the name of the snapshot
28e407b8
AA
3556
3557 :raises: :class:`TypeError`
3558 :raises: :class:`Error`
3559 """
3560 self.require_ioctx_open()
f67539c2 3561 oid_raw = cstr(oid, 'oid')
28e407b8 3562 cdef:
f67539c2 3563 char *_oid = oid_raw
28e407b8
AA
3564 rados_snap_t _snap_id = snap_id
3565 with nogil:
3566 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3567 if ret != 0:
3568 raise make_ex(ret, "Failed to rollback %s" % oid)
3569
f67539c2 3570 def get_last_version(self) -> int:
7c673cae
FG
3571 """
3572 Return the version of the last object read or written to.
3573
3574 This exposes the internal version number of the last object read or
3575 written via this io context
3576
3577 :returns: version of the last object used
3578 """
3579 self.require_ioctx_open()
3580 with nogil:
3581 ret = rados_get_last_version(self.io)
3582 return int(ret)
3583
f67539c2 3584 def create_write_op(self) -> WriteOp:
7c673cae
FG
3585 """
3586 create write operation object.
3587 need call release_write_op after use
3588 """
3589 return WriteOp().create()
3590
f67539c2 3591 def create_read_op(self) -> ReadOp:
7c673cae
FG
3592 """
3593 create read operation object.
3594 need call release_read_op after use
3595 """
3596 return ReadOp().create()
3597
3598 def release_write_op(self, write_op):
3599 """
3600 release memory alloc by create_write_op
3601 """
3602 write_op.release()
3603
f67539c2 3604 def release_read_op(self, read_op: ReadOp):
7c673cae
FG
3605 """
3606 release memory alloc by create_read_op
3607 :para read_op: read_op object
7c673cae
FG
3608 """
3609 read_op.release()
3610
05a536ef 3611 def set_omap(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE], values: Sequence[bytes]):
7c673cae
FG
3612 """
3613 set keys values to write_op
3614 :para write_op: write_operation object
7c673cae 3615 :para keys: a tuple of keys
7c673cae 3616 :para values: a tuple of values
7c673cae
FG
3617 """
3618
3619 if len(keys) != len(values):
3620 raise Error("Rados(): keys and values must have the same number of items")
3621
3622 keys = cstr_list(keys, 'keys')
eafe8130
TL
3623 values = cstr_list(values, 'values')
3624 lens = [len(v) for v in values]
7c673cae
FG
3625 cdef:
3626 WriteOp _write_op = write_op
3627 size_t key_num = len(keys)
3628 char **_keys = to_bytes_array(keys)
3629 char **_values = to_bytes_array(values)
eafe8130 3630 size_t *_lens = to_csize_t_array(lens)
7c673cae
FG
3631
3632 try:
3633 with nogil:
3634 rados_write_op_omap_set(_write_op.write_op,
3635 <const char**>_keys,
3636 <const char**>_values,
3637 <const size_t*>_lens, key_num)
3638 finally:
3639 free(_keys)
3640 free(_values)
3641 free(_lens)
3642
f67539c2
TL
3643 def operate_write_op(self,
3644 write_op: WriteOp,
3645 oid: str,
3646 mtime: int = 0,
3647 flags: int = LIBRADOS_OPERATION_NOFLAG):
7c673cae 3648 """
11fdf7f2 3649 execute the real write operation
7c673cae 3650 :para write_op: write operation object
7c673cae 3651 :para oid: object name
7c673cae 3652 :para mtime: the time to set the mtime to, 0 for the current time
7c673cae 3653 :para flags: flags to apply to the entire operation
7c673cae
FG
3654 """
3655
f67539c2 3656 oid_raw = cstr(oid, 'oid')
7c673cae
FG
3657 cdef:
3658 WriteOp _write_op = write_op
f67539c2 3659 char *_oid = oid_raw
7c673cae
FG
3660 time_t _mtime = mtime
3661 int _flags = flags
3662
3663 with nogil:
3664 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3665 if ret != 0:
3666 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3667
f67539c2
TL
3668 def operate_aio_write_op(self, write_op: WriteOp, oid: str,
3669 oncomplete: Optional[Callable[[Completion], None]] = None,
3670 onsafe: Optional[Callable[[Completion], None]] = None,
3671 mtime: int = 0,
3672 flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
7c673cae 3673 """
11fdf7f2 3674 execute the real write operation asynchronously
7c673cae 3675 :para write_op: write operation object
7c673cae 3676 :para oid: object name
7c673cae
FG
3677 :param oncomplete: what to do when the remove is safe and complete in memory
3678 on all replicas
7c673cae
FG
3679 :param onsafe: what to do when the remove is safe and complete on storage
3680 on all replicas
7c673cae 3681 :para mtime: the time to set the mtime to, 0 for the current time
7c673cae 3682 :para flags: flags to apply to the entire operation
7c673cae
FG
3683
3684 :raises: :class:`Error`
3685 :returns: completion object
3686 """
3687
f67539c2 3688 oid_raw = cstr(oid, 'oid')
7c673cae
FG
3689 cdef:
3690 WriteOp _write_op = write_op
f67539c2 3691 char *_oid = oid_raw
7c673cae
FG
3692 Completion completion
3693 time_t _mtime = mtime
3694 int _flags = flags
3695
3696 completion = self.__get_completion(oncomplete, onsafe)
3697 self.__track_completion(completion)
3698
3699 with nogil:
3700 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3701 &_mtime, _flags)
3702 if ret != 0:
3703 completion._cleanup()
3704 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3705 return completion
3706
f67539c2 3707 def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
7c673cae 3708 """
11fdf7f2 3709 execute the real read operation
7c673cae 3710 :para read_op: read operation object
7c673cae 3711 :para oid: object name
7c673cae 3712 :para flag: flags to apply to the entire operation
7c673cae 3713 """
f67539c2 3714 oid_raw = cstr(oid, 'oid')
7c673cae
FG
3715 cdef:
3716 ReadOp _read_op = read_op
f67539c2 3717 char *_oid = oid_raw
7c673cae
FG
3718 int _flag = flag
3719
3720 with nogil:
3721 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3722 if ret != 0:
3723 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3724
f67539c2
TL
3725 def operate_aio_read_op(self, read_op: ReadOp, oid: str,
3726 oncomplete: Optional[Callable[[Completion], None]] = None,
3727 onsafe: Optional[Callable[[Completion], None]] = None,
3728 flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
7c673cae 3729 """
11fdf7f2 3730 execute the real read operation
7c673cae 3731 :para read_op: read operation object
7c673cae 3732 :para oid: object name
7c673cae
FG
3733 :param oncomplete: what to do when the remove is safe and complete in memory
3734 on all replicas
7c673cae
FG
3735 :param onsafe: what to do when the remove is safe and complete on storage
3736 on all replicas
7c673cae 3737 :para flag: flags to apply to the entire operation
7c673cae 3738 """
f67539c2 3739 oid_raw = cstr(oid, 'oid')
7c673cae
FG
3740 cdef:
3741 ReadOp _read_op = read_op
f67539c2 3742 char *_oid = oid_raw
7c673cae
FG
3743 Completion completion
3744 int _flag = flag
3745
3746 completion = self.__get_completion(oncomplete, onsafe)
3747 self.__track_completion(completion)
3748
3749 with nogil:
3750 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3751 if ret != 0:
3752 completion._cleanup()
3753 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3754 return completion
3755
f67539c2
TL
3756 def get_omap_vals(self,
3757 read_op: ReadOp,
05a536ef
TL
3758 start_after: OMAP_KEY_TYPE,
3759 filter_prefix: OMAP_KEY_TYPE,
3760 max_return: int,
3761 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
7c673cae
FG
3762 """
3763 get the omap values
3764 :para read_op: read operation object
7c673cae 3765 :para start_after: list keys starting after start_after
7c673cae 3766 :para filter_prefix: list only keys beginning with filter_prefix
7c673cae 3767 :para max_return: list no more than max_return key/value pairs
7c673cae
FG
3768 :returns: an iterator over the requested omap values, return value from this action
3769 """
3770
f67539c2
TL
3771 start_after_raw = cstr(start_after, 'start_after') if start_after else None
3772 filter_prefix_raw = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
7c673cae 3773 cdef:
f67539c2
TL
3774 char *_start_after = opt_str(start_after_raw)
3775 char *_filter_prefix = opt_str(filter_prefix_raw)
7c673cae
FG
3776 ReadOp _read_op = read_op
3777 rados_omap_iter_t iter_addr = NULL
3778 int _max_return = max_return
7c673cae
FG
3779
3780 with nogil:
d2e6a577 3781 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
91327a77 3782 _max_return, &iter_addr, NULL, NULL)
05a536ef 3783 it = OmapIterator(self, omap_key_type)
7c673cae 3784 it.ctx = iter_addr
91327a77 3785 return it, 0 # 0 is meaningless; there for backward-compat
7c673cae 3786
05a536ef
TL
3787 def get_omap_keys(self,
3788 read_op: ReadOp,
3789 start_after: OMAP_KEY_TYPE,
3790 max_return: int,
3791 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
7c673cae
FG
3792 """
3793 get the omap keys
3794 :para read_op: read operation object
7c673cae 3795 :para start_after: list keys starting after start_after
7c673cae 3796 :para max_return: list no more than max_return key/value pairs
7c673cae
FG
3797 :returns: an iterator over the requested omap values, return value from this action
3798 """
3799 start_after = cstr(start_after, 'start_after') if start_after else None
3800 cdef:
3801 char *_start_after = opt_str(start_after)
3802 ReadOp _read_op = read_op
3803 rados_omap_iter_t iter_addr = NULL
3804 int _max_return = max_return
7c673cae
FG
3805
3806 with nogil:
d2e6a577 3807 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
91327a77 3808 _max_return, &iter_addr, NULL, NULL)
05a536ef 3809 it = OmapIterator(self, omap_key_type)
7c673cae 3810 it.ctx = iter_addr
91327a77 3811 return it, 0 # 0 is meaningless; there for backward-compat
7c673cae 3812
05a536ef
TL
3813 def get_omap_vals_by_keys(self,
3814 read_op: ReadOp,
3815 keys: Sequence[OMAP_KEY_TYPE],
3816 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
7c673cae
FG
3817 """
3818 get the omap values by keys
3819 :para read_op: read operation object
7c673cae 3820 :para keys: input key tuple
7c673cae
FG
3821 :returns: an iterator over the requested omap values, return value from this action
3822 """
3823 keys = cstr_list(keys, 'keys')
3824 cdef:
3825 ReadOp _read_op = read_op
3826 rados_omap_iter_t iter_addr
3827 char **_keys = to_bytes_array(keys)
3828 size_t key_num = len(keys)
7c673cae
FG
3829
3830 try:
3831 with nogil:
3832 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3833 <const char**>_keys,
91327a77 3834 key_num, &iter_addr, NULL)
05a536ef 3835 it = OmapIterator(self, omap_key_type)
7c673cae 3836 it.ctx = iter_addr
91327a77 3837 return it, 0 # 0 is meaningless; there for backward-compat
7c673cae
FG
3838 finally:
3839 free(_keys)
3840
05a536ef 3841 def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE]):
7c673cae
FG
3842 """
3843 remove omap keys specifiled
3844 :para write_op: write operation object
7c673cae 3845 :para keys: input key tuple
7c673cae
FG
3846 """
3847
3848 keys = cstr_list(keys, 'keys')
3849 cdef:
3850 WriteOp _write_op = write_op
3851 size_t key_num = len(keys)
3852 char **_keys = to_bytes_array(keys)
3853
3854 try:
3855 with nogil:
3856 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3857 finally:
3858 free(_keys)
3859
f67539c2 3860 def clear_omap(self, write_op: WriteOp):
7c673cae
FG
3861 """
3862 Remove all key/value pairs from an object
3863 :para write_op: write operation object
7c673cae
FG
3864 """
3865
3866 cdef:
3867 WriteOp _write_op = write_op
3868
3869 with nogil:
3870 rados_write_op_omap_clear(_write_op.write_op)
3871
05a536ef 3872 def remove_omap_range2(self, write_op: WriteOp, key_begin: OMAP_KEY_TYPE, key_end: OMAP_KEY_TYPE):
f67539c2
TL
3873 """
3874 Remove key/value pairs from an object whose keys are in the range
3875 [key_begin, key_end)
3876 :param write_op: write operation object
3877 :param key_begin: the lower bound of the key range to remove
3878 :param key_end: the upper bound of the key range to remove
3879 """
3880 key_begin_raw = cstr(key_begin, 'key_begin')
3881 key_end_raw = cstr(key_end, 'key_end')
3882 cdef:
3883 WriteOp _write_op = write_op
3884 char* _key_begin = key_begin_raw
3885 size_t key_begin_len = len(key_begin)
3886 char* _key_end = key_end_raw
3887 size_t key_end_len = len(key_end)
3888 with nogil:
3889 rados_write_op_omap_rm_range2(_write_op.write_op, _key_begin, key_begin_len,
3890 _key_end, key_end_len)
3891
3892 def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
3893 duration: Optional[int] = None,
3894 flags: int = 0):
7c673cae
FG
3895
3896 """
3897 Take an exclusive lock on an object
3898
3899 :param key: name of the object
7c673cae 3900 :param name: name of the lock
7c673cae 3901 :param cookie: cookie of the lock
7c673cae 3902 :param desc: description of the lock
7c673cae 3903 :param duration: duration of the lock in seconds
7c673cae 3904 :param flags: flags
7c673cae
FG
3905
3906 :raises: :class:`TypeError`
3907 :raises: :class:`Error`
3908 """
3909 self.require_ioctx_open()
3910
f67539c2
TL
3911 key_raw = cstr(key, 'key')
3912 name_raw = cstr(name, 'name')
3913 cookie_raw = cstr(cookie, 'cookie')
3914 desc_raw = cstr(desc, 'desc')
7c673cae
FG
3915
3916 cdef:
f67539c2
TL
3917 char* _key = key_raw
3918 char* _name = name_raw
3919 char* _cookie = cookie_raw
3920 char* _desc = desc_raw
7c673cae
FG
3921 uint8_t _flags = flags
3922 timeval _duration
3923
3924 if duration is None:
3925 with nogil:
3926 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3927 NULL, _flags)
3928 else:
3929 _duration.tv_sec = duration
3930 with nogil:
3931 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3932 &_duration, _flags)
3933
3934 if ret < 0:
3935 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3936
f67539c2
TL
3937 def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
3938 duration: Optional[int] = None,
3939 flags: int = 0):
7c673cae
FG
3940
3941 """
3942 Take a shared lock on an object
3943
3944 :param key: name of the object
7c673cae 3945 :param name: name of the lock
7c673cae 3946 :param cookie: cookie of the lock
7c673cae 3947 :param tag: tag of the lock
7c673cae 3948 :param desc: description of the lock
7c673cae 3949 :param duration: duration of the lock in seconds
7c673cae 3950 :param flags: flags
7c673cae
FG
3951
3952 :raises: :class:`TypeError`
3953 :raises: :class:`Error`
3954 """
3955 self.require_ioctx_open()
3956
f67539c2
TL
3957 key_raw = cstr(key, 'key')
3958 tag_raw = cstr(tag, 'tag')
3959 name_raw = cstr(name, 'name')
3960 cookie_raw = cstr(cookie, 'cookie')
3961 desc_raw = cstr(desc, 'desc')
7c673cae
FG
3962
3963 cdef:
f67539c2
TL
3964 char* _key = key_raw
3965 char* _tag = tag_raw
3966 char* _name = name_raw
3967 char* _cookie = cookie_raw
3968 char* _desc = desc_raw
7c673cae
FG
3969 uint8_t _flags = flags
3970 timeval _duration
3971
3972 if duration is None:
3973 with nogil:
3974 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3975 NULL, _flags)
3976 else:
3977 _duration.tv_sec = duration
3978 with nogil:
3979 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3980 &_duration, _flags)
3981 if ret < 0:
3982 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3983
f67539c2 3984 def unlock(self, key: str, name: str, cookie: str):
7c673cae
FG
3985
3986 """
3987 Release a shared or exclusive lock on an object
3988
3989 :param key: name of the object
7c673cae 3990 :param name: name of the lock
7c673cae 3991 :param cookie: cookie of the lock
7c673cae
FG
3992
3993 :raises: :class:`TypeError`
3994 :raises: :class:`Error`
3995 """
3996 self.require_ioctx_open()
3997
f67539c2
TL
3998 key_raw = cstr(key, 'key')
3999 name_raw = cstr(name, 'name')
4000 cookie_raw = cstr(cookie, 'cookie')
7c673cae
FG
4001
4002 cdef:
f67539c2
TL
4003 char* _key = key_raw
4004 char* _name = name_raw
4005 char* _cookie = cookie_raw
7c673cae
FG
4006
4007 with nogil:
4008 ret = rados_unlock(self.io, _key, _name, _cookie)
4009 if ret < 0:
4010 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
4011
11fdf7f2
TL
4012 def set_osdmap_full_try(self):
4013 """
4014 Set global osdmap_full_try label to true
4015 """
4016 with nogil:
f67539c2 4017 rados_set_pool_full_try(self.io)
11fdf7f2
TL
4018
4019 def unset_osdmap_full_try(self):
4020 """
4021 Unset
4022 """
4023 with nogil:
f67539c2 4024 rados_unset_pool_full_try(self.io)
11fdf7f2 4025
f67539c2 4026 def application_enable(self, app_name: str, force: bool = False):
c07f9fc5
FG
4027 """
4028 Enable an application on an OSD pool
4029
4030 :param app_name: application name
4031 :type app_name: str
4032 :param force: False if only a single app should exist per pool
4033 :type expire_seconds: boool
4034
4035 :raises: :class:`Error`
4036 """
f67539c2 4037 app_name_raw = cstr(app_name, 'app_name')
c07f9fc5 4038 cdef:
f67539c2 4039 char *_app_name = app_name_raw
c07f9fc5
FG
4040 int _force = (1 if force else 0)
4041
4042 with nogil:
4043 ret = rados_application_enable(self.io, _app_name, _force)
4044 if ret < 0:
4045 raise make_ex(ret, "error enabling application")
4046
f67539c2 4047 def application_list(self) -> List[str]:
c07f9fc5
FG
4048 """
4049 Returns a list of enabled applications
4050
4051 :returns: list of app name string
4052 """
4053 cdef:
4054 size_t length = 128
4055 char *apps = NULL
4056
4057 try:
4058 while True:
4059 apps = <char *>realloc_chk(apps, length)
4060 with nogil:
4061 ret = rados_application_list(self.io, apps, &length)
4062 if ret == 0:
4063 return [decode_cstr(app) for app in
4064 apps[:length].split(b'\0') if app]
4065 elif ret == -errno.ENOENT:
4066 return None
4067 elif ret == -errno.ERANGE:
4068 pass
4069 else:
4070 raise make_ex(ret, "error listing applications")
4071 finally:
4072 free(apps)
4073
f67539c2 4074 def application_metadata_get(self, app_name: str, key: str) -> str:
9f95a23c
TL
4075 """
4076 Gets application metadata on an OSD pool for the given key
4077
4078 :param app_name: application name
4079 :type app_name: str
4080 :param key: metadata key
4081 :type key: str
4082 :returns: str - metadata value
4083
4084 :raises: :class:`Error`
4085 """
4086
f67539c2
TL
4087 app_name_raw = cstr(app_name, 'app_name')
4088 key_raw = cstr(key, 'key')
9f95a23c 4089 cdef:
f67539c2
TL
4090 char *_app_name = app_name_raw
4091 char *_key = key_raw
9f95a23c
TL
4092 size_t size = 129
4093 char *value = NULL
4094 int ret
4095 try:
4096 while True:
4097 value = <char *>realloc_chk(value, size)
4098 with nogil:
4099 ret = rados_application_metadata_get(self.io, _app_name,
4100 _key, value, &size)
4101 if ret != -errno.ERANGE:
4102 break
4103 if ret == -errno.ENOENT:
4104 raise KeyError('no metadata %s for application %s' % (key, _app_name))
4105 elif ret != 0:
4106 raise make_ex(ret, 'error getting metadata %s for application %s' %
4107 (key, _app_name))
4108 return decode_cstr(value)
4109 finally:
4110 free(value)
4111
f67539c2 4112 def application_metadata_set(self, app_name: str, key: str, value: str):
c07f9fc5
FG
4113 """
4114 Sets application metadata on an OSD pool
4115
4116 :param app_name: application name
4117 :type app_name: str
4118 :param key: metadata key
4119 :type key: str
4120 :param value: metadata value
4121 :type value: str
4122
4123 :raises: :class:`Error`
4124 """
f67539c2
TL
4125 app_name_raw = cstr(app_name, 'app_name')
4126 key_raw = cstr(key, 'key')
4127 value_raw = cstr(value, 'value')
c07f9fc5 4128 cdef:
f67539c2
TL
4129 char *_app_name = app_name_raw
4130 char *_key = key_raw
4131 char *_value = value_raw
c07f9fc5
FG
4132
4133 with nogil:
4134 ret = rados_application_metadata_set(self.io, _app_name, _key,
4135 _value)
4136 if ret < 0:
4137 raise make_ex(ret, "error setting application metadata")
4138
f67539c2 4139 def application_metadata_remove(self, app_name: str, key: str):
c07f9fc5
FG
4140 """
4141 Remove application metadata from an OSD pool
4142
4143 :param app_name: application name
4144 :type app_name: str
4145 :param key: metadata key
4146 :type key: str
4147
4148 :raises: :class:`Error`
4149 """
f67539c2
TL
4150 app_name_raw = cstr(app_name, 'app_name')
4151 key_raw = cstr(key, 'key')
c07f9fc5 4152 cdef:
f67539c2
TL
4153 char *_app_name = app_name_raw
4154 char *_key = key_raw
c07f9fc5
FG
4155
4156 with nogil:
4157 ret = rados_application_metadata_remove(self.io, _app_name, _key)
4158 if ret < 0:
4159 raise make_ex(ret, "error removing application metadata")
4160
f67539c2 4161 def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
c07f9fc5
FG
4162 """
4163 Returns a list of enabled applications
4164
4165 :param app_name: application name
4166 :type app_name: str
4167 :returns: list of key/value tuples
4168 """
f67539c2 4169 app_name_raw = cstr(app_name, 'app_name')
c07f9fc5 4170 cdef:
f67539c2 4171 char *_app_name = app_name_raw
c07f9fc5
FG
4172 size_t key_length = 128
4173 size_t val_length = 128
4174 char *c_keys = NULL
4175 char *c_vals = NULL
4176
4177 try:
4178 while True:
4179 c_keys = <char *>realloc_chk(c_keys, key_length)
4180 c_vals = <char *>realloc_chk(c_vals, val_length)
4181 with nogil:
4182 ret = rados_application_metadata_list(self.io, _app_name,
4183 c_keys, &key_length,
4184 c_vals, &val_length)
4185 if ret == 0:
4186 keys = [decode_cstr(key) for key in
11fdf7f2 4187 c_keys[:key_length].split(b'\0')]
c07f9fc5 4188 vals = [decode_cstr(val) for val in
11fdf7f2 4189 c_vals[:val_length].split(b'\0')]
9f95a23c 4190 return list(zip(keys, vals))[:-1]
c07f9fc5
FG
4191 elif ret == -errno.ERANGE:
4192 pass
4193 else:
4194 raise make_ex(ret, "error listing application metadata")
4195 finally:
4196 free(c_keys)
4197 free(c_vals)
4198
f67539c2 4199 def alignment(self) -> int:
11fdf7f2
TL
4200 """
4201 Returns pool alignment
4202
4203 :returns:
4204 Number of alignment bytes required by the current pool, or None if
4205 alignment is not required.
4206 """
4207 cdef:
4208 int requires = 0
4209 uint64_t _alignment
4210
4211 with nogil:
4212 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
4213 if ret != 0:
4214 raise make_ex(ret, "error checking alignment")
4215
4216 alignment = None
4217 if requires:
4218 with nogil:
4219 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
4220 if ret != 0:
4221 raise make_ex(ret, "error querying alignment")
4222 alignment = _alignment
4223 return alignment
4224
7c673cae
FG
4225
4226def set_object_locator(func):
4227 def retfunc(self, *args, **kwargs):
4228 if self.locator_key is not None:
4229 old_locator = self.ioctx.get_locator_key()
4230 self.ioctx.set_locator_key(self.locator_key)
4231 retval = func(self, *args, **kwargs)
4232 self.ioctx.set_locator_key(old_locator)
4233 return retval
4234 else:
4235 return func(self, *args, **kwargs)
4236 return retfunc
4237
4238
4239def set_object_namespace(func):
4240 def retfunc(self, *args, **kwargs):
4241 if self.nspace is None:
4242 raise LogicError("Namespace not set properly in context")
4243 old_nspace = self.ioctx.get_namespace()
4244 self.ioctx.set_namespace(self.nspace)
4245 retval = func(self, *args, **kwargs)
4246 self.ioctx.set_namespace(old_nspace)
4247 return retval
4248 return retfunc
4249
4250
4251class Object(object):
4252 """Rados object wrapper, makes the object look like a file"""
4253 def __init__(self, ioctx, key, locator_key=None, nspace=None):
4254 self.key = key
4255 self.ioctx = ioctx
4256 self.offset = 0
4257 self.state = "exists"
4258 self.locator_key = locator_key
4259 self.nspace = "" if nspace is None else nspace
4260
4261 def __str__(self):
4262 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
4263 (str(self.ioctx), self.key, "--default--"
4264 if self.nspace is "" else self.nspace, self.locator_key)
4265
4266 def require_object_exists(self):
4267 if self.state != "exists":
4268 raise ObjectStateError("The object is %s" % self.state)
4269
4270 @set_object_locator
4271 @set_object_namespace
4272 def read(self, length=1024 * 1024):
4273 self.require_object_exists()
4274 ret = self.ioctx.read(self.key, length, self.offset)
4275 self.offset += len(ret)
4276 return ret
4277
4278 @set_object_locator
4279 @set_object_namespace
4280 def write(self, string_to_write):
4281 self.require_object_exists()
4282 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4283 if ret == 0:
4284 self.offset += len(string_to_write)
4285 return ret
4286
4287 @set_object_locator
4288 @set_object_namespace
4289 def remove(self):
4290 self.require_object_exists()
4291 self.ioctx.remove_object(self.key)
4292 self.state = "removed"
4293
4294 @set_object_locator
4295 @set_object_namespace
f67539c2 4296 def stat(self) -> Tuple[int, time.struct_time]:
7c673cae
FG
4297 self.require_object_exists()
4298 return self.ioctx.stat(self.key)
4299
f67539c2 4300 def seek(self, position: int):
7c673cae
FG
4301 self.require_object_exists()
4302 self.offset = position
4303
4304 @set_object_locator
4305 @set_object_namespace
f67539c2 4306 def get_xattr(self, xattr_name: str) -> bytes:
7c673cae
FG
4307 self.require_object_exists()
4308 return self.ioctx.get_xattr(self.key, xattr_name)
4309
4310 @set_object_locator
4311 @set_object_namespace
f67539c2 4312 def get_xattrs(self) -> XattrIterator:
7c673cae
FG
4313 self.require_object_exists()
4314 return self.ioctx.get_xattrs(self.key)
4315
4316 @set_object_locator
4317 @set_object_namespace
f67539c2 4318 def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
7c673cae
FG
4319 self.require_object_exists()
4320 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4321
4322 @set_object_locator
4323 @set_object_namespace
f67539c2 4324 def rm_xattr(self, xattr_name: str) -> bool:
7c673cae
FG
4325 self.require_object_exists()
4326 return self.ioctx.rm_xattr(self.key, xattr_name)
4327
4328MONITOR_LEVELS = [
4329 "debug",
4330 "info",
4331 "warn", "warning",
4332 "err", "error",
4333 "sec",
4334 ]
4335
4336
4337class MonitorLog(object):
4338 # NOTE(sileht): Keep this class for backward compat
4339 # method moved to Rados.monitor_log()
4340 """
4341 For watching cluster log messages. Instantiate an object and keep
4342 it around while callback is periodically called. Construct with
4343 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4344 arg will be passed to the callback.
4345
4346 callback will be called with:
4347 arg (given to __init__)
4348 line (the full line, including timestamp, who, level, msg)
4349 who (which entity issued the log message)
4350 timestamp_sec (sec of a struct timespec)
4351 timestamp_nsec (sec of a struct timespec)
4352 seq (sequence number)
4353 level (string representing the level of the log message)
4354 msg (the message itself)
4355 callback's return value is ignored
4356 """
4357 def __init__(self, cluster, level, callback, arg):
4358 self.level = level
4359 self.callback = callback
4360 self.arg = arg
4361 self.cluster = cluster
4362 self.cluster.monitor_log(level, callback, arg)
4363