]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_rados.py
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
3 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
4 ObjectNotFound, ObjectBusy, requires, opt,
5 ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
6 LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
7 import time
8 import threading
9 import json
10 import errno
11 import sys
12
13 # Are we running Python 2.x
14 _python2 = sys.version_info[0] < 3
15
16 def test_rados_init_error():
17 assert_raises(Error, Rados, conffile='', rados_id='admin',
18 name='client.admin')
19 assert_raises(Error, Rados, conffile='', name='invalid')
20 assert_raises(Error, Rados, conffile='', name='bad.invalid')
21
22 def test_rados_init():
23 with Rados(conffile='', rados_id='admin'):
24 pass
25 with Rados(conffile='', name='client.admin'):
26 pass
27 with Rados(conffile='', name='client.admin'):
28 pass
29 with Rados(conffile='', name='client.admin'):
30 pass
31
32 def test_ioctx_context_manager():
33 with Rados(conffile='', rados_id='admin') as conn:
34 with conn.open_ioctx('rbd') as ioctx:
35 pass
36
37 def test_parse_argv():
38 args = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
39 r = Rados()
40 eq(args, r.conf_parse_argv(args))
41
42 def test_parse_argv_empty_str():
43 args = ['']
44 r = Rados()
45 eq(args, r.conf_parse_argv(args))
46
47 class TestRequires(object):
48 @requires(('foo', str), ('bar', int), ('baz', int))
49 def _method_plain(self, foo, bar, baz):
50 ok(isinstance(foo, str))
51 ok(isinstance(bar, int))
52 ok(isinstance(baz, int))
53 return (foo, bar, baz)
54
55 def test_method_plain(self):
56 assert_raises(TypeError, self._method_plain, 42, 42, 42)
57 assert_raises(TypeError, self._method_plain, '42', '42', '42')
58 assert_raises(TypeError, self._method_plain, foo='42', bar='42', baz='42')
59 eq(self._method_plain('42', 42, 42), ('42', 42, 42))
60 eq(self._method_plain(foo='42', bar=42, baz=42), ('42', 42, 42))
61
62 @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int))
63 def _method_with_opt_arg(self, foo, bar, baz):
64 ok(isinstance(foo, str) or foo is None)
65 ok(isinstance(bar, int) or bar is None)
66 ok(isinstance(baz, int))
67 return (foo, bar, baz)
68
69 def test_method_with_opt_args(self):
70 assert_raises(TypeError, self._method_with_opt_arg, 42, 42, 42)
71 assert_raises(TypeError, self._method_with_opt_arg, '42', '42', 42)
72 assert_raises(TypeError, self._method_with_opt_arg, None, None, None)
73 eq(self._method_with_opt_arg(None, 42, 42), (None, 42, 42))
74 eq(self._method_with_opt_arg('42', None, 42), ('42', None, 42))
75 eq(self._method_with_opt_arg(None, None, 42), (None, None, 42))
76
77
78 class TestRadosStateError(object):
79 def _requires_configuring(self, rados):
80 assert_raises(RadosStateError, rados.connect)
81
82 def _requires_configuring_or_connected(self, rados):
83 assert_raises(RadosStateError, rados.conf_read_file)
84 assert_raises(RadosStateError, rados.conf_parse_argv, None)
85 assert_raises(RadosStateError, rados.conf_parse_env)
86 assert_raises(RadosStateError, rados.conf_get, 'opt')
87 assert_raises(RadosStateError, rados.conf_set, 'opt', 'val')
88 assert_raises(RadosStateError, rados.ping_monitor, 0)
89
90 def _requires_connected(self, rados):
91 assert_raises(RadosStateError, rados.pool_exists, 'foo')
92 assert_raises(RadosStateError, rados.pool_lookup, 'foo')
93 assert_raises(RadosStateError, rados.pool_reverse_lookup, 0)
94 assert_raises(RadosStateError, rados.create_pool, 'foo')
95 assert_raises(RadosStateError, rados.get_pool_base_tier, 0)
96 assert_raises(RadosStateError, rados.delete_pool, 'foo')
97 assert_raises(RadosStateError, rados.list_pools)
98 assert_raises(RadosStateError, rados.get_fsid)
99 assert_raises(RadosStateError, rados.open_ioctx, 'foo')
100 assert_raises(RadosStateError, rados.mon_command, '', b'')
101 assert_raises(RadosStateError, rados.osd_command, 0, '', b'')
102 assert_raises(RadosStateError, rados.pg_command, '', '', b'')
103 assert_raises(RadosStateError, rados.wait_for_latest_osdmap)
104 assert_raises(RadosStateError, rados.blacklist_add, '127.0.0.1/123', 0)
105
106 def test_configuring(self):
107 rados = Rados(conffile='')
108 eq('configuring', rados.state)
109 self._requires_connected(rados)
110
111 def test_connected(self):
112 rados = Rados(conffile='')
113 with rados:
114 eq('connected', rados.state)
115 self._requires_configuring(rados)
116
117 def test_shutdown(self):
118 rados = Rados(conffile='')
119 with rados:
120 pass
121 eq('shutdown', rados.state)
122 self._requires_configuring(rados)
123 self._requires_configuring_or_connected(rados)
124 self._requires_connected(rados)
125
126
127 class TestRados(object):
128
129 def setUp(self):
130 self.rados = Rados(conffile='')
131 self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
132 self.rados.conf_parse_env()
133 self.rados.connect()
134
135 # Assume any pre-existing pools are the cluster's defaults
136 self.default_pools = self.rados.list_pools()
137
138 def tearDown(self):
139 self.rados.shutdown()
140
141 def test_ping_monitor(self):
142 assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor')
143 cmd = {'prefix': 'mon dump', 'format':'json'}
144 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
145 for mon in json.loads(buf.decode('utf8'))['mons']:
146 while True:
147 output = self.rados.ping_monitor(mon['name'])
148 if output is None:
149 continue
150 buf = json.loads(output)
151 if buf.get('health'):
152 break
153
154 def test_create(self):
155 self.rados.create_pool('foo')
156 self.rados.delete_pool('foo')
157
158 def test_create_utf8(self):
159 if _python2:
160 # Use encoded bytestring
161 poolname = b"\351\273\204"
162 else:
163 poolname = "\u9ec4"
164 self.rados.create_pool(poolname)
165 assert self.rados.pool_exists(u"\u9ec4")
166 self.rados.delete_pool(poolname)
167
168 def test_pool_lookup_utf8(self):
169 if _python2:
170 poolname = u'\u9ec4'
171 else:
172 poolname = '\u9ec4'
173 self.rados.create_pool(poolname)
174 try:
175 poolid = self.rados.pool_lookup(poolname)
176 eq(poolname, self.rados.pool_reverse_lookup(poolid))
177 finally:
178 self.rados.delete_pool(poolname)
179
180 def test_create_auid(self):
181 self.rados.create_pool('foo', 100)
182 assert self.rados.pool_exists('foo')
183 self.rados.delete_pool('foo')
184
185 def test_eexist(self):
186 self.rados.create_pool('foo')
187 assert_raises(ObjectExists, self.rados.create_pool, 'foo')
188 self.rados.delete_pool('foo')
189
190 def list_non_default_pools(self):
191 pools = self.rados.list_pools()
192 for p in self.default_pools:
193 pools.remove(p)
194 return set(pools)
195
196 def test_list_pools(self):
197 eq(set(), self.list_non_default_pools())
198 self.rados.create_pool('foo')
199 eq(set(['foo']), self.list_non_default_pools())
200 self.rados.create_pool('bar')
201 eq(set(['foo', 'bar']), self.list_non_default_pools())
202 self.rados.create_pool('baz')
203 eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
204 self.rados.delete_pool('foo')
205 eq(set(['bar', 'baz']), self.list_non_default_pools())
206 self.rados.delete_pool('baz')
207 eq(set(['bar']), self.list_non_default_pools())
208 self.rados.delete_pool('bar')
209 eq(set(), self.list_non_default_pools())
210 self.rados.create_pool('a' * 500)
211 eq(set(['a' * 500]), self.list_non_default_pools())
212 self.rados.delete_pool('a' * 500)
213
214 def test_get_pool_base_tier(self):
215 self.rados.create_pool('foo')
216 try:
217 self.rados.create_pool('foo-cache')
218 try:
219 pool_id = self.rados.pool_lookup('foo')
220 tier_pool_id = self.rados.pool_lookup('foo-cache')
221
222 cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
223 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
224 eq(ret, 0)
225
226 try:
227 cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "sure":"--yes-i-really-mean-it"}
228 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
229 eq(ret, 0)
230
231 eq(self.rados.wait_for_latest_osdmap(), 0)
232
233 eq(pool_id, self.rados.get_pool_base_tier(pool_id))
234 eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
235 finally:
236 cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
237 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
238 eq(ret, 0)
239 finally:
240 self.rados.delete_pool('foo-cache')
241 finally:
242 self.rados.delete_pool('foo')
243
244 def test_get_fsid(self):
245 fsid = self.rados.get_fsid()
246 eq(len(fsid), 36)
247
248 def test_blacklist_add(self):
249 self.rados.blacklist_add("1.2.3.4/123", 1)
250
251 def test_get_cluster_stats(self):
252 stats = self.rados.get_cluster_stats()
253 assert stats['kb'] > 0
254 assert stats['kb_avail'] > 0
255 assert stats['kb_used'] > 0
256 assert stats['num_objects'] >= 0
257
258 def test_monitor_log(self):
259 lock = threading.Condition()
260 def cb(arg, line, who, sec, nsec, seq, level, msg):
261 # NOTE(sileht): the old pyrados API was received the pointer as int
262 # instead of the value of arg
263 eq(arg, "arg")
264 with lock:
265 lock.notify()
266 return 0
267
268 # NOTE(sileht): force don't save the monitor into local var
269 # to ensure all references are correctly tracked into the lib
270 MonitorLog(self.rados, "debug", cb, "arg")
271 with lock:
272 lock.wait()
273 MonitorLog(self.rados, "debug", None, None)
274 eq(None, self.rados.monitor_callback)
275
276 class TestIoctx(object):
277
278 def setUp(self):
279 self.rados = Rados(conffile='')
280 self.rados.connect()
281 self.rados.create_pool('test_pool')
282 assert self.rados.pool_exists('test_pool')
283 self.ioctx = self.rados.open_ioctx('test_pool')
284
285 def tearDown(self):
286 cmd = {"prefix":"osd unset", "key":"noup"}
287 self.rados.mon_command(json.dumps(cmd), b'')
288 self.ioctx.close()
289 self.rados.delete_pool('test_pool')
290 self.rados.shutdown()
291
292 def test_get_last_version(self):
293 version = self.ioctx.get_last_version()
294 assert version >= 0
295
296 def test_get_stats(self):
297 stats = self.ioctx.get_stats()
298 eq(stats, {'num_objects_unfound': 0,
299 'num_objects_missing_on_primary': 0,
300 'num_object_clones': 0,
301 'num_objects': 0,
302 'num_object_copies': 0,
303 'num_bytes': 0,
304 'num_rd_kb': 0,
305 'num_wr_kb': 0,
306 'num_kb': 0,
307 'num_wr': 0,
308 'num_objects_degraded': 0,
309 'num_rd': 0})
310
311 def test_change_auid(self):
312 self.ioctx.change_auid(ANONYMOUS_AUID)
313 self.ioctx.change_auid(ADMIN_AUID)
314
315 def test_write(self):
316 self.ioctx.write('abc', b'abc')
317 eq(self.ioctx.read('abc'), b'abc')
318
319 def test_write_full(self):
320 self.ioctx.write('abc', b'abc')
321 eq(self.ioctx.read('abc'), b'abc')
322 self.ioctx.write_full('abc', b'd')
323 eq(self.ioctx.read('abc'), b'd')
324
325 def test_append(self):
326 self.ioctx.write('abc', b'a')
327 self.ioctx.append('abc', b'b')
328 self.ioctx.append('abc', b'c')
329 eq(self.ioctx.read('abc'), b'abc')
330
331 def test_write_zeros(self):
332 self.ioctx.write('abc', b'a\0b\0c')
333 eq(self.ioctx.read('abc'), b'a\0b\0c')
334
335 def test_trunc(self):
336 self.ioctx.write('abc', b'abc')
337 self.ioctx.trunc('abc', 2)
338 eq(self.ioctx.read('abc'), b'ab')
339 size = self.ioctx.stat('abc')[0]
340 eq(size, 2)
341
342 def test_list_objects_empty(self):
343 eq(list(self.ioctx.list_objects()), [])
344
345 def test_list_objects(self):
346 self.ioctx.write('a', b'')
347 self.ioctx.write('b', b'foo')
348 self.ioctx.write_full('c', b'bar')
349 self.ioctx.append('d', b'jazz')
350 object_names = [obj.key for obj in self.ioctx.list_objects()]
351 eq(sorted(object_names), ['a', 'b', 'c', 'd'])
352
353 def test_list_ns_objects(self):
354 self.ioctx.write('a', b'')
355 self.ioctx.write('b', b'foo')
356 self.ioctx.write_full('c', b'bar')
357 self.ioctx.append('d', b'jazz')
358 self.ioctx.set_namespace("ns1")
359 self.ioctx.write('ns1-a', b'')
360 self.ioctx.write('ns1-b', b'foo')
361 self.ioctx.write_full('ns1-c', b'bar')
362 self.ioctx.append('ns1-d', b'jazz')
363 self.ioctx.append('d', b'jazz')
364 self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
365 object_names = [(obj.nspace, obj.key) for obj in self.ioctx.list_objects()]
366 eq(sorted(object_names), [('', 'a'), ('','b'), ('','c'), ('','d'),\
367 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
368 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
369
370 def test_xattrs(self):
371 xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
372 self.ioctx.write('abc', b'')
373 for key, value in xattrs.items():
374 self.ioctx.set_xattr('abc', key, value)
375 eq(self.ioctx.get_xattr('abc', key), value)
376 stored_xattrs = {}
377 for key, value in self.ioctx.get_xattrs('abc'):
378 stored_xattrs[key] = value
379 eq(stored_xattrs, xattrs)
380
381 def test_obj_xattrs(self):
382 xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
383 self.ioctx.write('abc', b'')
384 obj = list(self.ioctx.list_objects())[0]
385 for key, value in xattrs.items():
386 obj.set_xattr(key, value)
387 eq(obj.get_xattr(key), value)
388 stored_xattrs = {}
389 for key, value in obj.get_xattrs():
390 stored_xattrs[key] = value
391 eq(stored_xattrs, xattrs)
392
393 def test_create_snap(self):
394 assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
395 self.ioctx.create_snap('foo')
396 self.ioctx.remove_snap('foo')
397
398 def test_list_snaps_empty(self):
399 eq(list(self.ioctx.list_snaps()), [])
400
401 def test_list_snaps(self):
402 snaps = ['snap1', 'snap2', 'snap3']
403 for snap in snaps:
404 self.ioctx.create_snap(snap)
405 listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
406 eq(snaps, listed_snaps)
407
408 def test_lookup_snap(self):
409 self.ioctx.create_snap('foo')
410 snap = self.ioctx.lookup_snap('foo')
411 eq(snap.name, 'foo')
412
413 def test_snap_timestamp(self):
414 self.ioctx.create_snap('foo')
415 snap = self.ioctx.lookup_snap('foo')
416 snap.get_timestamp()
417
418 def test_remove_snap(self):
419 self.ioctx.create_snap('foo')
420 (snap,) = self.ioctx.list_snaps()
421 eq(snap.name, 'foo')
422 self.ioctx.remove_snap('foo')
423 eq(list(self.ioctx.list_snaps()), [])
424
425 def test_snap_rollback(self):
426 self.ioctx.write("insnap", b"contents1")
427 self.ioctx.create_snap("snap1")
428 self.ioctx.remove_object("insnap")
429 self.ioctx.snap_rollback("insnap", "snap1")
430 eq(self.ioctx.read("insnap"), b"contents1")
431 self.ioctx.remove_snap("snap1")
432 self.ioctx.remove_object("insnap")
433
434 def test_snap_read(self):
435 self.ioctx.write("insnap", b"contents1")
436 self.ioctx.create_snap("snap1")
437 self.ioctx.remove_object("insnap")
438 snap = self.ioctx.lookup_snap("snap1")
439 self.ioctx.set_read(snap.snap_id)
440 eq(self.ioctx.read("insnap"), b"contents1")
441 self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
442 self.ioctx.write("inhead", b"contents2")
443 eq(self.ioctx.read("inhead"), b"contents2")
444 self.ioctx.remove_snap("snap1")
445 self.ioctx.remove_object("inhead")
446
447 def test_set_omap(self):
448 keys = ("1", "2", "3", "4")
449 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
450 with WriteOpCtx(self.ioctx) as write_op:
451 self.ioctx.set_omap(write_op, keys, values)
452 write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
453 self.ioctx.operate_write_op(write_op, "hw")
454 with ReadOpCtx(self.ioctx) as read_op:
455 iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
456 eq(ret, 0)
457 self.ioctx.operate_read_op(read_op, "hw")
458 next(iter)
459 eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
460 with ReadOpCtx(self.ioctx) as read_op:
461 iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4)
462 eq(ret, 0)
463 self.ioctx.operate_read_op(read_op, "hw")
464 eq(("3", b"ccc"), next(iter))
465 eq(list(iter), [("4", b"\x04\x04\x04\x04")])
466 with ReadOpCtx(self.ioctx) as read_op:
467 iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
468 eq(ret, 0)
469 read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
470 self.ioctx.operate_read_op(read_op, "hw")
471 eq(list(iter), [("2", b"bbb")])
472
473 def test_set_omap_aio(self):
474 lock = threading.Condition()
475 count = [0]
476 def cb(blah):
477 with lock:
478 count[0] += 1
479 lock.notify()
480 return 0
481
482 keys = ("1", "2", "3", "4")
483 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
484 with WriteOpCtx(self.ioctx) as write_op:
485 self.ioctx.set_omap(write_op, keys, values)
486 comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
487 comp.wait_for_complete()
488 comp.wait_for_safe()
489 with lock:
490 while count[0] < 2:
491 lock.wait()
492 eq(comp.get_return_value(), 0)
493
494 with ReadOpCtx(self.ioctx) as read_op:
495 iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
496 eq(ret, 0)
497 comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
498 comp.wait_for_complete()
499 comp.wait_for_safe()
500 with lock:
501 while count[0] < 4:
502 lock.wait()
503 eq(comp.get_return_value(), 0)
504 next(iter)
505 eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
506
507 def test_write_ops(self):
508 with WriteOpCtx(self.ioctx) as write_op:
509 write_op.new(0)
510 self.ioctx.operate_write_op(write_op, "write_ops")
511 eq(self.ioctx.read('write_ops'), b'')
512 write_op.write_full(b'1')
513 write_op.append(b'2')
514 self.ioctx.operate_write_op(write_op, "write_ops")
515 eq(self.ioctx.read('write_ops'), b'12')
516 write_op.write_full(b'12345')
517 write_op.write(b'x', 2)
518 self.ioctx.operate_write_op(write_op, "write_ops")
519 eq(self.ioctx.read('write_ops'), b'12x45')
520 write_op.write_full(b'12345')
521 write_op.zero(2, 2)
522 self.ioctx.operate_write_op(write_op, "write_ops")
523 eq(self.ioctx.read('write_ops'), b'12\x00\x005')
524 write_op.write_full(b'12345')
525 write_op.truncate(2)
526 self.ioctx.operate_write_op(write_op, "write_ops")
527 eq(self.ioctx.read('write_ops'), b'12')
528 write_op.remove()
529 self.ioctx.operate_write_op(write_op, "write_ops")
530 with assert_raises(ObjectNotFound):
531 self.ioctx.read('write_ops')
532
533 def test_get_omap_vals_by_keys(self):
534 keys = ("1", "2", "3", "4")
535 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
536 with WriteOpCtx(self.ioctx) as write_op:
537 self.ioctx.set_omap(write_op, keys, values)
538 self.ioctx.operate_write_op(write_op, "hw")
539 with ReadOpCtx(self.ioctx) as read_op:
540 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
541 eq(ret, 0)
542 self.ioctx.operate_read_op(read_op, "hw")
543 eq(list(iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
544 with ReadOpCtx(self.ioctx) as read_op:
545 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
546 eq(ret, 0)
547 with assert_raises(ObjectNotFound):
548 self.ioctx.operate_read_op(read_op, "no_such")
549
550 def test_get_omap_keys(self):
551 keys = ("1", "2", "3")
552 values = (b"aaa", b"bbb", b"ccc")
553 with WriteOpCtx(self.ioctx) as write_op:
554 self.ioctx.set_omap(write_op, keys, values)
555 self.ioctx.operate_write_op(write_op, "hw")
556 with ReadOpCtx(self.ioctx) as read_op:
557 iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
558 eq(ret, 0)
559 self.ioctx.operate_read_op(read_op, "hw")
560 eq(list(iter), [("1", None), ("2", None)])
561 with ReadOpCtx(self.ioctx) as read_op:
562 iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
563 eq(ret, 0)
564 with assert_raises(ObjectNotFound):
565 self.ioctx.operate_read_op(read_op, "no_such")
566
567 def test_clear_omap(self):
568 keys = ("1", "2", "3")
569 values = (b"aaa", b"bbb", b"ccc")
570 with WriteOpCtx(self.ioctx) as write_op:
571 self.ioctx.set_omap(write_op, keys, values)
572 self.ioctx.operate_write_op(write_op, "hw")
573 with WriteOpCtx(self.ioctx) as write_op_1:
574 self.ioctx.clear_omap(write_op_1)
575 self.ioctx.operate_write_op(write_op_1, "hw")
576 with ReadOpCtx(self.ioctx) as read_op:
577 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("1",))
578 eq(ret, 0)
579 self.ioctx.operate_read_op(read_op, "hw")
580 eq(list(iter), [])
581
582 def test_locator(self):
583 self.ioctx.set_locator_key("bar")
584 self.ioctx.write('foo', b'contents1')
585 objects = [i for i in self.ioctx.list_objects()]
586 eq(len(objects), 1)
587 eq(self.ioctx.get_locator_key(), "bar")
588 self.ioctx.set_locator_key("")
589 objects[0].seek(0)
590 objects[0].write(b"contents2")
591 eq(self.ioctx.get_locator_key(), "")
592 self.ioctx.set_locator_key("bar")
593 contents = self.ioctx.read("foo")
594 eq(contents, b"contents2")
595 eq(self.ioctx.get_locator_key(), "bar")
596 objects[0].remove()
597 objects = [i for i in self.ioctx.list_objects()]
598 eq(objects, [])
599 self.ioctx.set_locator_key("")
600
601 def test_aio_write(self):
602 lock = threading.Condition()
603 count = [0]
604 def cb(blah):
605 with lock:
606 count[0] += 1
607 lock.notify()
608 return 0
609 comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
610 comp.wait_for_complete()
611 comp.wait_for_safe()
612 with lock:
613 while count[0] < 2:
614 lock.wait()
615 eq(comp.get_return_value(), 0)
616 contents = self.ioctx.read("foo")
617 eq(contents, b"bar")
618 [i.remove() for i in self.ioctx.list_objects()]
619
620 def test_aio_write_no_comp_ref(self):
621 lock = threading.Condition()
622 count = [0]
623 def cb(blah):
624 with lock:
625 count[0] += 1
626 lock.notify()
627 return 0
628 # NOTE(sileht): force don't save the comp into local var
629 # to ensure all references are correctly tracked into the lib
630 self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
631 with lock:
632 while count[0] < 2:
633 lock.wait()
634 contents = self.ioctx.read("foo")
635 eq(contents, b"bar")
636 [i.remove() for i in self.ioctx.list_objects()]
637
638 def test_aio_append(self):
639 lock = threading.Condition()
640 count = [0]
641 def cb(blah):
642 with lock:
643 count[0] += 1
644 lock.notify()
645 return 0
646 comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
647 comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb)
648 comp.wait_for_complete()
649 contents = self.ioctx.read("foo")
650 eq(contents, b"barbaz")
651 with lock:
652 while count[0] < 4:
653 lock.wait()
654 eq(comp.get_return_value(), 0)
655 eq(comp2.get_return_value(), 0)
656 [i.remove() for i in self.ioctx.list_objects()]
657
658 def test_aio_write_full(self):
659 lock = threading.Condition()
660 count = [0]
661 def cb(blah):
662 with lock:
663 count[0] += 1
664 lock.notify()
665 return 0
666 self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
667 comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
668 comp.wait_for_complete()
669 comp.wait_for_safe()
670 with lock:
671 while count[0] < 2:
672 lock.wait()
673 eq(comp.get_return_value(), 0)
674 contents = self.ioctx.read("foo")
675 eq(contents, b"bar")
676 [i.remove() for i in self.ioctx.list_objects()]
677
678 def test_aio_stat(self):
679 lock = threading.Condition()
680 count = [0]
681 def cb(_, size, mtime):
682 with lock:
683 count[0] += 1
684 lock.notify()
685
686 comp = self.ioctx.aio_stat("foo", cb)
687 comp.wait_for_complete()
688 with lock:
689 while count[0] < 1:
690 lock.wait()
691 eq(comp.get_return_value(), -2)
692
693 self.ioctx.write("foo", b"bar")
694
695 comp = self.ioctx.aio_stat("foo", cb)
696 comp.wait_for_complete()
697 with lock:
698 while count[0] < 2:
699 lock.wait()
700 eq(comp.get_return_value(), 0)
701
702 [i.remove() for i in self.ioctx.list_objects()]
703
704 def _take_down_acting_set(self, pool, objectname):
705 # find acting_set for pool:objectname and take it down; used to
706 # verify that async reads don't complete while acting set is missing
707 cmd = {
708 "prefix":"osd map",
709 "pool":pool,
710 "object":objectname,
711 "format":"json",
712 }
713 r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'')
714 objmap = json.loads(jsonout.decode("utf-8"))
715 acting_set = objmap['acting']
716 cmd = {"prefix":"osd set", "key":"noup"}
717 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
718 eq(r, 0)
719 cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
720 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
721 eq(r, 0)
722
723 # wait for OSDs to acknowledge the down
724 eq(self.rados.wait_for_latest_osdmap(), 0)
725
726 def _let_osds_back_up(self):
727 cmd = {"prefix":"osd unset", "key":"noup"}
728 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
729 eq(r, 0)
730
731 def test_aio_read(self):
732 # this is a list so that the local cb() can modify it
733 retval = [None]
734 lock = threading.Condition()
735 def cb(_, buf):
736 with lock:
737 retval[0] = buf
738 lock.notify()
739 payload = b"bar\000frob"
740 self.ioctx.write("foo", payload)
741
742 # test1: use wait_for_complete() and wait for cb by
743 # watching retval[0]
744 self._take_down_acting_set('test_pool', 'foo')
745 comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
746 eq(False, comp.is_complete())
747 time.sleep(3)
748 eq(False, comp.is_complete())
749 with lock:
750 eq(None, retval[0])
751 self._let_osds_back_up()
752 comp.wait_for_complete()
753 loops = 0
754 with lock:
755 while retval[0] is None and loops <= 10:
756 lock.wait(timeout=5)
757 loops += 1
758 assert(loops <= 10)
759
760 eq(retval[0], payload)
761 eq(sys.getrefcount(comp), 2)
762
763 # test2: use wait_for_complete_and_cb(), verify retval[0] is
764 # set by the time we regain control
765
766 retval[0] = None
767 self._take_down_acting_set('test_pool', 'foo')
768 comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
769 eq(False, comp.is_complete())
770 time.sleep(3)
771 eq(False, comp.is_complete())
772 with lock:
773 eq(None, retval[0])
774 self._let_osds_back_up()
775
776 comp.wait_for_complete_and_cb()
777 assert(retval[0] is not None)
778 eq(retval[0], payload)
779 eq(sys.getrefcount(comp), 2)
780
781 # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
782 # set by the time we regain control
783
784 retval[0] = 1
785 self._take_down_acting_set('test_pool', 'bar')
786 comp = self.ioctx.aio_read("bar", len(payload), 0, cb)
787 eq(False, comp.is_complete())
788 time.sleep(3)
789 eq(False, comp.is_complete())
790 with lock:
791 eq(1, retval[0])
792 self._let_osds_back_up()
793
794 comp.wait_for_complete_and_cb()
795 eq(None, retval[0])
796 assert(comp.get_return_value() < 0)
797 eq(sys.getrefcount(comp), 2)
798
799 [i.remove() for i in self.ioctx.list_objects()]
800
801 def test_lock(self):
802 self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock",
803 10000, 0)
804 assert_raises(ObjectExists,
805 self.ioctx.lock_exclusive,
806 "foo", "lock", "locker", "desc_lock", 10000, 0)
807 self.ioctx.unlock("foo", "lock", "locker")
808 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker")
809
810 self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
811 10000, 0)
812 self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
813 10000, 0)
814 assert_raises(ObjectBusy,
815 self.ioctx.lock_exclusive,
816 "foo", "lock", "locker3", "desc_lock", 10000, 0)
817 self.ioctx.unlock("foo", "lock", "locker1")
818 self.ioctx.unlock("foo", "lock", "locker2")
819 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1")
820 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2")
821
822 def test_execute(self):
823 self.ioctx.write("foo", b"") # ensure object exists
824
825 ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"")
826 eq(buf, b"Hello, world!")
827
828 ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
829 eq(buf, b"Hello, nose!")
830
831 def test_aio_execute(self):
832 count = [0]
833 retval = [None]
834 lock = threading.Condition()
835 def cb(_, buf):
836 with lock:
837 if retval[0] is None:
838 retval[0] = buf
839 count[0] += 1
840 lock.notify()
841 self.ioctx.write("foo", b"") # ensure object exists
842
843 comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
844 comp.wait_for_complete()
845 with lock:
846 while count[0] < 2:
847 lock.wait()
848 eq(comp.get_return_value(), 13)
849 eq(retval[0], b"Hello, world!")
850
851 retval[0] = None
852 comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
853 comp.wait_for_complete()
854 with lock:
855 while count[0] < 4:
856 lock.wait()
857 eq(comp.get_return_value(), 12)
858 eq(retval[0], b"Hello, nose!")
859
860 [i.remove() for i in self.ioctx.list_objects()]
861
862 class TestObject(object):
863
864 def setUp(self):
865 self.rados = Rados(conffile='')
866 self.rados.connect()
867 self.rados.create_pool('test_pool')
868 assert self.rados.pool_exists('test_pool')
869 self.ioctx = self.rados.open_ioctx('test_pool')
870 self.ioctx.write('foo', b'bar')
871 self.object = Object(self.ioctx, 'foo')
872
873 def tearDown(self):
874 self.ioctx.close()
875 self.ioctx = None
876 self.rados.delete_pool('test_pool')
877 self.rados.shutdown()
878 self.rados = None
879
880 def test_read(self):
881 eq(self.object.read(3), b'bar')
882 eq(self.object.read(100), b'')
883
884 def test_seek(self):
885 self.object.write(b'blah')
886 self.object.seek(0)
887 eq(self.object.read(4), b'blah')
888 self.object.seek(1)
889 eq(self.object.read(3), b'lah')
890
891 def test_write(self):
892 self.object.write(b'barbaz')
893 self.object.seek(0)
894 eq(self.object.read(3), b'bar')
895 eq(self.object.read(3), b'baz')
896
897 class TestCommand(object):
898
899 def setUp(self):
900 self.rados = Rados(conffile='')
901 self.rados.connect()
902
903 def tearDown(self):
904 self.rados.shutdown()
905
906 def test_monmap_dump(self):
907
908 # check for success and some plain output with epoch in it
909 cmd = {"prefix":"mon dump"}
910 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
911 eq(ret, 0)
912 assert len(buf) > 0
913 assert(b'epoch' in buf)
914
915 # JSON, and grab current epoch
916 cmd['format'] = 'json'
917 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
918 eq(ret, 0)
919 assert len(buf) > 0
920 d = json.loads(buf.decode("utf-8"))
921 assert('epoch' in d)
922 epoch = d['epoch']
923
924 # assume epoch + 1000 does not exist; test for ENOENT
925 cmd['epoch'] = epoch + 1000
926 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
927 eq(ret, -errno.ENOENT)
928 eq(len(buf), 0)
929 del cmd['epoch']
930
931 # send to specific target by name
932 target = d['mons'][0]['name']
933 print(target)
934 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
935 target=target)
936 eq(ret, 0)
937 assert len(buf) > 0
938 d = json.loads(buf.decode("utf-8"))
939 assert('epoch' in d)
940
941 # and by rank
942 target = d['mons'][0]['rank']
943 print(target)
944 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
945 target=target)
946 eq(ret, 0)
947 assert len(buf) > 0
948 d = json.loads(buf.decode("utf-8"))
949 assert('epoch' in d)
950
951 def test_osd_bench(self):
952 cmd = dict(prefix='bench', size=4096, count=8192)
953 ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
954 timeout=30)
955 eq(ret, 0)
956 assert len(err) > 0
957 out = json.loads(err)
958 eq(out['blocksize'], cmd['size'])
959 eq(out['bytes_written'], cmd['count'])
960
961 def test_ceph_osd_pool_create_utf8(self):
962 if _python2:
963 # Use encoded bytestring
964 poolname = b"\351\273\205"
965 else:
966 poolname = "\u9ec5"
967
968 cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
969 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
970 eq(ret, 0)
971 assert len(out) > 0
972 eq(u"pool '\u9ec5' created", out)