1 from __future__
import print_function
2 from assertions
import assert_equal
as eq
, assert_raises
3 from rados
import (Rados
, Error
, RadosStateError
, Object
, ObjectExists
,
4 ObjectNotFound
, ObjectBusy
, NotConnected
,
5 LIBRADOS_ALL_NSPACES
, WriteOpCtx
, ReadOpCtx
, LIBRADOS_CREATE_EXCLUSIVE
,
6 LIBRADOS_CMPXATTR_OP_EQ
, LIBRADOS_CMPXATTR_OP_GT
, LIBRADOS_CMPXATTR_OP_LT
, OSError,
7 LIBRADOS_SNAP_HEAD
, LIBRADOS_OPERATION_BALANCE_READS
, LIBRADOS_OPERATION_SKIPRWLOCKS
, MonitorLog
, MAX_ERRNO
, NoData
, ExtendMismatch
)
8 from datetime
import timedelta
18 def test_rados_init_error():
19 assert_raises(Error
, Rados
, conffile
='', rados_id
='admin',
21 assert_raises(Error
, Rados
, conffile
='', name
='invalid')
22 assert_raises(Error
, Rados
, conffile
='', name
='bad.invalid')
24 def test_rados_init():
25 with
Rados(conffile
='', rados_id
='admin'):
27 with
Rados(conffile
='', name
='client.admin'):
29 with
Rados(conffile
='', name
='client.admin'):
31 with
Rados(conffile
='', name
='client.admin'):
34 def test_ioctx_context_manager():
35 with
Rados(conffile
='', rados_id
='admin') as conn
:
36 with conn
.open_ioctx('rbd') as ioctx
:
39 def test_parse_argv():
40 args
= ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
42 eq(args
, r
.conf_parse_argv(args
))
44 def test_parse_argv_empty_str():
47 eq(args
, r
.conf_parse_argv(args
))
49 class TestRadosStateError(object):
50 def _requires_configuring(self
, rados
):
51 assert_raises(RadosStateError
, rados
.connect
)
53 def _requires_configuring_or_connected(self
, rados
):
54 assert_raises(RadosStateError
, rados
.conf_read_file
)
55 assert_raises(RadosStateError
, rados
.conf_parse_argv
, None)
56 assert_raises(RadosStateError
, rados
.conf_parse_env
)
57 assert_raises(RadosStateError
, rados
.conf_get
, 'opt')
58 assert_raises(RadosStateError
, rados
.conf_set
, 'opt', 'val')
59 assert_raises(RadosStateError
, rados
.ping_monitor
, '0')
61 def _requires_connected(self
, rados
):
62 assert_raises(RadosStateError
, rados
.pool_exists
, 'foo')
63 assert_raises(RadosStateError
, rados
.pool_lookup
, 'foo')
64 assert_raises(RadosStateError
, rados
.pool_reverse_lookup
, 0)
65 assert_raises(RadosStateError
, rados
.create_pool
, 'foo')
66 assert_raises(RadosStateError
, rados
.get_pool_base_tier
, 0)
67 assert_raises(RadosStateError
, rados
.delete_pool
, 'foo')
68 assert_raises(RadosStateError
, rados
.list_pools
)
69 assert_raises(RadosStateError
, rados
.get_fsid
)
70 assert_raises(RadosStateError
, rados
.open_ioctx
, 'foo')
71 assert_raises(RadosStateError
, rados
.mon_command
, '', b
'')
72 assert_raises(RadosStateError
, rados
.osd_command
, 0, '', b
'')
73 assert_raises(RadosStateError
, rados
.pg_command
, '', '', b
'')
74 assert_raises(RadosStateError
, rados
.wait_for_latest_osdmap
)
75 assert_raises(RadosStateError
, rados
.blocklist_add
, '127.0.0.1/123', 0)
77 def test_configuring(self
):
78 rados
= Rados(conffile
='')
79 eq('configuring', rados
.state
)
80 self
._requires
_connected
(rados
)
82 def test_connected(self
):
83 rados
= Rados(conffile
='')
85 eq('connected', rados
.state
)
86 self
._requires
_configuring
(rados
)
88 def test_shutdown(self
):
89 rados
= Rados(conffile
='')
92 eq('shutdown', rados
.state
)
93 self
._requires
_configuring
(rados
)
94 self
._requires
_configuring
_or
_connected
(rados
)
95 self
._requires
_connected
(rados
)
98 class TestRados(object):
100 def setup_method(self
, method
):
101 self
.rados
= Rados(conffile
='')
102 self
.rados
.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
103 self
.rados
.conf_parse_env()
106 # Assume any pre-existing pools are the cluster's defaults
107 self
.default_pools
= self
.rados
.list_pools()
109 def teardown_method(self
, method
):
110 self
.rados
.shutdown()
112 def test_ping_monitor(self
):
113 assert_raises(ObjectNotFound
, self
.rados
.ping_monitor
, 'not_exists_monitor')
114 cmd
= {'prefix': 'mon dump', 'format':'json'}
115 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
116 for mon
in json
.loads(buf
.decode('utf8'))['mons']:
118 output
= self
.rados
.ping_monitor(mon
['name'])
121 buf
= json
.loads(output
)
122 if buf
.get('health'):
125 def test_annotations(self
):
126 with pytest
.raises(TypeError):
127 self
.rados
.create_pool(0xf00)
129 def test_create(self
):
130 self
.rados
.create_pool('foo')
131 self
.rados
.delete_pool('foo')
133 def test_create_utf8(self
):
135 self
.rados
.create_pool(poolname
)
136 assert self
.rados
.pool_exists(u
"\u9ec4")
137 self
.rados
.delete_pool(poolname
)
139 def test_pool_lookup_utf8(self
):
141 self
.rados
.create_pool(poolname
)
143 poolid
= self
.rados
.pool_lookup(poolname
)
144 eq(poolname
, self
.rados
.pool_reverse_lookup(poolid
))
146 self
.rados
.delete_pool(poolname
)
148 def test_eexist(self
):
149 self
.rados
.create_pool('foo')
150 assert_raises(ObjectExists
, self
.rados
.create_pool
, 'foo')
151 self
.rados
.delete_pool('foo')
153 def list_non_default_pools(self
):
154 pools
= self
.rados
.list_pools()
155 for p
in self
.default_pools
:
159 def test_list_pools(self
):
160 eq(set(), self
.list_non_default_pools())
161 self
.rados
.create_pool('foo')
162 eq(set(['foo']), self
.list_non_default_pools())
163 self
.rados
.create_pool('bar')
164 eq(set(['foo', 'bar']), self
.list_non_default_pools())
165 self
.rados
.create_pool('baz')
166 eq(set(['foo', 'bar', 'baz']), self
.list_non_default_pools())
167 self
.rados
.delete_pool('foo')
168 eq(set(['bar', 'baz']), self
.list_non_default_pools())
169 self
.rados
.delete_pool('baz')
170 eq(set(['bar']), self
.list_non_default_pools())
171 self
.rados
.delete_pool('bar')
172 eq(set(), self
.list_non_default_pools())
173 self
.rados
.create_pool('a' * 500)
174 eq(set(['a' * 500]), self
.list_non_default_pools())
175 self
.rados
.delete_pool('a' * 500)
178 def test_get_pool_base_tier(self
):
179 self
.rados
.create_pool('foo')
181 self
.rados
.create_pool('foo-cache')
183 pool_id
= self
.rados
.pool_lookup('foo')
184 tier_pool_id
= self
.rados
.pool_lookup('foo-cache')
186 cmd
= {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
187 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
191 cmd
= {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
192 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
195 eq(self
.rados
.wait_for_latest_osdmap(), 0)
197 eq(pool_id
, self
.rados
.get_pool_base_tier(pool_id
))
198 eq(pool_id
, self
.rados
.get_pool_base_tier(tier_pool_id
))
200 cmd
= {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
201 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
204 self
.rados
.delete_pool('foo-cache')
206 self
.rados
.delete_pool('foo')
208 def test_get_fsid(self
):
209 fsid
= self
.rados
.get_fsid()
210 assert re
.match('[0-9a-f\-]{36}', fsid
, re
.I
)
212 def test_blocklist_add(self
):
213 self
.rados
.blocklist_add("1.2.3.4/123", 1)
216 def test_get_cluster_stats(self
):
217 stats
= self
.rados
.get_cluster_stats()
218 assert stats
['kb'] > 0
219 assert stats
['kb_avail'] > 0
220 assert stats
['kb_used'] > 0
221 assert stats
['num_objects'] >= 0
223 def test_monitor_log(self
):
224 lock
= threading
.Condition()
225 def cb(arg
, line
, who
, sec
, nsec
, seq
, level
, msg
):
226 # NOTE(sileht): the old pyrados API was received the pointer as int
227 # instead of the value of arg
233 # NOTE(sileht): force don't save the monitor into local var
234 # to ensure all references are correctly tracked into the lib
235 MonitorLog(self
.rados
, "debug", cb
, "arg")
238 MonitorLog(self
.rados
, "debug", None, None)
239 eq(None, self
.rados
.monitor_callback
)
241 class TestIoctx(object):
243 def setup_method(self
, method
):
244 self
.rados
= Rados(conffile
='')
246 self
.rados
.create_pool('test_pool')
247 assert self
.rados
.pool_exists('test_pool')
248 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
250 def teardown_method(self
, method
):
251 cmd
= {"prefix":"osd unset", "key":"noup"}
252 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
254 self
.rados
.delete_pool('test_pool')
255 self
.rados
.shutdown()
257 def test_get_last_version(self
):
258 version
= self
.ioctx
.get_last_version()
261 def test_get_stats(self
):
262 stats
= self
.ioctx
.get_stats()
263 eq(stats
, {'num_objects_unfound': 0,
264 'num_objects_missing_on_primary': 0,
265 'num_object_clones': 0,
267 'num_object_copies': 0,
273 'num_objects_degraded': 0,
276 def test_write(self
):
277 self
.ioctx
.write('abc', b
'abc')
278 eq(self
.ioctx
.read('abc'), b
'abc')
280 def test_write_full(self
):
281 self
.ioctx
.write('abc', b
'abc')
282 eq(self
.ioctx
.read('abc'), b
'abc')
283 self
.ioctx
.write_full('abc', b
'd')
284 eq(self
.ioctx
.read('abc'), b
'd')
286 def test_writesame(self
):
287 self
.ioctx
.writesame('ob', b
'rzx', 9)
288 eq(self
.ioctx
.read('ob'), b
'rzxrzxrzx')
290 def test_append(self
):
291 self
.ioctx
.write('abc', b
'a')
292 self
.ioctx
.append('abc', b
'b')
293 self
.ioctx
.append('abc', b
'c')
294 eq(self
.ioctx
.read('abc'), b
'abc')
296 def test_write_zeros(self
):
297 self
.ioctx
.write('abc', b
'a\0b\0c')
298 eq(self
.ioctx
.read('abc'), b
'a\0b\0c')
300 def test_trunc(self
):
301 self
.ioctx
.write('abc', b
'abc')
302 self
.ioctx
.trunc('abc', 2)
303 eq(self
.ioctx
.read('abc'), b
'ab')
304 size
= self
.ioctx
.stat('abc')[0]
307 def test_cmpext(self
):
308 self
.ioctx
.write('test_object', b
'abcdefghi')
309 eq(0, self
.ioctx
.cmpext('test_object', b
'abcdefghi', 0))
310 eq(-MAX_ERRNO
- 4, self
.ioctx
.cmpext('test_object', b
'abcdxxxxx', 0))
312 def test_list_objects_empty(self
):
313 eq(list(self
.ioctx
.list_objects()), [])
315 def test_list_objects(self
):
316 self
.ioctx
.write('a', b
'')
317 self
.ioctx
.write('b', b
'foo')
318 self
.ioctx
.write_full('c', b
'bar')
319 self
.ioctx
.append('d', b
'jazz')
320 object_names
= [obj
.key
for obj
in self
.ioctx
.list_objects()]
321 eq(sorted(object_names
), ['a', 'b', 'c', 'd'])
323 def test_list_ns_objects(self
):
324 self
.ioctx
.write('a', b
'')
325 self
.ioctx
.write('b', b
'foo')
326 self
.ioctx
.write_full('c', b
'bar')
327 self
.ioctx
.append('d', b
'jazz')
328 self
.ioctx
.set_namespace("ns1")
329 self
.ioctx
.write('ns1-a', b
'')
330 self
.ioctx
.write('ns1-b', b
'foo')
331 self
.ioctx
.write_full('ns1-c', b
'bar')
332 self
.ioctx
.append('ns1-d', b
'jazz')
333 self
.ioctx
.append('d', b
'jazz')
334 self
.ioctx
.set_namespace(LIBRADOS_ALL_NSPACES
)
335 object_names
= [(obj
.nspace
, obj
.key
) for obj
in self
.ioctx
.list_objects()]
336 eq(sorted(object_names
), [('', 'a'), ('','b'), ('','c'), ('','d'),\
337 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
338 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
340 def test_xattrs(self
):
341 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
342 self
.ioctx
.write('abc', b
'')
343 for key
, value
in xattrs
.items():
344 self
.ioctx
.set_xattr('abc', key
, value
)
345 eq(self
.ioctx
.get_xattr('abc', key
), value
)
347 for key
, value
in self
.ioctx
.get_xattrs('abc'):
348 stored_xattrs
[key
] = value
349 eq(stored_xattrs
, xattrs
)
351 def test_obj_xattrs(self
):
352 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
353 self
.ioctx
.write('abc', b
'')
354 obj
= list(self
.ioctx
.list_objects())[0]
355 for key
, value
in xattrs
.items():
356 obj
.set_xattr(key
, value
)
357 eq(obj
.get_xattr(key
), value
)
359 for key
, value
in obj
.get_xattrs():
360 stored_xattrs
[key
] = value
361 eq(stored_xattrs
, xattrs
)
363 def test_get_pool_id(self
):
364 eq(self
.ioctx
.get_pool_id(), self
.rados
.pool_lookup('test_pool'))
366 def test_get_pool_name(self
):
367 eq(self
.ioctx
.get_pool_name(), 'test_pool')
369 def test_create_snap(self
):
370 assert_raises(ObjectNotFound
, self
.ioctx
.remove_snap
, 'foo')
371 self
.ioctx
.create_snap('foo')
372 self
.ioctx
.remove_snap('foo')
374 def test_list_snaps_empty(self
):
375 eq(list(self
.ioctx
.list_snaps()), [])
377 def test_list_snaps(self
):
378 snaps
= ['snap1', 'snap2', 'snap3']
380 self
.ioctx
.create_snap(snap
)
381 listed_snaps
= [snap
.name
for snap
in self
.ioctx
.list_snaps()]
382 eq(snaps
, listed_snaps
)
384 def test_lookup_snap(self
):
385 self
.ioctx
.create_snap('foo')
386 snap
= self
.ioctx
.lookup_snap('foo')
389 def test_snap_timestamp(self
):
390 self
.ioctx
.create_snap('foo')
391 snap
= self
.ioctx
.lookup_snap('foo')
394 def test_remove_snap(self
):
395 self
.ioctx
.create_snap('foo')
396 (snap
,) = self
.ioctx
.list_snaps()
398 self
.ioctx
.remove_snap('foo')
399 eq(list(self
.ioctx
.list_snaps()), [])
401 @pytest.mark
.rollback
402 def test_snap_rollback(self
):
403 self
.ioctx
.write("insnap", b
"contents1")
404 self
.ioctx
.create_snap("snap1")
405 self
.ioctx
.remove_object("insnap")
406 self
.ioctx
.snap_rollback("insnap", "snap1")
407 eq(self
.ioctx
.read("insnap"), b
"contents1")
408 self
.ioctx
.remove_snap("snap1")
409 self
.ioctx
.remove_object("insnap")
411 @pytest.mark
.rollback
412 def test_snap_rollback_removed(self
):
413 self
.ioctx
.write("insnap", b
"contents1")
414 self
.ioctx
.create_snap("snap1")
415 self
.ioctx
.write("insnap", b
"contents2")
416 self
.ioctx
.snap_rollback("insnap", "snap1")
417 eq(self
.ioctx
.read("insnap"), b
"contents1")
418 self
.ioctx
.remove_snap("snap1")
419 self
.ioctx
.remove_object("insnap")
421 def test_snap_read(self
):
422 self
.ioctx
.write("insnap", b
"contents1")
423 self
.ioctx
.create_snap("snap1")
424 self
.ioctx
.remove_object("insnap")
425 snap
= self
.ioctx
.lookup_snap("snap1")
426 self
.ioctx
.set_read(snap
.snap_id
)
427 eq(self
.ioctx
.read("insnap"), b
"contents1")
428 self
.ioctx
.set_read(LIBRADOS_SNAP_HEAD
)
429 self
.ioctx
.write("inhead", b
"contents2")
430 eq(self
.ioctx
.read("inhead"), b
"contents2")
431 self
.ioctx
.remove_snap("snap1")
432 self
.ioctx
.remove_object("inhead")
434 def test_set_omap(self
):
435 keys
= ("1", "2", "3", "4", b
"\xff")
436 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04", b
"5")
437 with
WriteOpCtx() as write_op
:
438 self
.ioctx
.set_omap(write_op
, keys
, values
)
439 write_op
.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS
)
440 self
.ioctx
.operate_write_op(write_op
, "hw")
441 with
ReadOpCtx() as read_op
:
442 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 5, omap_key_type
=bytes
)
444 self
.ioctx
.operate_read_op(read_op
, "hw")
446 eq(list(iter), [(b
"2", b
"bbb"), (b
"3", b
"ccc"), (b
"4", b
"\x04\x04\x04\x04"), (b
"\xff", b
"5")])
447 with
ReadOpCtx() as read_op
:
448 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, b
"2", "", 4, omap_key_type
=bytes
)
450 self
.ioctx
.operate_read_op(read_op
, "hw")
451 eq((b
"3", b
"ccc"), next(iter))
452 eq(list(iter), [(b
"4", b
"\x04\x04\x04\x04"), (b
"\xff", b
"5")])
453 with
ReadOpCtx() as read_op
:
454 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "2", 4, omap_key_type
=bytes
)
456 read_op
.set_flags(LIBRADOS_OPERATION_BALANCE_READS
)
457 self
.ioctx
.operate_read_op(read_op
, "hw")
458 eq(list(iter), [(b
"2", b
"bbb")])
460 def test_set_omap_aio(self
):
461 lock
= threading
.Condition()
469 keys
= ("1", "2", "3", "4")
470 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
471 with
WriteOpCtx() as write_op
:
472 self
.ioctx
.set_omap(write_op
, keys
, values
)
473 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "hw", cb
, cb
)
474 comp
.wait_for_complete()
478 eq(comp
.get_return_value(), 0)
480 with
ReadOpCtx() as read_op
:
481 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
483 comp
= self
.ioctx
.operate_aio_read_op(read_op
, "hw", cb
, cb
)
484 comp
.wait_for_complete()
488 eq(comp
.get_return_value(), 0)
490 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
492 def test_write_ops(self
):
493 with
WriteOpCtx() as write_op
:
495 self
.ioctx
.operate_write_op(write_op
, "write_ops")
496 eq(self
.ioctx
.read('write_ops'), b
'')
498 write_op
.write_full(b
'1')
499 write_op
.append(b
'2')
500 self
.ioctx
.operate_write_op(write_op
, "write_ops")
501 eq(self
.ioctx
.read('write_ops'), b
'12')
503 write_op
.write_full(b
'12345')
504 write_op
.write(b
'x', 2)
505 self
.ioctx
.operate_write_op(write_op
, "write_ops")
506 eq(self
.ioctx
.read('write_ops'), b
'12x45')
508 write_op
.write_full(b
'12345')
510 self
.ioctx
.operate_write_op(write_op
, "write_ops")
511 eq(self
.ioctx
.read('write_ops'), b
'12\x00\x005')
513 write_op
.write_full(b
'12345')
515 self
.ioctx
.operate_write_op(write_op
, "write_ops")
516 eq(self
.ioctx
.read('write_ops'), b
'12')
519 self
.ioctx
.operate_write_op(write_op
, "write_ops")
520 with pytest
.raises(ObjectNotFound
):
521 self
.ioctx
.read('write_ops')
523 def test_execute_op(self
):
524 with
WriteOpCtx() as write_op
:
525 write_op
.execute("hello", "record_hello", b
"ebs")
526 self
.ioctx
.operate_write_op(write_op
, "object")
527 eq(self
.ioctx
.read('object'), b
"Hello, ebs!")
529 def test_writesame_op(self
):
530 with
WriteOpCtx() as write_op
:
531 write_op
.writesame(b
'rzx', 9)
532 self
.ioctx
.operate_write_op(write_op
, 'abc')
533 eq(self
.ioctx
.read('abc'), b
'rzxrzxrzx')
535 def test_get_omap_vals_by_keys(self
):
536 keys
= ("1", "2", "3", "4", b
"\xff")
537 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04", b
"5")
538 with
WriteOpCtx() as write_op
:
539 self
.ioctx
.set_omap(write_op
, keys
, values
)
540 self
.ioctx
.operate_write_op(write_op
, "hw")
541 with
ReadOpCtx() as read_op
:
542 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",b
"\xff"), omap_key_type
=bytes
)
544 self
.ioctx
.operate_read_op(read_op
, "hw")
545 eq(list(iter), [(b
"3", b
"ccc"), (b
"4", b
"\x04\x04\x04\x04"), (b
"\xff", b
"5")])
546 with
ReadOpCtx() as read_op
:
547 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",), omap_key_type
=bytes
)
549 with pytest
.raises(ObjectNotFound
):
550 self
.ioctx
.operate_read_op(read_op
, "no_such")
552 def test_get_omap_keys(self
):
553 keys
= ("1", "2", "3")
554 values
= (b
"aaa", b
"bbb", b
"ccc")
555 with
WriteOpCtx() as write_op
:
556 self
.ioctx
.set_omap(write_op
, keys
, values
)
557 self
.ioctx
.operate_write_op(write_op
, "hw")
558 with
ReadOpCtx() as read_op
:
559 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
561 self
.ioctx
.operate_read_op(read_op
, "hw")
562 eq(list(iter), [("1", None), ("2", None)])
563 with
ReadOpCtx() as read_op
:
564 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
566 with pytest
.raises(ObjectNotFound
):
567 self
.ioctx
.operate_read_op(read_op
, "no_such")
569 def test_clear_omap(self
):
570 keys
= ("1", "2", "3")
571 values
= (b
"aaa", b
"bbb", b
"ccc")
572 with
WriteOpCtx() as write_op
:
573 self
.ioctx
.set_omap(write_op
, keys
, values
)
574 self
.ioctx
.operate_write_op(write_op
, "hw")
575 with
WriteOpCtx() as write_op_1
:
576 self
.ioctx
.clear_omap(write_op_1
)
577 self
.ioctx
.operate_write_op(write_op_1
, "hw")
578 with
ReadOpCtx() as read_op
:
579 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("1",))
581 self
.ioctx
.operate_read_op(read_op
, "hw")
584 def test_remove_omap_range2(self
):
585 keys
= ("1", "2", "3", "4")
586 values
= (b
"a", b
"bb", b
"ccc", b
"dddd")
587 with
WriteOpCtx() as write_op
:
588 self
.ioctx
.set_omap(write_op
, keys
, values
)
589 self
.ioctx
.operate_write_op(write_op
, "test_obj")
590 with
ReadOpCtx() as read_op
:
591 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
593 self
.ioctx
.operate_read_op(read_op
, "test_obj")
594 eq(list(iter), list(zip(keys
, values
)))
595 with
WriteOpCtx() as write_op
:
596 self
.ioctx
.remove_omap_range2(write_op
, "1", "4")
597 self
.ioctx
.operate_write_op(write_op
, "test_obj")
598 with
ReadOpCtx() as read_op
:
599 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, keys
)
601 self
.ioctx
.operate_read_op(read_op
, "test_obj")
602 eq(list(iter), [("4", b
"dddd")])
604 def test_omap_cmp(self
):
606 self
.ioctx
.write(object_id
, b
'omap_cmp')
607 with
WriteOpCtx() as write_op
:
608 self
.ioctx
.set_omap(write_op
, ('key1',), ('1',))
609 self
.ioctx
.operate_write_op(write_op
, object_id
)
610 with
WriteOpCtx() as write_op
:
611 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
612 self
.ioctx
.set_omap(write_op
, ('key1',), ('2',))
613 self
.ioctx
.operate_write_op(write_op
, object_id
)
614 with
ReadOpCtx() as read_op
:
615 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
617 self
.ioctx
.operate_read_op(read_op
, object_id
)
618 eq(list(iter), [('key1', b
'2')])
619 with
WriteOpCtx() as write_op
:
620 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_GT
)
621 self
.ioctx
.set_omap(write_op
, ('key1',), ('3',))
622 self
.ioctx
.operate_write_op(write_op
, object_id
)
623 with
ReadOpCtx() as read_op
:
624 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
626 self
.ioctx
.operate_read_op(read_op
, object_id
)
627 eq(list(iter), [('key1', b
'3')])
628 with
WriteOpCtx() as write_op
:
629 write_op
.omap_cmp('key1', '4', LIBRADOS_CMPXATTR_OP_LT
)
630 self
.ioctx
.set_omap(write_op
, ('key1',), ('4',))
631 self
.ioctx
.operate_write_op(write_op
, object_id
)
632 with
ReadOpCtx() as read_op
:
633 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
, ('key1',))
635 self
.ioctx
.operate_read_op(read_op
, object_id
)
636 eq(list(iter), [('key1', b
'4')])
637 with
WriteOpCtx() as write_op
:
638 write_op
.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ
)
639 self
.ioctx
.set_omap(write_op
, ('key1',), ('5',))
641 self
.ioctx
.operate_write_op(write_op
, object_id
)
642 except (OSError, ExtendMismatch
) as e
:
645 message
= "omap_cmp did not raise Exception when omap content does not match"
646 raise AssertionError(message
)
648 def test_cmpext_op(self
):
650 with
WriteOpCtx() as write_op
:
651 write_op
.write(b
'12345', 0)
652 self
.ioctx
.operate_write_op(write_op
, object_id
)
653 with
WriteOpCtx() as write_op
:
654 write_op
.cmpext(b
'12345', 0)
655 write_op
.write(b
'54321', 0)
656 self
.ioctx
.operate_write_op(write_op
, object_id
)
657 eq(self
.ioctx
.read(object_id
), b
'54321')
658 with
WriteOpCtx() as write_op
:
659 write_op
.cmpext(b
'56789', 0)
660 write_op
.write(b
'12345', 0)
662 self
.ioctx
.operate_write_op(write_op
, object_id
)
663 except ExtendMismatch
as e
:
664 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 1)
665 # where "1" is the offset of the first unmatched byte
666 eq(-e
.errno
, -MAX_ERRNO
- 1)
669 message
= "cmpext did not raise Exception when object content does not match"
670 raise AssertionError(message
)
671 with
ReadOpCtx() as read_op
:
672 read_op
.cmpext(b
'54321', 0)
673 self
.ioctx
.operate_read_op(read_op
, object_id
)
674 with
ReadOpCtx() as read_op
:
675 read_op
.cmpext(b
'54789', 0)
677 self
.ioctx
.operate_read_op(read_op
, object_id
)
678 except ExtendMismatch
as e
:
679 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 2)
680 # where "2" is the offset of the first unmatched byte
681 eq(-e
.errno
, -MAX_ERRNO
- 2)
684 message
= "cmpext did not raise Exception when object content does not match"
685 raise AssertionError(message
)
687 def test_xattrs_op(self
):
688 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
689 with
WriteOpCtx() as write_op
:
690 write_op
.new(LIBRADOS_CREATE_EXCLUSIVE
)
691 for key
, value
in xattrs
.items():
692 write_op
.set_xattr(key
, value
)
693 self
.ioctx
.operate_write_op(write_op
, 'abc')
694 eq(self
.ioctx
.get_xattr('abc', key
), value
)
697 for key
, value
in self
.ioctx
.get_xattrs('abc'):
698 stored_xattrs_1
[key
] = value
699 eq(stored_xattrs_1
, xattrs
)
701 for key
in xattrs
.keys():
702 write_op
.rm_xattr(key
)
703 self
.ioctx
.operate_write_op(write_op
, 'abc')
705 for key
, value
in self
.ioctx
.get_xattrs('abc'):
706 stored_xattrs_2
[key
] = value
707 eq(stored_xattrs_2
, {})
710 self
.ioctx
.operate_write_op(write_op
, 'abc')
712 def test_locator(self
):
713 self
.ioctx
.set_locator_key("bar")
714 self
.ioctx
.write('foo', b
'contents1')
715 objects
= [i
for i
in self
.ioctx
.list_objects()]
717 eq(self
.ioctx
.get_locator_key(), "bar")
718 self
.ioctx
.set_locator_key("")
720 objects
[0].write(b
"contents2")
721 eq(self
.ioctx
.get_locator_key(), "")
722 self
.ioctx
.set_locator_key("bar")
723 contents
= self
.ioctx
.read("foo")
724 eq(contents
, b
"contents2")
725 eq(self
.ioctx
.get_locator_key(), "bar")
727 objects
= [i
for i
in self
.ioctx
.list_objects()]
729 self
.ioctx
.set_locator_key("")
731 def test_operate_aio_write_op(self
):
732 lock
= threading
.Condition()
739 with
WriteOpCtx() as write_op
:
740 write_op
.write(b
'rzx')
741 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "object", cb
, cb
)
742 comp
.wait_for_complete()
746 eq(comp
.get_return_value(), 0)
747 eq(self
.ioctx
.read('object'), b
'rzx')
749 def test_aio_write(self
):
750 lock
= threading
.Condition()
757 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
758 comp
.wait_for_complete()
762 eq(comp
.get_return_value(), 0)
763 contents
= self
.ioctx
.read("foo")
765 [i
.remove() for i
in self
.ioctx
.list_objects()]
767 def test_aio_cmpext(self
):
768 lock
= threading
.Condition()
776 self
.ioctx
.write('test_object', b
'abcdefghi')
777 comp
= self
.ioctx
.aio_cmpext('test_object', b
'abcdefghi', 0, cb
)
778 comp
.wait_for_complete()
782 eq(comp
.get_return_value(), 0)
784 def test_aio_rmxattr(self
):
785 lock
= threading
.Condition()
792 self
.ioctx
.set_xattr("xyz", "key", b
'value')
793 eq(self
.ioctx
.get_xattr("xyz", "key"), b
'value')
794 comp
= self
.ioctx
.aio_rmxattr("xyz", "key", cb
)
795 comp
.wait_for_complete()
799 eq(comp
.get_return_value(), 0)
800 with pytest
.raises(NoData
):
801 self
.ioctx
.get_xattr("xyz", "key")
803 def test_aio_write_no_comp_ref(self
):
804 lock
= threading
.Condition()
811 # NOTE(sileht): force don't save the comp into local var
812 # to ensure all references are correctly tracked into the lib
813 self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
817 contents
= self
.ioctx
.read("foo")
819 [i
.remove() for i
in self
.ioctx
.list_objects()]
821 def test_aio_append(self
):
822 lock
= threading
.Condition()
829 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
830 comp2
= self
.ioctx
.aio_append("foo", b
"baz", cb
, cb
)
831 comp
.wait_for_complete()
832 contents
= self
.ioctx
.read("foo")
833 eq(contents
, b
"barbaz")
837 eq(comp
.get_return_value(), 0)
838 eq(comp2
.get_return_value(), 0)
839 [i
.remove() for i
in self
.ioctx
.list_objects()]
841 def test_aio_write_full(self
):
842 lock
= threading
.Condition()
849 self
.ioctx
.aio_write("foo", b
"barbaz", 0, cb
, cb
)
850 comp
= self
.ioctx
.aio_write_full("foo", b
"bar", cb
, cb
)
851 comp
.wait_for_complete()
855 eq(comp
.get_return_value(), 0)
856 contents
= self
.ioctx
.read("foo")
858 [i
.remove() for i
in self
.ioctx
.list_objects()]
860 def test_aio_writesame(self
):
861 lock
= threading
.Condition()
868 comp
= self
.ioctx
.aio_writesame("abc", b
"rzx", 9, 0, cb
)
869 comp
.wait_for_complete()
873 eq(comp
.get_return_value(), 0)
874 eq(self
.ioctx
.read("abc"), b
"rzxrzxrzx")
875 [i
.remove() for i
in self
.ioctx
.list_objects()]
877 def test_aio_stat(self
):
878 lock
= threading
.Condition()
880 def cb(_
, size
, mtime
):
885 comp
= self
.ioctx
.aio_stat("foo", cb
)
886 comp
.wait_for_complete()
890 eq(comp
.get_return_value(), -2)
892 self
.ioctx
.write("foo", b
"bar")
894 comp
= self
.ioctx
.aio_stat("foo", cb
)
895 comp
.wait_for_complete()
899 eq(comp
.get_return_value(), 0)
901 [i
.remove() for i
in self
.ioctx
.list_objects()]
903 def test_aio_remove(self
):
904 lock
= threading
.Condition()
911 self
.ioctx
.write('foo', b
'wrx')
912 eq(self
.ioctx
.read('foo'), b
'wrx')
913 comp
= self
.ioctx
.aio_remove('foo', cb
, cb
)
914 comp
.wait_for_complete()
918 eq(comp
.get_return_value(), 0)
919 eq(list(self
.ioctx
.list_objects()), [])
921 def _take_down_acting_set(self
, pool
, objectname
):
922 # find acting_set for pool:objectname and take it down; used to
923 # verify that async reads don't complete while acting set is missing
930 r
, jsonout
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
931 objmap
= json
.loads(jsonout
.decode("utf-8"))
932 acting_set
= objmap
['acting']
933 cmd
= {"prefix":"osd set", "key":"noup"}
934 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
936 cmd
= {"prefix":"osd down", "ids":[str(i
) for i
in acting_set
]}
937 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
940 # wait for OSDs to acknowledge the down
941 eq(self
.rados
.wait_for_latest_osdmap(), 0)
943 def _let_osds_back_up(self
):
944 cmd
= {"prefix":"osd unset", "key":"noup"}
945 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
949 def test_aio_read_wait_for_complete(self
):
950 # use wait_for_complete() and wait for cb by
953 # this is a list so that the local cb() can modify it
954 payload
= b
"bar\000frob"
955 self
.ioctx
.write("foo", payload
)
956 self
._take
_down
_acting
_set
('test_pool', 'foo')
959 lock
= threading
.Condition()
965 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
966 eq(False, comp
.is_complete())
968 eq(False, comp
.is_complete())
972 self
._let
_osds
_back
_up
()
973 comp
.wait_for_complete()
976 while retval
[0] is None and loops
<= 10:
981 eq(retval
[0], payload
)
982 eq(sys
.getrefcount(comp
), 2)
985 def test_aio_read_wait_for_complete_and_cb(self
):
986 # use wait_for_complete_and_cb(), verify retval[0] is
987 # set by the time we regain control
988 payload
= b
"bar\000frob"
989 self
.ioctx
.write("foo", payload
)
991 self
._take
_down
_acting
_set
('test_pool', 'foo')
992 # this is a list so that the local cb() can modify it
994 lock
= threading
.Condition()
999 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
1000 eq(False, comp
.is_complete())
1002 eq(False, comp
.is_complete())
1006 self
._let
_osds
_back
_up
()
1007 comp
.wait_for_complete_and_cb()
1008 assert(retval
[0] is not None)
1009 eq(retval
[0], payload
)
1010 eq(sys
.getrefcount(comp
), 2)
1013 def test_aio_read_wait_for_complete_and_cb_error(self
):
1014 # error case, use wait_for_complete_and_cb(), verify retval[0] is
1015 # set by the time we regain control
1016 self
._take
_down
_acting
_set
('test_pool', 'bar')
1018 # this is a list so that the local cb() can modify it
1020 lock
= threading
.Condition()
1026 # read from a DNE object
1027 comp
= self
.ioctx
.aio_read("bar", 3, 0, cb
)
1028 eq(False, comp
.is_complete())
1030 eq(False, comp
.is_complete())
1033 self
._let
_osds
_back
_up
()
1035 comp
.wait_for_complete_and_cb()
1037 assert(comp
.get_return_value() < 0)
1038 eq(sys
.getrefcount(comp
), 2)
1040 def test_lock(self
):
1041 self
.ioctx
.lock_exclusive("foo", "lock", "locker", "desc_lock",
1043 assert_raises(ObjectExists
,
1044 self
.ioctx
.lock_exclusive
,
1045 "foo", "lock", "locker", "desc_lock", 10000, 0)
1046 self
.ioctx
.unlock("foo", "lock", "locker")
1047 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker")
1049 self
.ioctx
.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
1051 self
.ioctx
.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
1053 assert_raises(ObjectBusy
,
1054 self
.ioctx
.lock_exclusive
,
1055 "foo", "lock", "locker3", "desc_lock", 10000, 0)
1056 self
.ioctx
.unlock("foo", "lock", "locker1")
1057 self
.ioctx
.unlock("foo", "lock", "locker2")
1058 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker1")
1059 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker2")
1061 def test_execute(self
):
1062 self
.ioctx
.write("foo", b
"") # ensure object exists
1064 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"")
1065 eq(buf
, b
"Hello, world!")
1067 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"nose")
1068 eq(buf
, b
"Hello, nose!")
1070 def test_aio_execute(self
):
1073 lock
= threading
.Condition()
1076 if retval
[0] is None:
1080 self
.ioctx
.write("foo", b
"") # ensure object exists
1082 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"", 32, cb
, cb
)
1083 comp
.wait_for_complete()
1087 eq(comp
.get_return_value(), 13)
1088 eq(retval
[0], b
"Hello, world!")
1091 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"nose", 32, cb
, cb
)
1092 comp
.wait_for_complete()
1096 eq(comp
.get_return_value(), 12)
1097 eq(retval
[0], b
"Hello, nose!")
1099 [i
.remove() for i
in self
.ioctx
.list_objects()]
1101 def test_aio_setxattr(self
):
1102 lock
= threading
.Condition()
1109 comp
= self
.ioctx
.aio_setxattr("obj", "key", b
'value', cb
)
1110 comp
.wait_for_complete()
1114 eq(comp
.get_return_value(), 0)
1115 eq(self
.ioctx
.get_xattr("obj", "key"), b
'value')
1117 def test_applications(self
):
1118 cmd
= {"prefix":"osd dump", "format":"json"}
1119 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1122 release
= json
.loads(buf
.decode("utf-8")).get("require_osd_release",
1124 if not release
or release
[0] < 'l':
1125 pytest
.skip('required_osd_release >= l')
1127 eq([], self
.ioctx
.application_list())
1129 self
.ioctx
.application_enable("app1")
1130 assert_raises(Error
, self
.ioctx
.application_enable
, "app2")
1131 self
.ioctx
.application_enable("app2", True)
1133 assert_raises(Error
, self
.ioctx
.application_metadata_list
, "dne")
1134 eq([], self
.ioctx
.application_metadata_list("app1"))
1136 assert_raises(Error
, self
.ioctx
.application_metadata_set
, "dne", "key",
1138 self
.ioctx
.application_metadata_set("app1", "key1", "val1")
1139 eq("val1", self
.ioctx
.application_metadata_get("app1", "key1"))
1140 self
.ioctx
.application_metadata_set("app1", "key2", "val2")
1141 eq("val2", self
.ioctx
.application_metadata_get("app1", "key2"))
1142 self
.ioctx
.application_metadata_set("app2", "key1", "val1")
1143 eq("val1", self
.ioctx
.application_metadata_get("app2", "key1"))
1145 eq([("key1", "val1"), ("key2", "val2")],
1146 self
.ioctx
.application_metadata_list("app1"))
1148 self
.ioctx
.application_metadata_remove("app1", "key1")
1149 eq([("key2", "val2")], self
.ioctx
.application_metadata_list("app1"))
1151 def test_service_daemon(self
):
1152 name
= "pid-" + str(os
.getpid())
1153 metadata
= {'version': '3.14', 'memory': '42'}
1154 self
.rados
.service_daemon_register("laundry", name
, metadata
)
1155 status
= {'result': 'unknown', 'test': 'running'}
1156 self
.rados
.service_daemon_update(status
)
1158 def test_alignment(self
):
1159 eq(self
.ioctx
.alignment(), None)
1163 class TestIoctxEc(object):
1165 def setup_method(self
, method
):
1166 self
.rados
= Rados(conffile
='')
1167 self
.rados
.connect()
1168 self
.pool
= 'test-ec'
1169 self
.profile
= 'testprofile-%s' % self
.pool
1170 cmd
= {"prefix": "osd erasure-code-profile set",
1171 "name": self
.profile
, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
1172 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1173 assert ret
== 0, out
1174 # create ec pool with profile created above
1175 cmd
= {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
1176 'pool': self
.pool
, 'pool_type': 'erasure',
1177 'erasure_code_profile': self
.profile
}
1178 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1179 assert ret
== 0, out
1180 assert self
.rados
.pool_exists(self
.pool
)
1181 self
.ioctx
= self
.rados
.open_ioctx(self
.pool
)
1183 def teardown_method(self
, method
):
1184 cmd
= {"prefix": "osd unset", "key": "noup"}
1185 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1187 self
.rados
.delete_pool(self
.pool
)
1188 self
.rados
.shutdown()
1190 def test_alignment(self
):
1191 eq(self
.ioctx
.alignment(), 8192)
1194 class TestIoctx2(object):
1196 def setup_method(self
, method
):
1197 self
.rados
= Rados(conffile
='')
1198 self
.rados
.connect()
1199 self
.rados
.create_pool('test_pool')
1200 assert self
.rados
.pool_exists('test_pool')
1201 pool_id
= self
.rados
.pool_lookup('test_pool')
1203 self
.ioctx2
= self
.rados
.open_ioctx2(pool_id
)
1205 def teardown_method(self
, method
):
1206 cmd
= {"prefix": "osd unset", "key": "noup"}
1207 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1209 self
.rados
.delete_pool('test_pool')
1210 self
.rados
.shutdown()
1212 def test_get_last_version(self
):
1213 version
= self
.ioctx2
.get_last_version()
1216 def test_get_stats(self
):
1217 stats
= self
.ioctx2
.get_stats()
1218 eq(stats
, {'num_objects_unfound': 0,
1219 'num_objects_missing_on_primary': 0,
1220 'num_object_clones': 0,
1222 'num_object_copies': 0,
1228 'num_objects_degraded': 0,
1232 class TestObject(object):
1234 def setup_method(self
, method
):
1235 self
.rados
= Rados(conffile
='')
1236 self
.rados
.connect()
1237 self
.rados
.create_pool('test_pool')
1238 assert self
.rados
.pool_exists('test_pool')
1239 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1240 self
.ioctx
.write('foo', b
'bar')
1241 self
.object = Object(self
.ioctx
, 'foo')
1243 def teardown_method(self
, method
):
1246 self
.rados
.delete_pool('test_pool')
1247 self
.rados
.shutdown()
1250 def test_read(self
):
1251 eq(self
.object.read(3), b
'bar')
1252 eq(self
.object.read(100), b
'')
1254 def test_seek(self
):
1255 self
.object.write(b
'blah')
1257 eq(self
.object.read(4), b
'blah')
1259 eq(self
.object.read(3), b
'lah')
1261 def test_write(self
):
1262 self
.object.write(b
'barbaz')
1264 eq(self
.object.read(3), b
'bar')
1265 eq(self
.object.read(3), b
'baz')
1267 class TestIoCtxSelfManagedSnaps(object):
1268 def setup_method(self
, method
):
1269 self
.rados
= Rados(conffile
='')
1270 self
.rados
.connect()
1271 self
.rados
.create_pool('test_pool')
1272 assert self
.rados
.pool_exists('test_pool')
1273 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1275 def teardown_method(self
, method
):
1276 cmd
= {"prefix":"osd unset", "key":"noup"}
1277 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1279 self
.rados
.delete_pool('test_pool')
1280 self
.rados
.shutdown()
1282 @pytest.mark
.rollback
1284 # cannot mix-and-match pool and self-managed snapshot mode
1285 self
.ioctx
.set_self_managed_snap_write([])
1286 self
.ioctx
.write('abc', b
'abc')
1287 snap_id_1
= self
.ioctx
.create_self_managed_snap()
1288 self
.ioctx
.set_self_managed_snap_write([snap_id_1
])
1290 self
.ioctx
.write('abc', b
'def')
1291 snap_id_2
= self
.ioctx
.create_self_managed_snap()
1292 self
.ioctx
.set_self_managed_snap_write([snap_id_1
, snap_id_2
])
1294 self
.ioctx
.write('abc', b
'ghi')
1296 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_1
)
1297 eq(self
.ioctx
.read('abc'), b
'abc')
1299 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_2
)
1300 eq(self
.ioctx
.read('abc'), b
'def')
1302 self
.ioctx
.remove_self_managed_snap(snap_id_1
)
1303 self
.ioctx
.remove_self_managed_snap(snap_id_2
)
1305 class TestCommand(object):
1307 def setup_method(self
, method
):
1308 self
.rados
= Rados(conffile
='')
1309 self
.rados
.connect()
1311 def teardown_method(self
, method
):
1312 self
.rados
.shutdown()
1314 def test_monmap_dump(self
):
1316 # check for success and some plain output with epoch in it
1317 cmd
= {"prefix":"mon dump"}
1318 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1321 assert(b
'epoch' in buf
)
1323 # JSON, and grab current epoch
1324 cmd
['format'] = 'json'
1325 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1328 d
= json
.loads(buf
.decode("utf-8"))
1329 assert('epoch' in d
)
1332 # assume epoch + 1000 does not exist; test for ENOENT
1333 cmd
['epoch'] = epoch
+ 1000
1334 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1335 eq(ret
, -errno
.ENOENT
)
1339 # send to specific target by name, rank
1340 cmd
= {"prefix": "version"}
1342 target
= d
['mons'][0]['name']
1344 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1348 e
= json
.loads(buf
.decode("utf-8"))
1349 assert('release' in e
)
1351 target
= d
['mons'][0]['rank']
1353 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1357 e
= json
.loads(buf
.decode("utf-8"))
1358 assert('release' in e
)
1361 def test_osd_bench(self
):
1362 cmd
= dict(prefix
='bench', size
=4096, count
=8192)
1363 ret
, buf
, err
= self
.rados
.osd_command(0, json
.dumps(cmd
), b
'',
1367 out
= json
.loads(buf
.decode('utf-8'))
1368 eq(out
['blocksize'], cmd
['size'])
1369 eq(out
['bytes_written'], cmd
['count'])
1371 def test_ceph_osd_pool_create_utf8(self
):
1374 cmd
= {"prefix": "osd pool create", "pg_num": 16, "pool": poolname
}
1375 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1378 eq(u
"pool '\u9ec5' created", out
)
1382 class TestWatchNotify(object):
1383 OID
= "test_watch_notify"
1385 def setup_method(self
, method
):
1386 self
.rados
= Rados(conffile
='')
1387 self
.rados
.connect()
1388 self
.rados
.create_pool('test_pool')
1389 assert self
.rados
.pool_exists('test_pool')
1390 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1391 self
.ioctx
.write(self
.OID
, b
'test watch notify')
1392 self
.lock
= threading
.Condition()
1393 self
.notify_cnt
= {}
1394 self
.notify_data
= {}
1395 self
.notify_error
= {}
1399 self
.instance_id
= self
.rados
.get_instance_id()
1401 def teardown_method(self
, method
):
1403 self
.rados
.delete_pool('test_pool')
1404 self
.rados
.shutdown()
1406 def make_callback(self
):
1407 def callback(notify_id
, notifier_id
, watch_id
, data
):
1409 if watch_id
not in self
.notify_cnt
:
1410 self
.notify_cnt
[watch_id
] = 1
1411 elif self
.notify_data
[watch_id
] != data
:
1412 self
.notify_cnt
[watch_id
] += 1
1413 self
.notify_data
[watch_id
] = data
1416 def make_error_callback(self
):
1417 def callback(watch_id
, error
):
1419 self
.notify_error
[watch_id
] = error
1424 with self
.ioctx
.watch(self
.OID
, self
.make_callback(),
1425 self
.make_error_callback()) as watch1
:
1426 watch_id1
= watch1
.get_id()
1427 assert(watch_id1
> 0)
1429 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1430 watch2
= ioctx
.watch(self
.OID
, self
.make_callback(),
1431 self
.make_error_callback())
1432 watch_id2
= watch2
.get_id()
1433 assert(watch_id2
> 0)
1435 assert(self
.ioctx
.notify(self
.OID
, 'test'))
1437 assert(watch_id1
in self
.notify_cnt
)
1438 assert(watch_id2
in self
.notify_cnt
)
1439 eq(self
.notify_cnt
[watch_id1
], 1)
1440 eq(self
.notify_cnt
[watch_id2
], 1)
1441 eq(self
.notify_data
[watch_id1
], b
'test')
1442 eq(self
.notify_data
[watch_id2
], b
'test')
1444 assert(watch1
.check() >= timedelta())
1445 assert(watch2
.check() >= timedelta())
1447 assert(self
.ioctx
.notify(self
.OID
, 'best'))
1449 eq(self
.notify_cnt
[watch_id1
], 2)
1450 eq(self
.notify_cnt
[watch_id2
], 2)
1451 eq(self
.notify_data
[watch_id1
], b
'best')
1452 eq(self
.notify_data
[watch_id2
], b
'best')
1456 assert(self
.ioctx
.notify(self
.OID
, 'rest'))
1458 eq(self
.notify_cnt
[watch_id1
], 3)
1459 eq(self
.notify_cnt
[watch_id2
], 2)
1460 eq(self
.notify_data
[watch_id1
], b
'rest')
1461 eq(self
.notify_data
[watch_id2
], b
'best')
1463 assert(watch1
.check() >= timedelta())
1465 self
.ioctx
.remove_object(self
.OID
)
1469 if watch_id1
in self
.notify_error
:
1472 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1473 assert_raises(NotConnected
, watch1
.check
)
1475 assert_raises(ObjectNotFound
, self
.ioctx
.notify
, self
.OID
, 'test')
1477 def make_callback_reply(self
):
1478 def callback(notify_id
, notifier_id
, watch_id
, data
):
1483 def notify_callback(self
, _
, r
, ack_list
, timeout_list
):
1486 for notifier_id
, _
, notifier_data
in ack_list
:
1487 if notifier_id
not in self
.ack_cnt
:
1488 self
.ack_cnt
[notifier_id
] = 0
1489 self
.ack_cnt
[notifier_id
] += 1
1490 self
.ack_data
[notifier_id
] = notifier_data
1492 def notify_callback_err(self
, _
, r
, ack_list
, timeout_list
):
1493 eq(r
, -errno
.ENOENT
)
1495 def test_aio_notify(self
):
1496 with self
.ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1497 self
.make_error_callback()) as watch1
:
1498 watch_id1
= watch1
.get_id()
1499 assert watch_id1
> 0
1501 with self
.rados
.open_ioctx('test_pool') as ioctx
:
1502 watch2
= ioctx
.watch(self
.OID
, self
.make_callback_reply(),
1503 self
.make_error_callback())
1504 watch_id2
= watch2
.get_id()
1505 assert watch_id2
> 0
1507 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='test')
1508 comp
.wait_for_complete_and_cb()
1510 assert self
.instance_id
in self
.ack_cnt
1511 eq(self
.ack_cnt
[self
.instance_id
], 2)
1512 eq(self
.ack_data
[self
.instance_id
], b
'test')
1514 assert watch1
.check() >= timedelta()
1515 assert watch2
.check() >= timedelta()
1517 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='best')
1518 comp
.wait_for_complete_and_cb()
1520 eq(self
.ack_cnt
[self
.instance_id
], 4)
1521 eq(self
.ack_data
[self
.instance_id
], b
'best')
1525 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback
, msg
='rest')
1526 comp
.wait_for_complete_and_cb()
1528 eq(self
.ack_cnt
[self
.instance_id
], 5)
1529 eq(self
.ack_data
[self
.instance_id
], b
'rest')
1531 assert(watch1
.check() >= timedelta())
1532 self
.ioctx
.remove_object(self
.OID
)
1536 if watch_id1
in self
.notify_error
:
1539 eq(self
.notify_error
[watch_id1
], -errno
.ENOTCONN
)
1540 assert_raises(NotConnected
, watch1
.check
)
1542 comp
= self
.ioctx
.aio_notify(self
.OID
, self
.notify_callback_err
, msg
='test')
1543 comp
.wait_for_complete_and_cb()