]> git.proxmox.com Git - ceph.git/blame - ceph/src/client/barrier.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / client / barrier.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 *
5 * Copyright (C) 2012 CohortFS, LLC.
6 *
7 * This is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License version 2.1, as published by the Free Software
10 * Foundation. See file COPYING.
11 *
12 */
13
14#if defined(__FreeBSD__)
15#include <sys/param.h>
16#endif
17
18#include "include/Context.h"
19#include "Client.h"
20#include "barrier.h"
11fdf7f2 21#include "include/ceph_assert.h"
7c673cae
FG
22
23#undef dout_prefix
24#define dout_prefix *_dout << "client." << whoami << " "
25
26#define dout_subsys ceph_subsys_client
27
28#define cldout(cl, v) dout_impl((cl)->cct, dout_subsys, v) \
29 *_dout << "client." << cl->whoami << " "
30
31/* C_Block_Sync */
32class C_Block_Sync : public Context {
33private:
34 Client *cl;
35 uint64_t ino;
36 barrier_interval iv;
37 enum CBlockSync_State state;
38 Barrier *barrier;
39 int *rval; /* see Cond.h */
40
41public:
42 boost::intrusive::list_member_hook<> intervals_hook;
43 C_Block_Sync(Client *c, uint64_t i, barrier_interval iv, int *r);
44 void finish(int rval);
45
46 friend class Barrier;
47 friend class BarrierContext;
48};
49
50C_Block_Sync::C_Block_Sync(Client *c, uint64_t i, barrier_interval iv,
51 int *r=0) :
52 cl(c), ino(i), iv(iv), rval(r)
53{
54 state = CBlockSync_State_None;
55 barrier = NULL;
56
57 cldout(cl, 1) << "C_Block_Sync for " << ino << dendl;
58
59 if (!cl->barriers[ino]) {
60 cl->barriers[ino] = new BarrierContext(cl, ino);
61 }
62 /* XXX current semantics aren't commit-ordered */
63 cl->barriers[ino]->write_nobarrier(*this);
64}
65
66void C_Block_Sync::finish(int r) {
67 cldout(cl, 1) << "C_Block_Sync::finish() for " << ino << " "
68 << iv << " r==" << r << dendl;
69 if (rval)
70 *rval = r;
71 cl->barriers[ino]->complete(*this);
72}
73
74/* Barrier */
75Barrier::Barrier()
76{ }
77
78Barrier::~Barrier()
79{ }
80
81/* BarrierContext */
82BarrierContext::BarrierContext(Client *c, uint64_t ino) :
9f95a23c 83 cl(c), ino(ino)
7c673cae
FG
84{ };
85
86void BarrierContext::write_nobarrier(C_Block_Sync &cbs)
87{
11fdf7f2 88 std::lock_guard locker(lock);
7c673cae
FG
89 cbs.state = CBlockSync_State_Unclaimed;
90 outstanding_writes.push_back(cbs);
91}
92
93void BarrierContext::write_barrier(C_Block_Sync &cbs)
94{
9f95a23c 95 std::unique_lock locker(lock);
7c673cae
FG
96 barrier_interval &iv = cbs.iv;
97
98 { /* find blocking commit--intrusive no help here */
99 BarrierList::iterator iter;
100 bool done = false;
101 for (iter = active_commits.begin();
102 !done && (iter != active_commits.end());
103 ++iter) {
104 Barrier &barrier = *iter;
105 while (boost::icl::intersects(barrier.span, iv)) {
106 /* wait on this */
9f95a23c 107 barrier.cond.wait(locker);
7c673cae
FG
108 done = true;
109 }
110 }
111 }
112
113 cbs.state = CBlockSync_State_Unclaimed;
114 outstanding_writes.push_back(cbs);
115
116} /* write_barrier */
117
118void BarrierContext::commit_barrier(barrier_interval &civ)
119{
9f95a23c 120 std::unique_lock locker(lock);
7c673cae
FG
121
122 /* we commit outstanding writes--if none exist, we don't care */
123 if (outstanding_writes.size() == 0)
124 return;
125
126 boost::icl::interval_set<uint64_t> cvs;
127 cvs.insert(civ);
128
129 Barrier *barrier = NULL;
130 BlockSyncList::iterator iter, iter2;
131
132 iter = outstanding_writes.begin();
133 while (iter != outstanding_writes.end()) {
134 barrier_interval &iv = iter->iv;
135 if (boost::icl::intersects(cvs, iv)) {
136 C_Block_Sync &a_write = *iter;
137 if (! barrier)
138 barrier = new Barrier();
139 /* mark the callback */
140 a_write.state = CBlockSync_State_Committing;
141 a_write.barrier = barrier;
142 iter2 = iter++;
143 outstanding_writes.erase(iter2);
144 barrier->write_list.push_back(a_write);
145 barrier->span.insert(iv);
146 /* avoid iter invalidate */
147 } else {
148 ++iter;
149 }
150 }
151
152 if (barrier) {
153 active_commits.push_back(*barrier);
154 /* and wait on this */
9f95a23c 155 barrier->cond.wait(locker);
7c673cae
FG
156 }
157
158} /* commit_barrier */
159
160void BarrierContext::complete(C_Block_Sync &cbs)
161{
11fdf7f2 162 std::lock_guard locker(lock);
7c673cae
FG
163 BlockSyncList::iterator iter =
164 BlockSyncList::s_iterator_to(cbs);
165
166 switch (cbs.state) {
167 case CBlockSync_State_Unclaimed:
168 /* cool, no waiting */
169 outstanding_writes.erase(iter);
170 break;
171 case CBlockSync_State_Committing:
172 {
173 Barrier *barrier = iter->barrier;
174 barrier->write_list.erase(iter);
175 /* signal waiters */
9f95a23c 176 barrier->cond.notify_all();
7c673cae
FG
177 /* dispose cleared barrier */
178 if (barrier->write_list.size() == 0) {
179 BarrierList::iterator iter2 =
180 BarrierList::s_iterator_to(*barrier);
181 active_commits.erase(iter2);
182 delete barrier;
183 }
184 }
185 break;
186 default:
11fdf7f2 187 ceph_abort();
7c673cae
FG
188 break;
189 }
190
191 cbs.state = CBlockSync_State_Completed;
192
193} /* complete */
194
195BarrierContext::~BarrierContext()
196{ }