]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_rados.py
update sources to 12.2.7
[ceph.git] / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from nose import SkipTest
3 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
4 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
5 ObjectNotFound, ObjectBusy, requires, opt,
6 ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
7 LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
8 import time
9 import threading
10 import json
11 import errno
12 import sys
13
# True when the interpreter is Python 2.x
_python2 = sys.version_info < (3,)
16
def test_rados_init_error():
    """``Rados()`` must raise ``Error`` for each of these identity arguments."""
    rejected_identities = (
        dict(rados_id='admin', name='client.admin'),
        dict(name='invalid'),
        dict(name='bad.invalid'),
    )
    for identity in rejected_identities:
        assert_raises(Error, Rados, conffile='', **identity)
22
def test_rados_init():
    """Enter and leave a ``Rados`` context for each accepted identity."""
    # One construction by rados_id, then three by full client name.
    identities = [dict(rados_id='admin')] + [dict(name='client.admin')] * 3
    for identity in identities:
        with Rados(conffile='', **identity):
            pass
32
def test_ioctx_context_manager():
    """A ``Rados`` handle and an ioctx opened from it both work in ``with``."""
    with Rados(conffile='', rados_id='admin') as cluster, \
            cluster.open_ioctx('rbd'):
        pass
37
def test_parse_argv():
    """``conf_parse_argv`` hands back this argument list unchanged."""
    argv = ['osd', 'pool', 'delete', 'foobar', 'foobar',
            '--yes-i-really-really-mean-it']
    remaining = Rados().conf_parse_argv(argv)
    eq(argv, remaining)
42
def test_parse_argv_empty_str():
    """``conf_parse_argv`` hands back a lone empty-string argument unchanged."""
    argv = ['']
    remaining = Rados().conf_parse_argv(argv)
    eq(argv, remaining)
47
class TestRequires(object):
    """Exercises the ``requires`` decorator, with and without ``opt`` types."""

    @requires(('foo', str), ('bar', int), ('baz', int))
    def _method_plain(self, foo, bar, baz):
        """Return the arguments as a tuple after re-checking their types."""
        for value, expected in ((foo, str), (bar, int), (baz, int)):
            ok(isinstance(value, expected))
        return (foo, bar, baz)

    def test_method_plain(self):
        """Wrongly typed arguments raise ``TypeError``; correct ones pass through."""
        for bad_args in ((42, 42, 42), ('42', '42', '42')):
            assert_raises(TypeError, self._method_plain, *bad_args)
        assert_raises(TypeError, self._method_plain,
                      foo='42', bar='42', baz='42')

        expected = ('42', 42, 42)
        eq(self._method_plain('42', 42, 42), expected)
        eq(self._method_plain(foo='42', bar=42, baz=42), expected)

    @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int))
    def _method_with_opt_arg(self, foo, bar, baz):
        """Return the arguments as a tuple; ``foo`` and ``bar`` may be ``None``."""
        ok(foo is None or isinstance(foo, str))
        ok(bar is None or isinstance(bar, int))
        ok(isinstance(baz, int))
        return (foo, bar, baz)

    def test_method_with_opt_args(self):
        """``None`` is accepted only for the arguments declared with ``opt``."""
        rejected = ((42, 42, 42), ('42', '42', 42), (None, None, None))
        for bad_args in rejected:
            assert_raises(TypeError, self._method_with_opt_arg, *bad_args)

        accepted = ((None, 42, 42), ('42', None, 42), (None, None, 42))
        for good_args in accepted:
            eq(self._method_with_opt_arg(*good_args), good_args)
77
78
class TestRadosStateError(object):
    """Calls made in the wrong handle state must raise ``RadosStateError``."""

    def _expect_state_error(self, calls):
        """Assert that every ``(callable, *args)`` entry raises ``RadosStateError``."""
        for call in calls:
            assert_raises(RadosStateError, *call)

    def _requires_configuring(self, rados):
        """Check that ``connect`` is rejected by ``rados`` in its current state."""
        self._expect_state_error([(rados.connect,)])

    def _requires_configuring_or_connected(self, rados):
        """Check that the configuration and ping calls are rejected by ``rados``."""
        self._expect_state_error([
            (rados.conf_read_file,),
            (rados.conf_parse_argv, None),
            (rados.conf_parse_env,),
            (rados.conf_get, 'opt'),
            (rados.conf_set, 'opt', 'val'),
            (rados.ping_monitor, 0),
        ])

    def _requires_connected(self, rados):
        """Check that the cluster-facing calls are rejected by ``rados``."""
        self._expect_state_error([
            (rados.pool_exists, 'foo'),
            (rados.pool_lookup, 'foo'),
            (rados.pool_reverse_lookup, 0),
            (rados.create_pool, 'foo'),
            (rados.get_pool_base_tier, 0),
            (rados.delete_pool, 'foo'),
            (rados.list_pools,),
            (rados.get_fsid,),
            (rados.open_ioctx, 'foo'),
            (rados.mon_command, '', b''),
            (rados.osd_command, 0, '', b''),
            (rados.pg_command, '', '', b''),
            (rados.wait_for_latest_osdmap,),
            (rados.blacklist_add, '127.0.0.1/123', 0),
        ])

    def test_configuring(self):
        """A fresh handle is 'configuring' and rejects cluster-facing calls."""
        handle = Rados(conffile='')
        eq('configuring', handle.state)
        self._requires_connected(handle)

    def test_connected(self):
        """Inside its context the handle is 'connected' and rejects ``connect``."""
        handle = Rados(conffile='')
        with handle:
            eq('connected', handle.state)
            self._requires_configuring(handle)

    def test_shutdown(self):
        """After leaving its context the handle is 'shutdown' and rejects everything."""
        handle = Rados(conffile='')
        with handle:
            pass
        eq('shutdown', handle.state)
        self._requires_configuring(handle)
        self._requires_configuring_or_connected(handle)
        self._requires_connected(handle)
126
127
class TestRados(object):
    """Cluster-level tests run against a freshly connected ``Rados`` handle.

    Tests that create pools delete them in ``finally`` blocks so that a
    failed assertion does not leave a pool behind for the next test.
    """

    def setUp(self):
        """Connect to the cluster and record the pools that already exist."""
        self.rados = Rados(conffile='')
        self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
        self.rados.conf_parse_env()
        self.rados.connect()

        # Assume any pre-existing pools are the cluster's defaults
        self.default_pools = self.rados.list_pools()

    def tearDown(self):
        """Shut the cluster handle down."""
        self.rados.shutdown()

    def test_ping_monitor(self):
        """Ping each monitor listed by ``mon dump`` until it reports health."""
        assert_raises(ObjectNotFound, self.rados.ping_monitor,
                      'not_exists_monitor')
        cmd = {'prefix': 'mon dump', 'format': 'json'}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        for mon in json.loads(buf.decode('utf8'))['mons']:
            while True:
                output = self.rados.ping_monitor(mon['name'])
                if output is None:
                    continue
                if json.loads(output).get('health'):
                    break

    def test_create(self):
        """A pool can be created and deleted again."""
        self.rados.create_pool('foo')
        self.rados.delete_pool('foo')

    def test_create_utf8(self):
        """A pool with a non-ASCII name can be created and is reported as existing."""
        if _python2:
            # Use encoded bytestring
            poolname = b"\351\273\204"
        else:
            poolname = "\u9ec4"
        self.rados.create_pool(poolname)
        try:
            assert self.rados.pool_exists(u"\u9ec4")
        finally:
            self.rados.delete_pool(poolname)

    def test_pool_lookup_utf8(self):
        """Looking a non-ASCII pool up by name and back by id round-trips."""
        poolname = u'\u9ec4'
        self.rados.create_pool(poolname)
        try:
            poolid = self.rados.pool_lookup(poolname)
            eq(poolname, self.rados.pool_reverse_lookup(poolid))
        finally:
            self.rados.delete_pool(poolname)

    def test_create_auid(self):
        """A pool can be created with an explicit auid."""
        self.rados.create_pool('foo', 100)
        try:
            assert self.rados.pool_exists('foo')
        finally:
            self.rados.delete_pool('foo')

    def test_eexist(self):
        """Creating a pool that already exists raises ``ObjectExists``."""
        self.rados.create_pool('foo')
        try:
            assert_raises(ObjectExists, self.rados.create_pool, 'foo')
        finally:
            self.rados.delete_pool('foo')

    def list_non_default_pools(self):
        """Return the set of pool names that were not present at ``setUp``."""
        # Set difference (rather than list.remove) also copes with a default
        # pool that has gone away since setUp.
        return set(self.rados.list_pools()) - set(self.default_pools)

    def test_list_pools(self):
        """``list_pools`` tracks pools as they are created and deleted."""
        long_name = 'a' * 500
        try:
            eq(set(), self.list_non_default_pools())
            self.rados.create_pool('foo')
            eq(set(['foo']), self.list_non_default_pools())
            self.rados.create_pool('bar')
            eq(set(['foo', 'bar']), self.list_non_default_pools())
            self.rados.create_pool('baz')
            eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
            self.rados.delete_pool('foo')
            eq(set(['bar', 'baz']), self.list_non_default_pools())
            self.rados.delete_pool('baz')
            eq(set(['bar']), self.list_non_default_pools())
            self.rados.delete_pool('bar')
            eq(set(), self.list_non_default_pools())
            self.rados.create_pool(long_name)
            eq(set([long_name]), self.list_non_default_pools())
            self.rados.delete_pool(long_name)
        finally:
            # Remove whatever a failed assertion left behind so later tests
            # can reuse these pool names.
            for name in ('foo', 'bar', 'baz', long_name):
                if self.rados.pool_exists(name):
                    self.rados.delete_pool(name)

    def test_get_pool_base_tier(self):
        """``get_pool_base_tier`` returns the base pool id for both base and tier."""
        self.rados.create_pool('foo')
        try:
            self.rados.create_pool('foo-cache')
            try:
                pool_id = self.rados.pool_lookup('foo')
                tier_pool_id = self.rados.pool_lookup('foo-cache')

                cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
                ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                eq(ret, 0)

                try:
                    cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "sure":"--yes-i-really-mean-it"}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)

                    eq(self.rados.wait_for_latest_osdmap(), 0)

                    eq(pool_id, self.rados.get_pool_base_tier(pool_id))
                    eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
                finally:
                    cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)
            finally:
                self.rados.delete_pool('foo-cache')
        finally:
            self.rados.delete_pool('foo')

    def test_get_fsid(self):
        """The cluster fsid is a 36-character string."""
        fsid = self.rados.get_fsid()
        eq(len(fsid), 36)

    def test_blacklist_add(self):
        """``blacklist_add`` accepts an address and an expiry without raising."""
        self.rados.blacklist_add("1.2.3.4/123", 1)

    def test_get_cluster_stats(self):
        """Cluster stats report positive sizes and a non-negative object count."""
        stats = self.rados.get_cluster_stats()
        for key in ('kb', 'kb_avail', 'kb_used'):
            assert stats[key] > 0
        assert stats['num_objects'] >= 0

    def test_monitor_log(self):
        """A registered monitor-log callback is invoked with its user argument."""
        delivered = threading.Event()
        seen_args = []

        def cb(arg, line, who, sec, nsec, seq, level, msg):
            # NOTE(sileht): the old pyrados API received the pointer as an int
            # instead of the value of arg.  The value is only recorded here
            # and checked by the test itself once the callback has fired.
            if not seen_args:
                seen_args.append(arg)
            delivered.set()
            return 0

        # NOTE(sileht): deliberately don't save the monitor into a local var
        # to ensure all references are correctly tracked into the lib
        MonitorLog(self.rados, "debug", cb, "arg")
        # An Event stays set, so a callback that fires before we start
        # waiting is not missed.
        delivered.wait()
        MonitorLog(self.rados, "debug", None, None)
        eq(seen_args[0], "arg")
        eq(None, self.rados.monitor_callback)
276
class TestIoctx(object):
    """I/O context tests; each test runs against a freshly created ``test_pool``."""

    def setUp(self):
        """Connect, create ``test_pool`` and open an ioctx on it."""
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.rados.create_pool('test_pool')
        assert self.rados.pool_exists('test_pool')
        self.ioctx = self.rados.open_ioctx('test_pool')

    def tearDown(self):
        """Clear the ``noup`` flag, then close the ioctx and delete the pool."""
        # _take_down_acting_set() sets "noup"; unset it here in case a test
        # failed before it could call _let_osds_back_up().
        cmd = {"prefix":"osd unset", "key":"noup"}
        self.rados.mon_command(json.dumps(cmd), b'')
        self.ioctx.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    @staticmethod
    def _completion_counter():
        """Return ``(lock, count, cb)`` for counting completion callbacks.

        ``cb`` takes one argument, increments ``count[0]`` under ``lock``,
        notifies ``lock`` and returns 0.  ``count`` is a one-element list so
        that the callback can modify it.
        """
        lock = threading.Condition()
        count = [0]

        def cb(_completion):
            with lock:
                count[0] += 1
                lock.notify()
            return 0

        return lock, count, cb

    @staticmethod
    def _wait_for_count(lock, count, target):
        """Block on ``lock`` until ``count[0]`` has reached ``target``."""
        with lock:
            while count[0] < target:
                lock.wait()

    def _remove_all_objects(self):
        """Remove every object currently listed by the ioctx."""
        for obj in self.ioctx.list_objects():
            obj.remove()

    def test_get_last_version(self):
        """The last version is a non-negative number."""
        version = self.ioctx.get_last_version()
        assert version >= 0

    def test_get_stats(self):
        """All pool statistics of a new pool are zero."""
        stats = self.ioctx.get_stats()
        eq(stats, {'num_objects_unfound': 0,
                   'num_objects_missing_on_primary': 0,
                   'num_object_clones': 0,
                   'num_objects': 0,
                   'num_object_copies': 0,
                   'num_bytes': 0,
                   'num_rd_kb': 0,
                   'num_wr_kb': 0,
                   'num_kb': 0,
                   'num_wr': 0,
                   'num_objects_degraded': 0,
                   'num_rd': 0})

    def test_change_auid(self):
        """``change_auid`` accepts both predefined auids."""
        self.ioctx.change_auid(ANONYMOUS_AUID)
        self.ioctx.change_auid(ADMIN_AUID)

    def test_write(self):
        """Written data can be read back."""
        self.ioctx.write('abc', b'abc')
        eq(self.ioctx.read('abc'), b'abc')

    def test_write_full(self):
        """``write_full`` replaces the whole object content."""
        self.ioctx.write('abc', b'abc')
        eq(self.ioctx.read('abc'), b'abc')
        self.ioctx.write_full('abc', b'd')
        eq(self.ioctx.read('abc'), b'd')

    def test_append(self):
        """Appends are added after the existing content."""
        self.ioctx.write('abc', b'a')
        self.ioctx.append('abc', b'b')
        self.ioctx.append('abc', b'c')
        eq(self.ioctx.read('abc'), b'abc')

    def test_write_zeros(self):
        """Embedded NUL bytes survive a write/read round trip."""
        self.ioctx.write('abc', b'a\0b\0c')
        eq(self.ioctx.read('abc'), b'a\0b\0c')

    def test_trunc(self):
        """Truncation shortens both the content and the reported size."""
        self.ioctx.write('abc', b'abc')
        self.ioctx.trunc('abc', 2)
        eq(self.ioctx.read('abc'), b'ab')
        size = self.ioctx.stat('abc')[0]
        eq(size, 2)

    def test_list_objects_empty(self):
        """A new pool lists no objects."""
        eq(list(self.ioctx.list_objects()), [])

    def test_list_objects(self):
        """Objects created by write, write_full and append are all listed."""
        self.ioctx.write('a', b'')
        self.ioctx.write('b', b'foo')
        self.ioctx.write_full('c', b'bar')
        self.ioctx.append('d', b'jazz')
        object_names = [obj.key for obj in self.ioctx.list_objects()]
        eq(sorted(object_names), ['a', 'b', 'c', 'd'])

    def test_list_ns_objects(self):
        """With ``LIBRADOS_ALL_NSPACES`` set, objects of every namespace are listed."""
        self.ioctx.write('a', b'')
        self.ioctx.write('b', b'foo')
        self.ioctx.write_full('c', b'bar')
        self.ioctx.append('d', b'jazz')
        self.ioctx.set_namespace("ns1")
        self.ioctx.write('ns1-a', b'')
        self.ioctx.write('ns1-b', b'foo')
        self.ioctx.write_full('ns1-c', b'bar')
        self.ioctx.append('ns1-d', b'jazz')
        self.ioctx.append('d', b'jazz')
        self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
        object_names = [(obj.nspace, obj.key)
                        for obj in self.ioctx.list_objects()]
        eq(sorted(object_names),
           [('', 'a'), ('', 'b'), ('', 'c'), ('', 'd'),
            ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),
            ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])

    def test_xattrs(self):
        """Xattrs set through the ioctx read back individually and in bulk."""
        # Every value is a bytes literal (including the empty one) so the
        # table has the same type on Python 2 and Python 3.
        xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
        self.ioctx.write('abc', b'')
        for key, value in xattrs.items():
            self.ioctx.set_xattr('abc', key, value)
            eq(self.ioctx.get_xattr('abc', key), value)
        stored_xattrs = dict(self.ioctx.get_xattrs('abc'))
        eq(stored_xattrs, xattrs)

    def test_obj_xattrs(self):
        """Xattrs set through an ``Object`` read back individually and in bulk."""
        # Every value is a bytes literal (including the empty one) so the
        # table has the same type on Python 2 and Python 3.
        xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
        self.ioctx.write('abc', b'')
        obj = list(self.ioctx.list_objects())[0]
        for key, value in xattrs.items():
            obj.set_xattr(key, value)
            eq(obj.get_xattr(key), value)
        stored_xattrs = dict(obj.get_xattrs())
        eq(stored_xattrs, xattrs)

    def test_create_snap(self):
        """Removing a missing snap raises; create followed by remove succeeds."""
        assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
        self.ioctx.create_snap('foo')
        self.ioctx.remove_snap('foo')

    def test_list_snaps_empty(self):
        """A new pool lists no snapshots."""
        eq(list(self.ioctx.list_snaps()), [])

    def test_list_snaps(self):
        """Snapshots are listed in creation order."""
        snaps = ['snap1', 'snap2', 'snap3']
        for snap in snaps:
            self.ioctx.create_snap(snap)
        listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
        eq(snaps, listed_snaps)

    def test_lookup_snap(self):
        """A snapshot can be looked up by name."""
        self.ioctx.create_snap('foo')
        snap = self.ioctx.lookup_snap('foo')
        eq(snap.name, 'foo')

    def test_snap_timestamp(self):
        """``get_timestamp`` can be called on a looked-up snapshot."""
        self.ioctx.create_snap('foo')
        snap = self.ioctx.lookup_snap('foo')
        snap.get_timestamp()

    def test_remove_snap(self):
        """A removed snapshot is no longer listed."""
        self.ioctx.create_snap('foo')
        (snap,) = self.ioctx.list_snaps()
        eq(snap.name, 'foo')
        self.ioctx.remove_snap('foo')
        eq(list(self.ioctx.list_snaps()), [])

    def test_snap_rollback(self):
        """Rolling back to a snapshot restores a removed object."""
        self.ioctx.write("insnap", b"contents1")
        self.ioctx.create_snap("snap1")
        self.ioctx.remove_object("insnap")
        self.ioctx.snap_rollback("insnap", "snap1")
        eq(self.ioctx.read("insnap"), b"contents1")
        self.ioctx.remove_snap("snap1")
        self.ioctx.remove_object("insnap")

    def test_snap_read(self):
        """Reads can be directed at a snapshot and then back at the head."""
        self.ioctx.write("insnap", b"contents1")
        self.ioctx.create_snap("snap1")
        self.ioctx.remove_object("insnap")
        snap = self.ioctx.lookup_snap("snap1")
        self.ioctx.set_read(snap.snap_id)
        eq(self.ioctx.read("insnap"), b"contents1")
        self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
        self.ioctx.write("inhead", b"contents2")
        eq(self.ioctx.read("inhead"), b"contents2")
        self.ioctx.remove_snap("snap1")
        self.ioctx.remove_object("inhead")

    def test_set_omap(self):
        """Omap values read back with start-after and prefix filters applied."""
        keys = ("1", "2", "3", "4")
        values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
        with WriteOpCtx(self.ioctx) as write_op:
            self.ioctx.set_omap(write_op, keys, values)
            write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
            self.ioctx.operate_write_op(write_op, "hw")
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
            eq(ret, 0)
            self.ioctx.operate_read_op(read_op, "hw")
            next(omap_iter)
            eq(list(omap_iter),
               [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4)
            eq(ret, 0)
            self.ioctx.operate_read_op(read_op, "hw")
            eq(("3", b"ccc"), next(omap_iter))
            eq(list(omap_iter), [("4", b"\x04\x04\x04\x04")])
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
            eq(ret, 0)
            read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
            self.ioctx.operate_read_op(read_op, "hw")
            eq(list(omap_iter), [("2", b"bbb")])

    def test_set_omap_aio(self):
        """Omap set and get work through the asynchronous op interface."""
        lock, count, cb = self._completion_counter()

        keys = ("1", "2", "3", "4")
        values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
        with WriteOpCtx(self.ioctx) as write_op:
            self.ioctx.set_omap(write_op, keys, values)
            comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
            comp.wait_for_complete()
            comp.wait_for_safe()
            self._wait_for_count(lock, count, 2)
            eq(comp.get_return_value(), 0)

        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
            eq(ret, 0)
            comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
            comp.wait_for_complete()
            comp.wait_for_safe()
            self._wait_for_count(lock, count, 4)
            eq(comp.get_return_value(), 0)
            next(omap_iter)
            eq(list(omap_iter),
               [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])

    def test_write_ops(self):
        """Each write-op primitive has the expected effect on the object."""
        with WriteOpCtx(self.ioctx) as write_op:
            write_op.new(0)
            self.ioctx.operate_write_op(write_op, "write_ops")
            eq(self.ioctx.read('write_ops'), b'')

            write_op.write_full(b'1')
            write_op.append(b'2')
            self.ioctx.operate_write_op(write_op, "write_ops")
            eq(self.ioctx.read('write_ops'), b'12')

            write_op.write_full(b'12345')
            write_op.write(b'x', 2)
            self.ioctx.operate_write_op(write_op, "write_ops")
            eq(self.ioctx.read('write_ops'), b'12x45')

            write_op.write_full(b'12345')
            write_op.zero(2, 2)
            self.ioctx.operate_write_op(write_op, "write_ops")
            eq(self.ioctx.read('write_ops'), b'12\x00\x005')

            write_op.write_full(b'12345')
            write_op.truncate(2)
            self.ioctx.operate_write_op(write_op, "write_ops")
            eq(self.ioctx.read('write_ops'), b'12')

            write_op.remove()
            self.ioctx.operate_write_op(write_op, "write_ops")
            with assert_raises(ObjectNotFound):
                self.ioctx.read('write_ops')

    def test_get_omap_vals_by_keys(self):
        """Selected omap keys read back; a missing object raises ``ObjectNotFound``."""
        keys = ("1", "2", "3", "4")
        values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
        with WriteOpCtx(self.ioctx) as write_op:
            self.ioctx.set_omap(write_op, keys, values)
            self.ioctx.operate_write_op(write_op, "hw")
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ("3", "4",))
            eq(ret, 0)
            self.ioctx.operate_read_op(read_op, "hw")
            eq(list(omap_iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ("3", "4",))
            eq(ret, 0)
            with assert_raises(ObjectNotFound):
                self.ioctx.operate_read_op(read_op, "no_such")

    def test_get_omap_keys(self):
        """Omap keys are listed up to the requested count, with ``None`` values."""
        keys = ("1", "2", "3")
        values = (b"aaa", b"bbb", b"ccc")
        with WriteOpCtx(self.ioctx) as write_op:
            self.ioctx.set_omap(write_op, keys, values)
            self.ioctx.operate_write_op(write_op, "hw")
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_keys(read_op, "", 2)
            eq(ret, 0)
            self.ioctx.operate_read_op(read_op, "hw")
            eq(list(omap_iter), [("1", None), ("2", None)])
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_keys(read_op, "", 2)
            eq(ret, 0)
            with assert_raises(ObjectNotFound):
                self.ioctx.operate_read_op(read_op, "no_such")

    def test_clear_omap(self):
        """After ``clear_omap`` a previously set key is no longer returned."""
        keys = ("1", "2", "3")
        values = (b"aaa", b"bbb", b"ccc")
        with WriteOpCtx(self.ioctx) as write_op:
            self.ioctx.set_omap(write_op, keys, values)
            self.ioctx.operate_write_op(write_op, "hw")
        with WriteOpCtx(self.ioctx) as write_op_1:
            self.ioctx.clear_omap(write_op_1)
            self.ioctx.operate_write_op(write_op_1, "hw")
        with ReadOpCtx(self.ioctx) as read_op:
            omap_iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ("1",))
            eq(ret, 0)
            self.ioctx.operate_read_op(read_op, "hw")
            eq(list(omap_iter), [])

    def test_locator(self):
        """The locator key is reported as set and objects stay reachable through it."""
        self.ioctx.set_locator_key("bar")
        self.ioctx.write('foo', b'contents1')
        objects = list(self.ioctx.list_objects())
        eq(len(objects), 1)
        eq(self.ioctx.get_locator_key(), "bar")
        self.ioctx.set_locator_key("")
        objects[0].seek(0)
        objects[0].write(b"contents2")
        eq(self.ioctx.get_locator_key(), "")
        self.ioctx.set_locator_key("bar")
        contents = self.ioctx.read("foo")
        eq(contents, b"contents2")
        eq(self.ioctx.get_locator_key(), "bar")
        objects[0].remove()
        objects = list(self.ioctx.list_objects())
        eq(objects, [])
        self.ioctx.set_locator_key("")

    def test_aio_write(self):
        """An asynchronous write runs both callbacks and stores the data."""
        lock, count, cb = self._completion_counter()
        comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
        comp.wait_for_complete()
        comp.wait_for_safe()
        self._wait_for_count(lock, count, 2)
        eq(comp.get_return_value(), 0)
        contents = self.ioctx.read("foo")
        eq(contents, b"bar")
        self._remove_all_objects()

    def test_aio_write_no_comp_ref(self):
        """An asynchronous write completes even if the completion is not kept."""
        lock, count, cb = self._completion_counter()
        # NOTE(sileht): deliberately don't save the comp into a local var
        # to ensure all references are correctly tracked into the lib
        self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
        self._wait_for_count(lock, count, 2)
        contents = self.ioctx.read("foo")
        eq(contents, b"bar")
        self._remove_all_objects()

    def test_aio_append(self):
        """An asynchronous append lands after an asynchronous write."""
        lock, count, cb = self._completion_counter()
        comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
        comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb)
        comp.wait_for_complete()
        contents = self.ioctx.read("foo")
        eq(contents, b"barbaz")
        self._wait_for_count(lock, count, 4)
        eq(comp.get_return_value(), 0)
        eq(comp2.get_return_value(), 0)
        self._remove_all_objects()

    def test_aio_write_full(self):
        """An asynchronous ``write_full`` replaces earlier content."""
        lock, count, cb = self._completion_counter()
        self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
        comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
        comp.wait_for_complete()
        comp.wait_for_safe()
        self._wait_for_count(lock, count, 2)
        eq(comp.get_return_value(), 0)
        contents = self.ioctx.read("foo")
        eq(contents, b"bar")
        self._remove_all_objects()

    def test_aio_stat(self):
        """``aio_stat`` returns -2 for a missing object and 0 for an existing one."""
        lock = threading.Condition()
        count = [0]

        def cb(_, size, mtime):
            with lock:
                count[0] += 1
                lock.notify()

        comp = self.ioctx.aio_stat("foo", cb)
        comp.wait_for_complete()
        self._wait_for_count(lock, count, 1)
        eq(comp.get_return_value(), -2)

        self.ioctx.write("foo", b"bar")

        comp = self.ioctx.aio_stat("foo", cb)
        comp.wait_for_complete()
        self._wait_for_count(lock, count, 2)
        eq(comp.get_return_value(), 0)

        self._remove_all_objects()

    def _take_down_acting_set(self, pool, objectname):
        """Set ``noup`` and mark down every OSD in the object's acting set."""
        # find acting_set for pool:objectname and take it down; used to
        # verify that async reads don't complete while acting set is missing
        cmd = {
            "prefix":"osd map",
            "pool":pool,
            "object":objectname,
            "format":"json",
        }
        r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'')
        objmap = json.loads(jsonout.decode("utf-8"))
        acting_set = objmap['acting']
        cmd = {"prefix":"osd set", "key":"noup"}
        r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
        eq(r, 0)
        cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
        r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
        eq(r, 0)

        # wait for OSDs to acknowledge the down
        eq(self.rados.wait_for_latest_osdmap(), 0)

    def _let_osds_back_up(self):
        """Unset ``noup`` again."""
        cmd = {"prefix":"osd unset", "key":"noup"}
        r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
        eq(r, 0)

    def test_aio_read(self):
        """Async reads stay pending while the acting set is down, then complete."""
        # this is a list so that the local cb() can modify it
        retval = [None]
        lock = threading.Condition()

        def cb(_, buf):
            with lock:
                retval[0] = buf
                lock.notify()

        payload = b"bar\000frob"
        self.ioctx.write("foo", payload)

        # test1: use wait_for_complete() and wait for cb by
        # watching retval[0]
        self._take_down_acting_set('test_pool', 'foo')
        comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
        eq(False, comp.is_complete())
        time.sleep(3)
        eq(False, comp.is_complete())
        with lock:
            eq(None, retval[0])
        self._let_osds_back_up()
        comp.wait_for_complete()
        loops = 0
        with lock:
            while retval[0] is None and loops <= 10:
                lock.wait(timeout=5)
                loops += 1
        assert(loops <= 10)

        eq(retval[0], payload)
        # Only the local variable and getrefcount's own argument may
        # reference the completion at this point.
        eq(sys.getrefcount(comp), 2)

        # test2: use wait_for_complete_and_cb(), verify retval[0] is
        # set by the time we regain control

        retval[0] = None
        self._take_down_acting_set('test_pool', 'foo')
        comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
        eq(False, comp.is_complete())
        time.sleep(3)
        eq(False, comp.is_complete())
        with lock:
            eq(None, retval[0])
        self._let_osds_back_up()

        comp.wait_for_complete_and_cb()
        assert(retval[0] is not None)
        eq(retval[0], payload)
        eq(sys.getrefcount(comp), 2)

        # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
        # set by the time we regain control

        retval[0] = 1
        self._take_down_acting_set('test_pool', 'bar')
        comp = self.ioctx.aio_read("bar", len(payload), 0, cb)
        eq(False, comp.is_complete())
        time.sleep(3)
        eq(False, comp.is_complete())
        with lock:
            eq(1, retval[0])
        self._let_osds_back_up()

        comp.wait_for_complete_and_cb()
        eq(None, retval[0])
        assert(comp.get_return_value() < 0)
        eq(sys.getrefcount(comp), 2)

        self._remove_all_objects()

    def test_lock(self):
        """Exclusive and shared locks conflict and release as expected."""
        self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock",
                                  10000, 0)
        assert_raises(ObjectExists,
                      self.ioctx.lock_exclusive,
                      "foo", "lock", "locker", "desc_lock", 10000, 0)
        self.ioctx.unlock("foo", "lock", "locker")
        assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker")

        self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
                               10000, 0)
        self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
                               10000, 0)
        assert_raises(ObjectBusy,
                      self.ioctx.lock_exclusive,
                      "foo", "lock", "locker3", "desc_lock", 10000, 0)
        self.ioctx.unlock("foo", "lock", "locker1")
        self.ioctx.unlock("foo", "lock", "locker2")
        assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1")
        assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2")

    def test_execute(self):
        """The ``hello.say_hello`` class method greets its input."""
        self.ioctx.write("foo", b"")  # ensure object exists

        ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"")
        eq(buf, b"Hello, world!")

        ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
        eq(buf, b"Hello, nose!")

    def test_aio_execute(self):
        """``aio_execute`` returns the reply length and passes the reply to the callbacks."""
        count = [0]
        retval = [None]
        lock = threading.Condition()

        def cb(_, buf):
            with lock:
                # keep only the first buffer seen since retval was last reset
                if retval[0] is None:
                    retval[0] = buf
                count[0] += 1
                lock.notify()

        self.ioctx.write("foo", b"")  # ensure object exists

        comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
        comp.wait_for_complete()
        self._wait_for_count(lock, count, 2)
        eq(comp.get_return_value(), 13)
        eq(retval[0], b"Hello, world!")

        retval[0] = None
        comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
        comp.wait_for_complete()
        self._wait_for_count(lock, count, 4)
        eq(comp.get_return_value(), 12)
        eq(retval[0], b"Hello, nose!")

        self._remove_all_objects()

    def test_applications(self):
        """Application enable and metadata calls work on releases from 'l' onwards."""
        cmd = {"prefix":"osd dump", "format":"json"}
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'')
        eq(ret, 0)
        assert len(buf) > 0
        release = json.loads(buf.decode("utf-8")).get("require_osd_release",
                                                      None)
        # skip when the required release is unknown or its name sorts before 'l'
        if not release or release[0] < 'l':
            raise SkipTest

        eq([], self.ioctx.application_list())

        self.ioctx.application_enable("app1")
        assert_raises(Error, self.ioctx.application_enable, "app2")
        self.ioctx.application_enable("app2", True)

        assert_raises(Error, self.ioctx.application_metadata_list, "dne")
        eq([], self.ioctx.application_metadata_list("app1"))

        assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key",
                      "key")
        self.ioctx.application_metadata_set("app1", "key1", "val1")
        self.ioctx.application_metadata_set("app1", "key2", "val2")
        self.ioctx.application_metadata_set("app2", "key1", "val1")

        eq([("key1", "val1"), ("key2", "val2")],
           self.ioctx.application_metadata_list("app1"))

        self.ioctx.application_metadata_remove("app1", "key1")
        eq([("key2", "val2")], self.ioctx.application_metadata_list("app1"))
893
class TestObject(object):
    """Tests for the file-like ``Object`` wrapper around object 'foo'."""

    def setUp(self):
        """Create ``test_pool``, write b'bar' to 'foo' and wrap it in an ``Object``."""
        cluster = Rados(conffile='')
        cluster.connect()
        cluster.create_pool('test_pool')
        assert cluster.pool_exists('test_pool')
        self.rados = cluster

        io = cluster.open_ioctx('test_pool')
        io.write('foo', b'bar')
        self.ioctx = io
        self.object = Object(io, 'foo')

    def tearDown(self):
        """Close the ioctx, delete the pool, shut down, and drop both references."""
        self.ioctx.close()
        self.ioctx = None
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()
        self.rados = None

    def test_read(self):
        """A second read after the content has been consumed returns nothing."""
        first = self.object.read(3)
        eq(first, b'bar')
        second = self.object.read(100)
        eq(second, b'')

    def test_seek(self):
        """``seek`` repositions the offset used by the next read."""
        self.object.write(b'blah')
        for offset, length, expected in ((0, 4, b'blah'), (1, 3, b'lah')):
            self.object.seek(offset)
            eq(self.object.read(length), expected)

    def test_write(self):
        """Consecutive reads continue where the previous one stopped."""
        self.object.write(b'barbaz')
        self.object.seek(0)
        for expected in (b'bar', b'baz'):
            eq(self.object.read(3), expected)
928
class TestIoCtxSelfManagedSnaps(object):
    """Self-managed snapshot round trip on a freshly created ``test_pool``."""

    def setUp(self):
        """Connect, create ``test_pool`` and open an ioctx on it."""
        cluster = Rados(conffile='')
        cluster.connect()
        cluster.create_pool('test_pool')
        assert cluster.pool_exists('test_pool')
        self.rados = cluster
        self.ioctx = cluster.open_ioctx('test_pool')

    def tearDown(self):
        """Unset ``noup``, then close the ioctx, delete the pool and shut down."""
        unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    def test(self):
        """Rolling back to each self-managed snap restores that snap's content."""
        io = self.ioctx

        # cannot mix-and-match pool and self-managed snapshot mode
        io.set_self_managed_snap_write([])
        io.write('abc', b'abc')
        first_snap = io.create_self_managed_snap()
        io.set_self_managed_snap_write([first_snap])

        io.write('abc', b'def')
        second_snap = io.create_self_managed_snap()
        io.set_self_managed_snap_write([first_snap, second_snap])

        io.write('abc', b'ghi')

        for snap_id, expected in ((first_snap, b'abc'), (second_snap, b'def')):
            io.rollback_self_managed_snap('abc', snap_id)
            eq(io.read('abc'), expected)

        io.remove_self_managed_snap(first_snap)
        io.remove_self_managed_snap(second_snap)
965
class TestCommand(object):
    """Tests for sending JSON commands to monitors and OSDs."""

    def setUp(self):
        """Connect to the cluster."""
        self.rados = Rados(conffile='')
        self.rados.connect()

    def tearDown(self):
        """Shut the cluster handle down."""
        self.rados.shutdown()

    def _mon(self, cmd, **kwargs):
        """Send ``cmd`` as JSON to the monitors with a 30 second timeout."""
        return self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
                                      **kwargs)

    def test_monmap_dump(self):
        """``mon dump`` works in plain and JSON form, by target name and by rank."""
        # check for success and some plain output with epoch in it
        cmd = {"prefix":"mon dump"}
        ret, buf, errs = self._mon(cmd)
        eq(ret, 0)
        assert len(buf) > 0
        assert(b'epoch' in buf)

        # JSON, and grab current epoch
        cmd['format'] = 'json'
        ret, buf, errs = self._mon(cmd)
        eq(ret, 0)
        assert len(buf) > 0
        d = json.loads(buf.decode("utf-8"))
        assert('epoch' in d)
        epoch = d['epoch']

        # assume epoch + 1000 does not exist; test for ENOENT
        cmd['epoch'] = epoch + 1000
        ret, buf, errs = self._mon(cmd)
        eq(ret, -errno.ENOENT)
        eq(len(buf), 0)
        del cmd['epoch']

        # send to a specific target: first by name, then by rank.  The rank
        # is taken from the reply to the by-name request.
        for field in ('name', 'rank'):
            target = d['mons'][0][field]
            print(target)
            ret, buf, errs = self._mon(cmd, target=target)
            eq(ret, 0)
            assert len(buf) > 0
            d = json.loads(buf.decode("utf-8"))
            assert('epoch' in d)

    def test_osd_bench(self):
        """``bench`` on osd 0 echoes the requested block size and byte count."""
        cmd = dict(prefix='bench', size=4096, count=8192)
        ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
                                               timeout=30)
        eq(ret, 0)
        assert len(err) > 0
        # the JSON result is parsed from the third element of the reply
        out = json.loads(err)
        eq(out['blocksize'], cmd['size'])
        eq(out['bytes_written'], cmd['count'])

    def test_ceph_osd_pool_create_utf8(self):
        """``osd pool create`` accepts a non-ASCII pool name."""
        # On Python 2, use the encoded bytestring
        poolname = b"\351\273\205" if _python2 else "\u9ec5"

        cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        eq(ret, 0)
        assert len(out) > 0
        eq(u"pool '\u9ec5' created", out)
1039 eq(ret, 0)
1040 assert len(out) > 0
1041 eq(u"pool '\u9ec5' created", out)