]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_rados.py
6334d6ebd78a65c2d7990dda032e6bb556a5b305
[ceph.git] / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from nose import SkipTest
3 from nose.plugins.attrib import attr
4 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
5 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
6 ObjectNotFound, ObjectBusy, NotConnected,
7 LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
8 LIBRADOS_CMPXATTR_OP_EQ, LIBRADOS_CMPXATTR_OP_GT, LIBRADOS_CMPXATTR_OP_LT, OSError,
9 LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog, MAX_ERRNO, NoData, ExtendMismatch)
10 from datetime import timedelta
11 import time
12 import threading
13 import json
14 import errno
15 import os
16 import re
17 import sys
18
def test_rados_init_error():
    """Constructing ``Rados`` with any of these argument sets must raise ``Error``.

    Covers: ``rados_id`` and ``name`` given together, a ``name`` without a
    dot, and a dotted ``name`` with the prefix ``bad``.
    """
    rejected_kwargs = (
        dict(conffile='', rados_id='admin', name='client.admin'),
        dict(conffile='', name='invalid'),
        dict(conffile='', name='bad.invalid'),
    )
    for kwargs in rejected_kwargs:
        assert_raises(Error, Rados, **kwargs)
24
def test_rados_init():
    """Enter and leave a ``Rados`` context once by ``rados_id`` and three times by ``name``."""
    identities = [{'rados_id': 'admin'}] + [{'name': 'client.admin'}] * 3
    for identity in identities:
        with Rados(conffile='', **identity):
            pass
34
def test_ioctx_context_manager():
    """An I/O context opened on pool ``rbd`` can be used as a context manager."""
    with Rados(conffile='', rados_id='admin') as cluster:
        with cluster.open_ioctx('rbd'):
            pass
39
def test_parse_argv():
    """``conf_parse_argv`` must hand back this argument list unchanged."""
    argv = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
    cluster = Rados()
    remaining = cluster.conf_parse_argv(argv)
    eq(argv, remaining)
44
def test_parse_argv_empty_str():
    """``conf_parse_argv`` must hand back a list holding one empty string unchanged."""
    argv = ['']
    cluster = Rados()
    remaining = cluster.conf_parse_argv(argv)
    eq(argv, remaining)
49
class TestRadosStateError(object):
    """Check that ``Rados`` methods raise ``RadosStateError`` in the wrong state.

    Each ``_requires_*`` helper asserts that one group of calls is rejected;
    the tests apply the groups that do not match the handle's current state.
    """

    def _requires_configuring(self, rados):
        """Assert that ``connect`` is rejected on ``rados``."""
        assert_raises(RadosStateError, rados.connect)

    def _requires_configuring_or_connected(self, rados):
        """Assert that the configuration calls and ``ping_monitor`` are rejected."""
        rejected = (
            (rados.conf_read_file, ()),
            (rados.conf_parse_argv, (None,)),
            (rados.conf_parse_env, ()),
            (rados.conf_get, ('opt',)),
            (rados.conf_set, ('opt', 'val')),
            (rados.ping_monitor, ('0',)),
        )
        for method, args in rejected:
            assert_raises(RadosStateError, method, *args)

    def _requires_connected(self, rados):
        """Assert that the pool, command and cluster-query calls are rejected."""
        rejected = (
            (rados.pool_exists, ('foo',)),
            (rados.pool_lookup, ('foo',)),
            (rados.pool_reverse_lookup, (0,)),
            (rados.create_pool, ('foo',)),
            (rados.get_pool_base_tier, (0,)),
            (rados.delete_pool, ('foo',)),
            (rados.list_pools, ()),
            (rados.get_fsid, ()),
            (rados.open_ioctx, ('foo',)),
            (rados.mon_command, ('', b'')),
            (rados.osd_command, (0, '', b'')),
            (rados.pg_command, ('', '', b'')),
            (rados.wait_for_latest_osdmap, ()),
            (rados.blocklist_add, ('127.0.0.1/123', 0)),
        )
        for method, args in rejected:
            assert_raises(RadosStateError, method, *args)

    def test_configuring(self):
        """A freshly built handle is 'configuring' and rejects connected-only calls."""
        cluster = Rados(conffile='')
        eq('configuring', cluster.state)
        self._requires_connected(cluster)

    def test_connected(self):
        """Inside the context the handle is 'connected' and rejects ``connect``."""
        cluster = Rados(conffile='')
        with cluster:
            eq('connected', cluster.state)
            self._requires_configuring(cluster)

    def test_shutdown(self):
        """After leaving the context the handle is 'shutdown' and rejects every group."""
        cluster = Rados(conffile='')
        with cluster:
            pass
        eq('shutdown', cluster.state)
        checks = (
            self._requires_configuring,
            self._requires_configuring_or_connected,
            self._requires_connected,
        )
        for check in checks:
            check(cluster)
97
98
class TestRados(object):
    """Cluster-level tests that run against a connected ``Rados`` handle."""

    def setUp(self):
        """Connect to the cluster and record the pools that already exist."""
        self.rados = Rados(conffile='')
        self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
        self.rados.conf_parse_env()
        self.rados.connect()

        # Assume any pre-existing pools are the cluster's defaults
        self.default_pools = self.rados.list_pools()

    def tearDown(self):
        """Disconnect from the cluster."""
        self.rados.shutdown()

    def test_ping_monitor(self):
        """Ping an unknown monitor (must fail), then every monitor from ``mon dump``."""
        assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor')
        cmd = {'prefix': 'mon dump', 'format':'json'}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        for mon in json.loads(buf.decode('utf8'))['mons']:
            # Keep pinging until the monitor answers with a truthy 'health' entry.
            while True:
                output = self.rados.ping_monitor(mon['name'])
                if output is None:
                    continue
                reply = json.loads(output)
                if reply.get('health'):
                    break

    def test_annotations(self):
        """``create_pool`` must reject a non-string pool name with ``TypeError``."""
        with assert_raises(TypeError):
            self.rados.create_pool(0xf00)

    def test_create(self):
        """Create and delete a pool."""
        self.rados.create_pool('foo')
        self.rados.delete_pool('foo')

    def test_create_utf8(self):
        """Create a pool with a non-ASCII name and check that it exists."""
        poolname = "\u9ec4"
        self.rados.create_pool(poolname)
        try:
            assert self.rados.pool_exists(u"\u9ec4")
        finally:
            # Always remove the pool so a failed assertion cannot leak it
            # into later tests (same pattern as test_pool_lookup_utf8).
            self.rados.delete_pool(poolname)

    def test_pool_lookup_utf8(self):
        """Name -> id -> name must round-trip for a non-ASCII pool name."""
        poolname = '\u9ec4'
        self.rados.create_pool(poolname)
        try:
            poolid = self.rados.pool_lookup(poolname)
            eq(poolname, self.rados.pool_reverse_lookup(poolid))
        finally:
            self.rados.delete_pool(poolname)

    def test_eexist(self):
        """Creating a pool that already exists must raise ``ObjectExists``."""
        self.rados.create_pool('foo')
        try:
            assert_raises(ObjectExists, self.rados.create_pool, 'foo')
        finally:
            # Always remove the pool so a failed assertion cannot leak it.
            self.rados.delete_pool('foo')

    def list_non_default_pools(self):
        """Return the set of pool names that were not present at ``setUp`` time.

        Raises ``ValueError`` (from ``list.remove``) if a pool recorded in
        ``self.default_pools`` is no longer listed.
        """
        pools = self.rados.list_pools()
        for p in self.default_pools:
            pools.remove(p)
        return set(pools)

    def test_list_pools(self):
        """The pool listing must track every create and delete, including a 500-char name."""
        eq(set(), self.list_non_default_pools())
        self.rados.create_pool('foo')
        eq(set(['foo']), self.list_non_default_pools())
        self.rados.create_pool('bar')
        eq(set(['foo', 'bar']), self.list_non_default_pools())
        self.rados.create_pool('baz')
        eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
        self.rados.delete_pool('foo')
        eq(set(['bar', 'baz']), self.list_non_default_pools())
        self.rados.delete_pool('baz')
        eq(set(['bar']), self.list_non_default_pools())
        self.rados.delete_pool('bar')
        eq(set(), self.list_non_default_pools())
        self.rados.create_pool('a' * 500)
        eq(set(['a' * 500]), self.list_non_default_pools())
        self.rados.delete_pool('a' * 500)

    @attr('tier')
    def test_get_pool_base_tier(self):
        """Both a base pool and its cache tier must report the base pool's id."""
        self.rados.create_pool('foo')
        try:
            self.rados.create_pool('foo-cache')
            try:
                pool_id = self.rados.pool_lookup('foo')
                tier_pool_id = self.rados.pool_lookup('foo-cache')

                cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
                ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                eq(ret, 0)

                try:
                    cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)

                    eq(self.rados.wait_for_latest_osdmap(), 0)

                    eq(pool_id, self.rados.get_pool_base_tier(pool_id))
                    eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
                finally:
                    # Undo the tier relationship before the pools are deleted.
                    cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)
            finally:
                self.rados.delete_pool('foo-cache')
        finally:
            self.rados.delete_pool('foo')

    def test_get_fsid(self):
        """The fsid must start with 36 hex-or-dash characters."""
        fsid = self.rados.get_fsid()
        # Raw string: '\-' is an invalid escape sequence in a normal literal.
        assert re.match(r'[0-9a-f\-]{36}', fsid, re.I)

    def test_blocklist_add(self):
        """Adding a blocklist entry must not raise."""
        self.rados.blocklist_add("1.2.3.4/123", 1)

    @attr('stats')
    def test_get_cluster_stats(self):
        """Capacity figures must be positive and the object count non-negative."""
        stats = self.rados.get_cluster_stats()
        assert stats['kb'] > 0
        assert stats['kb_avail'] > 0
        assert stats['kb_used'] > 0
        assert stats['num_objects'] >= 0

    def test_monitor_log(self):
        """Register a monitor-log callback, wait for one call, then unregister it."""
        lock = threading.Condition()
        # One-element list so the nested callback can flip the flag.
        called = [False]
        def cb(arg, line, who, sec, nsec, seq, level, msg):
            # NOTE(sileht): the old pyrados API was received the pointer as int
            # instead of the value of arg
            eq(arg, "arg")
            with lock:
                called[0] = True
                lock.notify()
            return 0

        # NOTE(sileht): force don't save the monitor into local var
        # to ensure all references are correctly tracked into the lib
        MonitorLog(self.rados, "debug", cb, "arg")
        with lock:
            # Wait on a predicate: a notify() delivered before this thread
            # starts waiting would otherwise be lost.
            while not called[0]:
                lock.wait()
        MonitorLog(self.rados, "debug", None, None)
        eq(None, self.rados.monitor_callback)
241
242 class TestIoctx(object):
243
def setUp(self):
    """Connect, create ``test_pool`` and open an I/O context on it."""
    self.rados = cluster = Rados(conffile='')
    cluster.connect()
    cluster.create_pool('test_pool')
    assert cluster.pool_exists('test_pool')
    self.ioctx = cluster.open_ioctx('test_pool')
250
def tearDown(self):
    """Unset ``noup``, close the I/O context, delete ``test_pool`` and disconnect."""
    # _take_down_acting_set sets "noup"; presumably it is cleared here
    # unconditionally so a failed test cannot leave the flag behind.
    unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
    self.rados.mon_command(unset_noup, b'')
    self.ioctx.close()
    self.rados.delete_pool('test_pool')
    self.rados.shutdown()
257
def test_get_last_version(self):
    """The last version reported by the I/O context is non-negative."""
    last_version = self.ioctx.get_last_version()
    assert last_version >= 0
261
def test_get_stats(self):
    """Every counter reported for the freshly created pool must be zero."""
    counters = (
        'num_objects_unfound',
        'num_objects_missing_on_primary',
        'num_object_clones',
        'num_objects',
        'num_object_copies',
        'num_bytes',
        'num_rd_kb',
        'num_wr_kb',
        'num_kb',
        'num_wr',
        'num_objects_degraded',
        'num_rd',
    )
    expected = dict.fromkeys(counters, 0)
    eq(self.ioctx.get_stats(), expected)
276
def test_write(self):
    """Data written to an object is read back unchanged."""
    payload = b'abc'
    self.ioctx.write('abc', payload)
    eq(self.ioctx.read('abc'), payload)
280
def test_write_full(self):
    """``write_full`` replaces the whole object, including when the new data is shorter."""
    oid = 'abc'
    self.ioctx.write(oid, b'abc')
    eq(self.ioctx.read(oid), b'abc')
    self.ioctx.write_full(oid, b'd')
    eq(self.ioctx.read(oid), b'd')
286
def test_writesame(self):
    """``writesame`` with a 3-byte pattern and length 9 yields the pattern three times."""
    pattern = b'rzx'
    self.ioctx.writesame('ob', pattern, 9)
    eq(self.ioctx.read('ob'), b'rzxrzxrzx')
290
def test_append(self):
    """Two appends after an initial write extend the object in order."""
    oid = 'abc'
    self.ioctx.write(oid, b'a')
    for piece in (b'b', b'c'):
        self.ioctx.append(oid, piece)
    eq(self.ioctx.read(oid), b'abc')
296
def test_write_zeros(self):
    """Embedded NUL bytes survive a write/read round trip."""
    payload = b'a\0b\0c'
    self.ioctx.write('abc', payload)
    eq(self.ioctx.read('abc'), payload)
300
def test_trunc(self):
    """Truncating to 2 bytes shortens both the content and the size from ``stat``."""
    oid = 'abc'
    self.ioctx.write(oid, b'abc')
    self.ioctx.trunc(oid, 2)
    eq(self.ioctx.read(oid), b'ab')
    # The first element of the stat() result is the object size.
    size = self.ioctx.stat(oid)[0]
    eq(size, 2)
307
def test_cmpext(self):
    """``cmpext`` returns 0 on a match and ``-MAX_ERRNO - 4`` for a mismatch at byte 4."""
    oid = 'test_object'
    self.ioctx.write(oid, b'abcdefghi')
    matching = self.ioctx.cmpext(oid, b'abcdefghi', 0)
    eq(0, matching)
    # The comparison buffer differs from the stored data starting at offset 4.
    differing = self.ioctx.cmpext(oid, b'abcdxxxxx', 0)
    eq(-MAX_ERRNO - 4, differing)
312
def test_list_objects_empty(self):
    """A freshly created pool lists no objects."""
    listed = list(self.ioctx.list_objects())
    eq(listed, [])
315
def test_list_objects(self):
    """Objects created via write, write_full and append all appear in the listing."""
    self.ioctx.write('a', b'')
    self.ioctx.write('b', b'foo')
    self.ioctx.write_full('c', b'bar')
    self.ioctx.append('d', b'jazz')
    # Listing order is not asserted, only the set of keys.
    listed_keys = sorted(entry.key for entry in self.ioctx.list_objects())
    eq(listed_keys, ['a', 'b', 'c', 'd'])
323
def test_list_ns_objects(self):
    """Listing under ``LIBRADOS_ALL_NSPACES`` returns objects from every namespace."""
    # Objects in the default namespace.
    self.ioctx.write('a', b'')
    self.ioctx.write('b', b'foo')
    self.ioctx.write_full('c', b'bar')
    self.ioctx.append('d', b'jazz')

    # Objects in namespace "ns1"; 'd' is reused as a key there on purpose.
    self.ioctx.set_namespace("ns1")
    self.ioctx.write('ns1-a', b'')
    self.ioctx.write('ns1-b', b'foo')
    self.ioctx.write_full('ns1-c', b'bar')
    self.ioctx.append('ns1-d', b'jazz')
    self.ioctx.append('d', b'jazz')

    self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
    listed = sorted((entry.nspace, entry.key)
                    for entry in self.ioctx.list_objects())
    expected = [
        ('', 'a'), ('','b'), ('','c'), ('','d'),
        ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),
        ('ns1', 'ns1-c'), ('ns1', 'ns1-d'),
    ]
    eq(listed, expected)
340
def test_xattrs(self):
    """Each xattr set through the I/O context can be fetched singly and via ``get_xattrs``."""
    # Values include an embedded NUL, a lone NUL and the empty string.
    expected = {'a': b'1', 'b': b'2', 'c': b'3', 'd': b'a\0b', 'e': b'\0', 'f': b''}
    self.ioctx.write('abc', b'')
    for name, data in expected.items():
        self.ioctx.set_xattr('abc', name, data)
        eq(self.ioctx.get_xattr('abc', name), data)
    stored = dict(self.ioctx.get_xattrs('abc'))
    eq(stored, expected)
351
def test_obj_xattrs(self):
    """Same as ``test_xattrs`` but through the object returned by ``list_objects``."""
    expected = {'a': b'1', 'b': b'2', 'c': b'3', 'd': b'a\0b', 'e': b'\0', 'f': b''}
    self.ioctx.write('abc', b'')
    # Only one object exists, so the first listed entry is 'abc'.
    target = list(self.ioctx.list_objects())[0]
    for name, data in expected.items():
        target.set_xattr(name, data)
        eq(target.get_xattr(name), data)
    stored = dict(target.get_xattrs())
    eq(stored, expected)
363
def test_get_pool_id(self):
    """The I/O context's pool id equals the cluster's lookup of ``test_pool``."""
    looked_up = self.rados.pool_lookup('test_pool')
    eq(self.ioctx.get_pool_id(), looked_up)
366
def test_get_pool_name(self):
    """The I/O context reports the name of the pool it was opened on."""
    pool_name = self.ioctx.get_pool_name()
    eq(pool_name, 'test_pool')
369
def test_create_snap(self):
    """Removing a missing snapshot raises; after creation the removal succeeds."""
    snap_name = 'foo'
    assert_raises(ObjectNotFound, self.ioctx.remove_snap, snap_name)
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_snap(snap_name)
374
def test_list_snaps_empty(self):
    """A freshly created pool has no snapshots."""
    listed = list(self.ioctx.list_snaps())
    eq(listed, [])
377
def test_list_snaps(self):
    """Snapshots are listed by name in the order they were created."""
    created = ['snap1', 'snap2', 'snap3']
    for name in created:
        self.ioctx.create_snap(name)
    listed = []
    for entry in self.ioctx.list_snaps():
        listed.append(entry.name)
    eq(created, listed)
384
def test_lookup_snap(self):
    """``lookup_snap`` returns a snapshot carrying the requested name."""
    self.ioctx.create_snap('foo')
    found = self.ioctx.lookup_snap('foo')
    eq(found.name, 'foo')
389
def test_snap_timestamp(self):
    """``get_timestamp`` on a looked-up snapshot must not raise (value is not checked)."""
    self.ioctx.create_snap('foo')
    found = self.ioctx.lookup_snap('foo')
    found.get_timestamp()
394
def test_remove_snap(self):
    """A created snapshot is the only one listed and disappears after removal."""
    self.ioctx.create_snap('foo')
    # Tuple unpacking also asserts that exactly one snapshot is listed.
    (only_snap,) = self.ioctx.list_snaps()
    eq(only_snap.name, 'foo')
    self.ioctx.remove_snap('foo')
    remaining = list(self.ioctx.list_snaps())
    eq(remaining, [])
401
@attr('rollback')
def test_snap_rollback(self):
    """Rolling back to a snapshot restores an object that was removed after it.

    NOTE(review): the body matches the name of ``test_snap_rollback_removed``
    better than its own; the two names look swapped.
    """
    oid, snap_name = "insnap", "snap1"
    self.ioctx.write(oid, b"contents1")
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_object(oid)
    self.ioctx.snap_rollback(oid, snap_name)
    eq(self.ioctx.read(oid), b"contents1")
    # Clean up the snapshot and the restored object.
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(oid)
411
@attr('rollback')
def test_snap_rollback_removed(self):
    """Rolling back to a snapshot restores content that was overwritten after it.

    NOTE(review): despite the name, the object is overwritten, not removed,
    between the snapshot and the rollback.
    """
    oid, snap_name = "insnap", "snap1"
    self.ioctx.write(oid, b"contents1")
    self.ioctx.create_snap(snap_name)
    self.ioctx.write(oid, b"contents2")
    self.ioctx.snap_rollback(oid, snap_name)
    eq(self.ioctx.read(oid), b"contents1")
    # Clean up the snapshot and the object.
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(oid)
421
def test_snap_read(self):
    """Reads can be pointed at a snapshot and then back at the head."""
    self.ioctx.write("insnap", b"contents1")
    self.ioctx.create_snap("snap1")
    self.ioctx.remove_object("insnap")

    # With reads directed at the snapshot the removed object is still visible.
    snapshot = self.ioctx.lookup_snap("snap1")
    self.ioctx.set_read(snapshot.snap_id)
    eq(self.ioctx.read("insnap"), b"contents1")

    # Back on the head, newly written data is readable.
    self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
    self.ioctx.write("inhead", b"contents2")
    eq(self.ioctx.read("inhead"), b"contents2")

    self.ioctx.remove_snap("snap1")
    self.ioctx.remove_object("inhead")
434
def test_set_omap(self):
    """Store four omap entries on ``hw`` and read them back with three argument sets."""
    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        wr.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
        self.ioctx.operate_write_op(wr, "hw")

    # Empty filters: skip the first entry, the other three follow in key order.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals(rd, "", "", 4)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, "hw")
        next(entries)
        eq(list(entries), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])

    # "2" as the second argument: only the entries for "3" and "4" are expected.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals(rd, "2", "", 4)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(("3", b"ccc"), next(entries))
        eq(list(entries), [("4", b"\x04\x04\x04\x04")])

    # "2" as the third argument: only the entry for "2" is expected.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals(rd, "", "2", 4)
        eq(ret, 0)
        rd.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(entries), [("2", b"bbb")])
460
def test_set_omap_aio(self):
    """Set and read omap entries through the asynchronous operate calls."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        # The same callback is passed twice, so two invocations are expected.
        comp = self.ioctx.operate_aio_write_op(wr, "hw", on_event, on_event)
        comp.wait_for_complete()
        with cond:
            while fired[0] < 2:
                cond.wait()
        eq(comp.get_return_value(), 0)

    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals(rd, "", "", 4)
        eq(ret, 0)
        comp = self.ioctx.operate_aio_read_op(rd, "hw", on_event, on_event)
        comp.wait_for_complete()
        # Two more invocations on top of the two from the write.
        with cond:
            while fired[0] < 4:
                cond.wait()
        eq(comp.get_return_value(), 0)
        next(entries)
        eq(list(entries), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
492
def test_write_ops(self):
    """Run the ``WriteOpCtx`` data operations in turn on object ``write_ops``.

    The same write-op object is refilled and submitted for every step.
    """
    oid = "write_ops"
    with WriteOpCtx() as wr:
        # new(0): the object exists and is empty.
        wr.new(0)
        self.ioctx.operate_write_op(wr, oid)
        eq(self.ioctx.read('write_ops'), b'')

        # write_full followed by append.
        wr.write_full(b'1')
        wr.append(b'2')
        self.ioctx.operate_write_op(wr, oid)
        eq(self.ioctx.read('write_ops'), b'12')

        # write at offset 2 overwrites a single byte.
        wr.write_full(b'12345')
        wr.write(b'x', 2)
        self.ioctx.operate_write_op(wr, oid)
        eq(self.ioctx.read('write_ops'), b'12x45')

        # zero(2, 2) turns bytes 2 and 3 into NULs.
        wr.write_full(b'12345')
        wr.zero(2, 2)
        self.ioctx.operate_write_op(wr, oid)
        eq(self.ioctx.read('write_ops'), b'12\x00\x005')

        # truncate(2) keeps the first two bytes.
        wr.write_full(b'12345')
        wr.truncate(2)
        self.ioctx.operate_write_op(wr, oid)
        eq(self.ioctx.read('write_ops'), b'12')

        # remove(): a later read must fail.
        wr.remove()
        self.ioctx.operate_write_op(wr, oid)
        with assert_raises(ObjectNotFound):
            self.ioctx.read('write_ops')
523
def test_execute_op(self):
    """Executing ``hello.record_hello`` with input ``ebs`` leaves a greeting in the object."""
    with WriteOpCtx() as wr:
        wr.execute("hello", "record_hello", b"ebs")
        self.ioctx.operate_write_op(wr, "object")
    stored = self.ioctx.read('object')
    eq(stored, b"Hello, ebs!")
529
def test_writesame_op(self):
    """``writesame`` inside a write op repeats the 3-byte pattern to fill 9 bytes."""
    with WriteOpCtx() as wr:
        wr.writesame(b'rzx', 9)
        self.ioctx.operate_write_op(wr, 'abc')
    stored = self.ioctx.read('abc')
    eq(stored, b'rzxrzxrzx')
535
def test_get_omap_vals_by_keys(self):
    """Fetch selected omap keys; the same read on a missing object must raise."""
    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    wanted = ("3","4",)
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, "hw")

    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals_by_keys(rd, wanted)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(entries), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])

    # Building the read op succeeds; running it on a missing object fails.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals_by_keys(rd, wanted)
        eq(ret, 0)
        with assert_raises(ObjectNotFound):
            self.ioctx.operate_read_op(rd, "no_such")
552
def test_get_omap_keys(self):
    """``get_omap_keys`` limited to 2 yields the first two keys paired with ``None``."""
    keys = ("1", "2", "3")
    values = (b"aaa", b"bbb", b"ccc")
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, "hw")

    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_keys(rd, "", 2)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(entries), [("1", None), ("2", None)])

    # Building the read op succeeds; running it on a missing object fails.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_keys(rd, "", 2)
        eq(ret, 0)
        with assert_raises(ObjectNotFound):
            self.ioctx.operate_read_op(rd, "no_such")
569
def test_clear_omap(self):
    """After ``clear_omap`` a lookup of a previously stored key returns nothing."""
    keys = ("1", "2", "3")
    values = (b"aaa", b"bbb", b"ccc")
    with WriteOpCtx() as set_op:
        self.ioctx.set_omap(set_op, keys, values)
        self.ioctx.operate_write_op(set_op, "hw")
    with WriteOpCtx() as clear_op:
        self.ioctx.clear_omap(clear_op)
        self.ioctx.operate_write_op(clear_op, "hw")
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals_by_keys(rd, ("1",))
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(entries), [])
584
def test_remove_omap_range2(self):
    """``remove_omap_range2("1", "4")`` deletes keys "1".."3" and leaves key "4"."""
    oid = "test_obj"
    keys = ("1", "2", "3", "4")
    values = (b"a", b"bb", b"ccc", b"dddd")

    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, oid)

    # Sanity check: all four entries are stored before the range removal.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals_by_keys(rd, keys)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, oid)
        eq(list(entries), list(zip(keys, values)))

    with WriteOpCtx() as wr:
        self.ioctx.remove_omap_range2(wr, "1", "4")
        self.ioctx.operate_write_op(wr, oid)

    # Only the entry for the end key is left.
    with ReadOpCtx() as rd:
        entries, ret = self.ioctx.get_omap_vals_by_keys(rd, keys)
        eq(ret, 0)
        self.ioctx.operate_read_op(rd, oid)
        eq(list(entries), [("4", b"dddd")])
604
def test_omap_cmp(self):
    """Guard omap updates with ``omap_cmp`` using the EQ, GT and LT operators.

    Three guarded updates are expected to succeed; a final EQ guard against
    a stale value must fail with errno 125.
    """
    object_id = 'test'

    def guarded_set(cmp_value, cmp_op, new_value):
        # Update key1 to new_value, guarded by an omap_cmp on key1.
        with WriteOpCtx() as wr:
            wr.omap_cmp('key1', cmp_value, cmp_op)
            self.ioctx.set_omap(wr, ('key1',), (new_value,))
            self.ioctx.operate_write_op(wr, object_id)

    def stored_key1():
        # Read back the current omap entry list for key1.
        with ReadOpCtx() as rd:
            entries, ret = self.ioctx.get_omap_vals_by_keys(rd, ('key1',))
            eq(ret, 0)
            self.ioctx.operate_read_op(rd, object_id)
            return list(entries)

    self.ioctx.write(object_id, b'omap_cmp')
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, ('key1',), ('1',))
        self.ioctx.operate_write_op(wr, object_id)

    guarded_set('1', LIBRADOS_CMPXATTR_OP_EQ, '2')
    eq(stored_key1(), [('key1', b'2')])

    guarded_set('1', LIBRADOS_CMPXATTR_OP_GT, '3')
    eq(stored_key1(), [('key1', b'3')])

    guarded_set('4', LIBRADOS_CMPXATTR_OP_LT, '4')
    eq(stored_key1(), [('key1', b'4')])

    # key1 now holds '4', so an EQ guard against '1' must reject the write.
    with WriteOpCtx() as wr:
        wr.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ)
        self.ioctx.set_omap(wr, ('key1',), ('5',))
        try:
            self.ioctx.operate_write_op(wr, object_id)
        except (OSError, ExtendMismatch) as e:
            # NOTE(review): 125 is ECANCELED on Linux - confirm for other platforms.
            eq(e.errno, 125)
        else:
            message = "omap_cmp did not raise Exception when omap content does not match"
            raise AssertionError(message)
648
def test_cmpext_op(self):
    """Use ``cmpext`` as a guard inside write ops and read ops.

    A matching guard lets the operation through; a mismatching one raises
    ``ExtendMismatch`` whose ``offset`` is the first differing byte.
    """
    object_id = 'test'
    with WriteOpCtx() as wr:
        wr.write(b'12345', 0)
        self.ioctx.operate_write_op(wr, object_id)

    # Matching guard: the write goes through.
    with WriteOpCtx() as wr:
        wr.cmpext(b'12345', 0)
        wr.write(b'54321', 0)
        self.ioctx.operate_write_op(wr, object_id)
    eq(self.ioctx.read(object_id), b'54321')

    # Mismatching guard on a write op: '56789' vs stored '54321'.
    with WriteOpCtx() as wr:
        wr.cmpext(b'56789', 0)
        wr.write(b'12345', 0)
        try:
            self.ioctx.operate_write_op(wr, object_id)
        except ExtendMismatch as e:
            # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 1)
            # where "1" is the offset of the first unmatched byte
            eq(-e.errno, -MAX_ERRNO - 1)
            eq(e.offset, 1)
        else:
            message = "cmpext did not raise Exception when object content does not match"
            raise AssertionError(message)

    # Matching guard on a read op.
    with ReadOpCtx() as rd:
        rd.cmpext(b'54321', 0)
        self.ioctx.operate_read_op(rd, object_id)

    # Mismatching guard on a read op: '54789' vs stored '54321'.
    with ReadOpCtx() as rd:
        rd.cmpext(b'54789', 0)
        try:
            self.ioctx.operate_read_op(rd, object_id)
        except ExtendMismatch as e:
            # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 2)
            # where "2" is the offset of the first unmatched byte
            eq(-e.errno, -MAX_ERRNO - 2)
            eq(e.offset, 2)
        else:
            message = "cmpext did not raise Exception when object content does not match"
            raise AssertionError(message)
687
def test_xattrs_op(self):
    """Set, list and remove xattrs through a single reused ``WriteOpCtx``.

    NOTE(review): the write op is submitted once per attribute; this
    presumably relies on a submitted op not replaying earlier steps - confirm.
    """
    expected = {'a': b'1', 'b': b'2', 'c': b'3', 'd': b'a\0b', 'e': b'\0'}
    with WriteOpCtx() as wr:
        wr.new(LIBRADOS_CREATE_EXCLUSIVE)
        for name, data in expected.items():
            wr.set_xattr(name, data)
            self.ioctx.operate_write_op(wr, 'abc')
            eq(self.ioctx.get_xattr('abc', name), data)

        after_set = dict(self.ioctx.get_xattrs('abc'))
        eq(after_set, expected)

        for name in expected.keys():
            wr.rm_xattr(name)
            self.ioctx.operate_write_op(wr, 'abc')
        after_rm = dict(self.ioctx.get_xattrs('abc'))
        eq(after_rm, {})

        # Finally drop the object itself.
        wr.remove()
        self.ioctx.operate_write_op(wr, 'abc')
712
def test_locator(self):
    """An object written under locator key ``bar`` stays usable via its listed handle.

    The listed object is written and removed while the I/O context's
    locator key is reset to "", and the context's own key is checked
    after every change.
    """
    self.ioctx.set_locator_key("bar")
    self.ioctx.write('foo', b'contents1')
    listed = list(self.ioctx.list_objects())
    eq(len(listed), 1)
    eq(self.ioctx.get_locator_key(), "bar")

    # Write through the object handle with the context's locator cleared.
    handle = listed[0]
    self.ioctx.set_locator_key("")
    handle.seek(0)
    handle.write(b"contents2")
    eq(self.ioctx.get_locator_key(), "")

    # Reading through the context needs the locator key again.
    self.ioctx.set_locator_key("bar")
    contents = self.ioctx.read("foo")
    eq(contents, b"contents2")
    eq(self.ioctx.get_locator_key(), "bar")

    handle.remove()
    listed = list(self.ioctx.list_objects())
    eq(listed, [])
    self.ioctx.set_locator_key("")
731
def test_operate_aio_write_op(self):
    """Submit a write op asynchronously and wait for both callback invocations."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    with WriteOpCtx() as wr:
        wr.write(b'rzx')
        # The same callback is passed twice, so two invocations are expected.
        comp = self.ioctx.operate_aio_write_op(wr, "object", on_event, on_event)
        comp.wait_for_complete()
        with cond:
            while fired[0] < 2:
                cond.wait()
        eq(comp.get_return_value(), 0)
        eq(self.ioctx.read('object'), b'rzx')
749
def test_aio_write(self):
    """``aio_write`` completes with 0, runs both callbacks and stores the data."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    # The same callback is passed twice, so two invocations are expected.
    comp = self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 0)
    contents = self.ioctx.read("foo")
    eq(contents, b"bar")
    for leftover in self.ioctx.list_objects():
        leftover.remove()
767
def test_aio_cmpext(self):
    """``aio_cmpext`` against identical data completes with return value 0."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    oid = 'test_object'
    self.ioctx.write(oid, b'abcdefghi')
    comp = self.ioctx.aio_cmpext(oid, b'abcdefghi', 0, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
784
def test_aio_rmxattr(self):
    """After ``aio_rmxattr`` completes, reading the attribute raises ``NoData``."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    oid, attr_name = "xyz", "key"
    self.ioctx.set_xattr(oid, attr_name, b'value')
    eq(self.ioctx.get_xattr(oid, attr_name), b'value')
    comp = self.ioctx.aio_rmxattr(oid, attr_name, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
    with assert_raises(NoData):
        self.ioctx.get_xattr(oid, attr_name)
803
def test_aio_write_no_comp_ref(self):
    """Callbacks still run when the caller drops the completion object."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    # NOTE(sileht): force don't save the comp into local var
    # to ensure all references are correctly tracked into the lib
    self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    with cond:
        while fired[0] < 2:
            cond.wait()
    contents = self.ioctx.read("foo")
    eq(contents, b"bar")
    for leftover in self.ioctx.list_objects():
        leftover.remove()
821
def test_aio_append(self):
    """An ``aio_append`` queued right after an ``aio_write`` extends the written data."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    write_comp = self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    append_comp = self.ioctx.aio_append("foo", b"baz", on_event, on_event)
    # Only the first completion is waited on before the read.
    write_comp.wait_for_complete()
    contents = self.ioctx.read("foo")
    eq(contents, b"barbaz")
    # Two operations, each with the callback passed twice: four invocations.
    with cond:
        while fired[0] < 4:
            cond.wait()
    eq(write_comp.get_return_value(), 0)
    eq(append_comp.get_return_value(), 0)
    for leftover in self.ioctx.list_objects():
        leftover.remove()
841
def test_aio_write_full(self):
    """``aio_write_full`` replaces the longer data of an earlier ``aio_write``."""
    lock = threading.Condition()
    # One-element list so the nested callback can update the counter.
    count = [0]
    def cb(blah):
        with lock:
            count[0] += 1
            lock.notify()
        return 0

    # The completion of the first write is deliberately not kept.
    self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
    comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
    comp.wait_for_complete()
    # Both operations register cb twice, so four invocations are expected
    # (as in test_aio_append). Waiting for only two could return before
    # the write_full callbacks have run.
    with lock:
        while count[0] < 4:
            lock.wait()
    eq(comp.get_return_value(), 0)
    contents = self.ioctx.read("foo")
    eq(contents, b"bar")
    for obj in self.ioctx.list_objects():
        obj.remove()
860
def test_aio_writesame(self):
    """``aio_writesame`` repeats the 3-byte pattern to fill 9 bytes."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    comp = self.ioctx.aio_writesame("abc", b"rzx", 9, 0, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
    eq(self.ioctx.read("abc"), b"rzxrzxrzx")
    for leftover in self.ioctx.list_objects():
        leftover.remove()
877
def test_aio_stat(self):
    """``aio_stat`` returns -2 for a missing object and 0 once it exists."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_stat(_completion, size, mtime):
        with cond:
            fired[0] += 1
            cond.notify()

    # Object does not exist yet.
    comp = self.ioctx.aio_stat("foo", on_stat)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), -2)

    self.ioctx.write("foo", b"bar")

    # Object exists now; the callback count reaches two.
    comp = self.ioctx.aio_stat("foo", on_stat)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 0)

    for leftover in self.ioctx.list_objects():
        leftover.remove()
903
def test_aio_remove(self):
    """``aio_remove`` completes with 0 and leaves the pool without objects."""
    cond = threading.Condition()
    # One-element list so the nested callback can update the counter.
    fired = [0]
    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    self.ioctx.write('foo', b'wrx')
    eq(self.ioctx.read('foo'), b'wrx')
    # The same callback is passed twice, so two invocations are expected.
    comp = self.ioctx.aio_remove('foo', on_event, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 0)
    remaining = list(self.ioctx.list_objects())
    eq(remaining, [])
921
def _take_down_acting_set(self, pool, objectname):
    """Mark down every OSD in the acting set of ``pool``:``objectname``.

    Used to verify that async reads don't complete while the acting set is
    missing. Sets the ``noup`` flag first; ``_let_osds_back_up`` (and
    ``tearDown``) unset it again.
    """
    def mon(command):
        # Send one JSON monitor command with an empty input buffer.
        return self.rados.mon_command(json.dumps(command), b'')

    # Ask the monitor where the object maps to. The return code of this
    # first command is not checked.
    r, jsonout, _ = mon({
        "prefix":"osd map",
        "pool":pool,
        "object":objectname,
        "format":"json",
    })
    acting_set = json.loads(jsonout.decode("utf-8"))['acting']

    r, _, _ = mon({"prefix":"osd set", "key":"noup"})
    eq(r, 0)
    r, _, _ = mon({"prefix":"osd down", "ids":[str(i) for i in acting_set]})
    eq(r, 0)

    # wait for OSDs to acknowledge the down
    eq(self.rados.wait_for_latest_osdmap(), 0)
943
def _let_osds_back_up(self):
    """Unset the ``noup`` flag that ``_take_down_acting_set`` set."""
    unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
    status, _, _ = self.rados.mon_command(unset_noup, b'')
    eq(status, 0)
948
@attr('wait')
def test_aio_read_wait_for_complete(self):
    """An ``aio_read`` stays pending while the acting set is down.

    Uses ``wait_for_complete()`` and then waits for the callback by
    watching ``result[0]``.
    """
    payload = b"bar\000frob"
    self.ioctx.write("foo", payload)
    self._take_down_acting_set('test_pool', 'foo')

    # this is a list so that the local callback can modify it
    result = [None]
    cond = threading.Condition()
    def on_read(_, buf):
        with cond:
            result[0] = buf
            cond.notify()

    comp = self.ioctx.aio_read("foo", len(payload), 0, on_read)
    # The read must not finish while the OSDs are down.
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(None, result[0])

    self._let_osds_back_up()
    comp.wait_for_complete()
    # Wait for the callback with 5 s timeouts, giving up after 11 rounds.
    rounds = 0
    with cond:
        while result[0] is None and rounds <= 10:
            cond.wait(timeout=5)
            rounds += 1
    assert(rounds <= 10)

    eq(result[0], payload)
    # Only the local name (plus getrefcount's own argument) may reference comp.
    eq(sys.getrefcount(comp), 2)
984
@attr('wait')
def test_aio_read_wait_for_complete_and_cb(self):
    # Use wait_for_complete_and_cb() and verify that result[0] is set
    # by the time we regain control.
    oid = "foo"
    payload = b"bar\000frob"
    self.ioctx.write(oid, payload)

    self._take_down_acting_set('test_pool', oid)

    # this is a list so that the nested callback can modify it
    result = [None]
    cond = threading.Condition()

    def on_read(_completion, data):
        with cond:
            result[0] = data
            cond.notify()

    comp = self.ioctx.aio_read(oid, len(payload), 0, on_read)
    # the acting set is down, so the read is expected to stay pending
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(None, result[0])

    self._let_osds_back_up()
    comp.wait_for_complete_and_cb()
    # no extra waiting here: the callback must already have stored the
    # data when the call above returns
    assert result[0] is not None
    eq(result[0], payload)
    # only this frame (plus the getrefcount argument) may still hold
    # the completion
    eq(sys.getrefcount(comp), 2)
1012
@attr('wait')
def test_aio_read_wait_for_complete_and_cb_error(self):
    # Error case: use wait_for_complete_and_cb() and verify that
    # result[0] is set by the time we regain control.
    oid = "bar"
    self._take_down_acting_set('test_pool', oid)

    # this is a list so that the nested callback can modify it; it
    # starts at 1 so that a callback delivering None is detectable
    result = [1]
    cond = threading.Condition()

    def on_read(_completion, data):
        with cond:
            result[0] = data
            cond.notify()

    # read from a DNE object
    comp = self.ioctx.aio_read(oid, 3, 0, on_read)
    # the acting set is down, so the read is expected to stay pending
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(1, result[0])
    self._let_osds_back_up()

    comp.wait_for_complete_and_cb()
    # the failed read hands None to the callback and reports a negative
    # return value
    eq(None, result[0])
    assert comp.get_return_value() < 0
    # only this frame (plus the getrefcount argument) may still hold
    # the completion
    eq(sys.getrefcount(comp), 2)
1040
def test_lock(self):
    # Exercise exclusive and shared locks on one object, including the
    # errors raised for conflicting locks and for unlocking twice.
    oid, lock_name = "foo", "lock"
    # trailing arguments shared by every lock call below
    trailing = ("desc_lock", 10000, 0)

    # exclusive lock: taking it a second time with the same cookie
    # fails, and so does releasing it twice
    self.ioctx.lock_exclusive(oid, lock_name, "locker", *trailing)
    assert_raises(ObjectExists,
                  self.ioctx.lock_exclusive,
                  oid, lock_name, "locker", *trailing)
    self.ioctx.unlock(oid, lock_name, "locker")
    assert_raises(ObjectNotFound, self.ioctx.unlock,
                  oid, lock_name, "locker")

    # two shared lockers with the same tag can coexist, but they keep
    # an exclusive locker out
    shared_cookies = ("locker1", "locker2")
    for cookie in shared_cookies:
        self.ioctx.lock_shared(oid, lock_name, cookie, "tag", *trailing)
    assert_raises(ObjectBusy,
                  self.ioctx.lock_exclusive,
                  oid, lock_name, "locker3", *trailing)

    # release both shared locks, then check a second release fails
    for cookie in shared_cookies:
        self.ioctx.unlock(oid, lock_name, cookie)
    for cookie in shared_cookies:
        assert_raises(ObjectNotFound, self.ioctx.unlock,
                      oid, lock_name, cookie)
1061
def test_execute(self):
    # Call the "say_hello" method of the "hello" object class, first
    # with empty input and then with a name.
    oid = "foo"
    self.ioctx.write(oid, b"")  # ensure object exists

    ret, reply = self.ioctx.execute(oid, "hello", "say_hello", b"")
    eq(reply, b"Hello, world!")

    ret, reply = self.ioctx.execute(oid, "hello", "say_hello", b"nose")
    eq(reply, b"Hello, nose!")
1070
def test_aio_execute(self):
    # Asynchronous variant of test_execute: run "hello.say_hello" twice
    # and check both the return value and the reply buffer.
    calls = [0]
    reply = [None]
    cond = threading.Condition()

    def on_event(_completion, data):
        with cond:
            # keep the first buffer delivered since the last reset
            if reply[0] is None:
                reply[0] = data
            calls[0] += 1
            cond.notify()

    oid = "foo"
    self.ioctx.write(oid, b"")  # ensure object exists

    comp = self.ioctx.aio_execute(oid, "hello", "say_hello", b"", 32,
                                  on_event, on_event)
    comp.wait_for_complete()
    # the callback is registered twice, so two invocations per request
    with cond:
        while calls[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 13)  # 13 == len(b"Hello, world!")
    eq(reply[0], b"Hello, world!")

    reply[0] = None
    comp = self.ioctx.aio_execute(oid, "hello", "say_hello", b"nose", 32,
                                  on_event, on_event)
    comp.wait_for_complete()
    # the counter is cumulative: two more invocations are expected
    with cond:
        while calls[0] < 4:
            cond.wait()
    eq(comp.get_return_value(), 12)  # 12 == len(b"Hello, nose!")
    eq(reply[0], b"Hello, nose!")

    # remove every object that is left in the pool
    for obj in self.ioctx.list_objects():
        obj.remove()
1101
def test_aio_setxattr(self):
    # Set an xattr asynchronously and read it back synchronously.
    cond = threading.Condition()
    fired = [0]  # a list so that the nested callback can mutate it

    def on_complete(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    oid, key = "obj", "key"
    comp = self.ioctx.aio_setxattr(oid, key, b'value', on_complete)
    comp.wait_for_complete()
    # a single callback was registered, so wait for one invocation
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
    eq(self.ioctx.get_xattr(oid, key), b'value')
1117
def test_applications(self):
    # Exercise application enable/list and the per-application metadata
    # set/get/list/remove calls of the I/O context.
    dump_cmd = json.dumps({"prefix":"osd dump", "format":"json"})
    ret, buf, errs = self.rados.mon_command(dump_cmd, b'')
    eq(ret, 0)
    assert len(buf) > 0

    osd_dump = json.loads(buf.decode("utf-8"))
    release = osd_dump.get("require_osd_release", None)
    # skip when the required OSD release is missing/empty or its name
    # starts with a letter before 'l'
    if not release or release[0] < 'l':
        raise SkipTest

    ioctx = self.ioctx
    eq([], ioctx.application_list())

    # a second application is refused unless the extra True argument is
    # passed (presumably a "force" flag -- confirm against the binding)
    ioctx.application_enable("app1")
    assert_raises(Error, ioctx.application_enable, "app2")
    ioctx.application_enable("app2", True)

    # metadata calls on an application that was never enabled fail
    assert_raises(Error, ioctx.application_metadata_list, "dne")
    eq([], ioctx.application_metadata_list("app1"))

    assert_raises(Error, ioctx.application_metadata_set, "dne", "key",
                  "key")
    # every value that is set must be readable straight away
    entries = (
        ("app1", "key1", "val1"),
        ("app1", "key2", "val2"),
        ("app2", "key1", "val1"),
    )
    for app, key, val in entries:
        ioctx.application_metadata_set(app, key, val)
        eq(val, ioctx.application_metadata_get(app, key))

    eq([("key1", "val1"), ("key2", "val2")],
       ioctx.application_metadata_list("app1"))

    ioctx.application_metadata_remove("app1", "key1")
    eq([("key2", "val2")], ioctx.application_metadata_list("app1"))
1151
def test_service_daemon(self):
    # Register this process as a "laundry" service daemon named after
    # its pid, then push one status update.  Nothing is asserted; the
    # calls only have to complete without raising.
    daemon_name = "pid-" + str(os.getpid())
    self.rados.service_daemon_register(
        "laundry", daemon_name, {'version': '3.14', 'memory': '42'})
    self.rados.service_daemon_update(
        {'result': 'unknown', 'test': 'running'})
1158
def test_alignment(self):
    # the pool used by this test class is expected to report no
    # alignment requirement
    alignment = self.ioctx.alignment()
    eq(alignment, None)
1161
1162
@attr('ec')
class TestIoctxEc(object):
    # I/O context checks against an erasure-coded pool.  Every test gets
    # a 'test-ec' pool created from a k=2, m=1 erasure-code profile.

    def setUp(self):
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.pool = 'test-ec'
        self.profile = 'testprofile-%s' % self.pool

        # register the erasure-code profile the pool is built from
        profile_cmd = {
            "prefix": "osd erasure-code-profile set",
            "name": self.profile,
            "profile": ["k=2", "m=1", "crush-failure-domain=osd"],
        }
        ret, buf, out = self.rados.mon_command(json.dumps(profile_cmd),
                                               b'', timeout=30)
        eq(ret, 0, msg=out)

        # create ec pool with profile created above
        pool_cmd = {
            'prefix': 'osd pool create',
            'pg_num': 8,
            'pgp_num': 8,
            'pool': self.pool,
            'pool_type': 'erasure',
            'erasure_code_profile': self.profile,
        }
        ret, buf, out = self.rados.mon_command(json.dumps(pool_cmd),
                                               b'', timeout=30)
        eq(ret, 0, msg=out)
        assert self.rados.pool_exists(self.pool)
        self.ioctx = self.rados.open_ioctx(self.pool)

    def tearDown(self):
        # unconditionally clear the "noup" flag (the result is ignored),
        # then release the pool and the cluster handle
        unset_noup = json.dumps({"prefix": "osd unset", "key": "noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx.close()
        self.rados.delete_pool(self.pool)
        self.rados.shutdown()

    def test_alignment(self):
        # the erasure-coded pool is expected to report an alignment of
        # 8192
        alignment = self.ioctx.alignment()
        eq(alignment, 8192)
1193
1194
class TestIoctx2(object):
    # Checks on an I/O context that is opened by pool id (open_ioctx2)
    # instead of by pool name.

    def setUp(self):
        pool_name = 'test_pool'
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        # resolve the name to its numeric id and open the context with it
        pool_id = self.rados.pool_lookup(pool_name)
        assert pool_id > 0
        self.ioctx2 = self.rados.open_ioctx2(pool_id)

    def tearDown(self):
        # unconditionally clear the "noup" flag (the result is ignored),
        # then release the pool and the cluster handle
        unset_noup = json.dumps({"prefix": "osd unset", "key": "noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx2.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    def test_get_last_version(self):
        last_version = self.ioctx2.get_last_version()
        assert last_version >= 0

    def test_get_stats(self):
        # a freshly created pool must report exactly these counters, all
        # of them zero
        counters = (
            'num_objects_unfound',
            'num_objects_missing_on_primary',
            'num_object_clones',
            'num_objects',
            'num_object_copies',
            'num_bytes',
            'num_rd_kb',
            'num_wr_kb',
            'num_kb',
            'num_wr',
            'num_objects_degraded',
            'num_rd',
        )
        stats = self.ioctx2.get_stats()
        eq(stats, dict.fromkeys(counters, 0))
1231
1232
class TestObject(object):
    # Tests for the file-like Object wrapper.  Every test starts with an
    # object 'foo' that contains b'bar'.

    def setUp(self):
        pool_name = 'test_pool'
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        self.ioctx = self.rados.open_ioctx(pool_name)
        self.ioctx.write('foo', b'bar')
        self.object = Object(self.ioctx, 'foo')

    def tearDown(self):
        # release the context before deleting the pool, and drop both
        # references once they are no longer usable
        self.ioctx.close()
        self.ioctx = None
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()
        self.rados = None

    def test_read(self):
        obj = self.object
        eq(obj.read(3), b'bar')
        # a second read continues after the first and finds nothing left
        eq(obj.read(100), b'')

    def test_seek(self):
        obj = self.object
        obj.write(b'blah')
        # (position, length, expected bytes)
        for position, length, expected in ((0, 4, b'blah'),
                                           (1, 3, b'lah')):
            obj.seek(position)
            eq(obj.read(length), expected)

    def test_write(self):
        obj = self.object
        obj.write(b'barbaz')
        obj.seek(0)
        # two consecutive reads walk through the written data
        eq(obj.read(3), b'bar')
        eq(obj.read(3), b'baz')
1267
class TestIoCtxSelfManagedSnaps(object):
    # Create, roll back to and remove self-managed snapshots.

    def setUp(self):
        pool_name = 'test_pool'
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        self.ioctx = self.rados.open_ioctx(pool_name)

    def tearDown(self):
        # unconditionally clear the "noup" flag (the result is ignored),
        # then release the pool and the cluster handle
        unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    @attr('rollback')
    def test(self):
        ioctx = self.ioctx
        oid = 'abc'

        # cannot mix-and-match pool and self-managed snapshot mode
        ioctx.set_self_managed_snap_write([])
        ioctx.write(oid, b'abc')
        first_snap = ioctx.create_self_managed_snap()
        ioctx.set_self_managed_snap_write([first_snap])

        ioctx.write(oid, b'def')
        second_snap = ioctx.create_self_managed_snap()
        ioctx.set_self_managed_snap_write([first_snap, second_snap])

        ioctx.write(oid, b'ghi')

        # each rollback must bring back the content written just before
        # the corresponding snapshot was taken
        ioctx.rollback_self_managed_snap(oid, first_snap)
        eq(ioctx.read(oid), b'abc')

        ioctx.rollback_self_managed_snap(oid, second_snap)
        eq(ioctx.read(oid), b'def')

        ioctx.remove_self_managed_snap(first_snap)
        ioctx.remove_self_managed_snap(second_snap)
1305
class TestCommand(object):
    # Tests that send JSON commands to monitors and OSDs through a
    # connected Rados handle.

    def setUp(self):
        self.rados = Rados(conffile='')
        self.rados.connect()

    def tearDown(self):
        self.rados.shutdown()

    def test_monmap_dump(self):

        # check for success and some plain output with epoch in it
        cmd = {"prefix":"mon dump"}
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        assert b'epoch' in buf

        # JSON, and grab current epoch
        cmd['format'] = 'json'
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        d = json.loads(buf.decode("utf-8"))
        assert 'epoch' in d
        epoch = d['epoch']

        # assume epoch + 1000 does not exist; test for ENOENT
        cmd['epoch'] = epoch + 1000
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, -errno.ENOENT)
        eq(len(buf), 0)

        # send to specific target by name, rank
        cmd = {"prefix": "version"}
        first_mon = d['mons'][0]
        for target in (first_mon['name'], first_mon['rank']):
            print(target)
            ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'',
                                                    timeout=30,
                                                    target=target)
            eq(ret, 0)
            assert len(buf) > 0
            e = json.loads(buf.decode("utf-8"))
            assert 'release' in e

    @attr('bench')
    def test_osd_bench(self):
        # run the 'bench' command on OSD 0 and compare the reported
        # figures with the request
        cmd = dict(prefix='bench', size=4096, count=8192)
        ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
                                               timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        out = json.loads(buf.decode('utf-8'))
        eq(out['blocksize'], cmd['size'])
        eq(out['bytes_written'], cmd['count'])

    def test_ceph_osd_pool_create_utf8(self):
        # create a pool with a non-ASCII name through the monitor and
        # check the status message that comes back
        poolname = "\u9ec5"

        cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        eq(ret, 0)
        try:
            assert len(out) > 0
            eq(u"pool '\u9ec5' created", out)
        finally:
            # remove the pool again so that it is not left behind on the
            # cluster once the test is over
            self.rados.delete_pool(poolname)
1380
1381
@attr('watch')
class TestWatchNotify(object):
    # Watch/notify tests, synchronous (test) and asynchronous
    # (test_aio_notify), against a single object.
    OID = "test_watch_notify"

    def setUp(self):
        pool_name = 'test_pool'
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        self.ioctx = self.rados.open_ioctx(pool_name)
        self.ioctx.write(self.OID, b'test watch notify')
        # guards every dictionary below; they are filled in by callbacks
        self.lock = threading.Condition()
        # per watch id: number of distinct notifications, last payload,
        # and the error code handed to the error callback
        self.notify_cnt = {}
        self.notify_data = {}
        self.notify_error = {}
        # aio related: per notifier id, number of acks and last ack data
        self.ack_cnt = {}
        self.ack_data = {}
        self.instance_id = self.rados.get_instance_id()

    def tearDown(self):
        self.ioctx.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    def make_callback(self):
        # Build a notify callback that records the payload per watch id.
        # The counter only moves when the payload differs from the one
        # recorded last for that watch.
        def callback(notify_id, notifier_id, watch_id, data):
            with self.lock:
                seen = self.notify_cnt.get(watch_id)
                if seen is None:
                    self.notify_cnt[watch_id] = 1
                elif self.notify_data[watch_id] != data:
                    self.notify_cnt[watch_id] = seen + 1
                self.notify_data[watch_id] = data
        return callback

    def make_error_callback(self):
        # Build an error callback that stores the error per watch id.
        def callback(watch_id, error):
            with self.lock:
                self.notify_error[watch_id] = error
        return callback

    def _poll_for_watch_error(self, watch_id):
        # Give the error callback up to ten one-second polls to record
        # an error for watch_id; returns silently either way.
        for _attempt in range(10):
            with self.lock:
                if watch_id in self.notify_error:
                    break
            time.sleep(1)

    def test(self):
        with self.ioctx.watch(self.OID, self.make_callback(),
                              self.make_error_callback()) as watch1:
            watch_id1 = watch1.get_id()
            assert watch_id1 > 0

            # second watch on the same object, via a separate context
            with self.rados.open_ioctx('test_pool') as ioctx:
                watch2 = ioctx.watch(self.OID, self.make_callback(),
                                     self.make_error_callback())
            watch_id2 = watch2.get_id()
            assert watch_id2 > 0

            # first notification reaches both watches
            assert self.ioctx.notify(self.OID, 'test')
            with self.lock:
                assert watch_id1 in self.notify_cnt
                assert watch_id2 in self.notify_cnt
                eq(self.notify_cnt[watch_id1], 1)
                eq(self.notify_cnt[watch_id2], 1)
                eq(self.notify_data[watch_id1], b'test')
                eq(self.notify_data[watch_id2], b'test')

            assert watch1.check() >= timedelta()
            assert watch2.check() >= timedelta()

            # second notification, again for both watches
            assert self.ioctx.notify(self.OID, 'best')
            with self.lock:
                eq(self.notify_cnt[watch_id1], 2)
                eq(self.notify_cnt[watch_id2], 2)
                eq(self.notify_data[watch_id1], b'best')
                eq(self.notify_data[watch_id2], b'best')

            watch2.close()

            # with watch2 closed only watch1 sees the third notification
            assert self.ioctx.notify(self.OID, 'rest')
            with self.lock:
                eq(self.notify_cnt[watch_id1], 3)
                eq(self.notify_cnt[watch_id2], 2)
                eq(self.notify_data[watch_id1], b'rest')
                eq(self.notify_data[watch_id2], b'best')

            assert watch1.check() >= timedelta()

            # removing the watched object must surface as ENOTCONN on
            # the remaining watch
            self.ioctx.remove_object(self.OID)

            self._poll_for_watch_error(watch_id1)
            eq(self.notify_error[watch_id1], -errno.ENOTCONN)
            assert_raises(NotConnected, watch1.check)

            assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')

    def make_callback_reply(self):
        # Build a notify callback that returns the received payload
        # unchanged (the lock is taken while doing so).
        def callback(notify_id, notifier_id, watch_id, data):
            with self.lock:
                return data
        return callback

    def notify_callback(self, _, r, ack_list, timeout_list):
        # Completion callback for a successful aio_notify: count the
        # acks per notifier id and remember the last ack payload.
        eq(r, 0)
        with self.lock:
            for notifier_id, _cookie, notifier_data in ack_list:
                self.ack_cnt[notifier_id] = self.ack_cnt.get(notifier_id, 0) + 1
                self.ack_data[notifier_id] = notifier_data

    def notify_callback_err(self, _, r, ack_list, timeout_list):
        # Completion callback for an aio_notify that is expected to fail
        # with ENOENT.
        eq(r, -errno.ENOENT)

    def test_aio_notify(self):
        with self.ioctx.watch(self.OID, self.make_callback_reply(),
                              self.make_error_callback()) as watch1:
            watch_id1 = watch1.get_id()
            ok(watch_id1 > 0)

            # second watch on the same object, via a separate context
            with self.rados.open_ioctx('test_pool') as ioctx:
                watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
                                     self.make_error_callback())
            watch_id2 = watch2.get_id()
            ok(watch_id2 > 0)

            # both watches ack, so two acks are recorded for this client
            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
            comp.wait_for_complete_and_cb()
            with self.lock:
                ok(self.instance_id in self.ack_cnt)
                eq(self.ack_cnt[self.instance_id], 2)
                eq(self.ack_data[self.instance_id], b'test')

            ok(watch1.check() >= timedelta())
            ok(watch2.check() >= timedelta())

            # the ack counter is cumulative: two more acks
            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
            comp.wait_for_complete_and_cb()
            with self.lock:
                eq(self.ack_cnt[self.instance_id], 4)
                eq(self.ack_data[self.instance_id], b'best')

            watch2.close()

            # with watch2 closed only one more ack arrives
            comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest')
            comp.wait_for_complete_and_cb()
            with self.lock:
                eq(self.ack_cnt[self.instance_id], 5)
                eq(self.ack_data[self.instance_id], b'rest')

            assert watch1.check() >= timedelta()
            # removing the watched object must surface as ENOTCONN on
            # the remaining watch
            self.ioctx.remove_object(self.OID)

            self._poll_for_watch_error(watch_id1)
            eq(self.notify_error[watch_id1], -errno.ENOTCONN)
            assert_raises(NotConnected, watch1.check)

            # notifying the removed object must complete with ENOENT
            comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test')
            comp.wait_for_complete_and_cb()