]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | #include "FileStoreTracker.h" | |
3 | #include <stdlib.h> | |
4 | #include <iostream> | |
5 | #include <boost/scoped_ptr.hpp> | |
6 | #include "include/Context.h" | |
7c673cae | 7 | |
20effc67 TL |
8 | using namespace std; |
9 | ||
7c673cae FG |
10 | class OnApplied : public Context { |
11 | FileStoreTracker *tracker; | |
12 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
13 | public: | |
14 | OnApplied(FileStoreTracker *tracker, | |
15 | list<pair<pair<coll_t, string>, uint64_t> > in_flight) | |
16 | : tracker(tracker), in_flight(in_flight) {} | |
17 | ||
18 | void finish(int r) override { | |
19 | for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = | |
20 | in_flight.begin(); | |
21 | i != in_flight.end(); | |
22 | ++i) { | |
23 | tracker->applied(i->first, i->second); | |
24 | } | |
25 | } | |
26 | }; | |
27 | ||
28 | class OnCommitted : public Context { | |
29 | FileStoreTracker *tracker; | |
30 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
31 | public: | |
32 | OnCommitted(FileStoreTracker *tracker, | |
33 | list<pair<pair<coll_t, string>, uint64_t> > in_flight) | |
34 | : tracker(tracker), in_flight(in_flight) {} | |
35 | ||
36 | void finish(int r) override { | |
37 | for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = | |
38 | in_flight.begin(); | |
39 | i != in_flight.end(); | |
40 | ++i) { | |
41 | tracker->committed(i->first, i->second); | |
42 | } | |
43 | } | |
44 | }; | |
45 | ||
46 | int FileStoreTracker::init() | |
47 | { | |
48 | set<string> to_get; | |
49 | to_get.insert("STATUS"); | |
50 | map<string, bufferlist> got; | |
51 | db->get("STATUS", to_get, &got); | |
52 | restart_seq = 0; | |
53 | if (!got.empty()) { | |
11fdf7f2 TL |
54 | auto bp = got.begin()->second.cbegin(); |
55 | decode(restart_seq, bp); | |
7c673cae FG |
56 | } |
57 | ++restart_seq; | |
58 | KeyValueDB::Transaction t = db->get_transaction(); | |
59 | got.clear(); | |
11fdf7f2 | 60 | encode(restart_seq, got["STATUS"]); |
7c673cae FG |
61 | t->set("STATUS", got); |
62 | db->submit_transaction(t); | |
63 | return 0; | |
64 | } | |
65 | ||
66 | void FileStoreTracker::submit_transaction(Transaction &t) | |
67 | { | |
68 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
69 | OutTransaction out; | |
70 | out.t = new ObjectStore::Transaction; | |
71 | out.in_flight = &in_flight; | |
72 | for (list<Transaction::Op*>::iterator i = t.ops.begin(); | |
73 | i != t.ops.end(); | |
74 | ++i) { | |
75 | (**i)(this, &out); | |
76 | } | |
11fdf7f2 TL |
77 | out.t->register_on_applied(new OnApplied(this, in_flight)); |
78 | out.t->register_on_commit(new OnCommitted(this, in_flight)); | |
79 | auto ch = store->open_collection(coll_t()); | |
80 | store->queue_transaction(ch, std::move(*out.t), nullptr); | |
7c673cae FG |
81 | delete out.t; |
82 | } | |
83 | ||
84 | void FileStoreTracker::write(const pair<coll_t, string> &obj, | |
85 | OutTransaction *out) | |
86 | { | |
9f95a23c | 87 | std::lock_guard l{lock}; |
7c673cae FG |
88 | std::cerr << "Writing " << obj << std::endl; |
89 | ObjectContents contents = get_current_content(obj); | |
90 | ||
91 | uint64_t offset = rand() % (SIZE/2); | |
92 | uint64_t len = rand() % (SIZE/2); | |
93 | if (!len) len = 10; | |
94 | contents.write(rand(), offset, len); | |
95 | ||
96 | bufferlist to_write; | |
97 | ObjectContents::Iterator iter = contents.get_iterator(); | |
98 | iter.seek_to(offset); | |
99 | for (uint64_t i = offset; | |
100 | i < offset + len; | |
101 | ++i, ++iter) { | |
11fdf7f2 | 102 | ceph_assert(iter.valid()); |
7c673cae FG |
103 | to_write.append(*iter); |
104 | } | |
105 | out->t->write(coll_t(obj.first), | |
106 | ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))), | |
107 | offset, | |
108 | len, | |
109 | to_write); | |
110 | out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); | |
111 | } | |
112 | ||
113 | void FileStoreTracker::remove(const pair<coll_t, string> &obj, | |
114 | OutTransaction *out) | |
115 | { | |
116 | std::cerr << "Deleting " << obj << std::endl; | |
9f95a23c | 117 | std::lock_guard l{lock}; |
7c673cae FG |
118 | ObjectContents old_contents = get_current_content(obj); |
119 | if (!old_contents.exists()) | |
120 | return; | |
121 | out->t->remove(coll_t(obj.first), | |
122 | ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP)))); | |
123 | ObjectContents contents; | |
124 | out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); | |
125 | } | |
126 | ||
127 | void FileStoreTracker::clone_range(const pair<coll_t, string> &from, | |
128 | const pair<coll_t, string> &to, | |
129 | OutTransaction *out) { | |
9f95a23c | 130 | std::lock_guard l{lock}; |
7c673cae | 131 | std::cerr << "CloningRange " << from << " to " << to << std::endl; |
11fdf7f2 | 132 | ceph_assert(from.first == to.first); |
7c673cae FG |
133 | ObjectContents from_contents = get_current_content(from); |
134 | ObjectContents to_contents = get_current_content(to); | |
135 | if (!from_contents.exists()) { | |
136 | return; | |
137 | } | |
138 | if (from.second == to.second) { | |
139 | return; | |
140 | } | |
141 | ||
142 | uint64_t new_size = from_contents.size(); | |
143 | interval_set<uint64_t> interval_to_clone; | |
144 | uint64_t offset = rand() % (new_size/2); | |
145 | uint64_t len = rand() % (new_size/2); | |
146 | if (!len) len = 10; | |
147 | interval_to_clone.insert(offset, len); | |
148 | to_contents.clone_range(from_contents, interval_to_clone); | |
149 | out->t->clone_range(coll_t(from.first), | |
150 | ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), | |
151 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))), | |
152 | offset, | |
153 | len, | |
154 | offset); | |
155 | out->in_flight->push_back(make_pair(to, set_content(to, to_contents))); | |
156 | } | |
157 | ||
158 | void FileStoreTracker::clone(const pair<coll_t, string> &from, | |
159 | const pair<coll_t, string> &to, | |
160 | OutTransaction *out) { | |
9f95a23c | 161 | std::lock_guard l{lock}; |
7c673cae | 162 | std::cerr << "Cloning " << from << " to " << to << std::endl; |
11fdf7f2 | 163 | ceph_assert(from.first == to.first); |
7c673cae FG |
164 | if (from.second == to.second) { |
165 | return; | |
166 | } | |
167 | ObjectContents from_contents = get_current_content(from); | |
168 | ObjectContents to_contents = get_current_content(to); | |
169 | if (!from_contents.exists()) { | |
170 | return; | |
171 | } | |
172 | ||
173 | if (to_contents.exists()) | |
174 | out->t->remove(coll_t(to.first), | |
175 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); | |
176 | out->t->clone(coll_t(from.first), | |
177 | ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), | |
178 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); | |
179 | out->in_flight->push_back(make_pair(to, set_content(to, from_contents))); | |
180 | } | |
181 | ||
182 | ||
183 | string obj_to_prefix(const pair<coll_t, string> &obj) { | |
184 | string sep; | |
185 | sep.push_back('^'); | |
186 | return obj.first.to_str() + sep + obj.second + "_CONTENTS_"; | |
187 | } | |
188 | ||
189 | string obj_to_meta_prefix(const pair<coll_t, string> &obj) { | |
190 | string sep; | |
191 | sep.push_back('^'); | |
192 | return obj.first.to_str() + sep + obj.second; | |
193 | } | |
194 | ||
// Render a sequence number as a key: the decimal digits right-aligned in a
// 20-character field padded with leading spaces. 20 characters fit any
// uint64_t, so every key has the same width and keys sort lexicographically
// in numeric order.
std::string seq_to_key(uint64_t seq) {
  const std::string digits =
    std::to_string(static_cast<unsigned long long>(seq));
  return std::string(20 - digits.size(), ' ') + digits;
}
200 | ||
// Per-object bookkeeping stored under the "META" key. All counters
// start at 0.
//   last_applied   - newest version reported applied
//   last_committed - newest version reported committed
//   restart_seq    - the restart sequence passed when last_applied was set
struct ObjStatus {
  uint64_t last_applied = 0;
  uint64_t last_committed = 0;
  uint64_t restart_seq = 0;

  // When `seq` is newer than the restart_seq recorded with last_applied
  // (i.e. last_applied was set in an earlier run), return last_committed
  // instead. Presumably this is because applied-but-uncommitted state need
  // not survive a restart.
  uint64_t get_last_applied(uint64_t seq) const {
    return seq > restart_seq ? last_committed : last_applied;
  }
  void set_last_applied(uint64_t _last_applied, uint64_t seq) {
    restart_seq = seq;
    last_applied = _last_applied;
  }
  // The smaller of last_applied and last_committed.
  uint64_t trim_to() const {
    return last_committed > last_applied ? last_applied : last_committed;
  }
};
222 | void encode(const ObjStatus &obj, bufferlist &bl) { | |
11fdf7f2 TL |
223 | encode(obj.last_applied, bl); |
224 | encode(obj.last_committed, bl); | |
225 | encode(obj.restart_seq, bl); | |
7c673cae | 226 | } |
11fdf7f2 TL |
227 | void decode(ObjStatus &obj, bufferlist::const_iterator &bl) { |
228 | decode(obj.last_applied, bl); | |
229 | decode(obj.last_committed, bl); | |
230 | decode(obj.restart_seq, bl); | |
7c673cae FG |
231 | } |
232 | ||
233 | ||
234 | ObjStatus get_obj_status(const pair<coll_t, string> &obj, | |
235 | KeyValueDB *db) | |
236 | { | |
237 | set<string> to_get; | |
238 | to_get.insert("META"); | |
239 | map<string, bufferlist> got; | |
240 | db->get(obj_to_meta_prefix(obj), to_get, &got); | |
241 | ObjStatus retval; | |
242 | if (!got.empty()) { | |
11fdf7f2 TL |
243 | auto bp = got.begin()->second.cbegin(); |
244 | decode(retval, bp); | |
7c673cae FG |
245 | } |
246 | return retval; | |
247 | } | |
248 | ||
249 | void set_obj_status(const pair<coll_t, string> &obj, | |
250 | const ObjStatus &status, | |
251 | KeyValueDB::Transaction t) | |
252 | { | |
253 | map<string, bufferlist> to_set; | |
11fdf7f2 | 254 | encode(status, to_set["META"]); |
7c673cae FG |
255 | t->set(obj_to_meta_prefix(obj), to_set); |
256 | } | |
257 | ||
258 | void _clean_forward(const pair<coll_t, string> &obj, | |
259 | uint64_t last_valid, | |
260 | KeyValueDB *db) | |
261 | { | |
262 | KeyValueDB::Transaction t = db->get_transaction(); | |
263 | KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj)); | |
264 | set<string> to_remove; | |
265 | i->upper_bound(seq_to_key(last_valid)); | |
266 | for (; i->valid(); i->next()) { | |
267 | to_remove.insert(i->key()); | |
268 | } | |
269 | t->rmkeys(obj_to_prefix(obj), to_remove); | |
270 | db->submit_transaction(t); | |
271 | } | |
272 | ||
273 | ||
274 | void FileStoreTracker::verify(const coll_t &coll, const string &obj, | |
275 | bool on_start) { | |
9f95a23c | 276 | std::lock_guard l{lock}; |
7c673cae FG |
277 | std::cerr << "Verifying " << make_pair(coll, obj) << std::endl; |
278 | ||
279 | pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj)); | |
280 | std::cerr << "valid_reads is " << valid_reads << std::endl; | |
281 | bufferlist contents; | |
11fdf7f2 TL |
282 | auto ch = store->open_collection(coll_t(coll)); |
283 | int r = store->read(ch, | |
7c673cae FG |
284 | ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))), |
285 | 0, | |
286 | 2*SIZE, | |
287 | contents); | |
288 | std::cerr << "exists: " << r << std::endl; | |
289 | ||
290 | ||
291 | for (uint64_t i = valid_reads.first; | |
292 | i < valid_reads.second; | |
293 | ++i) { | |
294 | ObjectContents old_contents = get_content(make_pair(coll, obj), i); | |
295 | ||
296 | std::cerr << "old_contents exists " << old_contents.exists() << std::endl; | |
297 | if (!old_contents.exists() && (r == -ENOENT)) | |
298 | return; | |
299 | ||
300 | if (old_contents.exists() && (r == -ENOENT)) | |
301 | continue; | |
302 | ||
303 | if (!old_contents.exists() && (r != -ENOENT)) | |
304 | continue; | |
305 | ||
306 | if (contents.length() != old_contents.size()) { | |
307 | std::cerr << "old_contents.size() is " | |
308 | << old_contents.size() << std::endl; | |
309 | continue; | |
310 | } | |
311 | ||
312 | bufferlist::iterator bp = contents.begin(); | |
313 | ObjectContents::Iterator iter = old_contents.get_iterator(); | |
314 | iter.seek_to_first(); | |
315 | bool matches = true; | |
316 | uint64_t pos = 0; | |
317 | for (; !bp.end() && iter.valid(); | |
318 | ++iter, ++bp, ++pos) { | |
319 | if (*iter != *bp) { | |
320 | std::cerr << "does not match at pos " << pos << std::endl; | |
321 | matches = false; | |
322 | break; | |
323 | } | |
324 | } | |
325 | if (matches) { | |
326 | if (on_start) | |
327 | _clean_forward(make_pair(coll, obj), i, db); | |
328 | return; | |
329 | } | |
330 | } | |
331 | std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl; | |
332 | ceph_abort(); | |
333 | } | |
334 | ||
335 | ObjectContents FileStoreTracker::get_current_content( | |
336 | const pair<coll_t, string> &obj) | |
337 | { | |
338 | KeyValueDB::Iterator iter = db->get_iterator( | |
339 | obj_to_prefix(obj)); | |
340 | iter->seek_to_last(); | |
341 | if (iter->valid()) { | |
342 | bufferlist bl = iter->value(); | |
11fdf7f2 | 343 | auto bp = bl.cbegin(); |
7c673cae | 344 | pair<uint64_t, bufferlist> val; |
11fdf7f2 TL |
345 | decode(val, bp); |
346 | ceph_assert(seq_to_key(val.first) == iter->key()); | |
7c673cae FG |
347 | bp = val.second.begin(); |
348 | return ObjectContents(bp); | |
349 | } | |
350 | return ObjectContents(); | |
351 | } | |
352 | ||
353 | ObjectContents FileStoreTracker::get_content( | |
354 | const pair<coll_t, string> &obj, uint64_t version) | |
355 | { | |
356 | set<string> to_get; | |
357 | map<string, bufferlist> got; | |
358 | to_get.insert(seq_to_key(version)); | |
359 | db->get(obj_to_prefix(obj), to_get, &got); | |
360 | if (got.empty()) | |
361 | return ObjectContents(); | |
362 | pair<uint64_t, bufferlist> val; | |
11fdf7f2 TL |
363 | auto bp = got.begin()->second.cbegin(); |
364 | decode(val, bp); | |
7c673cae | 365 | bp = val.second.begin(); |
11fdf7f2 | 366 | ceph_assert(val.first == version); |
7c673cae FG |
367 | return ObjectContents(bp); |
368 | } | |
369 | ||
370 | pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads( | |
371 | const pair<coll_t, string> &obj) | |
372 | { | |
373 | pair<uint64_t, uint64_t> bounds = make_pair(0,1); | |
374 | KeyValueDB::Iterator iter = db->get_iterator( | |
375 | obj_to_prefix(obj)); | |
376 | iter->seek_to_last(); | |
377 | if (iter->valid()) { | |
378 | pair<uint64_t, bufferlist> val; | |
379 | bufferlist bl = iter->value(); | |
11fdf7f2 TL |
380 | auto bp = bl.cbegin(); |
381 | decode(val, bp); | |
7c673cae FG |
382 | bounds.second = val.first + 1; |
383 | } | |
384 | ||
385 | ObjStatus obj_status = get_obj_status(obj, db); | |
386 | bounds.first = obj_status.get_last_applied(restart_seq); | |
387 | return bounds; | |
388 | } | |
389 | ||
390 | void clear_obsolete(const pair<coll_t, string> &obj, | |
391 | const ObjStatus &status, | |
392 | KeyValueDB *db, | |
393 | KeyValueDB::Transaction t) | |
394 | { | |
395 | KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj)); | |
396 | set<string> to_remove; | |
397 | iter->seek_to_first(); | |
398 | for (; iter->valid() && iter->key() < seq_to_key(status.trim_to()); | |
399 | iter->next()) | |
400 | to_remove.insert(iter->key()); | |
401 | t->rmkeys(obj_to_prefix(obj), to_remove); | |
402 | } | |
403 | ||
404 | void FileStoreTracker::committed(const pair<coll_t, string> &obj, | |
405 | uint64_t seq) { | |
9f95a23c | 406 | std::lock_guard l{lock}; |
7c673cae | 407 | ObjStatus status = get_obj_status(obj, db); |
11fdf7f2 | 408 | ceph_assert(status.last_committed < seq); |
7c673cae FG |
409 | status.last_committed = seq; |
410 | KeyValueDB::Transaction t = db->get_transaction(); | |
411 | clear_obsolete(obj, status, db, t); | |
412 | set_obj_status(obj, status, t); | |
413 | db->submit_transaction(t); | |
414 | } | |
415 | ||
416 | void FileStoreTracker::applied(const pair<coll_t, string> &obj, | |
417 | uint64_t seq) { | |
9f95a23c | 418 | std::lock_guard l{lock}; |
7c673cae FG |
419 | std::cerr << "Applied " << obj << " version " << seq << std::endl; |
420 | ObjStatus status = get_obj_status(obj, db); | |
11fdf7f2 | 421 | ceph_assert(status.last_applied < seq); |
7c673cae FG |
422 | status.set_last_applied(seq, restart_seq); |
423 | KeyValueDB::Transaction t = db->get_transaction(); | |
424 | clear_obsolete(obj, status, db, t); | |
425 | set_obj_status(obj, status, t); | |
426 | db->submit_transaction(t); | |
427 | } | |
428 | ||
429 | ||
430 | uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj, | |
431 | ObjectContents &content) { | |
432 | KeyValueDB::Transaction t = db->get_transaction(); | |
433 | KeyValueDB::Iterator iter = db->get_iterator( | |
434 | obj_to_prefix(obj)); | |
435 | iter->seek_to_last(); | |
436 | uint64_t most_recent = 0; | |
437 | if (iter->valid()) { | |
438 | pair<uint64_t, bufferlist> val; | |
439 | bufferlist bl = iter->value(); | |
11fdf7f2 TL |
440 | auto bp = bl.cbegin(); |
441 | decode(val, bp); | |
7c673cae FG |
442 | most_recent = val.first; |
443 | } | |
444 | bufferlist buf_content; | |
445 | content.encode(buf_content); | |
446 | map<string, bufferlist> to_set; | |
11fdf7f2 | 447 | encode(make_pair(most_recent + 1, buf_content), |
7c673cae FG |
448 | to_set[seq_to_key(most_recent + 1)]); |
449 | t->set(obj_to_prefix(obj), to_set); | |
450 | db->submit_transaction(t); | |
451 | return most_recent + 1; | |
452 | } |