1 from __future__
import print_function
2 from nose
import SkipTest
3 from nose
.plugins
.attrib
import attr
4 from nose
.tools
import eq_
as eq
, ok_
as ok
, assert_raises
5 from rados
import (Rados
, Error
, RadosStateError
, Object
, ObjectExists
,
6 ObjectNotFound
, ObjectBusy
, NotConnected
,
7 LIBRADOS_ALL_NSPACES
, WriteOpCtx
, ReadOpCtx
, LIBRADOS_CREATE_EXCLUSIVE
,
8 LIBRADOS_CMPXATTR_OP_EQ
, LIBRADOS_CMPXATTR_OP_GT
, LIBRADOS_CMPXATTR_OP_LT
, OSError,
9 LIBRADOS_SNAP_HEAD
, LIBRADOS_OPERATION_BALANCE_READS
, LIBRADOS_OPERATION_SKIPRWLOCKS
, MonitorLog
, MAX_ERRNO
, NoData
, ExtendMismatch
)
10 from datetime
import timedelta
19 def test_rados_init_error():
20 assert_raises(Error
, Rados
, conffile
='', rados_id
='admin',
22 assert_raises(Error
, Rados
, conffile
='', name
='invalid')
23 assert_raises(Error
, Rados
, conffile
='', name
='bad.invalid')
25 def test_rados_init():
26 with
Rados(conffile
='', rados_id
='admin'):
28 with
Rados(conffile
='', name
='client.admin'):
30 with
Rados(conffile
='', name
='client.admin'):
32 with
Rados(conffile
='', name
='client.admin'):
35 def test_ioctx_context_manager():
36 with
Rados(conffile
='', rados_id
='admin') as conn
:
37 with conn
.open_ioctx('rbd') as ioctx
:
40 def test_parse_argv():
41 args
= ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
43 eq(args
, r
.conf_parse_argv(args
))
45 def test_parse_argv_empty_str():
48 eq(args
, r
.conf_parse_argv(args
))
50 class TestRadosStateError(object):
def _requires_configuring(self, rados):
    """Assert that ``rados.connect()`` raises ``RadosStateError``.

    :param rados: the ``Rados`` handle to probe
    """
    with assert_raises(RadosStateError):
        rados.connect()
def _requires_configuring_or_connected(self, rados):
    """Assert that the configuration and ping calls raise ``RadosStateError``.

    :param rados: the ``Rados`` handle to probe
    """
    # (method name, positional arguments) pairs, probed in this order.
    guarded_calls = (
        ('conf_read_file', ()),
        ('conf_parse_argv', (None,)),
        ('conf_parse_env', ()),
        ('conf_get', ('opt',)),
        ('conf_set', ('opt', 'val')),
        ('ping_monitor', ('0',)),
    )
    for method_name, call_args in guarded_calls:
        assert_raises(RadosStateError, getattr(rados, method_name), *call_args)
def _requires_connected(self, rados):
    """Assert that the pool, command and cluster calls raise ``RadosStateError``.

    :param rados: the ``Rados`` handle to probe
    """
    # (method name, positional arguments) pairs, probed in this order.
    cluster_calls = (
        ('pool_exists', ('foo',)),
        ('pool_lookup', ('foo',)),
        ('pool_reverse_lookup', (0,)),
        ('create_pool', ('foo',)),
        ('get_pool_base_tier', (0,)),
        ('delete_pool', ('foo',)),
        ('list_pools', ()),
        ('get_fsid', ()),
        ('open_ioctx', ('foo',)),
        ('mon_command', ('', b'')),
        ('osd_command', (0, '', b'')),
        ('pg_command', ('', '', b'')),
        ('wait_for_latest_osdmap', ()),
        ('blocklist_add', ('127.0.0.1/123', 0)),
    )
    for method_name, call_args in cluster_calls:
        assert_raises(RadosStateError, getattr(rados, method_name), *call_args)
def test_configuring(self):
    """A newly built handle reports 'configuring' and fails the connected-only checks."""
    handle = Rados(conffile='')
    eq('configuring', handle.state)
    self._requires_connected(handle)
83 def test_connected(self
):
84 rados
= Rados(conffile
='')
86 eq('connected', rados
.state
)
87 self
._requires
_configuring
(rados
)
89 def test_shutdown(self
):
90 rados
= Rados(conffile
='')
93 eq('shutdown', rados
.state
)
94 self
._requires
_configuring
(rados
)
95 self
._requires
_configuring
_or
_connected
(rados
)
96 self
._requires
_connected
(rados
)
99 class TestRados(object):
102 self
.rados
= Rados(conffile
='')
103 self
.rados
.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
104 self
.rados
.conf_parse_env()
107 # Assume any pre-existing pools are the cluster's defaults
108 self
.default_pools
= self
.rados
.list_pools()
111 self
.rados
.shutdown()
113 def test_ping_monitor(self
):
114 assert_raises(ObjectNotFound
, self
.rados
.ping_monitor
, 'not_exists_monitor')
115 cmd
= {'prefix': 'mon dump', 'format':'json'}
116 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
117 for mon
in json
.loads(buf
.decode('utf8'))['mons']:
119 output
= self
.rados
.ping_monitor(mon
['name'])
122 buf
= json
.loads(output
)
123 if buf
.get('health'):
def test_annotations(self):
    """Passing an integer as the pool name to ``create_pool`` raises ``TypeError``."""
    assert_raises(TypeError, self.rados.create_pool, 0xf00)
def test_create(self):
    """Create the pool 'foo' and delete it again; passes if neither call raises."""
    pool_name = 'foo'
    self.rados.create_pool(pool_name)
    self.rados.delete_pool(pool_name)
134 def test_create_utf8(self
):
136 self
.rados
.create_pool(poolname
)
137 assert self
.rados
.pool_exists(u
"\u9ec4")
138 self
.rados
.delete_pool(poolname
)
140 def test_pool_lookup_utf8(self
):
142 self
.rados
.create_pool(poolname
)
144 poolid
= self
.rados
.pool_lookup(poolname
)
145 eq(poolname
, self
.rados
.pool_reverse_lookup(poolid
))
147 self
.rados
.delete_pool(poolname
)
def test_eexist(self):
    """Creating a pool whose name is already taken must raise ``ObjectExists``.

    The pool is removed in a ``finally`` clause so that a failed assertion
    does not leave 'foo' behind on the cluster for the tests that follow.
    """
    self.rados.create_pool('foo')
    try:
        assert_raises(ObjectExists, self.rados.create_pool, 'foo')
    finally:
        self.rados.delete_pool('foo')
154 def list_non_default_pools(self
):
155 pools
= self
.rados
.list_pools()
156 for p
in self
.default_pools
:
def test_list_pools(self):
    """Create and delete pools, checking the non-default pool set after each step."""
    long_name = 'a' * 500

    # Nothing but the default pools may exist before the first step.
    eq(set(), self.list_non_default_pools())

    # (operation, pool name, non-default pools expected afterwards)
    steps = (
        ('create_pool', 'foo', {'foo'}),
        ('create_pool', 'bar', {'foo', 'bar'}),
        ('create_pool', 'baz', {'foo', 'bar', 'baz'}),
        ('delete_pool', 'foo', {'bar', 'baz'}),
        ('delete_pool', 'baz', {'bar'}),
        ('delete_pool', 'bar', set()),
        ('create_pool', long_name, {long_name}),
    )
    for operation, pool_name, expected in steps:
        getattr(self.rados, operation)(pool_name)
        eq(expected, self.list_non_default_pools())

    # The final removal is not followed by a listing check.
    self.rados.delete_pool(long_name)
179 def test_get_pool_base_tier(self
):
180 self
.rados
.create_pool('foo')
182 self
.rados
.create_pool('foo-cache')
184 pool_id
= self
.rados
.pool_lookup('foo')
185 tier_pool_id
= self
.rados
.pool_lookup('foo-cache')
187 cmd
= {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
188 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
192 cmd
= {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
193 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
196 eq(self
.rados
.wait_for_latest_osdmap(), 0)
198 eq(pool_id
, self
.rados
.get_pool_base_tier(pool_id
))
199 eq(pool_id
, self
.rados
.get_pool_base_tier(tier_pool_id
))
201 cmd
= {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
202 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
205 self
.rados
.delete_pool('foo-cache')
207 self
.rados
.delete_pool('foo')
def test_get_fsid(self):
    """The fsid returned by the cluster must begin with 36 hex-or-dash characters.

    The pattern is written as a raw string: in a plain literal ``\\-`` is an
    invalid escape sequence, which newer Python versions warn about when the
    module is compiled. The characters of the pattern itself are unchanged.
    """
    fsid = self.rados.get_fsid()
    # re.I makes the hex digits case-insensitive.
    assert re.match(r'[0-9a-f\-]{36}', fsid, re.I)
def test_blocklist_add(self):
    """Call ``blocklist_add`` once; the test passes if the call does not raise."""
    client_address = "1.2.3.4/123"
    # NOTE(review): presumably the expiry of the entry - confirm against the binding.
    second_argument = 1
    self.rados.blocklist_add(client_address, second_argument)
def test_get_cluster_stats(self):
    """The cluster statistics must report positive sizes and a non-negative object count."""
    cluster = self.rados.get_cluster_stats()
    for size_field in ('kb', 'kb_avail', 'kb_used'):
        assert cluster[size_field] > 0
    assert cluster['num_objects'] >= 0
224 def test_monitor_log(self
):
225 lock
= threading
.Condition()
226 def cb(arg
, line
, who
, sec
, nsec
, seq
, level
, msg
):
227 # NOTE(sileht): the old pyrados API was received the pointer as int
228 # instead of the value of arg
234 # NOTE(sileht): force don't save the monitor into local var
235 # to ensure all references are correctly tracked into the lib
236 MonitorLog(self
.rados
, "debug", cb
, "arg")
239 MonitorLog(self
.rados
, "debug", None, None)
240 eq(None, self
.rados
.monitor_callback
)
242 class TestIoctx(object):
245 self
.rados
= Rados(conffile
='')
247 self
.rados
.create_pool('test_pool')
248 assert self
.rados
.pool_exists('test_pool')
249 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
252 cmd
= {"prefix":"osd unset", "key":"noup"}
253 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
255 self
.rados
.delete_pool('test_pool')
256 self
.rados
.shutdown()
258 def test_get_last_version(self
):
259 version
= self
.ioctx
.get_last_version()
262 def test_get_stats(self
):
263 stats
= self
.ioctx
.get_stats()
264 eq(stats
, {'num_objects_unfound': 0,
265 'num_objects_missing_on_primary': 0,
266 'num_object_clones': 0,
268 'num_object_copies': 0,
274 'num_objects_degraded': 0,
def test_write(self):
    """Data written to an object is returned unchanged by ``read``."""
    oid = 'abc'
    self.ioctx.write(oid, b'abc')
    stored = self.ioctx.read(oid)
    eq(stored, b'abc')
def test_write_full(self):
    """``write_full`` replaces the whole object rather than overwriting a prefix."""
    oid = 'abc'
    self.ioctx.write(oid, b'abc')
    eq(self.ioctx.read(oid), b'abc')

    # After write_full only the new, shorter payload is read back.
    self.ioctx.write_full(oid, b'd')
    eq(self.ioctx.read(oid), b'd')
def test_writesame(self):
    """``writesame`` with a 3-byte buffer and length 9 yields the buffer three times."""
    oid = 'ob'
    self.ioctx.writesame(oid, b'rzx', 9)
    stored = self.ioctx.read(oid)
    eq(stored, b'rzxrzxrzx')
def test_append(self):
    """Appends are added to the end of the object in call order."""
    oid = 'abc'
    self.ioctx.write(oid, b'a')
    for chunk in (b'b', b'c'):
        self.ioctx.append(oid, chunk)
    eq(self.ioctx.read(oid), b'abc')
def test_write_zeros(self):
    """A payload containing NUL bytes is written and read back unchanged."""
    oid = 'abc'
    self.ioctx.write(oid, b'a\0b\0c')
    stored = self.ioctx.read(oid)
    eq(stored, b'a\0b\0c')
301 def test_trunc(self
):
302 self
.ioctx
.write('abc', b
'abc')
303 self
.ioctx
.trunc('abc', 2)
304 eq(self
.ioctx
.read('abc'), b
'ab')
305 size
= self
.ioctx
.stat('abc')[0]
def test_cmpext(self):
    """``cmpext`` returns 0 on a match and ``-MAX_ERRNO - 4`` for this mismatch."""
    oid = 'test_object'
    self.ioctx.write(oid, b'abcdefghi')

    # Identical content compares equal.
    matching = self.ioctx.cmpext(oid, b'abcdefghi', 0)
    eq(0, matching)

    # The two buffers first differ at byte offset 4.
    differing = self.ioctx.cmpext(oid, b'abcdxxxxx', 0)
    eq(-MAX_ERRNO - 4, differing)
def test_list_objects_empty(self):
    """Listing the objects of the test pool before any write yields nothing."""
    listing = list(self.ioctx.list_objects())
    eq(listing, [])
def test_list_objects(self):
    """Objects created by write, write_full and append all show up in the listing."""
    self.ioctx.write('a', b'')
    self.ioctx.write('b', b'foo')
    self.ioctx.write_full('c', b'bar')
    self.ioctx.append('d', b'jazz')

    # The listing is sorted before comparing against the expected names.
    listed_keys = []
    for entry in self.ioctx.list_objects():
        listed_keys.append(entry.key)
    eq(sorted(listed_keys), ['a', 'b', 'c', 'd'])
def test_list_ns_objects(self):
    """Listing with ``LIBRADOS_ALL_NSPACES`` returns objects from every namespace."""

    def populate(prefix):
        # Same four operations for each namespace; only the key prefix differs.
        self.ioctx.write(prefix + 'a', b'')
        self.ioctx.write(prefix + 'b', b'foo')
        self.ioctx.write_full(prefix + 'c', b'bar')
        self.ioctx.append(prefix + 'd', b'jazz')

    # Objects written before any set_namespace call (listed with nspace '').
    populate('')

    self.ioctx.set_namespace("ns1")
    populate('ns1-')
    # 'd' is also created inside ns1, alongside the earlier 'd'.
    self.ioctx.append('d', b'jazz')

    self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
    listed = sorted(
        (entry.nspace, entry.key) for entry in self.ioctx.list_objects()
    )
    expected = [
        ('', 'a'), ('', 'b'), ('', 'c'), ('', 'd'),
        ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),
        ('ns1', 'ns1-c'), ('ns1', 'ns1-d'),
    ]
    eq(listed, expected)
341 def test_xattrs(self
):
342 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
343 self
.ioctx
.write('abc', b
'')
344 for key
, value
in xattrs
.items():
345 self
.ioctx
.set_xattr('abc', key
, value
)
346 eq(self
.ioctx
.get_xattr('abc', key
), value
)
348 for key
, value
in self
.ioctx
.get_xattrs('abc'):
349 stored_xattrs
[key
] = value
350 eq(stored_xattrs
, xattrs
)
352 def test_obj_xattrs(self
):
353 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
354 self
.ioctx
.write('abc', b
'')
355 obj
= list(self
.ioctx
.list_objects())[0]
356 for key
, value
in xattrs
.items():
357 obj
.set_xattr(key
, value
)
358 eq(obj
.get_xattr(key
), value
)
360 for key
, value
in obj
.get_xattrs():
361 stored_xattrs
[key
] = value
362 eq(stored_xattrs
, xattrs
)
def test_get_pool_id(self):
    """The ioctx pool id equals the id the cluster looks up for 'test_pool'."""
    ioctx_pool_id = self.ioctx.get_pool_id()
    looked_up_id = self.rados.pool_lookup('test_pool')
    eq(ioctx_pool_id, looked_up_id)
def test_get_pool_name(self):
    """The ioctx reports 'test_pool' as its pool name."""
    pool_name = self.ioctx.get_pool_name()
    eq(pool_name, 'test_pool')
def test_create_snap(self):
    """Removing a snapshot fails with ``ObjectNotFound`` until it has been created."""
    snap_name = 'foo'
    with assert_raises(ObjectNotFound):
        self.ioctx.remove_snap(snap_name)
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_snap(snap_name)
def test_list_snaps_empty(self):
    """Listing snapshots before any were created yields nothing."""
    snapshots = list(self.ioctx.list_snaps())
    eq(snapshots, [])
378 def test_list_snaps(self
):
379 snaps
= ['snap1', 'snap2', 'snap3']
381 self
.ioctx
.create_snap(snap
)
382 listed_snaps
= [snap
.name
for snap
in self
.ioctx
.list_snaps()]
383 eq(snaps
, listed_snaps
)
385 def test_lookup_snap(self
):
386 self
.ioctx
.create_snap('foo')
387 snap
= self
.ioctx
.lookup_snap('foo')
390 def test_snap_timestamp(self
):
391 self
.ioctx
.create_snap('foo')
392 snap
= self
.ioctx
.lookup_snap('foo')
395 def test_remove_snap(self
):
396 self
.ioctx
.create_snap('foo')
397 (snap
,) = self
.ioctx
.list_snaps()
399 self
.ioctx
.remove_snap('foo')
400 eq(list(self
.ioctx
.list_snaps()), [])
def test_snap_rollback(self):
    """Rolling back to a snapshot restores an object removed after the snapshot."""
    oid, snap_name = "insnap", "snap1"
    self.ioctx.write(oid, b"contents1")
    self.ioctx.create_snap(snap_name)

    # Remove the object, then roll it back to the snapshot.
    self.ioctx.remove_object(oid)
    self.ioctx.snap_rollback(oid, snap_name)
    eq(self.ioctx.read(oid), b"contents1")

    # Clean up the snapshot and the object.
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(oid)
def test_snap_rollback_removed(self):
    """Rolling back to a snapshot restores content overwritten after the snapshot."""
    oid, snap_name = "insnap", "snap1"
    self.ioctx.write(oid, b"contents1")
    self.ioctx.create_snap(snap_name)

    # Overwrite the object, then roll it back to the snapshot.
    self.ioctx.write(oid, b"contents2")
    self.ioctx.snap_rollback(oid, snap_name)
    eq(self.ioctx.read(oid), b"contents1")

    # Clean up the snapshot and the object.
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(oid)
def test_snap_read(self):
    """Read a removed object through its snapshot, then read a new object at head."""
    snap_oid, head_oid, snap_name = "insnap", "inhead", "snap1"
    self.ioctx.write(snap_oid, b"contents1")
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_object(snap_oid)

    # Direct reads at the snapshot; the removed object is read from there.
    snapshot = self.ioctx.lookup_snap(snap_name)
    self.ioctx.set_read(snapshot.snap_id)
    eq(self.ioctx.read(snap_oid), b"contents1")

    # Direct reads back at the head before using a second object.
    self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
    self.ioctx.write(head_oid, b"contents2")
    eq(self.ioctx.read(head_oid), b"contents2")

    # Clean up the snapshot and the head object.
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(head_oid)
435 def test_set_omap(self
):
436 keys
= ("1", "2", "3", "4")
437 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
438 with
WriteOpCtx() as write_op
:
439 self
.ioctx
.set_omap(write_op
, keys
, values
)
440 write_op
.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS
)
441 self
.ioctx
.operate_write_op(write_op
, "hw")
442 with
ReadOpCtx() as read_op
:
443 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
445 self
.ioctx
.operate_read_op(read_op
, "hw")
447 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
448 with
ReadOpCtx() as read_op
:
449 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "2", "", 4)
451 self
.ioctx
.operate_read_op(read_op
, "hw")
452 eq(("3", b
"ccc"), next(iter))
453 eq(list(iter), [("4", b
"\x04\x04\x04\x04")])
454 with
ReadOpCtx() as read_op
:
455 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "2", 4)
457 read_op
.set_flags(LIBRADOS_OPERATION_BALANCE_READS
)
458 self
.ioctx
.operate_read_op(read_op
, "hw")
459 eq(list(iter), [("2", b
"bbb")])
461 def test_set_omap_aio(self
):
462 lock
= threading
.Condition()
470 keys
= ("1", "2", "3", "4")
471 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
472 with
WriteOpCtx() as write_op
:
473 self
.ioctx
.set_omap(write_op
, keys
, values
)
474 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "hw", cb
, cb
)
475 comp
.wait_for_complete()
479 eq(comp
.get_return_value(), 0)
481 with
ReadOpCtx() as read_op
:
482 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
484 comp
= self
.ioctx
.operate_aio_read_op(read_op
, "hw", cb
, cb
)
485 comp
.wait_for_complete()
489 eq(comp
.get_return_value(), 0)
491 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
493 def test_write_ops(self
):
494 with
WriteOpCtx() as write_op
:
496 self
.ioctx
.operate_write_op(write_op
, "write_ops")
497 eq(self
.ioctx
.read('write_ops'), b
'')
499 write_op
.write_full(b
'1')
500 write_op
.append(b
'2')
501 self
.ioctx
.operate_write_op(write_op
, "write_ops")
502 eq(self
.ioctx
.read('write_ops'), b
'12')
504 write_op
.write_full(b
'12345')
505 write_op
.write(b
'x', 2)
506 self
.ioctx
.operate_write_op(write_op
, "write_ops")
507 eq(self
.ioctx
.read('write_ops'), b
'12x45')
509 write_op
.write_full(b
'12345')
511 self
.ioctx
.operate_write_op(write_op
, "write_ops")
512 eq(self
.ioctx
.read('write_ops'), b
'12\x00\x005')
514 write_op
.write_full(b
'12345')
516 self
.ioctx
.operate_write_op(write_op
, "write_ops")
517 eq(self
.ioctx
.read('write_ops'), b
'12')
520 self
.ioctx
.operate_write_op(write_op
, "write_ops")
521 with
assert_raises(ObjectNotFound
):
522 self
.ioctx
.read('write_ops')
524 def test_execute_op(self
):
525 with
WriteOpCtx() as write_op
:
526 write_op
.execute("hello", "record_hello", b
"ebs")
527 self
.ioctx
.operate_write_op(write_op
, "object")
528 eq(self
.ioctx
.read('object'), b
"Hello, ebs!")
530 def test_writesame_op(self
):
531 with
WriteOpCtx() as write_op
:
532 write_op
.writesame(b
'rzx', 9)
533 self
.ioctx
.operate_write_op(write_op
, 'abc')
534 eq(self
.ioctx
.read('abc'), b
'rzxrzxrzx')
536 def test_get_omap_vals_by_keys(self
):
537 keys
= ("1", "2", "3", "4")
538 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
539 with
WriteOpCtx() as write_op
:
540 self
.ioctx
.set_omap(write_op
, keys
, values
)
541 self
.ioctx
.operate_write_op(write_op
, "hw")
542 with
ReadOpCtx() as read_op
:
543 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
545 self
.ioctx
.operate_read_op(read_op
, "hw")
546 eq(list(iter), [("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
547 with
ReadOpCtx() as read_op
:
548 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
550 with
assert_raises(ObjectNotFound
):
551 self
.ioctx
.operate_read_op(read_op
, "no_such")
553 def test_get_omap_keys(self
):
554 keys
= ("1", "2", "3")
555 values
= (b
"aaa", b
"bbb", b
"ccc")
556 with
WriteOpCtx() as write_op
:
557 self
.ioctx
.set_omap(write_op
, keys
, values
)
558 self
.ioctx
.operate_write_op(write_op
, "hw")
559 with
ReadOpCtx() as read_op
:
560 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
562 self
.ioctx
.operate_read_op(read_op
, "hw")
563 eq(list(iter), [("1", None), ("2", None)])
564 with
ReadOpCtx() as read_op
:
565 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
567 with
assert_raises(ObjectNotFound
):
568 self
.ioctx
.operate_read_op(read_op
, "no_such")
570 def test_clear_omap(self
):
571 keys
= ("1", "2", "3")
572 values
= (b
"aaa", b
"bbb", b
"ccc")
573 with
WriteOpCtx() as write_op
:
574 self
.ioctx
.set_omap(write_op
, keys
, values
)
575 self
.ioctx
.operate_write_op(write_op
, "hw")
576 with
WriteOpCtx() as write_op_1
:
577 self
.ioctx
.clear_omap(write_op_1
)
578 self
.ioctx
.operate_write_op(write_op_1
, "hw")
579 with
ReadOpCtx() as read_op
:
580 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("1",))
582 self
.ioctx
.operate_read_op(read_op
, "hw")
585 def test_remove_omap_range2(self
):
586 keys
= ("1", "2", "3", "4")
587 values
= (b
"a", b
"bb", b
"ccc", b
"dddd")
588 with
WriteOpCtx() as write_op
:
589 self
.ioctx
.set_omap(write_op
, keys
, values
)
590 self
.ioctx
.operate_write_op(write_op
, "test_obj")
591 with
ReadOpCtx() as read_op
:
592 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
594 self
.ioctx
.operate_read_op(read_op
, "test_obj")
595 eq(list(iter), list(zip(keys
, values
)))
596 with
WriteOpCtx() as write_op
:
597 self
.ioctx
.remove_omap_range2(write_op
, "1", "4")
598 self
.ioctx
.operate_write_op(write_op
, "test_obj")
599 with
ReadOpCtx() as read_op
:
600 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
602 self
.ioctx
.operate_read_op(read_op
, "test_obj")
603 eq(list(iter), [("4", b
"dddd")])
605 def test_omap_cmp(self
):
607 self
.ioctx
.write(object_id
, b
'omap_cmp')
608 with
WriteOpCtx() as write_op
:
609 self
.ioctx
.set_omap(write_op
, ('key1',), ('1',))
610 self
.ioctx
.operate_write_op(write_op
, object_id
)
611 with
WriteOpCtx() as write_op
:
612 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
613 self
.ioctx
.set_omap(write_op
, ('key1',), ('2',))
614 self
.ioctx
.operate_write_op(write_op
, object_id
)
615 with
ReadOpCtx() as read_op
:
616 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
618 self
.ioctx
.operate_read_op(read_op
, object_id
)
619 eq(list(iter), [('key1', b
'2')])
620 with
WriteOpCtx() as write_op
:
621 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_GT
)
622 self
.ioctx
.set_omap(write_op
, ('key1',), ('3',))
623 self
.ioctx
.operate_write_op(write_op
, object_id
)
624 with
ReadOpCtx() as read_op
:
625 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
627 self
.ioctx
.operate_read_op(read_op
, object_id
)
628 eq(list(iter), [('key1', b
'3')])
629 with
WriteOpCtx() as write_op
:
630 write_op
.omap_cmp('key1', '4', LIBRADOS_CMPXATTR_OP_LT
)
631 self
.ioctx
.set_omap(write_op
, ('key1',), ('4',))
632 self
.ioctx
.operate_write_op(write_op
, object_id
)
633 with
ReadOpCtx() as read_op
:
634 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
636 self
.ioctx
.operate_read_op(read_op
, object_id
)
637 eq(list(iter), [('key1', b
'4')])
638 with
WriteOpCtx() as write_op
:
639 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
640 self
.ioctx
.set_omap(write_op
, ('key1',), ('5',))
642 self
.ioctx
.operate_write_op(write_op
, object_id
)
643 except (OSError, ExtendMismatch
) as e
:
646 message
= "omap_cmp did not raise Exception when omap content does not match"
647 raise AssertionError(message
)
649 def test_cmpext_op(self
):
651 with
WriteOpCtx() as write_op
:
652 write_op
.write(b
'12345', 0)
653 self
.ioctx
.operate_write_op(write_op
, object_id
)
654 with
WriteOpCtx() as write_op
:
655 write_op
.cmpext(b
'12345', 0)
656 write_op
.write(b
'54321', 0)
657 self
.ioctx
.operate_write_op(write_op
, object_id
)
658 eq(self
.ioctx
.read(object_id
), b
'54321')
659 with
WriteOpCtx() as write_op
:
660 write_op
.cmpext(b
'56789', 0)
661 write_op
.write(b
'12345', 0)
663 self
.ioctx
.operate_write_op(write_op
, object_id
)
664 except ExtendMismatch
as e
:
665 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 1)
666 # where "1" is the offset of the first unmatched byte
667 eq(-e
.errno
, -MAX_ERRNO
- 1)
670 message
= "cmpext did not raise Exception when object content does not match"
671 raise AssertionError(message
)
672 with
ReadOpCtx() as read_op
:
673 read_op
.cmpext(b
'54321', 0)
674 self
.ioctx
.operate_read_op(read_op
, object_id
)
675 with
ReadOpCtx() as read_op
:
676 read_op
.cmpext(b
'54789', 0)
678 self
.ioctx
.operate_read_op(read_op
, object_id
)
679 except ExtendMismatch
as e
:
680 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 2)
681 # where "2" is the offset of the first unmatched byte
682 eq(-e
.errno
, -MAX_ERRNO
- 2)
685 message
= "cmpext did not raise Exception when object content does not match"
686 raise AssertionError(message
)
688 def test_xattrs_op(self
):
689 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
690 with
WriteOpCtx() as write_op
:
691 write_op
.new(LIBRADOS_CREATE_EXCLUSIVE
)
692 for key
, value
in xattrs
.items():
693 write_op
.set_xattr(key
, value
)
694 self
.ioctx
.operate_write_op(write_op
, 'abc')
695 eq(self
.ioctx
.get_xattr('abc', key
), value
)
698 for key
, value
in self
.ioctx
.get_xattrs('abc'):
699 stored_xattrs_1
[key
] = value
700 eq(stored_xattrs_1
, xattrs
)
702 for key
in xattrs
.keys():
703 write_op
.rm_xattr(key
)
704 self
.ioctx
.operate_write_op(write_op
, 'abc')
706 for key
, value
in self
.ioctx
.get_xattrs('abc'):
707 stored_xattrs_2
[key
] = value
708 eq(stored_xattrs_2
, {})
711 self
.ioctx
.operate_write_op(write_op
, 'abc')
713 def test_locator(self
):
714 self
.ioctx
.set_locator_key("bar")
715 self
.ioctx
.write('foo', b
'contents1')
716 objects
= [i
for i
in self
.ioctx
.list_objects()]
718 eq(self
.ioctx
.get_locator_key(), "bar")
719 self
.ioctx
.set_locator_key("")
721 objects
[0].write(b
"contents2")
722 eq(self
.ioctx
.get_locator_key(), "")
723 self
.ioctx
.set_locator_key("bar")
724 contents
= self
.ioctx
.read("foo")
725 eq(contents
, b
"contents2")
726 eq(self
.ioctx
.get_locator_key(), "bar")
728 objects
= [i
for i
in self
.ioctx
.list_objects()]
730 self
.ioctx
.set_locator_key("")
732 def test_operate_aio_write_op(self
):
733 lock
= threading
.Condition()
740 with
WriteOpCtx() as write_op
:
741 write_op
.write(b
'rzx')
742 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "object", cb
, cb
)
743 comp
.wait_for_complete()
747 eq(comp
.get_return_value(), 0)
748 eq(self
.ioctx
.read('object'), b
'rzx')
750 def test_aio_write(self
):
751 lock
= threading
.Condition()
758 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
759 comp
.wait_for_complete()
763 eq(comp
.get_return_value(), 0)
764 contents
= self
.ioctx
.read("foo")
766 [i
.remove() for i
in self
.ioctx
.list_objects()]
768 def test_aio_cmpext(self
):
769 lock
= threading
.Condition()
777 self
.ioctx
.write('test_object', b
'abcdefghi')
778 comp
= self
.ioctx
.aio_cmpext('test_object', b
'abcdefghi', 0, cb
)
779 comp
.wait_for_complete()
783 eq(comp
.get_return_value(), 0)
785 def test_aio_rmxattr(self
):
786 lock
= threading
.Condition()
793 self
.ioctx
.set_xattr("xyz", "key", b
'value')
794 eq(self
.ioctx
.get_xattr("xyz", "key"), b
'value')
795 comp
= self
.ioctx
.aio_rmxattr("xyz", "key", cb
)
796 comp
.wait_for_complete()
800 eq(comp
.get_return_value(), 0)
801 with
assert_raises(NoData
):
802 self
.ioctx
.get_xattr("xyz", "key")
804 def test_aio_write_no_comp_ref(self
):
805 lock
= threading
.Condition()
812 # NOTE(sileht): force don't save the comp into local var
813 # to ensure all references are correctly tracked into the lib
814 self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
818 contents
= self
.ioctx
.read("foo")
820 [i
.remove() for i
in self
.ioctx
.list_objects()]
822 def test_aio_append(self
):
823 lock
= threading
.Condition()
830 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
831 comp2
= self
.ioctx
.aio_append("foo", b
"baz", cb
, cb
)
832 comp
.wait_for_complete()
833 contents
= self
.ioctx
.read("foo")
834 eq(contents
, b
"barbaz")
838 eq(comp
.get_return_value(), 0)
839 eq(comp2
.get_return_value(), 0)
840 [i
.remove() for i
in self
.ioctx
.list_objects()]
842 def test_aio_write_full(self
):
843 lock
= threading
.Condition()
850 self
.ioctx
.aio_write("foo", b
"barbaz", 0, cb
, cb
)
851 comp
= self
.ioctx
.aio_write_full("foo", b
"bar", cb
, cb
)
852 comp
.wait_for_complete()
856 eq(comp
.get_return_value(), 0)
857 contents
= self
.ioctx
.read("foo")
859 [i
.remove() for i
in self
.ioctx
.list_objects()]
861 def test_aio_writesame(self
):
862 lock
= threading
.Condition()
869 comp
= self
.ioctx
.aio_writesame("abc", b
"rzx", 9, 0, cb
)
870 comp
.wait_for_complete()
874 eq(comp
.get_return_value(), 0)
875 eq(self
.ioctx
.read("abc"), b
"rzxrzxrzx")
876 [i
.remove() for i
in self
.ioctx
.list_objects()]
878 def test_aio_stat(self
):
879 lock
= threading
.Condition()
881 def cb(_
, size
, mtime
):
886 comp
= self
.ioctx
.aio_stat("foo", cb
)
887 comp
.wait_for_complete()
891 eq(comp
.get_return_value(), -2)
893 self
.ioctx
.write("foo", b
"bar")
895 comp
= self
.ioctx
.aio_stat("foo", cb
)
896 comp
.wait_for_complete()
900 eq(comp
.get_return_value(), 0)
902 [i
.remove() for i
in self
.ioctx
.list_objects()]
904 def test_aio_remove(self
):
905 lock
= threading
.Condition()
912 self
.ioctx
.write('foo', b
'wrx')
913 eq(self
.ioctx
.read('foo'), b
'wrx')
914 comp
= self
.ioctx
.aio_remove('foo', cb
, cb
)
915 comp
.wait_for_complete()
919 eq(comp
.get_return_value(), 0)
920 eq(list(self
.ioctx
.list_objects()), [])
922 def _take_down_acting_set(self
, pool
, objectname
):
923 # find acting_set for pool:objectname and take it down; used to
924 # verify that async reads don't complete while acting set is missing
931 r
, jsonout
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
932 objmap
= json
.loads(jsonout
.decode("utf-8"))
933 acting_set
= objmap
['acting']
934 cmd
= {"prefix":"osd set", "key":"noup"}
935 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
937 cmd
= {"prefix":"osd down", "ids":[str(i
) for i
in acting_set
]}
938 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
941 # wait for OSDs to acknowledge the down
942 eq(self
.rados
.wait_for_latest_osdmap(), 0)
944 def _let_osds_back_up(self
):
945 cmd
= {"prefix":"osd unset", "key":"noup"}
946 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
950 def test_aio_read_wait_for_complete(self
):
951 # use wait_for_complete() and wait for cb by
954 # this is a list so that the local cb() can modify it
955 payload
= b
"bar\000frob"
956 self
.ioctx
.write("foo", payload
)
957 self
._take
_down
_acting
_set
('test_pool', 'foo')
960 lock
= threading
.Condition()
966 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
967 eq(False, comp
.is_complete())
969 eq(False, comp
.is_complete())
973 self
._let
_osds
_back
_up
()
974 comp
.wait_for_complete()
977 while retval
[0] is None and loops
<= 10:
982 eq(retval
[0], payload
)
983 eq(sys
.getrefcount(comp
), 2)
986 def test_aio_read_wait_for_complete_and_cb(self
):
987 # use wait_for_complete_and_cb(), verify retval[0] is
988 # set by the time we regain control
989 payload
= b
"bar\000frob"
990 self
.ioctx
.write("foo", payload
)
992 self
._take
_down
_acting
_set
('test_pool', 'foo')
993 # this is a list so that the local cb() can modify it
995 lock
= threading
.Condition()
1000 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
1001 eq(False, comp
.is_complete())
1003 eq(False, comp
.is_complete())
1007 self
._let
_osds
_back
_up
()
1008 comp
.wait_for_complete_and_cb()
1009 assert(retval
[0] is not None)
1010 eq(retval
[0], payload
)
1011 eq(sys
.getrefcount(comp
), 2)
1014 def test_aio_read_wait_for_complete_and_cb_error(self
):
1015 # error case, use wait_for_complete_and_cb(), verify retval[0] is
1016 # set by the time we regain control
1017 self
._take
_down
_acting
_set
('test_pool', 'bar')
1019 # this is a list so that the local cb() can modify it
1021 lock
= threading
.Condition()
1027 # read from a DNE object
1028 comp
= self
.ioctx
.aio_read("bar", 3, 0, cb
)
1029 eq(False, comp
.is_complete())
1031 eq(False, comp
.is_complete())
1034 self
._let
_osds
_back
_up
()
1036 comp
.wait_for_complete_and_cb()
1038 assert(comp
.get_return_value() < 0)
1039 eq(sys
.getrefcount(comp
), 2)
1041 def test_lock(self
):
1042 self
.ioctx
.lock_exclusive("foo", "lock", "locker", "desc_lock",
1044 assert_raises(ObjectExists
,
1045 self
.ioctx
.lock_exclusive
,
1046 "foo", "lock", "locker", "desc_lock", 10000, 0)
1047 self
.ioctx
.unlock("foo", "lock", "locker")
1048 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker")
1050 self
.ioctx
.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
1052 self
.ioctx
.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
1054 assert_raises(ObjectBusy
,
1055 self
.ioctx
.lock_exclusive
,
1056 "foo", "lock", "locker3", "desc_lock", 10000, 0)
1057 self
.ioctx
.unlock("foo", "lock", "locker1")
1058 self
.ioctx
.unlock("foo", "lock", "locker2")
1059 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker1")
1060 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker2")
def test_execute(self):
    """Call the 'hello' class method 'say_hello' with empty and non-empty input."""
    oid = "foo"
    self.ioctx.write(oid, b"")  # ensure object exists

    # Empty input produces the default greeting.
    _, reply = self.ioctx.execute(oid, "hello", "say_hello", b"")
    eq(reply, b"Hello, world!")

    # The input bytes are echoed back inside the greeting.
    _, reply = self.ioctx.execute(oid, "hello", "say_hello", b"nose")
    eq(reply, b"Hello, nose!")
1071 def test_aio_execute(self
):
1074 lock
= threading
.Condition()
1077 if retval
[0] is None:
1081 self
.ioctx
.write("foo", b
"") # ensure object exists
1083 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"", 32, cb
, cb
)
1084 comp
.wait_for_complete()
1088 eq(comp
.get_return_value(), 13)
1089 eq(retval
[0], b
"Hello, world!")
1092 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"nose", 32, cb
, cb
)
1093 comp
.wait_for_complete()
1097 eq(comp
.get_return_value(), 12)
1098 eq(retval
[0], b
"Hello, nose!")
1100 [i
.remove() for i
in self
.ioctx
.list_objects()]
1102 def test_aio_setxattr(self
):
1103 lock
= threading
.Condition()
1110 comp
= self
.ioctx
.aio_setxattr("obj", "key", b
'value', cb
)
1111 comp
.wait_for_complete()
1115 eq(comp
.get_return_value(), 0)
1116 eq(self
.ioctx
.get_xattr("obj", "key"), b
'value')
1118 def test_applications(self
):
1119 cmd
= {"prefix":"osd dump", "format":"json"}
1120 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1123 release
= json
.loads(buf
.decode("utf-8")).get("require_osd_release",
1125 if not release
or release
[0] < 'l':
1128 eq([], self
.ioctx
.application_list())
1130 self
.ioctx
.application_enable("app1")
1131 assert_raises(Error
, self
.ioctx
.application_enable
, "app2")
1132 self
.ioctx
.application_enable("app2", True)
1134 assert_raises(Error
, self
.ioctx
.application_metadata_list
, "dne")
1135 eq([], self
.ioctx
.application_metadata_list("app1"))
1137 assert_raises(Error
, self
.ioctx
.application_metadata_set
, "dne", "key",
1139 self
.ioctx
.application_metadata_set("app1", "key1", "val1")
1140 eq("val1", self
.ioctx
.application_metadata_get("app1", "key1"))
1141 self
.ioctx
.application_metadata_set("app1", "key2", "val2")
1142 eq("val2", self
.ioctx
.application_metadata_get("app1", "key2"))
1143 self
.ioctx
.application_metadata_set("app2", "key1", "val1")
1144 eq("val1", self
.ioctx
.application_metadata_get("app2", "key1"))
1146 eq([("key1", "val1"), ("key2", "val2")],
1147 self
.ioctx
.application_metadata_list("app1"))
1149 self
.ioctx
.application_metadata_remove("app1", "key1")
1150 eq([("key2", "val2")], self
.ioctx
.application_metadata_list("app1"))
def test_service_daemon(self):
    """Register this process as a ``laundry`` service daemon, then push a status.

    The daemon name is derived from the current process id so that each
    test process registers under its own name.
    """
    daemon_name = "pid-" + str(os.getpid())
    self.rados.service_daemon_register(
        "laundry",
        daemon_name,
        {'version': '3.14', 'memory': '42'},
    )
    self.rados.service_daemon_update({'result': 'unknown', 'test': 'running'})
def test_alignment(self):
    """Check that ``alignment()`` on the fixture's ioctx yields ``None``."""
    reported = self.ioctx.alignment()
    eq(reported, None)
1164 class TestIoctxEc(object):
1167 self
.rados
= Rados(conffile
='')
1168 self
.rados
.connect()
1169 self
.pool
= 'test-ec'
1170 self
.profile
= 'testprofile-%s' % self
.pool
1171 cmd
= {"prefix": "osd erasure-code-profile set",
1172 "name": self
.profile
, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
1173 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1175 # create ec pool with profile created above
1176 cmd
= {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
1177 'pool': self
.pool
, 'pool_type': 'erasure',
1178 'erasure_code_profile': self
.profile
}
1179 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1181 assert self
.rados
.pool_exists(self
.pool
)
1182 self
.ioctx
= self
.rados
.open_ioctx(self
.pool
)
1185 cmd
= {"prefix": "osd unset", "key": "noup"}
1186 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1188 self
.rados
.delete_pool(self
.pool
)
1189 self
.rados
.shutdown()
def test_alignment(self):
    """Check that ``alignment()`` on the fixture's ioctx yields 8192."""
    reported = self.ioctx.alignment()
    eq(reported, 8192)
1195 class TestIoctx2(object):
1198 self
.rados
= Rados(conffile
='')
1199 self
.rados
.connect()
1200 self
.rados
.create_pool('test_pool')
1201 assert self
.rados
.pool_exists('test_pool')
1202 pool_id
= self
.rados
.pool_lookup('test_pool')
1204 self
.ioctx2
= self
.rados
.open_ioctx2(pool_id
)
1207 cmd
= {"prefix": "osd unset", "key": "noup"}
1208 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1210 self
.rados
.delete_pool('test_pool')
1211 self
.rados
.shutdown()
1213 def test_get_last_version(self
):
1214 version
= self
.ioctx2
.get_last_version()
1217 def test_get_stats(self
):
1218 stats
= self
.ioctx2
.get_stats()
1219 eq(stats
, {'num_objects_unfound': 0,
1220 'num_objects_missing_on_primary': 0,
1221 'num_object_clones': 0,
1223 'num_object_copies': 0,
1229 'num_objects_degraded': 0,
1233 class TestObject(object):
1236 self
.rados
= Rados(conffile
='')
1237 self
.rados
.connect()
1238 self
.rados
.create_pool('test_pool')
1239 assert self
.rados
.pool_exists('test_pool')
1240 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1241 self
.ioctx
.write('foo', b
'bar')
1242 self
.object = Object(self
.ioctx
, 'foo')
1247 self
.rados
.delete_pool('test_pool')
1248 self
.rados
.shutdown()
def test_read(self):
    """Read the fixture object twice in a row.

    The first three-byte read must return ``b'bar'``; the follow-up read
    of up to 100 bytes is expected to come back empty.
    """
    first_chunk = self.object.read(3)
    eq(first_chunk, b'bar')

    remainder = self.object.read(100)
    eq(remainder, b'')
1255 def test_seek(self
):
1256 self
.object.write(b
'blah')
1258 eq(self
.object.read(4), b
'blah')
1260 eq(self
.object.read(3), b
'lah')
1262 def test_write(self
):
1263 self
.object.write(b
'barbaz')
1265 eq(self
.object.read(3), b
'bar')
1266 eq(self
.object.read(3), b
'baz')
1268 class TestIoCtxSelfManagedSnaps(object):
1270 self
.rados
= Rados(conffile
='')
1271 self
.rados
.connect()
1272 self
.rados
.create_pool('test_pool')
1273 assert self
.rados
.pool_exists('test_pool')
1274 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1277 cmd
= {"prefix":"osd unset", "key":"noup"}
1278 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1280 self
.rados
.delete_pool('test_pool')
1281 self
.rados
.shutdown()
1285 # cannot mix-and-match pool and self-managed snapshot mode
1286 self
.ioctx
.set_self_managed_snap_write([])
1287 self
.ioctx
.write('abc', b
'abc')
1288 snap_id_1
= self
.ioctx
.create_self_managed_snap()
1289 self
.ioctx
.set_self_managed_snap_write([snap_id_1
])
1291 self
.ioctx
.write('abc', b
'def')
1292 snap_id_2
= self
.ioctx
.create_self_managed_snap()
1293 self
.ioctx
.set_self_managed_snap_write([snap_id_1
, snap_id_2
])
1295 self
.ioctx
.write('abc', b
'ghi')
1297 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_1
)
1298 eq(self
.ioctx
.read('abc'), b
'abc')
1300 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_2
)
1301 eq(self
.ioctx
.read('abc'), b
'def')
1303 self
.ioctx
.remove_self_managed_snap(snap_id_1
)
1304 self
.ioctx
.remove_self_managed_snap(snap_id_2
)
1306 class TestCommand(object):
1309 self
.rados
= Rados(conffile
='')
1310 self
.rados
.connect()
1313 self
.rados
.shutdown()
1315 def test_monmap_dump(self
):
1317 # check for success and some plain output with epoch in it
1318 cmd
= {"prefix":"mon dump"}
1319 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1322 assert(b
'epoch' in buf
)
1324 # JSON, and grab current epoch
1325 cmd
['format'] = 'json'
1326 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1329 d
= json
.loads(buf
.decode("utf-8"))
1330 assert('epoch' in d
)
1333 # assume epoch + 1000 does not exist; test for ENOENT
1334 cmd
['epoch'] = epoch
+ 1000
1335 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1336 eq(ret
, -errno
.ENOENT
)
1340 # send to specific target by name, rank
1341 cmd
= {"prefix": "version"}
1343 target
= d
['mons'][0]['name']
1345 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1349 e
= json
.loads(buf
.decode("utf-8"))
1350 assert('release' in e
)
1352 target
= d
['mons'][0]['rank']
1354 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1358 e
= json
.loads(buf
.decode("utf-8"))
1359 assert('release' in e
)
1362 def test_osd_bench(self
):
1363 cmd
= dict(prefix
='bench', size
=4096, count
=8192)
1364 ret
, buf
, err
= self
.rados
.osd_command(0, json
.dumps(cmd
), b
'',
1368 out
= json
.loads(buf
.decode('utf-8'))
1369 eq(out
['blocksize'], cmd
['size'])
1370 eq(out
['bytes_written'], cmd
['count'])
1372 def test_ceph_osd_pool_create_utf8(self
):
1375 cmd
= {"prefix": "osd pool create", "pg_num": 16, "pool": poolname
}
1376 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1379 eq(u
"pool '\u9ec5' created", out
)
1383 class TestWatchNotify(object):
1384 OID
= "test_watch_notify"
1387 self
.rados
= Rados(conffile
='')
1388 self
.rados
.connect()
1389 self
.rados
.create_pool('test_pool')
1390 assert self
.rados
.pool_exists('test_pool')
1391 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1392 self
.ioctx
.write(self
.OID
, b
'test watch notify')
1393 self
.lock
= threading
.Condition()
1394 self
.notify_cnt
= {}
1395 self
.notify_data
= {}
1396 self
.notify_error
= {}
1400 self
.instance_id
= self
.rados
.get_instance_id()
1404 self
.rados
.delete_pool('test_pool')
1405 self
.rados
.shutdown()
1407 def make_callback(self
):
1408 def callback(notify_id
, notifier_id
, watch_id
, data
):
1410 if watch_id
not in self
.notify_cnt
:
1411 self
.notify_cnt
[watch_id
] = 1
1412 elif self
.notify_data
[watch_id
] != data
:
1413 self
.notify_cnt
[watch_id
] += 1
1414 self
.notify_data
[watch_id
] = data
1417 def make_error_callback(self
):
1418 def callback(watch_id
, error
):
1420 self
.notify_error
[watch_id
] = error
1425 with self
.ioctx
.watch(self
.OID
, self
.make_callback(),
1426 self
.make_error_callback()) as watch1
:
1427 watch_id1
= watch1
.get_id()
1428 assert(watch_id1
> 0)
1430 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1431 watch2
= ioctx
.watch(self
.OID
, self
.make_callback(),
1432 self
.make_error_callback())
1433 watch_id2
= watch2
.get_id()
1434 assert(watch_id2
> 0)
1436 assert(self
.ioctx
.notify(self
.OID
, 'test'))
1438 assert(watch_id1
in self
.notify_cnt
)
1439 assert(watch_id2
in self
.notify_cnt
)
1440 eq(self
.notify_cnt
[watch_id1
], 1)
1441 eq(self
.notify_cnt
[watch_id2
], 1)
1442 eq(self
.notify_data
[watch_id1
], b
'test')
1443 eq(self
.notify_data
[watch_id2
], b
'test')
1445 assert(watch1
.check() >= timedelta())
1446 assert(watch2
.check() >= timedelta())
1448 assert(self
.ioctx
.notify(self
.OID
, 'best'))
1450 eq(self
.notify_cnt
[watch_id1
], 2)
1451 eq(self
.notify_cnt
[watch_id2
], 2)
1452 eq(self
.notify_data
[watch_id1
], b
'best')
1453 eq(self
.notify_data
[watch_id2
], b
'best')
1457 assert(self
.ioctx
.notify(self
.OID
, 'rest'))
1459 eq(self
.notify_cnt
[watch_id1
], 3)
1460 eq(self
.notify_cnt
[watch_id2
], 2)
1461 eq(self
.notify_data
[watch_id1
], b
'rest')
1462 eq(self
.notify_data
[watch_id2
], b
'best')
1464 assert(watch1
.check() >= timedelta())
1466 self
.ioctx
.remove_object(self
.OID
)
1470 if watch_id1
in self
.notify_error
:
1473 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1474 assert_raises(NotConnected
, watch1
.check
)
1476 assert_raises(ObjectNotFound
, self
.ioctx
.notify
, self
.OID
, 'test')
1478 def make_callback_reply(self
):
1479 def callback(notify_id
, notifier_id
, watch_id
, data
):
1484 def notify_callback(self
, _
, r
, ack_list
, timeout_list
):
1487 for notifier_id
, _
, notifier_data
in ack_list
:
1488 if notifier_id
not in self
.ack_cnt
:
1489 self
.ack_cnt
[notifier_id
] = 0
1490 self
.ack_cnt
[notifier_id
] += 1
1491 self
.ack_data
[notifier_id
] = notifier_data
def notify_callback_err(self, _, r, ack_list, timeout_list):
    """Notify completion callback that expects the result code ``-ENOENT``.

    Only ``r`` is inspected; the ack and timeout lists are ignored.
    """
    expected_result = -errno.ENOENT
    eq(r, expected_result)
1496 def test_aio_notify(self
):
1497 with self
.ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1498 self
.make_error_callback()) as watch1
:
1499 watch_id1
= watch1
.get_id()
1502 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1503 watch2
= ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1504 self
.make_error_callback())
1505 watch_id2
= watch2
.get_id()
1508 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='test')
1509 comp
.wait_for_complete_and_cb()
1511 ok(self
.instance_id
in self
.ack_cnt
)
1512 eq(self
.ack_cnt
[self
.instance_id
], 2)
1513 eq(self
.ack_data
[self
.instance_id
], b
'test')
1515 ok(watch1
.check() >= timedelta())
1516 ok(watch2
.check() >= timedelta())
1518 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='best')
1519 comp
.wait_for_complete_and_cb()
1521 eq(self
.ack_cnt
[self
.instance_id
], 4)
1522 eq(self
.ack_data
[self
.instance_id
], b
'best')
1526 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='rest')
1527 comp
.wait_for_complete_and_cb()
1529 eq(self
.ack_cnt
[self
.instance_id
], 5)
1530 eq(self
.ack_data
[self
.instance_id
], b
'rest')
1532 assert(watch1
.check() >= timedelta())
1533 self
.ioctx
.remove_object(self
.OID
)
1537 if watch_id1
in self
.notify_error
:
1540 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1541 assert_raises(NotConnected
, watch1
.check
)
1543 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback_err
, msg
='test')
1544 comp
.wait_for_complete_and_cb()