]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_rados.py
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / test / pybind / test_rados.py
1 from __future__ import print_function
2 from assertions import assert_equal as eq, assert_raises
3 from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
4 ObjectNotFound, ObjectBusy, NotConnected,
5 LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
6 LIBRADOS_CMPXATTR_OP_EQ, LIBRADOS_CMPXATTR_OP_GT, LIBRADOS_CMPXATTR_OP_LT, OSError,
7 LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog, MAX_ERRNO, NoData, ExtendMismatch)
8 from datetime import timedelta
9 import time
10 import threading
11 import json
12 import errno
13 import os
14 import pytest
15 import re
16 import sys
17
18 def test_rados_init_error():
19 assert_raises(Error, Rados, conffile='', rados_id='admin',
20 name='client.admin')
21 assert_raises(Error, Rados, conffile='', name='invalid')
22 assert_raises(Error, Rados, conffile='', name='bad.invalid')
23
24 def test_rados_init():
25 with Rados(conffile='', rados_id='admin'):
26 pass
27 with Rados(conffile='', name='client.admin'):
28 pass
29 with Rados(conffile='', name='client.admin'):
30 pass
31 with Rados(conffile='', name='client.admin'):
32 pass
33
34 def test_ioctx_context_manager():
35 with Rados(conffile='', rados_id='admin') as conn:
36 with conn.open_ioctx('rbd') as ioctx:
37 pass
38
39 def test_parse_argv():
40 args = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it']
41 r = Rados()
42 eq(args, r.conf_parse_argv(args))
43
44 def test_parse_argv_empty_str():
45 args = ['']
46 r = Rados()
47 eq(args, r.conf_parse_argv(args))
48
49 class TestRadosStateError(object):
50 def _requires_configuring(self, rados):
51 assert_raises(RadosStateError, rados.connect)
52
53 def _requires_configuring_or_connected(self, rados):
54 assert_raises(RadosStateError, rados.conf_read_file)
55 assert_raises(RadosStateError, rados.conf_parse_argv, None)
56 assert_raises(RadosStateError, rados.conf_parse_env)
57 assert_raises(RadosStateError, rados.conf_get, 'opt')
58 assert_raises(RadosStateError, rados.conf_set, 'opt', 'val')
59 assert_raises(RadosStateError, rados.ping_monitor, '0')
60
61 def _requires_connected(self, rados):
62 assert_raises(RadosStateError, rados.pool_exists, 'foo')
63 assert_raises(RadosStateError, rados.pool_lookup, 'foo')
64 assert_raises(RadosStateError, rados.pool_reverse_lookup, 0)
65 assert_raises(RadosStateError, rados.create_pool, 'foo')
66 assert_raises(RadosStateError, rados.get_pool_base_tier, 0)
67 assert_raises(RadosStateError, rados.delete_pool, 'foo')
68 assert_raises(RadosStateError, rados.list_pools)
69 assert_raises(RadosStateError, rados.get_fsid)
70 assert_raises(RadosStateError, rados.open_ioctx, 'foo')
71 assert_raises(RadosStateError, rados.mon_command, '', b'')
72 assert_raises(RadosStateError, rados.osd_command, 0, '', b'')
73 assert_raises(RadosStateError, rados.pg_command, '', '', b'')
74 assert_raises(RadosStateError, rados.wait_for_latest_osdmap)
75 assert_raises(RadosStateError, rados.blocklist_add, '127.0.0.1/123', 0)
76
77 def test_configuring(self):
78 rados = Rados(conffile='')
79 eq('configuring', rados.state)
80 self._requires_connected(rados)
81
82 def test_connected(self):
83 rados = Rados(conffile='')
84 with rados:
85 eq('connected', rados.state)
86 self._requires_configuring(rados)
87
88 def test_shutdown(self):
89 rados = Rados(conffile='')
90 with rados:
91 pass
92 eq('shutdown', rados.state)
93 self._requires_configuring(rados)
94 self._requires_configuring_or_connected(rados)
95 self._requires_connected(rados)
96
97
98 class TestRados(object):
99
100 def setup_method(self, method):
101 self.rados = Rados(conffile='')
102 self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
103 self.rados.conf_parse_env()
104 self.rados.connect()
105
106 # Assume any pre-existing pools are the cluster's defaults
107 self.default_pools = self.rados.list_pools()
108
109 def teardown_method(self, method):
110 self.rados.shutdown()
111
112 def test_ping_monitor(self):
113 assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor')
114 cmd = {'prefix': 'mon dump', 'format':'json'}
115 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
116 for mon in json.loads(buf.decode('utf8'))['mons']:
117 while True:
118 output = self.rados.ping_monitor(mon['name'])
119 if output is None:
120 continue
121 buf = json.loads(output)
122 if buf.get('health'):
123 break
124
125 def test_annotations(self):
126 with pytest.raises(TypeError):
127 self.rados.create_pool(0xf00)
128
129 def test_create(self):
130 self.rados.create_pool('foo')
131 self.rados.delete_pool('foo')
132
133 def test_create_utf8(self):
134 poolname = "\u9ec4"
135 self.rados.create_pool(poolname)
136 assert self.rados.pool_exists(u"\u9ec4")
137 self.rados.delete_pool(poolname)
138
139 def test_pool_lookup_utf8(self):
140 poolname = '\u9ec4'
141 self.rados.create_pool(poolname)
142 try:
143 poolid = self.rados.pool_lookup(poolname)
144 eq(poolname, self.rados.pool_reverse_lookup(poolid))
145 finally:
146 self.rados.delete_pool(poolname)
147
148 def test_eexist(self):
149 self.rados.create_pool('foo')
150 assert_raises(ObjectExists, self.rados.create_pool, 'foo')
151 self.rados.delete_pool('foo')
152
153 def list_non_default_pools(self):
154 pools = self.rados.list_pools()
155 for p in self.default_pools:
156 pools.remove(p)
157 return set(pools)
158
159 def test_list_pools(self):
160 eq(set(), self.list_non_default_pools())
161 self.rados.create_pool('foo')
162 eq(set(['foo']), self.list_non_default_pools())
163 self.rados.create_pool('bar')
164 eq(set(['foo', 'bar']), self.list_non_default_pools())
165 self.rados.create_pool('baz')
166 eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools())
167 self.rados.delete_pool('foo')
168 eq(set(['bar', 'baz']), self.list_non_default_pools())
169 self.rados.delete_pool('baz')
170 eq(set(['bar']), self.list_non_default_pools())
171 self.rados.delete_pool('bar')
172 eq(set(), self.list_non_default_pools())
173 self.rados.create_pool('a' * 500)
174 eq(set(['a' * 500]), self.list_non_default_pools())
175 self.rados.delete_pool('a' * 500)
176
177 @pytest.mark.tier
178 def test_get_pool_base_tier(self):
179 self.rados.create_pool('foo')
180 try:
181 self.rados.create_pool('foo-cache')
182 try:
183 pool_id = self.rados.pool_lookup('foo')
184 tier_pool_id = self.rados.pool_lookup('foo-cache')
185
186 cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""}
187 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
188 eq(ret, 0)
189
190 try:
191 cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True}
192 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
193 eq(ret, 0)
194
195 eq(self.rados.wait_for_latest_osdmap(), 0)
196
197 eq(pool_id, self.rados.get_pool_base_tier(pool_id))
198 eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id))
199 finally:
200 cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"}
201 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
202 eq(ret, 0)
203 finally:
204 self.rados.delete_pool('foo-cache')
205 finally:
206 self.rados.delete_pool('foo')
207
208 def test_get_fsid(self):
209 fsid = self.rados.get_fsid()
210 assert re.match('[0-9a-f\-]{36}', fsid, re.I)
211
212 def test_blocklist_add(self):
213 self.rados.blocklist_add("1.2.3.4/123", 1)
214
215 @pytest.mark.stats
216 def test_get_cluster_stats(self):
217 stats = self.rados.get_cluster_stats()
218 assert stats['kb'] > 0
219 assert stats['kb_avail'] > 0
220 assert stats['kb_used'] > 0
221 assert stats['num_objects'] >= 0
222
223 def test_monitor_log(self):
224 lock = threading.Condition()
225 def cb(arg, line, who, sec, nsec, seq, level, msg):
226 # NOTE(sileht): the old pyrados API was received the pointer as int
227 # instead of the value of arg
228 eq(arg, "arg")
229 with lock:
230 lock.notify()
231 return 0
232
233 # NOTE(sileht): force don't save the monitor into local var
234 # to ensure all references are correctly tracked into the lib
235 MonitorLog(self.rados, "debug", cb, "arg")
236 with lock:
237 lock.wait()
238 MonitorLog(self.rados, "debug", None, None)
239 eq(None, self.rados.monitor_callback)
240
241 class TestIoctx(object):
242
243 def setup_method(self, method):
244 self.rados = Rados(conffile='')
245 self.rados.connect()
246 self.rados.create_pool('test_pool')
247 assert self.rados.pool_exists('test_pool')
248 self.ioctx = self.rados.open_ioctx('test_pool')
249
250 def teardown_method(self, method):
251 cmd = {"prefix":"osd unset", "key":"noup"}
252 self.rados.mon_command(json.dumps(cmd), b'')
253 self.ioctx.close()
254 self.rados.delete_pool('test_pool')
255 self.rados.shutdown()
256
257 def test_get_last_version(self):
258 version = self.ioctx.get_last_version()
259 assert version >= 0
260
261 def test_get_stats(self):
262 stats = self.ioctx.get_stats()
263 eq(stats, {'num_objects_unfound': 0,
264 'num_objects_missing_on_primary': 0,
265 'num_object_clones': 0,
266 'num_objects': 0,
267 'num_object_copies': 0,
268 'num_bytes': 0,
269 'num_rd_kb': 0,
270 'num_wr_kb': 0,
271 'num_kb': 0,
272 'num_wr': 0,
273 'num_objects_degraded': 0,
274 'num_rd': 0})
275
276 def test_write(self):
277 self.ioctx.write('abc', b'abc')
278 eq(self.ioctx.read('abc'), b'abc')
279
280 def test_write_full(self):
281 self.ioctx.write('abc', b'abc')
282 eq(self.ioctx.read('abc'), b'abc')
283 self.ioctx.write_full('abc', b'd')
284 eq(self.ioctx.read('abc'), b'd')
285
286 def test_writesame(self):
287 self.ioctx.writesame('ob', b'rzx', 9)
288 eq(self.ioctx.read('ob'), b'rzxrzxrzx')
289
290 def test_append(self):
291 self.ioctx.write('abc', b'a')
292 self.ioctx.append('abc', b'b')
293 self.ioctx.append('abc', b'c')
294 eq(self.ioctx.read('abc'), b'abc')
295
296 def test_write_zeros(self):
297 self.ioctx.write('abc', b'a\0b\0c')
298 eq(self.ioctx.read('abc'), b'a\0b\0c')
299
300 def test_trunc(self):
301 self.ioctx.write('abc', b'abc')
302 self.ioctx.trunc('abc', 2)
303 eq(self.ioctx.read('abc'), b'ab')
304 size = self.ioctx.stat('abc')[0]
305 eq(size, 2)
306
307 def test_cmpext(self):
308 self.ioctx.write('test_object', b'abcdefghi')
309 eq(0, self.ioctx.cmpext('test_object', b'abcdefghi', 0))
310 eq(-MAX_ERRNO - 4, self.ioctx.cmpext('test_object', b'abcdxxxxx', 0))
311
312 def test_list_objects_empty(self):
313 eq(list(self.ioctx.list_objects()), [])
314
315 def test_list_objects(self):
316 self.ioctx.write('a', b'')
317 self.ioctx.write('b', b'foo')
318 self.ioctx.write_full('c', b'bar')
319 self.ioctx.append('d', b'jazz')
320 object_names = [obj.key for obj in self.ioctx.list_objects()]
321 eq(sorted(object_names), ['a', 'b', 'c', 'd'])
322
323 def test_list_ns_objects(self):
324 self.ioctx.write('a', b'')
325 self.ioctx.write('b', b'foo')
326 self.ioctx.write_full('c', b'bar')
327 self.ioctx.append('d', b'jazz')
328 self.ioctx.set_namespace("ns1")
329 self.ioctx.write('ns1-a', b'')
330 self.ioctx.write('ns1-b', b'foo')
331 self.ioctx.write_full('ns1-c', b'bar')
332 self.ioctx.append('ns1-d', b'jazz')
333 self.ioctx.append('d', b'jazz')
334 self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
335 object_names = [(obj.nspace, obj.key) for obj in self.ioctx.list_objects()]
336 eq(sorted(object_names), [('', 'a'), ('','b'), ('','c'), ('','d'),\
337 ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\
338 ('ns1', 'ns1-c'), ('ns1', 'ns1-d')])
339
340 def test_xattrs(self):
341 xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
342 self.ioctx.write('abc', b'')
343 for key, value in xattrs.items():
344 self.ioctx.set_xattr('abc', key, value)
345 eq(self.ioctx.get_xattr('abc', key), value)
346 stored_xattrs = {}
347 for key, value in self.ioctx.get_xattrs('abc'):
348 stored_xattrs[key] = value
349 eq(stored_xattrs, xattrs)
350
351 def test_obj_xattrs(self):
352 xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f=b'')
353 self.ioctx.write('abc', b'')
354 obj = list(self.ioctx.list_objects())[0]
355 for key, value in xattrs.items():
356 obj.set_xattr(key, value)
357 eq(obj.get_xattr(key), value)
358 stored_xattrs = {}
359 for key, value in obj.get_xattrs():
360 stored_xattrs[key] = value
361 eq(stored_xattrs, xattrs)
362
363 def test_get_pool_id(self):
364 eq(self.ioctx.get_pool_id(), self.rados.pool_lookup('test_pool'))
365
366 def test_get_pool_name(self):
367 eq(self.ioctx.get_pool_name(), 'test_pool')
368
369 def test_create_snap(self):
370 assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo')
371 self.ioctx.create_snap('foo')
372 self.ioctx.remove_snap('foo')
373
374 def test_list_snaps_empty(self):
375 eq(list(self.ioctx.list_snaps()), [])
376
377 def test_list_snaps(self):
378 snaps = ['snap1', 'snap2', 'snap3']
379 for snap in snaps:
380 self.ioctx.create_snap(snap)
381 listed_snaps = [snap.name for snap in self.ioctx.list_snaps()]
382 eq(snaps, listed_snaps)
383
384 def test_lookup_snap(self):
385 self.ioctx.create_snap('foo')
386 snap = self.ioctx.lookup_snap('foo')
387 eq(snap.name, 'foo')
388
389 def test_snap_timestamp(self):
390 self.ioctx.create_snap('foo')
391 snap = self.ioctx.lookup_snap('foo')
392 snap.get_timestamp()
393
394 def test_remove_snap(self):
395 self.ioctx.create_snap('foo')
396 (snap,) = self.ioctx.list_snaps()
397 eq(snap.name, 'foo')
398 self.ioctx.remove_snap('foo')
399 eq(list(self.ioctx.list_snaps()), [])
400
401 @pytest.mark.rollback
402 def test_snap_rollback(self):
403 self.ioctx.write("insnap", b"contents1")
404 self.ioctx.create_snap("snap1")
405 self.ioctx.remove_object("insnap")
406 self.ioctx.snap_rollback("insnap", "snap1")
407 eq(self.ioctx.read("insnap"), b"contents1")
408 self.ioctx.remove_snap("snap1")
409 self.ioctx.remove_object("insnap")
410
411 @pytest.mark.rollback
412 def test_snap_rollback_removed(self):
413 self.ioctx.write("insnap", b"contents1")
414 self.ioctx.create_snap("snap1")
415 self.ioctx.write("insnap", b"contents2")
416 self.ioctx.snap_rollback("insnap", "snap1")
417 eq(self.ioctx.read("insnap"), b"contents1")
418 self.ioctx.remove_snap("snap1")
419 self.ioctx.remove_object("insnap")
420
421 def test_snap_read(self):
422 self.ioctx.write("insnap", b"contents1")
423 self.ioctx.create_snap("snap1")
424 self.ioctx.remove_object("insnap")
425 snap = self.ioctx.lookup_snap("snap1")
426 self.ioctx.set_read(snap.snap_id)
427 eq(self.ioctx.read("insnap"), b"contents1")
428 self.ioctx.set_read(LIBRADOS_SNAP_HEAD)
429 self.ioctx.write("inhead", b"contents2")
430 eq(self.ioctx.read("inhead"), b"contents2")
431 self.ioctx.remove_snap("snap1")
432 self.ioctx.remove_object("inhead")
433
434 def test_set_omap(self):
435 keys = ("1", "2", "3", "4", b"\xff")
436 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04", b"5")
437 with WriteOpCtx() as write_op:
438 self.ioctx.set_omap(write_op, keys, values)
439 write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
440 self.ioctx.operate_write_op(write_op, "hw")
441 with ReadOpCtx() as read_op:
442 iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 5, omap_key_type=bytes)
443 eq(ret, 0)
444 self.ioctx.operate_read_op(read_op, "hw")
445 next(iter)
446 eq(list(iter), [(b"2", b"bbb"), (b"3", b"ccc"), (b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
447 with ReadOpCtx() as read_op:
448 iter, ret = self.ioctx.get_omap_vals(read_op, b"2", "", 4, omap_key_type=bytes)
449 eq(ret, 0)
450 self.ioctx.operate_read_op(read_op, "hw")
451 eq((b"3", b"ccc"), next(iter))
452 eq(list(iter), [(b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
453 with ReadOpCtx() as read_op:
454 iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4, omap_key_type=bytes)
455 eq(ret, 0)
456 read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
457 self.ioctx.operate_read_op(read_op, "hw")
458 eq(list(iter), [(b"2", b"bbb")])
459
460 def test_set_omap_aio(self):
461 lock = threading.Condition()
462 count = [0]
463 def cb(blah):
464 with lock:
465 count[0] += 1
466 lock.notify()
467 return 0
468
469 keys = ("1", "2", "3", "4")
470 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
471 with WriteOpCtx() as write_op:
472 self.ioctx.set_omap(write_op, keys, values)
473 comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
474 comp.wait_for_complete()
475 with lock:
476 while count[0] < 2:
477 lock.wait()
478 eq(comp.get_return_value(), 0)
479
480 with ReadOpCtx() as read_op:
481 iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
482 eq(ret, 0)
483 comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
484 comp.wait_for_complete()
485 with lock:
486 while count[0] < 4:
487 lock.wait()
488 eq(comp.get_return_value(), 0)
489 next(iter)
490 eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
491
492 def test_write_ops(self):
493 with WriteOpCtx() as write_op:
494 write_op.new(0)
495 self.ioctx.operate_write_op(write_op, "write_ops")
496 eq(self.ioctx.read('write_ops'), b'')
497
498 write_op.write_full(b'1')
499 write_op.append(b'2')
500 self.ioctx.operate_write_op(write_op, "write_ops")
501 eq(self.ioctx.read('write_ops'), b'12')
502
503 write_op.write_full(b'12345')
504 write_op.write(b'x', 2)
505 self.ioctx.operate_write_op(write_op, "write_ops")
506 eq(self.ioctx.read('write_ops'), b'12x45')
507
508 write_op.write_full(b'12345')
509 write_op.zero(2, 2)
510 self.ioctx.operate_write_op(write_op, "write_ops")
511 eq(self.ioctx.read('write_ops'), b'12\x00\x005')
512
513 write_op.write_full(b'12345')
514 write_op.truncate(2)
515 self.ioctx.operate_write_op(write_op, "write_ops")
516 eq(self.ioctx.read('write_ops'), b'12')
517
518 write_op.remove()
519 self.ioctx.operate_write_op(write_op, "write_ops")
520 with pytest.raises(ObjectNotFound):
521 self.ioctx.read('write_ops')
522
523 def test_execute_op(self):
524 with WriteOpCtx() as write_op:
525 write_op.execute("hello", "record_hello", b"ebs")
526 self.ioctx.operate_write_op(write_op, "object")
527 eq(self.ioctx.read('object'), b"Hello, ebs!")
528
529 def test_writesame_op(self):
530 with WriteOpCtx() as write_op:
531 write_op.writesame(b'rzx', 9)
532 self.ioctx.operate_write_op(write_op, 'abc')
533 eq(self.ioctx.read('abc'), b'rzxrzxrzx')
534
535 def test_get_omap_vals_by_keys(self):
536 keys = ("1", "2", "3", "4", b"\xff")
537 values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04", b"5")
538 with WriteOpCtx() as write_op:
539 self.ioctx.set_omap(write_op, keys, values)
540 self.ioctx.operate_write_op(write_op, "hw")
541 with ReadOpCtx() as read_op:
542 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",b"\xff"), omap_key_type=bytes)
543 eq(ret, 0)
544 self.ioctx.operate_read_op(read_op, "hw")
545 eq(list(iter), [(b"3", b"ccc"), (b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
546 with ReadOpCtx() as read_op:
547 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",), omap_key_type=bytes)
548 eq(ret, 0)
549 with pytest.raises(ObjectNotFound):
550 self.ioctx.operate_read_op(read_op, "no_such")
551
552 def test_get_omap_keys(self):
553 keys = ("1", "2", "3")
554 values = (b"aaa", b"bbb", b"ccc")
555 with WriteOpCtx() as write_op:
556 self.ioctx.set_omap(write_op, keys, values)
557 self.ioctx.operate_write_op(write_op, "hw")
558 with ReadOpCtx() as read_op:
559 iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
560 eq(ret, 0)
561 self.ioctx.operate_read_op(read_op, "hw")
562 eq(list(iter), [("1", None), ("2", None)])
563 with ReadOpCtx() as read_op:
564 iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
565 eq(ret, 0)
566 with pytest.raises(ObjectNotFound):
567 self.ioctx.operate_read_op(read_op, "no_such")
568
569 def test_clear_omap(self):
570 keys = ("1", "2", "3")
571 values = (b"aaa", b"bbb", b"ccc")
572 with WriteOpCtx() as write_op:
573 self.ioctx.set_omap(write_op, keys, values)
574 self.ioctx.operate_write_op(write_op, "hw")
575 with WriteOpCtx() as write_op_1:
576 self.ioctx.clear_omap(write_op_1)
577 self.ioctx.operate_write_op(write_op_1, "hw")
578 with ReadOpCtx() as read_op:
579 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("1",))
580 eq(ret, 0)
581 self.ioctx.operate_read_op(read_op, "hw")
582 eq(list(iter), [])
583
584 def test_remove_omap_range2(self):
585 keys = ("1", "2", "3", "4")
586 values = (b"a", b"bb", b"ccc", b"dddd")
587 with WriteOpCtx() as write_op:
588 self.ioctx.set_omap(write_op, keys, values)
589 self.ioctx.operate_write_op(write_op, "test_obj")
590 with ReadOpCtx() as read_op:
591 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, keys)
592 eq(ret, 0)
593 self.ioctx.operate_read_op(read_op, "test_obj")
594 eq(list(iter), list(zip(keys, values)))
595 with WriteOpCtx() as write_op:
596 self.ioctx.remove_omap_range2(write_op, "1", "4")
597 self.ioctx.operate_write_op(write_op, "test_obj")
598 with ReadOpCtx() as read_op:
599 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, keys)
600 eq(ret, 0)
601 self.ioctx.operate_read_op(read_op, "test_obj")
602 eq(list(iter), [("4", b"dddd")])
603
604 def test_omap_cmp(self):
605 object_id = 'test'
606 self.ioctx.write(object_id, b'omap_cmp')
607 with WriteOpCtx() as write_op:
608 self.ioctx.set_omap(write_op, ('key1',), ('1',))
609 self.ioctx.operate_write_op(write_op, object_id)
610 with WriteOpCtx() as write_op:
611 write_op.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ)
612 self.ioctx.set_omap(write_op, ('key1',), ('2',))
613 self.ioctx.operate_write_op(write_op, object_id)
614 with ReadOpCtx() as read_op:
615 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ('key1',))
616 eq(ret, 0)
617 self.ioctx.operate_read_op(read_op, object_id)
618 eq(list(iter), [('key1', b'2')])
619 with WriteOpCtx() as write_op:
620 write_op.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_GT)
621 self.ioctx.set_omap(write_op, ('key1',), ('3',))
622 self.ioctx.operate_write_op(write_op, object_id)
623 with ReadOpCtx() as read_op:
624 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ('key1',))
625 eq(ret, 0)
626 self.ioctx.operate_read_op(read_op, object_id)
627 eq(list(iter), [('key1', b'3')])
628 with WriteOpCtx() as write_op:
629 write_op.omap_cmp('key1', '4', LIBRADOS_CMPXATTR_OP_LT)
630 self.ioctx.set_omap(write_op, ('key1',), ('4',))
631 self.ioctx.operate_write_op(write_op, object_id)
632 with ReadOpCtx() as read_op:
633 iter, ret = self.ioctx.get_omap_vals_by_keys(read_op, ('key1',))
634 eq(ret, 0)
635 self.ioctx.operate_read_op(read_op, object_id)
636 eq(list(iter), [('key1', b'4')])
637 with WriteOpCtx() as write_op:
638 write_op.omap_cmp('key1', '1', LIBRADOS_CMPXATTR_OP_EQ)
639 self.ioctx.set_omap(write_op, ('key1',), ('5',))
640 try:
641 self.ioctx.operate_write_op(write_op, object_id)
642 except (OSError, ExtendMismatch) as e:
643 eq(e.errno, 125)
644 else:
645 message = "omap_cmp did not raise Exception when omap content does not match"
646 raise AssertionError(message)
647
648 def test_cmpext_op(self):
649 object_id = 'test'
650 with WriteOpCtx() as write_op:
651 write_op.write(b'12345', 0)
652 self.ioctx.operate_write_op(write_op, object_id)
653 with WriteOpCtx() as write_op:
654 write_op.cmpext(b'12345', 0)
655 write_op.write(b'54321', 0)
656 self.ioctx.operate_write_op(write_op, object_id)
657 eq(self.ioctx.read(object_id), b'54321')
658 with WriteOpCtx() as write_op:
659 write_op.cmpext(b'56789', 0)
660 write_op.write(b'12345', 0)
661 try:
662 self.ioctx.operate_write_op(write_op, object_id)
663 except ExtendMismatch as e:
664 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 1)
665 # where "1" is the offset of the first unmatched byte
666 eq(-e.errno, -MAX_ERRNO - 1)
667 eq(e.offset, 1)
668 else:
669 message = "cmpext did not raise Exception when object content does not match"
670 raise AssertionError(message)
671 with ReadOpCtx() as read_op:
672 read_op.cmpext(b'54321', 0)
673 self.ioctx.operate_read_op(read_op, object_id)
674 with ReadOpCtx() as read_op:
675 read_op.cmpext(b'54789', 0)
676 try:
677 self.ioctx.operate_read_op(read_op, object_id)
678 except ExtendMismatch as e:
679 # the cmpext_result compare with expected error number, it should be (-MAX_ERRNO - 2)
680 # where "2" is the offset of the first unmatched byte
681 eq(-e.errno, -MAX_ERRNO - 2)
682 eq(e.offset, 2)
683 else:
684 message = "cmpext did not raise Exception when object content does not match"
685 raise AssertionError(message)
686
687 def test_xattrs_op(self):
688 xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0')
689 with WriteOpCtx() as write_op:
690 write_op.new(LIBRADOS_CREATE_EXCLUSIVE)
691 for key, value in xattrs.items():
692 write_op.set_xattr(key, value)
693 self.ioctx.operate_write_op(write_op, 'abc')
694 eq(self.ioctx.get_xattr('abc', key), value)
695
696 stored_xattrs_1 = {}
697 for key, value in self.ioctx.get_xattrs('abc'):
698 stored_xattrs_1[key] = value
699 eq(stored_xattrs_1, xattrs)
700
701 for key in xattrs.keys():
702 write_op.rm_xattr(key)
703 self.ioctx.operate_write_op(write_op, 'abc')
704 stored_xattrs_2 = {}
705 for key, value in self.ioctx.get_xattrs('abc'):
706 stored_xattrs_2[key] = value
707 eq(stored_xattrs_2, {})
708
709 write_op.remove()
710 self.ioctx.operate_write_op(write_op, 'abc')
711
712 def test_locator(self):
713 self.ioctx.set_locator_key("bar")
714 self.ioctx.write('foo', b'contents1')
715 objects = [i for i in self.ioctx.list_objects()]
716 eq(len(objects), 1)
717 eq(self.ioctx.get_locator_key(), "bar")
718 self.ioctx.set_locator_key("")
719 objects[0].seek(0)
720 objects[0].write(b"contents2")
721 eq(self.ioctx.get_locator_key(), "")
722 self.ioctx.set_locator_key("bar")
723 contents = self.ioctx.read("foo")
724 eq(contents, b"contents2")
725 eq(self.ioctx.get_locator_key(), "bar")
726 objects[0].remove()
727 objects = [i for i in self.ioctx.list_objects()]
728 eq(objects, [])
729 self.ioctx.set_locator_key("")
730
731 def test_operate_aio_write_op(self):
732 lock = threading.Condition()
733 count = [0]
734 def cb(blah):
735 with lock:
736 count[0] += 1
737 lock.notify()
738 return 0
739 with WriteOpCtx() as write_op:
740 write_op.write(b'rzx')
741 comp = self.ioctx.operate_aio_write_op(write_op, "object", cb, cb)
742 comp.wait_for_complete()
743 with lock:
744 while count[0] < 2:
745 lock.wait()
746 eq(comp.get_return_value(), 0)
747 eq(self.ioctx.read('object'), b'rzx')
748
749 def test_aio_write(self):
750 lock = threading.Condition()
751 count = [0]
752 def cb(blah):
753 with lock:
754 count[0] += 1
755 lock.notify()
756 return 0
757 comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
758 comp.wait_for_complete()
759 with lock:
760 while count[0] < 2:
761 lock.wait()
762 eq(comp.get_return_value(), 0)
763 contents = self.ioctx.read("foo")
764 eq(contents, b"bar")
765 [i.remove() for i in self.ioctx.list_objects()]
766
767 def test_aio_cmpext(self):
768 lock = threading.Condition()
769 count = [0]
770 def cb(blah):
771 with lock:
772 count[0] += 1
773 lock.notify()
774 return 0
775
776 self.ioctx.write('test_object', b'abcdefghi')
777 comp = self.ioctx.aio_cmpext('test_object', b'abcdefghi', 0, cb)
778 comp.wait_for_complete()
779 with lock:
780 while count[0] < 1:
781 lock.wait()
782 eq(comp.get_return_value(), 0)
783
784 def test_aio_rmxattr(self):
785 lock = threading.Condition()
786 count = [0]
787 def cb(blah):
788 with lock:
789 count[0] += 1
790 lock.notify()
791 return 0
792 self.ioctx.set_xattr("xyz", "key", b'value')
793 eq(self.ioctx.get_xattr("xyz", "key"), b'value')
794 comp = self.ioctx.aio_rmxattr("xyz", "key", cb)
795 comp.wait_for_complete()
796 with lock:
797 while count[0] < 1:
798 lock.wait()
799 eq(comp.get_return_value(), 0)
800 with pytest.raises(NoData):
801 self.ioctx.get_xattr("xyz", "key")
802
803 def test_aio_write_no_comp_ref(self):
804 lock = threading.Condition()
805 count = [0]
806 def cb(blah):
807 with lock:
808 count[0] += 1
809 lock.notify()
810 return 0
811 # NOTE(sileht): force don't save the comp into local var
812 # to ensure all references are correctly tracked into the lib
813 self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
814 with lock:
815 while count[0] < 2:
816 lock.wait()
817 contents = self.ioctx.read("foo")
818 eq(contents, b"bar")
819 [i.remove() for i in self.ioctx.list_objects()]
820
821 def test_aio_append(self):
822 lock = threading.Condition()
823 count = [0]
824 def cb(blah):
825 with lock:
826 count[0] += 1
827 lock.notify()
828 return 0
829 comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb)
830 comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb)
831 comp.wait_for_complete()
832 contents = self.ioctx.read("foo")
833 eq(contents, b"barbaz")
834 with lock:
835 while count[0] < 4:
836 lock.wait()
837 eq(comp.get_return_value(), 0)
838 eq(comp2.get_return_value(), 0)
839 [i.remove() for i in self.ioctx.list_objects()]
840
841 def test_aio_write_full(self):
842 lock = threading.Condition()
843 count = [0]
844 def cb(blah):
845 with lock:
846 count[0] += 1
847 lock.notify()
848 return 0
849 self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb)
850 comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb)
851 comp.wait_for_complete()
852 with lock:
853 while count[0] < 2:
854 lock.wait()
855 eq(comp.get_return_value(), 0)
856 contents = self.ioctx.read("foo")
857 eq(contents, b"bar")
858 [i.remove() for i in self.ioctx.list_objects()]
859
860 def test_aio_writesame(self):
861 lock = threading.Condition()
862 count = [0]
863 def cb(blah):
864 with lock:
865 count[0] += 1
866 lock.notify()
867 return 0
868 comp = self.ioctx.aio_writesame("abc", b"rzx", 9, 0, cb)
869 comp.wait_for_complete()
870 with lock:
871 while count[0] < 1:
872 lock.wait()
873 eq(comp.get_return_value(), 0)
874 eq(self.ioctx.read("abc"), b"rzxrzxrzx")
875 [i.remove() for i in self.ioctx.list_objects()]
876
877 def test_aio_stat(self):
878 lock = threading.Condition()
879 count = [0]
880 def cb(_, size, mtime):
881 with lock:
882 count[0] += 1
883 lock.notify()
884
885 comp = self.ioctx.aio_stat("foo", cb)
886 comp.wait_for_complete()
887 with lock:
888 while count[0] < 1:
889 lock.wait()
890 eq(comp.get_return_value(), -2)
891
892 self.ioctx.write("foo", b"bar")
893
894 comp = self.ioctx.aio_stat("foo", cb)
895 comp.wait_for_complete()
896 with lock:
897 while count[0] < 2:
898 lock.wait()
899 eq(comp.get_return_value(), 0)
900
901 [i.remove() for i in self.ioctx.list_objects()]
902
903 def test_aio_remove(self):
904 lock = threading.Condition()
905 count = [0]
906 def cb(blah):
907 with lock:
908 count[0] += 1
909 lock.notify()
910 return 0
911 self.ioctx.write('foo', b'wrx')
912 eq(self.ioctx.read('foo'), b'wrx')
913 comp = self.ioctx.aio_remove('foo', cb, cb)
914 comp.wait_for_complete()
915 with lock:
916 while count[0] < 2:
917 lock.wait()
918 eq(comp.get_return_value(), 0)
919 eq(list(self.ioctx.list_objects()), [])
920
921 def _take_down_acting_set(self, pool, objectname):
922 # find acting_set for pool:objectname and take it down; used to
923 # verify that async reads don't complete while acting set is missing
924 cmd = {
925 "prefix":"osd map",
926 "pool":pool,
927 "object":objectname,
928 "format":"json",
929 }
930 r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'')
931 objmap = json.loads(jsonout.decode("utf-8"))
932 acting_set = objmap['acting']
933 cmd = {"prefix":"osd set", "key":"noup"}
934 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
935 eq(r, 0)
936 cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]}
937 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
938 eq(r, 0)
939
940 # wait for OSDs to acknowledge the down
941 eq(self.rados.wait_for_latest_osdmap(), 0)
942
943 def _let_osds_back_up(self):
944 cmd = {"prefix":"osd unset", "key":"noup"}
945 r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
946 eq(r, 0)
947
948 @pytest.mark.wait
949 def test_aio_read_wait_for_complete(self):
950 # use wait_for_complete() and wait for cb by
951 # watching retval[0]
952
953 # this is a list so that the local cb() can modify it
954 payload = b"bar\000frob"
955 self.ioctx.write("foo", payload)
956 self._take_down_acting_set('test_pool', 'foo')
957
958 retval = [None]
959 lock = threading.Condition()
960 def cb(_, buf):
961 with lock:
962 retval[0] = buf
963 lock.notify()
964
965 comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
966 eq(False, comp.is_complete())
967 time.sleep(3)
968 eq(False, comp.is_complete())
969 with lock:
970 eq(None, retval[0])
971
972 self._let_osds_back_up()
973 comp.wait_for_complete()
974 loops = 0
975 with lock:
976 while retval[0] is None and loops <= 10:
977 lock.wait(timeout=5)
978 loops += 1
979 assert(loops <= 10)
980
981 eq(retval[0], payload)
982 eq(sys.getrefcount(comp), 2)
983
984 @pytest.mark.wait
985 def test_aio_read_wait_for_complete_and_cb(self):
986 # use wait_for_complete_and_cb(), verify retval[0] is
987 # set by the time we regain control
988 payload = b"bar\000frob"
989 self.ioctx.write("foo", payload)
990
991 self._take_down_acting_set('test_pool', 'foo')
992 # this is a list so that the local cb() can modify it
993 retval = [None]
994 lock = threading.Condition()
995 def cb(_, buf):
996 with lock:
997 retval[0] = buf
998 lock.notify()
999 comp = self.ioctx.aio_read("foo", len(payload), 0, cb)
1000 eq(False, comp.is_complete())
1001 time.sleep(3)
1002 eq(False, comp.is_complete())
1003 with lock:
1004 eq(None, retval[0])
1005
1006 self._let_osds_back_up()
1007 comp.wait_for_complete_and_cb()
1008 assert(retval[0] is not None)
1009 eq(retval[0], payload)
1010 eq(sys.getrefcount(comp), 2)
1011
1012 @pytest.mark.wait
1013 def test_aio_read_wait_for_complete_and_cb_error(self):
1014 # error case, use wait_for_complete_and_cb(), verify retval[0] is
1015 # set by the time we regain control
1016 self._take_down_acting_set('test_pool', 'bar')
1017
1018 # this is a list so that the local cb() can modify it
1019 retval = [1]
1020 lock = threading.Condition()
1021 def cb(_, buf):
1022 with lock:
1023 retval[0] = buf
1024 lock.notify()
1025
1026 # read from a DNE object
1027 comp = self.ioctx.aio_read("bar", 3, 0, cb)
1028 eq(False, comp.is_complete())
1029 time.sleep(3)
1030 eq(False, comp.is_complete())
1031 with lock:
1032 eq(1, retval[0])
1033 self._let_osds_back_up()
1034
1035 comp.wait_for_complete_and_cb()
1036 eq(None, retval[0])
1037 assert(comp.get_return_value() < 0)
1038 eq(sys.getrefcount(comp), 2)
1039
1040 def test_lock(self):
1041 self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock",
1042 10000, 0)
1043 assert_raises(ObjectExists,
1044 self.ioctx.lock_exclusive,
1045 "foo", "lock", "locker", "desc_lock", 10000, 0)
1046 self.ioctx.unlock("foo", "lock", "locker")
1047 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker")
1048
1049 self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock",
1050 10000, 0)
1051 self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock",
1052 10000, 0)
1053 assert_raises(ObjectBusy,
1054 self.ioctx.lock_exclusive,
1055 "foo", "lock", "locker3", "desc_lock", 10000, 0)
1056 self.ioctx.unlock("foo", "lock", "locker1")
1057 self.ioctx.unlock("foo", "lock", "locker2")
1058 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1")
1059 assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2")
1060
1061 def test_execute(self):
1062 self.ioctx.write("foo", b"") # ensure object exists
1063
1064 ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"")
1065 eq(buf, b"Hello, world!")
1066
1067 ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
1068 eq(buf, b"Hello, nose!")
1069
1070 def test_aio_execute(self):
1071 count = [0]
1072 retval = [None]
1073 lock = threading.Condition()
1074 def cb(_, buf):
1075 with lock:
1076 if retval[0] is None:
1077 retval[0] = buf
1078 count[0] += 1
1079 lock.notify()
1080 self.ioctx.write("foo", b"") # ensure object exists
1081
1082 comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
1083 comp.wait_for_complete()
1084 with lock:
1085 while count[0] < 2:
1086 lock.wait()
1087 eq(comp.get_return_value(), 13)
1088 eq(retval[0], b"Hello, world!")
1089
1090 retval[0] = None
1091 comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
1092 comp.wait_for_complete()
1093 with lock:
1094 while count[0] < 4:
1095 lock.wait()
1096 eq(comp.get_return_value(), 12)
1097 eq(retval[0], b"Hello, nose!")
1098
1099 [i.remove() for i in self.ioctx.list_objects()]
1100
1101 def test_aio_setxattr(self):
1102 lock = threading.Condition()
1103 count = [0]
1104 def cb(blah):
1105 with lock:
1106 count[0] += 1
1107 lock.notify()
1108 return 0
1109 comp = self.ioctx.aio_setxattr("obj", "key", b'value', cb)
1110 comp.wait_for_complete()
1111 with lock:
1112 while count[0] < 1:
1113 lock.wait()
1114 eq(comp.get_return_value(), 0)
1115 eq(self.ioctx.get_xattr("obj", "key"), b'value')
1116
1117 def test_applications(self):
1118 cmd = {"prefix":"osd dump", "format":"json"}
1119 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'')
1120 eq(ret, 0)
1121 assert len(buf) > 0
1122 release = json.loads(buf.decode("utf-8")).get("require_osd_release",
1123 None)
1124 if not release or release[0] < 'l':
1125 pytest.skip('required_osd_release >= l')
1126
1127 eq([], self.ioctx.application_list())
1128
1129 self.ioctx.application_enable("app1")
1130 assert_raises(Error, self.ioctx.application_enable, "app2")
1131 self.ioctx.application_enable("app2", True)
1132
1133 assert_raises(Error, self.ioctx.application_metadata_list, "dne")
1134 eq([], self.ioctx.application_metadata_list("app1"))
1135
1136 assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key",
1137 "key")
1138 self.ioctx.application_metadata_set("app1", "key1", "val1")
1139 eq("val1", self.ioctx.application_metadata_get("app1", "key1"))
1140 self.ioctx.application_metadata_set("app1", "key2", "val2")
1141 eq("val2", self.ioctx.application_metadata_get("app1", "key2"))
1142 self.ioctx.application_metadata_set("app2", "key1", "val1")
1143 eq("val1", self.ioctx.application_metadata_get("app2", "key1"))
1144
1145 eq([("key1", "val1"), ("key2", "val2")],
1146 self.ioctx.application_metadata_list("app1"))
1147
1148 self.ioctx.application_metadata_remove("app1", "key1")
1149 eq([("key2", "val2")], self.ioctx.application_metadata_list("app1"))
1150
1151 def test_service_daemon(self):
1152 name = "pid-" + str(os.getpid())
1153 metadata = {'version': '3.14', 'memory': '42'}
1154 self.rados.service_daemon_register("laundry", name, metadata)
1155 status = {'result': 'unknown', 'test': 'running'}
1156 self.rados.service_daemon_update(status)
1157
1158 def test_alignment(self):
1159 eq(self.ioctx.alignment(), None)
1160
1161
1162 @pytest.mark.ec
1163 class TestIoctxEc(object):
1164
1165 def setup_method(self, method):
1166 self.rados = Rados(conffile='')
1167 self.rados.connect()
1168 self.pool = 'test-ec'
1169 self.profile = 'testprofile-%s' % self.pool
1170 cmd = {"prefix": "osd erasure-code-profile set",
1171 "name": self.profile, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
1172 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
1173 assert ret == 0, out
1174 # create ec pool with profile created above
1175 cmd = {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
1176 'pool': self.pool, 'pool_type': 'erasure',
1177 'erasure_code_profile': self.profile}
1178 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
1179 assert ret == 0, out
1180 assert self.rados.pool_exists(self.pool)
1181 self.ioctx = self.rados.open_ioctx(self.pool)
1182
1183 def teardown_method(self, method):
1184 cmd = {"prefix": "osd unset", "key": "noup"}
1185 self.rados.mon_command(json.dumps(cmd), b'')
1186 self.ioctx.close()
1187 self.rados.delete_pool(self.pool)
1188 self.rados.shutdown()
1189
1190 def test_alignment(self):
1191 eq(self.ioctx.alignment(), 8192)
1192
1193
1194 class TestIoctx2(object):
1195
1196 def setup_method(self, method):
1197 self.rados = Rados(conffile='')
1198 self.rados.connect()
1199 self.rados.create_pool('test_pool')
1200 assert self.rados.pool_exists('test_pool')
1201 pool_id = self.rados.pool_lookup('test_pool')
1202 assert pool_id > 0
1203 self.ioctx2 = self.rados.open_ioctx2(pool_id)
1204
1205 def teardown_method(self, method):
1206 cmd = {"prefix": "osd unset", "key": "noup"}
1207 self.rados.mon_command(json.dumps(cmd), b'')
1208 self.ioctx2.close()
1209 self.rados.delete_pool('test_pool')
1210 self.rados.shutdown()
1211
1212 def test_get_last_version(self):
1213 version = self.ioctx2.get_last_version()
1214 assert version >= 0
1215
1216 def test_get_stats(self):
1217 stats = self.ioctx2.get_stats()
1218 eq(stats, {'num_objects_unfound': 0,
1219 'num_objects_missing_on_primary': 0,
1220 'num_object_clones': 0,
1221 'num_objects': 0,
1222 'num_object_copies': 0,
1223 'num_bytes': 0,
1224 'num_rd_kb': 0,
1225 'num_wr_kb': 0,
1226 'num_kb': 0,
1227 'num_wr': 0,
1228 'num_objects_degraded': 0,
1229 'num_rd': 0})
1230
1231
1232 class TestObject(object):
1233
1234 def setup_method(self, method):
1235 self.rados = Rados(conffile='')
1236 self.rados.connect()
1237 self.rados.create_pool('test_pool')
1238 assert self.rados.pool_exists('test_pool')
1239 self.ioctx = self.rados.open_ioctx('test_pool')
1240 self.ioctx.write('foo', b'bar')
1241 self.object = Object(self.ioctx, 'foo')
1242
1243 def teardown_method(self, method):
1244 self.ioctx.close()
1245 self.ioctx = None
1246 self.rados.delete_pool('test_pool')
1247 self.rados.shutdown()
1248 self.rados = None
1249
1250 def test_read(self):
1251 eq(self.object.read(3), b'bar')
1252 eq(self.object.read(100), b'')
1253
1254 def test_seek(self):
1255 self.object.write(b'blah')
1256 self.object.seek(0)
1257 eq(self.object.read(4), b'blah')
1258 self.object.seek(1)
1259 eq(self.object.read(3), b'lah')
1260
1261 def test_write(self):
1262 self.object.write(b'barbaz')
1263 self.object.seek(0)
1264 eq(self.object.read(3), b'bar')
1265 eq(self.object.read(3), b'baz')
1266
1267 class TestIoCtxSelfManagedSnaps(object):
1268 def setup_method(self, method):
1269 self.rados = Rados(conffile='')
1270 self.rados.connect()
1271 self.rados.create_pool('test_pool')
1272 assert self.rados.pool_exists('test_pool')
1273 self.ioctx = self.rados.open_ioctx('test_pool')
1274
1275 def teardown_method(self, method):
1276 cmd = {"prefix":"osd unset", "key":"noup"}
1277 self.rados.mon_command(json.dumps(cmd), b'')
1278 self.ioctx.close()
1279 self.rados.delete_pool('test_pool')
1280 self.rados.shutdown()
1281
1282 @pytest.mark.rollback
1283 def test(self):
1284 # cannot mix-and-match pool and self-managed snapshot mode
1285 self.ioctx.set_self_managed_snap_write([])
1286 self.ioctx.write('abc', b'abc')
1287 snap_id_1 = self.ioctx.create_self_managed_snap()
1288 self.ioctx.set_self_managed_snap_write([snap_id_1])
1289
1290 self.ioctx.write('abc', b'def')
1291 snap_id_2 = self.ioctx.create_self_managed_snap()
1292 self.ioctx.set_self_managed_snap_write([snap_id_1, snap_id_2])
1293
1294 self.ioctx.write('abc', b'ghi')
1295
1296 self.ioctx.rollback_self_managed_snap('abc', snap_id_1)
1297 eq(self.ioctx.read('abc'), b'abc')
1298
1299 self.ioctx.rollback_self_managed_snap('abc', snap_id_2)
1300 eq(self.ioctx.read('abc'), b'def')
1301
1302 self.ioctx.remove_self_managed_snap(snap_id_1)
1303 self.ioctx.remove_self_managed_snap(snap_id_2)
1304
1305 class TestCommand(object):
1306
1307 def setup_method(self, method):
1308 self.rados = Rados(conffile='')
1309 self.rados.connect()
1310
1311 def teardown_method(self, method):
1312 self.rados.shutdown()
1313
1314 def test_monmap_dump(self):
1315
1316 # check for success and some plain output with epoch in it
1317 cmd = {"prefix":"mon dump"}
1318 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
1319 eq(ret, 0)
1320 assert len(buf) > 0
1321 assert(b'epoch' in buf)
1322
1323 # JSON, and grab current epoch
1324 cmd['format'] = 'json'
1325 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
1326 eq(ret, 0)
1327 assert len(buf) > 0
1328 d = json.loads(buf.decode("utf-8"))
1329 assert('epoch' in d)
1330 epoch = d['epoch']
1331
1332 # assume epoch + 1000 does not exist; test for ENOENT
1333 cmd['epoch'] = epoch + 1000
1334 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
1335 eq(ret, -errno.ENOENT)
1336 eq(len(buf), 0)
1337 del cmd['epoch']
1338
1339 # send to specific target by name, rank
1340 cmd = {"prefix": "version"}
1341
1342 target = d['mons'][0]['name']
1343 print(target)
1344 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
1345 target=target)
1346 eq(ret, 0)
1347 assert len(buf) > 0
1348 e = json.loads(buf.decode("utf-8"))
1349 assert('release' in e)
1350
1351 target = d['mons'][0]['rank']
1352 print(target)
1353 ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30,
1354 target=target)
1355 eq(ret, 0)
1356 assert len(buf) > 0
1357 e = json.loads(buf.decode("utf-8"))
1358 assert('release' in e)
1359
1360 @pytest.mark.bench
1361 def test_osd_bench(self):
1362 cmd = dict(prefix='bench', size=4096, count=8192)
1363 ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
1364 timeout=30)
1365 eq(ret, 0)
1366 assert len(buf) > 0
1367 out = json.loads(buf.decode('utf-8'))
1368 eq(out['blocksize'], cmd['size'])
1369 eq(out['bytes_written'], cmd['count'])
1370
1371 def test_ceph_osd_pool_create_utf8(self):
1372 poolname = "\u9ec5"
1373
1374 cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname}
1375 ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
1376 eq(ret, 0)
1377 assert len(out) > 0
1378 eq(u"pool '\u9ec5' created", out)
1379
1380
1381 @pytest.mark.watch
1382 class TestWatchNotify(object):
1383 OID = "test_watch_notify"
1384
1385 def setup_method(self, method):
1386 self.rados = Rados(conffile='')
1387 self.rados.connect()
1388 self.rados.create_pool('test_pool')
1389 assert self.rados.pool_exists('test_pool')
1390 self.ioctx = self.rados.open_ioctx('test_pool')
1391 self.ioctx.write(self.OID, b'test watch notify')
1392 self.lock = threading.Condition()
1393 self.notify_cnt = {}
1394 self.notify_data = {}
1395 self.notify_error = {}
1396 # aio related
1397 self.ack_cnt = {}
1398 self.ack_data = {}
1399 self.instance_id = self.rados.get_instance_id()
1400
1401 def teardown_method(self, method):
1402 self.ioctx.close()
1403 self.rados.delete_pool('test_pool')
1404 self.rados.shutdown()
1405
1406 def make_callback(self):
1407 def callback(notify_id, notifier_id, watch_id, data):
1408 with self.lock:
1409 if watch_id not in self.notify_cnt:
1410 self.notify_cnt[watch_id] = 1
1411 elif self.notify_data[watch_id] != data:
1412 self.notify_cnt[watch_id] += 1
1413 self.notify_data[watch_id] = data
1414 return callback
1415
1416 def make_error_callback(self):
1417 def callback(watch_id, error):
1418 with self.lock:
1419 self.notify_error[watch_id] = error
1420 return callback
1421
1422
1423 def test(self):
1424 with self.ioctx.watch(self.OID, self.make_callback(),
1425 self.make_error_callback()) as watch1:
1426 watch_id1 = watch1.get_id()
1427 assert(watch_id1 > 0)
1428
1429 with self.rados.open_ioctx('test_pool') as ioctx:
1430 watch2 = ioctx.watch(self.OID, self.make_callback(),
1431 self.make_error_callback())
1432 watch_id2 = watch2.get_id()
1433 assert(watch_id2 > 0)
1434
1435 assert(self.ioctx.notify(self.OID, 'test'))
1436 with self.lock:
1437 assert(watch_id1 in self.notify_cnt)
1438 assert(watch_id2 in self.notify_cnt)
1439 eq(self.notify_cnt[watch_id1], 1)
1440 eq(self.notify_cnt[watch_id2], 1)
1441 eq(self.notify_data[watch_id1], b'test')
1442 eq(self.notify_data[watch_id2], b'test')
1443
1444 assert(watch1.check() >= timedelta())
1445 assert(watch2.check() >= timedelta())
1446
1447 assert(self.ioctx.notify(self.OID, 'best'))
1448 with self.lock:
1449 eq(self.notify_cnt[watch_id1], 2)
1450 eq(self.notify_cnt[watch_id2], 2)
1451 eq(self.notify_data[watch_id1], b'best')
1452 eq(self.notify_data[watch_id2], b'best')
1453
1454 watch2.close()
1455
1456 assert(self.ioctx.notify(self.OID, 'rest'))
1457 with self.lock:
1458 eq(self.notify_cnt[watch_id1], 3)
1459 eq(self.notify_cnt[watch_id2], 2)
1460 eq(self.notify_data[watch_id1], b'rest')
1461 eq(self.notify_data[watch_id2], b'best')
1462
1463 assert(watch1.check() >= timedelta())
1464
1465 self.ioctx.remove_object(self.OID)
1466
1467 for i in range(10):
1468 with self.lock:
1469 if watch_id1 in self.notify_error:
1470 break
1471 time.sleep(1)
1472 eq(self.notify_error[watch_id1], -errno.ENOTCONN)
1473 assert_raises(NotConnected, watch1.check)
1474
1475 assert_raises(ObjectNotFound, self.ioctx.notify, self.OID, 'test')
1476
1477 def make_callback_reply(self):
1478 def callback(notify_id, notifier_id, watch_id, data):
1479 with self.lock:
1480 return data
1481 return callback
1482
1483 def notify_callback(self, _, r, ack_list, timeout_list):
1484 eq(r, 0)
1485 with self.lock:
1486 for notifier_id, _, notifier_data in ack_list:
1487 if notifier_id not in self.ack_cnt:
1488 self.ack_cnt[notifier_id] = 0
1489 self.ack_cnt[notifier_id] += 1
1490 self.ack_data[notifier_id] = notifier_data
1491
1492 def notify_callback_err(self, _, r, ack_list, timeout_list):
1493 eq(r, -errno.ENOENT)
1494
1495 def test_aio_notify(self):
1496 with self.ioctx.watch(self.OID, self.make_callback_reply(),
1497 self.make_error_callback()) as watch1:
1498 watch_id1 = watch1.get_id()
1499 assert watch_id1 > 0
1500
1501 with self.rados.open_ioctx('test_pool') as ioctx:
1502 watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
1503 self.make_error_callback())
1504 watch_id2 = watch2.get_id()
1505 assert watch_id2 > 0
1506
1507 comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
1508 comp.wait_for_complete_and_cb()
1509 with self.lock:
1510 assert self.instance_id in self.ack_cnt
1511 eq(self.ack_cnt[self.instance_id], 2)
1512 eq(self.ack_data[self.instance_id], b'test')
1513
1514 assert watch1.check() >= timedelta()
1515 assert watch2.check() >= timedelta()
1516
1517 comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
1518 comp.wait_for_complete_and_cb()
1519 with self.lock:
1520 eq(self.ack_cnt[self.instance_id], 4)
1521 eq(self.ack_data[self.instance_id], b'best')
1522
1523 watch2.close()
1524
1525 comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='rest')
1526 comp.wait_for_complete_and_cb()
1527 with self.lock:
1528 eq(self.ack_cnt[self.instance_id], 5)
1529 eq(self.ack_data[self.instance_id], b'rest')
1530
1531 assert(watch1.check() >= timedelta())
1532 self.ioctx.remove_object(self.OID)
1533
1534 for i in range(10):
1535 with self.lock:
1536 if watch_id1 in self.notify_error:
1537 break
1538 time.sleep(1)
1539 eq(self.notify_error[watch_id1], -errno.ENOTCONN)
1540 assert_raises(NotConnected, watch1.check)
1541
1542 comp = self.ioctx.aio_notify(self.OID, self.notify_callback_err, msg='test')
1543 comp.wait_for_complete_and_cb()