]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_rados.py
508cbe5ae9e469056a2ed3dd6a59778d746b6cb8
[ceph.git] / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from nose import SkipTest
3 from nose.plugins.attrib import attr
4 from nose.tools import eq_ as eq, ok_ as ok, assert_raises
5 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
6 ObjectNotFound, ObjectBusy, NotConnected,
7 LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
8 LIBRADOS_CMPXATTR_OP_EQ, LIBRADOS_CMPXATTR_OP_GT, LIBRADOS_CMPXATTR_OP_LT, OSError,
9 LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog, MAX_ERRNO, NoData, ExtendMismatch)
10 from datetime import timedelta
11 import time
12 import threading
13 import json
14 import errno
15 import os
16 import re
17 import sys
18
def test_rados_init_error():
    """Each of these constructor argument sets must make Rados() raise Error."""
    rejected = (
        {'rados_id': 'admin', 'name': 'client.admin'},
        {'name': 'invalid'},
        {'name': 'bad.invalid'},
    )
    for kwargs in rejected:
        assert_raises(Error, Rados, conffile='', **kwargs)
23 assert_raises(Error, Rados, conffile='', name='bad.invalid')
24
def test_rados_init():
    """Enter and leave a Rados context once by rados_id, then three times by name."""
    with Rados(conffile='', rados_id='admin'):
        pass
    for _ in range(3):
        with Rados(conffile='', name='client.admin'):
            pass
34
def test_ioctx_context_manager():
    """Use both the cluster handle and an ioctx on 'rbd' as context managers."""
    with Rados(conffile='', rados_id='admin') as cluster:
        with cluster.open_ioctx('rbd'):
            pass
39
def test_parse_argv():
    """conf_parse_argv is expected to return the argument list unchanged."""
    argv = [
        'osd',
        'pool',
        'delete',
        'foobar',
        'foobar',
        '--yes-i-really-really-mean-it',
    ]
    client = Rados()
    remaining = client.conf_parse_argv(argv)
    eq(argv, remaining)
44
def test_parse_argv_empty_str():
    """A single empty-string argument must come back from conf_parse_argv as-is."""
    argv = ['']
    client = Rados()
    remaining = client.conf_parse_argv(argv)
    eq(argv, remaining)
49
class TestRadosStateError(object):
    """Check that Rados methods raise RadosStateError when called in the wrong state."""

    @staticmethod
    def _expect_state_error(calls):
        # Each entry is (bound_method, *args); every call must be refused.
        for call in calls:
            assert_raises(RadosStateError, *call)

    def _requires_configuring(self, rados):
        """Calls asserted to fail unless the handle is still 'configuring'."""
        self._expect_state_error([(rados.connect,)])

    def _requires_configuring_or_connected(self, rados):
        """Calls asserted to fail once the handle has been shut down."""
        self._expect_state_error([
            (rados.conf_read_file,),
            (rados.conf_parse_argv, None),
            (rados.conf_parse_env,),
            (rados.conf_get, 'opt'),
            (rados.conf_set, 'opt', 'val'),
            (rados.ping_monitor, '0'),
        ])

    def _requires_connected(self, rados):
        """Calls asserted to fail unless the handle is 'connected'."""
        self._expect_state_error([
            (rados.pool_exists, 'foo'),
            (rados.pool_lookup, 'foo'),
            (rados.pool_reverse_lookup, 0),
            (rados.create_pool, 'foo'),
            (rados.get_pool_base_tier, 0),
            (rados.delete_pool, 'foo'),
            (rados.list_pools,),
            (rados.get_fsid,),
            (rados.open_ioctx, 'foo'),
            (rados.mon_command, '', b''),
            (rados.osd_command, 0, '', b''),
            (rados.pg_command, '', '', b''),
            (rados.wait_for_latest_osdmap,),
            (rados.blocklist_add, '127.0.0.1/123', 0),
        ])

    def test_configuring(self):
        """A freshly constructed handle reports 'configuring'."""
        handle = Rados(conffile='')
        eq('configuring', handle.state)
        self._requires_connected(handle)

    def test_connected(self):
        """Inside the context manager the handle reports 'connected'."""
        handle = Rados(conffile='')
        with handle:
            eq('connected', handle.state)
            self._requires_configuring(handle)

    def test_shutdown(self):
        """After leaving the context manager every state-bound call is refused."""
        handle = Rados(conffile='')
        with handle:
            pass
        eq('shutdown', handle.state)
        self._requires_configuring(handle)
        self._requires_configuring_or_connected(handle)
        self._requires_connected(handle)
97
98
class TestRados(object):
    """Cluster-level tests; every test runs on its own connected Rados handle."""

    def setUp(self):
        """Connect to the cluster and remember which pools already exist."""
        self.rados = Rados(conffile='')
        self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
        self.rados.conf_parse_env()
        self.rados.connect()

        # Assume any pre-existing pools are the cluster's defaults
        self.default_pools = self.rados.list_pools()

    def tearDown(self):
        """Disconnect the handle opened in setUp."""
        self.rados.shutdown()

    def test_ping_monitor(self):
        """Ping an unknown monitor (must fail) and every monitor from 'mon dump'."""
        assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor')
        cmd = {'prefix': 'mon dump', 'format':'json'}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        for mon in json.loads(buf.decode('utf8'))['mons']:
            # Keep pinging until the reply carries a truthy 'health' entry.
            while True:
                output = self.rados.ping_monitor(mon['name'])
                if output is None:
                    continue
                # Separate name so the mon_command buffer is not rebound.
                reply = json.loads(output)
                if reply.get('health'):
                    break

    def test_annotations(self):
        """create_pool must reject a non-string pool name with TypeError."""
        with assert_raises(TypeError):
            self.rados.create_pool(0xf00)

    def test_create(self):
        """Create and immediately delete a pool."""
        self.rados.create_pool('foo')
        self.rados.delete_pool('foo')

    def test_create_utf8(self):
        """Create a pool with a non-ASCII name and check that it exists."""
        poolname = "\u9ec4"
        self.rados.create_pool(poolname)
        try:
            assert self.rados.pool_exists(u"\u9ec4")
        finally:
            # Remove the pool even when the assertion fails.
            self.rados.delete_pool(poolname)

    def test_pool_lookup_utf8(self):
        """Round-trip a non-ASCII pool name through lookup and reverse lookup."""
        poolname = '\u9ec4'
        self.rados.create_pool(poolname)
        try:
            poolid = self.rados.pool_lookup(poolname)
            eq(poolname, self.rados.pool_reverse_lookup(poolid))
        finally:
            self.rados.delete_pool(poolname)

    def test_eexist(self):
        """Creating an already existing pool must raise ObjectExists."""
        self.rados.create_pool('foo')
        try:
            assert_raises(ObjectExists, self.rados.create_pool, 'foo')
        finally:
            # Remove the pool even when the assertion fails.
            self.rados.delete_pool('foo')

    def list_non_default_pools(self):
        """Return the set of pools that were not present when setUp ran."""
        # Set difference tolerates a default pool that has since disappeared.
        return set(self.rados.list_pools()) - set(self.default_pools)

    def test_list_pools(self):
        """Track the non-default pool set across a series of creates and deletes."""
        eq(set(), self.list_non_default_pools())
        self.rados.create_pool('foo')
        eq(set(['foo']), self.list_non_default_pools())
        self.rados.create_pool('bar')
        eq(set(['foo', 'bar']), self.list_non_default_pools())
        self.rados.create_pool('baz')
        eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
        self.rados.delete_pool('foo')
        eq(set(['bar', 'baz']), self.list_non_default_pools())
        self.rados.delete_pool('baz')
        eq(set(['bar']), self.list_non_default_pools())
        self.rados.delete_pool('bar')
        eq(set(), self.list_non_default_pools())
        self.rados.create_pool('a' * 500)
        eq(set(['a' * 500]), self.list_non_default_pools())
        self.rados.delete_pool('a' * 500)

    @attr('tier')
    def test_get_pool_base_tier(self):
        """Attach 'foo-cache' as a tier of 'foo' and check both map to 'foo'."""
        self.rados.create_pool('foo')
        try:
            self.rados.create_pool('foo-cache')
            try:
                pool_id = self.rados.pool_lookup('foo')
                tier_pool_id = self.rados.pool_lookup('foo-cache')

                cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
                ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                eq(ret, 0)

                try:
                    cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)

                    eq(self.rados.wait_for_latest_osdmap(), 0)

                    eq(pool_id, self.rados.get_pool_base_tier(pool_id))
                    eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
                finally:
                    cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
                    ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
                    eq(ret, 0)
            finally:
                self.rados.delete_pool('foo-cache')
        finally:
            self.rados.delete_pool('foo')

    def test_get_fsid(self):
        """The fsid must start with 36 hex-or-dash characters."""
        fsid = self.rados.get_fsid()
        # Raw string: '\-' is an invalid escape sequence in a normal literal.
        assert re.match(r'[0-9a-f\-]{36}', fsid, re.I)

    def test_blocklist_add(self):
        """Blocklisting an address must not raise."""
        self.rados.blocklist_add("1.2.3.4/123", 1)

    @attr('stats')
    def test_get_cluster_stats(self):
        """Sanity-check the signs of the cluster usage counters."""
        stats = self.rados.get_cluster_stats()
        assert stats['kb'] > 0
        assert stats['kb_avail'] > 0
        assert stats['kb_used'] > 0
        assert stats['num_objects'] >= 0

    def test_monitor_log(self):
        """Register a monitor log callback, wait for one call, then unregister."""
        lock = threading.Condition()
        # Flag so a callback that fires before we start waiting is not missed.
        called = [False]
        def cb(arg, line, who, sec, nsec, seq, level, msg):
            # NOTE(sileht): the old pyrados API was received the pointer as int
            # instead of the value of arg
            eq(arg, "arg")
            with lock:
                called[0] = True
                lock.notify()
            return 0

        # NOTE(sileht): force don't save the monitor into local var
        # to ensure all references are correctly tracked into the lib
        MonitorLog(self.rados, "debug", cb, "arg")
        with lock:
            while not called[0]:
                lock.wait()
        MonitorLog(self.rados, "debug", None, None)
        eq(None, self.rados.monitor_callback)
241
242 class TestIoctx(object):
243
def setUp(self):
    """Connect, create 'test_pool' and open an ioctx on it."""
    cluster = Rados(conffile='')
    self.rados = cluster
    cluster.connect()
    cluster.create_pool('test_pool')
    assert cluster.pool_exists('test_pool')
    self.ioctx = cluster.open_ioctx('test_pool')
250
def tearDown(self):
    """Unset the 'noup' OSD flag, then close the ioctx, drop the pool and disconnect."""
    # Result is deliberately not checked, matching the original behaviour.
    unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
    self.rados.mon_command(unset_noup, b'')
    self.ioctx.close()
    self.rados.delete_pool('test_pool')
    self.rados.shutdown()
257
def test_get_last_version(self):
    """The last-version value reported by the ioctx must not be negative."""
    last_version = self.ioctx.get_last_version()
    assert last_version >= 0
261
def test_get_stats(self):
    """Every counter returned by get_stats() on the new pool must be zero."""
    counters = (
        'num_objects_unfound',
        'num_objects_missing_on_primary',
        'num_object_clones',
        'num_objects',
        'num_object_copies',
        'num_bytes',
        'num_rd_kb',
        'num_wr_kb',
        'num_kb',
        'num_wr',
        'num_objects_degraded',
        'num_rd',
    )
    reported = self.ioctx.get_stats()
    eq(reported, dict.fromkeys(counters, 0))
276
def test_write(self):
    """Write a small object and read the same bytes back."""
    name, payload = 'abc', b'abc'
    self.ioctx.write(name, payload)
    eq(self.ioctx.read(name), payload)
280
def test_write_full(self):
    """write_full must replace the whole object content."""
    name = 'abc'
    self.ioctx.write(name, b'abc')
    before = self.ioctx.read(name)
    eq(before, b'abc')
    self.ioctx.write_full(name, b'd')
    after = self.ioctx.read(name)
    eq(after, b'd')
286
def test_writesame(self):
    """writesame with a 3-byte pattern and length 9 yields the pattern three times."""
    self.ioctx.writesame('ob', b'rzx', 9)
    content = self.ioctx.read('ob')
    eq(content, b'rzxrzxrzx')
290
def test_append(self):
    """Two appends after an initial write extend the object in order."""
    self.ioctx.write('abc', b'a')
    for piece in (b'b', b'c'):
        self.ioctx.append('abc', piece)
    eq(self.ioctx.read('abc'), b'abc')
296
def test_write_zeros(self):
    """Embedded NUL bytes must survive a write/read round trip."""
    payload = b'a\0b\0c'
    self.ioctx.write('abc', payload)
    eq(self.ioctx.read('abc'), payload)
300
def test_trunc(self):
    """Truncating to 2 bytes shortens both the content and the stat size."""
    name = 'abc'
    self.ioctx.write(name, b'abc')
    self.ioctx.trunc(name, 2)
    eq(self.ioctx.read(name), b'ab')
    # First element of the stat result is compared as the object size.
    stat_result = self.ioctx.stat(name)
    eq(stat_result[0], 2)
307
def test_cmpext(self):
    """cmpext returns 0 on a match and -MAX_ERRNO - 4 for a mismatch at offset 4."""
    name = 'test_object'
    self.ioctx.write(name, b'abcdefghi')
    matching = self.ioctx.cmpext(name, b'abcdefghi', 0)
    eq(0, matching)
    differing = self.ioctx.cmpext(name, b'abcdxxxxx', 0)
    eq(-MAX_ERRNO - 4, differing)
312
def test_list_objects_empty(self):
    """A new pool lists no objects."""
    listed = list(self.ioctx.list_objects())
    eq(listed, [])
315
def test_list_objects(self):
    """Objects created via write, write_full and append all show up in the listing."""
    self.ioctx.write('a', b'')
    self.ioctx.write('b', b'foo')
    self.ioctx.write_full('c', b'bar')
    self.ioctx.append('d', b'jazz')
    names = []
    for entry in self.ioctx.list_objects():
        names.append(entry.key)
    names.sort()
    eq(names, ['a', 'b', 'c', 'd'])
323
def test_list_ns_objects(self):
    """List across all namespaces after writing to the default one and to 'ns1'."""
    ioctx = self.ioctx
    ioctx.write('a', b'')
    ioctx.write('b', b'foo')
    ioctx.write_full('c', b'bar')
    ioctx.append('d', b'jazz')
    ioctx.set_namespace("ns1")
    ioctx.write('ns1-a', b'')
    ioctx.write('ns1-b', b'foo')
    ioctx.write_full('ns1-c', b'bar')
    ioctx.append('ns1-d', b'jazz')
    ioctx.append('d', b'jazz')
    ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
    found = sorted((entry.nspace, entry.key) for entry in ioctx.list_objects())
    expected = [('', key) for key in ('a', 'b', 'c', 'd')]
    expected += [('ns1', key)
                 for key in ('d', 'ns1-a', 'ns1-b', 'ns1-c', 'ns1-d')]
    eq(found, expected)
340
def test_xattrs(self):
    """Set xattrs one by one through the ioctx and read them back singly and in bulk."""
    expected = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
    self.ioctx.write('abc', b'')
    for name, data in expected.items():
        self.ioctx.set_xattr('abc', name, data)
        eq(self.ioctx.get_xattr('abc', name), data)
    stored = {name: data for name, data in self.ioctx.get_xattrs('abc')}
    eq(stored, expected)
351
def test_obj_xattrs(self):
    """Same as the ioctx xattr test, but through the listed Object handle."""
    expected = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
    self.ioctx.write('abc', b'')
    listed = list(self.ioctx.list_objects())
    target = listed[0]
    for name, data in expected.items():
        target.set_xattr(name, data)
        eq(target.get_xattr(name), data)
    stored = {name: data for name, data in target.get_xattrs()}
    eq(stored, expected)
363
def test_get_pool_id(self):
    """The ioctx pool id must equal the id the cluster reports for 'test_pool'."""
    from_ioctx = self.ioctx.get_pool_id()
    from_cluster = self.rados.pool_lookup('test_pool')
    eq(from_ioctx, from_cluster)
366
def test_get_pool_name(self):
    """The ioctx must report the name of the pool it was opened on."""
    pool_name = self.ioctx.get_pool_name()
    eq(pool_name, 'test_pool')
369
@attr('snap')
def test_create_snap(self):
    """Removing a missing snap fails; create followed by remove succeeds."""
    snap_name = 'foo'
    assert_raises(ObjectNotFound, self.ioctx.remove_snap, snap_name)
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_snap(snap_name)
375
@attr('snap')
def test_list_snaps_empty(self):
    """A new pool has no snapshots."""
    listed = list(self.ioctx.list_snaps())
    eq(listed, [])
379
@attr('snap')
def test_list_snaps(self):
    """Snapshots are listed under the names, and in the order, they were created."""
    created = ['snap1', 'snap2', 'snap3']
    for name in created:
        self.ioctx.create_snap(name)
    listed = []
    for snap in self.ioctx.list_snaps():
        listed.append(snap.name)
    eq(created, listed)
387
@attr('snap')
def test_lookup_snap(self):
    """lookup_snap returns a snap object carrying the requested name."""
    self.ioctx.create_snap('foo')
    found = self.ioctx.lookup_snap('foo')
    eq(found.name, 'foo')
393
@attr('snap')
def test_snap_timestamp(self):
    """get_timestamp() on a looked-up snap must not raise."""
    self.ioctx.create_snap('foo')
    found = self.ioctx.lookup_snap('foo')
    # Only the absence of an exception is checked; the value is unused.
    found.get_timestamp()
399
@attr('snap')
def test_remove_snap(self):
    """A removed snapshot disappears from the listing."""
    self.ioctx.create_snap('foo')
    # Unpacking fails unless exactly one snapshot is listed.
    [only_snap] = self.ioctx.list_snaps()
    eq(only_snap.name, 'foo')
    self.ioctx.remove_snap('foo')
    remaining = list(self.ioctx.list_snaps())
    eq(remaining, [])
407
@attr('snap')
def test_snap_rollback(self):
    """Rolling back to a snapshot restores an object removed after the snapshot."""
    obj_name, snap_name = "insnap", "snap1"
    self.ioctx.write(obj_name, b"contents1")
    self.ioctx.create_snap(snap_name)
    self.ioctx.remove_object(obj_name)
    self.ioctx.snap_rollback(obj_name, snap_name)
    restored = self.ioctx.read(obj_name)
    eq(restored, b"contents1")
    self.ioctx.remove_snap(snap_name)
    self.ioctx.remove_object(obj_name)
417
@attr('snap')
def test_snap_read(self):
    """Read a removed object through its snapshot, then switch back to head."""
    self.ioctx.write("insnap", b"contents1")
    self.ioctx.create_snap("snap1")
    self.ioctx.remove_object("insnap")
    snapshot = self.ioctx.lookup_snap("snap1")
    # Point reads at the snapshot so the removed object is visible again.
    self.ioctx.set_read(snapshot.snap_id)
    from_snap = self.ioctx.read("insnap")
    eq(from_snap, b"contents1")
    self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
    self.ioctx.write("inhead", b"contents2")
    from_head = self.ioctx.read("inhead")
    eq(from_head, b"contents2")
    self.ioctx.remove_snap("snap1")
    self.ioctx.remove_object("inhead")
431
def test_set_omap(self):
    """Store four omap pairs, then read them with start_after and prefix filters."""
    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    pairs = list(zip(keys, values))
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        wr.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
        self.ioctx.operate_write_op(wr, "hw")
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals(rd, "", "", 4)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, "hw")
        # Drop the first pair; the remainder must be the last three.
        next(omap_iter)
        eq(list(omap_iter), pairs[1:])
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals(rd, "2", "", 4)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(pairs[2], next(omap_iter))
        eq(list(omap_iter), pairs[3:])
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals(rd, "", "2", 4)
        eq(rc, 0)
        rd.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(omap_iter), pairs[1:2])
457
def test_set_omap_aio(self):
    """Set and read omap pairs through the asynchronous operate calls."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        comp = self.ioctx.operate_aio_write_op(wr, "hw", on_event, on_event)
        comp.wait_for_complete()
        # The same callback is registered twice, so wait for two calls.
        with cond:
            while fired[0] < 2:
                cond.wait()
        eq(comp.get_return_value(), 0)

    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals(rd, "", "", 4)
        eq(rc, 0)
        comp = self.ioctx.operate_aio_read_op(rd, "hw", on_event, on_event)
        comp.wait_for_complete()
        with cond:
            while fired[0] < 4:
                cond.wait()
        eq(comp.get_return_value(), 0)
        next(omap_iter)
        eq(list(omap_iter), list(zip(keys, values))[1:])
489
def test_write_ops(self):
    """Drive new/write_full/append/write/zero/truncate/remove through one WriteOpCtx."""
    target = "write_ops"
    with WriteOpCtx() as op:

        def apply_and_read():
            # Submit whatever is queued on the op and return the object content.
            self.ioctx.operate_write_op(op, target)
            return self.ioctx.read('write_ops')

        op.new(0)
        eq(apply_and_read(), b'')

        op.write_full(b'1')
        op.append(b'2')
        eq(apply_and_read(), b'12')

        op.write_full(b'12345')
        op.write(b'x', 2)
        eq(apply_and_read(), b'12x45')

        op.write_full(b'12345')
        op.zero(2, 2)
        eq(apply_and_read(), b'12\x00\x005')

        op.write_full(b'12345')
        op.truncate(2)
        eq(apply_and_read(), b'12')

        op.remove()
        self.ioctx.operate_write_op(op, target)
        with assert_raises(ObjectNotFound):
            self.ioctx.read('write_ops')
520
def test_execute_op(self):
    """Run the 'hello' class method 'record_hello' via a write op and read the result."""
    with WriteOpCtx() as op:
        op.execute("hello", "record_hello", b"ebs")
        self.ioctx.operate_write_op(op, "object")
    stored = self.ioctx.read('object')
    eq(stored, b"Hello, ebs!")
526
def test_writesame_op(self):
    """writesame queued on a write op repeats the pattern up to the given length."""
    with WriteOpCtx() as op:
        op.writesame(b'rzx', 9)
        self.ioctx.operate_write_op(op, 'abc')
    stored = self.ioctx.read('abc')
    eq(stored, b'rzxrzxrzx')
532
def test_get_omap_vals_by_keys(self):
    """Fetch a subset of omap keys; reading from a missing object must fail."""
    keys = ("1", "2", "3", "4")
    values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
    wanted = ("3", "4",)
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, "hw")
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, wanted)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(omap_iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, wanted)
        eq(rc, 0)
        with assert_raises(ObjectNotFound):
            self.ioctx.operate_read_op(rd, "no_such")
549
def test_get_omap_keys(self):
    """get_omap_keys yields (key, None) pairs, limited to the requested count."""
    keys = ("1", "2", "3")
    values = (b"aaa", b"bbb", b"ccc")
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, "hw")
    with ReadOpCtx() as rd:
        key_iter, rc = self.ioctx.get_omap_keys(rd, "", 2)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, "hw")
        eq(list(key_iter), [(key, None) for key in keys[:2]])
    with ReadOpCtx() as rd:
        key_iter, rc = self.ioctx.get_omap_keys(rd, "", 2)
        eq(rc, 0)
        with assert_raises(ObjectNotFound):
            self.ioctx.operate_read_op(rd, "no_such")
566
def test_clear_omap(self):
    """After clear_omap a previously stored key is no longer returned."""
    keys = ("1", "2", "3")
    values = (b"aaa", b"bbb", b"ccc")
    with WriteOpCtx() as store_op:
        self.ioctx.set_omap(store_op, keys, values)
        self.ioctx.operate_write_op(store_op, "hw")
    with WriteOpCtx() as clear_op:
        self.ioctx.clear_omap(clear_op)
        self.ioctx.operate_write_op(clear_op, "hw")
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, ("1",))
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, "hw")
        remaining = list(omap_iter)
        eq(remaining, [])
581
def test_remove_omap_ramge2(self):
    """remove_omap_range2("1", "4") is expected to leave only key "4" behind."""
    keys = ("1", "2", "3", "4")
    values = (b"a", b"bb", b"ccc", b"dddd")
    obj_name = "test_obj"
    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, keys, values)
        self.ioctx.operate_write_op(wr, obj_name)
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, keys)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, obj_name)
        eq(list(omap_iter), list(zip(keys, values)))
    with WriteOpCtx() as wr:
        self.ioctx.remove_omap_range2(wr, "1", "4")
        self.ioctx.operate_write_op(wr, obj_name)
    with ReadOpCtx() as rd:
        omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, keys)
        eq(rc, 0)
        self.ioctx.operate_read_op(rd, obj_name)
        eq(list(omap_iter), [("4", b"dddd")])
601
def test_omap_cmp(self):
    """Guard omap updates with omap_cmp using EQ, GT and LT, then a failing EQ."""
    object_id = 'test'
    self.ioctx.write(object_id, b'omap_cmp')

    def guarded_set(cmp_value, cmp_op, new_value):
        # Queue the comparison and the update on one op and submit it.
        with WriteOpCtx() as wr:
            wr.omap_cmp('key1', cmp_value, cmp_op)
            self.ioctx.set_omap(wr, ('key1',), (new_value,))
            self.ioctx.operate_write_op(wr, object_id)

    def expect_stored(expected):
        with ReadOpCtx() as rd:
            omap_iter, rc = self.ioctx.get_omap_vals_by_keys(rd, ('key1',))
            eq(rc, 0)
            self.ioctx.operate_read_op(rd, object_id)
            eq(list(omap_iter), [('key1', expected)])

    with WriteOpCtx() as wr:
        self.ioctx.set_omap(wr, ('key1',), ('1',))
        self.ioctx.operate_write_op(wr, object_id)

    guarded_set('1', LIBRADOS_CMPXATTR_OP_EQ, '2')
    expect_stored(b'2')
    guarded_set('1', LIBRADOS_CMPXATTR_OP_GT, '3')
    expect_stored(b'3')
    guarded_set('4', LIBRADOS_CMPXATTR_OP_LT, '4')
    expect_stored(b'4')

    # The stored value is now '4', so an EQ comparison against '1' must fail.
    with WriteOpCtx() as wr:
        wr.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ)
        self.ioctx.set_omap(wr, ('key1',), ('5',))
        try:
            self.ioctx.operate_write_op(wr, object_id)
        except (OSError, ExtendMismatch) as err:
            eq(err.errno, 125)
        else:
            message = "omap_cmp did not raise Exception when omap content does not match"
            raise AssertionError(message)
645
def test_cmpext_op(self):
    """cmpext inside write and read ops: pass on match, ExtendMismatch otherwise."""
    object_id = 'test'
    mismatch_msg = "cmpext did not raise Exception when object content does not match"

    with WriteOpCtx() as wr:
        wr.write(b'12345', 0)
        self.ioctx.operate_write_op(wr, object_id)

    # Matching compare: the guarded write goes through.
    with WriteOpCtx() as wr:
        wr.cmpext(b'12345', 0)
        wr.write(b'54321', 0)
        self.ioctx.operate_write_op(wr, object_id)
        eq(self.ioctx.read(object_id), b'54321')

    with WriteOpCtx() as wr:
        wr.cmpext(b'56789', 0)
        wr.write(b'12345', 0)
        try:
            self.ioctx.operate_write_op(wr, object_id)
        except ExtendMismatch as err:
            # Expected error number is (-MAX_ERRNO - 1), where "1" is the
            # offset of the first byte that differs.
            eq(-err.errno, -MAX_ERRNO - 1)
            eq(err.offset, 1)
        else:
            raise AssertionError(mismatch_msg)

    with ReadOpCtx() as rd:
        rd.cmpext(b'54321', 0)
        self.ioctx.operate_read_op(rd, object_id)

    with ReadOpCtx() as rd:
        rd.cmpext(b'54789', 0)
        try:
            self.ioctx.operate_read_op(rd, object_id)
        except ExtendMismatch as err:
            # Expected error number is (-MAX_ERRNO - 2), where "2" is the
            # offset of the first byte that differs.
            eq(-err.errno, -MAX_ERRNO - 2)
            eq(err.offset, 2)
        else:
            raise AssertionError(mismatch_msg)
684
def test_xattrs_op(self):
    """Set, list and remove xattrs through a single reusable WriteOpCtx."""
    # NOTE(review): the listing this was taken from had lost its indentation;
    # the operate_write_op calls are assumed to sit inside the for loops.
    # Confirm against the upstream file.
    expected = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
    with WriteOpCtx() as op:
        op.new(LIBRADOS_CREATE_EXCLUSIVE)
        for name, data in expected.items():
            op.set_xattr(name, data)
            self.ioctx.operate_write_op(op, 'abc')
            eq(self.ioctx.get_xattr('abc', name), data)

        after_set = {name: data for name, data in self.ioctx.get_xattrs('abc')}
        eq(after_set, expected)

        for name in expected.keys():
            op.rm_xattr(name)
            self.ioctx.operate_write_op(op, 'abc')
        after_rm = {name: data for name, data in self.ioctx.get_xattrs('abc')}
        eq(after_rm, {})

        op.remove()
        self.ioctx.operate_write_op(op, 'abc')
709
def test_locator(self):
    """Write, rewrite and remove an object while switching the ioctx locator key."""
    ioctx = self.ioctx
    ioctx.set_locator_key("bar")
    ioctx.write('foo', b'contents1')
    listed = list(ioctx.list_objects())
    eq(len(listed), 1)
    eq(ioctx.get_locator_key(), "bar")
    ioctx.set_locator_key("")
    handle = listed[0]
    handle.seek(0)
    handle.write(b"contents2")
    # Writing through the object handle must leave the ioctx key untouched.
    eq(ioctx.get_locator_key(), "")
    ioctx.set_locator_key("bar")
    contents = ioctx.read("foo")
    eq(contents, b"contents2")
    eq(ioctx.get_locator_key(), "bar")
    handle.remove()
    listed = list(ioctx.list_objects())
    eq(listed, [])
    ioctx.set_locator_key("")
728
def test_operate_aio_write_op(self):
    """Submit a write op asynchronously and wait for both callbacks."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    with WriteOpCtx() as op:
        op.write(b'rzx')
        comp = self.ioctx.operate_aio_write_op(op, "object", on_event, on_event)
        comp.wait_for_complete()
        with cond:
            while fired[0] < 2:
                cond.wait()
        eq(comp.get_return_value(), 0)
        eq(self.ioctx.read('object'), b'rzx')
746
def test_aio_write(self):
    """aio_write stores the data and invokes both registered callbacks."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    comp = self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 0)
    stored = self.ioctx.read("foo")
    eq(stored, b"bar")
    for obj in self.ioctx.list_objects():
        obj.remove()
764
def test_aio_cmpext(self):
    """aio_cmpext against identical data completes with return value 0."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_complete(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    data = b'abcdefghi'
    self.ioctx.write('test_object', data)
    comp = self.ioctx.aio_cmpext('test_object', data, 0, on_complete)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
781
def test_aio_rmxattr(self):
    """After aio_rmxattr completes, reading the xattr raises NoData."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_complete(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    obj_name, attr_name = "xyz", "key"
    self.ioctx.set_xattr(obj_name, attr_name, b'value')
    eq(self.ioctx.get_xattr(obj_name, attr_name), b'value')
    comp = self.ioctx.aio_rmxattr(obj_name, attr_name, on_complete)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
    with assert_raises(NoData):
        self.ioctx.get_xattr(obj_name, attr_name)
800
def test_aio_write_no_comp_ref(self):
    """aio_write must still complete when the caller drops the completion."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    # NOTE(sileht): deliberately do not keep the completion in a local, to
    # check that the library itself holds every reference it needs.
    self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    with cond:
        while fired[0] < 2:
            cond.wait()
    stored = self.ioctx.read("foo")
    eq(stored, b"bar")
    for obj in self.ioctx.list_objects():
        obj.remove()
818
def test_aio_append(self):
    """An aio_append queued after an aio_write extends the same object."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    write_comp = self.ioctx.aio_write("foo", b"bar", 0, on_event, on_event)
    append_comp = self.ioctx.aio_append("foo", b"baz", on_event, on_event)
    write_comp.wait_for_complete()
    stored = self.ioctx.read("foo")
    eq(stored, b"barbaz")
    # Two operations with two callbacks each: four calls in total.
    with cond:
        while fired[0] < 4:
            cond.wait()
    eq(write_comp.get_return_value(), 0)
    eq(append_comp.get_return_value(), 0)
    for obj in self.ioctx.list_objects():
        obj.remove()
838
def test_aio_write_full(self):
    """aio_write_full replaces what an earlier aio_write stored."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    # The completion of the first write is intentionally not kept.
    self.ioctx.aio_write("foo", b"barbaz", 0, on_event, on_event)
    full_comp = self.ioctx.aio_write_full("foo", b"bar", on_event, on_event)
    full_comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(full_comp.get_return_value(), 0)
    stored = self.ioctx.read("foo")
    eq(stored, b"bar")
    for obj in self.ioctx.list_objects():
        obj.remove()
857
def test_aio_writesame(self):
    """aio_writesame repeats the 3-byte pattern to fill 9 bytes."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_complete(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    comp = self.ioctx.aio_writesame("abc", b"rzx", 9, 0, on_complete)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()
    eq(comp.get_return_value(), 0)
    eq(self.ioctx.read("abc"), b"rzxrzxrzx")
    for obj in self.ioctx.list_objects():
        obj.remove()
874
def test_aio_stat(self):
    """aio_stat returns -2 for a missing object and 0 once it exists."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_stat(_completion, size, mtime):
        with cond:
            fired[0] += 1
            cond.notify()

    def stat_and_wait(expected_calls):
        # Issue the stat and block until the callback count reaches the target.
        comp = self.ioctx.aio_stat("foo", on_stat)
        comp.wait_for_complete()
        with cond:
            while fired[0] < expected_calls:
                cond.wait()
        return comp.get_return_value()

    eq(stat_and_wait(1), -2)

    self.ioctx.write("foo", b"bar")

    eq(stat_and_wait(2), 0)

    for obj in self.ioctx.list_objects():
        obj.remove()
900
def test_aio_remove(self):
    """aio_remove deletes the object and invokes both callbacks."""
    cond = threading.Condition()
    fired = [0]  # one-element list so the callback can update it

    def on_event(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    self.ioctx.write('foo', b'wrx')
    eq(self.ioctx.read('foo'), b'wrx')
    comp = self.ioctx.aio_remove('foo', on_event, on_event)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 2:
            cond.wait()
    eq(comp.get_return_value(), 0)
    remaining = list(self.ioctx.list_objects())
    eq(remaining, [])
918
def _take_down_acting_set(self, pool, objectname):
    """Mark every OSD in the acting set of pool:objectname down.

    Used to verify that async reads don't complete while the acting set
    is missing.  Sets the 'noup' flag first so the OSDs stay down until
    the flag is unset again.

    :param pool: name of the pool holding the object
    :param objectname: name of the object whose acting set is taken down
    """
    cmd = {
        "prefix":"osd map",
        "pool":pool,
        "object":objectname,
        "format":"json",
    }
    r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'')
    # Fail here rather than on unparsable output if the lookup was refused.
    eq(r, 0)
    objmap = json.loads(jsonout.decode("utf-8"))
    acting_set = objmap['acting']
    cmd = {"prefix":"osd set", "key":"noup"}
    r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
    eq(r, 0)
    cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
    r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
    eq(r, 0)

    # wait for OSDs to acknowledge the down
    eq(self.rados.wait_for_latest_osdmap(), 0)
940
def _let_osds_back_up(self):
    """Unset the 'noup' OSD flag and require the monitor command to succeed."""
    unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
    result = self.rados.mon_command(unset_noup, b'')
    rc, _, _ = result
    eq(rc, 0)
945
def test_aio_read_wait_for_complete(self):
    """aio_read must stall while the acting set is down and finish afterwards.

    Uses wait_for_complete() and then waits for the callback by watching
    result[0].
    """
    payload = b"bar\000frob"
    self.ioctx.write("foo", payload)
    self._take_down_acting_set('test_pool', 'foo')

    # One-element list so the nested callback can store the buffer.
    result = [None]
    cond = threading.Condition()

    def on_read(_completion, buf):
        with cond:
            result[0] = buf
            cond.notify()

    comp = self.ioctx.aio_read("foo", len(payload), 0, on_read)
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(None, result[0])

    self._let_osds_back_up()
    comp.wait_for_complete()
    # Bounded wait for the callback: timed waits of 5s, then give up.
    waits = 0
    with cond:
        while result[0] is None and waits <= 10:
            cond.wait(timeout=5)
            waits += 1
    assert(waits <= 10)

    eq(result[0], payload)
    eq(sys.getrefcount(comp), 2)
980
def test_aio_read_wait_for_complete_and_cb(self):
    # Block in wait_for_complete_and_cb() and check that the read
    # callback has already stored its buffer when that call returns.
    data = b"bar\000frob"
    self.ioctx.write("foo", data)
    self._take_down_acting_set('test_pool', 'foo')

    # single-slot list: the nested callback can assign into it
    result = [None]
    cond = threading.Condition()

    def on_read(_completion, buf):
        with cond:
            result[0] = buf
            cond.notify()

    comp = self.ioctx.aio_read("foo", len(data), 0, on_read)

    # acting set is down: still pending now and three seconds later
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(None, result[0])

    self._let_osds_back_up()
    comp.wait_for_complete_and_cb()

    assert(result[0] is not None)
    eq(result[0], data)
    eq(sys.getrefcount(comp), 2)
1007
def test_aio_read_wait_for_complete_and_cb_error(self):
    # Error path of wait_for_complete_and_cb(): the callback must have
    # run (delivering None) by the time the call returns.
    self._take_down_acting_set('test_pool', 'bar')

    # single-slot list so the nested callback can assign into it; it
    # starts at 1 so that a callback delivering None is distinguishable
    result = [1]
    cond = threading.Condition()

    def on_read(_completion, buf):
        with cond:
            result[0] = buf
            cond.notify()

    # read from an object that does not exist (DNE)
    comp = self.ioctx.aio_read("bar", 3, 0, on_read)

    # acting set is down: still pending now and three seconds later
    eq(False, comp.is_complete())
    time.sleep(3)
    eq(False, comp.is_complete())
    with cond:
        eq(1, result[0])

    self._let_osds_back_up()
    comp.wait_for_complete_and_cb()

    eq(None, result[0])
    assert(comp.get_return_value() < 0)
    eq(sys.getrefcount(comp), 2)
1034
def test_lock(self):
    # Exclusive and shared advisory locks on one object: conflicting
    # acquisitions and unlocking a lock that is not held must raise.
    oid, lock_name, desc = "foo", "lock", "desc_lock"
    duration, flags = 10000, 0

    # exclusive lock: taking it again with the same cookie fails
    self.ioctx.lock_exclusive(oid, lock_name, "locker", desc,
                              duration, flags)
    assert_raises(ObjectExists,
                  self.ioctx.lock_exclusive,
                  oid, lock_name, "locker", desc, duration, flags)
    self.ioctx.unlock(oid, lock_name, "locker")
    assert_raises(ObjectNotFound, self.ioctx.unlock,
                  oid, lock_name, "locker")

    # two shared holders block a third party's exclusive lock
    shared_cookies = ("locker1", "locker2")
    for cookie in shared_cookies:
        self.ioctx.lock_shared(oid, lock_name, cookie, "tag", desc,
                               duration, flags)
    assert_raises(ObjectBusy,
                  self.ioctx.lock_exclusive,
                  oid, lock_name, "locker3", desc, duration, flags)

    # release both, then check neither can be released twice
    for cookie in shared_cookies:
        self.ioctx.unlock(oid, lock_name, cookie)
    for cookie in shared_cookies:
        assert_raises(ObjectNotFound, self.ioctx.unlock,
                      oid, lock_name, cookie)
1055
def test_execute(self):
    # Call the "hello" class method "say_hello" synchronously, once
    # with empty input and once with a name.
    self.ioctx.write("foo", b"")  # ensure object exists

    cases = (
        (b"", b"Hello, world!"),
        (b"nose", b"Hello, nose!"),
    )
    for indata, expected in cases:
        _ret, reply = self.ioctx.execute("foo", "hello", "say_hello", indata)
        eq(reply, expected)
1064
def test_aio_execute(self):
    # Async variant of test_execute.  The same callback is registered
    # for both completion hooks, so every call bumps the counter twice.
    calls = [0]
    first_reply = [None]
    cond = threading.Condition()

    def on_done(_completion, buf):
        with cond:
            # keep only the first buffer delivered for this call
            if first_reply[0] is None:
                first_reply[0] = buf
            calls[0] += 1
            cond.notify()

    self.ioctx.write("foo", b"")  # ensure object exists

    # (input, callbacks seen so far, expected return value, expected reply)
    cases = (
        (b"", 2, 13, b"Hello, world!"),
        (b"nose", 4, 12, b"Hello, nose!"),
    )
    for indata, expected_calls, expected_ret, expected_reply in cases:
        first_reply[0] = None
        comp = self.ioctx.aio_execute("foo", "hello", "say_hello", indata,
                                      32, on_done, on_done)
        comp.wait_for_complete()
        with cond:
            while calls[0] < expected_calls:
                cond.wait()
        eq(comp.get_return_value(), expected_ret)
        eq(first_reply[0], expected_reply)

    # clean up every object left in the pool
    for obj in self.ioctx.list_objects():
        obj.remove()
1095
def test_aio_setxattr(self):
    # Set an xattr asynchronously, wait for the completion callback,
    # then read the value back synchronously.
    cond = threading.Condition()
    fired = [0]

    def on_complete(_completion):
        with cond:
            fired[0] += 1
            cond.notify()
        return 0

    comp = self.ioctx.aio_setxattr("obj", "key", b'value', on_complete)
    comp.wait_for_complete()
    with cond:
        while fired[0] < 1:
            cond.wait()

    eq(comp.get_return_value(), 0)
    eq(self.ioctx.get_xattr("obj", "key"), b'value')
1111
def test_applications(self):
    # Pool application enable/list and per-application metadata
    # set/get/list/remove.  Skipped unless the cluster reports a
    # require_osd_release whose first letter is 'l' or later.
    dump_cmd = json.dumps({"prefix":"osd dump", "format":"json"})
    ret, buf, errs = self.rados.mon_command(dump_cmd, b'')
    eq(ret, 0)
    assert len(buf) > 0
    osd_dump = json.loads(buf.decode("utf-8"))
    release = osd_dump.get("require_osd_release", None)
    if not (release and release[0] >= 'l'):
        raise SkipTest

    eq([], self.ioctx.application_list())

    # a second application is refused unless the extra flag is passed
    self.ioctx.application_enable("app1")
    assert_raises(Error, self.ioctx.application_enable, "app2")
    self.ioctx.application_enable("app2", True)

    assert_raises(Error, self.ioctx.application_metadata_list, "dne")
    eq([], self.ioctx.application_metadata_list("app1"))

    assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key",
                  "key")
    entries = (
        ("app1", "key1", "val1"),
        ("app1", "key2", "val2"),
        ("app2", "key1", "val1"),
    )
    for app, key, value in entries:
        self.ioctx.application_metadata_set(app, key, value)
        eq(value, self.ioctx.application_metadata_get(app, key))

    eq([("key1", "val1"), ("key2", "val2")],
       self.ioctx.application_metadata_list("app1"))

    self.ioctx.application_metadata_remove("app1", "key1")
    eq([("key2", "val2")], self.ioctx.application_metadata_list("app1"))
1145
def test_service_daemon(self):
    # Register this process as a "laundry" service daemon named after
    # its pid, then push one status update.
    daemon_name = "pid-" + str(os.getpid())
    self.rados.service_daemon_register(
        "laundry", daemon_name, {'version': '3.14', 'memory': '42'})
    self.rados.service_daemon_update(
        {'result': 'unknown', 'test': 'running'})
1152
def test_alignment(self):
    # this pool is expected to report no alignment requirement
    alignment = self.ioctx.alignment()
    eq(alignment, None)
1155
1156
@attr('ec')
class TestIoctxEc(object):
    # Ioctx tests that run against an erasure-coded pool.

    def setUp(self):
        # connect, define an erasure-code profile, create a pool that
        # uses it and open an ioctx on that pool
        self.rados = Rados(conffile='')
        self.rados.connect()
        self.pool = 'test-ec'
        self.profile = 'testprofile-%s' % self.pool

        profile_cmd = {
            "prefix": "osd erasure-code-profile set",
            "name": self.profile,
            "profile": ["k=2", "m=1", "crush-failure-domain=osd"],
        }
        ret, buf, out = self.rados.mon_command(json.dumps(profile_cmd), b'',
                                               timeout=30)
        eq(ret, 0, msg=out)

        # create ec pool with profile created above
        pool_cmd = {
            'prefix': 'osd pool create',
            'pg_num': 8,
            'pgp_num': 8,
            'pool': self.pool,
            'pool_type': 'erasure',
            'erasure_code_profile': self.profile,
        }
        ret, buf, out = self.rados.mon_command(json.dumps(pool_cmd), b'',
                                               timeout=30)
        eq(ret, 0, msg=out)
        assert self.rados.pool_exists(self.pool)
        self.ioctx = self.rados.open_ioctx(self.pool)

    def tearDown(self):
        # clear the noup flag (result not checked), then drop the
        # ioctx, the pool and the connection
        unset_noup = json.dumps({"prefix": "osd unset", "key": "noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx.close()
        self.rados.delete_pool(self.pool)
        self.rados.shutdown()

    def test_alignment(self):
        # the erasure-coded pool is expected to report 8192
        alignment = self.ioctx.alignment()
        eq(alignment, 8192)
1187
1188
class TestIoctx2(object):
    # Tests for an ioctx opened by pool id (open_ioctx2) rather than by
    # pool name.

    def setUp(self):
        # create the pool, look up its id and open the ioctx through it
        self.rados = Rados(conffile='')
        self.rados.connect()
        pool_name = 'test_pool'
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        pool_id = self.rados.pool_lookup(pool_name)
        assert pool_id > 0
        self.ioctx2 = self.rados.open_ioctx2(pool_id)

    def tearDown(self):
        # clear the noup flag (result not checked), then clean up
        unset_noup = json.dumps({"prefix": "osd unset", "key": "noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx2.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    def test_get_last_version(self):
        last_version = self.ioctx2.get_last_version()
        assert last_version >= 0

    def test_get_stats(self):
        # a freshly created pool must report zero for every counter
        counters = (
            'num_objects_unfound',
            'num_objects_missing_on_primary',
            'num_object_clones',
            'num_objects',
            'num_object_copies',
            'num_bytes',
            'num_rd_kb',
            'num_wr_kb',
            'num_kb',
            'num_wr',
            'num_objects_degraded',
            'num_rd',
        )
        expected = dict.fromkeys(counters, 0)
        stats = self.ioctx2.get_stats()
        eq(stats, expected)
1225
1226
class TestObject(object):
    # Tests for the file-like Object wrapper (read / seek / write).

    def setUp(self):
        # every test starts with object 'foo' holding b'bar'
        self.rados = Rados(conffile='')
        self.rados.connect()
        pool_name = 'test_pool'
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        self.ioctx = self.rados.open_ioctx(pool_name)
        self.ioctx.write('foo', b'bar')
        self.object = Object(self.ioctx, 'foo')

    def tearDown(self):
        # close the ioctx, delete the pool, shut down; the references
        # are dropped as each one is finished with
        self.ioctx.close()
        self.ioctx = None
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()
        self.rados = None

    def test_read(self):
        # the first read returns the content, the next one nothing
        first = self.object.read(3)
        eq(first, b'bar')
        second = self.object.read(100)
        eq(second, b'')

    def test_seek(self):
        self.object.write(b'blah')
        # (seek offset, read length, expected bytes)
        for offset, length, expected in ((0, 4, b'blah'), (1, 3, b'lah')):
            self.object.seek(offset)
            eq(self.object.read(length), expected)

    def test_write(self):
        self.object.write(b'barbaz')
        self.object.seek(0)
        # two consecutive reads return consecutive halves
        for expected in (b'bar', b'baz'):
            eq(self.object.read(3), expected)
1261
@attr('snap')
class TestIoCtxSelfManagedSnaps(object):
    # Self-managed snapshot create / rollback / remove on one object.

    def setUp(self):
        self.rados = Rados(conffile='')
        self.rados.connect()
        pool_name = 'test_pool'
        self.rados.create_pool(pool_name)
        assert self.rados.pool_exists(pool_name)
        self.ioctx = self.rados.open_ioctx(pool_name)

    def tearDown(self):
        # clear the noup flag (result not checked), then clean up
        unset_noup = json.dumps({"prefix":"osd unset", "key":"noup"})
        self.rados.mon_command(unset_noup, b'')
        self.ioctx.close()
        self.rados.delete_pool('test_pool')
        self.rados.shutdown()

    def test(self):
        # cannot mix-and-match pool and self-managed snapshot mode
        oid = 'abc'
        self.ioctx.set_self_managed_snap_write([])
        self.ioctx.write(oid, b'abc')
        first_snap = self.ioctx.create_self_managed_snap()
        self.ioctx.set_self_managed_snap_write([first_snap])

        self.ioctx.write(oid, b'def')
        second_snap = self.ioctx.create_self_managed_snap()
        self.ioctx.set_self_managed_snap_write([first_snap, second_snap])

        self.ioctx.write(oid, b'ghi')

        # rolling back to a snapshot restores what was written before
        # that snapshot was taken
        for snap_id, expected in ((first_snap, b'abc'),
                                  (second_snap, b'def')):
            self.ioctx.rollback_self_managed_snap(oid, snap_id)
            eq(self.ioctx.read(oid), expected)

        for snap_id in (first_snap, second_snap):
            self.ioctx.remove_self_managed_snap(snap_id)
1299
class TestCommand(object):
    # Tests for sending JSON commands to monitors and OSDs.

    def setUp(self):
        self.rados = Rados(conffile='')
        self.rados.connect()

    def tearDown(self):
        self.rados.shutdown()

    def test_monmap_dump(self):

        # check for success and some plain output with epoch in it
        cmd = {"prefix":"mon dump"}
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        assert(b'epoch' in buf)

        # JSON, and grab current epoch
        cmd['format'] = 'json'
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        d = json.loads(buf.decode("utf-8"))
        assert('epoch' in d)
        epoch = d['epoch']

        # assume epoch + 1000 does not exist; test for ENOENT
        cmd['epoch'] = epoch + 1000
        ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
        eq(ret, -errno.ENOENT)
        eq(len(buf), 0)

        # send to specific target by name, rank
        cmd = {"prefix": "version"}
        first_mon = d['mons'][0]
        for target in (first_mon['name'], first_mon['rank']):
            print(target)
            ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'',
                                                    timeout=30, target=target)
            eq(ret, 0)
            assert len(buf) > 0
            e = json.loads(buf.decode("utf-8"))
            assert('release' in e)

    @attr('bench')
    def test_osd_bench(self):
        # run a small bench on osd 0 and check the reply echoes the
        # requested sizes
        cmd = dict(prefix='bench', size=4096, count=8192)
        ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
                                               timeout=30)
        eq(ret, 0)
        assert len(buf) > 0
        out = json.loads(buf.decode('utf-8'))
        eq(out['blocksize'], cmd['size'])
        eq(out['bytes_written'], cmd['count'])

    def test_ceph_osd_pool_create_utf8(self):
        # u-prefixed like the expected message below, so both name the
        # same character on every interpreter
        poolname = u"\u9ec5"

        cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
        ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
        eq(ret, 0)
        try:
            assert len(out) > 0
            eq(u"pool '\u9ec5' created", out)
        finally:
            # Do not leak the pool: a leftover pool makes the next run
            # against the same cluster fail the "created" check above.
            self.rados.delete_pool(poolname)
1374
1375
1376 class TestWatchNotify(object):
1377 OID = "test_watch_notify"
1378
def setUp(self):
    # connect, create the pool and write the object that gets watched
    self.rados = Rados(conffile='')
    self.rados.connect()
    pool_name = 'test_pool'
    self.rados.create_pool(pool_name)
    assert self.rados.pool_exists(pool_name)
    self.ioctx = self.rados.open_ioctx(pool_name)
    self.ioctx.write(self.OID, b'test watch notify')

    # state filled in by the watch callbacks; guarded by self.lock
    self.lock = threading.Condition()
    self.notify_cnt, self.notify_data, self.notify_error = {}, {}, {}

    # aio related
    self.ack_cnt, self.ack_data = {}, {}
    self.instance_id = self.rados.get_instance_id()
1394
def tearDown(self):
    # release in reverse order of setUp: ioctx, pool, connection
    cluster = self.rados
    self.ioctx.close()
    cluster.delete_pool('test_pool')
    cluster.shutdown()
1399
def make_callback(self):
    """Return a watch callback that records notifications per watch id.

    The callback counts each notification in ``self.notify_cnt`` and
    stores the latest payload in ``self.notify_data``, both keyed by
    watch id and updated while holding ``self.lock``.
    """
    def on_notify(notify_id, notifier_id, watch_id, data):
        with self.lock:
            seen = self.notify_cnt.get(watch_id, 0)
            self.notify_cnt[watch_id] = seen + 1
            self.notify_data[watch_id] = data
    return on_notify
1408
def make_error_callback(self):
    """Return a watch error callback.

    The callback stores the reported error in ``self.notify_error``
    under the watch id, while holding ``self.lock``.
    """
    def on_error(watch_id, error):
        with self.lock:
            self.notify_error.update({watch_id: error})
    return on_error
1414
1415
def test(self):
    # Two watches on the same object, driven by synchronous notify():
    # both see every notification until one is closed; removing the
    # object then reports ENOTCONN to the remaining watch.
    with self.ioctx.watch(self.OID, self.make_callback(),
                          self.make_error_callback()) as watch1:
        watch_id1 = watch1.get_id()
        assert(watch_id1 > 0)

        # second watch, registered through a separate ioctx
        with self.rados.open_ioctx('test_pool') as ioctx:
            watch2 = ioctx.watch(self.OID, self.make_callback(),
                                 self.make_error_callback())
        watch_id2 = watch2.get_id()
        assert(watch_id2 > 0)

        def expect(count1, count2, data1, data2):
            # compare the recorded counters and payloads of both watches
            with self.lock:
                eq(self.notify_cnt[watch_id1], count1)
                eq(self.notify_cnt[watch_id2], count2)
                eq(self.notify_data[watch_id1], data1)
                eq(self.notify_data[watch_id2], data2)

        assert(self.ioctx.notify(self.OID, 'test'))
        with self.lock:
            assert(watch_id1 in self.notify_cnt)
            assert(watch_id2 in self.notify_cnt)
        expect(1, 1, b'test', b'test')

        assert(watch1.check() >= timedelta())
        assert(watch2.check() >= timedelta())

        assert(self.ioctx.notify(self.OID, 'best'))
        expect(2, 2, b'best', b'best')

        watch2.close()

        # only watch1 is left to receive this one
        assert(self.ioctx.notify(self.OID, 'rest'))
        expect(3, 2, b'rest', b'best')

        assert(watch1.check() >= timedelta())

        self.ioctx.remove_object(self.OID)

        # give the error callback up to ten seconds to fire
        for _attempt in range(10):
            with self.lock:
                errored = watch_id1 in self.notify_error
            if errored:
                break
            time.sleep(1)
        eq(self.notify_error[watch_id1], -errno.ENOTCONN)
        assert_raises(NotConnected, watch1.check)

        assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')
1469
def make_callback_reply(self):
    """Return a watch callback that hands the payload back unchanged.

    The value is returned while ``self.lock`` is held.
    """
    def echo(notify_id, notifier_id, watch_id, data):
        self.lock.acquire()
        try:
            return data
        finally:
            self.lock.release()
    return echo
1475
def notify_callback(self, _, r, ack_list, timeout_list):
    """Completion callback for ``aio_notify`` that must succeed.

    Checks that ``r`` is 0, then counts every ack in ``self.ack_cnt``
    and stores its payload in ``self.ack_data``, keyed by the first
    element of each ack, while holding ``self.lock``.
    """
    eq(r, 0)
    with self.lock:
        for ack in ack_list:
            notifier_id, payload = ack[0], ack[2]
            self.ack_cnt[notifier_id] = self.ack_cnt.get(notifier_id, 0) + 1
            self.ack_data[notifier_id] = payload
1484
def notify_callback_err(self, _, r, ack_list, timeout_list):
    """Completion callback for an ``aio_notify`` expected to fail with ENOENT."""
    expected = -errno.ENOENT
    eq(r, expected)
1487
def test_aio_notify(self):
    # Same scenario as test(), driven by aio_notify(): each watch
    # callback echoes the payload and notify_callback tallies the acks
    # under this client's instance id.
    with self.ioctx.watch(self.OID, self.make_callback_reply(),
                          self.make_error_callback()) as watch1:
        watch_id1 = watch1.get_id()
        ok(watch_id1 > 0)

        # second watch, registered through a separate ioctx
        with self.rados.open_ioctx('test_pool') as ioctx:
            watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
                                 self.make_error_callback())
        watch_id2 = watch2.get_id()
        ok(watch_id2 > 0)

        def expect_acks(total, payload):
            # cumulative ack count and the latest acked payload
            with self.lock:
                eq(self.ack_cnt[self.instance_id], total)
                eq(self.ack_data[self.instance_id], payload)

        comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
        comp.wait_for_complete_and_cb()
        with self.lock:
            ok(self.instance_id in self.ack_cnt)
        expect_acks(2, b'test')

        ok(watch1.check() >= timedelta())
        ok(watch2.check() >= timedelta())

        comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
        comp.wait_for_complete_and_cb()
        expect_acks(4, b'best')

        watch2.close()

        # only watch1 is left, so a single additional ack
        comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest')
        comp.wait_for_complete_and_cb()
        expect_acks(5, b'rest')

        assert(watch1.check() >= timedelta())
        self.ioctx.remove_object(self.OID)

        # give the error callback up to ten seconds to fire
        for _attempt in range(10):
            with self.lock:
                errored = watch_id1 in self.notify_error
            if errored:
                break
            time.sleep(1)
        eq(self.notify_error[watch_id1], -errno.ENOTCONN)
        assert_raises(NotConnected, watch1.check)

        # the object is gone: the completion callback expects ENOENT
        comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test')
        comp.wait_for_complete_and_cb()