1 from __future__
import print_function
2 from nose
import SkipTest
3 from nose
.plugins
.attrib
import attr
4 from nose
.tools
import eq_
as eq
, ok_
as ok
, assert_raises
5 from rados
import (Rados
, Error
, RadosStateError
, Object
, ObjectExists
,
6 ObjectNotFound
, ObjectBusy
, NotConnected
,
7 LIBRADOS_ALL_NSPACES
, WriteOpCtx
, ReadOpCtx
, LIBRADOS_CREATE_EXCLUSIVE
,
8 LIBRADOS_CMPXATTR_OP_EQ
, LIBRADOS_CMPXATTR_OP_GT
, LIBRADOS_CMPXATTR_OP_LT
, OSError,
9 LIBRADOS_SNAP_HEAD
, LIBRADOS_OPERATION_BALANCE_READS
, LIBRADOS_OPERATION_SKIPRWLOCKS
, MonitorLog
, MAX_ERRNO
, NoData
, ExtendMismatch
)
10 from datetime
import timedelta
19 def test_rados_init_error():
20 assert_raises(Error
, Rados
, conffile
='', rados_id
='admin',
22 assert_raises(Error
, Rados
, conffile
='', name
='invalid')
23 assert_raises(Error
, Rados
, conffile
='', name
='bad.invalid')
25 def test_rados_init():
26 with
Rados(conffile
='', rados_id
='admin'):
28 with
Rados(conffile
='', name
='client.admin'):
30 with
Rados(conffile
='', name
='client.admin'):
32 with
Rados(conffile
='', name
='client.admin'):
35 def test_ioctx_context_manager():
36 with
Rados(conffile
='', rados_id
='admin') as conn
:
37 with conn
.open_ioctx('rbd') as ioctx
:
40 def test_parse_argv():
41 args
= ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
43 eq(args
, r
.conf_parse_argv(args
))
45 def test_parse_argv_empty_str():
48 eq(args
, r
.conf_parse_argv(args
))
50 class TestRadosStateError(object):
51 def _requires_configuring(self
, rados
):
52 assert_raises(RadosStateError
, rados
.connect
)
54 def _requires_configuring_or_connected(self
, rados
):
55 assert_raises(RadosStateError
, rados
.conf_read_file
)
56 assert_raises(RadosStateError
, rados
.conf_parse_argv
, None)
57 assert_raises(RadosStateError
, rados
.conf_parse_env
)
58 assert_raises(RadosStateError
, rados
.conf_get
, 'opt')
59 assert_raises(RadosStateError
, rados
.conf_set
, 'opt', 'val')
60 assert_raises(RadosStateError
, rados
.ping_monitor
, '0')
62 def _requires_connected(self
, rados
):
63 assert_raises(RadosStateError
, rados
.pool_exists
, 'foo')
64 assert_raises(RadosStateError
, rados
.pool_lookup
, 'foo')
65 assert_raises(RadosStateError
, rados
.pool_reverse_lookup
, 0)
66 assert_raises(RadosStateError
, rados
.create_pool
, 'foo')
67 assert_raises(RadosStateError
, rados
.get_pool_base_tier
, 0)
68 assert_raises(RadosStateError
, rados
.delete_pool
, 'foo')
69 assert_raises(RadosStateError
, rados
.list_pools
)
70 assert_raises(RadosStateError
, rados
.get_fsid
)
71 assert_raises(RadosStateError
, rados
.open_ioctx
, 'foo')
72 assert_raises(RadosStateError
, rados
.mon_command
, '', b
'')
73 assert_raises(RadosStateError
, rados
.osd_command
, 0, '', b
'')
74 assert_raises(RadosStateError
, rados
.pg_command
, '', '', b
'')
75 assert_raises(RadosStateError
, rados
.wait_for_latest_osdmap
)
76 assert_raises(RadosStateError
, rados
.blocklist_add
, '127.0.0.1/123', 0)
78 def test_configuring(self
):
79 rados
= Rados(conffile
='')
80 eq('configuring', rados
.state
)
81 self
._requires
_connected
(rados
)
83 def test_connected(self
):
84 rados
= Rados(conffile
='')
86 eq('connected', rados
.state
)
87 self
._requires
_configuring
(rados
)
89 def test_shutdown(self
):
90 rados
= Rados(conffile
='')
93 eq('shutdown', rados
.state
)
94 self
._requires
_configuring
(rados
)
95 self
._requires
_configuring
_or
_connected
(rados
)
96 self
._requires
_connected
(rados
)
99 class TestRados(object):
102 self
.rados
= Rados(conffile
='')
103 self
.rados
.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
104 self
.rados
.conf_parse_env()
107 # Assume any pre-existing pools are the cluster's defaults
108 self
.default_pools
= self
.rados
.list_pools()
111 self
.rados
.shutdown()
113 def test_ping_monitor(self
):
114 assert_raises(ObjectNotFound
, self
.rados
.ping_monitor
, 'not_exists_monitor')
115 cmd
= {'prefix': 'mon dump', 'format':'json'}
116 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
117 for mon
in json
.loads(buf
.decode('utf8'))['mons']:
119 output
= self
.rados
.ping_monitor(mon
['name'])
122 buf
= json
.loads(output
)
123 if buf
.get('health'):
126 def test_annotations(self
):
127 with
assert_raises(TypeError):
128 self
.rados
.create_pool(0xf00)
130 def test_create(self
):
131 self
.rados
.create_pool('foo')
132 self
.rados
.delete_pool('foo')
134 def test_create_utf8(self
):
136 self
.rados
.create_pool(poolname
)
137 assert self
.rados
.pool_exists(u
"\u9ec4")
138 self
.rados
.delete_pool(poolname
)
140 def test_pool_lookup_utf8(self
):
142 self
.rados
.create_pool(poolname
)
144 poolid
= self
.rados
.pool_lookup(poolname
)
145 eq(poolname
, self
.rados
.pool_reverse_lookup(poolid
))
147 self
.rados
.delete_pool(poolname
)
149 def test_eexist(self
):
150 self
.rados
.create_pool('foo')
151 assert_raises(ObjectExists
, self
.rados
.create_pool
, 'foo')
152 self
.rados
.delete_pool('foo')
154 def list_non_default_pools(self
):
155 pools
= self
.rados
.list_pools()
156 for p
in self
.default_pools
:
160 def test_list_pools(self
):
161 eq(set(), self
.list_non_default_pools())
162 self
.rados
.create_pool('foo')
163 eq(set(['foo']), self
.list_non_default_pools())
164 self
.rados
.create_pool('bar')
165 eq(set(['foo', 'bar']), self
.list_non_default_pools())
166 self
.rados
.create_pool('baz')
167 eq(set(['foo', 'bar', 'baz']), self
.list_non_default_pools())
168 self
.rados
.delete_pool('foo')
169 eq(set(['bar', 'baz']), self
.list_non_default_pools())
170 self
.rados
.delete_pool('baz')
171 eq(set(['bar']), self
.list_non_default_pools())
172 self
.rados
.delete_pool('bar')
173 eq(set(), self
.list_non_default_pools())
174 self
.rados
.create_pool('a' * 500)
175 eq(set(['a' * 500]), self
.list_non_default_pools())
176 self
.rados
.delete_pool('a' * 500)
179 def test_get_pool_base_tier(self
):
180 self
.rados
.create_pool('foo')
182 self
.rados
.create_pool('foo-cache')
184 pool_id
= self
.rados
.pool_lookup('foo')
185 tier_pool_id
= self
.rados
.pool_lookup('foo-cache')
187 cmd
= {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
188 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
192 cmd
= {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
193 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
196 eq(self
.rados
.wait_for_latest_osdmap(), 0)
198 eq(pool_id
, self
.rados
.get_pool_base_tier(pool_id
))
199 eq(pool_id
, self
.rados
.get_pool_base_tier(tier_pool_id
))
201 cmd
= {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
202 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
205 self
.rados
.delete_pool('foo-cache')
207 self
.rados
.delete_pool('foo')
209 def test_get_fsid(self
):
210 fsid
= self
.rados
.get_fsid()
211 assert re
.match('[0-9a-f\-]{36}', fsid
, re
.I
)
213 def test_blocklist_add(self
):
214 self
.rados
.blocklist_add("1.2.3.4/123", 1)
217 def test_get_cluster_stats(self
):
218 stats
= self
.rados
.get_cluster_stats()
219 assert stats
['kb'] > 0
220 assert stats
['kb_avail'] > 0
221 assert stats
['kb_used'] > 0
222 assert stats
['num_objects'] >= 0
224 def test_monitor_log(self
):
225 lock
= threading
.Condition()
226 def cb(arg
, line
, who
, sec
, nsec
, seq
, level
, msg
):
227 # NOTE(sileht): the old pyrados API was received the pointer as int
228 # instead of the value of arg
234 # NOTE(sileht): force don't save the monitor into local var
235 # to ensure all references are correctly tracked into the lib
236 MonitorLog(self
.rados
, "debug", cb
, "arg")
239 MonitorLog(self
.rados
, "debug", None, None)
240 eq(None, self
.rados
.monitor_callback
)
242 class TestIoctx(object):
245 self
.rados
= Rados(conffile
='')
247 self
.rados
.create_pool('test_pool')
248 assert self
.rados
.pool_exists('test_pool')
249 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
252 cmd
= {"prefix":"osd unset", "key":"noup"}
253 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
255 self
.rados
.delete_pool('test_pool')
256 self
.rados
.shutdown()
258 def test_get_last_version(self
):
259 version
= self
.ioctx
.get_last_version()
262 def test_get_stats(self
):
263 stats
= self
.ioctx
.get_stats()
264 eq(stats
, {'num_objects_unfound': 0,
265 'num_objects_missing_on_primary': 0,
266 'num_object_clones': 0,
268 'num_object_copies': 0,
274 'num_objects_degraded': 0,
277 def test_write(self
):
278 self
.ioctx
.write('abc', b
'abc')
279 eq(self
.ioctx
.read('abc'), b
'abc')
281 def test_write_full(self
):
282 self
.ioctx
.write('abc', b
'abc')
283 eq(self
.ioctx
.read('abc'), b
'abc')
284 self
.ioctx
.write_full('abc', b
'd')
285 eq(self
.ioctx
.read('abc'), b
'd')
287 def test_writesame(self
):
288 self
.ioctx
.writesame('ob', b
'rzx', 9)
289 eq(self
.ioctx
.read('ob'), b
'rzxrzxrzx')
291 def test_append(self
):
292 self
.ioctx
.write('abc', b
'a')
293 self
.ioctx
.append('abc', b
'b')
294 self
.ioctx
.append('abc', b
'c')
295 eq(self
.ioctx
.read('abc'), b
'abc')
297 def test_write_zeros(self
):
298 self
.ioctx
.write('abc', b
'a\0b\0c')
299 eq(self
.ioctx
.read('abc'), b
'a\0b\0c')
301 def test_trunc(self
):
302 self
.ioctx
.write('abc', b
'abc')
303 self
.ioctx
.trunc('abc', 2)
304 eq(self
.ioctx
.read('abc'), b
'ab')
305 size
= self
.ioctx
.stat('abc')[0]
308 def test_cmpext(self
):
309 self
.ioctx
.write('test_object', b
'abcdefghi')
310 eq(0, self
.ioctx
.cmpext('test_object', b
'abcdefghi', 0))
311 eq(-MAX_ERRNO
- 4, self
.ioctx
.cmpext('test_object', b
'abcdxxxxx', 0))
313 def test_list_objects_empty(self
):
314 eq(list(self
.ioctx
.list_objects()), [])
316 def test_list_objects(self
):
317 self
.ioctx
.write('a', b
'')
318 self
.ioctx
.write('b', b
'foo')
319 self
.ioctx
.write_full('c', b
'bar')
320 self
.ioctx
.append('d', b
'jazz')
321 object_names
= [obj
.key
for obj
in self
.ioctx
.list_objects()]
322 eq(sorted(object_names
), ['a', 'b', 'c', 'd'])
324 def test_list_ns_objects(self
):
325 self
.ioctx
.write('a', b
'')
326 self
.ioctx
.write('b', b
'foo')
327 self
.ioctx
.write_full('c', b
'bar')
328 self
.ioctx
.append('d', b
'jazz')
329 self
.ioctx
.set_namespace("ns1")
330 self
.ioctx
.write('ns1-a', b
'')
331 self
.ioctx
.write('ns1-b', b
'foo')
332 self
.ioctx
.write_full('ns1-c', b
'bar')
333 self
.ioctx
.append('ns1-d', b
'jazz')
334 self
.ioctx
.append('d', b
'jazz')
335 self
.ioctx
.set_namespace(LIBRADOS_ALL_NSPACES
)
336 object_names
= [(obj
.nspace
, obj
.key
) for obj
in self
.ioctx
.list_objects()]
337 eq(sorted(object_names
), [('', 'a'), ('','b'), ('','c'), ('','d'),\
338 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
339 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
341 def test_xattrs(self
):
342 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
343 self
.ioctx
.write('abc', b
'')
344 for key
, value
in xattrs
.items():
345 self
.ioctx
.set_xattr('abc', key
, value
)
346 eq(self
.ioctx
.get_xattr('abc', key
), value
)
348 for key
, value
in self
.ioctx
.get_xattrs('abc'):
349 stored_xattrs
[key
] = value
350 eq(stored_xattrs
, xattrs
)
352 def test_obj_xattrs(self
):
353 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
354 self
.ioctx
.write('abc', b
'')
355 obj
= list(self
.ioctx
.list_objects())[0]
356 for key
, value
in xattrs
.items():
357 obj
.set_xattr(key
, value
)
358 eq(obj
.get_xattr(key
), value
)
360 for key
, value
in obj
.get_xattrs():
361 stored_xattrs
[key
] = value
362 eq(stored_xattrs
, xattrs
)
364 def test_get_pool_id(self
):
365 eq(self
.ioctx
.get_pool_id(), self
.rados
.pool_lookup('test_pool'))
367 def test_get_pool_name(self
):
368 eq(self
.ioctx
.get_pool_name(), 'test_pool')
371 def test_create_snap(self
):
372 assert_raises(ObjectNotFound
, self
.ioctx
.remove_snap
, 'foo')
373 self
.ioctx
.create_snap('foo')
374 self
.ioctx
.remove_snap('foo')
377 def test_list_snaps_empty(self
):
378 eq(list(self
.ioctx
.list_snaps()), [])
381 def test_list_snaps(self
):
382 snaps
= ['snap1', 'snap2', 'snap3']
384 self
.ioctx
.create_snap(snap
)
385 listed_snaps
= [snap
.name
for snap
in self
.ioctx
.list_snaps()]
386 eq(snaps
, listed_snaps
)
389 def test_lookup_snap(self
):
390 self
.ioctx
.create_snap('foo')
391 snap
= self
.ioctx
.lookup_snap('foo')
395 def test_snap_timestamp(self
):
396 self
.ioctx
.create_snap('foo')
397 snap
= self
.ioctx
.lookup_snap('foo')
401 def test_remove_snap(self
):
402 self
.ioctx
.create_snap('foo')
403 (snap
,) = self
.ioctx
.list_snaps()
405 self
.ioctx
.remove_snap('foo')
406 eq(list(self
.ioctx
.list_snaps()), [])
409 def test_snap_rollback(self
):
410 self
.ioctx
.write("insnap", b
"contents1")
411 self
.ioctx
.create_snap("snap1")
412 self
.ioctx
.remove_object("insnap")
413 self
.ioctx
.snap_rollback("insnap", "snap1")
414 eq(self
.ioctx
.read("insnap"), b
"contents1")
415 self
.ioctx
.remove_snap("snap1")
416 self
.ioctx
.remove_object("insnap")
419 def test_snap_read(self
):
420 self
.ioctx
.write("insnap", b
"contents1")
421 self
.ioctx
.create_snap("snap1")
422 self
.ioctx
.remove_object("insnap")
423 snap
= self
.ioctx
.lookup_snap("snap1")
424 self
.ioctx
.set_read(snap
.snap_id
)
425 eq(self
.ioctx
.read("insnap"), b
"contents1")
426 self
.ioctx
.set_read(LIBRADOS_SNAP_HEAD
)
427 self
.ioctx
.write("inhead", b
"contents2")
428 eq(self
.ioctx
.read("inhead"), b
"contents2")
429 self
.ioctx
.remove_snap("snap1")
430 self
.ioctx
.remove_object("inhead")
432 def test_set_omap(self
):
433 keys
= ("1", "2", "3", "4")
434 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
435 with
WriteOpCtx() as write_op
:
436 self
.ioctx
.set_omap(write_op
, keys
, values
)
437 write_op
.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS
)
438 self
.ioctx
.operate_write_op(write_op
, "hw")
439 with
ReadOpCtx() as read_op
:
440 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
442 self
.ioctx
.operate_read_op(read_op
, "hw")
444 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
445 with
ReadOpCtx() as read_op
:
446 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "2", "", 4)
448 self
.ioctx
.operate_read_op(read_op
, "hw")
449 eq(("3", b
"ccc"), next(iter))
450 eq(list(iter), [("4", b
"\x04\x04\x04\x04")])
451 with
ReadOpCtx() as read_op
:
452 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "2", 4)
454 read_op
.set_flags(LIBRADOS_OPERATION_BALANCE_READS
)
455 self
.ioctx
.operate_read_op(read_op
, "hw")
456 eq(list(iter), [("2", b
"bbb")])
458 def test_set_omap_aio(self
):
459 lock
= threading
.Condition()
467 keys
= ("1", "2", "3", "4")
468 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
469 with
WriteOpCtx() as write_op
:
470 self
.ioctx
.set_omap(write_op
, keys
, values
)
471 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "hw", cb
, cb
)
472 comp
.wait_for_complete()
476 eq(comp
.get_return_value(), 0)
478 with
ReadOpCtx() as read_op
:
479 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
481 comp
= self
.ioctx
.operate_aio_read_op(read_op
, "hw", cb
, cb
)
482 comp
.wait_for_complete()
486 eq(comp
.get_return_value(), 0)
488 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
490 def test_write_ops(self
):
491 with
WriteOpCtx() as write_op
:
493 self
.ioctx
.operate_write_op(write_op
, "write_ops")
494 eq(self
.ioctx
.read('write_ops'), b
'')
496 write_op
.write_full(b
'1')
497 write_op
.append(b
'2')
498 self
.ioctx
.operate_write_op(write_op
, "write_ops")
499 eq(self
.ioctx
.read('write_ops'), b
'12')
501 write_op
.write_full(b
'12345')
502 write_op
.write(b
'x', 2)
503 self
.ioctx
.operate_write_op(write_op
, "write_ops")
504 eq(self
.ioctx
.read('write_ops'), b
'12x45')
506 write_op
.write_full(b
'12345')
508 self
.ioctx
.operate_write_op(write_op
, "write_ops")
509 eq(self
.ioctx
.read('write_ops'), b
'12\x00\x005')
511 write_op
.write_full(b
'12345')
513 self
.ioctx
.operate_write_op(write_op
, "write_ops")
514 eq(self
.ioctx
.read('write_ops'), b
'12')
517 self
.ioctx
.operate_write_op(write_op
, "write_ops")
518 with
assert_raises(ObjectNotFound
):
519 self
.ioctx
.read('write_ops')
521 def test_execute_op(self
):
522 with
WriteOpCtx() as write_op
:
523 write_op
.execute("hello", "record_hello", b
"ebs")
524 self
.ioctx
.operate_write_op(write_op
, "object")
525 eq(self
.ioctx
.read('object'), b
"Hello, ebs!")
527 def test_writesame_op(self
):
528 with
WriteOpCtx() as write_op
:
529 write_op
.writesame(b
'rzx', 9)
530 self
.ioctx
.operate_write_op(write_op
, 'abc')
531 eq(self
.ioctx
.read('abc'), b
'rzxrzxrzx')
533 def test_get_omap_vals_by_keys(self
):
534 keys
= ("1", "2", "3", "4")
535 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
536 with
WriteOpCtx() as write_op
:
537 self
.ioctx
.set_omap(write_op
, keys
, values
)
538 self
.ioctx
.operate_write_op(write_op
, "hw")
539 with
ReadOpCtx() as read_op
:
540 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
542 self
.ioctx
.operate_read_op(read_op
, "hw")
543 eq(list(iter), [("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
544 with
ReadOpCtx() as read_op
:
545 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
547 with
assert_raises(ObjectNotFound
):
548 self
.ioctx
.operate_read_op(read_op
, "no_such")
550 def test_get_omap_keys(self
):
551 keys
= ("1", "2", "3")
552 values
= (b
"aaa", b
"bbb", b
"ccc")
553 with
WriteOpCtx() as write_op
:
554 self
.ioctx
.set_omap(write_op
, keys
, values
)
555 self
.ioctx
.operate_write_op(write_op
, "hw")
556 with
ReadOpCtx() as read_op
:
557 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
559 self
.ioctx
.operate_read_op(read_op
, "hw")
560 eq(list(iter), [("1", None), ("2", None)])
561 with
ReadOpCtx() as read_op
:
562 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
564 with
assert_raises(ObjectNotFound
):
565 self
.ioctx
.operate_read_op(read_op
, "no_such")
567 def test_clear_omap(self
):
568 keys
= ("1", "2", "3")
569 values
= (b
"aaa", b
"bbb", b
"ccc")
570 with
WriteOpCtx() as write_op
:
571 self
.ioctx
.set_omap(write_op
, keys
, values
)
572 self
.ioctx
.operate_write_op(write_op
, "hw")
573 with
WriteOpCtx() as write_op_1
:
574 self
.ioctx
.clear_omap(write_op_1
)
575 self
.ioctx
.operate_write_op(write_op_1
, "hw")
576 with
ReadOpCtx() as read_op
:
577 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("1",))
579 self
.ioctx
.operate_read_op(read_op
, "hw")
582 def test_remove_omap_ramge2(self
):
583 keys
= ("1", "2", "3", "4")
584 values
= (b
"a", b
"bb", b
"ccc", b
"dddd")
585 with
WriteOpCtx() as write_op
:
586 self
.ioctx
.set_omap(write_op
, keys
, values
)
587 self
.ioctx
.operate_write_op(write_op
, "test_obj")
588 with
ReadOpCtx() as read_op
:
589 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
591 self
.ioctx
.operate_read_op(read_op
, "test_obj")
592 eq(list(iter), list(zip(keys
, values
)))
593 with
WriteOpCtx() as write_op
:
594 self
.ioctx
.remove_omap_range2(write_op
, "1", "4")
595 self
.ioctx
.operate_write_op(write_op
, "test_obj")
596 with
ReadOpCtx() as read_op
:
597 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
599 self
.ioctx
.operate_read_op(read_op
, "test_obj")
600 eq(list(iter), [("4", b
"dddd")])
602 def test_omap_cmp(self
):
604 self
.ioctx
.write(object_id
, b
'omap_cmp')
605 with
WriteOpCtx() as write_op
:
606 self
.ioctx
.set_omap(write_op
, ('key1',), ('1',))
607 self
.ioctx
.operate_write_op(write_op
, object_id
)
608 with
WriteOpCtx() as write_op
:
609 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
610 self
.ioctx
.set_omap(write_op
, ('key1',), ('2',))
611 self
.ioctx
.operate_write_op(write_op
, object_id
)
612 with
ReadOpCtx() as read_op
:
613 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
615 self
.ioctx
.operate_read_op(read_op
, object_id
)
616 eq(list(iter), [('key1', b
'2')])
617 with
WriteOpCtx() as write_op
:
618 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_GT
)
619 self
.ioctx
.set_omap(write_op
, ('key1',), ('3',))
620 self
.ioctx
.operate_write_op(write_op
, object_id
)
621 with
ReadOpCtx() as read_op
:
622 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
624 self
.ioctx
.operate_read_op(read_op
, object_id
)
625 eq(list(iter), [('key1', b
'3')])
626 with
WriteOpCtx() as write_op
:
627 write_op
.omap_cmp('key1', '4', LIBRADOS_CMPXATTR_OP_LT
)
628 self
.ioctx
.set_omap(write_op
, ('key1',), ('4',))
629 self
.ioctx
.operate_write_op(write_op
, object_id
)
630 with
ReadOpCtx() as read_op
:
631 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
633 self
.ioctx
.operate_read_op(read_op
, object_id
)
634 eq(list(iter), [('key1', b
'4')])
635 with
WriteOpCtx() as write_op
:
636 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
637 self
.ioctx
.set_omap(write_op
, ('key1',), ('5',))
639 self
.ioctx
.operate_write_op(write_op
, object_id
)
640 except (OSError, ExtendMismatch
) as e
:
643 message
= "omap_cmp did not raise Exception when omap content does not match"
644 raise AssertionError(message
)
646 def test_cmpext_op(self
):
648 with
WriteOpCtx() as write_op
:
649 write_op
.write(b
'12345', 0)
650 self
.ioctx
.operate_write_op(write_op
, object_id
)
651 with
WriteOpCtx() as write_op
:
652 write_op
.cmpext(b
'12345', 0)
653 write_op
.write(b
'54321', 0)
654 self
.ioctx
.operate_write_op(write_op
, object_id
)
655 eq(self
.ioctx
.read(object_id
), b
'54321')
656 with
WriteOpCtx() as write_op
:
657 write_op
.cmpext(b
'56789', 0)
658 write_op
.write(b
'12345', 0)
660 self
.ioctx
.operate_write_op(write_op
, object_id
)
661 except ExtendMismatch
as e
:
662 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 1)
663 # where "1" is the offset of the first unmatched byte
664 eq(-e
.errno
, -MAX_ERRNO
- 1)
667 message
= "cmpext did not raise Exception when object content does not match"
668 raise AssertionError(message
)
669 with
ReadOpCtx() as read_op
:
670 read_op
.cmpext(b
'54321', 0)
671 self
.ioctx
.operate_read_op(read_op
, object_id
)
672 with
ReadOpCtx() as read_op
:
673 read_op
.cmpext(b
'54789', 0)
675 self
.ioctx
.operate_read_op(read_op
, object_id
)
676 except ExtendMismatch
as e
:
677 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 2)
678 # where "2" is the offset of the first unmatched byte
679 eq(-e
.errno
, -MAX_ERRNO
- 2)
682 message
= "cmpext did not raise Exception when object content does not match"
683 raise AssertionError(message
)
685 def test_xattrs_op(self
):
686 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
687 with
WriteOpCtx() as write_op
:
688 write_op
.new(LIBRADOS_CREATE_EXCLUSIVE
)
689 for key
, value
in xattrs
.items():
690 write_op
.set_xattr(key
, value
)
691 self
.ioctx
.operate_write_op(write_op
, 'abc')
692 eq(self
.ioctx
.get_xattr('abc', key
), value
)
695 for key
, value
in self
.ioctx
.get_xattrs('abc'):
696 stored_xattrs_1
[key
] = value
697 eq(stored_xattrs_1
, xattrs
)
699 for key
in xattrs
.keys():
700 write_op
.rm_xattr(key
)
701 self
.ioctx
.operate_write_op(write_op
, 'abc')
703 for key
, value
in self
.ioctx
.get_xattrs('abc'):
704 stored_xattrs_2
[key
] = value
705 eq(stored_xattrs_2
, {})
708 self
.ioctx
.operate_write_op(write_op
, 'abc')
710 def test_locator(self
):
711 self
.ioctx
.set_locator_key("bar")
712 self
.ioctx
.write('foo', b
'contents1')
713 objects
= [i
for i
in self
.ioctx
.list_objects()]
715 eq(self
.ioctx
.get_locator_key(), "bar")
716 self
.ioctx
.set_locator_key("")
718 objects
[0].write(b
"contents2")
719 eq(self
.ioctx
.get_locator_key(), "")
720 self
.ioctx
.set_locator_key("bar")
721 contents
= self
.ioctx
.read("foo")
722 eq(contents
, b
"contents2")
723 eq(self
.ioctx
.get_locator_key(), "bar")
725 objects
= [i
for i
in self
.ioctx
.list_objects()]
727 self
.ioctx
.set_locator_key("")
729 def test_operate_aio_write_op(self
):
730 lock
= threading
.Condition()
737 with
WriteOpCtx() as write_op
:
738 write_op
.write(b
'rzx')
739 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "object", cb
, cb
)
740 comp
.wait_for_complete()
744 eq(comp
.get_return_value(), 0)
745 eq(self
.ioctx
.read('object'), b
'rzx')
747 def test_aio_write(self
):
748 lock
= threading
.Condition()
755 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
756 comp
.wait_for_complete()
760 eq(comp
.get_return_value(), 0)
761 contents
= self
.ioctx
.read("foo")
763 [i
.remove() for i
in self
.ioctx
.list_objects()]
765 def test_aio_cmpext(self
):
766 lock
= threading
.Condition()
774 self
.ioctx
.write('test_object', b
'abcdefghi')
775 comp
= self
.ioctx
.aio_cmpext('test_object', b
'abcdefghi', 0, cb
)
776 comp
.wait_for_complete()
780 eq(comp
.get_return_value(), 0)
782 def test_aio_rmxattr(self
):
783 lock
= threading
.Condition()
790 self
.ioctx
.set_xattr("xyz", "key", b
'value')
791 eq(self
.ioctx
.get_xattr("xyz", "key"), b
'value')
792 comp
= self
.ioctx
.aio_rmxattr("xyz", "key", cb
)
793 comp
.wait_for_complete()
797 eq(comp
.get_return_value(), 0)
798 with
assert_raises(NoData
):
799 self
.ioctx
.get_xattr("xyz", "key")
801 def test_aio_write_no_comp_ref(self
):
802 lock
= threading
.Condition()
809 # NOTE(sileht): force don't save the comp into local var
810 # to ensure all references are correctly tracked into the lib
811 self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
815 contents
= self
.ioctx
.read("foo")
817 [i
.remove() for i
in self
.ioctx
.list_objects()]
819 def test_aio_append(self
):
820 lock
= threading
.Condition()
827 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
828 comp2
= self
.ioctx
.aio_append("foo", b
"baz", cb
, cb
)
829 comp
.wait_for_complete()
830 contents
= self
.ioctx
.read("foo")
831 eq(contents
, b
"barbaz")
835 eq(comp
.get_return_value(), 0)
836 eq(comp2
.get_return_value(), 0)
837 [i
.remove() for i
in self
.ioctx
.list_objects()]
839 def test_aio_write_full(self
):
840 lock
= threading
.Condition()
847 self
.ioctx
.aio_write("foo", b
"barbaz", 0, cb
, cb
)
848 comp
= self
.ioctx
.aio_write_full("foo", b
"bar", cb
, cb
)
849 comp
.wait_for_complete()
853 eq(comp
.get_return_value(), 0)
854 contents
= self
.ioctx
.read("foo")
856 [i
.remove() for i
in self
.ioctx
.list_objects()]
858 def test_aio_writesame(self
):
859 lock
= threading
.Condition()
866 comp
= self
.ioctx
.aio_writesame("abc", b
"rzx", 9, 0, cb
)
867 comp
.wait_for_complete()
871 eq(comp
.get_return_value(), 0)
872 eq(self
.ioctx
.read("abc"), b
"rzxrzxrzx")
873 [i
.remove() for i
in self
.ioctx
.list_objects()]
875 def test_aio_stat(self
):
876 lock
= threading
.Condition()
878 def cb(_
, size
, mtime
):
883 comp
= self
.ioctx
.aio_stat("foo", cb
)
884 comp
.wait_for_complete()
888 eq(comp
.get_return_value(), -2)
890 self
.ioctx
.write("foo", b
"bar")
892 comp
= self
.ioctx
.aio_stat("foo", cb
)
893 comp
.wait_for_complete()
897 eq(comp
.get_return_value(), 0)
899 [i
.remove() for i
in self
.ioctx
.list_objects()]
901 def test_aio_remove(self
):
902 lock
= threading
.Condition()
909 self
.ioctx
.write('foo', b
'wrx')
910 eq(self
.ioctx
.read('foo'), b
'wrx')
911 comp
= self
.ioctx
.aio_remove('foo', cb
, cb
)
912 comp
.wait_for_complete()
916 eq(comp
.get_return_value(), 0)
917 eq(list(self
.ioctx
.list_objects()), [])
919 def _take_down_acting_set(self
, pool
, objectname
):
920 # find acting_set for pool:objectname and take it down; used to
921 # verify that async reads don't complete while acting set is missing
928 r
, jsonout
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
929 objmap
= json
.loads(jsonout
.decode("utf-8"))
930 acting_set
= objmap
['acting']
931 cmd
= {"prefix":"osd set", "key":"noup"}
932 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
934 cmd
= {"prefix":"osd down", "ids":[str(i
) for i
in acting_set
]}
935 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
938 # wait for OSDs to acknowledge the down
939 eq(self
.rados
.wait_for_latest_osdmap(), 0)
941 def _let_osds_back_up(self
):
942 cmd
= {"prefix":"osd unset", "key":"noup"}
943 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
946 def test_aio_read_wait_for_complete(self
):
947 # use wait_for_complete() and wait for cb by
950 # this is a list so that the local cb() can modify it
951 payload
= b
"bar\000frob"
952 self
.ioctx
.write("foo", payload
)
953 self
._take
_down
_acting
_set
('test_pool', 'foo')
956 lock
= threading
.Condition()
962 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
963 eq(False, comp
.is_complete())
965 eq(False, comp
.is_complete())
969 self
._let
_osds
_back
_up
()
970 comp
.wait_for_complete()
973 while retval
[0] is None and loops
<= 10:
978 eq(retval
[0], payload
)
979 eq(sys
.getrefcount(comp
), 2)
981 def test_aio_read_wait_for_complete_and_cb(self
):
982 # use wait_for_complete_and_cb(), verify retval[0] is
983 # set by the time we regain control
984 payload
= b
"bar\000frob"
985 self
.ioctx
.write("foo", payload
)
987 self
._take
_down
_acting
_set
('test_pool', 'foo')
988 # this is a list so that the local cb() can modify it
990 lock
= threading
.Condition()
995 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
996 eq(False, comp
.is_complete())
998 eq(False, comp
.is_complete())
1002 self
._let
_osds
_back
_up
()
1003 comp
.wait_for_complete_and_cb()
1004 assert(retval
[0] is not None)
1005 eq(retval
[0], payload
)
1006 eq(sys
.getrefcount(comp
), 2)
1008 def test_aio_read_wait_for_complete_and_cb_error(self
):
1009 # error case, use wait_for_complete_and_cb(), verify retval[0] is
1010 # set by the time we regain control
1011 self
._take
_down
_acting
_set
('test_pool', 'bar')
1013 # this is a list so that the local cb() can modify it
1015 lock
= threading
.Condition()
1021 # read from a DNE object
1022 comp
= self
.ioctx
.aio_read("bar", 3, 0, cb
)
1023 eq(False, comp
.is_complete())
1025 eq(False, comp
.is_complete())
1028 self
._let
_osds
_back
_up
()
1030 comp
.wait_for_complete_and_cb()
1032 assert(comp
.get_return_value() < 0)
1033 eq(sys
.getrefcount(comp
), 2)
1035 def test_lock(self
):
1036 self
.ioctx
.lock_exclusive("foo", "lock", "locker", "desc_lock",
1038 assert_raises(ObjectExists
,
1039 self
.ioctx
.lock_exclusive
,
1040 "foo", "lock", "locker", "desc_lock", 10000, 0)
1041 self
.ioctx
.unlock("foo", "lock", "locker")
1042 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker")
1044 self
.ioctx
.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
1046 self
.ioctx
.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
1048 assert_raises(ObjectBusy
,
1049 self
.ioctx
.lock_exclusive
,
1050 "foo", "lock", "locker3", "desc_lock", 10000, 0)
1051 self
.ioctx
.unlock("foo", "lock", "locker1")
1052 self
.ioctx
.unlock("foo", "lock", "locker2")
1053 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker1")
1054 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker2")
1056 def test_execute(self
):
1057 self
.ioctx
.write("foo", b
"") # ensure object exists
1059 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"")
1060 eq(buf
, b
"Hello, world!")
1062 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"nose")
1063 eq(buf
, b
"Hello, nose!")
1065 def test_aio_execute(self
):
1068 lock
= threading
.Condition()
1071 if retval
[0] is None:
1075 self
.ioctx
.write("foo", b
"") # ensure object exists
1077 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"", 32, cb
, cb
)
1078 comp
.wait_for_complete()
1082 eq(comp
.get_return_value(), 13)
1083 eq(retval
[0], b
"Hello, world!")
1086 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"nose", 32, cb
, cb
)
1087 comp
.wait_for_complete()
1091 eq(comp
.get_return_value(), 12)
1092 eq(retval
[0], b
"Hello, nose!")
1094 [i
.remove() for i
in self
.ioctx
.list_objects()]
1096 def test_aio_setxattr(self
):
1097 lock
= threading
.Condition()
1104 comp
= self
.ioctx
.aio_setxattr("obj", "key", b
'value', cb
)
1105 comp
.wait_for_complete()
1109 eq(comp
.get_return_value(), 0)
1110 eq(self
.ioctx
.get_xattr("obj", "key"), b
'value')
1112 def test_applications(self
):
1113 cmd
= {"prefix":"osd dump", "format":"json"}
1114 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1117 release
= json
.loads(buf
.decode("utf-8")).get("require_osd_release",
1119 if not release
or release
[0] < 'l':
1122 eq([], self
.ioctx
.application_list())
1124 self
.ioctx
.application_enable("app1")
1125 assert_raises(Error
, self
.ioctx
.application_enable
, "app2")
1126 self
.ioctx
.application_enable("app2", True)
1128 assert_raises(Error
, self
.ioctx
.application_metadata_list
, "dne")
1129 eq([], self
.ioctx
.application_metadata_list("app1"))
1131 assert_raises(Error
, self
.ioctx
.application_metadata_set
, "dne", "key",
1133 self
.ioctx
.application_metadata_set("app1", "key1", "val1")
1134 eq("val1", self
.ioctx
.application_metadata_get("app1", "key1"))
1135 self
.ioctx
.application_metadata_set("app1", "key2", "val2")
1136 eq("val2", self
.ioctx
.application_metadata_get("app1", "key2"))
1137 self
.ioctx
.application_metadata_set("app2", "key1", "val1")
1138 eq("val1", self
.ioctx
.application_metadata_get("app2", "key1"))
1140 eq([("key1", "val1"), ("key2", "val2")],
1141 self
.ioctx
.application_metadata_list("app1"))
1143 self
.ioctx
.application_metadata_remove("app1", "key1")
1144 eq([("key2", "val2")], self
.ioctx
.application_metadata_list("app1"))
1146 def test_service_daemon(self
):
1147 name
= "pid-" + str(os
.getpid())
1148 metadata
= {'version': '3.14', 'memory': '42'}
1149 self
.rados
.service_daemon_register("laundry", name
, metadata
)
1150 status
= {'result': 'unknown', 'test': 'running'}
1151 self
.rados
.service_daemon_update(status
)
1153 def test_alignment(self
):
1154 eq(self
.ioctx
.alignment(), None)
1158 class TestIoctxEc(object):
1161 self
.rados
= Rados(conffile
='')
1162 self
.rados
.connect()
1163 self
.pool
= 'test-ec'
1164 self
.profile
= 'testprofile-%s' % self
.pool
1165 cmd
= {"prefix": "osd erasure-code-profile set",
1166 "name": self
.profile
, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
1167 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1169 # create ec pool with profile created above
1170 cmd
= {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
1171 'pool': self
.pool
, 'pool_type': 'erasure',
1172 'erasure_code_profile': self
.profile
}
1173 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1175 assert self
.rados
.pool_exists(self
.pool
)
1176 self
.ioctx
= self
.rados
.open_ioctx(self
.pool
)
1179 cmd
= {"prefix": "osd unset", "key": "noup"}
1180 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1182 self
.rados
.delete_pool(self
.pool
)
1183 self
.rados
.shutdown()
1185 def test_alignment(self
):
1186 eq(self
.ioctx
.alignment(), 8192)
1189 class TestIoctx2(object):
1192 self
.rados
= Rados(conffile
='')
1193 self
.rados
.connect()
1194 self
.rados
.create_pool('test_pool')
1195 assert self
.rados
.pool_exists('test_pool')
1196 pool_id
= self
.rados
.pool_lookup('test_pool')
1198 self
.ioctx2
= self
.rados
.open_ioctx2(pool_id
)
1201 cmd
= {"prefix": "osd unset", "key": "noup"}
1202 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1204 self
.rados
.delete_pool('test_pool')
1205 self
.rados
.shutdown()
1207 def test_get_last_version(self
):
1208 version
= self
.ioctx2
.get_last_version()
1211 def test_get_stats(self
):
1212 stats
= self
.ioctx2
.get_stats()
1213 eq(stats
, {'num_objects_unfound': 0,
1214 'num_objects_missing_on_primary': 0,
1215 'num_object_clones': 0,
1217 'num_object_copies': 0,
1223 'num_objects_degraded': 0,
1227 class TestObject(object):
1230 self
.rados
= Rados(conffile
='')
1231 self
.rados
.connect()
1232 self
.rados
.create_pool('test_pool')
1233 assert self
.rados
.pool_exists('test_pool')
1234 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1235 self
.ioctx
.write('foo', b
'bar')
1236 self
.object = Object(self
.ioctx
, 'foo')
1241 self
.rados
.delete_pool('test_pool')
1242 self
.rados
.shutdown()
1245 def test_read(self
):
1246 eq(self
.object.read(3), b
'bar')
1247 eq(self
.object.read(100), b
'')
1249 def test_seek(self
):
1250 self
.object.write(b
'blah')
1252 eq(self
.object.read(4), b
'blah')
1254 eq(self
.object.read(3), b
'lah')
1256 def test_write(self
):
1257 self
.object.write(b
'barbaz')
1259 eq(self
.object.read(3), b
'bar')
1260 eq(self
.object.read(3), b
'baz')
1263 class TestIoCtxSelfManagedSnaps(object):
1265 self
.rados
= Rados(conffile
='')
1266 self
.rados
.connect()
1267 self
.rados
.create_pool('test_pool')
1268 assert self
.rados
.pool_exists('test_pool')
1269 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1272 cmd
= {"prefix":"osd unset", "key":"noup"}
1273 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1275 self
.rados
.delete_pool('test_pool')
1276 self
.rados
.shutdown()
1279 # cannot mix-and-match pool and self-managed snapshot mode
1280 self
.ioctx
.set_self_managed_snap_write([])
1281 self
.ioctx
.write('abc', b
'abc')
1282 snap_id_1
= self
.ioctx
.create_self_managed_snap()
1283 self
.ioctx
.set_self_managed_snap_write([snap_id_1
])
1285 self
.ioctx
.write('abc', b
'def')
1286 snap_id_2
= self
.ioctx
.create_self_managed_snap()
1287 self
.ioctx
.set_self_managed_snap_write([snap_id_1
, snap_id_2
])
1289 self
.ioctx
.write('abc', b
'ghi')
1291 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_1
)
1292 eq(self
.ioctx
.read('abc'), b
'abc')
1294 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_2
)
1295 eq(self
.ioctx
.read('abc'), b
'def')
1297 self
.ioctx
.remove_self_managed_snap(snap_id_1
)
1298 self
.ioctx
.remove_self_managed_snap(snap_id_2
)
1300 class TestCommand(object):
1303 self
.rados
= Rados(conffile
='')
1304 self
.rados
.connect()
1307 self
.rados
.shutdown()
1309 def test_monmap_dump(self
):
1311 # check for success and some plain output with epoch in it
1312 cmd
= {"prefix":"mon dump"}
1313 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1316 assert(b
'epoch' in buf
)
1318 # JSON, and grab current epoch
1319 cmd
['format'] = 'json'
1320 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1323 d
= json
.loads(buf
.decode("utf-8"))
1324 assert('epoch' in d
)
1327 # assume epoch + 1000 does not exist; test for ENOENT
1328 cmd
['epoch'] = epoch
+ 1000
1329 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1330 eq(ret
, -errno
.ENOENT
)
1334 # send to specific target by name, rank
1335 cmd
= {"prefix": "version"}
1337 target
= d
['mons'][0]['name']
1339 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1343 e
= json
.loads(buf
.decode("utf-8"))
1344 assert('release' in e
)
1346 target
= d
['mons'][0]['rank']
1348 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1352 e
= json
.loads(buf
.decode("utf-8"))
1353 assert('release' in e
)
1356 def test_osd_bench(self
):
1357 cmd
= dict(prefix
='bench', size
=4096, count
=8192)
1358 ret
, buf
, err
= self
.rados
.osd_command(0, json
.dumps(cmd
), b
'',
1362 out
= json
.loads(buf
.decode('utf-8'))
1363 eq(out
['blocksize'], cmd
['size'])
1364 eq(out
['bytes_written'], cmd
['count'])
1366 def test_ceph_osd_pool_create_utf8(self
):
1369 cmd
= {"prefix": "osd pool create", "pg_num": 16, "pool": poolname
}
1370 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1373 eq(u
"pool '\u9ec5' created", out
)
1376 class TestWatchNotify(object):
1377 OID
= "test_watch_notify"
1380 self
.rados
= Rados(conffile
='')
1381 self
.rados
.connect()
1382 self
.rados
.create_pool('test_pool')
1383 assert self
.rados
.pool_exists('test_pool')
1384 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1385 self
.ioctx
.write(self
.OID
, b
'test watch notify')
1386 self
.lock
= threading
.Condition()
1387 self
.notify_cnt
= {}
1388 self
.notify_data
= {}
1389 self
.notify_error
= {}
1393 self
.instance_id
= self
.rados
.get_instance_id()
1397 self
.rados
.delete_pool('test_pool')
1398 self
.rados
.shutdown()
1400 def make_callback(self
):
1401 def callback(notify_id
, notifier_id
, watch_id
, data
):
1403 if watch_id
not in self
.notify_cnt
:
1404 self
.notify_cnt
[watch_id
] = 0
1405 self
.notify_cnt
[watch_id
] += 1
1406 self
.notify_data
[watch_id
] = data
1409 def make_error_callback(self
):
1410 def callback(watch_id
, error
):
1412 self
.notify_error
[watch_id
] = error
1417 with self
.ioctx
.watch(self
.OID
, self
.make_callback(),
1418 self
.make_error_callback()) as watch1
:
1419 watch_id1
= watch1
.get_id()
1420 assert(watch_id1
> 0)
1422 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1423 watch2
= ioctx
.watch(self
.OID
, self
.make_callback(),
1424 self
.make_error_callback())
1425 watch_id2
= watch2
.get_id()
1426 assert(watch_id2
> 0)
1428 assert(self
.ioctx
.notify(self
.OID
, 'test'))
1430 assert(watch_id1
in self
.notify_cnt
)
1431 assert(watch_id2
in self
.notify_cnt
)
1432 eq(self
.notify_cnt
[watch_id1
], 1)
1433 eq(self
.notify_cnt
[watch_id2
], 1)
1434 eq(self
.notify_data
[watch_id1
], b
'test')
1435 eq(self
.notify_data
[watch_id2
], b
'test')
1437 assert(watch1
.check() >= timedelta())
1438 assert(watch2
.check() >= timedelta())
1440 assert(self
.ioctx
.notify(self
.OID
, 'best'))
1442 eq(self
.notify_cnt
[watch_id1
], 2)
1443 eq(self
.notify_cnt
[watch_id2
], 2)
1444 eq(self
.notify_data
[watch_id1
], b
'best')
1445 eq(self
.notify_data
[watch_id2
], b
'best')
1449 assert(self
.ioctx
.notify(self
.OID
, 'rest'))
1451 eq(self
.notify_cnt
[watch_id1
], 3)
1452 eq(self
.notify_cnt
[watch_id2
], 2)
1453 eq(self
.notify_data
[watch_id1
], b
'rest')
1454 eq(self
.notify_data
[watch_id2
], b
'best')
1456 assert(watch1
.check() >= timedelta())
1458 self
.ioctx
.remove_object(self
.OID
)
1462 if watch_id1
in self
.notify_error
:
1465 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1466 assert_raises(NotConnected
, watch1
.check
)
1468 assert_raises(ObjectNotFound
, self
.ioctx
.notify
, self
.OID
, 'test')
1470 def make_callback_reply(self
):
1471 def callback(notify_id
, notifier_id
, watch_id
, data
):
1476 def notify_callback(self
, _
, r
, ack_list
, timeout_list
):
1479 for notifier_id
, _
, notifier_data
in ack_list
:
1480 if notifier_id
not in self
.ack_cnt
:
1481 self
.ack_cnt
[notifier_id
] = 0
1482 self
.ack_cnt
[notifier_id
] += 1
1483 self
.ack_data
[notifier_id
] = notifier_data
1485 def notify_callback_err(self
, _
, r
, ack_list
, timeout_list
):
1486 eq(r
, -errno
.ENOENT
)
1488 def test_aio_notify(self
):
1489 with self
.ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1490 self
.make_error_callback()) as watch1
:
1491 watch_id1
= watch1
.get_id()
1494 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1495 watch2
= ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1496 self
.make_error_callback())
1497 watch_id2
= watch2
.get_id()
1500 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='test')
1501 comp
.wait_for_complete_and_cb()
1503 ok(self
.instance_id
in self
.ack_cnt
)
1504 eq(self
.ack_cnt
[self
.instance_id
], 2)
1505 eq(self
.ack_data
[self
.instance_id
], b
'test')
1507 ok(watch1
.check() >= timedelta())
1508 ok(watch2
.check() >= timedelta())
1510 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='best')
1511 comp
.wait_for_complete_and_cb()
1513 eq(self
.ack_cnt
[self
.instance_id
], 4)
1514 eq(self
.ack_data
[self
.instance_id
], b
'best')
1518 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='rest')
1519 comp
.wait_for_complete_and_cb()
1521 eq(self
.ack_cnt
[self
.instance_id
], 5)
1522 eq(self
.ack_data
[self
.instance_id
], b
'rest')
1524 assert(watch1
.check() >= timedelta())
1525 self
.ioctx
.remove_object(self
.OID
)
1529 if watch_id1
in self
.notify_error
:
1532 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1533 assert_raises(NotConnected
, watch1
.check
)
1535 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback_err
, msg
='test')
1536 comp
.wait_for_complete_and_cb()