]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/ocf/tests/functional/tests/basic/test_pyocf.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / ocf / tests / functional / tests / basic / test_pyocf.py
1 #
2 # Copyright(c) 2019 Intel Corporation
3 # SPDX-License-Identifier: BSD-3-Clause-Clear
4 #
5
6 import pytest
7 from ctypes import c_int
8
9 from pyocf.types.cache import Cache
10 from pyocf.types.core import Core
11 from pyocf.types.volume import Volume, ErrorDevice
12 from pyocf.types.data import Data
13 from pyocf.types.io import IoDir
14 from pyocf.utils import Size as S
15 from pyocf.types.shared import OcfError, OcfCompletion
16
17
def test_ctx_fixture(pyocf_ctx):
    """Request the ``pyocf_ctx`` fixture and do nothing else.

    The body is intentionally empty: the test passes as long as pytest can
    provide the fixture to it.
    """
20
21
def test_simple_wt_write(pyocf_ctx):
    """Submit one small write through a core and check the resulting counters.

    Steps:
      1. Create two 30 MiB volumes, start a cache on one and attach the other
         to it as a core.
      2. Reset the stats of both volumes.
      3. Write the string "This is test data" at the byte offset of sector 1
         via the cache's default queue and wait for the completion.
      4. Assert that the completion reported error 0, that the cache volume
         saw exactly one write, that the cache stats show one full write miss
         and an occupancy of 1, and that the md5 of the core's exported
         object equals the md5 of the core volume.
      5. Stop the cache.

    NOTE(review): "wt" in the name presumably stands for write-through, which
    would be the mode ``Cache.start_on_device`` uses by default - confirm.
    """
    cache_vol = Volume(S.from_MiB(30))
    core_vol = Volume(S.from_MiB(30))

    cache = Cache.start_on_device(cache_vol)
    core = Core.using_device(core_vol)
    cache.add_core(core)

    # Reset both volumes' stats before issuing the write (presumably so that
    # only the I/O below is reflected in the counters checked later).
    for vol in (cache_vol, core_vol):
        vol.reset_stats()

    payload = Data.from_string("This is test data")
    queue = cache.get_default_queue()
    offset = S.from_sector(1).B
    io = core.new_io(queue, offset, payload.size, IoDir.WRITE, 0, 0)
    io.set_data(payload)

    # The completion object collects the "err" value passed to the callback.
    completion = OcfCompletion([("err", c_int)])
    io.callback = completion.callback
    io.submit()
    completion.wait()

    assert completion.results["err"] == 0
    assert cache_vol.get_stats()[IoDir.WRITE] == 1

    cache_stats = cache.get_stats()
    assert cache_stats["req"]["wr_full_misses"]["value"] == 1
    assert cache_stats["usage"]["occupancy"]["value"] == 1

    assert core.exp_obj_md5() == core_vol.md5()
    cache.stop()
52
53
def test_start_corrupted_metadata_lba(pyocf_ctx):
    """Starting a cache on a device with a faulty sector 0 must fail.

    The cache device is an ``ErrorDevice`` of 30 MiB whose ``error_sectors``
    set contains only sector 0. ``Cache.start_on_device`` is expected to
    raise ``OcfError`` with a message matching ``OCF_ERR_WRITE_CACHE``.
    """
    cache_device = ErrorDevice(S.from_MiB(30), error_sectors={0})

    # The return value is deliberately not bound: the call is expected to
    # raise, so an assignment target would never be populated or used.
    with pytest.raises(OcfError, match="OCF_ERR_WRITE_CACHE"):
        Cache.start_on_device(cache_device)
59
60
def test_load_cache_no_preexisting_data(pyocf_ctx):
    """Loading a cache from a volume no cache was ever started on must fail.

    A fresh 30 MiB ``Volume`` is created and passed straight to
    ``Cache.load_from_device``, which is expected to raise ``OcfError`` with
    a message matching ``OCF_ERR_NO_METADATA``.
    """
    cache_device = Volume(S.from_MiB(30))

    # The return value is deliberately not bound: the call is expected to
    # raise, so an assignment target would never be populated or used.
    with pytest.raises(OcfError, match="OCF_ERR_NO_METADATA"):
        Cache.load_from_device(cache_device)
66
67
def test_load_cache(pyocf_ctx):
    """Start a cache on a fresh volume, stop it, then load it back.

    There are no explicit assertions; the test passes if none of the calls
    raise.
    """
    volume = Volume(S.from_MiB(30))

    cache = Cache.start_on_device(volume)
    cache.stop()

    # Load from the very same volume object the cache was just stopped on.
    cache = Cache.load_from_device(volume)
75
76
def test_load_cache_recovery(pyocf_ctx):
    """Load a cache from a copy of its device taken before the cache stopped.

    A cache is started on a 30 MiB volume, the volume is copied while the
    cache is still running, the cache is stopped, and a cache is then loaded
    from the copy rather than from the original volume. There are no explicit
    assertions; the test passes if none of the calls raise.

    NOTE(review): going by the test name, the pre-stop copy presumably stands
    in for a device left behind by an unclean shutdown - confirm.
    """
    origin = Volume(S.from_MiB(30))
    cache = Cache.start_on_device(origin)

    # Order matters: the copy must be taken before stop() is called.
    snapshot = origin.get_copy()
    cache.stop()

    cache = Cache.load_from_device(snapshot)