1 from __future__
import print_function
2 from nose
.tools
import eq_
as eq
, ok_
as ok
, assert_raises
3 from rados
import (Rados
, Error
, RadosStateError
, Object
, ObjectExists
,
4 ObjectNotFound
, ObjectBusy
, requires
, opt
,
5 ANONYMOUS_AUID
, ADMIN_AUID
, LIBRADOS_ALL_NSPACES
, WriteOpCtx
, ReadOpCtx
,
6 LIBRADOS_SNAP_HEAD
, LIBRADOS_OPERATION_BALANCE_READS
, LIBRADOS_OPERATION_SKIPRWLOCKS
, MonitorLog
)
13 # Are we running Python 2.x
14 _python2
= sys
.version_info
[0] < 3
16 def test_rados_init_error():
17 assert_raises(Error
, Rados
, conffile
='', rados_id
='admin',
19 assert_raises(Error
, Rados
, conffile
='', name
='invalid')
20 assert_raises(Error
, Rados
, conffile
='', name
='bad.invalid')
22 def test_rados_init():
23 with
Rados(conffile
='', rados_id
='admin'):
25 with
Rados(conffile
='', name
='client.admin'):
27 with
Rados(conffile
='', name
='client.admin'):
29 with
Rados(conffile
='', name
='client.admin'):
32 def test_ioctx_context_manager():
33 with
Rados(conffile
='', rados_id
='admin') as conn
:
34 with conn
.open_ioctx('rbd') as ioctx
:
37 def test_parse_argv():
38 args
= ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
40 eq(args
, r
.conf_parse_argv(args
))
42 def test_parse_argv_empty_str():
45 eq(args
, r
.conf_parse_argv(args
))
47 class TestRequires(object):
48 @requires(('foo', str), ('bar', int), ('baz', int))
49 def _method_plain(self
, foo
, bar
, baz
):
50 ok(isinstance(foo
, str))
51 ok(isinstance(bar
, int))
52 ok(isinstance(baz
, int))
53 return (foo
, bar
, baz
)
55 def test_method_plain(self
):
56 assert_raises(TypeError, self
._method
_plain
, 42, 42, 42)
57 assert_raises(TypeError, self
._method
_plain
, '42', '42', '42')
58 assert_raises(TypeError, self
._method
_plain
, foo
='42', bar
='42', baz
='42')
59 eq(self
._method
_plain
('42', 42, 42), ('42', 42, 42))
60 eq(self
._method
_plain
(foo
='42', bar
=42, baz
=42), ('42', 42, 42))
62 @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int))
63 def _method_with_opt_arg(self
, foo
, bar
, baz
):
64 ok(isinstance(foo
, str) or foo
is None)
65 ok(isinstance(bar
, int) or bar
is None)
66 ok(isinstance(baz
, int))
67 return (foo
, bar
, baz
)
69 def test_method_with_opt_args(self
):
70 assert_raises(TypeError, self
._method
_with
_opt
_arg
, 42, 42, 42)
71 assert_raises(TypeError, self
._method
_with
_opt
_arg
, '42', '42', 42)
72 assert_raises(TypeError, self
._method
_with
_opt
_arg
, None, None, None)
73 eq(self
._method
_with
_opt
_arg
(None, 42, 42), (None, 42, 42))
74 eq(self
._method
_with
_opt
_arg
('42', None, 42), ('42', None, 42))
75 eq(self
._method
_with
_opt
_arg
(None, None, 42), (None, None, 42))
78 class TestRadosStateError(object):
79 def _requires_configuring(self
, rados
):
80 assert_raises(RadosStateError
, rados
.connect
)
82 def _requires_configuring_or_connected(self
, rados
):
83 assert_raises(RadosStateError
, rados
.conf_read_file
)
84 assert_raises(RadosStateError
, rados
.conf_parse_argv
, None)
85 assert_raises(RadosStateError
, rados
.conf_parse_env
)
86 assert_raises(RadosStateError
, rados
.conf_get
, 'opt')
87 assert_raises(RadosStateError
, rados
.conf_set
, 'opt', 'val')
88 assert_raises(RadosStateError
, rados
.ping_monitor
, 0)
90 def _requires_connected(self
, rados
):
91 assert_raises(RadosStateError
, rados
.pool_exists
, 'foo')
92 assert_raises(RadosStateError
, rados
.pool_lookup
, 'foo')
93 assert_raises(RadosStateError
, rados
.pool_reverse_lookup
, 0)
94 assert_raises(RadosStateError
, rados
.create_pool
, 'foo')
95 assert_raises(RadosStateError
, rados
.get_pool_base_tier
, 0)
96 assert_raises(RadosStateError
, rados
.delete_pool
, 'foo')
97 assert_raises(RadosStateError
, rados
.list_pools
)
98 assert_raises(RadosStateError
, rados
.get_fsid
)
99 assert_raises(RadosStateError
, rados
.open_ioctx
, 'foo')
100 assert_raises(RadosStateError
, rados
.mon_command
, '', b
'')
101 assert_raises(RadosStateError
, rados
.osd_command
, 0, '', b
'')
102 assert_raises(RadosStateError
, rados
.pg_command
, '', '', b
'')
103 assert_raises(RadosStateError
, rados
.wait_for_latest_osdmap
)
104 assert_raises(RadosStateError
, rados
.blacklist_add
, '127.0.0.1/123', 0)
106 def test_configuring(self
):
107 rados
= Rados(conffile
='')
108 eq('configuring', rados
.state
)
109 self
._requires
_connected
(rados
)
111 def test_connected(self
):
112 rados
= Rados(conffile
='')
114 eq('connected', rados
.state
)
115 self
._requires
_configuring
(rados
)
117 def test_shutdown(self
):
118 rados
= Rados(conffile
='')
121 eq('shutdown', rados
.state
)
122 self
._requires
_configuring
(rados
)
123 self
._requires
_configuring
_or
_connected
(rados
)
124 self
._requires
_connected
(rados
)
127 class TestRados(object):
130 self
.rados
= Rados(conffile
='')
131 self
.rados
.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
132 self
.rados
.conf_parse_env()
135 # Assume any pre-existing pools are the cluster's defaults
136 self
.default_pools
= self
.rados
.list_pools()
139 self
.rados
.shutdown()
141 def test_ping_monitor(self
):
142 assert_raises(ObjectNotFound
, self
.rados
.ping_monitor
, 'not_exists_monitor')
143 cmd
= {'prefix': 'mon dump', 'format':'json'}
144 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
145 for mon
in json
.loads(buf
.decode('utf8'))['mons']:
147 output
= self
.rados
.ping_monitor(mon
['name'])
150 buf
= json
.loads(output
)
151 if buf
.get('health'):
154 def test_create(self
):
155 self
.rados
.create_pool('foo')
156 self
.rados
.delete_pool('foo')
158 def test_create_utf8(self
):
160 # Use encoded bytestring
161 poolname
= b
"\351\273\204"
164 self
.rados
.create_pool(poolname
)
165 assert self
.rados
.pool_exists(u
"\u9ec4")
166 self
.rados
.delete_pool(poolname
)
168 def test_pool_lookup_utf8(self
):
173 self
.rados
.create_pool(poolname
)
175 poolid
= self
.rados
.pool_lookup(poolname
)
176 eq(poolname
, self
.rados
.pool_reverse_lookup(poolid
))
178 self
.rados
.delete_pool(poolname
)
180 def test_create_auid(self
):
181 self
.rados
.create_pool('foo', 100)
182 assert self
.rados
.pool_exists('foo')
183 self
.rados
.delete_pool('foo')
185 def test_eexist(self
):
186 self
.rados
.create_pool('foo')
187 assert_raises(ObjectExists
, self
.rados
.create_pool
, 'foo')
188 self
.rados
.delete_pool('foo')
190 def list_non_default_pools(self
):
191 pools
= self
.rados
.list_pools()
192 for p
in self
.default_pools
:
196 def test_list_pools(self
):
197 eq(set(), self
.list_non_default_pools())
198 self
.rados
.create_pool('foo')
199 eq(set(['foo']), self
.list_non_default_pools())
200 self
.rados
.create_pool('bar')
201 eq(set(['foo', 'bar']), self
.list_non_default_pools())
202 self
.rados
.create_pool('baz')
203 eq(set(['foo', 'bar', 'baz']), self
.list_non_default_pools())
204 self
.rados
.delete_pool('foo')
205 eq(set(['bar', 'baz']), self
.list_non_default_pools())
206 self
.rados
.delete_pool('baz')
207 eq(set(['bar']), self
.list_non_default_pools())
208 self
.rados
.delete_pool('bar')
209 eq(set(), self
.list_non_default_pools())
210 self
.rados
.create_pool('a' * 500)
211 eq(set(['a' * 500]), self
.list_non_default_pools())
212 self
.rados
.delete_pool('a' * 500)
214 def test_get_pool_base_tier(self
):
215 self
.rados
.create_pool('foo')
217 self
.rados
.create_pool('foo-cache')
219 pool_id
= self
.rados
.pool_lookup('foo')
220 tier_pool_id
= self
.rados
.pool_lookup('foo-cache')
222 cmd
= {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
223 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
227 cmd
= {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "sure":"--yes-i-really-mean-it"}
228 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
231 eq(self
.rados
.wait_for_latest_osdmap(), 0)
233 eq(pool_id
, self
.rados
.get_pool_base_tier(pool_id
))
234 eq(pool_id
, self
.rados
.get_pool_base_tier(tier_pool_id
))
236 cmd
= {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
237 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
240 self
.rados
.delete_pool('foo-cache')
242 self
.rados
.delete_pool('foo')
244 def test_get_fsid(self
):
245 fsid
= self
.rados
.get_fsid()
248 def test_blacklist_add(self
):
249 self
.rados
.blacklist_add("1.2.3.4/123", 1)
251 def test_get_cluster_stats(self
):
252 stats
= self
.rados
.get_cluster_stats()
253 assert stats
['kb'] > 0
254 assert stats
['kb_avail'] > 0
255 assert stats
['kb_used'] > 0
256 assert stats
['num_objects'] >= 0
258 def test_monitor_log(self
):
259 lock
= threading
.Condition()
260 def cb(arg
, line
, who
, sec
, nsec
, seq
, level
, msg
):
261 # NOTE(sileht): the old pyrados API was received the pointer as int
262 # instead of the value of arg
268 # NOTE(sileht): force don't save the monitor into local var
269 # to ensure all references are correctly tracked into the lib
270 MonitorLog(self
.rados
, "debug", cb
, "arg")
273 MonitorLog(self
.rados
, "debug", None, None)
274 eq(None, self
.rados
.monitor_callback
)
276 class TestIoctx(object):
279 self
.rados
= Rados(conffile
='')
281 self
.rados
.create_pool('test_pool')
282 assert self
.rados
.pool_exists('test_pool')
283 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
286 cmd
= {"prefix":"osd unset", "key":"noup"}
287 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
289 self
.rados
.delete_pool('test_pool')
290 self
.rados
.shutdown()
292 def test_get_last_version(self
):
293 version
= self
.ioctx
.get_last_version()
296 def test_get_stats(self
):
297 stats
= self
.ioctx
.get_stats()
298 eq(stats
, {'num_objects_unfound': 0,
299 'num_objects_missing_on_primary': 0,
300 'num_object_clones': 0,
302 'num_object_copies': 0,
308 'num_objects_degraded': 0,
311 def test_change_auid(self
):
312 self
.ioctx
.change_auid(ANONYMOUS_AUID
)
313 self
.ioctx
.change_auid(ADMIN_AUID
)
315 def test_write(self
):
316 self
.ioctx
.write('abc', b
'abc')
317 eq(self
.ioctx
.read('abc'), b
'abc')
319 def test_write_full(self
):
320 self
.ioctx
.write('abc', b
'abc')
321 eq(self
.ioctx
.read('abc'), b
'abc')
322 self
.ioctx
.write_full('abc', b
'd')
323 eq(self
.ioctx
.read('abc'), b
'd')
325 def test_append(self
):
326 self
.ioctx
.write('abc', b
'a')
327 self
.ioctx
.append('abc', b
'b')
328 self
.ioctx
.append('abc', b
'c')
329 eq(self
.ioctx
.read('abc'), b
'abc')
331 def test_write_zeros(self
):
332 self
.ioctx
.write('abc', b
'a\0b\0c')
333 eq(self
.ioctx
.read('abc'), b
'a\0b\0c')
335 def test_trunc(self
):
336 self
.ioctx
.write('abc', b
'abc')
337 self
.ioctx
.trunc('abc', 2)
338 eq(self
.ioctx
.read('abc'), b
'ab')
339 size
= self
.ioctx
.stat('abc')[0]
342 def test_list_objects_empty(self
):
343 eq(list(self
.ioctx
.list_objects()), [])
345 def test_list_objects(self
):
346 self
.ioctx
.write('a', b
'')
347 self
.ioctx
.write('b', b
'foo')
348 self
.ioctx
.write_full('c', b
'bar')
349 self
.ioctx
.append('d', b
'jazz')
350 object_names
= [obj
.key
for obj
in self
.ioctx
.list_objects()]
351 eq(sorted(object_names
), ['a', 'b', 'c', 'd'])
353 def test_list_ns_objects(self
):
354 self
.ioctx
.write('a', b
'')
355 self
.ioctx
.write('b', b
'foo')
356 self
.ioctx
.write_full('c', b
'bar')
357 self
.ioctx
.append('d', b
'jazz')
358 self
.ioctx
.set_namespace("ns1")
359 self
.ioctx
.write('ns1-a', b
'')
360 self
.ioctx
.write('ns1-b', b
'foo')
361 self
.ioctx
.write_full('ns1-c', b
'bar')
362 self
.ioctx
.append('ns1-d', b
'jazz')
363 self
.ioctx
.append('d', b
'jazz')
364 self
.ioctx
.set_namespace(LIBRADOS_ALL_NSPACES
)
365 object_names
= [(obj
.nspace
, obj
.key
) for obj
in self
.ioctx
.list_objects()]
366 eq(sorted(object_names
), [('', 'a'), ('','b'), ('','c'), ('','d'),\
367 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
368 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
370 def test_xattrs(self
):
371 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
372 self
.ioctx
.write('abc', b
'')
373 for key
, value
in xattrs
.items():
374 self
.ioctx
.set_xattr('abc', key
, value
)
375 eq(self
.ioctx
.get_xattr('abc', key
), value
)
377 for key
, value
in self
.ioctx
.get_xattrs('abc'):
378 stored_xattrs
[key
] = value
379 eq(stored_xattrs
, xattrs
)
381 def test_obj_xattrs(self
):
382 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
383 self
.ioctx
.write('abc', b
'')
384 obj
= list(self
.ioctx
.list_objects())[0]
385 for key
, value
in xattrs
.items():
386 obj
.set_xattr(key
, value
)
387 eq(obj
.get_xattr(key
), value
)
389 for key
, value
in obj
.get_xattrs():
390 stored_xattrs
[key
] = value
391 eq(stored_xattrs
, xattrs
)
393 def test_create_snap(self
):
394 assert_raises(ObjectNotFound
, self
.ioctx
.remove_snap
, 'foo')
395 self
.ioctx
.create_snap('foo')
396 self
.ioctx
.remove_snap('foo')
398 def test_list_snaps_empty(self
):
399 eq(list(self
.ioctx
.list_snaps()), [])
401 def test_list_snaps(self
):
402 snaps
= ['snap1', 'snap2', 'snap3']
404 self
.ioctx
.create_snap(snap
)
405 listed_snaps
= [snap
.name
for snap
in self
.ioctx
.list_snaps()]
406 eq(snaps
, listed_snaps
)
408 def test_lookup_snap(self
):
409 self
.ioctx
.create_snap('foo')
410 snap
= self
.ioctx
.lookup_snap('foo')
413 def test_snap_timestamp(self
):
414 self
.ioctx
.create_snap('foo')
415 snap
= self
.ioctx
.lookup_snap('foo')
418 def test_remove_snap(self
):
419 self
.ioctx
.create_snap('foo')
420 (snap
,) = self
.ioctx
.list_snaps()
422 self
.ioctx
.remove_snap('foo')
423 eq(list(self
.ioctx
.list_snaps()), [])
425 def test_snap_rollback(self
):
426 self
.ioctx
.write("insnap", b
"contents1")
427 self
.ioctx
.create_snap("snap1")
428 self
.ioctx
.remove_object("insnap")
429 self
.ioctx
.snap_rollback("insnap", "snap1")
430 eq(self
.ioctx
.read("insnap"), b
"contents1")
431 self
.ioctx
.remove_snap("snap1")
432 self
.ioctx
.remove_object("insnap")
434 def test_snap_read(self
):
435 self
.ioctx
.write("insnap", b
"contents1")
436 self
.ioctx
.create_snap("snap1")
437 self
.ioctx
.remove_object("insnap")
438 snap
= self
.ioctx
.lookup_snap("snap1")
439 self
.ioctx
.set_read(snap
.snap_id
)
440 eq(self
.ioctx
.read("insnap"), b
"contents1")
441 self
.ioctx
.set_read(LIBRADOS_SNAP_HEAD
)
442 self
.ioctx
.write("inhead", b
"contents2")
443 eq(self
.ioctx
.read("inhead"), b
"contents2")
444 self
.ioctx
.remove_snap("snap1")
445 self
.ioctx
.remove_object("inhead")
447 def test_set_omap(self
):
448 keys
= ("1", "2", "3", "4")
449 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
450 with
WriteOpCtx(self
.ioctx
) as write_op
:
451 self
.ioctx
.set_omap(write_op
, keys
, values
)
452 write_op
.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS
)
453 self
.ioctx
.operate_write_op(write_op
, "hw")
454 with
ReadOpCtx(self
.ioctx
) as read_op
:
455 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
457 self
.ioctx
.operate_read_op(read_op
, "hw")
459 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
460 with
ReadOpCtx(self
.ioctx
) as read_op
:
461 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "2", "", 4)
463 self
.ioctx
.operate_read_op(read_op
, "hw")
464 eq(("3", b
"ccc"), next(iter))
465 eq(list(iter), [("4", b
"\x04\x04\x04\x04")])
466 with
ReadOpCtx(self
.ioctx
) as read_op
:
467 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "2", 4)
469 read_op
.set_flags(LIBRADOS_OPERATION_BALANCE_READS
)
470 self
.ioctx
.operate_read_op(read_op
, "hw")
471 eq(list(iter), [("2", b
"bbb")])
473 def test_set_omap_aio(self
):
474 lock
= threading
.Condition()
482 keys
= ("1", "2", "3", "4")
483 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
484 with
WriteOpCtx(self
.ioctx
) as write_op
:
485 self
.ioctx
.set_omap(write_op
, keys
, values
)
486 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "hw", cb
, cb
)
487 comp
.wait_for_complete()
492 eq(comp
.get_return_value(), 0)
494 with
ReadOpCtx(self
.ioctx
) as read_op
:
495 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
497 comp
= self
.ioctx
.operate_aio_read_op(read_op
, "hw", cb
, cb
)
498 comp
.wait_for_complete()
503 eq(comp
.get_return_value(), 0)
505 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
507 def test_write_ops(self
):
508 with
WriteOpCtx(self
.ioctx
) as write_op
:
510 self
.ioctx
.operate_write_op(write_op
, "write_ops")
511 eq(self
.ioctx
.read('write_ops'), b
'')
512 write_op
.write_full(b
'1')
513 write_op
.append(b
'2')
514 self
.ioctx
.operate_write_op(write_op
, "write_ops")
515 eq(self
.ioctx
.read('write_ops'), b
'12')
516 write_op
.write_full(b
'12345')
517 write_op
.write(b
'x', 2)
518 self
.ioctx
.operate_write_op(write_op
, "write_ops")
519 eq(self
.ioctx
.read('write_ops'), b
'12x45')
520 write_op
.write_full(b
'12345')
522 self
.ioctx
.operate_write_op(write_op
, "write_ops")
523 eq(self
.ioctx
.read('write_ops'), b
'12\x00\x005')
524 write_op
.write_full(b
'12345')
526 self
.ioctx
.operate_write_op(write_op
, "write_ops")
527 eq(self
.ioctx
.read('write_ops'), b
'12')
529 self
.ioctx
.operate_write_op(write_op
, "write_ops")
530 with
assert_raises(ObjectNotFound
):
531 self
.ioctx
.read('write_ops')
533 def test_get_omap_vals_by_keys(self
):
534 keys
= ("1", "2", "3", "4")
535 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
536 with
WriteOpCtx(self
.ioctx
) as write_op
:
537 self
.ioctx
.set_omap(write_op
, keys
, values
)
538 self
.ioctx
.operate_write_op(write_op
, "hw")
539 with
ReadOpCtx(self
.ioctx
) as read_op
:
540 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
542 self
.ioctx
.operate_read_op(read_op
, "hw")
543 eq(list(iter), [("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
544 with
ReadOpCtx(self
.ioctx
) as read_op
:
545 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
547 with
assert_raises(ObjectNotFound
):
548 self
.ioctx
.operate_read_op(read_op
, "no_such")
550 def test_get_omap_keys(self
):
551 keys
= ("1", "2", "3")
552 values
= (b
"aaa", b
"bbb", b
"ccc")
553 with
WriteOpCtx(self
.ioctx
) as write_op
:
554 self
.ioctx
.set_omap(write_op
, keys
, values
)
555 self
.ioctx
.operate_write_op(write_op
, "hw")
556 with
ReadOpCtx(self
.ioctx
) as read_op
:
557 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
559 self
.ioctx
.operate_read_op(read_op
, "hw")
560 eq(list(iter), [("1", None), ("2", None)])
561 with
ReadOpCtx(self
.ioctx
) as read_op
:
562 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
564 with
assert_raises(ObjectNotFound
):
565 self
.ioctx
.operate_read_op(read_op
, "no_such")
567 def test_clear_omap(self
):
568 keys
= ("1", "2", "3")
569 values
= (b
"aaa", b
"bbb", b
"ccc")
570 with
WriteOpCtx(self
.ioctx
) as write_op
:
571 self
.ioctx
.set_omap(write_op
, keys
, values
)
572 self
.ioctx
.operate_write_op(write_op
, "hw")
573 with
WriteOpCtx(self
.ioctx
) as write_op_1
:
574 self
.ioctx
.clear_omap(write_op_1
)
575 self
.ioctx
.operate_write_op(write_op_1
, "hw")
576 with
ReadOpCtx(self
.ioctx
) as read_op
:
577 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("1",))
579 self
.ioctx
.operate_read_op(read_op
, "hw")
582 def test_locator(self
):
583 self
.ioctx
.set_locator_key("bar")
584 self
.ioctx
.write('foo', b
'contents1')
585 objects
= [i
for i
in self
.ioctx
.list_objects()]
587 eq(self
.ioctx
.get_locator_key(), "bar")
588 self
.ioctx
.set_locator_key("")
590 objects
[0].write(b
"contents2")
591 eq(self
.ioctx
.get_locator_key(), "")
592 self
.ioctx
.set_locator_key("bar")
593 contents
= self
.ioctx
.read("foo")
594 eq(contents
, b
"contents2")
595 eq(self
.ioctx
.get_locator_key(), "bar")
597 objects
= [i
for i
in self
.ioctx
.list_objects()]
599 self
.ioctx
.set_locator_key("")
601 def test_aio_write(self
):
602 lock
= threading
.Condition()
609 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
610 comp
.wait_for_complete()
615 eq(comp
.get_return_value(), 0)
616 contents
= self
.ioctx
.read("foo")
618 [i
.remove() for i
in self
.ioctx
.list_objects()]
620 def test_aio_write_no_comp_ref(self
):
621 lock
= threading
.Condition()
628 # NOTE(sileht): force don't save the comp into local var
629 # to ensure all references are correctly tracked into the lib
630 self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
634 contents
= self
.ioctx
.read("foo")
636 [i
.remove() for i
in self
.ioctx
.list_objects()]
638 def test_aio_append(self
):
639 lock
= threading
.Condition()
646 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
647 comp2
= self
.ioctx
.aio_append("foo", b
"baz", cb
, cb
)
648 comp
.wait_for_complete()
649 contents
= self
.ioctx
.read("foo")
650 eq(contents
, b
"barbaz")
654 eq(comp
.get_return_value(), 0)
655 eq(comp2
.get_return_value(), 0)
656 [i
.remove() for i
in self
.ioctx
.list_objects()]
658 def test_aio_write_full(self
):
659 lock
= threading
.Condition()
666 self
.ioctx
.aio_write("foo", b
"barbaz", 0, cb
, cb
)
667 comp
= self
.ioctx
.aio_write_full("foo", b
"bar", cb
, cb
)
668 comp
.wait_for_complete()
673 eq(comp
.get_return_value(), 0)
674 contents
= self
.ioctx
.read("foo")
676 [i
.remove() for i
in self
.ioctx
.list_objects()]
678 def test_aio_stat(self
):
679 lock
= threading
.Condition()
681 def cb(_
, size
, mtime
):
686 comp
= self
.ioctx
.aio_stat("foo", cb
)
687 comp
.wait_for_complete()
691 eq(comp
.get_return_value(), -2)
693 self
.ioctx
.write("foo", b
"bar")
695 comp
= self
.ioctx
.aio_stat("foo", cb
)
696 comp
.wait_for_complete()
700 eq(comp
.get_return_value(), 0)
702 [i
.remove() for i
in self
.ioctx
.list_objects()]
704 def _take_down_acting_set(self
, pool
, objectname
):
705 # find acting_set for pool:objectname and take it down; used to
706 # verify that async reads don't complete while acting set is missing
713 r
, jsonout
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
714 objmap
= json
.loads(jsonout
.decode("utf-8"))
715 acting_set
= objmap
['acting']
716 cmd
= {"prefix":"osd set", "key":"noup"}
717 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
719 cmd
= {"prefix":"osd down", "ids":[str(i
) for i
in acting_set
]}
720 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
723 # wait for OSDs to acknowledge the down
724 eq(self
.rados
.wait_for_latest_osdmap(), 0)
726 def _let_osds_back_up(self
):
727 cmd
= {"prefix":"osd unset", "key":"noup"}
728 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
731 def test_aio_read(self
):
732 # this is a list so that the local cb() can modify it
734 lock
= threading
.Condition()
739 payload
= b
"bar\000frob"
740 self
.ioctx
.write("foo", payload
)
742 # test1: use wait_for_complete() and wait for cb by
744 self
._take
_down
_acting
_set
('test_pool', 'foo')
745 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
746 eq(False, comp
.is_complete())
748 eq(False, comp
.is_complete())
751 self
._let
_osds
_back
_up
()
752 comp
.wait_for_complete()
755 while retval
[0] is None and loops
<= 10:
760 eq(retval
[0], payload
)
761 eq(sys
.getrefcount(comp
), 2)
763 # test2: use wait_for_complete_and_cb(), verify retval[0] is
764 # set by the time we regain control
767 self
._take
_down
_acting
_set
('test_pool', 'foo')
768 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
769 eq(False, comp
.is_complete())
771 eq(False, comp
.is_complete())
774 self
._let
_osds
_back
_up
()
776 comp
.wait_for_complete_and_cb()
777 assert(retval
[0] is not None)
778 eq(retval
[0], payload
)
779 eq(sys
.getrefcount(comp
), 2)
781 # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
782 # set by the time we regain control
785 self
._take
_down
_acting
_set
('test_pool', 'bar')
786 comp
= self
.ioctx
.aio_read("bar", len(payload
), 0, cb
)
787 eq(False, comp
.is_complete())
789 eq(False, comp
.is_complete())
792 self
._let
_osds
_back
_up
()
794 comp
.wait_for_complete_and_cb()
796 assert(comp
.get_return_value() < 0)
797 eq(sys
.getrefcount(comp
), 2)
799 [i
.remove() for i
in self
.ioctx
.list_objects()]
802 self
.ioctx
.lock_exclusive("foo", "lock", "locker", "desc_lock",
804 assert_raises(ObjectExists
,
805 self
.ioctx
.lock_exclusive
,
806 "foo", "lock", "locker", "desc_lock", 10000, 0)
807 self
.ioctx
.unlock("foo", "lock", "locker")
808 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker")
810 self
.ioctx
.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
812 self
.ioctx
.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
814 assert_raises(ObjectBusy
,
815 self
.ioctx
.lock_exclusive
,
816 "foo", "lock", "locker3", "desc_lock", 10000, 0)
817 self
.ioctx
.unlock("foo", "lock", "locker1")
818 self
.ioctx
.unlock("foo", "lock", "locker2")
819 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker1")
820 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker2")
822 def test_execute(self
):
823 self
.ioctx
.write("foo", b
"") # ensure object exists
825 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"")
826 eq(buf
, b
"Hello, world!")
828 ret
, buf
= self
.ioctx
.execute("foo", "hello", "say_hello", b
"nose")
829 eq(buf
, b
"Hello, nose!")
831 def test_aio_execute(self
):
834 lock
= threading
.Condition()
837 if retval
[0] is None:
841 self
.ioctx
.write("foo", b
"") # ensure object exists
843 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"", 32, cb
, cb
)
844 comp
.wait_for_complete()
848 eq(comp
.get_return_value(), 13)
849 eq(retval
[0], b
"Hello, world!")
852 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"nose", 32, cb
, cb
)
853 comp
.wait_for_complete()
857 eq(comp
.get_return_value(), 12)
858 eq(retval
[0], b
"Hello, nose!")
860 [i
.remove() for i
in self
.ioctx
.list_objects()]
862 class TestObject(object):
865 self
.rados
= Rados(conffile
='')
867 self
.rados
.create_pool('test_pool')
868 assert self
.rados
.pool_exists('test_pool')
869 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
870 self
.ioctx
.write('foo', b
'bar')
871 self
.object = Object(self
.ioctx
, 'foo')
876 self
.rados
.delete_pool('test_pool')
877 self
.rados
.shutdown()
881 eq(self
.object.read(3), b
'bar')
882 eq(self
.object.read(100), b
'')
885 self
.object.write(b
'blah')
887 eq(self
.object.read(4), b
'blah')
889 eq(self
.object.read(3), b
'lah')
891 def test_write(self
):
892 self
.object.write(b
'barbaz')
894 eq(self
.object.read(3), b
'bar')
895 eq(self
.object.read(3), b
'baz')
897 class TestCommand(object):
900 self
.rados
= Rados(conffile
='')
904 self
.rados
.shutdown()
906 def test_monmap_dump(self
):
908 # check for success and some plain output with epoch in it
909 cmd
= {"prefix":"mon dump"}
910 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
913 assert(b
'epoch' in buf
)
915 # JSON, and grab current epoch
916 cmd
['format'] = 'json'
917 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
920 d
= json
.loads(buf
.decode("utf-8"))
924 # assume epoch + 1000 does not exist; test for ENOENT
925 cmd
['epoch'] = epoch
+ 1000
926 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
927 eq(ret
, -errno
.ENOENT
)
931 # send to specific target by name
932 target
= d
['mons'][0]['name']
934 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
938 d
= json
.loads(buf
.decode("utf-8"))
942 target
= d
['mons'][0]['rank']
944 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
948 d
= json
.loads(buf
.decode("utf-8"))
951 def test_osd_bench(self
):
952 cmd
= dict(prefix
='bench', size
=4096, count
=8192)
953 ret
, buf
, err
= self
.rados
.osd_command(0, json
.dumps(cmd
), b
'',
957 out
= json
.loads(err
)
958 eq(out
['blocksize'], cmd
['size'])
959 eq(out
['bytes_written'], cmd
['count'])
961 def test_ceph_osd_pool_create_utf8(self
):
963 # Use encoded bytestring
964 poolname
= b
"\351\273\205"
968 cmd
= {"prefix": "osd pool create", "pg_num": 16, "pool": poolname
}
969 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
972 eq(u
"pool '\u9ec5' created", out
)