1 from __future__
import print_function
2 from nose
import SkipTest
3 from nose
.tools
import eq_
as eq
, ok_
as ok
, assert_raises
4 from rados
import (Rados
, Error
, RadosStateError
, Object
, ObjectExists
,
5 ObjectNotFound
, ObjectBusy
, requires
, opt
,
6 LIBRADOS_ALL_NSPACES
, WriteOpCtx
, ReadOpCtx
, LIBRADOS_CREATE_EXCLUSIVE
,
7 LIBRADOS_SNAP_HEAD
, LIBRADOS_OPERATION_BALANCE_READS
, LIBRADOS_OPERATION_SKIPRWLOCKS
, MonitorLog
)
# True when these tests are executed by a Python 2.x interpreter.
_python2 = sys.version_info < (3,)
19 def test_rados_init_error():
20 assert_raises(Error
, Rados
, conffile
='', rados_id
='admin',
22 assert_raises(Error
, Rados
, conffile
='', name
='invalid')
23 assert_raises(Error
, Rados
, conffile
='', name
='bad.invalid')
25 def test_rados_init():
26 with
Rados(conffile
='', rados_id
='admin'):
28 with
Rados(conffile
='', name
='client.admin'):
30 with
Rados(conffile
='', name
='client.admin'):
32 with
Rados(conffile
='', name
='client.admin'):
35 def test_ioctx_context_manager():
36 with
Rados(conffile
='', rados_id
='admin') as conn
:
37 with conn
.open_ioctx('rbd') as ioctx
:
40 def test_parse_argv():
41 args
= ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
43 eq(args
, r
.conf_parse_argv(args
))
45 def test_parse_argv_empty_str():
48 eq(args
, r
.conf_parse_argv(args
))
class TestRequires(object):
    """Tests for the ``requires`` decorator (and ``opt``) imported from rados.

    Each helper method is decorated with a type specification.  The tests
    expect ``TypeError`` for calls whose arguments do not fit it, and the
    arguments handed back as a tuple for calls that do.
    """

    @requires(('foo', str), ('bar', int), ('baz', int))
    def _method_plain(self, foo, bar, baz):
        """Re-check the argument types and return them as a tuple."""
        for value, expected_type in ((foo, str), (bar, int), (baz, int)):
            ok(isinstance(value, expected_type))
        return foo, bar, baz

    def test_method_plain(self):
        """Wrongly typed calls raise ``TypeError``; valid calls pass through."""
        for bad_args in ((42, 42, 42), ('42', '42', '42')):
            assert_raises(TypeError, self._method_plain, *bad_args)
        assert_raises(TypeError, self._method_plain,
                      foo='42', bar='42', baz='42')

        expected = ('42', 42, 42)
        eq(self._method_plain('42', 42, 42), expected)
        eq(self._method_plain(foo='42', bar=42, baz=42), expected)

    @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int))
    def _method_with_opt_arg(self, foo, bar, baz):
        """Re-check the types (``foo``/``bar`` may be ``None``) and return them."""
        ok(foo is None or isinstance(foo, str))
        ok(bar is None or isinstance(bar, int))
        ok(isinstance(baz, int))
        return foo, bar, baz

    def test_method_with_opt_args(self):
        """``None`` is accepted for the optional arguments but not for ``baz``."""
        rejected = ((42, 42, 42), ('42', '42', 42), (None, None, None))
        for bad_args in rejected:
            assert_raises(TypeError, self._method_with_opt_arg, *bad_args)

        accepted = ((None, 42, 42), ('42', None, 42), (None, None, 42))
        for good_args in accepted:
            eq(self._method_with_opt_arg(*good_args), good_args)
81 class TestRadosStateError(object):
def _requires_configuring(self, rados):
    """Assert that calling ``rados.connect()`` raises ``RadosStateError``."""
    with assert_raises(RadosStateError):
        rados.connect()
def _requires_configuring_or_connected(self, rados):
    """Assert that each listed configuration call raises ``RadosStateError``.

    :param rados: the Rados handle whose methods are probed
    """
    # (method name, positional arguments) pairs, probed in this order.
    probes = (
        ('conf_read_file', ()),
        ('conf_parse_argv', (None,)),
        ('conf_parse_env', ()),
        ('conf_get', ('opt',)),
        ('conf_set', ('opt', 'val')),
        ('ping_monitor', (0,)),
    )
    for method_name, call_args in probes:
        assert_raises(RadosStateError, getattr(rados, method_name), *call_args)
def _requires_connected(self, rados):
    """Assert that each listed cluster call raises ``RadosStateError``.

    :param rados: the Rados handle whose methods are probed
    """
    # (method name, positional arguments) pairs, probed in this order.
    probes = (
        ('pool_exists', ('foo',)),
        ('pool_lookup', ('foo',)),
        ('pool_reverse_lookup', (0,)),
        ('create_pool', ('foo',)),
        ('get_pool_base_tier', (0,)),
        ('delete_pool', ('foo',)),
        ('list_pools', ()),
        ('get_fsid', ()),
        ('open_ioctx', ('foo',)),
        ('mon_command', ('', b'')),
        ('osd_command', (0, '', b'')),
        ('pg_command', ('', '', b'')),
        ('wait_for_latest_osdmap', ()),
        ('blacklist_add', ('127.0.0.1/123', 0)),
    )
    for method_name, call_args in probes:
        assert_raises(RadosStateError, getattr(rados, method_name), *call_args)
def test_configuring(self):
    """A freshly built handle reports 'configuring' and rejects cluster calls."""
    handle = Rados(conffile='')
    eq('configuring', handle.state)
    self._requires_connected(handle)
114 def test_connected(self
):
115 rados
= Rados(conffile
='')
117 eq('connected', rados
.state
)
118 self
._requires
_configuring
(rados
)
120 def test_shutdown(self
):
121 rados
= Rados(conffile
='')
124 eq('shutdown', rados
.state
)
125 self
._requires
_configuring
(rados
)
126 self
._requires
_configuring
_or
_connected
(rados
)
127 self
._requires
_connected
(rados
)
130 class TestRados(object):
133 self
.rados
= Rados(conffile
='')
134 self
.rados
.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
135 self
.rados
.conf_parse_env()
138 # Assume any pre-existing pools are the cluster's defaults
139 self
.default_pools
= self
.rados
.list_pools()
142 self
.rados
.shutdown()
144 def test_ping_monitor(self
):
145 assert_raises(ObjectNotFound
, self
.rados
.ping_monitor
, 'not_exists_monitor')
146 cmd
= {'prefix': 'mon dump', 'format':'json'}
147 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
148 for mon
in json
.loads(buf
.decode('utf8'))['mons']:
150 output
= self
.rados
.ping_monitor(mon
['name'])
153 buf
= json
.loads(output
)
154 if buf
.get('health'):
def test_create(self):
    """Create the pool 'foo' and delete it again."""
    cluster = self.rados
    pool = 'foo'
    cluster.create_pool(pool)
    cluster.delete_pool(pool)
161 def test_create_utf8(self
):
163 # Use encoded bytestring
164 poolname
= b
"\351\273\204"
167 self
.rados
.create_pool(poolname
)
168 assert self
.rados
.pool_exists(u
"\u9ec4")
169 self
.rados
.delete_pool(poolname
)
171 def test_pool_lookup_utf8(self
):
176 self
.rados
.create_pool(poolname
)
178 poolid
= self
.rados
.pool_lookup(poolname
)
179 eq(poolname
, self
.rados
.pool_reverse_lookup(poolid
))
181 self
.rados
.delete_pool(poolname
)
def test_eexist(self):
    """Creating a pool that already exists must raise ``ObjectExists``."""
    cluster = self.rados
    cluster.create_pool('foo')
    with assert_raises(ObjectExists):
        cluster.create_pool('foo')
    cluster.delete_pool('foo')
188 def list_non_default_pools(self
):
189 pools
= self
.rados
.list_pools()
190 for p
in self
.default_pools
:
def test_list_pools(self):
    """Check ``list_non_default_pools()`` after each pool creation/deletion."""
    long_name = 'a' * 500
    # (Rados method to call, pool name, non-default pools expected afterwards)
    steps = (
        ('create_pool', 'foo', {'foo'}),
        ('create_pool', 'bar', {'foo', 'bar'}),
        ('create_pool', 'baz', {'foo', 'bar', 'baz'}),
        ('delete_pool', 'foo', {'bar', 'baz'}),
        ('delete_pool', 'baz', {'bar'}),
        ('delete_pool', 'bar', set()),
        ('create_pool', long_name, {long_name}),
    )

    eq(set(), self.list_non_default_pools())
    for action, pool, expected in steps:
        getattr(self.rados, action)(pool)
        eq(expected, self.list_non_default_pools())
    # The final deletion is not followed by a listing check.
    self.rados.delete_pool(long_name)
212 def test_get_pool_base_tier(self
):
213 self
.rados
.create_pool('foo')
215 self
.rados
.create_pool('foo-cache')
217 pool_id
= self
.rados
.pool_lookup('foo')
218 tier_pool_id
= self
.rados
.pool_lookup('foo-cache')
220 cmd
= {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
221 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
225 cmd
= {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
226 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
229 eq(self
.rados
.wait_for_latest_osdmap(), 0)
231 eq(pool_id
, self
.rados
.get_pool_base_tier(pool_id
))
232 eq(pool_id
, self
.rados
.get_pool_base_tier(tier_pool_id
))
234 cmd
= {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
235 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
238 self
.rados
.delete_pool('foo-cache')
240 self
.rados
.delete_pool('foo')
def test_get_fsid(self):
    """Check that ``get_fsid()`` starts with 36 hex-digit-or-dash characters.

    The match is case-insensitive and anchored only at the start of the
    returned value (``re.match``).
    """
    fsid = self.rados.get_fsid()
    # Raw string: '\-' is not a valid escape in an ordinary string literal
    # and triggers a warning on modern Python.  The pattern itself is
    # unchanged.
    assert re.match(r'[0-9a-f\-]{36}', fsid, re.I)
def test_blacklist_add(self):
    """Call ``blacklist_add`` with "1.2.3.4/123" and 1; it must not raise."""
    cluster = self.rados
    cluster.blacklist_add("1.2.3.4/123", 1)
def test_get_cluster_stats(self):
    """Sanity-check the values reported by ``get_cluster_stats()``."""
    stats = self.rados.get_cluster_stats()
    # The three capacity figures must be strictly positive.
    for key in ('kb', 'kb_avail', 'kb_used'):
        assert stats[key] > 0
    # The object count only has to be non-negative.
    assert stats['num_objects'] >= 0
256 def test_monitor_log(self
):
257 lock
= threading
.Condition()
258 def cb(arg
, line
, who
, sec
, nsec
, seq
, level
, msg
):
259 # NOTE(sileht): the old pyrados API was received the pointer as int
260 # instead of the value of arg
266 # NOTE(sileht): force don't save the monitor into local var
267 # to ensure all references are correctly tracked into the lib
268 MonitorLog(self
.rados
, "debug", cb
, "arg")
271 MonitorLog(self
.rados
, "debug", None, None)
272 eq(None, self
.rados
.monitor_callback
)
274 class TestIoctx(object):
277 self
.rados
= Rados(conffile
='')
279 self
.rados
.create_pool('test_pool')
280 assert self
.rados
.pool_exists('test_pool')
281 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
284 cmd
= {"prefix":"osd unset", "key":"noup"}
285 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
287 self
.rados
.delete_pool('test_pool')
288 self
.rados
.shutdown()
290 def test_get_last_version(self
):
291 version
= self
.ioctx
.get_last_version()
294 def test_get_stats(self
):
295 stats
= self
.ioctx
.get_stats()
296 eq(stats
, {'num_objects_unfound': 0,
297 'num_objects_missing_on_primary': 0,
298 'num_object_clones': 0,
300 'num_object_copies': 0,
306 'num_objects_degraded': 0,
def test_write(self):
    """Write b'abc' to object 'abc' and read the same bytes back."""
    ioctx = self.ioctx
    ioctx.write('abc', b'abc')
    stored = ioctx.read('abc')
    eq(stored, b'abc')
def test_write_full(self):
    """``write_full`` replaces the whole content previously written."""
    ioctx = self.ioctx

    ioctx.write('abc', b'abc')
    eq(ioctx.read('abc'), b'abc')

    ioctx.write_full('abc', b'd')
    eq(ioctx.read('abc'), b'd')
def test_writesame(self):
    """``writesame`` of b'rzx' with length 9 yields the pattern three times."""
    ioctx = self.ioctx
    ioctx.writesame('ob', b'rzx', 9)
    stored = ioctx.read('ob')
    eq(stored, b'rzxrzxrzx')
def test_append(self):
    """Appending b'b' then b'c' to an object holding b'a' gives b'abc'."""
    ioctx = self.ioctx
    ioctx.write('abc', b'a')
    for piece in (b'b', b'c'):
        ioctx.append('abc', piece)
    eq(ioctx.read('abc'), b'abc')
def test_write_zeros(self):
    """Data containing NUL bytes is read back unchanged."""
    ioctx = self.ioctx
    ioctx.write('abc', b'a\0b\0c')
    stored = ioctx.read('abc')
    eq(stored, b'a\0b\0c')
333 def test_trunc(self
):
334 self
.ioctx
.write('abc', b
'abc')
335 self
.ioctx
.trunc('abc', 2)
336 eq(self
.ioctx
.read('abc'), b
'ab')
337 size
= self
.ioctx
.stat('abc')[0]
def test_list_objects_empty(self):
    """Listing objects before anything was written yields nothing."""
    listed = list(self.ioctx.list_objects())
    eq(listed, [])
def test_list_objects(self):
    """Objects created via write, write_full and append are all listed."""
    ioctx = self.ioctx
    ioctx.write('a', b'')
    ioctx.write('b', b'foo')
    ioctx.write_full('c', b'bar')
    ioctx.append('d', b'jazz')

    # Compare sorted keys so the listing order does not matter.
    listed_keys = sorted(entry.key for entry in ioctx.list_objects())
    eq(listed_keys, ['a', 'b', 'c', 'd'])
def test_list_ns_objects(self):
    """Listing with ``LIBRADOS_ALL_NSPACES`` returns objects of every namespace."""
    ioctx = self.ioctx

    # Objects written before any namespace is selected.
    ioctx.write('a', b'')
    ioctx.write('b', b'foo')
    ioctx.write_full('c', b'bar')
    ioctx.append('d', b'jazz')

    # Objects in namespace "ns1"; the key 'd' is reused on purpose.
    ioctx.set_namespace("ns1")
    ioctx.write('ns1-a', b'')
    ioctx.write('ns1-b', b'foo')
    ioctx.write_full('ns1-c', b'bar')
    ioctx.append('ns1-d', b'jazz')
    ioctx.append('d', b'jazz')

    ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
    listed = sorted((entry.nspace, entry.key)
                    for entry in ioctx.list_objects())
    expected = [
        ('', 'a'), ('', 'b'), ('', 'c'), ('', 'd'),
        ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),
        ('ns1', 'ns1-c'), ('ns1', 'ns1-d'),
    ]
    eq(listed, expected)
368 def test_xattrs(self
):
369 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
370 self
.ioctx
.write('abc', b
'')
371 for key
, value
in xattrs
.items():
372 self
.ioctx
.set_xattr('abc', key
, value
)
373 eq(self
.ioctx
.get_xattr('abc', key
), value
)
375 for key
, value
in self
.ioctx
.get_xattrs('abc'):
376 stored_xattrs
[key
] = value
377 eq(stored_xattrs
, xattrs
)
379 def test_obj_xattrs(self
):
380 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0', f
=b
'')
381 self
.ioctx
.write('abc', b
'')
382 obj
= list(self
.ioctx
.list_objects())[0]
383 for key
, value
in xattrs
.items():
384 obj
.set_xattr(key
, value
)
385 eq(obj
.get_xattr(key
), value
)
387 for key
, value
in obj
.get_xattrs():
388 stored_xattrs
[key
] = value
389 eq(stored_xattrs
, xattrs
)
def test_get_pool_id(self):
    """The ioctx pool id equals the cluster's lookup of 'test_pool'."""
    reported_id = self.ioctx.get_pool_id()
    looked_up_id = self.rados.pool_lookup('test_pool')
    eq(reported_id, looked_up_id)
def test_get_pool_name(self):
    """The ioctx reports 'test_pool' as its pool name."""
    pool_name = self.ioctx.get_pool_name()
    eq(pool_name, 'test_pool')
def test_create_snap(self):
    """Removing a missing snapshot fails; create followed by remove works."""
    ioctx = self.ioctx
    with assert_raises(ObjectNotFound):
        ioctx.remove_snap('foo')
    ioctx.create_snap('foo')
    ioctx.remove_snap('foo')
def test_list_snaps_empty(self):
    """Listing snapshots before any was created yields nothing."""
    listed = list(self.ioctx.list_snaps())
    eq(listed, [])
405 def test_list_snaps(self
):
406 snaps
= ['snap1', 'snap2', 'snap3']
408 self
.ioctx
.create_snap(snap
)
409 listed_snaps
= [snap
.name
for snap
in self
.ioctx
.list_snaps()]
410 eq(snaps
, listed_snaps
)
412 def test_lookup_snap(self
):
413 self
.ioctx
.create_snap('foo')
414 snap
= self
.ioctx
.lookup_snap('foo')
417 def test_snap_timestamp(self
):
418 self
.ioctx
.create_snap('foo')
419 snap
= self
.ioctx
.lookup_snap('foo')
422 def test_remove_snap(self
):
423 self
.ioctx
.create_snap('foo')
424 (snap
,) = self
.ioctx
.list_snaps()
426 self
.ioctx
.remove_snap('foo')
427 eq(list(self
.ioctx
.list_snaps()), [])
def test_snap_rollback(self):
    """Rolling back to a snapshot restores an object removed afterwards."""
    ioctx = self.ioctx

    ioctx.write("insnap", b"contents1")
    ioctx.create_snap("snap1")
    ioctx.remove_object("insnap")

    ioctx.snap_rollback("insnap", "snap1")
    restored = ioctx.read("insnap")
    eq(restored, b"contents1")

    # Clean up the snapshot and the restored object.
    ioctx.remove_snap("snap1")
    ioctx.remove_object("insnap")
def test_snap_read(self):
    """Read a removed object through its snapshot, then switch back to head."""
    ioctx = self.ioctx

    ioctx.write("insnap", b"contents1")
    ioctx.create_snap("snap1")
    ioctx.remove_object("insnap")

    # Direct reads at the snapshot taken before the removal.
    snapshot = ioctx.lookup_snap("snap1")
    ioctx.set_read(snapshot.snap_id)
    eq(ioctx.read("insnap"), b"contents1")

    # Switch reads back to LIBRADOS_SNAP_HEAD and check a fresh object.
    ioctx.set_read(LIBRADOS_SNAP_HEAD)
    ioctx.write("inhead", b"contents2")
    eq(ioctx.read("inhead"), b"contents2")

    ioctx.remove_snap("snap1")
    ioctx.remove_object("inhead")
451 def test_set_omap(self
):
452 keys
= ("1", "2", "3", "4")
453 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
454 with
WriteOpCtx() as write_op
:
455 self
.ioctx
.set_omap(write_op
, keys
, values
)
456 write_op
.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS
)
457 self
.ioctx
.operate_write_op(write_op
, "hw")
458 with
ReadOpCtx() as read_op
:
459 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
461 self
.ioctx
.operate_read_op(read_op
, "hw")
463 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
464 with
ReadOpCtx() as read_op
:
465 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "2", "", 4)
467 self
.ioctx
.operate_read_op(read_op
, "hw")
468 eq(("3", b
"ccc"), next(iter))
469 eq(list(iter), [("4", b
"\x04\x04\x04\x04")])
470 with
ReadOpCtx() as read_op
:
471 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "2", 4)
473 read_op
.set_flags(LIBRADOS_OPERATION_BALANCE_READS
)
474 self
.ioctx
.operate_read_op(read_op
, "hw")
475 eq(list(iter), [("2", b
"bbb")])
477 def test_set_omap_aio(self
):
478 lock
= threading
.Condition()
486 keys
= ("1", "2", "3", "4")
487 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
488 with
WriteOpCtx() as write_op
:
489 self
.ioctx
.set_omap(write_op
, keys
, values
)
490 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "hw", cb
, cb
)
491 comp
.wait_for_complete()
495 eq(comp
.get_return_value(), 0)
497 with
ReadOpCtx() as read_op
:
498 iter, ret
= self
.ioctx
.get_omap_vals(read_op
, "", "", 4)
500 comp
= self
.ioctx
.operate_aio_read_op(read_op
, "hw", cb
, cb
)
501 comp
.wait_for_complete()
505 eq(comp
.get_return_value(), 0)
507 eq(list(iter), [("2", b
"bbb"), ("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
509 def test_write_ops(self
):
510 with
WriteOpCtx() as write_op
:
512 self
.ioctx
.operate_write_op(write_op
, "write_ops")
513 eq(self
.ioctx
.read('write_ops'), b
'')
514 write_op
.write_full(b
'1')
515 write_op
.append(b
'2')
516 self
.ioctx
.operate_write_op(write_op
, "write_ops")
517 eq(self
.ioctx
.read('write_ops'), b
'12')
518 write_op
.write_full(b
'12345')
519 write_op
.write(b
'x', 2)
520 self
.ioctx
.operate_write_op(write_op
, "write_ops")
521 eq(self
.ioctx
.read('write_ops'), b
'12x45')
522 write_op
.write_full(b
'12345')
524 self
.ioctx
.operate_write_op(write_op
, "write_ops")
525 eq(self
.ioctx
.read('write_ops'), b
'12\x00\x005')
526 write_op
.write_full(b
'12345')
528 self
.ioctx
.operate_write_op(write_op
, "write_ops")
529 eq(self
.ioctx
.read('write_ops'), b
'12')
531 self
.ioctx
.operate_write_op(write_op
, "write_ops")
532 with
assert_raises(ObjectNotFound
):
533 self
.ioctx
.read('write_ops')
def test_execute_op(self):
    """Run the "hello"/"record_hello" call through a write op on "object".

    The object is then expected to contain b"Hello, ebs!".
    """
    with WriteOpCtx() as write_op:
        # Queue the call with b"ebs" as input, then apply the op.
        write_op.execute("hello", "record_hello", b"ebs")
        self.ioctx.operate_write_op(write_op, "object")
    # NOTE(review): confirm whether this check belongs inside the ``with`` block.
    eq(self.ioctx.read('object'), b"Hello, ebs!")
def test_writesame_op(self):
    """Apply a ``writesame`` write op to 'abc' and check the stored bytes.

    Writing b'rzx' with length 9 is expected to read back as b'rzxrzxrzx'.
    """
    with WriteOpCtx() as write_op:
        # Queue the repeated-pattern write, then apply the op to 'abc'.
        write_op.writesame(b'rzx', 9)
        self.ioctx.operate_write_op(write_op, 'abc')
        # NOTE(review): confirm whether this check belongs inside the ``with`` block.
        eq(self.ioctx.read('abc'), b'rzxrzxrzx')
547 def test_get_omap_vals_by_keys(self
):
548 keys
= ("1", "2", "3", "4")
549 values
= (b
"aaa", b
"bbb", b
"ccc", b
"\x04\x04\x04\x04")
550 with
WriteOpCtx() as write_op
:
551 self
.ioctx
.set_omap(write_op
, keys
, values
)
552 self
.ioctx
.operate_write_op(write_op
, "hw")
553 with
ReadOpCtx() as read_op
:
554 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
556 self
.ioctx
.operate_read_op(read_op
, "hw")
557 eq(list(iter), [("3", b
"ccc"), ("4", b
"\x04\x04\x04\x04")])
558 with
ReadOpCtx() as read_op
:
559 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("3","4",))
561 with
assert_raises(ObjectNotFound
):
562 self
.ioctx
.operate_read_op(read_op
, "no_such")
564 def test_get_omap_keys(self
):
565 keys
= ("1", "2", "3")
566 values
= (b
"aaa", b
"bbb", b
"ccc")
567 with
WriteOpCtx() as write_op
:
568 self
.ioctx
.set_omap(write_op
, keys
, values
)
569 self
.ioctx
.operate_write_op(write_op
, "hw")
570 with
ReadOpCtx() as read_op
:
571 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
573 self
.ioctx
.operate_read_op(read_op
, "hw")
574 eq(list(iter), [("1", None), ("2", None)])
575 with
ReadOpCtx() as read_op
:
576 iter, ret
= self
.ioctx
.get_omap_keys(read_op
,"",2)
578 with
assert_raises(ObjectNotFound
):
579 self
.ioctx
.operate_read_op(read_op
, "no_such")
581 def test_clear_omap(self
):
582 keys
= ("1", "2", "3")
583 values
= (b
"aaa", b
"bbb", b
"ccc")
584 with
WriteOpCtx() as write_op
:
585 self
.ioctx
.set_omap(write_op
, keys
, values
)
586 self
.ioctx
.operate_write_op(write_op
, "hw")
587 with
WriteOpCtx() as write_op_1
:
588 self
.ioctx
.clear_omap(write_op_1
)
589 self
.ioctx
.operate_write_op(write_op_1
, "hw")
590 with
ReadOpCtx() as read_op
:
591 iter, ret
= self
.ioctx
.get_omap_vals_by_keys(read_op
,("1",))
593 self
.ioctx
.operate_read_op(read_op
, "hw")
596 def test_xattrs_op(self
):
597 xattrs
= dict(a
=b
'1', b
=b
'2', c
=b
'3', d
=b
'a\0b', e
=b
'\0')
598 with
WriteOpCtx() as write_op
:
599 write_op
.new(LIBRADOS_CREATE_EXCLUSIVE
)
600 for key
, value
in xattrs
.items():
601 write_op
.set_xattr(key
, value
)
602 self
.ioctx
.operate_write_op(write_op
, "abc")
603 eq(self
.ioctx
.get_xattr('abc', key
), value
)
605 for key
, value
in self
.ioctx
.get_xattrs('abc'):
606 stored_xattrs_1
[key
] = value
607 eq(stored_xattrs_1
, xattrs
)
608 for key
in xattrs
.keys():
609 write_op
.rm_xattr(key
)
610 self
.ioctx
.operate_write_op(write_op
, "abc")
612 for key
, value
in self
.ioctx
.get_xattrs('abc'):
613 stored_xattrs_2
[key
] = value
614 eq(stored_xattrs_2
, {})
617 def test_locator(self
):
618 self
.ioctx
.set_locator_key("bar")
619 self
.ioctx
.write('foo', b
'contents1')
620 objects
= [i
for i
in self
.ioctx
.list_objects()]
622 eq(self
.ioctx
.get_locator_key(), "bar")
623 self
.ioctx
.set_locator_key("")
625 objects
[0].write(b
"contents2")
626 eq(self
.ioctx
.get_locator_key(), "")
627 self
.ioctx
.set_locator_key("bar")
628 contents
= self
.ioctx
.read("foo")
629 eq(contents
, b
"contents2")
630 eq(self
.ioctx
.get_locator_key(), "bar")
632 objects
= [i
for i
in self
.ioctx
.list_objects()]
634 self
.ioctx
.set_locator_key("")
636 def test_operate_aio_write_op(self
):
637 lock
= threading
.Condition()
644 with
WriteOpCtx() as write_op
:
645 write_op
.write(b
'rzx')
646 comp
= self
.ioctx
.operate_aio_write_op(write_op
, "object", cb
, cb
)
647 comp
.wait_for_complete()
651 eq(comp
.get_return_value(), 0)
652 eq(self
.ioctx
.read('object'), b
'rzx')
654 def test_aio_write(self
):
655 lock
= threading
.Condition()
662 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
663 comp
.wait_for_complete()
667 eq(comp
.get_return_value(), 0)
668 contents
= self
.ioctx
.read("foo")
670 [i
.remove() for i
in self
.ioctx
.list_objects()]
672 def test_aio_write_no_comp_ref(self
):
673 lock
= threading
.Condition()
680 # NOTE(sileht): force don't save the comp into local var
681 # to ensure all references are correctly tracked into the lib
682 self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
686 contents
= self
.ioctx
.read("foo")
688 [i
.remove() for i
in self
.ioctx
.list_objects()]
690 def test_aio_append(self
):
691 lock
= threading
.Condition()
698 comp
= self
.ioctx
.aio_write("foo", b
"bar", 0, cb
, cb
)
699 comp2
= self
.ioctx
.aio_append("foo", b
"baz", cb
, cb
)
700 comp
.wait_for_complete()
701 contents
= self
.ioctx
.read("foo")
702 eq(contents
, b
"barbaz")
706 eq(comp
.get_return_value(), 0)
707 eq(comp2
.get_return_value(), 0)
708 [i
.remove() for i
in self
.ioctx
.list_objects()]
710 def test_aio_write_full(self
):
711 lock
= threading
.Condition()
718 self
.ioctx
.aio_write("foo", b
"barbaz", 0, cb
, cb
)
719 comp
= self
.ioctx
.aio_write_full("foo", b
"bar", cb
, cb
)
720 comp
.wait_for_complete()
724 eq(comp
.get_return_value(), 0)
725 contents
= self
.ioctx
.read("foo")
727 [i
.remove() for i
in self
.ioctx
.list_objects()]
729 def test_aio_writesame(self
):
730 lock
= threading
.Condition()
737 comp
= self
.ioctx
.aio_writesame("abc", b
"rzx", 9, 0, cb
)
738 comp
.wait_for_complete()
742 eq(comp
.get_return_value(), 0)
743 eq(self
.ioctx
.read("abc"), b
"rzxrzxrzx")
744 [i
.remove() for i
in self
.ioctx
.list_objects()]
746 def test_aio_stat(self
):
747 lock
= threading
.Condition()
749 def cb(_
, size
, mtime
):
754 comp
= self
.ioctx
.aio_stat("foo", cb
)
755 comp
.wait_for_complete()
759 eq(comp
.get_return_value(), -2)
761 self
.ioctx
.write("foo", b
"bar")
763 comp
= self
.ioctx
.aio_stat("foo", cb
)
764 comp
.wait_for_complete()
768 eq(comp
.get_return_value(), 0)
770 [i
.remove() for i
in self
.ioctx
.list_objects()]
772 def test_aio_remove(self
):
773 lock
= threading
.Condition()
780 self
.ioctx
.write('foo', b
'wrx')
781 eq(self
.ioctx
.read('foo'), b
'wrx')
782 comp
= self
.ioctx
.aio_remove('foo', cb
, cb
)
783 comp
.wait_for_complete()
787 eq(comp
.get_return_value(), 0)
788 eq(list(self
.ioctx
.list_objects()), [])
790 def _take_down_acting_set(self
, pool
, objectname
):
791 # find acting_set for pool:objectname and take it down; used to
792 # verify that async reads don't complete while acting set is missing
799 r
, jsonout
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
800 objmap
= json
.loads(jsonout
.decode("utf-8"))
801 acting_set
= objmap
['acting']
802 cmd
= {"prefix":"osd set", "key":"noup"}
803 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
805 cmd
= {"prefix":"osd down", "ids":[str(i
) for i
in acting_set
]}
806 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
809 # wait for OSDs to acknowledge the down
810 eq(self
.rados
.wait_for_latest_osdmap(), 0)
812 def _let_osds_back_up(self
):
813 cmd
= {"prefix":"osd unset", "key":"noup"}
814 r
, _
, _
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
817 def test_aio_read(self
):
818 # this is a list so that the local cb() can modify it
820 lock
= threading
.Condition()
825 payload
= b
"bar\000frob"
826 self
.ioctx
.write("foo", payload
)
828 # test1: use wait_for_complete() and wait for cb by
830 self
._take
_down
_acting
_set
('test_pool', 'foo')
831 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
832 eq(False, comp
.is_complete())
834 eq(False, comp
.is_complete())
837 self
._let
_osds
_back
_up
()
838 comp
.wait_for_complete()
841 while retval
[0] is None and loops
<= 10:
846 eq(retval
[0], payload
)
847 eq(sys
.getrefcount(comp
), 2)
849 # test2: use wait_for_complete_and_cb(), verify retval[0] is
850 # set by the time we regain control
853 self
._take
_down
_acting
_set
('test_pool', 'foo')
854 comp
= self
.ioctx
.aio_read("foo", len(payload
), 0, cb
)
855 eq(False, comp
.is_complete())
857 eq(False, comp
.is_complete())
860 self
._let
_osds
_back
_up
()
862 comp
.wait_for_complete_and_cb()
863 assert(retval
[0] is not None)
864 eq(retval
[0], payload
)
865 eq(sys
.getrefcount(comp
), 2)
867 # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
868 # set by the time we regain control
871 self
._take
_down
_acting
_set
('test_pool', 'bar')
872 comp
= self
.ioctx
.aio_read("bar", len(payload
), 0, cb
)
873 eq(False, comp
.is_complete())
875 eq(False, comp
.is_complete())
878 self
._let
_osds
_back
_up
()
880 comp
.wait_for_complete_and_cb()
882 assert(comp
.get_return_value() < 0)
883 eq(sys
.getrefcount(comp
), 2)
885 [i
.remove() for i
in self
.ioctx
.list_objects()]
888 self
.ioctx
.lock_exclusive("foo", "lock", "locker", "desc_lock",
890 assert_raises(ObjectExists
,
891 self
.ioctx
.lock_exclusive
,
892 "foo", "lock", "locker", "desc_lock", 10000, 0)
893 self
.ioctx
.unlock("foo", "lock", "locker")
894 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker")
896 self
.ioctx
.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
898 self
.ioctx
.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
900 assert_raises(ObjectBusy
,
901 self
.ioctx
.lock_exclusive
,
902 "foo", "lock", "locker3", "desc_lock", 10000, 0)
903 self
.ioctx
.unlock("foo", "lock", "locker1")
904 self
.ioctx
.unlock("foo", "lock", "locker2")
905 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker1")
906 assert_raises(ObjectNotFound
, self
.ioctx
.unlock
, "foo", "lock", "locker2")
def test_execute(self):
    """Call "hello"/"say_hello" on object "foo" with and without input."""
    self.ioctx.write("foo", b"")  # ensure object exists

    # (input passed to the call, greeting expected back)
    cases = (
        (b"", b"Hello, world!"),
        (b"nose", b"Hello, nose!"),
    )
    for payload, greeting in cases:
        # Only the returned buffer is checked, not the return code.
        _, reply = self.ioctx.execute("foo", "hello", "say_hello", payload)
        eq(reply, greeting)
917 def test_aio_execute(self
):
920 lock
= threading
.Condition()
923 if retval
[0] is None:
927 self
.ioctx
.write("foo", b
"") # ensure object exists
929 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"", 32, cb
, cb
)
930 comp
.wait_for_complete()
934 eq(comp
.get_return_value(), 13)
935 eq(retval
[0], b
"Hello, world!")
938 comp
= self
.ioctx
.aio_execute("foo", "hello", "say_hello", b
"nose", 32, cb
, cb
)
939 comp
.wait_for_complete()
943 eq(comp
.get_return_value(), 12)
944 eq(retval
[0], b
"Hello, nose!")
946 [i
.remove() for i
in self
.ioctx
.list_objects()]
948 def test_applications(self
):
949 cmd
= {"prefix":"osd dump", "format":"json"}
950 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
953 release
= json
.loads(buf
.decode("utf-8")).get("require_osd_release",
955 if not release
or release
[0] < 'l':
958 eq([], self
.ioctx
.application_list())
960 self
.ioctx
.application_enable("app1")
961 assert_raises(Error
, self
.ioctx
.application_enable
, "app2")
962 self
.ioctx
.application_enable("app2", True)
964 assert_raises(Error
, self
.ioctx
.application_metadata_list
, "dne")
965 eq([], self
.ioctx
.application_metadata_list("app1"))
967 assert_raises(Error
, self
.ioctx
.application_metadata_set
, "dne", "key",
969 self
.ioctx
.application_metadata_set("app1", "key1", "val1")
970 eq("val1", self
.ioctx
.application_metadata_get("app1", "key1"))
971 self
.ioctx
.application_metadata_set("app1", "key2", "val2")
972 eq("val2", self
.ioctx
.application_metadata_get("app1", "key2"))
973 self
.ioctx
.application_metadata_set("app2", "key1", "val1")
974 eq("val1", self
.ioctx
.application_metadata_get("app2", "key1"))
976 eq([("key1", "val1"), ("key2", "val2")],
977 self
.ioctx
.application_metadata_list("app1"))
979 self
.ioctx
.application_metadata_remove("app1", "key1")
980 eq([("key2", "val2")], self
.ioctx
.application_metadata_list("app1"))
def test_service_daemon(self):
    """Register a "laundry" service daemon and push one status update."""
    cluster = self.rados
    # The daemon name embeds the pid of the process running the test.
    daemon_name = "pid-" + str(os.getpid())
    cluster.service_daemon_register(
        "laundry", daemon_name, {'version': '3.14', 'memory': '42'})
    cluster.service_daemon_update({'result': 'unknown', 'test': 'running'})
def test_alignment(self):
    """``alignment()`` is expected to be ``None`` for this ioctx."""
    reported = self.ioctx.alignment()
    eq(reported, None)
993 class TestIoctxEc(object):
996 self
.rados
= Rados(conffile
='')
998 self
.pool
= 'test-ec'
999 self
.profile
= 'testprofile-%s' % self
.pool
1000 cmd
= {"prefix": "osd erasure-code-profile set",
1001 "name": self
.profile
, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
1002 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1004 # create ec pool with profile created above
1005 cmd
= {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
1006 'pool': self
.pool
, 'pool_type': 'erasure',
1007 'erasure_code_profile': self
.profile
}
1008 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1010 assert self
.rados
.pool_exists(self
.pool
)
1011 self
.ioctx
= self
.rados
.open_ioctx(self
.pool
)
1014 cmd
= {"prefix": "osd unset", "key": "noup"}
1015 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1017 self
.rados
.delete_pool(self
.pool
)
1018 self
.rados
.shutdown()
def test_alignment(self):
    """``alignment()`` is expected to be 8192 for this ioctx."""
    reported = self.ioctx.alignment()
    eq(reported, 8192)
1024 class TestIoctx2(object):
1027 self
.rados
= Rados(conffile
='')
1028 self
.rados
.connect()
1029 self
.rados
.create_pool('test_pool')
1030 assert self
.rados
.pool_exists('test_pool')
1031 pool_id
= self
.rados
.pool_lookup('test_pool')
1033 self
.ioctx2
= self
.rados
.open_ioctx2(pool_id
)
1036 cmd
= {"prefix": "osd unset", "key": "noup"}
1037 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1039 self
.rados
.delete_pool('test_pool')
1040 self
.rados
.shutdown()
1042 def test_get_last_version(self
):
1043 version
= self
.ioctx2
.get_last_version()
1046 def test_get_stats(self
):
1047 stats
= self
.ioctx2
.get_stats()
1048 eq(stats
, {'num_objects_unfound': 0,
1049 'num_objects_missing_on_primary': 0,
1050 'num_object_clones': 0,
1052 'num_object_copies': 0,
1058 'num_objects_degraded': 0,
1062 class TestObject(object):
1065 self
.rados
= Rados(conffile
='')
1066 self
.rados
.connect()
1067 self
.rados
.create_pool('test_pool')
1068 assert self
.rados
.pool_exists('test_pool')
1069 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1070 self
.ioctx
.write('foo', b
'bar')
1071 self
.object = Object(self
.ioctx
, 'foo')
1076 self
.rados
.delete_pool('test_pool')
1077 self
.rados
.shutdown()
def test_read(self):
    """Two reads in a row: the first returns b'bar', the second b''."""
    first_chunk = self.object.read(3)
    eq(first_chunk, b'bar')
    second_chunk = self.object.read(100)
    eq(second_chunk, b'')
1084 def test_seek(self
):
1085 self
.object.write(b
'blah')
1087 eq(self
.object.read(4), b
'blah')
1089 eq(self
.object.read(3), b
'lah')
1091 def test_write(self
):
1092 self
.object.write(b
'barbaz')
1094 eq(self
.object.read(3), b
'bar')
1095 eq(self
.object.read(3), b
'baz')
1097 class TestIoCtxSelfManagedSnaps(object):
1099 self
.rados
= Rados(conffile
='')
1100 self
.rados
.connect()
1101 self
.rados
.create_pool('test_pool')
1102 assert self
.rados
.pool_exists('test_pool')
1103 self
.ioctx
= self
.rados
.open_ioctx('test_pool')
1106 cmd
= {"prefix":"osd unset", "key":"noup"}
1107 self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1109 self
.rados
.delete_pool('test_pool')
1110 self
.rados
.shutdown()
1113 # cannot mix-and-match pool and self-managed snapshot mode
1114 self
.ioctx
.set_self_managed_snap_write([])
1115 self
.ioctx
.write('abc', b
'abc')
1116 snap_id_1
= self
.ioctx
.create_self_managed_snap()
1117 self
.ioctx
.set_self_managed_snap_write([snap_id_1
])
1119 self
.ioctx
.write('abc', b
'def')
1120 snap_id_2
= self
.ioctx
.create_self_managed_snap()
1121 self
.ioctx
.set_self_managed_snap_write([snap_id_1
, snap_id_2
])
1123 self
.ioctx
.write('abc', b
'ghi')
1125 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_1
)
1126 eq(self
.ioctx
.read('abc'), b
'abc')
1128 self
.ioctx
.rollback_self_managed_snap('abc', snap_id_2
)
1129 eq(self
.ioctx
.read('abc'), b
'def')
1131 self
.ioctx
.remove_self_managed_snap(snap_id_1
)
1132 self
.ioctx
.remove_self_managed_snap(snap_id_2
)
1134 class TestCommand(object):
1137 self
.rados
= Rados(conffile
='')
1138 self
.rados
.connect()
1141 self
.rados
.shutdown()
1143 def test_monmap_dump(self
):
1145 # check for success and some plain output with epoch in it
1146 cmd
= {"prefix":"mon dump"}
1147 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1150 assert(b
'epoch' in buf
)
1152 # JSON, and grab current epoch
1153 cmd
['format'] = 'json'
1154 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1157 d
= json
.loads(buf
.decode("utf-8"))
1158 assert('epoch' in d
)
1161 # assume epoch + 1000 does not exist; test for ENOENT
1162 cmd
['epoch'] = epoch
+ 1000
1163 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30)
1164 eq(ret
, -errno
.ENOENT
)
1168 # send to specific target by name, rank
1169 cmd
= {"prefix": "version"}
1171 target
= d
['mons'][0]['name']
1173 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1177 e
= json
.loads(buf
.decode("utf-8"))
1178 assert('release' in e
)
1180 target
= d
['mons'][0]['rank']
1182 ret
, buf
, errs
= self
.rados
.mon_command(json
.dumps(cmd
), b
'', timeout
=30,
1186 e
= json
.loads(buf
.decode("utf-8"))
1187 assert('release' in e
)
1189 def test_osd_bench(self
):
1190 cmd
= dict(prefix
='bench', size
=4096, count
=8192)
1191 ret
, buf
, err
= self
.rados
.osd_command(0, json
.dumps(cmd
), b
'',
1195 out
= json
.loads(buf
.decode('utf-8'))
1196 eq(out
['blocksize'], cmd
['size'])
1197 eq(out
['bytes_written'], cmd
['count'])
1199 def test_ceph_osd_pool_create_utf8(self
):
1201 # Use encoded bytestring
1202 poolname
= b
"\351\273\205"
1206 cmd
= {"prefix": "osd pool create", "pg_num": 16, "pool": poolname
}
1207 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(cmd
), b
'')
1210 eq(u
"pool '\u9ec5' created", out
)