]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/rados/rados.pyx
d7b68b4ac904794c94b06fbaf556dc4ab6d93e15
[ceph.git] / ceph / src / pybind / rados / rados.pyx
1 # cython: embedsignature=True, binding=True
2 """
3 This module is a thin wrapper around librados.
4
5 Error codes from librados are turned into exceptions that subclass
6 :class:`Error`. Almost all methods may raise :class:`Error(the base class of all rados exceptions), :class:`PermissionError`
7 (the base class of all rados exceptions), :class:`PermissionError`
8 and :class:`IOError`, in addition to those documented for the
9 method.
10 """
11 # Copyright 2011 Josh Durgin
12 # Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
13 # Copyright 2015 Hector Martin <marcan@marcan.st>
14 # Copyright 2016 Mehdi Abaakouk <sileht@redhat.com>
15
16 from cpython cimport PyObject, ref
17 from cpython.pycapsule cimport *
18 from libc cimport errno
19 from libc.stdint cimport *
20 from libc.stdlib cimport malloc, realloc, free
21 IF BUILD_DOC:
22 include "mock_rados.pxi"
23 ELSE:
24 from c_rados cimport *
25
26 import threading
27 import time
28
29 from datetime import datetime, timedelta
30 from functools import partial, wraps
31 from itertools import chain
32 from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
33
34 cdef extern from "Python.h":
35 # These are in cpython/string.pxd, but use "object" types instead of
36 # PyObject*, which invokes assumptions in cpython that we need to
37 # legitimately break to implement zero-copy string buffers in Ioctx.read().
38 # This is valid use of the Python API and documented as a special case.
39 PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
40 char* PyBytes_AsString(PyObject *string) except NULL
41 int _PyBytes_Resize(PyObject **string, Py_ssize_t newsize) except -1
42 void PyEval_InitThreads()
43
44 LIBRADOS_OP_FLAG_EXCL = _LIBRADOS_OP_FLAG_EXCL
45 LIBRADOS_OP_FLAG_FAILOK = _LIBRADOS_OP_FLAG_FAILOK
46 LIBRADOS_OP_FLAG_FADVISE_RANDOM = _LIBRADOS_OP_FLAG_FADVISE_RANDOM
47 LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL = _LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
48 LIBRADOS_OP_FLAG_FADVISE_WILLNEED = _LIBRADOS_OP_FLAG_FADVISE_WILLNEED
49 LIBRADOS_OP_FLAG_FADVISE_DONTNEED = _LIBRADOS_OP_FLAG_FADVISE_DONTNEED
50 LIBRADOS_OP_FLAG_FADVISE_NOCACHE = _LIBRADOS_OP_FLAG_FADVISE_NOCACHE
51
52 LIBRADOS_CMPXATTR_OP_EQ = _LIBRADOS_CMPXATTR_OP_EQ
53 LIBRADOS_CMPXATTR_OP_NE = _LIBRADOS_CMPXATTR_OP_NE
54 LIBRADOS_CMPXATTR_OP_GT = _LIBRADOS_CMPXATTR_OP_GT
55 LIBRADOS_CMPXATTR_OP_GTE = _LIBRADOS_CMPXATTR_OP_GTE
56 LIBRADOS_CMPXATTR_OP_LT = _LIBRADOS_CMPXATTR_OP_LT
57 LIBRADOS_CMPXATTR_OP_LTE = _LIBRADOS_CMPXATTR_OP_LTE
58
59 LIBRADOS_SNAP_HEAD = _LIBRADOS_SNAP_HEAD
60
61 LIBRADOS_OPERATION_NOFLAG = _LIBRADOS_OPERATION_NOFLAG
62 LIBRADOS_OPERATION_BALANCE_READS = _LIBRADOS_OPERATION_BALANCE_READS
63 LIBRADOS_OPERATION_LOCALIZE_READS = _LIBRADOS_OPERATION_LOCALIZE_READS
64 LIBRADOS_OPERATION_ORDER_READS_WRITES = _LIBRADOS_OPERATION_ORDER_READS_WRITES
65 LIBRADOS_OPERATION_IGNORE_CACHE = _LIBRADOS_OPERATION_IGNORE_CACHE
66 LIBRADOS_OPERATION_SKIPRWLOCKS = _LIBRADOS_OPERATION_SKIPRWLOCKS
67 LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY
68
69 LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')
70
71 LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
72 LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT
73
74 MAX_ERRNO = _MAX_ERRNO
75
76 ANONYMOUS_AUID = 0xffffffffffffffff
77 ADMIN_AUID = 0
78
79
80 class Error(Exception):
81 """ `Error` class, derived from `Exception` """
82 def __init__(self, message, errno=None):
83 super(Exception, self).__init__(message)
84 self.errno = errno
85
86 def __str__(self):
87 msg = super(Exception, self).__str__()
88 if self.errno is None:
89 return msg
90 return '[errno {0}] {1}'.format(self.errno, msg)
91
92 def __reduce__(self):
93 return (self.__class__, (self.message, self.errno))
94
95 class InvalidArgumentError(Error):
96 def __init__(self, message, errno=None):
97 super(InvalidArgumentError, self).__init__(
98 "RADOS invalid argument (%s)" % message, errno)
99
100 class ExtendMismatch(Error):
101 def __init__(self, message, errno, offset):
102 super().__init__(
103 "object content does not match (%s)" % message, errno)
104 self.offset = offset
105
106 class OSError(Error):
107 """ `OSError` class, derived from `Error` """
108 pass
109
110 class InterruptedOrTimeoutError(OSError):
111 """ `InterruptedOrTimeoutError` class, derived from `OSError` """
112 def __init__(self, message, errno=None):
113 super(InterruptedOrTimeoutError, self).__init__(
114 "RADOS interrupted or timeout (%s)" % message, errno)
115
116
117 class PermissionError(OSError):
118 """ `PermissionError` class, derived from `OSError` """
119 def __init__(self, message, errno=None):
120 super(PermissionError, self).__init__(
121 "RADOS permission error (%s)" % message, errno)
122
123
124 class PermissionDeniedError(OSError):
125 """ deal with EACCES related. """
126 def __init__(self, message, errno=None):
127 super(PermissionDeniedError, self).__init__(
128 "RADOS permission denied (%s)" % message, errno)
129
130
131 class ObjectNotFound(OSError):
132 """ `ObjectNotFound` class, derived from `OSError` """
133 def __init__(self, message, errno=None):
134 super(ObjectNotFound, self).__init__(
135 "RADOS object not found (%s)" % message, errno)
136
137
138 class NoData(OSError):
139 """ `NoData` class, derived from `OSError` """
140 def __init__(self, message, errno=None):
141 super(NoData, self).__init__(
142 "RADOS no data (%s)" % message, errno)
143
144
145 class ObjectExists(OSError):
146 """ `ObjectExists` class, derived from `OSError` """
147 def __init__(self, message, errno=None):
148 super(ObjectExists, self).__init__(
149 "RADOS object exists (%s)" % message, errno)
150
151
152 class ObjectBusy(OSError):
153 """ `ObjectBusy` class, derived from `IOError` """
154 def __init__(self, message, errno=None):
155 super(ObjectBusy, self).__init__(
156 "RADOS object busy (%s)" % message, errno)
157
158
159 class IOError(OSError):
160 """ `ObjectBusy` class, derived from `OSError` """
161 def __init__(self, message, errno=None):
162 super(IOError, self).__init__(
163 "RADOS I/O error (%s)" % message, errno)
164
165
166 class NoSpace(OSError):
167 """ `NoSpace` class, derived from `OSError` """
168 def __init__(self, message, errno=None):
169 super(NoSpace, self).__init__(
170 "RADOS no space (%s)" % message, errno)
171
172 class NotConnected(OSError):
173 """ `NotConnected` class, derived from `OSError` """
174 def __init__(self, message, errno=None):
175 super(NotConnected, self).__init__(
176 "RADOS not connected (%s)" % message, errno)
177
178 class RadosStateError(Error):
179 """ `RadosStateError` class, derived from `Error` """
180 def __init__(self, message, errno=None):
181 super(RadosStateError, self).__init__(
182 "RADOS rados state (%s)" % message, errno)
183
184
185 class IoctxStateError(Error):
186 """ `IoctxStateError` class, derived from `Error` """
187 def __init__(self, message, errno=None):
188 super(IoctxStateError, self).__init__(
189 "RADOS Ioctx state error (%s)" % message, errno)
190
191
192 class ObjectStateError(Error):
193 """ `ObjectStateError` class, derived from `Error` """
194 def __init__(self, message, errno=None):
195 super(ObjectStateError, self).__init__(
196 "RADOS object state error (%s)" % message, errno)
197
198
199 class LogicError(Error):
200 """ `` class, derived from `Error` """
201 def __init__(self, message, errno=None):
202 super(LogicError, self).__init__(
203 "RADOS logic error (%s)" % message, errno)
204
205
206 class TimedOut(OSError):
207 """ `TimedOut` class, derived from `OSError` """
208 def __init__(self, message, errno=None):
209 super(TimedOut, self).__init__(
210 "RADOS timed out (%s)" % message, errno)
211
212
213 class InProgress(Error):
214 """ `InProgress` class, derived from `Error` """
215 def __init__(self, message, errno=None):
216 super(InProgress, self).__init__(
217 "RADOS in progress error (%s)" % message, errno)
218
219
220 class IsConnected(Error):
221 """ `IsConnected` class, derived from `Error` """
222 def __init__(self, message, errno=None):
223 super(IsConnected, self).__init__(
224 "RADOS is connected error (%s)" % message, errno)
225
226
227 class ConnectionShutdown(OSError):
228 """ `ConnectionShutdown` class, derived from `OSError` """
229 def __init__(self, message, errno=None):
230 super(ConnectionShutdown, self).__init__(
231 "RADOS connection was shutdown (%s)" % message, errno)
232
233
234 IF UNAME_SYSNAME == "FreeBSD":
235 cdef errno_to_exception = {
236 errno.EPERM : PermissionError,
237 errno.ENOENT : ObjectNotFound,
238 errno.EIO : IOError,
239 errno.ENOSPC : NoSpace,
240 errno.EEXIST : ObjectExists,
241 errno.EBUSY : ObjectBusy,
242 errno.ENOATTR : NoData,
243 errno.EINTR : InterruptedOrTimeoutError,
244 errno.ETIMEDOUT : TimedOut,
245 errno.EACCES : PermissionDeniedError,
246 errno.EINPROGRESS : InProgress,
247 errno.EISCONN : IsConnected,
248 errno.EINVAL : InvalidArgumentError,
249 errno.ENOTCONN : NotConnected,
250 errno.ESHUTDOWN : ConnectionShutdown,
251 }
252 ELSE:
253 cdef errno_to_exception = {
254 errno.EPERM : PermissionError,
255 errno.ENOENT : ObjectNotFound,
256 errno.EIO : IOError,
257 errno.ENOSPC : NoSpace,
258 errno.EEXIST : ObjectExists,
259 errno.EBUSY : ObjectBusy,
260 errno.ENODATA : NoData,
261 errno.EINTR : InterruptedOrTimeoutError,
262 errno.ETIMEDOUT : TimedOut,
263 errno.EACCES : PermissionDeniedError,
264 errno.EINPROGRESS : InProgress,
265 errno.EISCONN : IsConnected,
266 errno.EINVAL : InvalidArgumentError,
267 errno.ENOTCONN : NotConnected,
268 errno.ESHUTDOWN : ConnectionShutdown,
269 }
270
271
272 cdef make_ex(ret: int, msg: str):
273 """
274 Translate a librados return code into an exception.
275
276 :param ret: the return code
277 :type ret: int
278 :param msg: the error message to use
279 :type msg: str
280 :returns: a subclass of :class:`Error`
281 """
282 ret = abs(ret)
283 if ret in errno_to_exception:
284 return errno_to_exception[ret](msg, errno=ret)
285 elif ret > MAX_ERRNO:
286 offset = ret - MAX_ERRNO
287 return ExtendMismatch(msg, ret, offset)
288 else:
289 return OSError(msg, errno=ret)
290
291
292 def cstr(val, name, encoding="utf-8", opt=False) -> Optional[bytes]:
293 """
294 Create a byte string from a Python string
295
296 :param basestring val: Python string
297 :param str name: Name of the string parameter, for exceptions
298 :param str encoding: Encoding to use
299 :param bool opt: If True, None is allowed
300 :raises: :class:`InvalidArgument`
301 """
302 if opt and val is None:
303 return None
304 if isinstance(val, bytes):
305 return val
306 elif isinstance(val, str):
307 return val.encode(encoding)
308 else:
309 raise TypeError('%s must be a string' % name)
310
311
312 def cstr_list(list_str, name, encoding="utf-8"):
313 return [cstr(s, name) for s in list_str]
314
315
316 def decode_cstr(val, encoding="utf-8") -> Optional[str]:
317 """
318 Decode a byte string into a Python string.
319
320 :param bytes val: byte string
321 """
322 if val is None:
323 return None
324
325 return val.decode(encoding)
326
327
328 def flatten_dict(d, name):
329 items = chain.from_iterable(d.items())
330 return cstr(''.join(i + '\0' for i in items), name)
331
332
333 cdef char* opt_str(s) except? NULL:
334 if s is None:
335 return NULL
336 return s
337
338
339 cdef void* realloc_chk(void* ptr, size_t size) except NULL:
340 cdef void *ret = realloc(ptr, size)
341 if ret == NULL:
342 raise MemoryError("realloc failed")
343 return ret
344
345
346 cdef size_t * to_csize_t_array(list_int):
347 cdef size_t *ret = <size_t *>malloc(len(list_int) * sizeof(size_t))
348 if ret == NULL:
349 raise MemoryError("malloc failed")
350 for i in range(len(list_int)):
351 ret[i] = <size_t>list_int[i]
352 return ret
353
354
355 cdef char ** to_bytes_array(list_bytes):
356 cdef char **ret = <char **>malloc(len(list_bytes) * sizeof(char *))
357 if ret == NULL:
358 raise MemoryError("malloc failed")
359 for i in range(len(list_bytes)):
360 ret[i] = <char *>list_bytes[i]
361 return ret
362
363 cdef int __monitor_callback(void *arg, const char *line, const char *who,
364 uint64_t sec, uint64_t nsec, uint64_t seq,
365 const char *level, const char *msg) with gil:
366 cdef object cb_info = <object>arg
367 cb_info[0](cb_info[1], line, who, sec, nsec, seq, level, msg)
368 return 0
369
370 cdef int __monitor_callback2(void *arg, const char *line, const char *channel,
371 const char *who,
372 const char *name,
373 uint64_t sec, uint64_t nsec, uint64_t seq,
374 const char *level, const char *msg) with gil:
375 cdef object cb_info = <object>arg
376 cb_info[0](cb_info[1], line, channel, name, who, sec, nsec, seq, level, msg)
377 return 0
378
379
380 class Version(object):
381 """ Version information """
382 def __init__(self, major, minor, extra):
383 self.major = major
384 self.minor = minor
385 self.extra = extra
386
387 def __str__(self):
388 return "%d.%d.%d" % (self.major, self.minor, self.extra)
389
390
391 cdef class Rados(object):
392 """This class wraps librados functions"""
393 # NOTE(sileht): attributes declared in .pyd
394
395 def __init__(self, *args, **kwargs):
396 PyEval_InitThreads()
397 self.__setup(*args, **kwargs)
398
399 NO_CONF_FILE = -1
400 "special value that indicates no conffile should be read when creating a mount handle"
401 DEFAULT_CONF_FILES = -2
402 "special value that indicates the default conffiles should be read when creating a mount handle"
403
404 def __setup(self,
405 rados_id: Optional[str] = None,
406 name: Optional[str] = None,
407 clustername: Optional[str] = None,
408 conf_defaults: Optional[Dict[str, str]] = None,
409 conffile: Union[str, int, None] = NO_CONF_FILE,
410 conf: Optional[Dict[str, str]] = None,
411 flags: int = 0,
412 context: object = None):
413 self.monitor_callback = None
414 self.monitor_callback2 = None
415 self.parsed_args = []
416 self.conf_defaults = conf_defaults
417 self.conffile = conffile
418 self.rados_id = rados_id
419
420 if rados_id and name:
421 raise Error("Rados(): can't supply both rados_id and name")
422 elif rados_id:
423 name = 'client.' + rados_id
424 elif name is None:
425 name = 'client.admin'
426 if clustername is None:
427 clustername = ''
428
429 name_raw = cstr(name, 'name')
430 clustername_raw = cstr(clustername, 'clustername')
431 cdef:
432 char *_name = name_raw
433 char *_clustername = clustername_raw
434 int _flags = flags
435 int ret
436
437 if context:
438 # Unpack void* (aka rados_config_t) from capsule
439 rados_config = <rados_config_t> PyCapsule_GetPointer(context, NULL)
440 with nogil:
441 ret = rados_create_with_context(&self.cluster, rados_config)
442 else:
443 with nogil:
444 ret = rados_create2(&self.cluster, _clustername, _name, _flags)
445 if ret != 0:
446 raise Error("rados_initialize failed with error code: %d" % ret)
447
448 self.state = "configuring"
449 # order is important: conf_defaults, then conffile, then conf
450 if conf_defaults:
451 for key, value in conf_defaults.items():
452 self.conf_set(key, value)
453 if conffile in (self.NO_CONF_FILE, None):
454 pass
455 elif conffile in (self.DEFAULT_CONF_FILES, ''):
456 self.conf_read_file(None)
457 else:
458 self.conf_read_file(conffile)
459 if conf:
460 for key, value in conf.items():
461 self.conf_set(key, value)
462
463 def get_addrs(self):
464 """
465 Get associated client addresses with this RADOS session.
466 """
467 self.require_state("configuring", "connected")
468
469 cdef:
470 char* addrs = NULL
471
472 try:
473
474 with nogil:
475 ret = rados_getaddrs(self.cluster, &addrs)
476 if ret:
477 raise make_ex(ret, "error calling getaddrs")
478
479 return decode_cstr(addrs)
480 finally:
481 free(addrs)
482
483 def require_state(self, *args):
484 """
485 Checks if the Rados object is in a special state
486
487 :raises: :class:`RadosStateError`
488 """
489 if self.state in args:
490 return
491 raise RadosStateError("You cannot perform that operation on a \
492 Rados object in state %s." % self.state)
493
494 def shutdown(self):
495 """
496 Disconnects from the cluster. Call this explicitly when a
497 Rados.connect()ed object is no longer used.
498 """
499 if self.state != "shutdown":
500 with nogil:
501 rados_shutdown(self.cluster)
502 self.state = "shutdown"
503
504 def __enter__(self):
505 self.connect()
506 return self
507
508 def __exit__(self, type_, value, traceback):
509 self.shutdown()
510 return False
511
512 def version(self) -> Version:
513 """
514 Get the version number of the ``librados`` C library.
515
516 :returns: a tuple of ``(major, minor, extra)`` components of the
517 librados version
518 """
519 cdef int major = 0
520 cdef int minor = 0
521 cdef int extra = 0
522 with nogil:
523 rados_version(&major, &minor, &extra)
524 return Version(major, minor, extra)
525
526 def conf_read_file(self, path: Optional[str] = None):
527 """
528 Configure the cluster handle using a Ceph config file.
529
530 :param path: path to the config file
531 """
532 self.require_state("configuring", "connected")
533 path_raw = cstr(path, 'path', opt=True)
534 cdef:
535 char *_path = opt_str(path_raw)
536 with nogil:
537 ret = rados_conf_read_file(self.cluster, _path)
538 if ret != 0:
539 raise make_ex(ret, "error calling conf_read_file")
540
541 def conf_parse_argv(self, args: Sequence[str]):
542 """
543 Parse known arguments from args, and remove; returned
544 args contain only those unknown to ceph
545 """
546 self.require_state("configuring", "connected")
547 if not args:
548 return
549
550 cargs = cstr_list(args, 'args')
551 cdef:
552 int _argc = len(args)
553 char **_argv = to_bytes_array(cargs)
554 char **_remargv = NULL
555
556 try:
557 _remargv = <char **>malloc(_argc * sizeof(char *))
558 with nogil:
559 ret = rados_conf_parse_argv_remainder(self.cluster, _argc,
560 <const char**>_argv,
561 <const char**>_remargv)
562 if ret:
563 raise make_ex(ret, "error calling conf_parse_argv_remainder")
564
565 # _remargv was allocated with fixed argc; collapse return
566 # list to eliminate any missing args
567 retargs = [decode_cstr(a) for a in _remargv[:_argc]
568 if a != NULL]
569 self.parsed_args = args
570 return retargs
571 finally:
572 free(_argv)
573 free(_remargv)
574
575 def conf_parse_env(self, var: Optional[str] = 'CEPH_ARGS'):
576 """
577 Parse known arguments from an environment variable, normally
578 CEPH_ARGS.
579 """
580 self.require_state("configuring", "connected")
581 if not var:
582 return
583
584 var_raw = cstr(var, 'var')
585 cdef:
586 char *_var = var_raw
587 with nogil:
588 ret = rados_conf_parse_env(self.cluster, _var)
589 if ret != 0:
590 raise make_ex(ret, "error calling conf_parse_env")
591
592 def conf_get(self, option: str) -> Optional[str]:
593 """
594 Get the value of a configuration option
595
596 :param option: which option to read
597
598 :returns: value of the option or None
599 :raises: :class:`TypeError`
600 """
601 self.require_state("configuring", "connected")
602 option_raw = cstr(option, 'option')
603 cdef:
604 char *_option = option_raw
605 size_t length = 20
606 char *ret_buf = NULL
607
608 try:
609 while True:
610 ret_buf = <char *>realloc_chk(ret_buf, length)
611 with nogil:
612 ret = rados_conf_get(self.cluster, _option, ret_buf, length)
613 if ret == 0:
614 return decode_cstr(ret_buf)
615 elif ret == -errno.ENAMETOOLONG:
616 length = length * 2
617 elif ret == -errno.ENOENT:
618 return None
619 else:
620 raise make_ex(ret, "error calling conf_get")
621 finally:
622 free(ret_buf)
623
624 def conf_set(self, option: str, val: str):
625 """
626 Set the value of a configuration option
627
628 :param option: which option to set
629 :param option: value of the option
630
631 :raises: :class:`TypeError`, :class:`ObjectNotFound`
632 """
633 self.require_state("configuring", "connected")
634 option_raw = cstr(option, 'option')
635 val_raw = cstr(val, 'val')
636 cdef:
637 char *_option = option_raw
638 char *_val = val_raw
639
640 with nogil:
641 ret = rados_conf_set(self.cluster, _option, _val)
642 if ret != 0:
643 raise make_ex(ret, "error calling conf_set")
644
645 def ping_monitor(self, mon_id: str):
646 """
647 Ping a monitor to assess liveness
648
649 May be used as a simply way to assess liveness, or to obtain
650 information about the monitor in a simple way even in the
651 absence of quorum.
652
653 :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
654 :returns: the string reply from the monitor
655 """
656
657 self.require_state("configuring", "connected")
658
659 mon_id_raw = cstr(mon_id, 'mon_id')
660 cdef:
661 char *_mon_id = mon_id_raw
662 size_t outstrlen = 0
663 char *outstr
664
665 with nogil:
666 ret = rados_ping_monitor(self.cluster, _mon_id, &outstr, &outstrlen)
667
668 if ret != 0:
669 raise make_ex(ret, "error calling ping_monitor")
670
671 if outstrlen:
672 my_outstr = outstr[:outstrlen]
673 rados_buffer_free(outstr)
674 return decode_cstr(my_outstr)
675
676 def connect(self, timeout: int = 0):
677 """
678 Connect to the cluster. Use shutdown() to release resources.
679
680 :param timeout: Any supplied timeout value is currently ignored.
681 """
682 self.require_state("configuring")
683 # NOTE(sileht): timeout was supported by old python API,
684 # but this is not something available in C API, so ignore
685 # for now and remove it later
686 with nogil:
687 ret = rados_connect(self.cluster)
688 if ret != 0:
689 raise make_ex(ret, "error connecting to the cluster")
690 self.state = "connected"
691
692 def get_instance_id(self) -> int:
693 """
694 Get a global id for current instance
695 """
696 self.require_state("connected")
697 with nogil:
698 ret = rados_get_instance_id(self.cluster)
699 return ret;
700
701 def get_cluster_stats(self) -> Dict[str, int]:
702 """
703 Read usage info about the cluster
704
705 This tells you total space, space used, space available, and number
706 of objects. These are not updated immediately when data is written,
707 they are eventually consistent.
708 :returns: contains the following keys:
709
710 - ``kb`` (int) - total space
711
712 - ``kb_used`` (int) - space used
713
714 - ``kb_avail`` (int) - free space available
715
716 - ``num_objects`` (int) - number of objects
717
718 """
719 cdef:
720 rados_cluster_stat_t stats
721
722 with nogil:
723 ret = rados_cluster_stat(self.cluster, &stats)
724
725 if ret < 0:
726 raise make_ex(
727 ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
728 return {'kb': stats.kb,
729 'kb_used': stats.kb_used,
730 'kb_avail': stats.kb_avail,
731 'num_objects': stats.num_objects}
732
733 def pool_exists(self, pool_name: str) -> bool:
734 """
735 Checks if a given pool exists.
736
737 :param pool_name: name of the pool to check
738
739 :raises: :class:`TypeError`, :class:`Error`
740 :returns: whether the pool exists, false otherwise.
741 """
742 self.require_state("connected")
743
744 pool_name_raw = cstr(pool_name, 'pool_name')
745 cdef:
746 char *_pool_name = pool_name_raw
747
748 with nogil:
749 ret = rados_pool_lookup(self.cluster, _pool_name)
750 if ret >= 0:
751 return True
752 elif ret == -errno.ENOENT:
753 return False
754 else:
755 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
756
757 def pool_lookup(self, pool_name: str) -> int:
758 """
759 Returns a pool's ID based on its name.
760
761 :param pool_name: name of the pool to look up
762
763 :raises: :class:`TypeError`, :class:`Error`
764 :returns: pool ID, or None if it doesn't exist
765 """
766 self.require_state("connected")
767 pool_name_raw = cstr(pool_name, 'pool_name')
768 cdef:
769 char *_pool_name = pool_name_raw
770
771 with nogil:
772 ret = rados_pool_lookup(self.cluster, _pool_name)
773 if ret >= 0:
774 return int(ret)
775 elif ret == -errno.ENOENT:
776 return None
777 else:
778 raise make_ex(ret, "error looking up pool '%s'" % pool_name)
779
780 def pool_reverse_lookup(self, pool_id: int) -> Optional[str]:
781 """
782 Returns a pool's name based on its ID.
783
784 :param pool_id: ID of the pool to look up
785
786 :raises: :class:`TypeError`, :class:`Error`
787 :returns: pool name, or None if it doesn't exist
788 """
789 self.require_state("connected")
790 cdef:
791 int64_t _pool_id = pool_id
792 size_t size = 512
793 char *name = NULL
794
795 try:
796 while True:
797 name = <char *>realloc_chk(name, size)
798 with nogil:
799 ret = rados_pool_reverse_lookup(self.cluster, _pool_id, name, size)
800 if ret >= 0:
801 break
802 elif ret != -errno.ERANGE and size <= 4096:
803 size *= 2
804 elif ret == -errno.ENOENT:
805 return None
806 elif ret < 0:
807 raise make_ex(ret, "error reverse looking up pool '%s'" % pool_id)
808
809 return decode_cstr(name)
810
811 finally:
812 free(name)
813
814 def create_pool(self, pool_name: str,
815 crush_rule: Optional[int] = None,
816 auid: Optional[int] = None):
817 """
818 Create a pool:
819 - with default settings: if crush_rule=None and auid=None
820 - with a specific CRUSH rule: crush_rule given
821 - with a specific auid: auid given
822 - with a specific CRUSH rule and auid: crush_rule and auid given
823
824 :param pool_name: name of the pool to create
825 :param crush_rule: rule to use for placement in the new pool
826 :param auid: id of the owner of the new pool
827
828 :raises: :class:`TypeError`, :class:`Error`
829 """
830 self.require_state("connected")
831
832 pool_name_raw = cstr(pool_name, 'pool_name')
833 cdef:
834 char *_pool_name = pool_name_raw
835 uint8_t _crush_rule
836 uint64_t _auid
837
838 if crush_rule is None and auid is None:
839 with nogil:
840 ret = rados_pool_create(self.cluster, _pool_name)
841 elif crush_rule is not None and auid is None:
842 _crush_rule = crush_rule
843 with nogil:
844 ret = rados_pool_create_with_crush_rule(self.cluster, _pool_name, _crush_rule)
845 elif crush_rule is None and auid is not None:
846 _auid = auid
847 with nogil:
848 ret = rados_pool_create_with_auid(self.cluster, _pool_name, _auid)
849 else:
850 _crush_rule = crush_rule
851 _auid = auid
852 with nogil:
853 ret = rados_pool_create_with_all(self.cluster, _pool_name, _auid, _crush_rule)
854 if ret < 0:
855 raise make_ex(ret, "error creating pool '%s'" % pool_name)
856
857 def get_pool_base_tier(self, pool_id: int) -> int:
858 """
859 Get base pool
860
861 :returns: base pool, or pool_id if tiering is not configured for the pool
862 """
863 self.require_state("connected")
864 cdef:
865 int64_t base_tier = 0
866 int64_t _pool_id = pool_id
867
868 with nogil:
869 ret = rados_pool_get_base_tier(self.cluster, _pool_id, &base_tier)
870 if ret < 0:
871 raise make_ex(ret, "get_pool_base_tier(%d)" % pool_id)
872 return int(base_tier)
873
874 def delete_pool(self, pool_name: str):
875 """
876 Delete a pool and all data inside it.
877
878 The pool is removed from the cluster immediately,
879 but the actual data is deleted in the background.
880
881 :param pool_name: name of the pool to delete
882
883 :raises: :class:`TypeError`, :class:`Error`
884 """
885 self.require_state("connected")
886
887 pool_name_raw = cstr(pool_name, 'pool_name')
888 cdef:
889 char *_pool_name = pool_name_raw
890
891 with nogil:
892 ret = rados_pool_delete(self.cluster, _pool_name)
893 if ret < 0:
894 raise make_ex(ret, "error deleting pool '%s'" % pool_name)
895
896 def get_inconsistent_pgs(self, pool_id: int) -> List[str]:
897 """
898 List inconsistent placement groups in the given pool
899
900 :param pool_id: ID of the pool in which PGs are listed
901 :returns: inconsistent placement groups
902 """
903 self.require_state("connected")
904 cdef:
905 int64_t pool = pool_id
906 size_t size = 512
907 char *pgs = NULL
908
909 try:
910 while True:
911 pgs = <char *>realloc_chk(pgs, size);
912 with nogil:
913 ret = rados_inconsistent_pg_list(self.cluster, pool,
914 pgs, size)
915 if ret > <int>size:
916 size *= 2
917 elif ret >= 0:
918 break
919 else:
920 raise make_ex(ret, "error calling inconsistent_pg_list")
921 return [pg for pg in decode_cstr(pgs[:ret]).split('\0') if pg]
922 finally:
923 free(pgs)
924
925 def list_pools(self) -> List[str]:
926 """
927 Gets a list of pool names.
928
929 :returns: list of pool names.
930 """
931 self.require_state("connected")
932 cdef:
933 size_t size = 512
934 char *c_names = NULL
935
936 try:
937 while True:
938 c_names = <char *>realloc_chk(c_names, size)
939 with nogil:
940 ret = rados_pool_list(self.cluster, c_names, size)
941 if ret > <int>size:
942 size *= 2
943 elif ret >= 0:
944 break
945 return [name for name in decode_cstr(c_names[:ret]).split('\0')
946 if name]
947 finally:
948 free(c_names)
949
950 def get_fsid(self) -> str:
951 """
952 Get the fsid of the cluster as a hexadecimal string.
953
954 :raises: :class:`Error`
955 :returns: cluster fsid
956 """
957 self.require_state("connected")
958 cdef:
959 char *ret_buf = NULL
960 size_t buf_len = 64
961
962 try:
963 while True:
964 ret_buf = <char *>realloc_chk(ret_buf, buf_len)
965 with nogil:
966 ret = rados_cluster_fsid(self.cluster, ret_buf, buf_len)
967 if ret == -errno.ERANGE:
968 buf_len = buf_len * 2
969 elif ret < 0:
970 raise make_ex(ret, "error getting cluster fsid")
971 else:
972 break
973 return decode_cstr(ret_buf)
974 finally:
975 free(ret_buf)
976
977 def open_ioctx(self, ioctx_name: str) -> Ioctx:
978 """
979 Create an io context
980
981 The io context allows you to perform operations within a particular
982 pool.
983
984 :param ioctx_name: name of the pool
985
986 :raises: :class:`TypeError`, :class:`Error`
987 :returns: Rados Ioctx object
988 """
989 self.require_state("connected")
990 ioctx_name_raw = cstr(ioctx_name, 'ioctx_name')
991 cdef:
992 rados_ioctx_t ioctx
993 char *_ioctx_name = ioctx_name_raw
994 with nogil:
995 ret = rados_ioctx_create(self.cluster, _ioctx_name, &ioctx)
996 if ret < 0:
997 raise make_ex(ret, "error opening pool '%s'" % ioctx_name)
998 io = Ioctx(self, ioctx_name)
999 io.io = ioctx
1000 return io
1001
1002 def open_ioctx2(self, pool_id: int) -> Ioctx:
1003 """
1004 Create an io context
1005
1006 The io context allows you to perform operations within a particular
1007 pool.
1008
1009 :param pool_id: ID of the pool
1010
1011 :raises: :class:`TypeError`, :class:`Error`
1012 :returns: Rados Ioctx object
1013 """
1014 self.require_state("connected")
1015 cdef:
1016 rados_ioctx_t ioctx
1017 int64_t _pool_id = pool_id
1018 with nogil:
1019 ret = rados_ioctx_create2(self.cluster, _pool_id, &ioctx)
1020 if ret < 0:
1021 raise make_ex(ret, "error opening pool id '%s'" % pool_id)
1022 io = Ioctx(self, str(pool_id))
1023 io.io = ioctx
1024 return io
1025
1026 def mon_command(self,
1027 cmd: str,
1028 inbuf: bytes,
1029 timeout: int = 0,
1030 target: Optional[Union[str, int]] = None) -> Tuple[int, bytes, str]:
1031 """
1032 Send a command to the mon.
1033
1034 mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)
1035
1036 :param cmd: JSON formatted string.
1037 :param inbuf: optional string.
1038 :param timeout: This parameter is ignored.
1039 :param target: name or rank of a specific mon. Optional
1040 :return: (int ret, string outbuf, string outs)
1041
1042 Example:
1043
1044 >>> import json
1045 >>> c = Rados(conffile='/etc/ceph/ceph.conf')
1046 >>> c.connect()
1047 >>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
1048 >>> c.mon_command(cmd, b'')
1049 """
1050 # NOTE(sileht): timeout is ignored because C API doesn't provide
1051 # timeout argument, but we keep it for backward compat with old python binding
1052 self.require_state("connected")
1053 cmds = [cstr(cmd, 'cmd')]
1054
1055 if isinstance(target, int):
1056 # NOTE(sileht): looks weird but test_monmap_dump pass int
1057 target = str(target)
1058
1059 target = cstr(target, 'target', opt=True)
1060
1061 cdef:
1062 char *_target = opt_str(target)
1063 char **_cmd = to_bytes_array(cmds)
1064 size_t _cmdlen = len(cmds)
1065
1066 char *_inbuf = inbuf
1067 size_t _inbuf_len = len(inbuf)
1068
1069 char *_outbuf
1070 size_t _outbuf_len
1071 char *_outs
1072 size_t _outs_len
1073
1074 try:
1075 if target:
1076 with nogil:
1077 ret = rados_mon_command_target(self.cluster, _target,
1078 <const char **>_cmd, _cmdlen,
1079 <const char*>_inbuf, _inbuf_len,
1080 &_outbuf, &_outbuf_len,
1081 &_outs, &_outs_len)
1082 else:
1083 with nogil:
1084 ret = rados_mon_command(self.cluster,
1085 <const char **>_cmd, _cmdlen,
1086 <const char*>_inbuf, _inbuf_len,
1087 &_outbuf, &_outbuf_len,
1088 &_outs, &_outs_len)
1089
1090 my_outs = decode_cstr(_outs[:_outs_len])
1091 my_outbuf = _outbuf[:_outbuf_len]
1092 if _outs_len:
1093 rados_buffer_free(_outs)
1094 if _outbuf_len:
1095 rados_buffer_free(_outbuf)
1096 return (ret, my_outbuf, my_outs)
1097 finally:
1098 free(_cmd)
1099
1100 def osd_command(self,
1101 osdid: int,
1102 cmd: str,
1103 inbuf: bytes,
1104 timeout: int = 0) -> Tuple[int, bytes, str]:
1105 """
1106 osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1107
1108 :return: (int ret, string outbuf, string outs)
1109 """
1110 # NOTE(sileht): timeout is ignored because C API doesn't provide
1111 # timeout argument, but we keep it for backward compat with old python binding
1112 self.require_state("connected")
1113
1114 cmds = [cstr(cmd, 'cmd')]
1115
1116 cdef:
1117 int _osdid = osdid
1118 char **_cmd = to_bytes_array(cmds)
1119 size_t _cmdlen = len(cmds)
1120
1121 char *_inbuf = inbuf
1122 size_t _inbuf_len = len(inbuf)
1123
1124 char *_outbuf
1125 size_t _outbuf_len
1126 char *_outs
1127 size_t _outs_len
1128
1129 try:
1130 with nogil:
1131 ret = rados_osd_command(self.cluster, _osdid,
1132 <const char **>_cmd, _cmdlen,
1133 <const char*>_inbuf, _inbuf_len,
1134 &_outbuf, &_outbuf_len,
1135 &_outs, &_outs_len)
1136
1137 my_outs = decode_cstr(_outs[:_outs_len])
1138 my_outbuf = _outbuf[:_outbuf_len]
1139 if _outs_len:
1140 rados_buffer_free(_outs)
1141 if _outbuf_len:
1142 rados_buffer_free(_outbuf)
1143 return (ret, my_outbuf, my_outs)
1144 finally:
1145 free(_cmd)
1146
1147 def mgr_command(self,
1148 cmd: str,
1149 inbuf: bytes,
1150 timeout: int = 0,
1151 target: Optional[str] = None) -> Tuple[int, str, bytes]:
1152 """
1153 :return: (int ret, string outbuf, string outs)
1154 """
1155 # NOTE(sileht): timeout is ignored because C API doesn't provide
1156 # timeout argument, but we keep it for backward compat with old python binding
1157 self.require_state("connected")
1158
1159 cmds = [cstr(cmd, 'cmd')]
1160 target = cstr(target, 'target', opt=True)
1161
1162 cdef:
1163 char *_target = opt_str(target)
1164
1165 char **_cmd = to_bytes_array(cmds)
1166 size_t _cmdlen = len(cmds)
1167
1168 char *_inbuf = inbuf
1169 size_t _inbuf_len = len(inbuf)
1170
1171 char *_outbuf
1172 size_t _outbuf_len
1173 char *_outs
1174 size_t _outs_len
1175
1176 try:
1177 if target is not None:
1178 with nogil:
1179 ret = rados_mgr_command_target(self.cluster,
1180 <const char*>_target,
1181 <const char **>_cmd, _cmdlen,
1182 <const char*>_inbuf, _inbuf_len,
1183 &_outbuf, &_outbuf_len,
1184 &_outs, &_outs_len)
1185 else:
1186 with nogil:
1187 ret = rados_mgr_command(self.cluster,
1188 <const char **>_cmd, _cmdlen,
1189 <const char*>_inbuf, _inbuf_len,
1190 &_outbuf, &_outbuf_len,
1191 &_outs, &_outs_len)
1192
1193 my_outs = decode_cstr(_outs[:_outs_len])
1194 my_outbuf = _outbuf[:_outbuf_len]
1195 if _outs_len:
1196 rados_buffer_free(_outs)
1197 if _outbuf_len:
1198 rados_buffer_free(_outbuf)
1199 return (ret, my_outbuf, my_outs)
1200 finally:
1201 free(_cmd)
1202
1203 def pg_command(self,
1204 pgid: str,
1205 cmd: str,
1206 inbuf: bytes,
1207 timeout: int = 0) -> Tuple[int, bytes, str]:
1208 """
1209 pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)
1210
1211 :return: (int ret, string outbuf, string outs)
1212 """
1213 # NOTE(sileht): timeout is ignored because C API doesn't provide
1214 # timeout argument, but we keep it for backward compat with old python binding
1215 self.require_state("connected")
1216
1217 pgid_raw = cstr(pgid, 'pgid')
1218 cmds = [cstr(cmd, 'cmd')]
1219
1220 cdef:
1221 char *_pgid = pgid_raw
1222 char **_cmd = to_bytes_array(cmds)
1223 size_t _cmdlen = len(cmds)
1224
1225 char *_inbuf = inbuf
1226 size_t _inbuf_len = len(inbuf)
1227
1228 char *_outbuf
1229 size_t _outbuf_len
1230 char *_outs
1231 size_t _outs_len
1232
1233 try:
1234 with nogil:
1235 ret = rados_pg_command(self.cluster, _pgid,
1236 <const char **>_cmd, _cmdlen,
1237 <const char *>_inbuf, _inbuf_len,
1238 &_outbuf, &_outbuf_len,
1239 &_outs, &_outs_len)
1240
1241 my_outs = decode_cstr(_outs[:_outs_len])
1242 my_outbuf = _outbuf[:_outbuf_len]
1243 if _outs_len:
1244 rados_buffer_free(_outs)
1245 if _outbuf_len:
1246 rados_buffer_free(_outbuf)
1247 return (ret, my_outbuf, my_outs)
1248 finally:
1249 free(_cmd)
1250
1251 def wait_for_latest_osdmap(self) -> int:
1252 self.require_state("connected")
1253 with nogil:
1254 ret = rados_wait_for_latest_osdmap(self.cluster)
1255 return ret
1256
1257 def blocklist_add(self, client_address: str, expire_seconds: int = 0):
1258 """
1259 Blocklist a client from the OSDs
1260
1261 :param client_address: client address
1262 :param expire_seconds: number of seconds to blocklist
1263
1264 :raises: :class:`Error`
1265 """
1266 self.require_state("connected")
1267 client_address_raw = cstr(client_address, 'client_address')
1268 cdef:
1269 uint32_t _expire_seconds = expire_seconds
1270 char *_client_address = client_address_raw
1271
1272 with nogil:
1273 ret = rados_blocklist_add(self.cluster, _client_address, _expire_seconds)
1274 if ret < 0:
1275 raise make_ex(ret, "error blocklisting client '%s'" % client_address)
1276
1277 def monitor_log(self, level: str,
1278 callback: Optional[Callable[[object, str, str, str, int, int, int, str, str], None]] = None,
1279 arg: Optional[object] = None):
1280 if level not in MONITOR_LEVELS:
1281 raise LogicError("invalid monitor level " + level)
1282 if callback is not None and not callable(callback):
1283 raise LogicError("callback must be a callable function or None")
1284
1285 level_raw = cstr(level, 'level')
1286 cdef char *_level = level_raw
1287
1288 if callback is None:
1289 with nogil:
1290 r = rados_monitor_log(self.cluster, <const char*>_level, NULL, NULL)
1291 self.monitor_callback = None
1292 self.monitor_callback2 = None
1293 return
1294
1295 cb = (callback, arg)
1296 cdef PyObject* _arg = <PyObject*>cb
1297 with nogil:
1298 r = rados_monitor_log(self.cluster, <const char*>_level,
1299 <rados_log_callback_t>&__monitor_callback, _arg)
1300
1301 if r:
1302 raise make_ex(r, 'error calling rados_monitor_log')
1303 # NOTE(sileht): Prevents the callback method from being garbage collected
1304 self.monitor_callback = cb
1305 self.monitor_callback2 = None
1306
1307 def monitor_log2(self, level: str,
1308 callback: Optional[Callable[[object, str, str, str, str, int, int, int, str, str], None]] = None,
1309 arg: Optional[object] = None):
1310 if level not in MONITOR_LEVELS:
1311 raise LogicError("invalid monitor level " + level)
1312 if callback is not None and not callable(callback):
1313 raise LogicError("callback must be a callable function or None")
1314
1315 level_raw = cstr(level, 'level')
1316 cdef char *_level = level_raw
1317
1318 if callback is None:
1319 with nogil:
1320 r = rados_monitor_log2(self.cluster, <const char*>_level, NULL, NULL)
1321 self.monitor_callback = None
1322 self.monitor_callback2 = None
1323 return
1324
1325 cb = (callback, arg)
1326 cdef PyObject* _arg = <PyObject*>cb
1327 with nogil:
1328 r = rados_monitor_log2(self.cluster, <const char*>_level,
1329 <rados_log_callback2_t>&__monitor_callback2, _arg)
1330
1331 if r:
1332 raise make_ex(r, 'error calling rados_monitor_log')
1333 # NOTE(sileht): Prevents the callback method from being garbage collected
1334 self.monitor_callback = None
1335 self.monitor_callback2 = cb
1336
1337 def service_daemon_register(self, service: str, daemon: str, metadata: Dict[str, str]):
1338 """
1339 :param str service: service name (e.g. "rgw")
1340 :param str daemon: daemon name (e.g. "gwfoo")
1341 :param dict metadata: static metadata about the register daemon
1342 (e.g., the version of Ceph, the kernel version.)
1343 """
1344 service_raw = cstr(service, 'service')
1345 daemon_raw = cstr(daemon, 'daemon')
1346 metadata_dict = flatten_dict(metadata, 'metadata')
1347 cdef:
1348 char *_service = service_raw
1349 char *_daemon = daemon_raw
1350 char *_metadata = metadata_dict
1351
1352 with nogil:
1353 ret = rados_service_register(self.cluster, _service, _daemon, _metadata)
1354 if ret != 0:
1355 raise make_ex(ret, "error calling service_register()")
1356
1357 def service_daemon_update(self, status: Dict[str, str]):
1358 status_dict = flatten_dict(status, 'status')
1359 cdef:
1360 char *_status = status_dict
1361
1362 with nogil:
1363 ret = rados_service_update_status(self.cluster, _status)
1364 if ret != 0:
1365 raise make_ex(ret, "error calling service_daemon_update()")
1366
1367
1368 cdef class OmapIterator(object):
1369 """Omap iterator"""
1370
1371 cdef public Ioctx ioctx
1372 cdef rados_omap_iter_t ctx
1373
1374 def __cinit__(self, Ioctx ioctx):
1375 self.ioctx = ioctx
1376
1377 def __iter__(self):
1378 return self
1379
1380 def __next__(self):
1381 """
1382 Get the next key-value pair in the object
1383 :returns: next rados.OmapItem
1384 """
1385 cdef:
1386 char *key_ = NULL
1387 char *val_ = NULL
1388 size_t len_
1389
1390 with nogil:
1391 ret = rados_omap_get_next(self.ctx, &key_, &val_, &len_)
1392
1393 if ret != 0:
1394 raise make_ex(ret, "error iterating over the omap")
1395 if key_ == NULL:
1396 raise StopIteration()
1397 key = decode_cstr(key_)
1398 val = None
1399 if val_ != NULL:
1400 val = val_[:len_]
1401 return (key, val)
1402
1403 def __dealloc__(self):
1404 with nogil:
1405 rados_omap_get_end(self.ctx)
1406
1407
1408 cdef class ObjectIterator(object):
1409 """rados.Ioctx Object iterator"""
1410
1411 cdef rados_list_ctx_t ctx
1412
1413 cdef public object ioctx
1414
1415 def __cinit__(self, Ioctx ioctx):
1416 self.ioctx = ioctx
1417
1418 with nogil:
1419 ret = rados_nobjects_list_open(ioctx.io, &self.ctx)
1420 if ret < 0:
1421 raise make_ex(ret, "error iterating over the objects in ioctx '%s'"
1422 % self.ioctx.name)
1423
1424 def __iter__(self):
1425 return self
1426
1427 def __next__(self):
1428 """
1429 Get the next object name and locator in the pool
1430
1431 :raises: StopIteration
1432 :returns: next rados.Ioctx Object
1433 """
1434 cdef:
1435 const char *key_ = NULL
1436 const char *locator_ = NULL
1437 const char *nspace_ = NULL
1438 size_t key_size_ = 0
1439 size_t locator_size_ = 0
1440 size_t nspace_size_ = 0
1441
1442 with nogil:
1443 ret = rados_nobjects_list_next2(self.ctx, &key_, &locator_, &nspace_,
1444 &key_size_, &locator_size_, &nspace_size_)
1445
1446 if ret < 0:
1447 raise StopIteration()
1448
1449 key = decode_cstr(key_[:key_size_])
1450 locator = decode_cstr(locator_[:locator_size_]) if locator_ != NULL else None
1451 nspace = decode_cstr(nspace_[:nspace_size_]) if nspace_ != NULL else None
1452 return Object(self.ioctx, key, locator, nspace)
1453
1454 def __dealloc__(self):
1455 with nogil:
1456 rados_nobjects_list_close(self.ctx)
1457
1458
1459 cdef class XattrIterator(object):
1460 """Extended attribute iterator"""
1461
1462 cdef rados_xattrs_iter_t it
1463 cdef char* _oid
1464
1465 cdef public Ioctx ioctx
1466 cdef public object oid
1467
1468 def __cinit__(self, Ioctx ioctx, oid):
1469 self.ioctx = ioctx
1470 self.oid = cstr(oid, 'oid')
1471 self._oid = self.oid
1472
1473 with nogil:
1474 ret = rados_getxattrs(ioctx.io, self._oid, &self.it)
1475 if ret != 0:
1476 raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
1477
1478 def __iter__(self):
1479 return self
1480
1481 def __next__(self):
1482 """
1483 Get the next xattr on the object
1484
1485 :raises: StopIteration
1486 :returns: pair - of name and value of the next Xattr
1487 """
1488 cdef:
1489 const char *name_ = NULL
1490 const char *val_ = NULL
1491 size_t len_ = 0
1492
1493 with nogil:
1494 ret = rados_getxattrs_next(self.it, &name_, &val_, &len_)
1495 if ret != 0:
1496 raise make_ex(ret, "error iterating over the extended attributes \
1497 in '%s'" % self.oid)
1498 if name_ == NULL:
1499 raise StopIteration()
1500 name = decode_cstr(name_)
1501 val = val_[:len_]
1502 return (name, val)
1503
1504 def __dealloc__(self):
1505 with nogil:
1506 rados_getxattrs_end(self.it)
1507
1508
1509 cdef class SnapIterator(object):
1510 """Snapshot iterator"""
1511
1512 cdef public Ioctx ioctx
1513
1514 cdef rados_snap_t *snaps
1515 cdef int max_snap
1516 cdef int cur_snap
1517
1518 def __cinit__(self, Ioctx ioctx):
1519 self.ioctx = ioctx
1520 # We don't know how big a buffer we need until we've called the
1521 # function. So use the exponential doubling strategy.
1522 cdef int num_snaps = 10
1523 while True:
1524 self.snaps = <rados_snap_t*>realloc_chk(self.snaps,
1525 num_snaps *
1526 sizeof(rados_snap_t))
1527
1528 with nogil:
1529 ret = rados_ioctx_snap_list(ioctx.io, self.snaps, num_snaps)
1530 if ret >= 0:
1531 self.max_snap = ret
1532 break
1533 elif ret != -errno.ERANGE:
1534 raise make_ex(ret, "error calling rados_snap_list for \
1535 ioctx '%s'" % self.ioctx.name)
1536 num_snaps = num_snaps * 2
1537 self.cur_snap = 0
1538
1539 def __iter__(self) -> 'SnapIterator':
1540 return self
1541
1542 def __next__(self) -> 'Snap':
1543 """
1544 Get the next Snapshot
1545
1546 :raises: :class:`Error`, StopIteration
1547 :returns: next snapshot
1548 """
1549 if self.cur_snap >= self.max_snap:
1550 raise StopIteration
1551
1552 cdef:
1553 rados_snap_t snap_id = self.snaps[self.cur_snap]
1554 int name_len = 10
1555 char *name = NULL
1556
1557 try:
1558 while True:
1559 name = <char *>realloc_chk(name, name_len)
1560 with nogil:
1561 ret = rados_ioctx_snap_get_name(self.ioctx.io, snap_id, name, name_len)
1562 if ret == 0:
1563 break
1564 elif ret != -errno.ERANGE:
1565 raise make_ex(ret, "rados_snap_get_name error")
1566 else:
1567 name_len = name_len * 2
1568
1569 snap = Snap(self.ioctx, decode_cstr(name[:name_len]).rstrip('\0'), snap_id)
1570 self.cur_snap = self.cur_snap + 1
1571 return snap
1572 finally:
1573 free(name)
1574
1575
1576 cdef class Snap(object):
1577 """Snapshot object"""
1578 cdef public Ioctx ioctx
1579 cdef public object name
1580
1581 # NOTE(sileht): old API was storing the ctypes object
1582 # instead of the value ....
1583 cdef public rados_snap_t snap_id
1584
1585 def __cinit__(self, Ioctx ioctx, object name, rados_snap_t snap_id):
1586 self.ioctx = ioctx
1587 self.name = name
1588 self.snap_id = snap_id
1589
1590 def __str__(self):
1591 return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
1592 % (str(self.ioctx), self.name, self.snap_id)
1593
1594 def get_timestamp(self) -> float:
1595 """
1596 Find when a snapshot in the current pool occurred
1597
1598 :raises: :class:`Error`
1599 :returns: the data and time the snapshot was created
1600 """
1601 cdef time_t snap_time
1602
1603 with nogil:
1604 ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id, &snap_time)
1605 if ret != 0:
1606 raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
1607 return datetime.fromtimestamp(snap_time)
1608
1609 # https://github.com/cython/cython/issues/1370
1610 unicode = str
1611
1612 cdef class Completion(object):
1613 """completion object"""
1614
1615 cdef public:
1616 Ioctx ioctx
1617 object oncomplete
1618 object onsafe
1619
1620 cdef:
1621 rados_callback_t complete_cb
1622 rados_callback_t safe_cb
1623 rados_completion_t rados_comp
1624 PyObject* buf
1625
1626 def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
1627 self.oncomplete = oncomplete
1628 self.onsafe = onsafe
1629 self.ioctx = ioctx
1630
1631 def is_safe(self) -> bool:
1632 """
1633 Is an asynchronous operation safe?
1634
1635 This does not imply that the safe callback has finished.
1636
1637 :returns: True if the operation is safe
1638 """
1639 return self.is_complete()
1640
1641 def is_complete(self) -> bool:
1642 """
1643 Has an asynchronous operation completed?
1644
1645 This does not imply that the safe callback has finished.
1646
1647 :returns: True if the operation is completed
1648 """
1649 with nogil:
1650 ret = rados_aio_is_complete(self.rados_comp)
1651 return ret == 1
1652
1653 def wait_for_safe(self):
1654 """
1655 Wait for an asynchronous operation to be marked safe
1656
1657 wait_for_safe() is an alias of wait_for_complete() since Luminous
1658 """
1659 self.wait_for_complete()
1660
1661 def wait_for_complete(self):
1662 """
1663 Wait for an asynchronous operation to complete
1664
1665 This does not imply that the complete callback has finished.
1666 """
1667 with nogil:
1668 rados_aio_wait_for_complete(self.rados_comp)
1669
1670 def wait_for_safe_and_cb(self):
1671 """
1672 Wait for an asynchronous operation to be marked safe and for
1673 the safe callback to have returned
1674 """
1675 return self.wait_for_complete_and_cb()
1676
1677 def wait_for_complete_and_cb(self):
1678 """
1679 Wait for an asynchronous operation to complete and for the
1680 complete callback to have returned
1681
1682 :returns: whether the operation is completed
1683 """
1684 with nogil:
1685 ret = rados_aio_wait_for_complete_and_cb(self.rados_comp)
1686 return ret
1687
1688 def get_return_value(self) -> int:
1689 """
1690 Get the return value of an asychronous operation
1691
1692 The return value is set when the operation is complete or safe,
1693 whichever comes first.
1694
1695 :returns: return value of the operation
1696 """
1697 with nogil:
1698 ret = rados_aio_get_return_value(self.rados_comp)
1699 return ret
1700
1701 def __dealloc__(self):
1702 """
1703 Release a completion
1704
1705 Call this when you no longer need the completion. It may not be
1706 freed immediately if the operation is not acked and committed.
1707 """
1708 ref.Py_XDECREF(self.buf)
1709 self.buf = NULL
1710 if self.rados_comp != NULL:
1711 with nogil:
1712 rados_aio_release(self.rados_comp)
1713 self.rados_comp = NULL
1714
1715 def _complete(self):
1716 self.oncomplete(self)
1717 if self.onsafe:
1718 self.onsafe(self)
1719 self._cleanup()
1720
1721 def _cleanup(self):
1722 with self.ioctx.lock:
1723 if self.oncomplete:
1724 self.ioctx.complete_completions.remove(self)
1725 if self.onsafe:
1726 self.ioctx.safe_completions.remove(self)
1727
1728
1729 class OpCtx(object):
1730 def __enter__(self):
1731 return self.create()
1732
1733 def __exit__(self, type, msg, traceback):
1734 self.release()
1735
1736
1737 cdef class WriteOp(object):
1738 cdef rados_write_op_t write_op
1739
1740 def create(self):
1741 with nogil:
1742 self.write_op = rados_create_write_op()
1743 return self
1744
1745 def release(self):
1746 with nogil:
1747 rados_release_write_op(self.write_op)
1748
1749 def new(self, exclusive: Optional[int] = None):
1750 """
1751 Create the object.
1752 """
1753
1754 cdef:
1755 int _exclusive = exclusive
1756
1757 with nogil:
1758 rados_write_op_create(self.write_op, _exclusive, NULL)
1759
1760
1761 def remove(self):
1762 """
1763 Remove object.
1764 """
1765 with nogil:
1766 rados_write_op_remove(self.write_op)
1767
1768 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
1769 """
1770 Set flags for the last operation added to this write_op.
1771 :para flags: flags to apply to the last operation
1772 """
1773
1774 cdef:
1775 int _flags = flags
1776
1777 with nogil:
1778 rados_write_op_set_flags(self.write_op, _flags)
1779
1780 def set_xattr(self, xattr_name: str, xattr_value: bytes):
1781 """
1782 Set an extended attribute on an object.
1783 :param xattr_name: name of the xattr
1784 :param xattr_value: buffer to set xattr to
1785 """
1786 xattr_name_raw = cstr(xattr_name, 'xattr_name')
1787 cdef:
1788 char *_xattr_name = xattr_name_raw
1789 char *_xattr_value = xattr_value
1790 size_t _xattr_value_len = len(xattr_value)
1791 with nogil:
1792 rados_write_op_setxattr(self.write_op, _xattr_name, _xattr_value, _xattr_value_len)
1793
1794 def rm_xattr(self, xattr_name: str):
1795 """
1796 Removes an extended attribute on from an object.
1797 :param xattr_name: name of the xattr to remove
1798 """
1799 xattr_name_raw = cstr(xattr_name, 'xattr_name')
1800 cdef:
1801 char *_xattr_name = xattr_name_raw
1802 with nogil:
1803 rados_write_op_rmxattr(self.write_op, _xattr_name)
1804
1805 def append(self, to_write: bytes):
1806 """
1807 Append data to an object synchronously
1808 :param to_write: data to write
1809 """
1810
1811 cdef:
1812 char *_to_write = to_write
1813 size_t length = len(to_write)
1814
1815 with nogil:
1816 rados_write_op_append(self.write_op, _to_write, length)
1817
1818 def write_full(self, to_write: bytes):
1819 """
1820 Write whole object, atomically replacing it.
1821 :param to_write: data to write
1822 """
1823
1824 cdef:
1825 char *_to_write = to_write
1826 size_t length = len(to_write)
1827
1828 with nogil:
1829 rados_write_op_write_full(self.write_op, _to_write, length)
1830
1831 def write(self, to_write: bytes, offset: int = 0):
1832 """
1833 Write to offset.
1834 :param to_write: data to write
1835 :param offset: byte offset in the object to begin writing at
1836 """
1837
1838 cdef:
1839 char *_to_write = to_write
1840 size_t length = len(to_write)
1841 uint64_t _offset = offset
1842
1843 with nogil:
1844 rados_write_op_write(self.write_op, _to_write, length, _offset)
1845
1846 def assert_version(self, version: int):
1847 """
1848 Check if object's version is the expected one.
1849 :param version: expected version of the object
1850 :param type: int
1851 """
1852 cdef:
1853 uint64_t _version = version
1854
1855 with nogil:
1856 rados_write_op_assert_version(self.write_op, _version)
1857
1858 def zero(self, offset: int, length: int):
1859 """
1860 Zero part of an object.
1861 :param offset: byte offset in the object to begin writing at
1862 :param offset: number of zero to write
1863 """
1864
1865 cdef:
1866 size_t _length = length
1867 uint64_t _offset = offset
1868
1869 with nogil:
1870 rados_write_op_zero(self.write_op, _length, _offset)
1871
1872 def truncate(self, offset: int):
1873 """
1874 Truncate an object.
1875 :param offset: byte offset in the object to begin truncating at
1876 """
1877
1878 cdef:
1879 uint64_t _offset = offset
1880
1881 with nogil:
1882 rados_write_op_truncate(self.write_op, _offset)
1883
1884 def execute(self, cls: str, method: str, data: bytes):
1885 """
1886 Execute an OSD class method on an object
1887
1888 :param cls: name of the object class
1889 :param method: name of the method
1890 :param data: input data
1891 """
1892
1893 cls_raw = cstr(cls, 'cls')
1894 method_raw = cstr(method, 'method')
1895 cdef:
1896 char *_cls = cls_raw
1897 char *_method = method_raw
1898 char *_data = data
1899 size_t _data_len = len(data)
1900
1901 with nogil:
1902 rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)
1903
1904 def writesame(self, to_write: bytes, write_len: int, offset: int = 0):
1905 """
1906 Write the same buffer multiple times
1907 :param to_write: data to write
1908 :param write_len: total number of bytes to write
1909 :param offset: byte offset in the object to begin writing at
1910 """
1911 cdef:
1912 char *_to_write = to_write
1913 size_t _data_len = len(to_write)
1914 size_t _write_len = write_len
1915 uint64_t _offset = offset
1916 with nogil:
1917 rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)
1918
1919 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1920 """
1921 Ensure that given object range (extent) satisfies comparison
1922 :param cmp_buf: buffer containing bytes to be compared with object contents
1923 :param offset: object byte offset at which to start the comparison
1924 """
1925 cdef:
1926 char *_cmp_buf = cmp_buf
1927 size_t _cmp_buf_len = len(cmp_buf)
1928 uint64_t _offset = offset
1929 with nogil:
1930 rados_write_op_cmpext(self.write_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1931
1932 def omap_cmp(self, key: str, val: str, cmp_op: int = LIBRADOS_CMPXATTR_OP_EQ):
1933 """
1934 Ensure that an omap key value satisfies comparison
1935 :param key: omap key whose associated value is evaluated for comparison
1936 :param val: value to compare with
1937 :param cmp_op: comparison operator, one of LIBRADOS_CMPXATTR_OP_EQ (1),
1938 LIBRADOS_CMPXATTR_OP_GT (3), or LIBRADOS_CMPXATTR_OP_LT (5).
1939 """
1940 key_raw = cstr(key, 'key')
1941 val_raw = cstr(val, 'val')
1942 cdef:
1943 char *_key = key_raw
1944 char *_val = val_raw
1945 size_t _val_len = len(val)
1946 uint8_t _comparison_operator = cmp_op
1947 with nogil:
1948 rados_write_op_omap_cmp(self.write_op, _key, _comparison_operator, _val, _val_len, NULL)
1949
1950 class WriteOpCtx(WriteOp, OpCtx):
1951 """write operation context manager"""
1952
1953
1954 cdef class ReadOp(object):
1955 cdef rados_read_op_t read_op
1956
1957 def create(self):
1958 with nogil:
1959 self.read_op = rados_create_read_op()
1960 return self
1961
1962 def release(self):
1963 with nogil:
1964 rados_release_read_op(self.read_op)
1965
1966 def cmpext(self, cmp_buf: bytes, offset: int = 0):
1967 """
1968 Ensure that given object range (extent) satisfies comparison
1969 :param cmp_buf: buffer containing bytes to be compared with object contents
1970 :param offset: object byte offset at which to start the comparison
1971 """
1972 cdef:
1973 char *_cmp_buf = cmp_buf
1974 size_t _cmp_buf_len = len(cmp_buf)
1975 uint64_t _offset = offset
1976 with nogil:
1977 rados_read_op_cmpext(self.read_op, _cmp_buf, _cmp_buf_len, _offset, NULL)
1978
1979 def set_flags(self, flags: int = LIBRADOS_OPERATION_NOFLAG):
1980 """
1981 Set flags for the last operation added to this read_op.
1982 :para flags: flags to apply to the last operation
1983 """
1984
1985 cdef:
1986 int _flags = flags
1987
1988 with nogil:
1989 rados_read_op_set_flags(self.read_op, _flags)
1990
1991
1992 class ReadOpCtx(ReadOp, OpCtx):
1993 """read operation context manager"""
1994
1995
1996 cdef void __watch_callback(void *_arg, int64_t _notify_id, uint64_t _cookie,
1997 uint64_t _notifier_id, void *_data,
1998 size_t _data_len) with gil:
1999 """
2000 Watch callback
2001 """
2002 cdef object watch = <object>_arg
2003 data = None
2004 if _data != NULL:
2005 data = (<char *>_data)[:_data_len]
2006 watch._callback(_notify_id, _notifier_id, _cookie, data)
2007
2008 cdef void __watch_error_callback(void *_arg, uint64_t _cookie,
2009 int _error) with gil:
2010 """
2011 Watch error callback
2012 """
2013 cdef object watch = <object>_arg
2014 watch._error_callback(_cookie, _error)
2015
2016
2017 cdef class Watch(object):
2018 """watch object"""
2019
2020 cdef:
2021 object id
2022 Ioctx ioctx
2023 object oid
2024 object callback
2025 object error_callback
2026
2027 def __cinit__(self, Ioctx ioctx, object oid, object callback,
2028 object error_callback, object timeout):
2029 self.id = 0
2030 self.ioctx = ioctx.dup()
2031 self.oid = cstr(oid, 'oid')
2032 self.callback = callback
2033 self.error_callback = error_callback
2034
2035 if timeout is None:
2036 timeout = 0
2037
2038 cdef:
2039 char *_oid = self.oid
2040 uint64_t _cookie;
2041 uint32_t _timeout = timeout;
2042 void *_args = <PyObject*>self
2043
2044 with nogil:
2045 ret = rados_watch3(self.ioctx.io, _oid, &_cookie,
2046 <rados_watchcb2_t>&__watch_callback,
2047 <rados_watcherrcb_t>&__watch_error_callback,
2048 _timeout, _args)
2049 if ret < 0:
2050 raise make_ex(ret, "watch error")
2051
2052 self.id = int(_cookie);
2053
2054 def __enter__(self):
2055 return self
2056
2057 def __exit__(self, type_, value, traceback):
2058 self.close()
2059 return False
2060
2061 def __dealloc__(self):
2062 if self.id == 0:
2063 return
2064 self.ioctx.rados.require_state("connected")
2065 self.close()
2066
2067 def _callback(self, notify_id, notifier_id, watch_id, data):
2068 replay = self.callback(notify_id, notifier_id, watch_id, data)
2069
2070 cdef:
2071 rados_ioctx_t _io = <rados_ioctx_t>self.ioctx.io
2072 char *_obj = self.oid
2073 int64_t _notify_id = notify_id
2074 uint64_t _cookie = watch_id
2075 char *_replay = NULL
2076 int _replay_len = 0
2077
2078 if replay is not None:
2079 replay = cstr(replay, 'replay')
2080 _replay = replay
2081 _replaylen = len(replay)
2082
2083 with nogil:
2084 rados_notify_ack(_io, _obj, _notify_id, _cookie, _replay,
2085 _replaylen)
2086
2087 def _error_callback(self, watch_id, error):
2088 if self.error_callback is None:
2089 return
2090 self.error_callback(watch_id, error)
2091
2092 def get_id(self) -> int:
2093 return self.id
2094
2095 def check(self):
2096 """
2097 Check on watch validity.
2098
2099 :raises: :class:`Error`
2100 :returns: timedelta since last confirmed valid
2101 """
2102 self.ioctx.require_ioctx_open()
2103
2104 cdef:
2105 uint64_t _cookie = self.id
2106
2107 with nogil:
2108 ret = rados_watch_check(self.ioctx.io, _cookie)
2109 if ret < 0:
2110 raise make_ex(ret, "check error")
2111
2112 return timedelta(milliseconds=ret)
2113
2114 def close(self):
2115 """
2116 Unregister an interest in an object.
2117
2118 :raises: :class:`Error`
2119 """
2120 if self.id == 0:
2121 return
2122
2123 self.ioctx.require_ioctx_open()
2124
2125 cdef:
2126 uint64_t _cookie = self.id
2127
2128 with nogil:
2129 ret = rados_unwatch2(self.ioctx.io, _cookie)
2130 if ret < 0 and ret != -errno.ENOENT:
2131 raise make_ex(ret, "unwatch error")
2132 self.id = 0
2133
2134 with nogil:
2135 cluster = rados_ioctx_get_cluster(self.ioctx.io)
2136 ret = rados_watch_flush(cluster);
2137 if ret < 0:
2138 raise make_ex(ret, "watch_flush error")
2139
2140 self.ioctx.close()
2141
2142
2143 cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
2144 """
2145 Callback to oncomplete() for asynchronous operations
2146 """
2147 cdef object cb = <object>args
2148 cb._complete()
2149 return 0
2150
2151 cdef class Ioctx(object):
2152 """rados.Ioctx object"""
2153 # NOTE(sileht): attributes declared in .pyd
2154
2155 def __init__(self, rados, name):
2156 self.rados = rados
2157 self.name = name
2158 self.state = "open"
2159
2160 self.locator_key = ""
2161 self.nspace = ""
2162 self.lock = threading.Lock()
2163 self.safe_completions = []
2164 self.complete_completions = []
2165
2166 def __enter__(self):
2167 return self
2168
2169 def __exit__(self, type_, value, traceback):
2170 self.close()
2171 return False
2172
2173 def __dealloc__(self):
2174 self.close()
2175
2176 def __track_completion(self, completion_obj):
2177 if completion_obj.oncomplete:
2178 with self.lock:
2179 self.complete_completions.append(completion_obj)
2180 if completion_obj.onsafe:
2181 with self.lock:
2182 self.safe_completions.append(completion_obj)
2183
2184 def __get_completion(self,
2185 oncomplete: Callable[[Completion], None],
2186 onsafe: Callable[[Completion], None]):
2187 """
2188 Constructs a completion to use with asynchronous operations
2189
2190 :param oncomplete: what to do when the write is safe and complete in memory
2191 on all replicas
2192 :param onsafe: what to do when the write is safe and complete on storage
2193 on all replicas
2194
2195 :raises: :class:`Error`
2196 :returns: completion object
2197 """
2198
2199 completion_obj = Completion(self, oncomplete, onsafe)
2200
2201 cdef:
2202 rados_callback_t complete_cb = NULL
2203 rados_completion_t completion
2204 PyObject* p_completion_obj= <PyObject*>completion_obj
2205
2206 if oncomplete:
2207 complete_cb = <rados_callback_t>&__aio_complete_cb
2208
2209 with nogil:
2210 ret = rados_aio_create_completion2(p_completion_obj, complete_cb,
2211 &completion)
2212 if ret < 0:
2213 raise make_ex(ret, "error getting a completion")
2214
2215 completion_obj.rados_comp = completion
2216 return completion_obj
2217
2218 def dup(self):
2219 """
2220 Duplicate IoCtx
2221 """
2222
2223 ioctx = self.rados.open_ioctx2(self.get_pool_id())
2224 ioctx.set_namespace(self.get_namespace())
2225 return ioctx
2226
2227 def aio_stat(self,
2228 object_name: str,
2229 oncomplete: Callable[[Completion, Optional[int], Optional[time.struct_time]], None]) -> Completion:
2230 """
2231 Asynchronously get object stats (size/mtime)
2232
2233 oncomplete will be called with the returned size and mtime
2234 as well as the completion:
2235
2236 oncomplete(completion, size, mtime)
2237
2238 :param object_name: the name of the object to get stats from
2239 :param oncomplete: what to do when the stat is complete
2240
2241 :raises: :class:`Error`
2242 :returns: completion object
2243 """
2244
2245 object_name_raw = cstr(object_name, 'object_name')
2246
2247 cdef:
2248 Completion completion
2249 char *_object_name = object_name_raw
2250 uint64_t psize
2251 time_t pmtime
2252
2253 def oncomplete_(completion_v):
2254 cdef Completion _completion_v = completion_v
2255 return_value = _completion_v.get_return_value()
2256 if return_value >= 0:
2257 return oncomplete(_completion_v, psize, time.localtime(pmtime))
2258 else:
2259 return oncomplete(_completion_v, None, None)
2260
2261 completion = self.__get_completion(oncomplete_, None)
2262 self.__track_completion(completion)
2263 with nogil:
2264 ret = rados_aio_stat(self.io, _object_name, completion.rados_comp,
2265 &psize, &pmtime)
2266
2267 if ret < 0:
2268 completion._cleanup()
2269 raise make_ex(ret, "error stating %s" % object_name)
2270 return completion
2271
2272 def aio_write(self, object_name: str, to_write: bytes, offset: int = 0,
2273 oncomplete: Optional[Callable[[Completion], None]] = None,
2274 onsafe: Optional[Callable[[Completion], None]] = None) -> Completion:
2275 """
2276 Write data to an object asynchronously
2277
2278 Queues the write and returns.
2279
2280 :param object_name: name of the object
2281 :param to_write: data to write
2282 :param offset: byte offset in the object to begin writing at
2283 :param oncomplete: what to do when the write is safe and complete in memory
2284 on all replicas
2285 :param onsafe: what to do when the write is safe and complete on storage
2286 on all replicas
2287
2288 :raises: :class:`Error`
2289 :returns: completion object
2290 """
2291
2292 object_name_raw = cstr(object_name, 'object_name')
2293
2294 cdef:
2295 Completion completion
2296 char* _object_name = object_name_raw
2297 char* _to_write = to_write
2298 size_t size = len(to_write)
2299 uint64_t _offset = offset
2300
2301 completion = self.__get_completion(oncomplete, onsafe)
2302 self.__track_completion(completion)
2303 with nogil:
2304 ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
2305 _to_write, size, _offset)
2306 if ret < 0:
2307 completion._cleanup()
2308 raise make_ex(ret, "error writing object %s" % object_name)
2309 return completion
2310
2311 def aio_write_full(self, object_name: str, to_write: bytes,
2312 oncomplete: Optional[Callable] = None,
2313 onsafe: Optional[Callable] = None) -> Completion:
2314 """
2315 Asynchronously write an entire object
2316
2317 The object is filled with the provided data. If the object exists,
2318 it is atomically truncated and then written.
2319 Queues the write and returns.
2320
2321 :param object_name: name of the object
2322 :param to_write: data to write
2323 :param oncomplete: what to do when the write is safe and complete in memory
2324 on all replicas
2325 :param onsafe: what to do when the write is safe and complete on storage
2326 on all replicas
2327
2328 :raises: :class:`Error`
2329 :returns: completion object
2330 """
2331
2332 object_name_raw = cstr(object_name, 'object_name')
2333
2334 cdef:
2335 Completion completion
2336 char* _object_name = object_name_raw
2337 char* _to_write = to_write
2338 size_t size = len(to_write)
2339
2340 completion = self.__get_completion(oncomplete, onsafe)
2341 self.__track_completion(completion)
2342 with nogil:
2343 ret = rados_aio_write_full(self.io, _object_name,
2344 completion.rados_comp,
2345 _to_write, size)
2346 if ret < 0:
2347 completion._cleanup()
2348 raise make_ex(ret, "error writing object %s" % object_name)
2349 return completion
2350
2351 def aio_writesame(self, object_name: str, to_write: bytes,
2352 write_len: int, offset: int = 0,
2353 oncomplete: Optional[Callable] = None) -> Completion:
2354 """
2355 Asynchronously write the same buffer multiple times
2356
2357 :param object_name: name of the object
2358 :param to_write: data to write
2359 :param write_len: total number of bytes to write
2360 :param offset: byte offset in the object to begin writing at
2361 :param oncomplete: what to do when the writesame is safe and
2362 complete in memory on all replicas
2363 :raises: :class:`Error`
2364 :returns: completion object
2365 """
2366
2367 object_name_raw = cstr(object_name, 'object_name')
2368
2369 cdef:
2370 Completion completion
2371 char* _object_name = object_name_raw
2372 char* _to_write = to_write
2373 size_t _data_len = len(to_write)
2374 size_t _write_len = write_len
2375 uint64_t _offset = offset
2376
2377 completion = self.__get_completion(oncomplete, None)
2378 self.__track_completion(completion)
2379 with nogil:
2380 ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
2381 _to_write, _data_len, _write_len, _offset)
2382
2383 if ret < 0:
2384 completion._cleanup()
2385 raise make_ex(ret, "error writing object %s" % object_name)
2386 return completion
2387
2388 def aio_append(self, object_name: str, to_append: bytes,
2389 oncomplete: Optional[Callable] = None,
2390 onsafe: Optional[Callable] = None) -> Completion:
2391 """
2392 Asynchronously append data to an object
2393
2394 Queues the write and returns.
2395
2396 :param object_name: name of the object
2397 :param to_append: data to append
2398 :param offset: byte offset in the object to begin writing at
2399 :param oncomplete: what to do when the write is safe and complete in memory
2400 on all replicas
2401 :param onsafe: what to do when the write is safe and complete on storage
2402 on all replicas
2403
2404 :raises: :class:`Error`
2405 :returns: completion object
2406 """
2407 object_name_raw = cstr(object_name, 'object_name')
2408
2409 cdef:
2410 Completion completion
2411 char* _object_name = object_name_raw
2412 char* _to_append = to_append
2413 size_t size = len(to_append)
2414
2415 completion = self.__get_completion(oncomplete, onsafe)
2416 self.__track_completion(completion)
2417 with nogil:
2418 ret = rados_aio_append(self.io, _object_name,
2419 completion.rados_comp,
2420 _to_append, size)
2421 if ret < 0:
2422 completion._cleanup()
2423 raise make_ex(ret, "error appending object %s" % object_name)
2424 return completion
2425
2426 def aio_flush(self):
2427 """
2428 Block until all pending writes in an io context are safe
2429
2430 :raises: :class:`Error`
2431 """
2432 with nogil:
2433 ret = rados_aio_flush(self.io)
2434 if ret < 0:
2435 raise make_ex(ret, "error flushing")
2436
2437 def aio_cmpext(self, object_name: str, cmp_buf: bytes, offset: int = 0,
2438 oncomplete: Optional[Callable] = None) -> Completion:
2439 """
2440 Asynchronously compare an on-disk object range with a buffer
2441 :param object_name: the name of the object
2442 :param cmp_buf: buffer containing bytes to be compared with object contents
2443 :param offset: object byte offset at which to start the comparison
2444 :param oncomplete: what to do when the write is safe and complete in memory
2445 on all replicas
2446
2447 :raises: :class:`TypeError`
2448 returns: 0 - on success, negative error code on failure,
2449 (-MAX_ERRNO - mismatch_off) on mismatch
2450 """
2451 object_name_raw = cstr(object_name, 'object_name')
2452
2453 cdef:
2454 Completion completion
2455 char* _object_name = object_name_raw
2456 char* _cmp_buf = cmp_buf
2457 size_t _cmp_buf_len = len(cmp_buf)
2458 uint64_t _offset = offset
2459
2460 completion = self.__get_completion(oncomplete, None)
2461 self.__track_completion(completion)
2462
2463 with nogil:
2464 ret = rados_aio_cmpext(self.io, _object_name, completion.rados_comp,
2465 _cmp_buf, _cmp_buf_len, _offset)
2466
2467 if ret < 0:
2468 completion._cleanup()
2469 raise make_ex(ret, "failed to compare %s" % object_name)
2470 return completion
2471
2472 def aio_rmxattr(self, object_name: str, xattr_name: str,
2473 oncomplete: Optional[Callable] = None) -> Completion:
2474 """
2475 Asynchronously delete an extended attribute from an object
2476
2477 :param object_name: the name of the object to remove xattr from
2478 :param xattr_name: which extended attribute to remove
2479 :param oncomplete: what to do when the rmxattr completes
2480
2481 :raises: :class:`Error`
2482 :returns: completion object
2483 """
2484 object_name_raw = cstr(object_name, 'object_name')
2485 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2486
2487 cdef:
2488 Completion completion
2489 char* _object_name = object_name_raw
2490 char* _xattr_name = xattr_name_raw
2491
2492 completion = self.__get_completion(oncomplete, None)
2493 self.__track_completion(completion)
2494 with nogil:
2495 ret = rados_aio_rmxattr(self.io, _object_name,
2496 completion.rados_comp, _xattr_name)
2497
2498 if ret < 0:
2499 completion._cleanup()
2500 raise make_ex(ret, "Failed to remove xattr %r" % xattr_name)
2501 return completion
2502
2503 def aio_read(self, object_name: str, length: int, offset: int,
2504 oncomplete: Optional[Callable] = None) -> Completion:
2505 """
2506 Asynchronously read data from an object
2507
2508 oncomplete will be called with the returned read value as
2509 well as the completion:
2510
2511 oncomplete(completion, data_read)
2512
2513 :param object_name: name of the object to read from
2514 :param length: the number of bytes to read
2515 :param offset: byte offset in the object to begin reading from
2516 :param oncomplete: what to do when the read is complete
2517
2518 :raises: :class:`Error`
2519 :returns: completion object
2520 """
2521
2522 object_name_raw = cstr(object_name, 'object_name')
2523
2524 cdef:
2525 Completion completion
2526 char* _object_name = object_name_raw
2527 uint64_t _offset = offset
2528
2529 char *ref_buf
2530 size_t _length = length
2531
2532 def oncomplete_(completion_v):
2533 cdef Completion _completion_v = completion_v
2534 return_value = _completion_v.get_return_value()
2535 if return_value > 0 and return_value != length:
2536 _PyBytes_Resize(&_completion_v.buf, return_value)
2537 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2538
2539 completion = self.__get_completion(oncomplete_, None)
2540 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2541 ret_buf = PyBytes_AsString(completion.buf)
2542 self.__track_completion(completion)
2543 with nogil:
2544 ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
2545 ret_buf, _length, _offset)
2546 if ret < 0:
2547 completion._cleanup()
2548 raise make_ex(ret, "error reading %s" % object_name)
2549 return completion
2550
2551 def aio_execute(self, object_name: str, cls: str, method: str,
2552 data: bytes, length: int = 8192,
2553 oncomplete: Optional[Callable[[Completion, bytes], None]] = None,
2554 onsafe: Optional[Callable[[Completion, bytes], None]] = None) -> Completion:
2555 """
2556 Asynchronously execute an OSD class method on an object.
2557
2558 oncomplete and onsafe will be called with the data returned from
2559 the plugin as well as the completion:
2560
2561 oncomplete(completion, data)
2562 onsafe(completion, data)
2563
2564 :param object_name: name of the object
2565 :param cls: name of the object class
2566 :param method: name of the method
2567 :param data: input data
2568 :param length: size of output buffer in bytes (default=8192)
2569 :param oncomplete: what to do when the execution is complete
2570 :param onsafe: what to do when the execution is safe and complete
2571
2572 :raises: :class:`Error`
2573 :returns: completion object
2574 """
2575
2576 object_name_raw = cstr(object_name, 'object_name')
2577 cls_raw = cstr(cls, 'cls')
2578 method_raw = cstr(method, 'method')
2579 cdef:
2580 Completion completion
2581 char *_object_name = object_name_raw
2582 char *_cls = cls_raw
2583 char *_method = method_raw
2584 char *_data = data
2585 size_t _data_len = len(data)
2586
2587 char *ref_buf
2588 size_t _length = length
2589
2590 def oncomplete_(completion_v):
2591 cdef Completion _completion_v = completion_v
2592 return_value = _completion_v.get_return_value()
2593 if return_value > 0 and return_value != length:
2594 _PyBytes_Resize(&_completion_v.buf, return_value)
2595 return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2596
2597 def onsafe_(completion_v):
2598 cdef Completion _completion_v = completion_v
2599 return_value = _completion_v.get_return_value()
2600 return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
2601
2602 completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
2603 completion.buf = PyBytes_FromStringAndSize(NULL, length)
2604 ret_buf = PyBytes_AsString(completion.buf)
2605 self.__track_completion(completion)
2606 with nogil:
2607 ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
2608 _cls, _method, _data, _data_len, ret_buf, _length)
2609 if ret < 0:
2610 completion._cleanup()
2611 raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
2612 return completion
2613
2614 def aio_setxattr(self, object_name: str, xattr_name: str, xattr_value: bytes,
2615 oncomplete: Optional[Callable] = None) -> Completion:
2616 """
2617 Asynchronously set an extended attribute on an object
2618
2619 :param object_name: the name of the object to set xattr to
2620 :param xattr_name: which extended attribute to set
2621 :param xattr_value: the value of the extended attribute
2622 :param oncomplete: what to do when the setxttr completes
2623
2624 :raises: :class:`Error`
2625 :returns: completion object
2626 """
2627 object_name_raw = cstr(object_name, 'object_name')
2628 xattr_name_raw = cstr(xattr_name , 'xattr_name')
2629
2630 cdef:
2631 Completion completion
2632 char* _object_name = object_name_raw
2633 char* _xattr_name = xattr_name_raw
2634 char* _xattr_value = xattr_value
2635 size_t xattr_value_len = len(xattr_value)
2636
2637 completion = self.__get_completion(oncomplete, None)
2638 self.__track_completion(completion)
2639 with nogil:
2640 ret = rados_aio_setxattr(self.io, _object_name,
2641 completion.rados_comp,
2642 _xattr_name, _xattr_value, xattr_value_len)
2643
2644 if ret < 0:
2645 completion._cleanup()
2646 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
2647 return completion
2648
2649 def aio_remove(self, object_name: str,
2650 oncomplete: Optional[Callable] = None,
2651 onsafe: Optional[Callable] = None) -> Completion:
2652 """
2653 Asynchronously remove an object
2654
2655 :param object_name: name of the object to remove
2656 :param oncomplete: what to do when the remove is safe and complete in memory
2657 on all replicas
2658 :param onsafe: what to do when the remove is safe and complete on storage
2659 on all replicas
2660
2661 :raises: :class:`Error`
2662 :returns: completion object
2663 """
2664 object_name_raw = cstr(object_name, 'object_name')
2665
2666 cdef:
2667 Completion completion
2668 char* _object_name = object_name_raw
2669
2670 completion = self.__get_completion(oncomplete, onsafe)
2671 self.__track_completion(completion)
2672 with nogil:
2673 ret = rados_aio_remove(self.io, _object_name,
2674 completion.rados_comp)
2675 if ret < 0:
2676 completion._cleanup()
2677 raise make_ex(ret, "error removing %s" % object_name)
2678 return completion
2679
2680 def require_ioctx_open(self):
2681 """
2682 Checks if the rados.Ioctx object state is 'open'
2683
2684 :raises: IoctxStateError
2685 """
2686 if self.state != "open":
2687 raise IoctxStateError("The pool is %s" % self.state)
2688
2689 def set_locator_key(self, loc_key: str):
2690 """
2691 Set the key for mapping objects to pgs within an io context.
2692
2693 The key is used instead of the object name to determine which
2694 placement groups an object is put in. This affects all subsequent
2695 operations of the io context - until a different locator key is
2696 set, all objects in this io context will be placed in the same pg.
2697
2698 :param loc_key: the key to use as the object locator, or NULL to discard
2699 any previously set key
2700
2701 :raises: :class:`TypeError`
2702 """
2703 self.require_ioctx_open()
2704 cloc_key = cstr(loc_key, 'loc_key')
2705 cdef char *_loc_key = cloc_key
2706 with nogil:
2707 rados_ioctx_locator_set_key(self.io, _loc_key)
2708 self.locator_key = loc_key
2709
2710 def get_locator_key(self) -> str:
2711 """
2712 Get the locator_key of context
2713
2714 :returns: locator_key
2715 """
2716 return self.locator_key
2717
2718 def set_read(self, snap_id: int):
2719 """
2720 Set the snapshot for reading objects.
2721
2722 To stop to read from snapshot, use set_read(LIBRADOS_SNAP_HEAD)
2723
2724 :param snap_id: the snapshot Id
2725
2726 :raises: :class:`TypeError`
2727 """
2728 self.require_ioctx_open()
2729 cdef rados_snap_t _snap_id = snap_id
2730 with nogil:
2731 rados_ioctx_snap_set_read(self.io, _snap_id)
2732
2733 def set_namespace(self, nspace: str):
2734 """
2735 Set the namespace for objects within an io context.
2736
2737 The namespace in addition to the object name fully identifies
2738 an object. This affects all subsequent operations of the io context
2739 - until a different namespace is set, all objects in this io context
2740 will be placed in the same namespace.
2741
2742 :param nspace: the namespace to use, or None/"" for the default namespace
2743
2744 :raises: :class:`TypeError`
2745 """
2746 self.require_ioctx_open()
2747 if nspace is None:
2748 nspace = ""
2749 cnspace = cstr(nspace, 'nspace')
2750 cdef char *_nspace = cnspace
2751 with nogil:
2752 rados_ioctx_set_namespace(self.io, _nspace)
2753 self.nspace = nspace
2754
2755 def get_namespace(self) -> str:
2756 """
2757 Get the namespace of context
2758
2759 :returns: namespace
2760 """
2761 return self.nspace
2762
2763 def close(self):
2764 """
2765 Close a rados.Ioctx object.
2766
2767 This just tells librados that you no longer need to use the io context.
2768 It may not be freed immediately if there are pending asynchronous
2769 requests on it, but you should not use an io context again after
2770 calling this function on it.
2771 """
2772 if self.state == "open":
2773 self.require_ioctx_open()
2774 with nogil:
2775 rados_ioctx_destroy(self.io)
2776 self.state = "closed"
2777
2778
2779 def write(self, key: str, data: bytes, offset: int = 0):
2780 """
2781 Write data to an object synchronously
2782
2783 :param key: name of the object
2784 :param data: data to write
2785 :param offset: byte offset in the object to begin writing at
2786
2787 :raises: :class:`TypeError`
2788 :raises: :class:`LogicError`
2789 :returns: int - 0 on success
2790 """
2791 self.require_ioctx_open()
2792
2793 key_raw = cstr(key, 'key')
2794 cdef:
2795 char *_key = key_raw
2796 char *_data = data
2797 size_t length = len(data)
2798 uint64_t _offset = offset
2799
2800 with nogil:
2801 ret = rados_write(self.io, _key, _data, length, _offset)
2802 if ret == 0:
2803 return ret
2804 elif ret < 0:
2805 raise make_ex(ret, "Ioctx.write(%s): failed to write %s"
2806 % (self.name, key))
2807 else:
2808 raise LogicError("Ioctx.write(%s): rados_write \
2809 returned %d, but should return zero on success." % (self.name, ret))
2810
2811 def write_full(self, key: str, data: bytes):
2812 """
2813 Write an entire object synchronously.
2814
2815 The object is filled with the provided data. If the object exists,
2816 it is atomically truncated and then written.
2817
2818 :param key: name of the object
2819 :param data: data to write
2820
2821 :raises: :class:`TypeError`
2822 :raises: :class:`Error`
2823 :returns: int - 0 on success
2824 """
2825 self.require_ioctx_open()
2826 key_raw = cstr(key, 'key')
2827 cdef:
2828 char *_key = key_raw
2829 char *_data = data
2830 size_t length = len(data)
2831
2832 with nogil:
2833 ret = rados_write_full(self.io, _key, _data, length)
2834 if ret == 0:
2835 return ret
2836 elif ret < 0:
2837 raise make_ex(ret, "Ioctx.write_full(%s): failed to write %s"
2838 % (self.name, key))
2839 else:
2840 raise LogicError("Ioctx.write_full(%s): rados_write_full \
2841 returned %d, but should return zero on success." % (self.name, ret))
2842
2843 def writesame(self, key: str, data: bytes, write_len: int, offset: int = 0):
2844 """
2845 Write the same buffer multiple times
2846 :param key: name of the object
2847 :param data: data to write
2848 :param write_len: total number of bytes to write
2849 :param offset: byte offset in the object to begin writing at
2850
2851 :raises: :class:`TypeError`
2852 :raises: :class:`LogicError`
2853 """
2854 self.require_ioctx_open()
2855
2856 key_raw = cstr(key, 'key')
2857 cdef:
2858 char *_key = key_raw
2859 char *_data = data
2860 size_t _data_len = len(data)
2861 size_t _write_len = write_len
2862 uint64_t _offset = offset
2863
2864 with nogil:
2865 ret = rados_writesame(self.io, _key, _data, _data_len, _write_len, _offset)
2866 if ret < 0:
2867 raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
2868 % (self.name, key))
2869 assert(ret == 0)
2870
2871 def append(self, key: str, data: bytes):
2872 """
2873 Append data to an object synchronously
2874
2875 :param key: name of the object
2876 :param data: data to write
2877
2878 :raises: :class:`TypeError`
2879 :raises: :class:`LogicError`
2880 :returns: int - 0 on success
2881 """
2882 self.require_ioctx_open()
2883 key_raw = cstr(key, 'key')
2884 cdef:
2885 char *_key = key_raw
2886 char *_data = data
2887 size_t length = len(data)
2888
2889 with nogil:
2890 ret = rados_append(self.io, _key, _data, length)
2891 if ret == 0:
2892 return ret
2893 elif ret < 0:
2894 raise make_ex(ret, "Ioctx.append(%s): failed to append %s"
2895 % (self.name, key))
2896 else:
2897 raise LogicError("Ioctx.append(%s): rados_append \
2898 returned %d, but should return zero on success." % (self.name, ret))
2899
2900 def read(self, key: str, length: int = 8192, offset: int = 0) -> bytes:
2901 """
2902 Read data from an object synchronously
2903
2904 :param key: name of the object
2905 :param length: the number of bytes to read (default=8192)
2906 :param offset: byte offset in the object to begin reading at
2907
2908 :raises: :class:`TypeError`
2909 :raises: :class:`Error`
2910 :returns: data read from object
2911 """
2912 self.require_ioctx_open()
2913 key_raw = cstr(key, 'key')
2914 cdef:
2915 char *_key = key_raw
2916 char *ret_buf
2917 uint64_t _offset = offset
2918 size_t _length = length
2919 PyObject* ret_s = NULL
2920
2921 ret_s = PyBytes_FromStringAndSize(NULL, length)
2922 try:
2923 ret_buf = PyBytes_AsString(ret_s)
2924 with nogil:
2925 ret = rados_read(self.io, _key, ret_buf, _length, _offset)
2926 if ret < 0:
2927 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2928
2929 if ret != length:
2930 _PyBytes_Resize(&ret_s, ret)
2931
2932 return <object>ret_s
2933 finally:
2934 # We DECREF unconditionally: the cast to object above will have
2935 # INCREFed if necessary. This also takes care of exceptions,
2936 # including if _PyString_Resize fails (that will free the string
2937 # itself and set ret_s to NULL, hence XDECREF).
2938 ref.Py_XDECREF(ret_s)
2939
2940 def execute(self, key: str, cls: str, method: str, data: bytes, length: int = 8192) -> Tuple[int, object]:
2941 """
2942 Execute an OSD class method on an object.
2943
2944 :param key: name of the object
2945 :param cls: name of the object class
2946 :param method: name of the method
2947 :param data: input data
2948 :param length: size of output buffer in bytes (default=8192)
2949
2950 :raises: :class:`TypeError`
2951 :raises: :class:`Error`
2952 :returns: (ret, method output)
2953 """
2954 self.require_ioctx_open()
2955
2956 key_raw = cstr(key, 'key')
2957 cls_raw = cstr(cls, 'cls')
2958 method_raw = cstr(method, 'method')
2959 cdef:
2960 char *_key = key_raw
2961 char *_cls = cls_raw
2962 char *_method = method_raw
2963 char *_data = data
2964 size_t _data_len = len(data)
2965
2966 char *ref_buf
2967 size_t _length = length
2968 PyObject* ret_s = NULL
2969
2970 ret_s = PyBytes_FromStringAndSize(NULL, length)
2971 try:
2972 ret_buf = PyBytes_AsString(ret_s)
2973 with nogil:
2974 ret = rados_exec(self.io, _key, _cls, _method, _data,
2975 _data_len, ret_buf, _length)
2976 if ret < 0:
2977 raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
2978
2979 if ret != length:
2980 _PyBytes_Resize(&ret_s, ret)
2981
2982 return ret, <object>ret_s
2983 finally:
2984 # We DECREF unconditionally: the cast to object above will have
2985 # INCREFed if necessary. This also takes care of exceptions,
2986 # including if _PyString_Resize fails (that will free the string
2987 # itself and set ret_s to NULL, hence XDECREF).
2988 ref.Py_XDECREF(ret_s)
2989
2990 def get_stats(self) -> Dict[str, int]:
2991 """
2992 Get pool usage statistics
2993
2994 :returns: dict contains the following keys:
2995
2996 - ``num_bytes`` (int) - size of pool in bytes
2997
2998 - ``num_kb`` (int) - size of pool in kbytes
2999
3000 - ``num_objects`` (int) - number of objects in the pool
3001
3002 - ``num_object_clones`` (int) - number of object clones
3003
3004 - ``num_object_copies`` (int) - number of object copies
3005
3006 - ``num_objects_missing_on_primary`` (int) - number of objects
3007 missing on primary
3008
3009 - ``num_objects_unfound`` (int) - number of unfound objects
3010
3011 - ``num_objects_degraded`` (int) - number of degraded objects
3012
3013 - ``num_rd`` (int) - bytes read
3014
3015 - ``num_rd_kb`` (int) - kbytes read
3016
3017 - ``num_wr`` (int) - bytes written
3018
3019 - ``num_wr_kb`` (int) - kbytes written
3020 """
3021 self.require_ioctx_open()
3022 cdef rados_pool_stat_t stats
3023 with nogil:
3024 ret = rados_ioctx_pool_stat(self.io, &stats)
3025 if ret < 0:
3026 raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
3027 return {'num_bytes': stats.num_bytes,
3028 'num_kb': stats.num_kb,
3029 'num_objects': stats.num_objects,
3030 'num_object_clones': stats.num_object_clones,
3031 'num_object_copies': stats.num_object_copies,
3032 "num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
3033 "num_objects_unfound": stats.num_objects_unfound,
3034 "num_objects_degraded": stats.num_objects_degraded,
3035 "num_rd": stats.num_rd,
3036 "num_rd_kb": stats.num_rd_kb,
3037 "num_wr": stats.num_wr,
3038 "num_wr_kb": stats.num_wr_kb}
3039
3040 def remove_object(self, key: str) -> bool:
3041 """
3042 Delete an object
3043
3044 This does not delete any snapshots of the object.
3045
3046 :param key: the name of the object to delete
3047
3048 :raises: :class:`TypeError`
3049 :raises: :class:`Error`
3050 :returns: True on success
3051 """
3052 self.require_ioctx_open()
3053 key_raw = cstr(key, 'key')
3054 cdef:
3055 char *_key = key_raw
3056
3057 with nogil:
3058 ret = rados_remove(self.io, _key)
3059 if ret < 0:
3060 raise make_ex(ret, "Failed to remove '%s'" % key)
3061 return True
3062
3063 def trunc(self, key: str, size: int) -> int:
3064 """
3065 Resize an object
3066
3067 If this enlarges the object, the new area is logically filled with
3068 zeroes. If this shrinks the object, the excess data is removed.
3069
3070 :param key: the name of the object to resize
3071 :param size: the new size of the object in bytes
3072
3073 :raises: :class:`TypeError`
3074 :raises: :class:`Error`
3075 :returns: 0 on success, otherwise raises error
3076 """
3077
3078 self.require_ioctx_open()
3079 key_raw = cstr(key, 'key')
3080 cdef:
3081 char *_key = key_raw
3082 uint64_t _size = size
3083
3084 with nogil:
3085 ret = rados_trunc(self.io, _key, _size)
3086 if ret < 0:
3087 raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
3088 return ret
3089
3090 def cmpext(self, key: str, cmp_buf: bytes, offset: int = 0) -> int:
3091 '''
3092 Compare an on-disk object range with a buffer
3093 :param key: the name of the object
3094 :param cmp_buf: buffer containing bytes to be compared with object contents
3095 :param offset: object byte offset at which to start the comparison
3096
3097 :raises: :class:`TypeError`
3098 :raises: :class:`Error`
3099 :returns: 0 - on success, negative error code on failure,
3100 (-MAX_ERRNO - mismatch_off) on mismatch
3101 '''
3102 self.require_ioctx_open()
3103 key_raw = cstr(key, 'key')
3104 cdef:
3105 char *_key = key_raw
3106 char *_cmp_buf = cmp_buf
3107 size_t _cmp_buf_len = len(cmp_buf)
3108 uint64_t _offset = offset
3109 with nogil:
3110 ret = rados_cmpext(self.io, _key, _cmp_buf, _cmp_buf_len, _offset)
3111 assert ret < -MAX_ERRNO or ret == 0, "Ioctx.cmpext(%s): failed to compare %s" % (self.name, key)
3112 return ret
3113
3114 def stat(self, key: str) -> Tuple[int, time.struct_time]:
3115 """
3116 Get object stats (size/mtime)
3117
3118 :param key: the name of the object to get stats from
3119
3120 :raises: :class:`TypeError`
3121 :raises: :class:`Error`
3122 :returns: (size,timestamp)
3123 """
3124 self.require_ioctx_open()
3125
3126 key_raw = cstr(key, 'key')
3127 cdef:
3128 char *_key = key_raw
3129 uint64_t psize
3130 time_t pmtime
3131
3132 with nogil:
3133 ret = rados_stat(self.io, _key, &psize, &pmtime)
3134 if ret < 0:
3135 raise make_ex(ret, "Failed to stat %r" % key)
3136 return psize, time.localtime(pmtime)
3137
3138 def get_xattr(self, key: str, xattr_name: str) -> bytes:
3139 """
3140 Get the value of an extended attribute on an object.
3141
3142 :param key: the name of the object to get xattr from
3143 :param xattr_name: which extended attribute to read
3144
3145 :raises: :class:`TypeError`
3146 :raises: :class:`Error`
3147 :returns: value of the xattr
3148 """
3149 self.require_ioctx_open()
3150
3151 key_raw = cstr(key, 'key')
3152 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3153 cdef:
3154 char *_key = key_raw
3155 char *_xattr_name = xattr_name_raw
3156 size_t ret_length = 4096
3157 char *ret_buf = NULL
3158
3159 try:
3160 while ret_length < 4096 * 1024 * 1024:
3161 ret_buf = <char *>realloc_chk(ret_buf, ret_length)
3162 with nogil:
3163 ret = rados_getxattr(self.io, _key, _xattr_name, ret_buf, ret_length)
3164 if ret == -errno.ERANGE:
3165 ret_length *= 2
3166 elif ret < 0:
3167 raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
3168 else:
3169 break
3170 return ret_buf[:ret]
3171 finally:
3172 free(ret_buf)
3173
3174 def get_xattrs(self, oid: str) -> XattrIterator:
3175 """
3176 Start iterating over xattrs on an object.
3177
3178 :param oid: the name of the object to get xattrs from
3179
3180 :raises: :class:`TypeError`
3181 :raises: :class:`Error`
3182 :returns: XattrIterator
3183 """
3184 self.require_ioctx_open()
3185 return XattrIterator(self, oid)
3186
3187 def set_xattr(self, key: str, xattr_name: str, xattr_value: bytes) -> bool:
3188 """
3189 Set an extended attribute on an object.
3190
3191 :param key: the name of the object to set xattr to
3192 :param xattr_name: which extended attribute to set
3193 :param xattr_value: the value of the extended attribute
3194
3195 :raises: :class:`TypeError`
3196 :raises: :class:`Error`
3197 :returns: True on success, otherwise raise an error
3198 """
3199 self.require_ioctx_open()
3200
3201 key_raw = cstr(key, 'key')
3202 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3203 cdef:
3204 char *_key = key_raw
3205 char *_xattr_name = xattr_name_raw
3206 char *_xattr_value = xattr_value
3207 size_t _xattr_value_len = len(xattr_value)
3208
3209 with nogil:
3210 ret = rados_setxattr(self.io, _key, _xattr_name,
3211 _xattr_value, _xattr_value_len)
3212 if ret < 0:
3213 raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
3214 return True
3215
3216 def rm_xattr(self, key: str, xattr_name: str) -> bool:
3217 """
3218 Removes an extended attribute on from an object.
3219
3220 :param key: the name of the object to remove xattr from
3221 :param xattr_name: which extended attribute to remove
3222
3223 :raises: :class:`TypeError`
3224 :raises: :class:`Error`
3225 :returns: True on success, otherwise raise an error
3226 """
3227 self.require_ioctx_open()
3228
3229 key_raw = cstr(key, 'key')
3230 xattr_name_raw = cstr(xattr_name, 'xattr_name')
3231 cdef:
3232 char *_key = key_raw
3233 char *_xattr_name = xattr_name_raw
3234
3235 with nogil:
3236 ret = rados_rmxattr(self.io, _key, _xattr_name)
3237 if ret < 0:
3238 raise make_ex(ret, "Failed to delete key %r xattr %r" %
3239 (key, xattr_name))
3240 return True
3241
3242 def notify(self, obj: str, msg: str = '', timeout_ms: int = 5000) -> bool:
3243 """
3244 Send a rados notification to an object.
3245
3246 :param obj: the name of the object to notify
3247 :param msg: optional message to send in the notification
3248 :param timeout_ms: notify timeout (in ms)
3249
3250 :raises: :class:`TypeError`
3251 :raises: :class:`Error`
3252 :returns: True on success, otherwise raise an error
3253 """
3254 self.require_ioctx_open()
3255
3256 msglen = len(msg)
3257 obj_raw = cstr(obj, 'obj')
3258 msg_raw = cstr(msg, 'msg')
3259 cdef:
3260 char *_obj = obj_raw
3261 char *_msg = msg_raw
3262 int _msglen = msglen
3263 uint64_t _timeout_ms = timeout_ms
3264
3265 with nogil:
3266 ret = rados_notify2(self.io, _obj, _msg, _msglen, _timeout_ms,
3267 NULL, NULL)
3268 if ret < 0:
3269 raise make_ex(ret, "Failed to notify %r" % (obj))
3270 return True
3271
3272 def aio_notify(self, obj: str,
3273 oncomplete: Callable[[Completion, int, Optional[List], Optional[List]], None],
3274 msg: str = '', timeout_ms: int = 5000) -> Completion:
3275 """
3276 Asynchronously send a rados notification to an object
3277 """
3278 self.require_ioctx_open()
3279
3280 msglen = len(msg)
3281 obj_raw = cstr(obj, 'obj')
3282 msg_raw = cstr(msg, 'msg')
3283
3284 cdef:
3285 Completion completion
3286 char *_obj = obj_raw
3287 char *_msg = msg_raw
3288 int _msglen = msglen
3289 uint64_t _timeout_ms = timeout_ms
3290 char *reply
3291 size_t replylen = 0
3292
3293 def oncomplete_(completion_v):
3294 cdef:
3295 Completion _completion_v = completion_v
3296 notify_ack_t *acks = NULL
3297 notify_timeout_t *timeouts = NULL
3298 size_t nr_acks
3299 size_t nr_timeouts
3300 return_value = _completion_v.get_return_value()
3301 if return_value == 0:
3302 return_value = rados_decode_notify_response(reply, replylen, &acks, &nr_acks, &timeouts, &nr_timeouts)
3303 rados_buffer_free(reply)
3304 if return_value == 0:
3305 ack_list = [(ack.notifier_id, ack.cookie, '' if not ack.payload_len \
3306 else ack.payload[:ack.payload_len]) for ack in acks[:nr_acks]]
3307 timeout_list = [(timeout.notifier_id, timeout.cookie) for timeout in timeouts[:nr_timeouts]]
3308 rados_free_notify_response(acks, nr_acks, timeouts)
3309 return oncomplete(_completion_v, 0, ack_list, timeout_list)
3310 else:
3311 return oncomplete(_completion_v, return_value, None, None)
3312
3313 completion = self.__get_completion(oncomplete_, None)
3314 self.__track_completion(completion)
3315 with nogil:
3316 ret = rados_aio_notify(self.io, _obj, completion.rados_comp,
3317 _msg, _msglen, _timeout_ms, &reply, &replylen)
3318 if ret < 0:
3319 completion._cleanup()
3320 raise make_ex(ret, "aio_notify error: %s" % obj)
3321 return completion
3322
3323 def watch(self, obj: str,
3324 callback: Callable[[int, str, int, bytes], None],
3325 error_callback: Optional[Callable[[int], None]] = None,
3326 timeout: Optional[int] = None) -> Watch:
3327 """
3328 Register an interest in an object.
3329
3330 :param obj: the name of the object to notify
3331 :param callback: what to do when a notify is received on this object
3332 :param error_callback: what to do when the watch session encounters an error
3333 :param timeout: how many seconds the connection will keep after disconnection
3334
3335 :raises: :class:`TypeError`
3336 :raises: :class:`Error`
3337 :returns: watch_id - internal id assigned to this watch
3338 """
3339 self.require_ioctx_open()
3340
3341 return Watch(self, obj, callback, error_callback, timeout)
3342
3343 def list_objects(self) -> ObjectIterator:
3344 """
3345 Get ObjectIterator on rados.Ioctx object.
3346
3347 :returns: ObjectIterator
3348 """
3349 self.require_ioctx_open()
3350 return ObjectIterator(self)
3351
3352 def list_snaps(self) -> SnapIterator:
3353 """
3354 Get SnapIterator on rados.Ioctx object.
3355
3356 :returns: SnapIterator
3357 """
3358 self.require_ioctx_open()
3359 return SnapIterator(self)
3360
3361 def get_pool_id(self) -> int:
3362 """
3363 Get pool id
3364
3365 :returns: int - pool id
3366 """
3367 with nogil:
3368 ret = rados_ioctx_get_id(self.io)
3369 return ret;
3370
3371 def get_pool_name(self) -> str:
3372 """
3373 Get pool name
3374
3375 :returns: pool name
3376 """
3377 cdef:
3378 int name_len = 10
3379 char *name = NULL
3380
3381 try:
3382 while True:
3383 name = <char *>realloc_chk(name, name_len)
3384 with nogil:
3385 ret = rados_ioctx_get_pool_name(self.io, name, name_len)
3386 if ret > 0:
3387 break
3388 elif ret != -errno.ERANGE:
3389 raise make_ex(ret, "get pool name error")
3390 else:
3391 name_len = name_len * 2
3392
3393 return decode_cstr(name)
3394 finally:
3395 free(name)
3396
3397
3398 def create_snap(self, snap_name: str):
3399 """
3400 Create a pool-wide snapshot
3401
3402 :param snap_name: the name of the snapshot
3403
3404 :raises: :class:`TypeError`
3405 :raises: :class:`Error`
3406 """
3407 self.require_ioctx_open()
3408 snap_name_raw = cstr(snap_name, 'snap_name')
3409 cdef char *_snap_name = snap_name_raw
3410
3411 with nogil:
3412 ret = rados_ioctx_snap_create(self.io, _snap_name)
3413 if ret != 0:
3414 raise make_ex(ret, "Failed to create snap %s" % snap_name)
3415
3416 def remove_snap(self, snap_name: str):
3417 """
3418 Removes a pool-wide snapshot
3419
3420 :param snap_name: the name of the snapshot
3421
3422 :raises: :class:`TypeError`
3423 :raises: :class:`Error`
3424 """
3425 self.require_ioctx_open()
3426 snap_name_raw = cstr(snap_name, 'snap_name')
3427 cdef char *_snap_name = snap_name_raw
3428
3429 with nogil:
3430 ret = rados_ioctx_snap_remove(self.io, _snap_name)
3431 if ret != 0:
3432 raise make_ex(ret, "Failed to remove snap %s" % snap_name)
3433
3434 def lookup_snap(self, snap_name: str) -> Snap:
3435 """
3436 Get the id of a pool snapshot
3437
3438 :param snap_name: the name of the snapshot to lookop
3439
3440 :raises: :class:`TypeError`
3441 :raises: :class:`Error`
3442 :returns: Snap - on success
3443 """
3444 self.require_ioctx_open()
3445 csnap_name = cstr(snap_name, 'snap_name')
3446 cdef:
3447 char *_snap_name = csnap_name
3448 rados_snap_t snap_id
3449
3450 with nogil:
3451 ret = rados_ioctx_snap_lookup(self.io, _snap_name, &snap_id)
3452 if ret != 0:
3453 raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
3454 return Snap(self, snap_name, int(snap_id))
3455
3456 def snap_rollback(self, oid: str, snap_name: str):
3457 """
3458 Rollback an object to a snapshot
3459
3460 :param oid: the name of the object
3461 :param snap_name: the name of the snapshot
3462
3463 :raises: :class:`TypeError`
3464 :raises: :class:`Error`
3465 """
3466 self.require_ioctx_open()
3467 oid_raw = cstr(oid, 'oid')
3468 snap_name_raw = cstr(snap_name, 'snap_name')
3469 cdef:
3470 char *_oid = oid_raw
3471 char *_snap_name = snap_name_raw
3472
3473 with nogil:
3474 ret = rados_ioctx_snap_rollback(self.io, _oid, _snap_name)
3475 if ret != 0:
3476 raise make_ex(ret, "Failed to rollback %s" % oid)
3477
3478 def create_self_managed_snap(self):
3479 """
3480 Creates a self-managed snapshot
3481
3482 :returns: snap id on success
3483
3484 :raises: :class:`Error`
3485 """
3486 self.require_ioctx_open()
3487 cdef:
3488 rados_snap_t _snap_id
3489 with nogil:
3490 ret = rados_ioctx_selfmanaged_snap_create(self.io, &_snap_id)
3491 if ret != 0:
3492 raise make_ex(ret, "Failed to create self-managed snapshot")
3493 return int(_snap_id)
3494
3495 def remove_self_managed_snap(self, snap_id: int):
3496 """
3497 Removes a self-managed snapshot
3498
3499 :param snap_id: the name of the snapshot
3500
3501 :raises: :class:`TypeError`
3502 :raises: :class:`Error`
3503 """
3504 self.require_ioctx_open()
3505 cdef:
3506 rados_snap_t _snap_id = snap_id
3507 with nogil:
3508 ret = rados_ioctx_selfmanaged_snap_remove(self.io, _snap_id)
3509 if ret != 0:
3510 raise make_ex(ret, "Failed to remove self-managed snapshot")
3511
3512 def set_self_managed_snap_write(self, snaps: Sequence[Union[int, str]]):
3513 """
3514 Updates the write context to the specified self-managed
3515 snapshot ids.
3516
3517 :param snaps: all associated self-managed snapshot ids
3518
3519 :raises: :class:`TypeError`
3520 :raises: :class:`Error`
3521 """
3522 self.require_ioctx_open()
3523 sorted_snaps = []
3524 snap_seq = 0
3525 if snaps:
3526 sorted_snaps = sorted([int(x) for x in snaps], reverse=True)
3527 snap_seq = sorted_snaps[0]
3528
3529 cdef:
3530 rados_snap_t _snap_seq = snap_seq
3531 rados_snap_t *_snaps = NULL
3532 int _num_snaps = len(sorted_snaps)
3533 try:
3534 _snaps = <rados_snap_t *>malloc(_num_snaps * sizeof(rados_snap_t))
3535 for i in range(len(sorted_snaps)):
3536 _snaps[i] = sorted_snaps[i]
3537 with nogil:
3538 ret = rados_ioctx_selfmanaged_snap_set_write_ctx(self.io,
3539 _snap_seq,
3540 _snaps,
3541 _num_snaps)
3542 if ret != 0:
3543 raise make_ex(ret, "Failed to update snapshot write context")
3544 finally:
3545 free(_snaps)
3546
3547 def rollback_self_managed_snap(self, oid: str, snap_id: int):
3548 """
3549 Rolls an specific object back to a self-managed snapshot revision
3550
3551 :param oid: the name of the object
3552 :param snap_id: the name of the snapshot
3553
3554 :raises: :class:`TypeError`
3555 :raises: :class:`Error`
3556 """
3557 self.require_ioctx_open()
3558 oid_raw = cstr(oid, 'oid')
3559 cdef:
3560 char *_oid = oid_raw
3561 rados_snap_t _snap_id = snap_id
3562 with nogil:
3563 ret = rados_ioctx_selfmanaged_snap_rollback(self.io, _oid, _snap_id)
3564 if ret != 0:
3565 raise make_ex(ret, "Failed to rollback %s" % oid)
3566
3567 def get_last_version(self) -> int:
3568 """
3569 Return the version of the last object read or written to.
3570
3571 This exposes the internal version number of the last object read or
3572 written via this io context
3573
3574 :returns: version of the last object used
3575 """
3576 self.require_ioctx_open()
3577 with nogil:
3578 ret = rados_get_last_version(self.io)
3579 return int(ret)
3580
3581 def create_write_op(self) -> WriteOp:
3582 """
3583 create write operation object.
3584 need call release_write_op after use
3585 """
3586 return WriteOp().create()
3587
3588 def create_read_op(self) -> ReadOp:
3589 """
3590 create read operation object.
3591 need call release_read_op after use
3592 """
3593 return ReadOp().create()
3594
3595 def release_write_op(self, write_op):
3596 """
3597 release memory alloc by create_write_op
3598 """
3599 write_op.release()
3600
3601 def release_read_op(self, read_op: ReadOp):
3602 """
3603 release memory alloc by create_read_op
3604 :para read_op: read_op object
3605 """
3606 read_op.release()
3607
3608 def set_omap(self, write_op: WriteOp, keys: Sequence[str], values: Sequence[bytes]):
3609 """
3610 set keys values to write_op
3611 :para write_op: write_operation object
3612 :para keys: a tuple of keys
3613 :para values: a tuple of values
3614 """
3615
3616 if len(keys) != len(values):
3617 raise Error("Rados(): keys and values must have the same number of items")
3618
3619 keys = cstr_list(keys, 'keys')
3620 values = cstr_list(values, 'values')
3621 lens = [len(v) for v in values]
3622 cdef:
3623 WriteOp _write_op = write_op
3624 size_t key_num = len(keys)
3625 char **_keys = to_bytes_array(keys)
3626 char **_values = to_bytes_array(values)
3627 size_t *_lens = to_csize_t_array(lens)
3628
3629 try:
3630 with nogil:
3631 rados_write_op_omap_set(_write_op.write_op,
3632 <const char**>_keys,
3633 <const char**>_values,
3634 <const size_t*>_lens, key_num)
3635 finally:
3636 free(_keys)
3637 free(_values)
3638 free(_lens)
3639
3640 def operate_write_op(self,
3641 write_op: WriteOp,
3642 oid: str,
3643 mtime: int = 0,
3644 flags: int = LIBRADOS_OPERATION_NOFLAG):
3645 """
3646 execute the real write operation
3647 :para write_op: write operation object
3648 :para oid: object name
3649 :para mtime: the time to set the mtime to, 0 for the current time
3650 :para flags: flags to apply to the entire operation
3651 """
3652
3653 oid_raw = cstr(oid, 'oid')
3654 cdef:
3655 WriteOp _write_op = write_op
3656 char *_oid = oid_raw
3657 time_t _mtime = mtime
3658 int _flags = flags
3659
3660 with nogil:
3661 ret = rados_write_op_operate(_write_op.write_op, self.io, _oid, &_mtime, _flags)
3662 if ret != 0:
3663 raise make_ex(ret, "Failed to operate write op for oid %s" % oid)
3664
3665 def operate_aio_write_op(self, write_op: WriteOp, oid: str,
3666 oncomplete: Optional[Callable[[Completion], None]] = None,
3667 onsafe: Optional[Callable[[Completion], None]] = None,
3668 mtime: int = 0,
3669 flags: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
3670 """
3671 execute the real write operation asynchronously
3672 :para write_op: write operation object
3673 :para oid: object name
3674 :param oncomplete: what to do when the remove is safe and complete in memory
3675 on all replicas
3676 :param onsafe: what to do when the remove is safe and complete on storage
3677 on all replicas
3678 :para mtime: the time to set the mtime to, 0 for the current time
3679 :para flags: flags to apply to the entire operation
3680
3681 :raises: :class:`Error`
3682 :returns: completion object
3683 """
3684
3685 oid_raw = cstr(oid, 'oid')
3686 cdef:
3687 WriteOp _write_op = write_op
3688 char *_oid = oid_raw
3689 Completion completion
3690 time_t _mtime = mtime
3691 int _flags = flags
3692
3693 completion = self.__get_completion(oncomplete, onsafe)
3694 self.__track_completion(completion)
3695
3696 with nogil:
3697 ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
3698 &_mtime, _flags)
3699 if ret != 0:
3700 completion._cleanup()
3701 raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
3702 return completion
3703
3704 def operate_read_op(self, read_op: ReadOp, oid: str, flag: int = LIBRADOS_OPERATION_NOFLAG):
3705 """
3706 execute the real read operation
3707 :para read_op: read operation object
3708 :para oid: object name
3709 :para flag: flags to apply to the entire operation
3710 """
3711 oid_raw = cstr(oid, 'oid')
3712 cdef:
3713 ReadOp _read_op = read_op
3714 char *_oid = oid_raw
3715 int _flag = flag
3716
3717 with nogil:
3718 ret = rados_read_op_operate(_read_op.read_op, self.io, _oid, _flag)
3719 if ret != 0:
3720 raise make_ex(ret, "Failed to operate read op for oid %s" % oid)
3721
3722 def operate_aio_read_op(self, read_op: ReadOp, oid: str,
3723 oncomplete: Optional[Callable[[Completion], None]] = None,
3724 onsafe: Optional[Callable[[Completion], None]] = None,
3725 flag: int = LIBRADOS_OPERATION_NOFLAG) -> Completion:
3726 """
3727 execute the real read operation
3728 :para read_op: read operation object
3729 :para oid: object name
3730 :param oncomplete: what to do when the remove is safe and complete in memory
3731 on all replicas
3732 :param onsafe: what to do when the remove is safe and complete on storage
3733 on all replicas
3734 :para flag: flags to apply to the entire operation
3735 """
3736 oid_raw = cstr(oid, 'oid')
3737 cdef:
3738 ReadOp _read_op = read_op
3739 char *_oid = oid_raw
3740 Completion completion
3741 int _flag = flag
3742
3743 completion = self.__get_completion(oncomplete, onsafe)
3744 self.__track_completion(completion)
3745
3746 with nogil:
3747 ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
3748 if ret != 0:
3749 completion._cleanup()
3750 raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
3751 return completion
3752
3753 def get_omap_vals(self,
3754 read_op: ReadOp,
3755 start_after: str,
3756 filter_prefix: str,
3757 max_return: int) -> Tuple[OmapIterator, int]:
3758 """
3759 get the omap values
3760 :para read_op: read operation object
3761 :para start_after: list keys starting after start_after
3762 :para filter_prefix: list only keys beginning with filter_prefix
3763 :para max_return: list no more than max_return key/value pairs
3764 :returns: an iterator over the requested omap values, return value from this action
3765 """
3766
3767 start_after_raw = cstr(start_after, 'start_after') if start_after else None
3768 filter_prefix_raw = cstr(filter_prefix, 'filter_prefix') if filter_prefix else None
3769 cdef:
3770 char *_start_after = opt_str(start_after_raw)
3771 char *_filter_prefix = opt_str(filter_prefix_raw)
3772 ReadOp _read_op = read_op
3773 rados_omap_iter_t iter_addr = NULL
3774 int _max_return = max_return
3775
3776 with nogil:
3777 rados_read_op_omap_get_vals2(_read_op.read_op, _start_after, _filter_prefix,
3778 _max_return, &iter_addr, NULL, NULL)
3779 it = OmapIterator(self)
3780 it.ctx = iter_addr
3781 return it, 0 # 0 is meaningless; there for backward-compat
3782
3783 def get_omap_keys(self, read_op: ReadOp, start_after: str, max_return: int) -> Tuple[OmapIterator, int]:
3784 """
3785 get the omap keys
3786 :para read_op: read operation object
3787 :para start_after: list keys starting after start_after
3788 :para max_return: list no more than max_return key/value pairs
3789 :returns: an iterator over the requested omap values, return value from this action
3790 """
3791 start_after = cstr(start_after, 'start_after') if start_after else None
3792 cdef:
3793 char *_start_after = opt_str(start_after)
3794 ReadOp _read_op = read_op
3795 rados_omap_iter_t iter_addr = NULL
3796 int _max_return = max_return
3797
3798 with nogil:
3799 rados_read_op_omap_get_keys2(_read_op.read_op, _start_after,
3800 _max_return, &iter_addr, NULL, NULL)
3801 it = OmapIterator(self)
3802 it.ctx = iter_addr
3803 return it, 0 # 0 is meaningless; there for backward-compat
3804
3805 def get_omap_vals_by_keys(self, read_op: ReadOp, keys: Sequence[str]) -> Tuple[OmapIterator, int]:
3806 """
3807 get the omap values by keys
3808 :para read_op: read operation object
3809 :para keys: input key tuple
3810 :returns: an iterator over the requested omap values, return value from this action
3811 """
3812 keys = cstr_list(keys, 'keys')
3813 cdef:
3814 ReadOp _read_op = read_op
3815 rados_omap_iter_t iter_addr
3816 char **_keys = to_bytes_array(keys)
3817 size_t key_num = len(keys)
3818
3819 try:
3820 with nogil:
3821 rados_read_op_omap_get_vals_by_keys(_read_op.read_op,
3822 <const char**>_keys,
3823 key_num, &iter_addr, NULL)
3824 it = OmapIterator(self)
3825 it.ctx = iter_addr
3826 return it, 0 # 0 is meaningless; there for backward-compat
3827 finally:
3828 free(_keys)
3829
3830 def remove_omap_keys(self, write_op: WriteOp, keys: Sequence[str]):
3831 """
3832 remove omap keys specifiled
3833 :para write_op: write operation object
3834 :para keys: input key tuple
3835 """
3836
3837 keys = cstr_list(keys, 'keys')
3838 cdef:
3839 WriteOp _write_op = write_op
3840 size_t key_num = len(keys)
3841 char **_keys = to_bytes_array(keys)
3842
3843 try:
3844 with nogil:
3845 rados_write_op_omap_rm_keys(_write_op.write_op, <const char**>_keys, key_num)
3846 finally:
3847 free(_keys)
3848
3849 def clear_omap(self, write_op: WriteOp):
3850 """
3851 Remove all key/value pairs from an object
3852 :para write_op: write operation object
3853 """
3854
3855 cdef:
3856 WriteOp _write_op = write_op
3857
3858 with nogil:
3859 rados_write_op_omap_clear(_write_op.write_op)
3860
3861 def remove_omap_range2(self, write_op: WriteOp, key_begin: str, key_end: str):
3862 """
3863 Remove key/value pairs from an object whose keys are in the range
3864 [key_begin, key_end)
3865 :param write_op: write operation object
3866 :param key_begin: the lower bound of the key range to remove
3867 :param key_end: the upper bound of the key range to remove
3868 """
3869 key_begin_raw = cstr(key_begin, 'key_begin')
3870 key_end_raw = cstr(key_end, 'key_end')
3871 cdef:
3872 WriteOp _write_op = write_op
3873 char* _key_begin = key_begin_raw
3874 size_t key_begin_len = len(key_begin)
3875 char* _key_end = key_end_raw
3876 size_t key_end_len = len(key_end)
3877 with nogil:
3878 rados_write_op_omap_rm_range2(_write_op.write_op, _key_begin, key_begin_len,
3879 _key_end, key_end_len)
3880
3881 def lock_exclusive(self, key: str, name: str, cookie: str, desc: str = "",
3882 duration: Optional[int] = None,
3883 flags: int = 0):
3884
3885 """
3886 Take an exclusive lock on an object
3887
3888 :param key: name of the object
3889 :param name: name of the lock
3890 :param cookie: cookie of the lock
3891 :param desc: description of the lock
3892 :param duration: duration of the lock in seconds
3893 :param flags: flags
3894
3895 :raises: :class:`TypeError`
3896 :raises: :class:`Error`
3897 """
3898 self.require_ioctx_open()
3899
3900 key_raw = cstr(key, 'key')
3901 name_raw = cstr(name, 'name')
3902 cookie_raw = cstr(cookie, 'cookie')
3903 desc_raw = cstr(desc, 'desc')
3904
3905 cdef:
3906 char* _key = key_raw
3907 char* _name = name_raw
3908 char* _cookie = cookie_raw
3909 char* _desc = desc_raw
3910 uint8_t _flags = flags
3911 timeval _duration
3912
3913 if duration is None:
3914 with nogil:
3915 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3916 NULL, _flags)
3917 else:
3918 _duration.tv_sec = duration
3919 with nogil:
3920 ret = rados_lock_exclusive(self.io, _key, _name, _cookie, _desc,
3921 &_duration, _flags)
3922
3923 if ret < 0:
3924 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3925
3926 def lock_shared(self, key: str, name: str, cookie: str, tag: str, desc: str = "",
3927 duration: Optional[int] = None,
3928 flags: int = 0):
3929
3930 """
3931 Take a shared lock on an object
3932
3933 :param key: name of the object
3934 :param name: name of the lock
3935 :param cookie: cookie of the lock
3936 :param tag: tag of the lock
3937 :param desc: description of the lock
3938 :param duration: duration of the lock in seconds
3939 :param flags: flags
3940
3941 :raises: :class:`TypeError`
3942 :raises: :class:`Error`
3943 """
3944 self.require_ioctx_open()
3945
3946 key_raw = cstr(key, 'key')
3947 tag_raw = cstr(tag, 'tag')
3948 name_raw = cstr(name, 'name')
3949 cookie_raw = cstr(cookie, 'cookie')
3950 desc_raw = cstr(desc, 'desc')
3951
3952 cdef:
3953 char* _key = key_raw
3954 char* _tag = tag_raw
3955 char* _name = name_raw
3956 char* _cookie = cookie_raw
3957 char* _desc = desc_raw
3958 uint8_t _flags = flags
3959 timeval _duration
3960
3961 if duration is None:
3962 with nogil:
3963 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3964 NULL, _flags)
3965 else:
3966 _duration.tv_sec = duration
3967 with nogil:
3968 ret = rados_lock_shared(self.io, _key, _name, _cookie, _tag, _desc,
3969 &_duration, _flags)
3970 if ret < 0:
3971 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
3972
3973 def unlock(self, key: str, name: str, cookie: str):
3974
3975 """
3976 Release a shared or exclusive lock on an object
3977
3978 :param key: name of the object
3979 :param name: name of the lock
3980 :param cookie: cookie of the lock
3981
3982 :raises: :class:`TypeError`
3983 :raises: :class:`Error`
3984 """
3985 self.require_ioctx_open()
3986
3987 key_raw = cstr(key, 'key')
3988 name_raw = cstr(name, 'name')
3989 cookie_raw = cstr(cookie, 'cookie')
3990
3991 cdef:
3992 char* _key = key_raw
3993 char* _name = name_raw
3994 char* _cookie = cookie_raw
3995
3996 with nogil:
3997 ret = rados_unlock(self.io, _key, _name, _cookie)
3998 if ret < 0:
3999 raise make_ex(ret, "Ioctx.rados_lock_exclusive(%s): failed to set lock %s on %s" % (self.name, name, key))
4000
4001 def set_osdmap_full_try(self):
4002 """
4003 Set global osdmap_full_try label to true
4004 """
4005 with nogil:
4006 rados_set_pool_full_try(self.io)
4007
4008 def unset_osdmap_full_try(self):
4009 """
4010 Unset
4011 """
4012 with nogil:
4013 rados_unset_pool_full_try(self.io)
4014
4015 def application_enable(self, app_name: str, force: bool = False):
4016 """
4017 Enable an application on an OSD pool
4018
4019 :param app_name: application name
4020 :type app_name: str
4021 :param force: False if only a single app should exist per pool
4022 :type expire_seconds: boool
4023
4024 :raises: :class:`Error`
4025 """
4026 app_name_raw = cstr(app_name, 'app_name')
4027 cdef:
4028 char *_app_name = app_name_raw
4029 int _force = (1 if force else 0)
4030
4031 with nogil:
4032 ret = rados_application_enable(self.io, _app_name, _force)
4033 if ret < 0:
4034 raise make_ex(ret, "error enabling application")
4035
4036 def application_list(self) -> List[str]:
4037 """
4038 Returns a list of enabled applications
4039
4040 :returns: list of app name string
4041 """
4042 cdef:
4043 size_t length = 128
4044 char *apps = NULL
4045
4046 try:
4047 while True:
4048 apps = <char *>realloc_chk(apps, length)
4049 with nogil:
4050 ret = rados_application_list(self.io, apps, &length)
4051 if ret == 0:
4052 return [decode_cstr(app) for app in
4053 apps[:length].split(b'\0') if app]
4054 elif ret == -errno.ENOENT:
4055 return None
4056 elif ret == -errno.ERANGE:
4057 pass
4058 else:
4059 raise make_ex(ret, "error listing applications")
4060 finally:
4061 free(apps)
4062
4063 def application_metadata_get(self, app_name: str, key: str) -> str:
4064 """
4065 Gets application metadata on an OSD pool for the given key
4066
4067 :param app_name: application name
4068 :type app_name: str
4069 :param key: metadata key
4070 :type key: str
4071 :returns: str - metadata value
4072
4073 :raises: :class:`Error`
4074 """
4075
4076 app_name_raw = cstr(app_name, 'app_name')
4077 key_raw = cstr(key, 'key')
4078 cdef:
4079 char *_app_name = app_name_raw
4080 char *_key = key_raw
4081 size_t size = 129
4082 char *value = NULL
4083 int ret
4084 try:
4085 while True:
4086 value = <char *>realloc_chk(value, size)
4087 with nogil:
4088 ret = rados_application_metadata_get(self.io, _app_name,
4089 _key, value, &size)
4090 if ret != -errno.ERANGE:
4091 break
4092 if ret == -errno.ENOENT:
4093 raise KeyError('no metadata %s for application %s' % (key, _app_name))
4094 elif ret != 0:
4095 raise make_ex(ret, 'error getting metadata %s for application %s' %
4096 (key, _app_name))
4097 return decode_cstr(value)
4098 finally:
4099 free(value)
4100
4101 def application_metadata_set(self, app_name: str, key: str, value: str):
4102 """
4103 Sets application metadata on an OSD pool
4104
4105 :param app_name: application name
4106 :type app_name: str
4107 :param key: metadata key
4108 :type key: str
4109 :param value: metadata value
4110 :type value: str
4111
4112 :raises: :class:`Error`
4113 """
4114 app_name_raw = cstr(app_name, 'app_name')
4115 key_raw = cstr(key, 'key')
4116 value_raw = cstr(value, 'value')
4117 cdef:
4118 char *_app_name = app_name_raw
4119 char *_key = key_raw
4120 char *_value = value_raw
4121
4122 with nogil:
4123 ret = rados_application_metadata_set(self.io, _app_name, _key,
4124 _value)
4125 if ret < 0:
4126 raise make_ex(ret, "error setting application metadata")
4127
4128 def application_metadata_remove(self, app_name: str, key: str):
4129 """
4130 Remove application metadata from an OSD pool
4131
4132 :param app_name: application name
4133 :type app_name: str
4134 :param key: metadata key
4135 :type key: str
4136
4137 :raises: :class:`Error`
4138 """
4139 app_name_raw = cstr(app_name, 'app_name')
4140 key_raw = cstr(key, 'key')
4141 cdef:
4142 char *_app_name = app_name_raw
4143 char *_key = key_raw
4144
4145 with nogil:
4146 ret = rados_application_metadata_remove(self.io, _app_name, _key)
4147 if ret < 0:
4148 raise make_ex(ret, "error removing application metadata")
4149
4150 def application_metadata_list(self, app_name: str) -> List[Tuple[str, str]]:
4151 """
4152 Returns a list of enabled applications
4153
4154 :param app_name: application name
4155 :type app_name: str
4156 :returns: list of key/value tuples
4157 """
4158 app_name_raw = cstr(app_name, 'app_name')
4159 cdef:
4160 char *_app_name = app_name_raw
4161 size_t key_length = 128
4162 size_t val_length = 128
4163 char *c_keys = NULL
4164 char *c_vals = NULL
4165
4166 try:
4167 while True:
4168 c_keys = <char *>realloc_chk(c_keys, key_length)
4169 c_vals = <char *>realloc_chk(c_vals, val_length)
4170 with nogil:
4171 ret = rados_application_metadata_list(self.io, _app_name,
4172 c_keys, &key_length,
4173 c_vals, &val_length)
4174 if ret == 0:
4175 keys = [decode_cstr(key) for key in
4176 c_keys[:key_length].split(b'\0')]
4177 vals = [decode_cstr(val) for val in
4178 c_vals[:val_length].split(b'\0')]
4179 return list(zip(keys, vals))[:-1]
4180 elif ret == -errno.ERANGE:
4181 pass
4182 else:
4183 raise make_ex(ret, "error listing application metadata")
4184 finally:
4185 free(c_keys)
4186 free(c_vals)
4187
4188 def alignment(self) -> int:
4189 """
4190 Returns pool alignment
4191
4192 :returns:
4193 Number of alignment bytes required by the current pool, or None if
4194 alignment is not required.
4195 """
4196 cdef:
4197 int requires = 0
4198 uint64_t _alignment
4199
4200 with nogil:
4201 ret = rados_ioctx_pool_requires_alignment2(self.io, &requires)
4202 if ret != 0:
4203 raise make_ex(ret, "error checking alignment")
4204
4205 alignment = None
4206 if requires:
4207 with nogil:
4208 ret = rados_ioctx_pool_required_alignment2(self.io, &_alignment)
4209 if ret != 0:
4210 raise make_ex(ret, "error querying alignment")
4211 alignment = _alignment
4212 return alignment
4213
4214
4215 def set_object_locator(func):
4216 def retfunc(self, *args, **kwargs):
4217 if self.locator_key is not None:
4218 old_locator = self.ioctx.get_locator_key()
4219 self.ioctx.set_locator_key(self.locator_key)
4220 retval = func(self, *args, **kwargs)
4221 self.ioctx.set_locator_key(old_locator)
4222 return retval
4223 else:
4224 return func(self, *args, **kwargs)
4225 return retfunc
4226
4227
4228 def set_object_namespace(func):
4229 def retfunc(self, *args, **kwargs):
4230 if self.nspace is None:
4231 raise LogicError("Namespace not set properly in context")
4232 old_nspace = self.ioctx.get_namespace()
4233 self.ioctx.set_namespace(self.nspace)
4234 retval = func(self, *args, **kwargs)
4235 self.ioctx.set_namespace(old_nspace)
4236 return retval
4237 return retfunc
4238
4239
4240 class Object(object):
4241 """Rados object wrapper, makes the object look like a file"""
4242 def __init__(self, ioctx, key, locator_key=None, nspace=None):
4243 self.key = key
4244 self.ioctx = ioctx
4245 self.offset = 0
4246 self.state = "exists"
4247 self.locator_key = locator_key
4248 self.nspace = "" if nspace is None else nspace
4249
4250 def __str__(self):
4251 return "rados.Object(ioctx=%s,key=%s,nspace=%s,locator=%s)" % \
4252 (str(self.ioctx), self.key, "--default--"
4253 if self.nspace is "" else self.nspace, self.locator_key)
4254
4255 def require_object_exists(self):
4256 if self.state != "exists":
4257 raise ObjectStateError("The object is %s" % self.state)
4258
4259 @set_object_locator
4260 @set_object_namespace
4261 def read(self, length=1024 * 1024):
4262 self.require_object_exists()
4263 ret = self.ioctx.read(self.key, length, self.offset)
4264 self.offset += len(ret)
4265 return ret
4266
4267 @set_object_locator
4268 @set_object_namespace
4269 def write(self, string_to_write):
4270 self.require_object_exists()
4271 ret = self.ioctx.write(self.key, string_to_write, self.offset)
4272 if ret == 0:
4273 self.offset += len(string_to_write)
4274 return ret
4275
4276 @set_object_locator
4277 @set_object_namespace
4278 def remove(self):
4279 self.require_object_exists()
4280 self.ioctx.remove_object(self.key)
4281 self.state = "removed"
4282
4283 @set_object_locator
4284 @set_object_namespace
4285 def stat(self) -> Tuple[int, time.struct_time]:
4286 self.require_object_exists()
4287 return self.ioctx.stat(self.key)
4288
4289 def seek(self, position: int):
4290 self.require_object_exists()
4291 self.offset = position
4292
4293 @set_object_locator
4294 @set_object_namespace
4295 def get_xattr(self, xattr_name: str) -> bytes:
4296 self.require_object_exists()
4297 return self.ioctx.get_xattr(self.key, xattr_name)
4298
4299 @set_object_locator
4300 @set_object_namespace
4301 def get_xattrs(self) -> XattrIterator:
4302 self.require_object_exists()
4303 return self.ioctx.get_xattrs(self.key)
4304
4305 @set_object_locator
4306 @set_object_namespace
4307 def set_xattr(self, xattr_name: str, xattr_value: bytes) -> bool:
4308 self.require_object_exists()
4309 return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
4310
4311 @set_object_locator
4312 @set_object_namespace
4313 def rm_xattr(self, xattr_name: str) -> bool:
4314 self.require_object_exists()
4315 return self.ioctx.rm_xattr(self.key, xattr_name)
4316
4317 MONITOR_LEVELS = [
4318 "debug",
4319 "info",
4320 "warn", "warning",
4321 "err", "error",
4322 "sec",
4323 ]
4324
4325
4326 class MonitorLog(object):
4327 # NOTE(sileht): Keep this class for backward compat
4328 # method moved to Rados.monitor_log()
4329 """
4330 For watching cluster log messages. Instantiate an object and keep
4331 it around while callback is periodically called. Construct with
4332 'level' to monitor 'level' messages (one of MONITOR_LEVELS).
4333 arg will be passed to the callback.
4334
4335 callback will be called with:
4336 arg (given to __init__)
4337 line (the full line, including timestamp, who, level, msg)
4338 who (which entity issued the log message)
4339 timestamp_sec (sec of a struct timespec)
4340 timestamp_nsec (sec of a struct timespec)
4341 seq (sequence number)
4342 level (string representing the level of the log message)
4343 msg (the message itself)
4344 callback's return value is ignored
4345 """
4346 def __init__(self, cluster, level, callback, arg):
4347 self.level = level
4348 self.callback = callback
4349 self.arg = arg
4350 self.cluster = cluster
4351 self.cluster.monitor_log(level, callback, arg)
4352