]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True, binding=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21 IF BUILD_DOC:
22 include "mock_rados.pxi"
23 ELSE:
24 from c_rados cimport *
25
26 import threading
27 import time
28
29 from datetime import datetime, timedelta
30 from functools import partial, wraps
31 from itertools import chain
32 from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
33
34 cdef extern from "Python.h":
35 # These are in cpython/string.pxd, but use "object" types instead of
36 # PyObject*, which invokes assumptions in cpython that we need to
37 # legitimately break to implement zero-copy string buffers in Ioctx.read().
38 # This is valid use of the Python API and documented as a special case.
39 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
40 char* PyBytes_AsString(PyObject *string) except NULL
41 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
42 void PyEval_InitThreads()
43
44 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
45 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
46 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
47 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
48 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
49 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
50 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
51
52 LIBRADOS_CMPXATTR_OP_EQ = _LIBRADOS_CMPXATTR_OP_EQ
53 LIBRADOS_CMPXATTR_OP_NE = _LIBRADOS_CMPXATTR_OP_NE
54 LIBRADOS_CMPXATTR_OP_GT = _LIBRADOS_CMPXATTR_OP_GT
55 LIBRADOS_CMPXATTR_OP_GTE = _LIBRADOS_CMPXATTR_OP_GTE
56 LIBRADOS_CMPXATTR_OP_LT = _LIBRADOS_CMPXATTR_OP_LT
57 LIBRADOS_CMPXATTR_OP_LTE = _LIBRADOS_CMPXATTR_OP_LTE
58
59 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
60
61 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
62 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
63 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
64 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
65 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
66 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
67 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
68
69 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
70
71 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
72 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
73
74 MAX_ERRNO = _MAX_ERRNO
75
76 ANONYMOUS_AUID = 0xffffffffffffffff
77 ADMIN_AUID = 0
78
79 OMAP_KEY_TYPE = Union[str,bytes]
80
81 class Error(Exception):
82 """ `Error` class, derived from `Exception` """
83 def __init__(self, message, errno=None):
84 super(Exception, self).__init__(message)
85 self.errno = errno
86
87 def __str__(self):
88 msg = super(Exception, self).__str__()
89 if self.errno is None:
90 return msg
91 return '[errno {0}] {1}'.format(self.errno, msg)
92
93 def __reduce__(self):
94 return (self.__class__, (self.message, self.errno))
95
96 class InvalidArgumentError(Error):
97 def __init__(self, message, errno=None):
98 super(InvalidArgumentError, self).__init__(
99 "RADOS invalid argument (%s)" % message, errno)
100
101 class ExtendMismatch(Error):
102 def __init__(self, message, errno, offset):
103 super().__init__(
104 "object content does not match (%s)" % message, errno)
105 self.offset = offset
106
107 class OSError(Error):
108 """ `OSError` class, derived from `Error` """
109 pass
110
111 class InterruptedOrTimeoutError(OSError):
112 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
113 def __init__(self, message, errno=None):
114 super(InterruptedOrTimeoutError, self).__init__(
115 "RADOS interrupted or timeout (%s)" % message, errno)
116
117
118 class PermissionError(OSError):
119 """ `PermissionError` class, derived from `OSError` """
120 def __init__(self, message, errno=None):
121 super(PermissionError, self).__init__(
122 "RADOS permission error (%s)" % message, errno)
123
124
125 class PermissionDeniedError(OSError):
126 """ deal with EACCES related. """
127 def __init__(self, message, errno=None):
128 super(PermissionDeniedError, self).__init__(
129 "RADOS permission denied (%s)" % message, errno)
130
131
132 class ObjectNotFound(OSError):
133 """ `ObjectNotFound` class, derived from `OSError` """
134 def __init__(self, message, errno=None):
135 super(ObjectNotFound, self).__init__(
136 "RADOS object not found (%s)" % message, errno)
137
138
139 class NoData(OSError):
140 """ `NoData` class, derived from `OSError` """
141 def __init__(self, message, errno=None):
142 super(NoData, self).__init__(
143 "RADOS no data (%s)" % message, errno)
144
145
146 class ObjectExists(OSError):
147 """ `ObjectExists` class, derived from `OSError` """
148 def __init__(self, message, errno=None):
149 super(ObjectExists, self).__init__(
150 "RADOS object exists (%s)" % message, errno)
151
152
153 class ObjectBusy(OSError):
154 """ `ObjectBusy` class, derived from `IOError` """
155 def __init__(self, message, errno=None):
156 super(ObjectBusy, self).__init__(
157 "RADOS object busy (%s)" % message, errno)
158
159
160 class IOError(OSError):
161 """ `ObjectBusy` class, derived from `OSError` """
162 def __init__(self, message, errno=None):
163 super(IOError, self).__init__(
164 "RADOS I/O error (%s)" % message, errno)
165
166
167 class NoSpace(OSError):
168 """ `NoSpace` class, derived from `OSError` """
169 def __init__(self, message, errno=None):
170 super(NoSpace, self).__init__(
171 "RADOS no space (%s)" % message, errno)
172
173 class NotConnected(OSError):
174 """ `NotConnected` class, derived from `OSError` """
175 def __init__(self, message, errno=None):
176 super(NotConnected, self).__init__(
177 "RADOS not connected (%s)" % message, errno)
178
179 class RadosStateError(Error):
180 """ `RadosStateError` class, derived from `Error` """
181 def __init__(self, message, errno=None):
182 super(RadosStateError, self).__init__(
183 "RADOS rados state (%s)" % message, errno)
184
185
186 class IoctxStateError(Error):
187 """ `IoctxStateError` class, derived from `Error` """
188 def __init__(self, message, errno=None):
189 super(IoctxStateError, self).__init__(
190 "RADOS Ioctx state error (%s)" % message, errno)
191
192
193 class ObjectStateError(Error):
194 """ `ObjectStateError` class, derived from `Error` """
195 def __init__(self, message, errno=None):
196 super(ObjectStateError, self).__init__(
197 "RADOS object state error (%s)" % message, errno)
198
199
200 class LogicError(Error):
201 """ `` class, derived from `Error` """
202 def __init__(self, message, errno=None):
203 super(LogicError, self).__init__(
204 "RADOS logic error (%s)" % message, errno)
205
206
207 class TimedOut(OSError):
208 """ `TimedOut` class, derived from `OSError` """
209 def __init__(self, message, errno=None):
210 super(TimedOut, self).__init__(
211 "RADOS timed out (%s)" % message, errno)
212
213
214 class InProgress(Error):
215 """ `InProgress` class, derived from `Error` """
216 def __init__(self, message, errno=None):
217 super(InProgress, self).__init__(
218 "RADOS in progress error (%s)" % message, errno)
219
220
221 class IsConnected(Error):
222 """ `IsConnected` class, derived from `Error` """
223 def __init__(self, message, errno=None):
224 super(IsConnected, self).__init__(
225 "RADOS is connected error (%s)" % message, errno)
226
227
228 class ConnectionShutdown(OSError):
229 """ `ConnectionShutdown` class, derived from `OSError` """
230 def __init__(self, message, errno=None):
231 super(ConnectionShutdown, self).__init__(
232 "RADOS connection was shutdown (%s)" % message, errno)
233
234
235 IF UNAME_SYSNAME == "FreeBSD":
236 cdef errno_to_exception = {
237 errno.EPERM : PermissionError,
238 errno.ENOENT : ObjectNotFound,
239 errno.EIO : IOError,
240 errno.ENOSPC : NoSpace,
241 errno.EEXIST : ObjectExists,
242 errno.EBUSY : ObjectBusy,
243 errno.ENOATTR : NoData,
244 errno.EINTR : InterruptedOrTimeoutError,
245 errno.ETIMEDOUT : TimedOut,
246 errno.EACCES : PermissionDeniedError,
247 errno.EINPROGRESS : InProgress,
248 errno.EISCONN : IsConnected,
249 errno.EINVAL : InvalidArgumentError,
250 errno.ENOTCONN : NotConnected,
251 errno.ESHUTDOWN : ConnectionShutdown,
252 }
253 ELSE:
254 cdef errno_to_exception = {
255 errno.EPERM : PermissionError,
256 errno.ENOENT : ObjectNotFound,
257 errno.EIO : IOError,
258 errno.ENOSPC : NoSpace,
259 errno.EEXIST : ObjectExists,
260 errno.EBUSY : ObjectBusy,
261 errno.ENODATA : NoData,
262 errno.EINTR : InterruptedOrTimeoutError,
263 errno.ETIMEDOUT : TimedOut,
264 errno.EACCES : PermissionDeniedError,
265 errno.EINPROGRESS : InProgress,
266 errno.EISCONN : IsConnected,
267 errno.EINVAL : InvalidArgumentError,
268 errno.ENOTCONN : NotConnected,
269 errno.ESHUTDOWN : ConnectionShutdown,
270 }
271
272
273 cdef make_ex(ret: int, msg: str):
274 """
275 Translate a librados return code into an exception.
276
277 :param ret: the return code
278 :type ret: int
279 :param msg: the error message to use
280 :type msg: str
281 :returns: a subclass of :class:`Error`
282 """
283 ret = abs(ret)
284 if ret in errno_to_exception:
285 return errno_to_exception[ret](msg, errno=ret)
286 elif ret > MAX_ERRNO:
287 offset = ret - MAX_ERRNO
288 return ExtendMismatch(msg, ret, offset)
289 else:
290 return OSError(msg, errno=ret)
291
292
293 def cstr(val, name, encoding="utf-8", opt=False) -> Optional[bytes]:
294 """
295 Create a byte string from a Python string
296
297 :param basestring val: Python string
298 :param str name: Name of the string parameter, for exceptions
299 :param str encoding: Encoding to use
300 :param bool opt: If True, None is allowed
301 :raises: :class:`InvalidArgument`
302 """
303 if opt and val is None:
304 return None
305 if isinstance(val, bytes):
306 return val
307 elif isinstance(val, str):
308 return val.encode(encoding)
309 else:
310 raise TypeError('%s must be a string' % name)
311
312
313 def cstr_list(list_str, name, encoding="utf-8"):
314 return [cstr(s, name) for s in list_str]
315
316
317 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
318 """
319 Decode a byte string into a Python string.
320
321 :param bytes val: byte string
322 """
323 if val is None:
324 return None
325
326 return val.decode(encoding)
327
328
329 def flatten_dict(d, name):
330 items = chain.from_iterable(d.items())
331 return cstr(''.join(i + '\0' for i in items), name)
332
333
334 cdef char* opt_str(s) except? NULL:
335 if s is None:
336 return NULL
337 return s
338
339
340 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
341 cdef void *ret = realloc(ptr, size)
342 if ret == NULL:
343 raise MemoryError("realloc failed")
344 return ret
345
346
347 cdef size_t * to_csize_t_array(list_int):
348 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
349 if ret == NULL:
350 raise MemoryError("malloc failed")
351 for i in range(len(list_int)):
352 ret[i] = <size_t>list_int[i]
353 return ret
354
355
356 cdef char ** to_bytes_array(list_bytes):
357 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
358 if ret == NULL:
359 raise MemoryError("malloc failed")
360 for i in range(len(list_bytes)):
361 ret[i] = <char *>list_bytes[i]
362 return ret
363
364 cdef int __monitor_callback(void *arg, const char *line, const char *who,
365 uint64_t sec, uint64_t nsec, uint64_t seq,
366 const char *level, const char *msg) with gil:
367 cdef object cb_info = <object>arg
368 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
369 return 0
370
371 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
372 const char *who,
373 const char *name,
374 uint64_t sec, uint64_t nsec, uint64_t seq,
375 const char *level, const char *msg) with gil:
376 cdef object cb_info = <object>arg
377 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
378 return 0
379
380
381 class Version(object):
382 """ Version information """
383 def __init__(self, major, minor, extra):
384 self.major = major
385 self.minor = minor
386 self.extra = extra
387
388 def __str__(self):
389 return "%d.%d.%d" % (self.major, self.minor, self.extra)
390
391
392 cdef class Rados(object):
393 """This class wraps librados functions"""
394 # NOTE(sileht): attributes declared in .pyd
395
396 def __init__(self, *args, **kwargs):
397 PyEval_InitThreads()
398 self.__setup(*args, **kwargs)
399
400 NO_CONF_FILE = -1
401 "special value that indicates no conffile should be read when creating a mount handle"
402 DEFAULT_CONF_FILES = -2
403 "special value that indicates the default conffiles should be read when creating a mount handle"
404
405 def __setup(self,
406 rados_id: Optional[str] = None,
407 name: Optional[str] = None,
408 clustername: Optional[str] = None,
409 conf_defaults: Optional[Dict[str, str]] = None,
410 conffile: Union[str, int, None] = NO_CONF_FILE,
411 conf: Optional[Dict[str, str]] = None,
412 flags: int = 0,
413 context: object = None):
414 self.monitor_callback = None
415 self.monitor_callback2 = None
416 self.parsed_args = []
417 self.conf_defaults = conf_defaults
418 self.conffile = conffile
419 self.rados_id = rados_id
420
421 if rados_id and name:
422 raise Error("Rados(): can't supply both rados_id and name")
423 elif rados_id:
424 name = 'client.' + rados_id
425 elif name is None:
426 name = 'client.admin'
427 if clustername is None:
428 clustername = ''
429
430 name_raw = cstr(name, 'name')
431 clustername_raw = cstr(clustername, 'clustername')
432 cdef:
433 char *_name = name_raw
434 char *_clustername = clustername_raw
435 int _flags = flags
436 int ret
437
438 if context:
439 # Unpack void* (aka rados_config_t) from capsule
440 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
441 with nogil:
442 ret = rados_create_with_context(&self.cluster, rados_config)
443 else:
444 with nogil:
445 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
446 if ret != 0:
447 raise Error("rados_initialize failed with error code: %d" % ret)
448
449 self.state = "configuring"
450 # order is important: conf_defaults, then conffile, then conf
451 if conf_defaults:
452 for key, value in conf_defaults.items():
453 self.conf_set(key, value)
454 if conffile in (self.NO_CONF_FILE, None):
455 pass
456 elif conffile in (self.DEFAULT_CONF_FILES, ''):
457 self.conf_read_file(None)
458 else:
459 self.conf_read_file(conffile)
460 if conf:
461 for key, value in conf.items():
462 self.conf_set(key, value)
463
464 def get_addrs(self):
465 """
466 Get associated client addresses with this RADOS session.
467 """
468 self.require_state("configuring", "connected")
469
470 cdef:
471 char* addrs = NULL
472
473 try:
474
475 with nogil:
476 ret = rados_getaddrs(self.cluster, &addrs)
477 if ret:
478 raise make_ex(ret, "error calling getaddrs")
479
480 return decode_cstr(addrs)
481 finally:
482 free(addrs)
483
484 def require_state(self, *args):
485 """
486 Checks if the Rados object is in a special state
487
488 :raises: :class:`RadosStateError`
489 """
490 if self.state in args:
491 return
492 raise RadosStateError("You cannot perform that operation on a \
493 Rados object in state %s." % self.state)
494
495 def shutdown(self):
496 """
497 Disconnects from the cluster. Call this explicitly when a
498 Rados.connect()ed object is no longer used.
499 """
500 if self.state != "shutdown":
501 with nogil:
502 rados_shutdown(self.cluster)
503 self.state = "shutdown"
504
505 def __enter__(self):
506 self.connect()
507 return self
508
509 def __exit__(self, type_, value, traceback):
510 self.shutdown()
511 return False
512
513 def version(self) -> Version:
514 """
515 Get the version number of the ``librados`` C library.
516
517 :returns: a tuple of ``(major, minor, extra)`` components of the
518 librados version
519 """
520 cdef int major = 0
521 cdef int minor = 0
522 cdef int extra = 0
523 with nogil:
524 rados_version(&major, &minor, &extra)
525 return Version(major, minor, extra)
526
527 def conf_read_file(self, path: Optional[str] = None):
528 """
529 Configure the cluster handle using a Ceph config file.
530
531 :param path: path to the config file
532 """
533 self.require_state("configuring", "connected")
534 path_raw = cstr(path, 'path', opt=True)
535 cdef:
536 char *_path = opt_str(path_raw)
537 with nogil:
538 ret = rados_conf_read_file(self.cluster, _path)
539 if ret != 0:
540 raise make_ex(ret, "error calling conf_read_file")
541
542 def conf_parse_argv(self, args: Sequence[str]):
543 """
544 Parse known arguments from args, and remove; returned
545 args contain only those unknown to ceph
546 """
547 self.require_state("configuring", "connected")
548 if not args:
549 return
550
551 cargs = cstr_list(args, 'args')
552 cdef:
553 int _argc = len(args)
554 char **_argv = to_bytes_array(cargs)
555 char **_remargv = NULL
556
557 try:
558 _remargv = <char **>malloc(_argc * sizeof(char *))
559 with nogil:
560 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
561 <const char**>_argv,
562 <const char**>_remargv)
563 if ret:
564 raise make_ex(ret, "error calling conf_parse_argv_remainder")
565
566 # _remargv was allocated with fixed argc; collapse return
567 # list to eliminate any missing args
568 retargs = [decode_cstr(a) for a in _remargv[:_argc]
569 if a != NULL]
570 self.parsed_args = args
571 return retargs
572 finally:
573 free(_argv)
574 free(_remargv)
575
576 def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
577 """
578 Parse known arguments from an environment variable, normally
579 CEPH_ARGS.
580 """
581 self.require_state("configuring", "connected")
582 if not var:
583 return
584
585 var_raw = cstr(var, 'var')
586 cdef:
587 char *_var = var_raw
588 with nogil:
589 ret = rados_conf_parse_env(self.cluster, _var)
590 if ret != 0:
591 raise make_ex(ret, "error calling conf_parse_env")
592
593 def conf_get(self, option: str) -> Optional[str]:
594 """
595 Get the value of a configuration option
596
597 :param option: which option to read
598
599 :returns: value of the option or None
600 :raises: :class:`TypeError`
601 """
602 self.require_state("configuring", "connected")
603 option_raw = cstr(option, 'option')
604 cdef:
605 char *_option = option_raw
606 size_t length = 20
607 char *ret_buf = NULL
608
609 try:
610 while True:
611 ret_buf = <char *>realloc_chk(ret_buf, length)
612 with nogil:
613 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
614 if ret == 0:
615 return decode_cstr(ret_buf)
616 elif ret == -errno.ENAMETOOLONG:
617 length = length * 2
618 elif ret == -errno.ENOENT:
619 return None
620 else:
621 raise make_ex(ret, "error calling conf_get")
622 finally:
623 free(ret_buf)
624
625 def conf_set(self, option: str, val: str):
626 """
627 Set the value of a configuration option
628
629 :param option: which option to set
630 :param option: value of the option
631
632 :raises: :class:`TypeError`, :class:`ObjectNotFound`
633 """
634 self.require_state("configuring", "connected")
635 option_raw = cstr(option, 'option')
636 val_raw = cstr(val, 'val')
637 cdef:
638 char *_option = option_raw
639 char *_val = val_raw
640
641 with nogil:
642 ret = rados_conf_set(self.cluster, _option, _val)
643 if ret != 0:
644 raise make_ex(ret, "error calling conf_set")
645
646 def ping_monitor(self, mon_id: str):
647 """
648 Ping a monitor to assess liveness
649
650 May be used as a simply way to assess liveness, or to obtain
651 information about the monitor in a simple way even in the
652 absence of quorum.
653
654 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
655 :returns: the string reply from the monitor
656 """
657
658 self.require_state("configuring", "connected")
659
660 mon_id_raw = cstr(mon_id, 'mon_id')
661 cdef:
662 char *_mon_id = mon_id_raw
663 size_t outstrlen = 0
664 char *outstr
665
666 with nogil:
667 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
668
669 if ret != 0:
670 raise make_ex(ret, "error calling ping_monitor")
671
672 if outstrlen:
673 my_outstr = outstr[:outstrlen]
674 rados_buffer_free(outstr)
675 return decode_cstr(my_outstr)
676
677 def connect(self, timeout: int = 0):
678 """
679 Connect to the cluster. Use shutdown() to release resources.
680
681 :param timeout: Any supplied timeout value is currently ignored.
682 """
683 self.require_state("configuring")
684 # NOTE(sileht): timeout was supported by old python API,
685 # but this is not something available in C API, so ignore
686 # for now and remove it later
687 with nogil:
688 ret = rados_connect(self.cluster)
689 if ret != 0:
690 raise make_ex(ret, "error connecting to the cluster")
691 self.state = "connected"
692
693 def get_instance_id(self) -> int:
694 """
695 Get a global id for current instance
696 """
697 self.require_state("connected")
698 with nogil:
699 ret = rados_get_instance_id(self.cluster)
700 return ret;
701
702 def get_cluster_stats(self) -> Dict[str, int]:
703 """
704 Read usage info about the cluster
705
706 This tells you total space, space used, space available, and number
707 of objects. These are not updated immediately when data is written,
708 they are eventually consistent.
709 :returns: contains the following keys:
710
711 - ``kb`` (int) - total space
712
713 - ``kb_used`` (int) - space used
714
715 - ``kb_avail`` (int) - free space available
716
717 - ``num_objects`` (int) - number of objects
718
719 """
720 cdef:
721 rados_cluster_stat_t stats
722
723 with nogil:
724 ret = rados_cluster_stat(self.cluster, &stats)
725
726 if ret < 0:
727 raise make_ex(
728 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
729 return {'kb': stats.kb,
730 'kb_used': stats.kb_used,
731 'kb_avail': stats.kb_avail,
732 'num_objects': stats.num_objects}
733
734 def pool_exists(self, pool_name: str) -> bool:
735 """
736 Checks if a given pool exists.
737
738 :param pool_name: name of the pool to check
739
740 :raises: :class:`TypeError`, :class:`Error`
741 :returns: whether the pool exists, false otherwise.
742 """
743 self.require_state("connected")
744
745 pool_name_raw = cstr(pool_name, 'pool_name')
746 cdef:
747 char *_pool_name = pool_name_raw
748
749 with nogil:
750 ret = rados_pool_lookup(self.cluster, _pool_name)
751 if ret >= 0:
752 return True
753 elif ret == -errno.ENOENT:
754 return False
755 else:
756 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
757
758 def pool_lookup(self, pool_name: str) -> int:
759 """
760 Returns a pool's ID based on its name.
761
762 :param pool_name: name of the pool to look up
763
764 :raises: :class:`TypeError`, :class:`Error`
765 :returns: pool ID, or None if it doesn't exist
766 """
767 self.require_state("connected")
768 pool_name_raw = cstr(pool_name, 'pool_name')
769 cdef:
770 char *_pool_name = pool_name_raw
771
772 with nogil:
773 ret = rados_pool_lookup(self.cluster, _pool_name)
774 if ret >= 0:
775 return int(ret)
776 elif ret == -errno.ENOENT:
777 return None
778 else:
779 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
780
781 def pool_reverse_lookup(self, pool_id: int) -> Optional[str]:
782 """
783 Returns a pool's name based on its ID.
784
785 :param pool_id: ID of the pool to look up
786
787 :raises: :class:`TypeError`, :class:`Error`
788 :returns: pool name, or None if it doesn't exist
789 """
790 self.require_state("connected")
791 cdef:
792 int64_t _pool_id = pool_id
793 size_t size = 512
794 char *name = NULL
795
796 try:
797 while True:
798 name = <char *>realloc_chk(name, size)
799 with nogil:
800 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
801 if ret >= 0:
802 break
803 elif ret != -errno.ERANGE and size <= 4096:
804 size *= 2
805 elif ret == -errno.ENOENT:
806 return None
807 elif ret < 0:
808 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
809
810 return decode_cstr(name)
811
812 finally:
813 free(name)
814
815 def create_pool(self, pool_name: str,
816 crush_rule: Optional[int] = None,
817 auid: Optional[int] = None):
818 """
819 Create a pool:
820 - with default settings: if crush_rule=None and auid=None
821 - with a specific CRUSH rule: crush_rule given
822 - with a specific auid: auid given
823 - with a specific CRUSH rule and auid: crush_rule and auid given
824
825 :param pool_name: name of the pool to create
826 :param crush_rule: rule to use for placement in the new pool
827 :param auid: id of the owner of the new pool
828
829 :raises: :class:`TypeError`, :class:`Error`
830 """
831 self.require_state("connected")
832
833 pool_name_raw = cstr(pool_name, 'pool_name')
834 cdef:
835 char *_pool_name = pool_name_raw
836 uint8_t _crush_rule
837 uint64_t _auid
838
839 if crush_rule is None and auid is None:
840 with nogil:
841 ret = rados_pool_create(self.cluster, _pool_name)
842 elif crush_rule is not None and auid is None:
843 _crush_rule = crush_rule
844 with nogil:
845 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
846 elif crush_rule is None and auid is not None:
847 _auid = auid
848 with nogil:
849 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
850 else:
851 _crush_rule = crush_rule
852 _auid = auid
853 with nogil:
854 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
855 if ret < 0:
856 raise make_ex(ret, "error creating pool '%s'" % pool_name)
857
858 def get_pool_base_tier(self, pool_id: int) -> int:
859 """
860 Get base pool
861
862 :returns: base pool, or pool_id if tiering is not configured for the pool
863 """
864 self.require_state("connected")
865 cdef:
866 int64_t base_tier = 0
867 int64_t _pool_id = pool_id
868
869 with nogil:
870 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
871 if ret < 0:
872 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
873 return int(base_tier)
874
875 def delete_pool(self, pool_name: str):
876 """
877 Delete a pool and all data inside it.
878
879 The pool is removed from the cluster immediately,
880 but the actual data is deleted in the background.
881
882 :param pool_name: name of the pool to delete
883
884 :raises: :class:`TypeError`, :class:`Error`
885 """
886 self.require_state("connected")
887
888 pool_name_raw = cstr(pool_name, 'pool_name')
889 cdef:
890 char *_pool_name = pool_name_raw
891
892 with nogil:
893 ret = rados_pool_delete(self.cluster, _pool_name)
894 if ret < 0:
895 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
896
897 def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
898 """
899 List inconsistent placement groups in the given pool
900
901 :param pool_id: ID of the pool in which PGs are listed
902 :returns: inconsistent placement groups
903 """
904 self.require_state("connected")
905 cdef:
906 int64_t pool = pool_id
907 size_t size = 512
908 char *pgs = NULL
909
910 try:
911 while True:
912 pgs = <char *>realloc_chk(pgs, size);
913 with nogil:
914 ret = rados_inconsistent_pg_list(self.cluster, pool,
915 pgs, size)
916 if ret > <int>size:
917 size *= 2
918 elif ret >= 0:
919 break
920 else:
921 raise make_ex(ret, "error calling inconsistent_pg_list")
922 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
923 finally:
924 free(pgs)
925
926 def list_pools(self) -> List[str]:
927 """
928 Gets a list of pool names.
929
930 :returns: list of pool names.
931 """
932 self.require_state("connected")
933 cdef:
934 size_t size = 512
935 char *c_names = NULL
936
937 try:
938 while True:
939 c_names = <char *>realloc_chk(c_names, size)
940 with nogil:
941 ret = rados_pool_list(self.cluster, c_names, size)
942 if ret > <int>size:
943 size *= 2
944 elif ret >= 0:
945 break
946 return [name for name in decode_cstr(c_names[:ret]).split('\0')
947 if name]
948 finally:
949 free(c_names)
950
951 def get_fsid(self) -> str:
952 """
953 Get the fsid of the cluster as a hexadecimal string.
954
955 :raises: :class:`Error`
956 :returns: cluster fsid
957 """
958 self.require_state("connected")
959 cdef:
960 char *ret_buf = NULL
961 size_t buf_len = 64
962
963 try:
964 while True:
965 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
966 with nogil:
967 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
968 if ret == -errno.ERANGE:
969 buf_len = buf_len * 2
970 elif ret < 0:
971 raise make_ex(ret, "error getting cluster fsid")
972 else:
973 break
974 return decode_cstr(ret_buf)
975 finally:
976 free(ret_buf)
977
978 def open_ioctx(self, ioctx_name: str) -> Ioctx:
979 """
980 Create an io context
981
982 The io context allows you to perform operations within a particular
983 pool.
984
985 :param ioctx_name: name of the pool
986
987 :raises: :class:`TypeError`, :class:`Error`
988 :returns: Rados Ioctx object
989 """
990 self.require_state("connected")
991 ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
992 cdef:
993 rados_ioctx_t ioctx
994 char *_ioctx_name = ioctx_name_raw
995 with nogil:
996 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
997 if ret < 0:
998 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
999 io = Ioctx(self, ioctx_name)
1000 io.io = ioctx
1001 return io
1002
1003 def open_ioctx2(self, pool_id: int) -> Ioctx:
1004 """
1005 Create an io context
1006
1007 The io context allows you to perform operations within a particular
1008 pool.
1009
1010 :param pool_id: ID of the pool
1011
1012 :raises: :class:`TypeError`, :class:`Error`
1013 :returns: Rados Ioctx object
1014 """
1015 self.require_state("connected")
1016 cdef:
1017 rados_ioctx_t ioctx
1018 int64_t _pool_id = pool_id
1019 with nogil:
1020 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1021 if ret < 0:
1022 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1023 io = Ioctx(self, str(pool_id))
1024 io.io = ioctx
1025 return io
1026
1027 def mon_command(self,
1028 cmd: str,
1029 inbuf: bytes,
1030 timeout: int = 0,
1031 target: Optional[Union[str, int]] = None) -> Tuple[int, bytes, str]:
1032 """
1033 Send a command to the mon.
1034
1035 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1036
1037 :param cmd: JSON formatted string.
1038 :param inbuf: optional string.
1039 :param timeout: This parameter is ignored.
1040 :param target: name or rank of a specific mon. Optional
1041 :return: (int ret, string outbuf, string outs)
1042
1043 Example:
1044
1045 >>> import json
1046 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1047 >>> c.connect()
1048 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1049 >>> c.mon_command(cmd, b'')
1050 """
1051 # NOTE(sileht): timeout is ignored because C API doesn't provide
1052 # timeout argument, but we keep it for backward compat with old python binding
1053 self.require_state("connected")
1054 cmds = [cstr(cmd, 'cmd')]
1055
1056 if isinstance(target, int):
1057 # NOTE(sileht): looks weird but test_monmap_dump pass int
1058 target = str(target)
1059
1060 target = cstr(target, 'target', opt=True)
1061
1062 cdef:
1063 char *_target = opt_str(target)
1064 char **_cmd = to_bytes_array(cmds)
1065 size_t _cmdlen = len(cmds)
1066
1067 char *_inbuf = inbuf
1068 size_t _inbuf_len = len(inbuf)
1069
1070 char *_outbuf
1071 size_t _outbuf_len
1072 char *_outs
1073 size_t _outs_len
1074
1075 try:
1076 if target:
1077 with nogil:
1078 ret = rados_mon_command_target(self.cluster, _target,
1079 <const char **>_cmd, _cmdlen,
1080 <const char*>_inbuf, _inbuf_len,
1081 &_outbuf, &_outbuf_len,
1082 &_outs, &_outs_len)
1083 else:
1084 with nogil:
1085 ret = rados_mon_command(self.cluster,
1086 <const char **>_cmd, _cmdlen,
1087 <const char*>_inbuf, _inbuf_len,
1088 &_outbuf, &_outbuf_len,
1089 &_outs, &_outs_len)
1090
1091 my_outs = decode_cstr(_outs[:_outs_len])
1092 my_outbuf = _outbuf[:_outbuf_len]
1093 if _outs_len:
1094 rados_buffer_free(_outs)
1095 if _outbuf_len:
1096 rados_buffer_free(_outbuf)
1097 return (ret, my_outbuf, my_outs)
1098 finally:
1099 free(_cmd)
1100
1101 def osd_command(self,
1102 osdid: int,
1103 cmd: str,
1104 inbuf: bytes,
1105 timeout: int = 0) -> Tuple[int, bytes, str]:
1106 """
1107 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1108
1109 :return: (int ret, string outbuf, string outs)
1110 """
1111 # NOTE(sileht): timeout is ignored because C API doesn't provide
1112 # timeout argument, but we keep it for backward compat with old python binding
1113 self.require_state("connected")
1114
1115 cmds = [cstr(cmd, 'cmd')]
1116
1117 cdef:
1118 int _osdid = osdid
1119 char **_cmd = to_bytes_array(cmds)
1120 size_t _cmdlen = len(cmds)
1121
1122 char *_inbuf = inbuf
1123 size_t _inbuf_len = len(inbuf)
1124
1125 char *_outbuf
1126 size_t _outbuf_len
1127 char *_outs
1128 size_t _outs_len
1129
1130 try:
1131 with nogil:
1132 ret = rados_osd_command(self.cluster, _osdid,
1133 <const char **>_cmd, _cmdlen,
1134 <const char*>_inbuf, _inbuf_len,
1135 &_outbuf, &_outbuf_len,
1136 &_outs, &_outs_len)
1137
1138 my_outs = decode_cstr(_outs[:_outs_len])
1139 my_outbuf = _outbuf[:_outbuf_len]
1140 if _outs_len:
1141 rados_buffer_free(_outs)
1142 if _outbuf_len:
1143 rados_buffer_free(_outbuf)
1144 return (ret, my_outbuf, my_outs)
1145 finally:
1146 free(_cmd)
1147
1148 def mgr_command(self,
1149 cmd: str,
1150 inbuf: bytes,
1151 timeout: int = 0,
1152 target: Optional[str] = None) -> Tuple[int, str, bytes]:
1153 """
1154 :return: (int ret, string outbuf, string outs)
1155 """
1156 # NOTE(sileht): timeout is ignored because C API doesn't provide
1157 # timeout argument, but we keep it for backward compat with old python binding
1158 self.require_state("connected")
1159
1160 cmds = [cstr(cmd, 'cmd')]
1161 target = cstr(target, 'target', opt=True)
1162
1163 cdef:
1164 char *_target = opt_str(target)
1165
1166 char **_cmd = to_bytes_array(cmds)
1167 size_t _cmdlen = len(cmds)
1168
1169 char *_inbuf = inbuf
1170 size_t _inbuf_len = len(inbuf)
1171
1172 char *_outbuf
1173 size_t _outbuf_len
1174 char *_outs
1175 size_t _outs_len
1176
1177 try:
1178 if target is not None:
1179 with nogil:
1180 ret = rados_mgr_command_target(self.cluster,
1181 <const char*>_target,
1182 <const char **>_cmd, _cmdlen,
1183 <const char*>_inbuf, _inbuf_len,
1184 &_outbuf, &_outbuf_len,
1185 &_outs, &_outs_len)
1186 else:
1187 with nogil:
1188 ret = rados_mgr_command(self.cluster,
1189 <const char **>_cmd, _cmdlen,
1190 <const char*>_inbuf, _inbuf_len,
1191 &_outbuf, &_outbuf_len,
1192 &_outs, &_outs_len)
1193
1194 my_outs = decode_cstr(_outs[:_outs_len])
1195 my_outbuf = _outbuf[:_outbuf_len]
1196 if _outs_len:
1197 rados_buffer_free(_outs)
1198 if _outbuf_len:
1199 rados_buffer_free(_outbuf)
1200 return (ret, my_outbuf, my_outs)
1201 finally:
1202 free(_cmd)
1203
1204 def pg_command(self,
1205 pgid: str,
1206 cmd: str,
1207 inbuf: bytes,
1208 timeout: int = 0) -> Tuple[int, bytes, str]:
1209 """
1210 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1211
1212 :return: (int ret, string outbuf, string outs)
1213 """
1214 # NOTE(sileht): timeout is ignored because C API doesn't provide
1215 # timeout argument, but we keep it for backward compat with old python binding
1216 self.require_state("connected")
1217
1218 pgid_raw = cstr(pgid, 'pgid')
1219 cmds = [cstr(cmd, 'cmd')]
1220
1221 cdef:
1222 char *_pgid = pgid_raw
1223 char **_cmd = to_bytes_array(cmds)
1224 size_t _cmdlen = len(cmds)
1225
1226 char *_inbuf = inbuf
1227 size_t _inbuf_len = len(inbuf)
1228
1229 char *_outbuf
1230 size_t _outbuf_len
1231 char *_outs
1232 size_t _outs_len
1233
1234 try:
1235 with nogil:
1236 ret = rados_pg_command(self.cluster, _pgid,
1237 <const char **>_cmd, _cmdlen,
1238 <const char *>_inbuf, _inbuf_len,
1239 &_outbuf, &_outbuf_len,
1240 &_outs, &_outs_len)
1241
1242 my_outs = decode_cstr(_outs[:_outs_len])
1243 my_outbuf = _outbuf[:_outbuf_len]
1244 if _outs_len:
1245 rados_buffer_free(_outs)
1246 if _outbuf_len:
1247 rados_buffer_free(_outbuf)
1248 return (ret, my_outbuf, my_outs)
1249 finally:
1250 free(_cmd)
1251
1252 def wait_for_latest_osdmap(self) -> int:
1253 self.require_state("connected")
1254 with nogil:
1255 ret = rados_wait_for_latest_osdmap(self.cluster)
1256 return ret
1257
1258 def blocklist_add(self, client_address: str, expire_seconds: int = 0):
1259 """
1260 Blocklist a client from the OSDs
1261
1262 :param client_address: client address
1263 :param expire_seconds: number of seconds to blocklist
1264
1265 :raises: :class:`Error`
1266 """
1267 self.require_state("connected")
1268 client_address_raw = cstr(client_address, 'client_address')
1269 cdef:
1270 uint32_t _expire_seconds = expire_seconds
1271 char *_client_address = client_address_raw
1272
1273 with nogil:
1274 ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
1275 if ret < 0:
1276 raise make_ex(ret, "error blocklisting client '%s'" % client_address)
1277
1278 def monitor_log(self, level: str,
1279 callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
1280 arg: Optional[object] = None):
1281 if level not in MONITOR_LEVELS:
1282 raise LogicError("invalid monitor level " + level)
1283 if callback is not None and not callable(callback):
1284 raise LogicError("callback must be a callable function or None")
1285
1286 level_raw = cstr(level, 'level')
1287 cdef char *_level = level_raw
1288
1289 if callback is None:
1290 with nogil:
1291 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1292 self.monitor_callback = None
1293 self.monitor_callback2 = None
1294 return
1295
1296 cb = (callback, arg)
1297 cdef PyObject* _arg = <PyObject*>cb
1298 with nogil:
1299 r = rados_monitor_log(self.cluster, <const char*>_level,
1300 <rados_log_callback_t>&__monitor_callback, _arg)
1301
1302 if r:
1303 raise make_ex(r, 'error calling rados_monitor_log')
1304 # NOTE(sileht): Prevents the callback method from being garbage collected
1305 self.monitor_callback = cb
1306 self.monitor_callback2 = None
1307
1308 def monitor_log2(self, level: str,
1309 callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
1310 arg: Optional[object] = None):
1311 if level not in MONITOR_LEVELS:
1312 raise LogicError("invalid monitor level " + level)
1313 if callback is not None and not callable(callback):
1314 raise LogicError("callback must be a callable function or None")
1315
1316 level_raw = cstr(level, 'level')
1317 cdef char *_level = level_raw
1318
1319 if callback is None:
1320 with nogil:
1321 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1322 self.monitor_callback = None
1323 self.monitor_callback2 = None
1324 return
1325
1326 cb = (callback, arg)
1327 cdef PyObject* _arg = <PyObject*>cb
1328 with nogil:
1329 r = rados_monitor_log2(self.cluster, <const char*>_level,
1330 <rados_log_callback2_t>&__monitor_callback2, _arg)
1331
1332 if r:
1333 raise make_ex(r, 'error calling rados_monitor_log')
1334 # NOTE(sileht): Prevents the callback method from being garbage collected
1335 self.monitor_callback = None
1336 self.monitor_callback2 = cb
1337
1338 def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
1339 """
1340 :param str service: service name (e.g. "rgw")
1341 :param str daemon: daemon name (e.g. "gwfoo")
1342 :param dict metadata: static metadata about the register daemon
1343 (e.g., the version of Ceph, the kernel version.)
1344 """
1345 service_raw = cstr(service, 'service')
1346 daemon_raw = cstr(daemon, 'daemon')
1347 metadata_dict = flatten_dict(metadata, 'metadata')
1348 cdef:
1349 char *_service = service_raw
1350 char *_daemon = daemon_raw
1351 char *_metadata = metadata_dict
1352
1353 with nogil:
1354 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1355 if ret != 0:
1356 raise make_ex(ret, "error calling service_register()")
1357
1358 def service_daemon_update(self, status: Dict[str, str]):
1359 status_dict = flatten_dict(status, 'status')
1360 cdef:
1361 char *_status = status_dict
1362
1363 with nogil:
1364 ret = rados_service_update_status(self.cluster, _status)
1365 if ret != 0:
1366 raise make_ex(ret, "error calling service_daemon_update()")
1367
1368
1369 cdef class OmapIterator(object):
1370 """Omap iterator"""
1371
1372 cdef public Ioctx ioctx
1373 cdef public object omap_key_type
1374 cdef rados_omap_iter_t ctx
1375
1376 def __cinit__(self, Ioctx ioctx, omap_key_type):
1377 self.ioctx = ioctx
1378 self.omap_key_type = omap_key_type
1379
1380 def __iter__(self):
1381 return self
1382
1383 def __next__(self):
1384 """
1385 Get the next key-value pair in the object
1386 :returns: next rados.OmapItem
1387 """
1388 cdef:
1389 char *key_ = NULL
1390 char *val_ = NULL
1391 size_t len_
1392
1393 with nogil:
1394 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1395
1396 if ret != 0:
1397 raise make_ex(ret, "error iterating over the omap")
1398 if key_ == NULL:
1399 raise StopIteration()
1400 key = self.omap_key_type(key_)
1401 val = None
1402 if val_ != NULL:
1403 val = val_[:len_]
1404 return (key, val)
1405
1406 def __dealloc__(self):
1407 with nogil:
1408 rados_omap_get_end(self.ctx)
1409
1410
1411 cdef class ObjectIterator(object):
1412 """rados.Ioctx Object iterator"""
1413
1414 cdef rados_list_ctx_t ctx
1415
1416 cdef public object ioctx
1417
1418 def __cinit__(self, Ioctx ioctx):
1419 self.ioctx = ioctx
1420
1421 with nogil:
1422 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1423 if ret < 0:
1424 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1425 % self.ioctx.name)
1426
1427 def __iter__(self):
1428 return self
1429
1430 def __next__(self):
1431 """
1432 Get the next object name and locator in the pool
1433
1434 :raises: StopIteration
1435 :returns: next rados.Ioctx Object
1436 """
1437 cdef:
1438 const char *key_ = NULL
1439 const char *locator_ = NULL
1440 const char *nspace_ = NULL
1441 size_t key_size_ = 0
1442 size_t locator_size_ = 0
1443 size_t nspace_size_ = 0
1444
1445 with nogil:
1446 ret = rados_nobjects_list_next2(self.ctx, &key_, &locator_, &nspace_,
1447 &key_size_, &locator_size_, &nspace_size_)
1448
1449 if ret < 0:
1450 raise StopIteration()
1451
1452 key = decode_cstr(key_[:key_size_])
1453 locator = decode_cstr(locator_[:locator_size_]) if locator_ != NULL else None
1454 nspace = decode_cstr(nspace_[:nspace_size_]) if nspace_ != NULL else None
1455 return Object(self.ioctx, key, locator, nspace)
1456
1457 def __dealloc__(self):
1458 with nogil:
1459 rados_nobjects_list_close(self.ctx)
1460
1461
1462 cdef class XattrIterator(object):
1463 """Extended attribute iterator"""
1464
1465 cdef rados_xattrs_iter_t it
1466 cdef char* _oid
1467
1468 cdef public Ioctx ioctx
1469 cdef public object oid
1470
1471 def __cinit__(self, Ioctx ioctx, oid):
1472 self.ioctx = ioctx
1473 self.oid = cstr(oid, 'oid')
1474 self._oid = self.oid
1475
1476 with nogil:
1477 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1478 if ret != 0:
1479 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1480
1481 def __iter__(self):
1482 return self
1483
1484 def __next__(self):
1485 """
1486 Get the next xattr on the object
1487
1488 :raises: StopIteration
1489 :returns: pair - of name and value of the next Xattr
1490 """
1491 cdef:
1492 const char *name_ = NULL
1493 const char *val_ = NULL
1494 size_t len_ = 0
1495
1496 with nogil:
1497 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1498 if ret != 0:
1499 raise make_ex(ret, "error iterating over the extended attributes \
1500 in '%s'" % self.oid)
1501 if name_ == NULL:
1502 raise StopIteration()
1503 name = decode_cstr(name_)
1504 val = val_[:len_]
1505 return (name, val)
1506
1507 def __dealloc__(self):
1508 with nogil:
1509 rados_getxattrs_end(self.it)
1510
1511
1512 cdef class SnapIterator(object):
1513 """Snapshot iterator"""
1514
1515 cdef public Ioctx ioctx
1516
1517 cdef rados_snap_t *snaps
1518 cdef int max_snap
1519 cdef int cur_snap
1520
1521 def __cinit__(self, Ioctx ioctx):
1522 self.ioctx = ioctx
1523 # We don't know how big a buffer we need until we've called the
1524 # function. So use the exponential doubling strategy.
1525 cdef int num_snaps = 10
1526 while True:
1527 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1528 num_snaps *
1529 sizeof(rados_snap_t))
1530
1531 with nogil:
1532 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1533 if ret >= 0:
1534 self.max_snap = ret
1535 break
1536 elif ret != -errno.ERANGE:
1537 raise make_ex(ret, "error calling rados_snap_list for \
1538 ioctx '%s'" % self.ioctx.name)
1539 num_snaps = num_snaps * 2
1540 self.cur_snap = 0
1541
1542 def __iter__(self) -> 'SnapIterator':
1543 return self
1544
1545 def __next__(self) -> 'Snap':
1546 """
1547 Get the next Snapshot
1548
1549 :raises: :class:`Error`, StopIteration
1550 :returns: next snapshot
1551 """
1552 if self.cur_snap >= self.max_snap:
1553 raise StopIteration
1554
1555 cdef:
1556 rados_snap_t snap_id = self.snaps[self.cur_snap]
1557 int name_len = 10
1558 char *name = NULL
1559
1560 try:
1561 while True:
1562 name = <char *>realloc_chk(name, name_len)
1563 with nogil:
1564 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1565 if ret == 0:
1566 break
1567 elif ret != -errno.ERANGE:
1568 raise make_ex(ret, "rados_snap_get_name error")
1569 else:
1570 name_len = name_len * 2
1571
1572 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1573 self.cur_snap = self.cur_snap + 1
1574 return snap
1575 finally:
1576 free(name)
1577
1578
1579 cdef class Snap(object):
1580 """Snapshot object"""
1581 cdef public Ioctx ioctx
1582 cdef public object name
1583
1584 # NOTE(sileht): old API was storing the ctypes object
1585 # instead of the value ....
1586 cdef public rados_snap_t snap_id
1587
1588 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1589 self.ioctx = ioctx
1590 self.name = name
1591 self.snap_id = snap_id
1592
1593 def __str__(self):
1594 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1595 % (str(self.ioctx), self.name, self.snap_id)
1596
1597 def get_timestamp(self) -> float:
1598 """
1599 Find when a snapshot in the current pool occurred
1600
1601 :raises: :class:`Error`
1602 :returns: the data and time the snapshot was created
1603 """
1604 cdef time_t snap_time
1605
1606 with nogil:
1607 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1608 if ret != 0:
1609 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1610 return datetime.fromtimestamp(snap_time)
1611
1612 # https://github.com/cython/cython/issues/1370
1613 unicode = str
1614
1615 cdef class Completion(object):
1616 """completion object"""
1617
1618 cdef public:
1619 Ioctx ioctx
1620 object oncomplete
1621 object onsafe
1622
1623 cdef:
1624 rados_callback_t complete_cb
1625 rados_callback_t safe_cb
1626 rados_completion_t rados_comp
1627 PyObject* buf
1628
1629 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1630 self.oncomplete = oncomplete
1631 self.onsafe = onsafe
1632 self.ioctx = ioctx
1633
1634 def is_safe(self) -> bool:
1635 """
1636 Is an asynchronous operation safe?
1637
1638 This does not imply that the safe callback has finished.
1639
1640 :returns: True if the operation is safe
1641 """
1642 return self.is_complete()
1643
1644 def is_complete(self) -> bool:
1645 """
1646 Has an asynchronous operation completed?
1647
1648 This does not imply that the safe callback has finished.
1649
1650 :returns: True if the operation is completed
1651 """
1652 with nogil:
1653 ret = rados_aio_is_complete(self.rados_comp)
1654 return ret == 1
1655
1656 def wait_for_safe(self):
1657 """
1658 Wait for an asynchronous operation to be marked safe
1659
1660 wait_for_safe() is an alias of wait_for_complete() since Luminous
1661 """
1662 self.wait_for_complete()
1663
1664 def wait_for_complete(self):
1665 """
1666 Wait for an asynchronous operation to complete
1667
1668 This does not imply that the complete callback has finished.
1669 """
1670 with nogil:
1671 rados_aio_wait_for_complete(self.rados_comp)
1672
1673 def wait_for_safe_and_cb(self):
1674 """
1675 Wait for an asynchronous operation to be marked safe and for
1676 the safe callback to have returned
1677 """
1678 return self.wait_for_complete_and_cb()
1679
1680 def wait_for_complete_and_cb(self):
1681 """
1682 Wait for an asynchronous operation to complete and for the
1683 complete callback to have returned
1684
1685 :returns: whether the operation is completed
1686 """
1687 with nogil:
1688 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1689 return ret
1690
1691 def get_return_value(self) -> int:
1692 """
1693 Get the return value of an asychronous operation
1694
1695 The return value is set when the operation is complete or safe,
1696 whichever comes first.
1697
1698 :returns: return value of the operation
1699 """
1700 with nogil:
1701 ret = rados_aio_get_return_value(self.rados_comp)
1702 return ret
1703
1704 def __dealloc__(self):
1705 """
1706 Release a completion
1707
1708 Call this when you no longer need the completion. It may not be
1709 freed immediately if the operation is not acked and committed.
1710 """
1711 ref.Py_XDECREF(self.buf)
1712 self.buf = NULL
1713 if self.rados_comp != NULL:
1714 with nogil:
1715 rados_aio_release(self.rados_comp)
1716 self.rados_comp = NULL
1717
1718 def _complete(self):
1719 self.oncomplete(self)
1720 if self.onsafe:
1721 self.onsafe(self)
1722 self._cleanup()
1723
1724 def _cleanup(self):
1725 with self.ioctx.lock:
1726 if self.oncomplete:
1727 self.ioctx.complete_completions.remove(self)
1728 if self.onsafe:
1729 self.ioctx.safe_completions.remove(self)
1730
1731
1732 class OpCtx(object):
1733 def __enter__(self):
1734 return self.create()
1735
1736 def __exit__(self, type, msg, traceback):
1737 self.release()
1738
1739
1740 cdef class WriteOp(object):
1741 cdef rados_write_op_t write_op
1742
1743 def create(self):
1744 with nogil:
1745 self.write_op = rados_create_write_op()
1746 return self
1747
1748 def release(self):
1749 with nogil:
1750 rados_release_write_op(self.write_op)
1751
1752 def new(self, exclusive: Optional[int] = None):
1753 """
1754 Create the object.
1755 """
1756
1757 cdef:
1758 int _exclusive = exclusive
1759
1760 with nogil:
1761 rados_write_op_create(self.write_op, _exclusive, NULL)
1762
1763
1764 def remove(self):
1765 """
1766 Remove object.
1767 """
1768 with nogil:
1769 rados_write_op_remove(self.write_op)
1770
1771 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
1772 """
1773 Set flags for the last operation added to this write_op.
1774 :para flags: flags to apply to the last operation
1775 """
1776
1777 cdef:
1778 int _flags = flags
1779
1780 with nogil:
1781 rados_write_op_set_flags(self.write_op, _flags)
1782
1783 def set_xattr(self, xattr_name: str, xattr_value: bytes):
1784 """
1785 Set an extended attribute on an object.
1786 :param xattr_name: name of the xattr
1787 :param xattr_value: buffer to set xattr to
1788 """
1789 xattr_name_raw = cstr(xattr_name, 'xattr_name')
1790 cdef:
1791 char *_xattr_name = xattr_name_raw
1792 char *_xattr_value = xattr_value
1793 size_t _xattr_value_len = len(xattr_value)
1794 with nogil:
1795 rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
1796
1797 def rm_xattr(self, xattr_name: str):
1798 """
1799 Removes an extended attribute on from an object.
1800 :param xattr_name: name of the xattr to remove
1801 """
1802 xattr_name_raw = cstr(xattr_name, 'xattr_name')
1803 cdef:
1804 char *_xattr_name = xattr_name_raw
1805 with nogil:
1806 rados_write_op_rmxattr(self.write_op, _xattr_name)
1807
1808 def append(self, to_write: bytes):
1809 """
1810 Append data to an object synchronously
1811 :param to_write: data to write
1812 """
1813
1814 cdef:
1815 char *_to_write = to_write
1816 size_t length = len(to_write)
1817
1818 with nogil:
1819 rados_write_op_append(self.write_op, _to_write, length)
1820
1821 def write_full(self, to_write: bytes):
1822 """
1823 Write whole object, atomically replacing it.
1824 :param to_write: data to write
1825 """
1826
1827 cdef:
1828 char *_to_write = to_write
1829 size_t length = len(to_write)
1830
1831 with nogil:
1832 rados_write_op_write_full(self.write_op, _to_write, length)
1833
1834 def write(self, to_write: bytes, offset: int = 0):
1835 """
1836 Write to offset.
1837 :param to_write: data to write
1838 :param offset: byte offset in the object to begin writing at
1839 """
1840
1841 cdef:
1842 char *_to_write = to_write
1843 size_t length = len(to_write)
1844 uint64_t _offset = offset
1845
1846 with nogil:
1847 rados_write_op_write(self.write_op, _to_write, length, _offset)
1848
1849 def assert_version(self, version: int):
1850 """
1851 Check if object's version is the expected one.
1852 :param version: expected version of the object
1853 :param type: int
1854 """
1855 cdef:
1856 uint64_t _version = version
1857
1858 with nogil:
1859 rados_write_op_assert_version(self.write_op, _version)
1860
1861 def zero(self, offset: int, length: int):
1862 """
1863 Zero part of an object.
1864 :param offset: byte offset in the object to begin writing at
1865 :param offset: number of zero to write
1866 """
1867
1868 cdef:
1869 size_t _length = length
1870 uint64_t _offset = offset
1871
1872 with nogil:
1873 rados_write_op_zero(self.write_op, _length, _offset)
1874
1875 def truncate(self, offset: int):
1876 """
1877 Truncate an object.
1878 :param offset: byte offset in the object to begin truncating at
1879 """
1880
1881 cdef:
1882 uint64_t _offset = offset
1883
1884 with nogil:
1885 rados_write_op_truncate(self.write_op, _offset)
1886
1887 def execute(self, cls: str, method: str, data: bytes):
1888 """
1889 Execute an OSD class method on an object
1890
1891 :param cls: name of the object class
1892 :param method: name of the method
1893 :param data: input data
1894 """
1895
1896 cls_raw = cstr(cls, 'cls')
1897 method_raw = cstr(method, 'method')
1898 cdef:
1899 char *_cls = cls_raw
1900 char *_method = method_raw
1901 char *_data = data
1902 size_t _data_len = len(data)
1903
1904 with nogil:
1905 rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)
1906
1907 def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
1908 """
1909 Write the same buffer multiple times
1910 :param to_write: data to write
1911 :param write_len: total number of bytes to write
1912 :param offset: byte offset in the object to begin writing at
1913 """
1914 cdef:
1915 char *_to_write = to_write
1916 size_t _data_len = len(to_write)
1917 size_t _write_len = write_len
1918 uint64_t _offset = offset
1919 with nogil:
1920 rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)
1921
1922 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1923 """
1924 Ensure that given object range (extent) satisfies comparison
1925 :param cmp_buf: buffer containing bytes to be compared with object contents
1926 :param offset: object byte offset at which to start the comparison
1927 """
1928 cdef:
1929 char *_cmp_buf = cmp_buf
1930 size_t _cmp_buf_len = len(cmp_buf)
1931 uint64_t _offset = offset
1932 with nogil:
1933 rados_write_op_cmpext(self.write_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1934
1935 def omap_cmp(self, key: OMAP_KEY_TYPE, val: OMAP_KEY_TYPE, cmp_op: int = LIBRADOS_CMPXATTR_OP_EQ):
1936 """
1937 Ensure that an omap key value satisfies comparison
1938 :param key: omap key whose associated value is evaluated for comparison
1939 :param val: value to compare with
1940 :param cmp_op: comparison operator, one of LIBRADOS_CMPXATTR_OP_EQ (1),
1941 LIBRADOS_CMPXATTR_OP_GT (3), or LIBRADOS_CMPXATTR_OP_LT (5).
1942 """
1943 key_raw = cstr(key, 'key')
1944 val_raw = cstr(val, 'val')
1945 cdef:
1946 char *_key = key_raw
1947 char *_val = val_raw
1948 size_t _val_len = len(val)
1949 uint8_t _comparison_operator = cmp_op
1950 with nogil:
1951 rados_write_op_omap_cmp(self.write_op, _key, _comparison_operator, _val, _val_len, NULL)
1952
1953 class WriteOpCtx(WriteOp, OpCtx):
1954 """write operation context manager"""
1955
1956
1957 cdef class ReadOp(object):
1958 cdef rados_read_op_t read_op
1959
1960 def create(self):
1961 with nogil:
1962 self.read_op = rados_create_read_op()
1963 return self
1964
1965 def release(self):
1966 with nogil:
1967 rados_release_read_op(self.read_op)
1968
1969 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1970 """
1971 Ensure that given object range (extent) satisfies comparison
1972 :param cmp_buf: buffer containing bytes to be compared with object contents
1973 :param offset: object byte offset at which to start the comparison
1974 """
1975 cdef:
1976 char *_cmp_buf = cmp_buf
1977 size_t _cmp_buf_len = len(cmp_buf)
1978 uint64_t _offset = offset
1979 with nogil:
1980 rados_read_op_cmpext(self.read_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1981
1982 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
1983 """
1984 Set flags for the last operation added to this read_op.
1985 :para flags: flags to apply to the last operation
1986 """
1987
1988 cdef:
1989 int _flags = flags
1990
1991 with nogil:
1992 rados_read_op_set_flags(self.read_op, _flags)
1993
1994
1995 class ReadOpCtx(ReadOp, OpCtx):
1996 """read operation context manager"""
1997
1998
1999 cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
2000 uint64_t _notifier_id, void *_data,
2001 size_t _data_len) with gil:
2002 """
2003 Watch callback
2004 """
2005 cdef object watch = <object>_arg
2006 data = None
2007 if _data != NULL:
2008 data = (<char *>_data)[:_data_len]
2009 watch._callback(_notify_id, _notifier_id, _cookie, data)
2010
2011 cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
2012 int _error) with gil:
2013 """
2014 Watch error callback
2015 """
2016 cdef object watch = <object>_arg
2017 watch._error_callback(_cookie, _error)
2018
2019
2020 cdef class Watch(object):
2021 """watch object"""
2022
2023 cdef:
2024 object id
2025 Ioctx ioctx
2026 object oid
2027 object callback
2028 object error_callback
2029
2030 def __cinit__(self, Ioctx ioctx, object oid, object callback,
2031 object error_callback, object timeout):
2032 self.id = 0
2033 self.ioctx = ioctx.dup()
2034 self.oid = cstr(oid, 'oid')
2035 self.callback = callback
2036 self.error_callback = error_callback
2037
2038 if timeout is None:
2039 timeout = 0
2040
2041 cdef:
2042 char *_oid = self.oid
2043 uint64_t _cookie;
2044 uint32_t _timeout = timeout;
2045 void *_args = <PyObject*>self
2046
2047 with nogil:
2048 ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
2049 <rados_watchcb2_t>&__watch_callback,
2050 <rados_watcherrcb_t>&__watch_error_callback,
2051 _timeout, _args)
2052 if ret < 0:
2053 raise make_ex(ret, "watch error")
2054
2055 self.id = int(_cookie);
2056
2057 def __enter__(self):
2058 return self
2059
2060 def __exit__(self, type_, value, traceback):
2061 self.close()
2062 return False
2063
2064 def __dealloc__(self):
2065 if self.id == 0:
2066 return
2067 self.ioctx.rados.require_state("connected")
2068 self.close()
2069
2070 def _callback(self, notify_id, notifier_id, watch_id, data):
2071 replay = self.callback(notify_id, notifier_id, watch_id, data)
2072
2073 cdef:
2074 rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
2075 char *_obj = self.oid
2076 int64_t _notify_id = notify_id
2077 uint64_t _cookie = watch_id
2078 char *_replay = NULL
2079 int _replay_len = 0
2080
2081 if replay is not None:
2082 replay = cstr(replay, 'replay')
2083 _replay = replay
2084 _replaylen = len(replay)
2085
2086 with nogil:
2087 rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
2088 _replaylen)
2089
2090 def _error_callback(self, watch_id, error):
2091 if self.error_callback is None:
2092 return
2093 self.error_callback(watch_id, error)
2094
2095 def get_id(self) -> int:
2096 return self.id
2097
2098 def check(self):
2099 """
2100 Check on watch validity.
2101
2102 :raises: :class:`Error`
2103 :returns: timedelta since last confirmed valid
2104 """
2105 self.ioctx.require_ioctx_open()
2106
2107 cdef:
2108 uint64_t _cookie = self.id
2109
2110 with nogil:
2111 ret = rados_watch_check(self.ioctx.io, _cookie)
2112 if ret < 0:
2113 raise make_ex(ret, "check error")
2114
2115 return timedelta(milliseconds=ret)
2116
2117 def close(self):
2118 """
2119 Unregister an interest in an object.
2120
2121 :raises: :class:`Error`
2122 """
2123 if self.id == 0:
2124 return
2125
2126 self.ioctx.require_ioctx_open()
2127
2128 cdef:
2129 uint64_t _cookie = self.id
2130
2131 with nogil:
2132 ret = rados_unwatch2(self.ioctx.io, _cookie)
2133 if ret < 0 and ret != -errno.ENOENT:
2134 raise make_ex(ret, "unwatch error")
2135 self.id = 0
2136
2137 with nogil:
2138 cluster = rados_ioctx_get_cluster(self.ioctx.io)
2139 ret = rados_watch_flush(cluster);
2140 if ret < 0:
2141 raise make_ex(ret, "watch_flush error")
2142
2143 self.ioctx.close()
2144
2145
2146 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2147 """
2148 Callback to oncomplete() for asynchronous operations
2149 """
2150 cdef object cb = <object>args
2151 cb._complete()
2152 return 0
2153
2154 cdef class Ioctx(object):
2155 """rados.Ioctx object"""
2156 # NOTE(sileht): attributes declared in .pyd
2157
2158 def __init__(self, rados, name):
2159 self.rados = rados
2160 self.name = name
2161 self.state = "open"
2162
2163 self.locator_key = ""
2164 self.nspace = ""
2165 self.lock = threading.Lock()
2166 self.safe_completions = []
2167 self.complete_completions = []
2168
2169 def __enter__(self):
2170 return self
2171
2172 def __exit__(self, type_, value, traceback):
2173 self.close()
2174 return False
2175
2176 def __dealloc__(self):
2177 self.close()
2178
2179 def __track_completion(self, completion_obj):
2180 if completion_obj.oncomplete:
2181 with self.lock:
2182 self.complete_completions.append(completion_obj)
2183 if completion_obj.onsafe:
2184 with self.lock:
2185 self.safe_completions.append(completion_obj)
2186
2187 def __get_completion(self,
2188 oncomplete: Callable[[Completion], None],
2189 onsafe: Callable[[Completion], None]):
2190 """
2191 Constructs a completion to use with asynchronous operations
2192
2193 :param oncomplete: what to do when the write is safe and complete in memory
2194 on all replicas
2195 :param onsafe: what to do when the write is safe and complete on storage
2196 on all replicas
2197
2198 :raises: :class:`Error`
2199 :returns: completion object
2200 """
2201
2202 completion_obj = Completion(self, oncomplete, onsafe)
2203
2204 cdef:
2205 rados_callback_t complete_cb = NULL
2206 rados_completion_t completion
2207 PyObject* p_completion_obj= <PyObject*>completion_obj
2208
2209 if oncomplete:
2210 complete_cb = <rados_callback_t>&__aio_complete_cb
2211
2212 with nogil:
2213 ret = rados_aio_create_completion2(p_completion_obj, complete_cb,
2214 &completion)
2215 if ret < 0:
2216 raise make_ex(ret, "error getting a completion")
2217
2218 completion_obj.rados_comp = completion
2219 return completion_obj
2220
2221 def dup(self):
2222 """
2223 Duplicate IoCtx
2224 """
2225
2226 ioctx = self.rados.open_ioctx2(self.get_pool_id())
2227 ioctx.set_namespace(self.get_namespace())
2228 return ioctx
2229
2230 def aio_stat(self,
2231 object_name: str,
2232 oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
2233 """
2234 Asynchronously get object stats (size/mtime)
2235
2236 oncomplete will be called with the returned size and mtime
2237 as well as the completion:
2238
2239 oncomplete(completion, size, mtime)
2240
2241 :param object_name: the name of the object to get stats from
2242 :param oncomplete: what to do when the stat is complete
2243
2244 :raises: :class:`Error`
2245 :returns: completion object
2246 """
2247
2248 object_name_raw = cstr(object_name, 'object_name')
2249
2250 cdef:
2251 Completion completion
2252 char *_object_name = object_name_raw
2253 uint64_t psize
2254 time_t pmtime
2255
2256 def oncomplete_(completion_v):
2257 cdef Completion _completion_v = completion_v
2258 return_value = _completion_v.get_return_value()
2259 if return_value >= 0:
2260 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2261 else:
2262 return oncomplete(_completion_v, None, None)
2263
2264 completion = self.__get_completion(oncomplete_, None)
2265 self.__track_completion(completion)
2266 with nogil:
2267 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2268 &psize, &pmtime)
2269
2270 if ret < 0:
2271 completion._cleanup()
2272 raise make_ex(ret, "error stating %s" % object_name)
2273 return completion
2274
2275 def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
2276 oncomplete: Optional[Callable[[Completion], None]] = None,
2277 onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
2278 """
2279 Write data to an object asynchronously
2280
2281 Queues the write and returns.
2282
2283 :param object_name: name of the object
2284 :param to_write: data to write
2285 :param offset: byte offset in the object to begin writing at
2286 :param oncomplete: what to do when the write is safe and complete in memory
2287 on all replicas
2288 :param onsafe: what to do when the write is safe and complete on storage
2289 on all replicas
2290
2291 :raises: :class:`Error`
2292 :returns: completion object
2293 """
2294
2295 object_name_raw = cstr(object_name, 'object_name')
2296
2297 cdef:
2298 Completion completion
2299 char* _object_name = object_name_raw
2300 char* _to_write = to_write
2301 size_t size = len(to_write)
2302 uint64_t _offset = offset
2303
2304 completion = self.__get_completion(oncomplete, onsafe)
2305 self.__track_completion(completion)
2306 with nogil:
2307 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2308 _to_write, size, _offset)
2309 if ret < 0:
2310 completion._cleanup()
2311 raise make_ex(ret, "error writing object %s" % object_name)
2312 return completion
2313
2314 def aio_write_full(self, object_name: str, to_write: bytes,
2315 oncomplete: Optional[Callable] = None,
2316 onsafe: Optional[Callable] = None) -> Completion:
2317 """
2318 Asynchronously write an entire object
2319
2320 The object is filled with the provided data. If the object exists,
2321 it is atomically truncated and then written.
2322 Queues the write and returns.
2323
2324 :param object_name: name of the object
2325 :param to_write: data to write
2326 :param oncomplete: what to do when the write is safe and complete in memory
2327 on all replicas
2328 :param onsafe: what to do when the write is safe and complete on storage
2329 on all replicas
2330
2331 :raises: :class:`Error`
2332 :returns: completion object
2333 """
2334
2335 object_name_raw = cstr(object_name, 'object_name')
2336
2337 cdef:
2338 Completion completion
2339 char* _object_name = object_name_raw
2340 char* _to_write = to_write
2341 size_t size = len(to_write)
2342
2343 completion = self.__get_completion(oncomplete, onsafe)
2344 self.__track_completion(completion)
2345 with nogil:
2346 ret = rados_aio_write_full(self.io, _object_name,
2347 completion.rados_comp,
2348 _to_write, size)
2349 if ret < 0:
2350 completion._cleanup()
2351 raise make_ex(ret, "error writing object %s" % object_name)
2352 return completion
2353
2354 def aio_writesame(self, object_name: str, to_write: bytes,
2355 write_len: int, offset: int = 0,
2356 oncomplete: Optional[Callable] = None) -> Completion:
2357 """
2358 Asynchronously write the same buffer multiple times
2359
2360 :param object_name: name of the object
2361 :param to_write: data to write
2362 :param write_len: total number of bytes to write
2363 :param offset: byte offset in the object to begin writing at
2364 :param oncomplete: what to do when the writesame is safe and
2365 complete in memory on all replicas
2366 :raises: :class:`Error`
2367 :returns: completion object
2368 """
2369
2370 object_name_raw = cstr(object_name, 'object_name')
2371
2372 cdef:
2373 Completion completion
2374 char* _object_name = object_name_raw
2375 char* _to_write = to_write
2376 size_t _data_len = len(to_write)
2377 size_t _write_len = write_len
2378 uint64_t _offset = offset
2379
2380 completion = self.__get_completion(oncomplete, None)
2381 self.__track_completion(completion)
2382 with nogil:
2383 ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
2384 _to_write, _data_len, _write_len, _offset)
2385
2386 if ret < 0:
2387 completion._cleanup()
2388 raise make_ex(ret, "error writing object %s" % object_name)
2389 return completion
2390
2391 def aio_append(self, object_name: str, to_append: bytes,
2392 oncomplete: Optional[Callable] = None,
2393 onsafe: Optional[Callable] = None) -> Completion:
2394 """
2395 Asynchronously append data to an object
2396
2397 Queues the write and returns.
2398
2399 :param object_name: name of the object
2400 :param to_append: data to append
2401 :param offset: byte offset in the object to begin writing at
2402 :param oncomplete: what to do when the write is safe and complete in memory
2403 on all replicas
2404 :param onsafe: what to do when the write is safe and complete on storage
2405 on all replicas
2406
2407 :raises: :class:`Error`
2408 :returns: completion object
2409 """
2410 object_name_raw = cstr(object_name, 'object_name')
2411
2412 cdef:
2413 Completion completion
2414 char* _object_name = object_name_raw
2415 char* _to_append = to_append
2416 size_t size = len(to_append)
2417
2418 completion = self.__get_completion(oncomplete, onsafe)
2419 self.__track_completion(completion)
2420 with nogil:
2421 ret = rados_aio_append(self.io, _object_name,
2422 completion.rados_comp,
2423 _to_append, size)
2424 if ret < 0:
2425 completion._cleanup()
2426 raise make_ex(ret, "error appending object %s" % object_name)
2427 return completion
2428
2429 def aio_flush(self):
2430 """
2431 Block until all pending writes in an io context are safe
2432
2433 :raises: :class:`Error`
2434 """
2435 with nogil:
2436 ret = rados_aio_flush(self.io)
2437 if ret < 0:
2438 raise make_ex(ret, "error flushing")
2439
2440 def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
2441 oncomplete: Optional[Callable] = None) -> Completion:
2442 """
2443 Asynchronously compare an on-disk object range with a buffer
2444 :param object_name: the name of the object
2445 :param cmp_buf: buffer containing bytes to be compared with object contents
2446 :param offset: object byte offset at which to start the comparison
2447 :param oncomplete: what to do when the write is safe and complete in memory
2448 on all replicas
2449
2450 :raises: :class:`TypeError`
2451 returns: 0 - on success, negative error code on failure,
2452 (-MAX_ERRNO - mismatch_off) on mismatch
2453 """
2454 object_name_raw = cstr(object_name, 'object_name')
2455
2456 cdef:
2457 Completion completion
2458 char* _object_name = object_name_raw
2459 char* _cmp_buf = cmp_buf
2460 size_t _cmp_buf_len = len(cmp_buf)
2461 uint64_t _offset = offset
2462
2463 completion = self.__get_completion(oncomplete, None)
2464 self.__track_completion(completion)
2465
2466 with nogil:
2467 ret = rados_aio_cmpext(self.io, _object_name, completion.rados_comp,
2468 _cmp_buf, _cmp_buf_len, _offset)
2469
2470 if ret < 0:
2471 completion._cleanup()
2472 raise make_ex(ret, "failed to compare %s" % object_name)
2473 return completion
2474
2475 def aio_rmxattr(self, object_name: str, xattr_name: str,
2476 oncomplete: Optional[Callable] = None) -> Completion:
2477 """
2478 Asynchronously delete an extended attribute from an object
2479
2480 :param object_name: the name of the object to remove xattr from
2481 :param xattr_name: which extended attribute to remove
2482 :param oncomplete: what to do when the rmxattr completes
2483
2484 :raises: :class:`Error`
2485 :returns: completion object
2486 """
2487 object_name_raw = cstr(object_name, 'object_name')
2488 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2489
2490 cdef:
2491 Completion completion
2492 char* _object_name = object_name_raw
2493 char* _xattr_name = xattr_name_raw
2494
2495 completion = self.__get_completion(oncomplete, None)
2496 self.__track_completion(completion)
2497 with nogil:
2498 ret = rados_aio_rmxattr(self.io, _object_name,
2499 completion.rados_comp, _xattr_name)
2500
2501 if ret < 0:
2502 completion._cleanup()
2503 raise make_ex(ret, "Failed to remove xattr %r" % xattr_name)
2504 return completion
2505
2506 def aio_read(self, object_name: str, length: int, offset: int,
2507 oncomplete: Optional[Callable] = None) -> Completion:
2508 """
2509 Asynchronously read data from an object
2510
2511 oncomplete will be called with the returned read value as
2512 well as the completion:
2513
2514 oncomplete(completion, data_read)
2515
2516 :param object_name: name of the object to read from
2517 :param length: the number of bytes to read
2518 :param offset: byte offset in the object to begin reading from
2519 :param oncomplete: what to do when the read is complete
2520
2521 :raises: :class:`Error`
2522 :returns: completion object
2523 """
2524
2525 object_name_raw = cstr(object_name, 'object_name')
2526
2527 cdef:
2528 Completion completion
2529 char* _object_name = object_name_raw
2530 uint64_t _offset = offset
2531
2532 char *ref_buf
2533 size_t _length = length
2534
2535 def oncomplete_(completion_v):
2536 cdef Completion _completion_v = completion_v
2537 return_value = _completion_v.get_return_value()
2538 if return_value > 0 and return_value != length:
2539 _PyBytes_Resize(&_completion_v.buf, return_value)
2540 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2541
2542 completion = self.__get_completion(oncomplete_, None)
2543 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2544 ret_buf = PyBytes_AsString(completion.buf)
2545 self.__track_completion(completion)
2546 with nogil:
2547 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2548 ret_buf, _length, _offset)
2549 if ret < 0:
2550 completion._cleanup()
2551 raise make_ex(ret, "error reading %s" % object_name)
2552 return completion
2553
2554 def aio_execute(self, object_name: str, cls: str, method: str,
2555 data: bytes, length: int = 8192,
2556 oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
2557 onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
2558 """
2559 Asynchronously execute an OSD class method on an object.
2560
2561 oncomplete and onsafe will be called with the data returned from
2562 the plugin as well as the completion:
2563
2564 oncomplete(completion, data)
2565 onsafe(completion, data)
2566
2567 :param object_name: name of the object
2568 :param cls: name of the object class
2569 :param method: name of the method
2570 :param data: input data
2571 :param length: size of output buffer in bytes (default=8192)
2572 :param oncomplete: what to do when the execution is complete
2573 :param onsafe: what to do when the execution is safe and complete
2574
2575 :raises: :class:`Error`
2576 :returns: completion object
2577 """
2578
2579 object_name_raw = cstr(object_name, 'object_name')
2580 cls_raw = cstr(cls, 'cls')
2581 method_raw = cstr(method, 'method')
2582 cdef:
2583 Completion completion
2584 char *_object_name = object_name_raw
2585 char *_cls = cls_raw
2586 char *_method = method_raw
2587 char *_data = data
2588 size_t _data_len = len(data)
2589
2590 char *ref_buf
2591 size_t _length = length
2592
2593 def oncomplete_(completion_v):
2594 cdef Completion _completion_v = completion_v
2595 return_value = _completion_v.get_return_value()
2596 if return_value > 0 and return_value != length:
2597 _PyBytes_Resize(&_completion_v.buf, return_value)
2598 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2599
2600 def onsafe_(completion_v):
2601 cdef Completion _completion_v = completion_v
2602 return_value = _completion_v.get_return_value()
2603 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2604
2605 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2606 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2607 ret_buf = PyBytes_AsString(completion.buf)
2608 self.__track_completion(completion)
2609 with nogil:
2610 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2611 _cls, _method, _data, _data_len, ret_buf, _length)
2612 if ret < 0:
2613 completion._cleanup()
2614 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2615 return completion
2616
2617 def aio_setxattr(self, object_name: str, xattr_name: str, xattr_value: bytes,
2618 oncomplete: Optional[Callable] = None) -> Completion:
2619 """
2620 Asynchronously set an extended attribute on an object
2621
2622 :param object_name: the name of the object to set xattr to
2623 :param xattr_name: which extended attribute to set
2624 :param xattr_value: the value of the extended attribute
2625 :param oncomplete: what to do when the setxttr completes
2626
2627 :raises: :class:`Error`
2628 :returns: completion object
2629 """
2630 object_name_raw = cstr(object_name, 'object_name')
2631 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2632
2633 cdef:
2634 Completion completion
2635 char* _object_name = object_name_raw
2636 char* _xattr_name = xattr_name_raw
2637 char* _xattr_value = xattr_value
2638 size_t xattr_value_len = len(xattr_value)
2639
2640 completion = self.__get_completion(oncomplete, None)
2641 self.__track_completion(completion)
2642 with nogil:
2643 ret = rados_aio_setxattr(self.io, _object_name,
2644 completion.rados_comp,
2645 _xattr_name, _xattr_value, xattr_value_len)
2646
2647 if ret < 0:
2648 completion._cleanup()
2649 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2650 return completion
2651
2652 def aio_remove(self, object_name: str,
2653 oncomplete: Optional[Callable] = None,
2654 onsafe: Optional[Callable] = None) -> Completion:
2655 """
2656 Asynchronously remove an object
2657
2658 :param object_name: name of the object to remove
2659 :param oncomplete: what to do when the remove is safe and complete in memory
2660 on all replicas
2661 :param onsafe: what to do when the remove is safe and complete on storage
2662 on all replicas
2663
2664 :raises: :class:`Error`
2665 :returns: completion object
2666 """
2667 object_name_raw = cstr(object_name, 'object_name')
2668
2669 cdef:
2670 Completion completion
2671 char* _object_name = object_name_raw
2672
2673 completion = self.__get_completion(oncomplete, onsafe)
2674 self.__track_completion(completion)
2675 with nogil:
2676 ret = rados_aio_remove(self.io, _object_name,
2677 completion.rados_comp)
2678 if ret < 0:
2679 completion._cleanup()
2680 raise make_ex(ret, "error removing %s" % object_name)
2681 return completion
2682
2683 def require_ioctx_open(self):
2684 """
2685 Checks if the rados.Ioctx object state is 'open'
2686
2687 :raises: IoctxStateError
2688 """
2689 if self.state != "open":
2690 raise IoctxStateError("The pool is %s" % self.state)
2691
2692 def set_locator_key(self, loc_key: str):
2693 """
2694 Set the key for mapping objects to pgs within an io context.
2695
2696 The key is used instead of the object name to determine which
2697 placement groups an object is put in. This affects all subsequent
2698 operations of the io context - until a different locator key is
2699 set, all objects in this io context will be placed in the same pg.
2700
2701 :param loc_key: the key to use as the object locator, or NULL to discard
2702 any previously set key
2703
2704 :raises: :class:`TypeError`
2705 """
2706 self.require_ioctx_open()
2707 cloc_key = cstr(loc_key, 'loc_key')
2708 cdef char *_loc_key = cloc_key
2709 with nogil:
2710 rados_ioctx_locator_set_key(self.io, _loc_key)
2711 self.locator_key = loc_key
2712
2713 def get_locator_key(self) -> str:
2714 """
2715 Get the locator_key of context
2716
2717 :returns: locator_key
2718 """
2719 return self.locator_key
2720
2721 def set_read(self, snap_id: int):
2722 """
2723 Set the snapshot for reading objects.
2724
2725 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2726
2727 :param snap_id: the snapshot Id
2728
2729 :raises: :class:`TypeError`
2730 """
2731 self.require_ioctx_open()
2732 cdef rados_snap_t _snap_id = snap_id
2733 with nogil:
2734 rados_ioctx_snap_set_read(self.io, _snap_id)
2735
2736 def set_namespace(self, nspace: str):
2737 """
2738 Set the namespace for objects within an io context.
2739
2740 The namespace in addition to the object name fully identifies
2741 an object. This affects all subsequent operations of the io context
2742 - until a different namespace is set, all objects in this io context
2743 will be placed in the same namespace.
2744
2745 :param nspace: the namespace to use, or None/"" for the default namespace
2746
2747 :raises: :class:`TypeError`
2748 """
2749 self.require_ioctx_open()
2750 if nspace is None:
2751 nspace = ""
2752 cnspace = cstr(nspace, 'nspace')
2753 cdef char *_nspace = cnspace
2754 with nogil:
2755 rados_ioctx_set_namespace(self.io, _nspace)
2756 self.nspace = nspace
2757
2758 def get_namespace(self) -> str:
2759 """
2760 Get the namespace of context
2761
2762 :returns: namespace
2763 """
2764 return self.nspace
2765
2766 def close(self):
2767 """
2768 Close a rados.Ioctx object.
2769
2770 This just tells librados that you no longer need to use the io context.
2771 It may not be freed immediately if there are pending asynchronous
2772 requests on it, but you should not use an io context again after
2773 calling this function on it.
2774 """
2775 if self.state == "open":
2776 self.require_ioctx_open()
2777 with nogil:
2778 rados_ioctx_destroy(self.io)
2779 self.state = "closed"
2780
2781
2782 def write(self, key: str, data: bytes, offset: int = 0):
2783 """
2784 Write data to an object synchronously
2785
2786 :param key: name of the object
2787 :param data: data to write
2788 :param offset: byte offset in the object to begin writing at
2789
2790 :raises: :class:`TypeError`
2791 :raises: :class:`LogicError`
2792 :returns: int - 0 on success
2793 """
2794 self.require_ioctx_open()
2795
2796 key_raw = cstr(key, 'key')
2797 cdef:
2798 char *_key = key_raw
2799 char *_data = data
2800 size_t length = len(data)
2801 uint64_t _offset = offset
2802
2803 with nogil:
2804 ret = rados_write(self.io, _key, _data, length, _offset)
2805 if ret == 0:
2806 return ret
2807 elif ret < 0:
2808 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2809 % (self.name, key))
2810 else:
2811 raise LogicError("Ioctx.write(%s): rados_write \
2812 returned %d, but should return zero on success." % (self.name, ret))
2813
2814 def write_full(self, key: str, data: bytes):
2815 """
2816 Write an entire object synchronously.
2817
2818 The object is filled with the provided data. If the object exists,
2819 it is atomically truncated and then written.
2820
2821 :param key: name of the object
2822 :param data: data to write
2823
2824 :raises: :class:`TypeError`
2825 :raises: :class:`Error`
2826 :returns: int - 0 on success
2827 """
2828 self.require_ioctx_open()
2829 key_raw = cstr(key, 'key')
2830 cdef:
2831 char *_key = key_raw
2832 char *_data = data
2833 size_t length = len(data)
2834
2835 with nogil:
2836 ret = rados_write_full(self.io, _key, _data, length)
2837 if ret == 0:
2838 return ret
2839 elif ret < 0:
2840 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2841 % (self.name, key))
2842 else:
2843 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2844 returned %d, but should return zero on success." % (self.name, ret))
2845
2846 def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
2847 """
2848 Write the same buffer multiple times
2849 :param key: name of the object
2850 :param data: data to write
2851 :param write_len: total number of bytes to write
2852 :param offset: byte offset in the object to begin writing at
2853
2854 :raises: :class:`TypeError`
2855 :raises: :class:`LogicError`
2856 """
2857 self.require_ioctx_open()
2858
2859 key_raw = cstr(key, 'key')
2860 cdef:
2861 char *_key = key_raw
2862 char *_data = data
2863 size_t _data_len = len(data)
2864 size_t _write_len = write_len
2865 uint64_t _offset = offset
2866
2867 with nogil:
2868 ret = rados_writesame(self.io, _key, _data, _data_len, _write_len, _offset)
2869 if ret < 0:
2870 raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
2871 % (self.name, key))
2872 assert(ret == 0)
2873
2874 def append(self, key: str, data: bytes):
2875 """
2876 Append data to an object synchronously
2877
2878 :param key: name of the object
2879 :param data: data to write
2880
2881 :raises: :class:`TypeError`
2882 :raises: :class:`LogicError`
2883 :returns: int - 0 on success
2884 """
2885 self.require_ioctx_open()
2886 key_raw = cstr(key, 'key')
2887 cdef:
2888 char *_key = key_raw
2889 char *_data = data
2890 size_t length = len(data)
2891
2892 with nogil:
2893 ret = rados_append(self.io, _key, _data, length)
2894 if ret == 0:
2895 return ret
2896 elif ret < 0:
2897 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2898 % (self.name, key))
2899 else:
2900 raise LogicError("Ioctx.append(%s): rados_append \
2901 returned %d, but should return zero on success." % (self.name, ret))
2902
2903 def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
2904 """
2905 Read data from an object synchronously
2906
2907 :param key: name of the object
2908 :param length: the number of bytes to read (default=8192)
2909 :param offset: byte offset in the object to begin reading at
2910
2911 :raises: :class:`TypeError`
2912 :raises: :class:`Error`
2913 :returns: data read from object
2914 """
2915 self.require_ioctx_open()
2916 key_raw = cstr(key, 'key')
2917 cdef:
2918 char *_key = key_raw
2919 char *ret_buf
2920 uint64_t _offset = offset
2921 size_t _length = length
2922 PyObject* ret_s = NULL
2923
2924 ret_s = PyBytes_FromStringAndSize(NULL, length)
2925 try:
2926 ret_buf = PyBytes_AsString(ret_s)
2927 with nogil:
2928 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2929 if ret < 0:
2930 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2931
2932 if ret != length:
2933 _PyBytes_Resize(&ret_s, ret)
2934
2935 return <object>ret_s
2936 finally:
2937 # We DECREF unconditionally: the cast to object above will have
2938 # INCREFed if necessary. This also takes care of exceptions,
2939 # including if _PyString_Resize fails (that will free the string
2940 # itself and set ret_s to NULL, hence XDECREF).
2941 ref.Py_XDECREF(ret_s)
2942
2943 def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
2944 """
2945 Execute an OSD class method on an object.
2946
2947 :param key: name of the object
2948 :param cls: name of the object class
2949 :param method: name of the method
2950 :param data: input data
2951 :param length: size of output buffer in bytes (default=8192)
2952
2953 :raises: :class:`TypeError`
2954 :raises: :class:`Error`
2955 :returns: (ret, method output)
2956 """
2957 self.require_ioctx_open()
2958
2959 key_raw = cstr(key, 'key')
2960 cls_raw = cstr(cls, 'cls')
2961 method_raw = cstr(method, 'method')
2962 cdef:
2963 char *_key = key_raw
2964 char *_cls = cls_raw
2965 char *_method = method_raw
2966 char *_data = data
2967 size_t _data_len = len(data)
2968
2969 char *ref_buf
2970 size_t _length = length
2971 PyObject* ret_s = NULL
2972
2973 ret_s = PyBytes_FromStringAndSize(NULL, length)
2974 try:
2975 ret_buf = PyBytes_AsString(ret_s)
2976 with nogil:
2977 ret = rados_exec(self.io, _key, _cls, _method, _data,
2978 _data_len, ret_buf, _length)
2979 if ret < 0:
2980 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2981
2982 if ret != length:
2983 _PyBytes_Resize(&ret_s, ret)
2984
2985 return ret, <object>ret_s
2986 finally:
2987 # We DECREF unconditionally: the cast to object above will have
2988 # INCREFed if necessary. This also takes care of exceptions,
2989 # including if _PyString_Resize fails (that will free the string
2990 # itself and set ret_s to NULL, hence XDECREF).
2991 ref.Py_XDECREF(ret_s)
2992
2993 def get_stats(self) -> Dict[str, int]:
2994 """
2995 Get pool usage statistics
2996
2997 :returns: dict contains the following keys:
2998
2999 - ``num_bytes`` (int) - size of pool in bytes
3000
3001 - ``num_kb`` (int) - size of pool in kbytes
3002
3003 - ``num_objects`` (int) - number of objects in the pool
3004
3005 - ``num_object_clones`` (int) - number of object clones
3006
3007 - ``num_object_copies`` (int) - number of object copies
3008
3009 - ``num_objects_missing_on_primary`` (int) - number of objects
3010 missing on primary
3011
3012 - ``num_objects_unfound`` (int) - number of unfound objects
3013
3014 - ``num_objects_degraded`` (int) - number of degraded objects
3015
3016 - ``num_rd`` (int) - bytes read
3017
3018 - ``num_rd_kb`` (int) - kbytes read
3019
3020 - ``num_wr`` (int) - bytes written
3021
3022 - ``num_wr_kb`` (int) - kbytes written
3023 """
3024 self.require_ioctx_open()
3025 cdef rados_pool_stat_t stats
3026 with nogil:
3027 ret = rados_ioctx_pool_stat(self.io, &stats)
3028 if ret < 0:
3029 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
3030 return {'num_bytes': stats.num_bytes,
3031 'num_kb': stats.num_kb,
3032 'num_objects': stats.num_objects,
3033 'num_object_clones': stats.num_object_clones,
3034 'num_object_copies': stats.num_object_copies,
3035 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
3036 "num_objects_unfound": stats.num_objects_unfound,
3037 "num_objects_degraded": stats.num_objects_degraded,
3038 "num_rd": stats.num_rd,
3039 "num_rd_kb": stats.num_rd_kb,
3040 "num_wr": stats.num_wr,
3041 "num_wr_kb": stats.num_wr_kb}
3042
3043 def remove_object(self, key: str) -> bool:
3044 """
3045 Delete an object
3046
3047 This does not delete any snapshots of the object.
3048
3049 :param key: the name of the object to delete
3050
3051 :raises: :class:`TypeError`
3052 :raises: :class:`Error`
3053 :returns: True on success
3054 """
3055 self.require_ioctx_open()
3056 key_raw = cstr(key, 'key')
3057 cdef:
3058 char *_key = key_raw
3059
3060 with nogil:
3061 ret = rados_remove(self.io, _key)
3062 if ret < 0:
3063 raise make_ex(ret, "Failed to remove '%s'" % key)
3064 return True
3065
3066 def trunc(self, key: str, size: int) -> int:
3067 """
3068 Resize an object
3069
3070 If this enlarges the object, the new area is logically filled with
3071 zeroes. If this shrinks the object, the excess data is removed.
3072
3073 :param key: the name of the object to resize
3074 :param size: the new size of the object in bytes
3075
3076 :raises: :class:`TypeError`
3077 :raises: :class:`Error`
3078 :returns: 0 on success, otherwise raises error
3079 """
3080
3081 self.require_ioctx_open()
3082 key_raw = cstr(key, 'key')
3083 cdef:
3084 char *_key = key_raw
3085 uint64_t _size = size
3086
3087 with nogil:
3088 ret = rados_trunc(self.io, _key, _size)
3089 if ret < 0:
3090 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
3091 return ret
3092
3093 def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
3094 '''
3095 Compare an on-disk object range with a buffer
3096 :param key: the name of the object
3097 :param cmp_buf: buffer containing bytes to be compared with object contents
3098 :param offset: object byte offset at which to start the comparison
3099
3100 :raises: :class:`TypeError`
3101 :raises: :class:`Error`
3102 :returns: 0 - on success, negative error code on failure,
3103 (-MAX_ERRNO - mismatch_off) on mismatch
3104 '''
3105 self.require_ioctx_open()
3106 key_raw = cstr(key, 'key')
3107 cdef:
3108 char *_key = key_raw
3109 char *_cmp_buf = cmp_buf
3110 size_t _cmp_buf_len = len(cmp_buf)
3111 uint64_t _offset = offset
3112 with nogil:
3113 ret = rados_cmpext(self.io, _key, _cmp_buf, _cmp_buf_len, _offset)
3114 assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
3115 return ret
3116
3117 def stat(self, key: str) -> Tuple[int, time.struct_time]:
3118 """
3119 Get object stats (size/mtime)
3120
3121 :param key: the name of the object to get stats from
3122
3123 :raises: :class:`TypeError`
3124 :raises: :class:`Error`
3125 :returns: (size,timestamp)
3126 """
3127 self.require_ioctx_open()
3128
3129 key_raw = cstr(key, 'key')
3130 cdef:
3131 char *_key = key_raw
3132 uint64_t psize
3133 time_t pmtime
3134
3135 with nogil:
3136 ret = rados_stat(self.io, _key, &psize, &pmtime)
3137 if ret < 0:
3138 raise make_ex(ret, "Failed to stat %r" % key)
3139 return psize, time.localtime(pmtime)
3140
3141 def get_xattr(self, key: str, xattr_name: str) -> bytes:
3142 """
3143 Get the value of an extended attribute on an object.
3144
3145 :param key: the name of the object to get xattr from
3146 :param xattr_name: which extended attribute to read
3147
3148 :raises: :class:`TypeError`
3149 :raises: :class:`Error`
3150 :returns: value of the xattr
3151 """
3152 self.require_ioctx_open()
3153
3154 key_raw = cstr(key, 'key')
3155 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3156 cdef:
3157 char *_key = key_raw
3158 char *_xattr_name = xattr_name_raw
3159 size_t ret_length = 4096
3160 char *ret_buf = NULL
3161
3162 try:
3163 while ret_length < 4096 * 1024 * 1024:
3164 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
3165 with nogil:
3166 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
3167 if ret == -errno.ERANGE:
3168 ret_length *= 2
3169 elif ret < 0:
3170 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
3171 else:
3172 break
3173 return ret_buf[:ret]
3174 finally:
3175 free(ret_buf)
3176
3177 def get_xattrs(self, oid: str) -> XattrIterator:
3178 """
3179 Start iterating over xattrs on an object.
3180
3181 :param oid: the name of the object to get xattrs from
3182
3183 :raises: :class:`TypeError`
3184 :raises: :class:`Error`
3185 :returns: XattrIterator
3186 """
3187 self.require_ioctx_open()
3188 return XattrIterator(self, oid)
3189
3190 def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
3191 """
3192 Set an extended attribute on an object.
3193
3194 :param key: the name of the object to set xattr to
3195 :param xattr_name: which extended attribute to set
3196 :param xattr_value: the value of the extended attribute
3197
3198 :raises: :class:`TypeError`
3199 :raises: :class:`Error`
3200 :returns: True on success, otherwise raise an error
3201 """
3202 self.require_ioctx_open()
3203
3204 key_raw = cstr(key, 'key')
3205 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3206 cdef:
3207 char *_key = key_raw
3208 char *_xattr_name = xattr_name_raw
3209 char *_xattr_value = xattr_value
3210 size_t _xattr_value_len = len(xattr_value)
3211
3212 with nogil:
3213 ret = rados_setxattr(self.io, _key, _xattr_name,
3214 _xattr_value, _xattr_value_len)
3215 if ret < 0:
3216 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3217 return True
3218
3219 def rm_xattr(self, key: str, xattr_name: str) -> bool:
3220 """
3221 Removes an extended attribute on from an object.
3222
3223 :param key: the name of the object to remove xattr from
3224 :param xattr_name: which extended attribute to remove
3225
3226 :raises: :class:`TypeError`
3227 :raises: :class:`Error`
3228 :returns: True on success, otherwise raise an error
3229 """
3230 self.require_ioctx_open()
3231
3232 key_raw = cstr(key, 'key')
3233 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3234 cdef:
3235 char *_key = key_raw
3236 char *_xattr_name = xattr_name_raw
3237
3238 with nogil:
3239 ret = rados_rmxattr(self.io, _key, _xattr_name)
3240 if ret < 0:
3241 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3242 (key, xattr_name))
3243 return True
3244
3245 def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
3246 """
3247 Send a rados notification to an object.
3248
3249 :param obj: the name of the object to notify
3250 :param msg: optional message to send in the notification
3251 :param timeout_ms: notify timeout (in ms)
3252
3253 :raises: :class:`TypeError`
3254 :raises: :class:`Error`
3255 :returns: True on success, otherwise raise an error
3256 """
3257 self.require_ioctx_open()
3258
3259 msglen = len(msg)
3260 obj_raw = cstr(obj, 'obj')
3261 msg_raw = cstr(msg, 'msg')
3262 cdef:
3263 char *_obj = obj_raw
3264 char *_msg = msg_raw
3265 int _msglen = msglen
3266 uint64_t _timeout_ms = timeout_ms
3267
3268 with nogil:
3269 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3270 NULL, NULL)
3271 if ret < 0:
3272 raise make_ex(ret, "Failed to notify %r" % (obj))
3273 return True
3274
3275 def aio_notify(self, obj: str,
3276 oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
3277 msg: str = '', timeout_ms: int = 5000) -> Completion:
3278 """
3279 Asynchronously send a rados notification to an object
3280 """
3281 self.require_ioctx_open()
3282
3283 msglen = len(msg)
3284 obj_raw = cstr(obj, 'obj')
3285 msg_raw = cstr(msg, 'msg')
3286
3287 cdef:
3288 Completion completion
3289 char *_obj = obj_raw
3290 char *_msg = msg_raw
3291 int _msglen = msglen
3292 uint64_t _timeout_ms = timeout_ms
3293 char *reply
3294 size_t replylen = 0
3295
3296 def oncomplete_(completion_v):
3297 cdef:
3298 Completion _completion_v = completion_v
3299 notify_ack_t *acks = NULL
3300 notify_timeout_t *timeouts = NULL
3301 size_t nr_acks
3302 size_t nr_timeouts
3303 return_value = _completion_v.get_return_value()
3304 if return_value == 0:
3305 return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
3306 rados_buffer_free(reply)
3307 if return_value == 0:
3308 ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
3309 else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
3310 timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
3311 rados_free_notify_response(acks, nr_acks, timeouts)
3312 return oncomplete(_completion_v, 0, ack_list, timeout_list)
3313 else:
3314 return oncomplete(_completion_v, return_value, None, None)
3315
3316 completion = self.__get_completion(oncomplete_, None)
3317 self.__track_completion(completion)
3318 with nogil:
3319 ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
3320 _msg, _msglen, _timeout_ms, &reply, &replylen)
3321 if ret < 0:
3322 completion._cleanup()
3323 raise make_ex(ret, "aio_notify error: %s" % obj)
3324 return completion
3325
3326 def watch(self, obj: str,
3327 callback: Callable[[int, str, int, bytes], None],
3328 error_callback: Optional[Callable[[int], None]] = None,
3329 timeout: Optional[int] = None) -> Watch:
3330 """
3331 Register an interest in an object.
3332
3333 :param obj: the name of the object to notify
3334 :param callback: what to do when a notify is received on this object
3335 :param error_callback: what to do when the watch session encounters an error
3336 :param timeout: how many seconds the connection will keep after disconnection
3337
3338 :raises: :class:`TypeError`
3339 :raises: :class:`Error`
3340 :returns: watch_id - internal id assigned to this watch
3341 """
3342 self.require_ioctx_open()
3343
3344 return Watch(self, obj, callback, error_callback, timeout)
3345
3346 def list_objects(self) -> ObjectIterator:
3347 """
3348 Get ObjectIterator on rados.Ioctx object.
3349
3350 :returns: ObjectIterator
3351 """
3352 self.require_ioctx_open()
3353 return ObjectIterator(self)
3354
3355 def list_snaps(self) -> SnapIterator:
3356 """
3357 Get SnapIterator on rados.Ioctx object.
3358
3359 :returns: SnapIterator
3360 """
3361 self.require_ioctx_open()
3362 return SnapIterator(self)
3363
3364 def get_pool_id(self) -> int:
3365 """
3366 Get pool id
3367
3368 :returns: int - pool id
3369 """
3370 with nogil:
3371 ret = rados_ioctx_get_id(self.io)
3372 return ret;
3373
3374 def get_pool_name(self) -> str:
3375 """
3376 Get pool name
3377
3378 :returns: pool name
3379 """
3380 cdef:
3381 int name_len = 10
3382 char *name = NULL
3383
3384 try:
3385 while True:
3386 name = <char *>realloc_chk(name, name_len)
3387 with nogil:
3388 ret = rados_ioctx_get_pool_name(self.io, name, name_len)
3389 if ret > 0:
3390 break
3391 elif ret != -errno.ERANGE:
3392 raise make_ex(ret, "get pool name error")
3393 else:
3394 name_len = name_len * 2
3395
3396 return decode_cstr(name)
3397 finally:
3398 free(name)
3399
3400
3401 def create_snap(self, snap_name: str):
3402 """
3403 Create a pool-wide snapshot
3404
3405 :param snap_name: the name of the snapshot
3406
3407 :raises: :class:`TypeError`
3408 :raises: :class:`Error`
3409 """
3410 self.require_ioctx_open()
3411 snap_name_raw = cstr(snap_name, 'snap_name')
3412 cdef char *_snap_name = snap_name_raw
3413
3414 with nogil:
3415 ret = rados_ioctx_snap_create(self.io, _snap_name)
3416 if ret != 0:
3417 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3418
3419 def remove_snap(self, snap_name: str):
3420 """
3421 Removes a pool-wide snapshot
3422
3423 :param snap_name: the name of the snapshot
3424
3425 :raises: :class:`TypeError`
3426 :raises: :class:`Error`
3427 """
3428 self.require_ioctx_open()
3429 snap_name_raw = cstr(snap_name, 'snap_name')
3430 cdef char *_snap_name = snap_name_raw
3431
3432 with nogil:
3433 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3434 if ret != 0:
3435 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3436
3437 def lookup_snap(self, snap_name: str) -> Snap:
3438 """
3439 Get the id of a pool snapshot
3440
3441 :param snap_name: the name of the snapshot to lookop
3442
3443 :raises: :class:`TypeError`
3444 :raises: :class:`Error`
3445 :returns: Snap - on success
3446 """
3447 self.require_ioctx_open()
3448 csnap_name = cstr(snap_name, 'snap_name')
3449 cdef:
3450 char *_snap_name = csnap_name
3451 rados_snap_t snap_id
3452
3453 with nogil:
3454 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3455 if ret != 0:
3456 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3457 return Snap(self, snap_name, int(snap_id))
3458
3459 def snap_rollback(self, oid: str, snap_name: str):
3460 """
3461 Rollback an object to a snapshot
3462
3463 :param oid: the name of the object
3464 :param snap_name: the name of the snapshot
3465
3466 :raises: :class:`TypeError`
3467 :raises: :class:`Error`
3468 """
3469 self.require_ioctx_open()
3470 oid_raw = cstr(oid, 'oid')
3471 snap_name_raw = cstr(snap_name, 'snap_name')
3472 cdef:
3473 char *_oid = oid_raw
3474 char *_snap_name = snap_name_raw
3475
3476 with nogil:
3477 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3478 if ret != 0:
3479 raise make_ex(ret, "Failed to rollback %s" % oid)
3480
3481 def create_self_managed_snap(self):
3482 """
3483 Creates a self-managed snapshot
3484
3485 :returns: snap id on success
3486
3487 :raises: :class:`Error`
3488 """
3489 self.require_ioctx_open()
3490 cdef:
3491 rados_snap_t _snap_id
3492 with nogil:
3493 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3494 if ret != 0:
3495 raise make_ex(ret, "Failed to create self-managed snapshot")
3496 return int(_snap_id)
3497
3498 def remove_self_managed_snap(self, snap_id: int):
3499 """
3500 Removes a self-managed snapshot
3501
3502 :param snap_id: the name of the snapshot
3503
3504 :raises: :class:`TypeError`
3505 :raises: :class:`Error`
3506 """
3507 self.require_ioctx_open()
3508 cdef:
3509 rados_snap_t _snap_id = snap_id
3510 with nogil:
3511 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3512 if ret != 0:
3513 raise make_ex(ret, "Failed to remove self-managed snapshot")
3514
3515 def set_self_managed_snap_write(self, snaps: Sequence[Union[int, str]]):
3516 """
3517 Updates the write context to the specified self-managed
3518 snapshot ids.
3519
3520 :param snaps: all associated self-managed snapshot ids
3521
3522 :raises: :class:`TypeError`
3523 :raises: :class:`Error`
3524 """
3525 self.require_ioctx_open()
3526 sorted_snaps = []
3527 snap_seq = 0
3528 if snaps:
3529 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3530 snap_seq = sorted_snaps[0]
3531
3532 cdef:
3533 rados_snap_t _snap_seq = snap_seq
3534 rados_snap_t *_snaps = NULL
3535 int _num_snaps = len(sorted_snaps)
3536 try:
3537 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3538 for i in range(len(sorted_snaps)):
3539 _snaps[i] = sorted_snaps[i]
3540 with nogil:
3541 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3542 _snap_seq,
3543 _snaps,
3544 _num_snaps)
3545 if ret != 0:
3546 raise make_ex(ret, "Failed to update snapshot write context")
3547 finally:
3548 free(_snaps)
3549
3550 def rollback_self_managed_snap(self, oid: str, snap_id: int):
3551 """
3552 Rolls an specific object back to a self-managed snapshot revision
3553
3554 :param oid: the name of the object
3555 :param snap_id: the name of the snapshot
3556
3557 :raises: :class:`TypeError`
3558 :raises: :class:`Error`
3559 """
3560 self.require_ioctx_open()
3561 oid_raw = cstr(oid, 'oid')
3562 cdef:
3563 char *_oid = oid_raw
3564 rados_snap_t _snap_id = snap_id
3565 with nogil:
3566 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3567 if ret != 0:
3568 raise make_ex(ret, "Failed to rollback %s" % oid)
3569
3570 def get_last_version(self) -> int:
3571 """
3572 Return the version of the last object read or written to.
3573
3574 This exposes the internal version number of the last object read or
3575 written via this io context
3576
3577 :returns: version of the last object used
3578 """
3579 self.require_ioctx_open()
3580 with nogil:
3581 ret = rados_get_last_version(self.io)
3582 return int(ret)
3583
3584 def create_write_op(self) -> WriteOp:
3585 """
3586 create write operation object.
3587 need call release_write_op after use
3588 """
3589 return WriteOp().create()
3590
3591 def create_read_op(self) -> ReadOp:
3592 """
3593 create read operation object.
3594 need call release_read_op after use
3595 """
3596 return ReadOp().create()
3597
3598 def release_write_op(self, write_op):
3599 """
3600 release memory alloc by create_write_op
3601 """
3602 write_op.release()
3603
3604 def release_read_op(self, read_op: ReadOp):
3605 """
3606 release memory alloc by create_read_op
3607 :para read_op: read_op object
3608 """
3609 read_op.release()
3610
3611 def set_omap(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE], values: Sequence[bytes]):
3612 """
3613 set keys values to write_op
3614 :para write_op: write_operation object
3615 :para keys: a tuple of keys
3616 :para values: a tuple of values
3617 """
3618
3619 if len(keys) != len(values):
3620 raise Error("Rados(): keys and values must have the same number of items")
3621
3622 keys = cstr_list(keys, 'keys')
3623 values = cstr_list(values, 'values')
3624 lens = [len(v) for v in values]
3625 cdef:
3626 WriteOp _write_op = write_op
3627 size_t key_num = len(keys)
3628 char **_keys = to_bytes_array(keys)
3629 char **_values = to_bytes_array(values)
3630 size_t *_lens = to_csize_t_array(lens)
3631
3632 try:
3633 with nogil:
3634 rados_write_op_omap_set(_write_op.write_op,
3635 <const char**>_keys,
3636 <const char**>_values,
3637 <const size_t*>_lens, key_num)
3638 finally:
3639 free(_keys)
3640 free(_values)
3641 free(_lens)
3642
3643 def operate_write_op(self,
3644 write_op: WriteOp,
3645 oid: str,
3646 mtime: int = 0,
3647 flags: int = LIBRADOS_OPERATION_NOFLAG):
3648 """
3649 execute the real write operation
3650 :para write_op: write operation object
3651 :para oid: object name
3652 :para mtime: the time to set the mtime to, 0 for the current time
3653 :para flags: flags to apply to the entire operation
3654 """
3655
3656 oid_raw = cstr(oid, 'oid')
3657 cdef:
3658 WriteOp _write_op = write_op
3659 char *_oid = oid_raw
3660 time_t _mtime = mtime
3661 int _flags = flags
3662
3663 with nogil:
3664 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3665 if ret != 0:
3666 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3667
3668 def operate_aio_write_op(self, write_op: WriteOp, oid: str,
3669 oncomplete: Optional[Callable[[Completion], None]] = None,
3670 onsafe: Optional[Callable[[Completion], None]] = None,
3671 mtime: int = 0,
3672 flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
3673 """
3674 execute the real write operation asynchronously
3675 :para write_op: write operation object
3676 :para oid: object name
3677 :param oncomplete: what to do when the remove is safe and complete in memory
3678 on all replicas
3679 :param onsafe: what to do when the remove is safe and complete on storage
3680 on all replicas
3681 :para mtime: the time to set the mtime to, 0 for the current time
3682 :para flags: flags to apply to the entire operation
3683
3684 :raises: :class:`Error`
3685 :returns: completion object
3686 """
3687
3688 oid_raw = cstr(oid, 'oid')
3689 cdef:
3690 WriteOp _write_op = write_op
3691 char *_oid = oid_raw
3692 Completion completion
3693 time_t _mtime = mtime
3694 int _flags = flags
3695
3696 completion = self.__get_completion(oncomplete, onsafe)
3697 self.__track_completion(completion)
3698
3699 with nogil:
3700 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3701 &_mtime, _flags)
3702 if ret != 0:
3703 completion._cleanup()
3704 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3705 return completion
3706
3707 def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
3708 """
3709 execute the real read operation
3710 :para read_op: read operation object
3711 :para oid: object name
3712 :para flag: flags to apply to the entire operation
3713 """
3714 oid_raw = cstr(oid, 'oid')
3715 cdef:
3716 ReadOp _read_op = read_op
3717 char *_oid = oid_raw
3718 int _flag = flag
3719
3720 with nogil:
3721 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3722 if ret != 0:
3723 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3724
3725 def operate_aio_read_op(self, read_op: ReadOp, oid: str,
3726 oncomplete: Optional[Callable[[Completion], None]] = None,
3727 onsafe: Optional[Callable[[Completion], None]] = None,
3728 flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
3729 """
3730 execute the real read operation
3731 :para read_op: read operation object
3732 :para oid: object name
3733 :param oncomplete: what to do when the remove is safe and complete in memory
3734 on all replicas
3735 :param onsafe: what to do when the remove is safe and complete on storage
3736 on all replicas
3737 :para flag: flags to apply to the entire operation
3738 """
3739 oid_raw = cstr(oid, 'oid')
3740 cdef:
3741 ReadOp _read_op = read_op
3742 char *_oid = oid_raw
3743 Completion completion
3744 int _flag = flag
3745
3746 completion = self.__get_completion(oncomplete, onsafe)
3747 self.__track_completion(completion)
3748
3749 with nogil:
3750 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3751 if ret != 0:
3752 completion._cleanup()
3753 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3754 return completion
3755
3756 def get_omap_vals(self,
3757 read_op: ReadOp,
3758 start_after: OMAP_KEY_TYPE,
3759 filter_prefix: OMAP_KEY_TYPE,
3760 max_return: int,
3761 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
3762 """
3763 get the omap values
3764 :para read_op: read operation object
3765 :para start_after: list keys starting after start_after
3766 :para filter_prefix: list only keys beginning with filter_prefix
3767 :para max_return: list no more than max_return key/value pairs
3768 :returns: an iterator over the requested omap values, return value from this action
3769 """
3770
3771 start_after_raw = cstr(start_after, 'start_after') if start_after else None
3772 filter_prefix_raw = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3773 cdef:
3774 char *_start_after = opt_str(start_after_raw)
3775 char *_filter_prefix = opt_str(filter_prefix_raw)
3776 ReadOp _read_op = read_op
3777 rados_omap_iter_t iter_addr = NULL
3778 int _max_return = max_return
3779
3780 with nogil:
3781 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3782 _max_return, &iter_addr, NULL, NULL)
3783 it = OmapIterator(self, omap_key_type)
3784 it.ctx = iter_addr
3785 return it, 0 # 0 is meaningless; there for backward-compat
3786
3787 def get_omap_keys(self,
3788 read_op: ReadOp,
3789 start_after: OMAP_KEY_TYPE,
3790 max_return: int,
3791 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
3792 """
3793 get the omap keys
3794 :para read_op: read operation object
3795 :para start_after: list keys starting after start_after
3796 :para max_return: list no more than max_return key/value pairs
3797 :returns: an iterator over the requested omap values, return value from this action
3798 """
3799 start_after = cstr(start_after, 'start_after') if start_after else None
3800 cdef:
3801 char *_start_after = opt_str(start_after)
3802 ReadOp _read_op = read_op
3803 rados_omap_iter_t iter_addr = NULL
3804 int _max_return = max_return
3805
3806 with nogil:
3807 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3808 _max_return, &iter_addr, NULL, NULL)
3809 it = OmapIterator(self, omap_key_type)
3810 it.ctx = iter_addr
3811 return it, 0 # 0 is meaningless; there for backward-compat
3812
3813 def get_omap_vals_by_keys(self,
3814 read_op: ReadOp,
3815 keys: Sequence[OMAP_KEY_TYPE],
3816 omap_key_type = bytes.decode) -> Tuple[OmapIterator, int]:
3817 """
3818 get the omap values by keys
3819 :para read_op: read operation object
3820 :para keys: input key tuple
3821 :returns: an iterator over the requested omap values, return value from this action
3822 """
3823 keys = cstr_list(keys, 'keys')
3824 cdef:
3825 ReadOp _read_op = read_op
3826 rados_omap_iter_t iter_addr
3827 char **_keys = to_bytes_array(keys)
3828 size_t key_num = len(keys)
3829
3830 try:
3831 with nogil:
3832 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3833 <const char**>_keys,
3834 key_num, &iter_addr, NULL)
3835 it = OmapIterator(self, omap_key_type)
3836 it.ctx = iter_addr
3837 return it, 0 # 0 is meaningless; there for backward-compat
3838 finally:
3839 free(_keys)
3840
3841 def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[OMAP_KEY_TYPE]):
3842 """
3843 remove omap keys specifiled
3844 :para write_op: write operation object
3845 :para keys: input key tuple
3846 """
3847
3848 keys = cstr_list(keys, 'keys')
3849 cdef:
3850 WriteOp _write_op = write_op
3851 size_t key_num = len(keys)
3852 char **_keys = to_bytes_array(keys)
3853
3854 try:
3855 with nogil:
3856 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3857 finally:
3858 free(_keys)
3859
3860 def clear_omap(self, write_op: WriteOp):
3861 """
3862 Remove all key/value pairs from an object
3863 :para write_op: write operation object
3864 """
3865
3866 cdef:
3867 WriteOp _write_op = write_op
3868
3869 with nogil:
3870 rados_write_op_omap_clear(_write_op.write_op)
3871
3872 def remove_omap_range2(self, write_op: WriteOp, key_begin: OMAP_KEY_TYPE, key_end: OMAP_KEY_TYPE):
3873 """
3874 Remove key/value pairs from an object whose keys are in the range
3875 [key_begin, key_end)
3876 :param write_op: write operation object
3877 :param key_begin: the lower bound of the key range to remove
3878 :param key_end: the upper bound of the key range to remove
3879 """
3880 key_begin_raw = cstr(key_begin, 'key_begin')
3881 key_end_raw = cstr(key_end, 'key_end')
3882 cdef:
3883 WriteOp _write_op = write_op
3884 char* _key_begin = key_begin_raw
3885 size_t key_begin_len = len(key_begin)
3886 char* _key_end = key_end_raw
3887 size_t key_end_len = len(key_end)
3888 with nogil:
3889 rados_write_op_omap_rm_range2(_write_op.write_op, _key_begin, key_begin_len,
3890 _key_end, key_end_len)
3891
3892 def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
3893 duration: Optional[int] = None,
3894 flags: int = 0):
3895
3896 """
3897 Take an exclusive lock on an object
3898
3899 :param key: name of the object
3900 :param name: name of the lock
3901 :param cookie: cookie of the lock
3902 :param desc: description of the lock
3903 :param duration: duration of the lock in seconds
3904 :param flags: flags
3905
3906 :raises: :class:`TypeError`
3907 :raises: :class:`Error`
3908 """
3909 self.require_ioctx_open()
3910
3911 key_raw = cstr(key, 'key')
3912 name_raw = cstr(name, 'name')
3913 cookie_raw = cstr(cookie, 'cookie')
3914 desc_raw = cstr(desc, 'desc')
3915
3916 cdef:
3917 char* _key = key_raw
3918 char* _name = name_raw
3919 char* _cookie = cookie_raw
3920 char* _desc = desc_raw
3921 uint8_t _flags = flags
3922 timeval _duration
3923
3924 if duration is None:
3925 with nogil:
3926 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3927 NULL, _flags)
3928 else:
3929 _duration.tv_sec = duration
3930 with nogil:
3931 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3932 &_duration, _flags)
3933
3934 if ret < 0:
3935 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3936
3937 def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
3938 duration: Optional[int] = None,
3939 flags: int = 0):
3940
3941 """
3942 Take a shared lock on an object
3943
3944 :param key: name of the object
3945 :param name: name of the lock
3946 :param cookie: cookie of the lock
3947 :param tag: tag of the lock
3948 :param desc: description of the lock
3949 :param duration: duration of the lock in seconds
3950 :param flags: flags
3951
3952 :raises: :class:`TypeError`
3953 :raises: :class:`Error`
3954 """
3955 self.require_ioctx_open()
3956
3957 key_raw = cstr(key, 'key')
3958 tag_raw = cstr(tag, 'tag')
3959 name_raw = cstr(name, 'name')
3960 cookie_raw = cstr(cookie, 'cookie')
3961 desc_raw = cstr(desc, 'desc')
3962
3963 cdef:
3964 char* _key = key_raw
3965 char* _tag = tag_raw
3966 char* _name = name_raw
3967 char* _cookie = cookie_raw
3968 char* _desc = desc_raw
3969 uint8_t _flags = flags
3970 timeval _duration
3971
3972 if duration is None:
3973 with nogil:
3974 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3975 NULL, _flags)
3976 else:
3977 _duration.tv_sec = duration
3978 with nogil:
3979 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3980 &_duration, _flags)
3981 if ret < 0:
3982 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3983
3984 def unlock(self, key: str, name: str, cookie: str):
3985
3986 """
3987 Release a shared or exclusive lock on an object
3988
3989 :param key: name of the object
3990 :param name: name of the lock
3991 :param cookie: cookie of the lock
3992
3993 :raises: :class:`TypeError`
3994 :raises: :class:`Error`
3995 """
3996 self.require_ioctx_open()
3997
3998 key_raw = cstr(key, 'key')
3999 name_raw = cstr(name, 'name')
4000 cookie_raw = cstr(cookie, 'cookie')
4001
4002 cdef:
4003 char* _key = key_raw
4004 char* _name = name_raw
4005 char* _cookie = cookie_raw
4006
4007 with nogil:
4008 ret = rados_unlock(self.io, _key, _name, _cookie)
4009 if ret < 0:
4010 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
4011
4012 def set_osdmap_full_try(self):
4013 """
4014 Set global osdmap_full_try label to true
4015 """
4016 with nogil:
4017 rados_set_pool_full_try(self.io)
4018
4019 def unset_osdmap_full_try(self):
4020 """
4021 Unset
4022 """
4023 with nogil:
4024 rados_unset_pool_full_try(self.io)
4025
4026 def application_enable(self, app_name: str, force: bool = False):
4027 """
4028 Enable an application on an OSD pool
4029
4030 :param app_name: application name
4031 :type app_name: str
4032 :param force: False if only a single app should exist per pool
4033 :type expire_seconds: boool
4034
4035 :raises: :class:`Error`
4036 """
4037 app_name_raw = cstr(app_name, 'app_name')
4038 cdef:
4039 char *_app_name = app_name_raw
4040 int _force = (1 if force else 0)
4041
4042 with nogil:
4043 ret = rados_application_enable(self.io, _app_name, _force)
4044 if ret < 0:
4045 raise make_ex(ret, "error enabling application")
4046
4047 def application_list(self) -> List[str]:
4048 """
4049 Returns a list of enabled applications
4050
4051 :returns: list of app name string
4052 """
4053 cdef:
4054 size_t length = 128
4055 char *apps = NULL
4056
4057 try:
4058 while True:
4059 apps = <char *>realloc_chk(apps, length)
4060 with nogil:
4061 ret = rados_application_list(self.io, apps, &length)
4062 if ret == 0:
4063 return [decode_cstr(app) for app in
4064 apps[:length].split(b'\0') if app]
4065 elif ret == -errno.ENOENT:
4066 return None
4067 elif ret == -errno.ERANGE:
4068 pass
4069 else:
4070 raise make_ex(ret, "error listing applications")
4071 finally:
4072 free(apps)
4073
4074 def application_metadata_get(self, app_name: str, key: str) -> str:
4075 """
4076 Gets application metadata on an OSD pool for the given key
4077
4078 :param app_name: application name
4079 :type app_name: str
4080 :param key: metadata key
4081 :type key: str
4082 :returns: str - metadata value
4083
4084 :raises: :class:`Error`
4085 """
4086
4087 app_name_raw = cstr(app_name, 'app_name')
4088 key_raw = cstr(key, 'key')
4089 cdef:
4090 char *_app_name = app_name_raw
4091 char *_key = key_raw
4092 size_t size = 129
4093 char *value = NULL
4094 int ret
4095 try:
4096 while True:
4097 value = <char *>realloc_chk(value, size)
4098 with nogil:
4099 ret = rados_application_metadata_get(self.io, _app_name,
4100 _key, value, &size)
4101 if ret != -errno.ERANGE:
4102 break
4103 if ret == -errno.ENOENT:
4104 raise KeyError('no metadata %s for application %s' % (key, _app_name))
4105 elif ret != 0:
4106 raise make_ex(ret, 'error getting metadata %s for application %s' %
4107 (key, _app_name))
4108 return decode_cstr(value)
4109 finally:
4110 free(value)
4111
4112 def application_metadata_set(self, app_name: str, key: str, value: str):
4113 """
4114 Sets application metadata on an OSD pool
4115
4116 :param app_name: application name
4117 :type app_name: str
4118 :param key: metadata key
4119 :type key: str
4120 :param value: metadata value
4121 :type value: str
4122
4123 :raises: :class:`Error`
4124 """
4125 app_name_raw = cstr(app_name, 'app_name')
4126 key_raw = cstr(key, 'key')
4127 value_raw = cstr(value, 'value')
4128 cdef:
4129 char *_app_name = app_name_raw
4130 char *_key = key_raw
4131 char *_value = value_raw
4132
4133 with nogil:
4134 ret = rados_application_metadata_set(self.io, _app_name, _key,
4135 _value)
4136 if ret < 0:
4137 raise make_ex(ret, "error setting application metadata")
4138
4139 def application_metadata_remove(self, app_name: str, key: str):
4140 """
4141 Remove application metadata from an OSD pool
4142
4143 :param app_name: application name
4144 :type app_name: str
4145 :param key: metadata key
4146 :type key: str
4147
4148 :raises: :class:`Error`
4149 """
4150 app_name_raw = cstr(app_name, 'app_name')
4151 key_raw = cstr(key, 'key')
4152 cdef:
4153 char *_app_name = app_name_raw
4154 char *_key = key_raw
4155
4156 with nogil:
4157 ret = rados_application_metadata_remove(self.io, _app_name, _key)
4158 if ret < 0:
4159 raise make_ex(ret, "error removing application metadata")
4160
4161 def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
4162 """
4163 Returns a list of enabled applications
4164
4165 :param app_name: application name
4166 :type app_name: str
4167 :returns: list of key/value tuples
4168 """
4169 app_name_raw = cstr(app_name, 'app_name')
4170 cdef:
4171 char *_app_name = app_name_raw
4172 size_t key_length = 128
4173 size_t val_length = 128
4174 char *c_keys = NULL
4175 char *c_vals = NULL
4176
4177 try:
4178 while True:
4179 c_keys = <char *>realloc_chk(c_keys, key_length)
4180 c_vals = <char *>realloc_chk(c_vals, val_length)
4181 with nogil:
4182 ret = rados_application_metadata_list(self.io, _app_name,
4183 c_keys, &key_length,
4184 c_vals, &val_length)
4185 if ret == 0:
4186 keys = [decode_cstr(key) for key in
4187 c_keys[:key_length].split(b'\0')]
4188 vals = [decode_cstr(val) for val in
4189 c_vals[:val_length].split(b'\0')]
4190 return list(zip(keys, vals))[:-1]
4191 elif ret == -errno.ERANGE:
4192 pass
4193 else:
4194 raise make_ex(ret, "error listing application metadata")
4195 finally:
4196 free(c_keys)
4197 free(c_vals)
4198
4199 def alignment(self) -> int:
4200 """
4201 Returns pool alignment
4202
4203 :returns:
4204 Number of alignment bytes required by the current pool, or None if
4205 alignment is not required.
4206 """
4207 cdef:
4208 int requires = 0
4209 uint64_t _alignment
4210
4211 with nogil:
4212 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
4213 if ret != 0:
4214 raise make_ex(ret, "error checking alignment")
4215
4216 alignment = None
4217 if requires:
4218 with nogil:
4219 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
4220 if ret != 0:
4221 raise make_ex(ret, "error querying alignment")
4222 alignment = _alignment
4223 return alignment
4224
4225
4226 def set_object_locator(func):
4227 def retfunc(self, *args, **kwargs):
4228 if self.locator_key is not None:
4229 old_locator = self.ioctx.get_locator_key()
4230 self.ioctx.set_locator_key(self.locator_key)
4231 retval = func(self, *args, **kwargs)
4232 self.ioctx.set_locator_key(old_locator)
4233 return retval
4234 else:
4235 return func(self, *args, **kwargs)
4236 return retfunc
4237
4238
4239 def set_object_namespace(func):
4240 def retfunc(self, *args, **kwargs):
4241 if self.nspace is None:
4242 raise LogicError("Namespace not set properly in context")
4243 old_nspace = self.ioctx.get_namespace()
4244 self.ioctx.set_namespace(self.nspace)
4245 retval = func(self, *args, **kwargs)
4246 self.ioctx.set_namespace(old_nspace)
4247 return retval
4248 return retfunc
4249
4250
4251 class Object(object):
4252 """Rados object wrapper, makes the object look like a file"""
4253 def __init__(self, ioctx, key, locator_key=None, nspace=None):
4254 self.key = key
4255 self.ioctx = ioctx
4256 self.offset = 0
4257 self.state = "exists"
4258 self.locator_key = locator_key
4259 self.nspace = "" if nspace is None else nspace
4260
4261 def __str__(self):
4262 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
4263 (str(self.ioctx), self.key, "--default--"
4264 if self.nspace is "" else self.nspace, self.locator_key)
4265
4266 def require_object_exists(self):
4267 if self.state != "exists":
4268 raise ObjectStateError("The object is %s" % self.state)
4269
4270 @set_object_locator
4271 @set_object_namespace
4272 def read(self, length=1024 * 1024):
4273 self.require_object_exists()
4274 ret = self.ioctx.read(self.key, length, self.offset)
4275 self.offset += len(ret)
4276 return ret
4277
4278 @set_object_locator
4279 @set_object_namespace
4280 def write(self, string_to_write):
4281 self.require_object_exists()
4282 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4283 if ret == 0:
4284 self.offset += len(string_to_write)
4285 return ret
4286
4287 @set_object_locator
4288 @set_object_namespace
4289 def remove(self):
4290 self.require_object_exists()
4291 self.ioctx.remove_object(self.key)
4292 self.state = "removed"
4293
4294 @set_object_locator
4295 @set_object_namespace
4296 def stat(self) -> Tuple[int, time.struct_time]:
4297 self.require_object_exists()
4298 return self.ioctx.stat(self.key)
4299
4300 def seek(self, position: int):
4301 self.require_object_exists()
4302 self.offset = position
4303
4304 @set_object_locator
4305 @set_object_namespace
4306 def get_xattr(self, xattr_name: str) -> bytes:
4307 self.require_object_exists()
4308 return self.ioctx.get_xattr(self.key, xattr_name)
4309
4310 @set_object_locator
4311 @set_object_namespace
4312 def get_xattrs(self) -> XattrIterator:
4313 self.require_object_exists()
4314 return self.ioctx.get_xattrs(self.key)
4315
4316 @set_object_locator
4317 @set_object_namespace
4318 def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
4319 self.require_object_exists()
4320 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4321
4322 @set_object_locator
4323 @set_object_namespace
4324 def rm_xattr(self, xattr_name: str) -> bool:
4325 self.require_object_exists()
4326 return self.ioctx.rm_xattr(self.key, xattr_name)
4327
4328 MONITOR_LEVELS = [
4329 "debug",
4330 "info",
4331 "warn", "warning",
4332 "err", "error",
4333 "sec",
4334 ]
4335
4336
4337 class MonitorLog(object):
4338 # NOTE(sileht): Keep this class for backward compat
4339 # method moved to Rados.monitor_log()
4340 """
4341 For watching cluster log messages. Instantiate an object and keep
4342 it around while callback is periodically called. Construct with
4343 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4344 arg will be passed to the callback.
4345
4346 callback will be called with:
4347 arg (given to __init__)
4348 line (the full line, including timestamp, who, level, msg)
4349 who (which entity issued the log message)
4350 timestamp_sec (sec of a struct timespec)
4351 timestamp_nsec (sec of a struct timespec)
4352 seq (sequence number)
4353 level (string representing the level of the log message)
4354 msg (the message itself)
4355 callback's return value is ignored
4356 """
4357 def __init__(self, cluster, level, callback, arg):
4358 self.level = level
4359 self.callback = callback
4360 self.arg = arg
4361 self.cluster = cluster
4362 self.cluster.monitor_log(level, callback, arg)
4363