git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/ocf/tests/functional/pyocf/types/queue.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / ocf / tests / functional / pyocf / types / queue.py
1 #
2 # Copyright(c) 2019 Intel Corporation
3 # SPDX-License-Identifier: BSD-3-Clause-Clear
4 #
5
6 from ctypes import c_void_p, CFUNCTYPE, Structure, byref
7 from threading import Thread, Condition, Event
8 import weakref
9
10 from ..ocf import OcfLib
11 from .shared import OcfError
12
13
class QueueOps(Structure):
    """ctypes layout of the queue operations table.

    The structure carries three callback slots, in this order: ``kick``,
    ``kick_sync`` and ``stop``.  Every slot is a C function pointer that
    takes a single ``void *`` argument and returns nothing.
    """

    # Callback prototypes, all of the shape ``void (*)(void *)``.
    KICK = CFUNCTYPE(None, c_void_p)
    KICK_SYNC = CFUNCTYPE(None, c_void_p)
    STOP = CFUNCTYPE(None, c_void_p)

    # Field order defines the in-memory layout of the structure.
    _fields_ = [
        ("kick", KICK),
        ("kick_sync", KICK_SYNC),
        ("stop", STOP),
    ]
20
21
class Queue:
    """Forward declaration of ``Queue``.

    Lets ``io_queue_run`` use ``Queue`` in its annotations; the complete
    class defined later in this module rebinds the name.
    """
24
25
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
    """Worker loop that services ``queue`` until it is stopped and drained.

    All arguments are keyword-only.

    :param queue: queue passed to ``ocf_queue_pending_io`` / ``ocf_queue_run``
    :param kick: condition the loop sleeps on between runs
    :param stop: event signalling that the loop should finish

    Each iteration sleeps on ``kick`` until either ``stop`` is set or the
    library reports pending I/O, then calls ``ocf_queue_run``.  The loop ends
    once ``stop`` is set and no pending I/O is reported.
    """

    def has_pending_io():
        return OcfLib.getInstance().ocf_queue_pending_io(queue)

    def should_wake():
        # Same short-circuit order as the exit check below: the stop flag is
        # consulted first, the library only when the flag is clear.
        return stop.is_set() or has_pending_io()

    keep_running = True
    while keep_running:
        with kick:
            kick.wait_for(should_wake)

        OcfLib.getInstance().ocf_queue_run(queue)

        # Keep going unless a stop was requested and nothing is left to do.
        keep_running = not stop.is_set() or has_pending_io()
38
39
class Queue:
    """Python wrapper around an OCF queue serviced by a dedicated thread.

    Creating an instance calls ``ocf_queue_create`` and starts a thread
    running ``io_queue_run`` for the new queue.  The C callbacks stored in
    ``QueueOps`` receive only the raw queue handle, so instances are looked
    up through the class-level ``_instances_`` registry.
    """

    # Raw handle value -> weak reference to the wrapper owning that handle.
    _instances_ = {}

    def __init__(self, cache, name, mngt_queue: bool = False):
        """Create the queue on ``cache`` and start its worker thread.

        :param cache: object exposing ``cache_handle`` for ``ocf_queue_create``
        :param name: name given to the worker thread
        :param mngt_queue: when true, ``stop()`` also calls ``ocf_queue_put``
            on this queue after the worker thread has finished
        :raises OcfError: if ``ocf_queue_create`` returns a non-zero status
        """
        # Stored on the instance; presumably the library keeps the pointer
        # passed below, so the structure must outlive the call -- confirm.
        # NOTE(review): ``_kick_sync`` exists but is not wired into ``ops``
        # here; this mirrors the original code -- confirm it is intentional.
        self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop)

        self.handle = c_void_p()
        status = OcfLib.getInstance().ocf_queue_create(
            cache.cache_handle, byref(self.handle), byref(self.ops)
        )
        if status:
            raise OcfError("Couldn't create queue object", status)

        Queue._track(self.handle.value, self)
        # Lets ctypes accept the wrapper wherever the raw handle is expected.
        self._as_parameter_ = self.handle

        # Finish initialising every attribute before the worker thread is
        # started, so nothing can observe a half-built object.
        self.mngt_queue = mngt_queue
        self.stop_event = Event()
        self.kick_condition = Condition()
        self.thread = Thread(
            group=None,
            target=io_queue_run,
            name=name,
            kwargs={
                "queue": self,
                "kick": self.kick_condition,
                "stop": self.stop_event,
            },
        )
        self.thread.start()

    @staticmethod
    def _track(key, instance):
        """Register ``instance`` under ``key`` without keeping it alive.

        The weak reference carries a callback that removes the registry
        entry once the instance is collected, so dead entries do not pile up.
        """
        registry = Queue._instances_

        def _forget(dead_ref, key=key, registry=registry):
            # The same handle value may have been re-registered by a newer
            # queue in the meantime; only drop the entry if it is still ours.
            if registry.get(key) is dead_ref:
                registry.pop(key, None)

        registry[key] = weakref.ref(instance, _forget)

    @classmethod
    def get_instance(cls, ref):
        """Return the wrapper registered for handle value ``ref``.

        Raises ``KeyError`` if no entry exists for ``ref``; returns ``None``
        if the entry's referent has already been garbage collected.
        """
        return cls._instances_[ref]()

    @staticmethod
    @QueueOps.KICK_SYNC
    def _kick_sync(ref):
        # C-callable trampoline: resolve the handle and delegate.
        Queue.get_instance(ref).kick_sync()

    @staticmethod
    @QueueOps.KICK
    def _kick(ref):
        # C-callable trampoline: resolve the handle and delegate.
        Queue.get_instance(ref).kick()

    @staticmethod
    @QueueOps.STOP
    def _stop(ref):
        # C-callable trampoline: resolve the handle and delegate.
        Queue.get_instance(ref).stop()

    def kick_sync(self):
        """Run the queue immediately in the calling thread."""
        OcfLib.getInstance().ocf_queue_run(self.handle)

    def kick(self):
        """Wake the worker thread so it re-checks the queue."""
        with self.kick_condition:
            self.kick_condition.notify_all()

    def stop(self):
        """Ask the worker thread to finish and wait until it has exited.

        For a management queue (``mngt_queue`` true) ``ocf_queue_put`` is
        called on the queue once the thread has been joined.
        """
        with self.kick_condition:
            # Set the flag while holding the condition so the worker's
            # predicate sees it when it is woken.
            self.stop_event.set()
            self.kick_condition.notify_all()

        self.thread.join()
        if self.mngt_queue:
            OcfLib.getInstance().ocf_queue_put(self)
106