1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7 #include "include/Context.h"
8 #include "os/ObjectStore.h"
9 #include "common/WorkQueue.h"
10 #include "common/Semaphore.h"
14 class DumbBackend
: public Backend
{
29 oid(oid
), bl(bl
), offset(offset
), on_applied(on_applied
),
30 on_commit(on_commit
) {}
// Tuning knobs; each is initialised in the DumbBackend constructor from the
// constructor argument of the same name.
// NOTE(review): the code that consumes them is not visible in this view, so
// their exact effect (and the unit of sync_interval) should be confirmed
// against the implementation.
36 bool do_sync_file_range
;
38 unsigned sync_interval
;
42 class SyncThread
: public Thread
{
45 explicit SyncThread(DumbBackend
*backend
) : backend(backend
) {}
46 void *entry() override
{
51 friend class SyncThread
;
// Taken by ~DumbBackend() while it inspects sync_loop_stop, and passed to
// sync_loop_cond.Wait() while the destructor waits for that state to change.
53 Mutex sync_loop_mutex
;
// Shutdown state of the sync loop. ~DumbBackend() checks it under
// sync_loop_mutex and blocks on sync_loop_cond until it reaches 2.
55 int sync_loop_stop
; // 0 for running, 1 for stopping, 2 for stopped
// A mutex and a set of Context pointers.
// NOTE(review): presumably pending_commits holds on_commit callbacks that are
// waiting for a sync, and pending_commit_mutex guards the set — the code that
// touches them is not visible here; confirm against the implementation.
58 Mutex pending_commit_mutex
;
59 set
<Context
*> pending_commits
;
61 class WriteQueue
: public ThreadPool::WorkQueue
<write_item
> {
62 deque
<write_item
*> item_queue
;
67 DumbBackend
*_backend
,
70 ThreadPool::WorkQueue
<write_item
>("DumbBackend::queue", ti
, ti
*10, tp
),
72 bool _enqueue(write_item
*item
) override
{
73 item_queue
.push_back(item
);
// Overload that would remove one specific item from the queue. It is not
// supported here: the body unconditionally calls ceph_abort().
76 void _dequeue(write_item
*) override
{ ceph_abort(); }
77 write_item
*_dequeue() override
{
78 if (item_queue
.empty())
80 write_item
*retval
= item_queue
.front();
81 item_queue
.pop_front();
84 bool _empty() override
{
85 return item_queue
.empty();
87 void _process(write_item
*item
, ThreadPool::TPHandle
&) override
{
88 return backend
->_write(
95 void _clear() override
{
96 return item_queue
.clear();
// WriteQueue::_process() forwards each item to backend->_write().
// NOTE(review): the friendship is presumably what grants that access —
// confirm _write's visibility in the full header.
99 friend class WriteQueue
;
// Takes an object id and returns a string.
// NOTE(review): presumably the object's on-disk location derived from the
// backend's `path` member; only the declaration is visible here — confirm in
// the definition.
101 string
get_full_path(const string
&oid
);
106 const bufferlist
&bl
,
114 bool do_sync_file_range
,
116 unsigned sync_interval
,
118 unsigned worker_threads
,
120 : path(path
), do_fsync(do_fsync
),
121 do_sync_file_range(do_sync_file_range
),
122 do_fadvise(do_fadvise
),
123 sync_interval(sync_interval
),
125 tp(cct
, "DumbBackend::tp", "tp_dumb_backend", worker_threads
),
127 sync_loop_mutex("DumbBackend::sync_loop_mutex"),
129 pending_commit_mutex("DumbBackend::pending_commit_mutex"),
130 queue(this, 20, &tp
) {
131 thread
.create("thread");
133 for (unsigned i
= 0; i
< 10*worker_threads
; ++i
) {
137 ~DumbBackend() override
{
139 Mutex::Locker
l(sync_loop_mutex
);
140 if (sync_loop_stop
== 0)
142 while (sync_loop_stop
< 2)
143 sync_loop_cond
.Wait(sync_loop_mutex
);
151 const bufferlist
&bl
,
153 Context
*on_commit
) override
{
157 oid
, bl
, offset
, on_applied
, on_commit
));
165 Context
*on_complete
) override
;