#
# Origin: ceph.git - ceph/src/spdk/ocf/tests/functional/tests/basic/test_pyocf.py
#
# Copyright(c) 2019 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
from ctypes import c_int

import pytest

from pyocf.types.cache import Cache
from pyocf.types.core import Core
from pyocf.types.volume import Volume, ErrorDevice
from pyocf.types.data import Data
from pyocf.types.io import IoDir
from pyocf.utils import Size as S
from pyocf.types.shared import OcfError, OcfCompletion
def test_ctx_fixture(pyocf_ctx):
    """Smoke test: request the ``pyocf_ctx`` fixture and do nothing else.

    The body is intentionally empty; the test only exercises whatever the
    fixture itself does when it is requested and released.
    """
def test_simple_wt_write(pyocf_ctx):
    """Write one small buffer through a cache and verify the outcome.

    A 30 MiB cache volume and a 30 MiB core volume are created, a cache is
    started on the former and a core is built on the latter. A single write
    of the string "This is test data" is issued at address 20 and the test
    then checks that:

    * the completion reports ``err == 0``,
    * the cache volume counted exactly one write,
    * the cache statistics show one full write miss and an occupancy of 1,
    * the md5 of the core's exported object equals the md5 of the core
      volume.

    NOTE(review): "wt" presumably stands for write-through and relies on the
    cache's default mode -- confirm against ``Cache.start_on_device``.
    """
    cache_device = Volume(S.from_MiB(30))
    core_device = Volume(S.from_MiB(30))

    cache = Cache.start_on_device(cache_device)
    core = Core.using_device(core_device)

    # The core must be attached to the cache before I/O addressed to the
    # core can go through the cache.
    cache.add_core(core)

    # Reset both volumes' counters so the write count asserted below starts
    # from zero.
    cache_device.reset_stats()
    core_device.reset_stats()

    write_data = Data.from_string("This is test data")
    io = core.new_io()
    io.set_data(write_data)
    # Address 20, payload length, write direction; the two trailing zeros
    # are presumably the I/O class and flags -- confirm against Io.configure.
    io.configure(20, write_data.size, IoDir.WRITE, 0, 0)
    io.set_queue(cache.get_default_queue())

    cmpl = OcfCompletion([("err", c_int)])
    io.callback = cmpl.callback
    # Submit the request and block until the completion callback has fired;
    # cmpl.results is only meaningful after that.
    io.submit()
    cmpl.wait()

    assert cmpl.results["err"] == 0
    assert cache_device.get_stats()[IoDir.WRITE] == 1
    stats = cache.get_stats()
    assert stats["req"]["wr_full_misses"]["value"] == 1
    assert stats["usage"]["occupancy"]["value"] == 1

    assert core.exp_obj_md5() == core_device.md5()

    # Shut the cache down so it does not outlive this test.
    cache.stop()
def test_start_corrupted_metadata_lba(pyocf_ctx):
    """Starting a cache on an error-injecting device must raise.

    The cache volume is a 30 MiB ``ErrorDevice`` created with sector 0 in
    ``error_sectors``. ``Cache.start_on_device`` is expected to raise an
    ``OcfError`` whose message matches ``OCF_ERR_WRITE_CACHE``.
    """
    device_size = S.from_MiB(30)
    faulty_device = ErrorDevice(device_size, error_sectors={0})

    with pytest.raises(OcfError, match="OCF_ERR_WRITE_CACHE"):
        Cache.start_on_device(faulty_device)
def test_load_cache_no_preexisting_data(pyocf_ctx):
    """Loading a cache from a freshly created volume must raise.

    A new 30 MiB ``Volume`` is handed straight to
    ``Cache.load_from_device`` without a cache ever having been started on
    it; the call is expected to raise an ``OcfError`` whose message matches
    ``OCF_ERR_START_CACHE_FAIL``.
    """
    blank_device = Volume(S.from_MiB(30))
    expect_load_failure = pytest.raises(
        OcfError, match="OCF_ERR_START_CACHE_FAIL"
    )

    with expect_load_failure:
        Cache.load_from_device(blank_device)
# TODO: Find out why this fails and fix
# NOTE(review): marked xfail because the TODO above documents a known
# failure; remove the marker once the underlying problem is fixed.
@pytest.mark.xfail
def test_load_cache(pyocf_ctx):
    """Start a cache on a volume, stop it, then load a cache from that volume.

    The same 30 MiB ``Volume`` is used for both ``Cache.start_on_device``
    and ``Cache.load_from_device``. The test makes no assertions of its own;
    it passes when neither call raises.
    """
    cache_device = Volume(S.from_MiB(30))

    cache = Cache.start_on_device(cache_device)
    # Stop the running instance first so the load below is not attempted on
    # a device that a live cache is still attached to.
    cache.stop()

    cache = Cache.load_from_device(cache_device)