1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "MDSContext.h"
20 #include "msg/Messenger.h"
24 #include "LogSegment.h"
26 #include "MDSTableClient.h"
27 #include "events/ETableClient.h"
29 #include "messages/MMDSTableRequest.h"
31 #include "common/config.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mds
36 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
// Log-completion context (derives from MDSLogContextBase) that remembers the
// owning table client and a transaction id. handle_request() passes an
// instance of it as the completion for the journaled ETableClient ACK entry.
// NOTE(review): the member declarations and the body of finish() are not
// visible in this listing; presumably finish() forwards to
// MDSTableClient::_logged_ack(tid) -- confirm against the full source.
39 class C_LoggedAck
: public MDSLogContextBase
{
// The MDS rank is taken from the table client this context was created for.
42 MDSRank
*get_mds() override
{ return tc
->mds
; }
// a: table client to store in tc; t: tid to store in this context.
44 C_LoggedAck(MDSTableClient
*a
, version_t t
) : tc(a
), tid(t
) {}
45 void finish(int r
) override
{
// Handle a table-server message addressed to this table client.
//
// m: incoming MMDSTableRequest; its table must equal this client's table
//    (asserted below).
//
// The visible cases cover QUERY_REPLY, AGREE, ACK and SERVER_READY; an
// unknown op trips the final assert.
// NOTE(review): the switch header, break statements, some else branches and
// part of the SERVER_READY handling are not visible in this listing --
// confirm the exact control flow against the full source.
51 void MDSTableClient::handle_request(class MMDSTableRequest
*m
)
53 dout(10) << "handle_request " << *m
<< dendl
;
54 assert(m
->table
== table
);
// Both ids are read up front; the cases below key on one or the other.
56 version_t tid
= m
->get_tid();
57 uint64_t reqid
= m
->reqid
;
// Query reply: the whole message is handed to handle_query_result().
60 case TABLESERVER_OP_QUERY_REPLY
:
61 handle_query_result(m
);
// AGREE, case 1: the reqid is still in pending_prepare.
64 case TABLESERVER_OP_AGREE
:
65 if (pending_prepare
.count(reqid
)) {
66 dout(10) << "got agree on " << reqid
<< " atid " << tid
<< dendl
;
// NOTE(review): mds_kill_mdstable_at looks like a config-driven
// fault-injection point (assert fires when it equals 3) -- confirm.
68 assert(g_conf
->mds_kill_mdstable_at
!= 3);
// Save the completion before the pending_prepare entry is erased below.
70 MDSInternalContextBase
*onfinish
= pending_prepare
[reqid
].onfinish
;
// Report the agreed tid through the caller-supplied pointer.
71 *pending_prepare
[reqid
].ptid
= tid
;
// pbl is optional: the reply payload is copied out only when it is non-null.
72 if (pending_prepare
[reqid
].pbl
)
73 *pending_prepare
[reqid
].pbl
= m
->bl
;
// Move the request from pending_prepare to prepared_update (tid -> reqid).
74 pending_prepare
.erase(reqid
);
75 prepared_update
[tid
] = reqid
;
// onfinish is dereferenced without a visible null check here.
77 onfinish
->complete(0);
// AGREE, case 2: tid is already in prepared_update, i.e. a duplicate agree.
// It must map to the same reqid and the server must not be marked ready.
80 else if (prepared_update
.count(tid
)) {
81 dout(10) << "got duplicated agree on " << reqid
<< " atid " << tid
<< dendl
;
82 assert(prepared_update
[tid
] == reqid
);
83 assert(!server_ready
);
// AGREE, case 3: tid is already in pending_commit; nothing is sent here.
85 else if (pending_commit
.count(tid
)) {
86 dout(10) << "stray agree on " << reqid
<< " tid " << tid
87 << ", already committing, will resend COMMIT" << dendl
;
88 assert(!server_ready
);
89 // will re-send commit when receiving the server ready message
// AGREE, remaining case: per the log text this is a stray agree, so a
// ROLLBACK for the tid is sent to the tableserver rank.
92 dout(10) << "stray agree on " << reqid
<< " tid " << tid
93 << ", sending ROLLBACK" << dendl
;
94 assert(!server_ready
);
95 MMDSTableRequest
*req
= new MMDSTableRequest(table
, TABLESERVER_OP_ROLLBACK
, 0, tid
);
96 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// ACK: accepted only when tid is in pending_commit and also recorded in that
// log segment's pending_commit_tids set for this table.
100 case TABLESERVER_OP_ACK
:
101 if (pending_commit
.count(tid
) &&
102 pending_commit
[tid
]->pending_commit_tids
[table
].count(tid
)) {
103 dout(10) << "got ack on tid " << tid
<< ", logging" << dendl
;
// NOTE(review): presumably another fault-injection point (value 7) -- confirm.
105 assert(g_conf
->mds_kill_mdstable_at
!= 7);
107 // remove from committing list
108 pending_commit
[tid
]->pending_commit_tids
[table
].erase(tid
);
109 pending_commit
.erase(tid
);
// Journal the ACK as an ETableClient entry; C_LoggedAck is its completion.
112 mds
->mdlog
->start_submit_entry(new ETableClient(table
, TABLESERVER_OP_ACK
, tid
),
113 new C_LoggedAck(this, tid
));
// Otherwise the ACK is only logged as stray.
115 dout(10) << "got stray ack on tid " << tid
<< ", ignoring" << dendl
;
// SERVER_READY: must not arrive while server_ready is already set.
119 case TABLESERVER_OP_SERVER_READY
:
120 assert(!server_ready
);
// ~0ULL is the same last_reqid value _prepare() treats as "no request id yet".
// NOTE(review): the statement guarded by this test and the rest of this case
// are not visible in this listing.
123 if (last_reqid
== ~0ULL)
// Any other op is treated as a programming error.
132 assert(0 == "unrecognized mds_table_client request op");
// Runs once the ACK for `tid` has been journaled: wakes any contexts that
// were waiting on that tid and drops the waiter list.
//
// tid: transaction id whose ACK was logged.
139 void MDSTableClient::_logged_ack(version_t tid
)
141 dout(10) << "_logged_ack " << tid
<< dendl
;
// NOTE(review): presumably a config-driven fault-injection point (value 8)
// -- confirm.
143 assert(g_conf
->mds_kill_mdstable_at
!= 8);
145 // kick any waiters (LogSegment trim)
146 if (ack_waiters
.count(tid
)) {
147 dout(15) << "kicking ack waiters on tid " << tid
<< dendl
;
// Hand the waiter list to the MDS queue, then forget the entry.
148 mds
->queue_waiters(ack_waiters
[tid
]);
149 ack_waiters
.erase(tid
);
// Start a PREPARE for a table mutation.
//
// mutation: payload stored with the pending request.
// ptid:     pointer stored with the request; handle_request() writes the
//           agreed tid through it on AGREE.
// pbl:      optional pointer stored with the request; when non-null,
//           handle_request() copies the AGREE payload into it.
// onfinish: context stored with the request; completed on AGREE.
//
// NOTE(review): the early return after queuing and the condition that selects
// between sending and deferring are not visible in this listing.
153 void MDSTableClient::_prepare(bufferlist
& mutation
, version_t
*ptid
, bufferlist
*pbl
,
154 MDSInternalContextBase
*onfinish
)
// No usable request id yet (last_reqid == ~0ULL): park the request in
// waiting_for_reqid; resend_prepares() later assigns ids to these entries.
156 if (last_reqid
== ~0ULL) {
157 dout(10) << "tableserver is not ready yet, waiting for request id" << dendl
;
158 waiting_for_reqid
.push_back(_pending_prepare(onfinish
, ptid
, pbl
, mutation
));
// Allocate the next request id and record everything needed to finish the
// request when the AGREE arrives.
162 uint64_t reqid
= ++last_reqid
;
163 dout(10) << "_prepare " << reqid
<< dendl
;
165 pending_prepare
[reqid
].mutation
= mutation
;
166 pending_prepare
[reqid
].ptid
= ptid
;
167 pending_prepare
[reqid
].pbl
= pbl
;
168 pending_prepare
[reqid
].onfinish
= onfinish
;
// Build the PREPARE message and send it to the tableserver rank.
// NOTE(review): the line attaching the mutation to req is not visible here;
// resend_prepares() sets req->bl from the stored mutation -- confirm the same
// happens on this path.
172 MMDSTableRequest
*req
= new MMDSTableRequest(table
, TABLESERVER_OP_PREPARE
, reqid
);
174 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// Alternative branch: only logged here; the request stays in pending_prepare.
176 dout(10) << "tableserver is not ready yet, deferring request" << dendl
;
// Move a prepared update into the committing state and ask the table server
// to commit it.
//
// tid: transaction id; must be in prepared_update and not yet in
//      pending_commit (both asserted).
// ls:  log segment recorded for the tid; the tid is also added to the
//      segment's pending_commit_tids set for this table.
//
// NOTE(review): the condition that selects between sending COMMIT and only
// logging "deferring request" is not visible in this listing.
179 void MDSTableClient::commit(version_t tid
, LogSegment
*ls
)
181 dout(10) << "commit " << tid
<< dendl
;
// The tid leaves the prepared set...
183 assert(prepared_update
.count(tid
));
184 prepared_update
.erase(tid
);
// ...and is tracked as committing, both here and on the log segment.
186 assert(pending_commit
.count(tid
) == 0);
187 pending_commit
[tid
] = ls
;
188 ls
->pending_commit_tids
[table
].insert(tid
);
// NOTE(review): presumably a config-driven fault-injection point (value 4)
// -- confirm.
190 assert(g_conf
->mds_kill_mdstable_at
!= 4);
// Send COMMIT for the tid to the tableserver rank.
194 MMDSTableRequest
*req
= new MMDSTableRequest(table
, TABLESERVER_OP_COMMIT
, 0, tid
);
195 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// Alternative branch: nothing is sent; resend_commits() re-sends every tid
// still in pending_commit.
197 dout(10) << "tableserver is not ready yet, deferring request" << dendl
;
// Record a tid as committing without sending anything: the tid is added to
// the segment's pending_commit_tids set for this table and pending_commit
// maps it to the segment.
// NOTE(review): the name suggests this is driven by journal replay -- confirm
// against the callers.
//
// tid: transaction id to track.
// ls:  log segment to associate with the tid.
204 void MDSTableClient::got_journaled_agree(version_t tid
, LogSegment
*ls
)
206 dout(10) << "got_journaled_agree " << tid
<< dendl
;
207 ls
->pending_commit_tids
[table
].insert(tid
);
208 pending_commit
[tid
] = ls
;
// Stop tracking a committing tid: if it is in pending_commit, remove it from
// the associated segment's pending_commit_tids set for this table and from
// pending_commit. An unknown tid is ignored.
//
// tid: transaction id whose ACK was found.
211 void MDSTableClient::got_journaled_ack(version_t tid
)
213 dout(10) << "got_journaled_ack " << tid
<< dendl
;
214 if (pending_commit
.count(tid
)) {
// Erase from the segment first: the segment pointer is read out of
// pending_commit, so that entry must still exist at this point.
215 pending_commit
[tid
]->pending_commit_tids
[table
].erase(tid
);
216 pending_commit
.erase(tid
);
// Re-send a COMMIT to the tableserver rank for every tid in pending_commit.
// NOTE(review): the loop increment is not visible in this listing.
220 void MDSTableClient::resend_commits()
222 for (map
<version_t
,LogSegment
*>::iterator p
= pending_commit
.begin();
223 p
!= pending_commit
.end();
225 dout(10) << "resending commit on " << p
->first
<< dendl
;
// p->first is the tid; the mapped LogSegment is not used here.
226 MMDSTableRequest
*req
= new MMDSTableRequest(table
, TABLESERVER_OP_COMMIT
, 0, p
->first
);
227 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// Assign request ids to prepares that were queued without one, then re-send
// a PREPARE to the tableserver rank for every entry in pending_prepare.
// NOTE(review): the loop increment is not visible in this listing.
231 void MDSTableClient::resend_prepares()
// Drain waiting_for_reqid in FIFO order, giving each entry the next reqid.
233 while (!waiting_for_reqid
.empty()) {
234 pending_prepare
[++last_reqid
] = waiting_for_reqid
.front();
235 waiting_for_reqid
.pop_front();
// Re-send everything in pending_prepare, including the entries just added.
238 for (map
<uint64_t, _pending_prepare
>::iterator p
= pending_prepare
.begin();
239 p
!= pending_prepare
.end();
241 dout(10) << "resending prepare on " << p
->first
<< dendl
;
242 MMDSTableRequest
*req
= new MMDSTableRequest(table
, TABLESERVER_OP_PREPARE
, p
->first
);
// Attach the stored mutation as the message payload.
243 req
->bl
= p
->second
.mutation
;
244 mds
->send_message_mds(req
, mds
->get_mds_map()->get_tableserver());
// React to the failure of an MDS rank. Only a failure of the rank the MDS map
// reports as tableserver matters: server_ready is then cleared.
//
// who: rank of the failed MDS.
248 void MDSTableClient::handle_mds_failure(mds_rank_t who
)
250 if (who
!= mds
->get_mds_map()->get_tableserver())
251 return; // do nothing.
253 dout(7) << "tableserver mds." << who
<< " fails" << dendl
;
// handle_request() asserts !server_ready before accepting a SERVER_READY.
254 server_ready
= false;