]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | #include "FileStoreTracker.h" | |
3 | #include <stdlib.h> | |
4 | #include <iostream> | |
5 | #include <boost/scoped_ptr.hpp> | |
6 | #include "include/Context.h" | |
7 | #include "common/Mutex.h" | |
8 | ||
9 | class OnApplied : public Context { | |
10 | FileStoreTracker *tracker; | |
11 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
12 | public: | |
13 | OnApplied(FileStoreTracker *tracker, | |
14 | list<pair<pair<coll_t, string>, uint64_t> > in_flight) | |
15 | : tracker(tracker), in_flight(in_flight) {} | |
16 | ||
17 | void finish(int r) override { | |
18 | for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = | |
19 | in_flight.begin(); | |
20 | i != in_flight.end(); | |
21 | ++i) { | |
22 | tracker->applied(i->first, i->second); | |
23 | } | |
24 | } | |
25 | }; | |
26 | ||
27 | class OnCommitted : public Context { | |
28 | FileStoreTracker *tracker; | |
29 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
30 | public: | |
31 | OnCommitted(FileStoreTracker *tracker, | |
32 | list<pair<pair<coll_t, string>, uint64_t> > in_flight) | |
33 | : tracker(tracker), in_flight(in_flight) {} | |
34 | ||
35 | void finish(int r) override { | |
36 | for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i = | |
37 | in_flight.begin(); | |
38 | i != in_flight.end(); | |
39 | ++i) { | |
40 | tracker->committed(i->first, i->second); | |
41 | } | |
42 | } | |
43 | }; | |
44 | ||
45 | int FileStoreTracker::init() | |
46 | { | |
47 | set<string> to_get; | |
48 | to_get.insert("STATUS"); | |
49 | map<string, bufferlist> got; | |
50 | db->get("STATUS", to_get, &got); | |
51 | restart_seq = 0; | |
52 | if (!got.empty()) { | |
53 | bufferlist::iterator bp = got.begin()->second.begin(); | |
54 | ::decode(restart_seq, bp); | |
55 | } | |
56 | ++restart_seq; | |
57 | KeyValueDB::Transaction t = db->get_transaction(); | |
58 | got.clear(); | |
59 | ::encode(restart_seq, got["STATUS"]); | |
60 | t->set("STATUS", got); | |
61 | db->submit_transaction(t); | |
62 | return 0; | |
63 | } | |
64 | ||
65 | void FileStoreTracker::submit_transaction(Transaction &t) | |
66 | { | |
67 | list<pair<pair<coll_t, string>, uint64_t> > in_flight; | |
68 | OutTransaction out; | |
69 | out.t = new ObjectStore::Transaction; | |
70 | out.in_flight = &in_flight; | |
71 | for (list<Transaction::Op*>::iterator i = t.ops.begin(); | |
72 | i != t.ops.end(); | |
73 | ++i) { | |
74 | (**i)(this, &out); | |
75 | } | |
76 | store->queue_transaction( | |
77 | 0, std::move(*out.t), | |
78 | new OnApplied(this, in_flight), | |
79 | new OnCommitted(this, in_flight)); | |
80 | delete out.t; | |
81 | } | |
82 | ||
83 | void FileStoreTracker::write(const pair<coll_t, string> &obj, | |
84 | OutTransaction *out) | |
85 | { | |
86 | Mutex::Locker l(lock); | |
87 | std::cerr << "Writing " << obj << std::endl; | |
88 | ObjectContents contents = get_current_content(obj); | |
89 | ||
90 | uint64_t offset = rand() % (SIZE/2); | |
91 | uint64_t len = rand() % (SIZE/2); | |
92 | if (!len) len = 10; | |
93 | contents.write(rand(), offset, len); | |
94 | ||
95 | bufferlist to_write; | |
96 | ObjectContents::Iterator iter = contents.get_iterator(); | |
97 | iter.seek_to(offset); | |
98 | for (uint64_t i = offset; | |
99 | i < offset + len; | |
100 | ++i, ++iter) { | |
101 | assert(iter.valid()); | |
102 | to_write.append(*iter); | |
103 | } | |
104 | out->t->write(coll_t(obj.first), | |
105 | ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))), | |
106 | offset, | |
107 | len, | |
108 | to_write); | |
109 | out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); | |
110 | } | |
111 | ||
112 | void FileStoreTracker::remove(const pair<coll_t, string> &obj, | |
113 | OutTransaction *out) | |
114 | { | |
115 | std::cerr << "Deleting " << obj << std::endl; | |
116 | Mutex::Locker l(lock); | |
117 | ObjectContents old_contents = get_current_content(obj); | |
118 | if (!old_contents.exists()) | |
119 | return; | |
120 | out->t->remove(coll_t(obj.first), | |
121 | ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP)))); | |
122 | ObjectContents contents; | |
123 | out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); | |
124 | } | |
125 | ||
126 | void FileStoreTracker::clone_range(const pair<coll_t, string> &from, | |
127 | const pair<coll_t, string> &to, | |
128 | OutTransaction *out) { | |
129 | Mutex::Locker l(lock); | |
130 | std::cerr << "CloningRange " << from << " to " << to << std::endl; | |
131 | assert(from.first == to.first); | |
132 | ObjectContents from_contents = get_current_content(from); | |
133 | ObjectContents to_contents = get_current_content(to); | |
134 | if (!from_contents.exists()) { | |
135 | return; | |
136 | } | |
137 | if (from.second == to.second) { | |
138 | return; | |
139 | } | |
140 | ||
141 | uint64_t new_size = from_contents.size(); | |
142 | interval_set<uint64_t> interval_to_clone; | |
143 | uint64_t offset = rand() % (new_size/2); | |
144 | uint64_t len = rand() % (new_size/2); | |
145 | if (!len) len = 10; | |
146 | interval_to_clone.insert(offset, len); | |
147 | to_contents.clone_range(from_contents, interval_to_clone); | |
148 | out->t->clone_range(coll_t(from.first), | |
149 | ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), | |
150 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))), | |
151 | offset, | |
152 | len, | |
153 | offset); | |
154 | out->in_flight->push_back(make_pair(to, set_content(to, to_contents))); | |
155 | } | |
156 | ||
157 | void FileStoreTracker::clone(const pair<coll_t, string> &from, | |
158 | const pair<coll_t, string> &to, | |
159 | OutTransaction *out) { | |
160 | Mutex::Locker l(lock); | |
161 | std::cerr << "Cloning " << from << " to " << to << std::endl; | |
162 | assert(from.first == to.first); | |
163 | if (from.second == to.second) { | |
164 | return; | |
165 | } | |
166 | ObjectContents from_contents = get_current_content(from); | |
167 | ObjectContents to_contents = get_current_content(to); | |
168 | if (!from_contents.exists()) { | |
169 | return; | |
170 | } | |
171 | ||
172 | if (to_contents.exists()) | |
173 | out->t->remove(coll_t(to.first), | |
174 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); | |
175 | out->t->clone(coll_t(from.first), | |
176 | ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), | |
177 | ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); | |
178 | out->in_flight->push_back(make_pair(to, set_content(to, from_contents))); | |
179 | } | |
180 | ||
181 | ||
182 | string obj_to_prefix(const pair<coll_t, string> &obj) { | |
183 | string sep; | |
184 | sep.push_back('^'); | |
185 | return obj.first.to_str() + sep + obj.second + "_CONTENTS_"; | |
186 | } | |
187 | ||
188 | string obj_to_meta_prefix(const pair<coll_t, string> &obj) { | |
189 | string sep; | |
190 | sep.push_back('^'); | |
191 | return obj.first.to_str() + sep + obj.second; | |
192 | } | |
193 | ||
/// Formats @p seq as a decimal string right-aligned in a field of width 20,
/// padded on the left with spaces.
std::string seq_to_key(uint64_t seq) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%20llu", static_cast<unsigned long long>(seq));
  return std::string(buf);
}
199 | ||
/**
 * Per-object bookkeeping: the last applied version, the last committed
 * version, and the restart sequence that was current when last_applied
 * was recorded.
 */
struct ObjStatus {
  uint64_t last_applied;
  uint64_t last_committed;
  uint64_t restart_seq;

  ObjStatus()
    : last_applied(0),
      last_committed(0),
      restart_seq(0)
  {}

  /// Returns last_committed when @p seq is greater than the stored
  /// restart_seq, and last_applied otherwise.
  uint64_t get_last_applied(uint64_t seq) const {
    return (seq > restart_seq) ? last_committed : last_applied;
  }

  /// Records a new applied version together with the restart sequence
  /// it was observed under.
  void set_last_applied(uint64_t _last_applied, uint64_t seq) {
    restart_seq = seq;
    last_applied = _last_applied;
  }

  /// Returns the smaller of last_applied and last_committed.
  uint64_t trim_to() const {
    if (last_applied < last_committed)
      return last_applied;
    return last_committed;
  }
};
221 | void encode(const ObjStatus &obj, bufferlist &bl) { | |
222 | ::encode(obj.last_applied, bl); | |
223 | ::encode(obj.last_committed, bl); | |
224 | ::encode(obj.restart_seq, bl); | |
225 | } | |
226 | void decode(ObjStatus &obj, bufferlist::iterator &bl) { | |
227 | ::decode(obj.last_applied, bl); | |
228 | ::decode(obj.last_committed, bl); | |
229 | ::decode(obj.restart_seq, bl); | |
230 | } | |
231 | ||
232 | ||
233 | ObjStatus get_obj_status(const pair<coll_t, string> &obj, | |
234 | KeyValueDB *db) | |
235 | { | |
236 | set<string> to_get; | |
237 | to_get.insert("META"); | |
238 | map<string, bufferlist> got; | |
239 | db->get(obj_to_meta_prefix(obj), to_get, &got); | |
240 | ObjStatus retval; | |
241 | if (!got.empty()) { | |
242 | bufferlist::iterator bp = got.begin()->second.begin(); | |
243 | ::decode(retval, bp); | |
244 | } | |
245 | return retval; | |
246 | } | |
247 | ||
248 | void set_obj_status(const pair<coll_t, string> &obj, | |
249 | const ObjStatus &status, | |
250 | KeyValueDB::Transaction t) | |
251 | { | |
252 | map<string, bufferlist> to_set; | |
253 | ::encode(status, to_set["META"]); | |
254 | t->set(obj_to_meta_prefix(obj), to_set); | |
255 | } | |
256 | ||
257 | void _clean_forward(const pair<coll_t, string> &obj, | |
258 | uint64_t last_valid, | |
259 | KeyValueDB *db) | |
260 | { | |
261 | KeyValueDB::Transaction t = db->get_transaction(); | |
262 | KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj)); | |
263 | set<string> to_remove; | |
264 | i->upper_bound(seq_to_key(last_valid)); | |
265 | for (; i->valid(); i->next()) { | |
266 | to_remove.insert(i->key()); | |
267 | } | |
268 | t->rmkeys(obj_to_prefix(obj), to_remove); | |
269 | db->submit_transaction(t); | |
270 | } | |
271 | ||
272 | ||
273 | void FileStoreTracker::verify(const coll_t &coll, const string &obj, | |
274 | bool on_start) { | |
275 | Mutex::Locker l(lock); | |
276 | std::cerr << "Verifying " << make_pair(coll, obj) << std::endl; | |
277 | ||
278 | pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj)); | |
279 | std::cerr << "valid_reads is " << valid_reads << std::endl; | |
280 | bufferlist contents; | |
281 | int r = store->read(coll_t(coll), | |
282 | ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))), | |
283 | 0, | |
284 | 2*SIZE, | |
285 | contents); | |
286 | std::cerr << "exists: " << r << std::endl; | |
287 | ||
288 | ||
289 | for (uint64_t i = valid_reads.first; | |
290 | i < valid_reads.second; | |
291 | ++i) { | |
292 | ObjectContents old_contents = get_content(make_pair(coll, obj), i); | |
293 | ||
294 | std::cerr << "old_contents exists " << old_contents.exists() << std::endl; | |
295 | if (!old_contents.exists() && (r == -ENOENT)) | |
296 | return; | |
297 | ||
298 | if (old_contents.exists() && (r == -ENOENT)) | |
299 | continue; | |
300 | ||
301 | if (!old_contents.exists() && (r != -ENOENT)) | |
302 | continue; | |
303 | ||
304 | if (contents.length() != old_contents.size()) { | |
305 | std::cerr << "old_contents.size() is " | |
306 | << old_contents.size() << std::endl; | |
307 | continue; | |
308 | } | |
309 | ||
310 | bufferlist::iterator bp = contents.begin(); | |
311 | ObjectContents::Iterator iter = old_contents.get_iterator(); | |
312 | iter.seek_to_first(); | |
313 | bool matches = true; | |
314 | uint64_t pos = 0; | |
315 | for (; !bp.end() && iter.valid(); | |
316 | ++iter, ++bp, ++pos) { | |
317 | if (*iter != *bp) { | |
318 | std::cerr << "does not match at pos " << pos << std::endl; | |
319 | matches = false; | |
320 | break; | |
321 | } | |
322 | } | |
323 | if (matches) { | |
324 | if (on_start) | |
325 | _clean_forward(make_pair(coll, obj), i, db); | |
326 | return; | |
327 | } | |
328 | } | |
329 | std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl; | |
330 | ceph_abort(); | |
331 | } | |
332 | ||
333 | ObjectContents FileStoreTracker::get_current_content( | |
334 | const pair<coll_t, string> &obj) | |
335 | { | |
336 | KeyValueDB::Iterator iter = db->get_iterator( | |
337 | obj_to_prefix(obj)); | |
338 | iter->seek_to_last(); | |
339 | if (iter->valid()) { | |
340 | bufferlist bl = iter->value(); | |
341 | bufferlist::iterator bp = bl.begin(); | |
342 | pair<uint64_t, bufferlist> val; | |
343 | ::decode(val, bp); | |
344 | assert(seq_to_key(val.first) == iter->key()); | |
345 | bp = val.second.begin(); | |
346 | return ObjectContents(bp); | |
347 | } | |
348 | return ObjectContents(); | |
349 | } | |
350 | ||
351 | ObjectContents FileStoreTracker::get_content( | |
352 | const pair<coll_t, string> &obj, uint64_t version) | |
353 | { | |
354 | set<string> to_get; | |
355 | map<string, bufferlist> got; | |
356 | to_get.insert(seq_to_key(version)); | |
357 | db->get(obj_to_prefix(obj), to_get, &got); | |
358 | if (got.empty()) | |
359 | return ObjectContents(); | |
360 | pair<uint64_t, bufferlist> val; | |
361 | bufferlist::iterator bp = got.begin()->second.begin(); | |
362 | ::decode(val, bp); | |
363 | bp = val.second.begin(); | |
364 | assert(val.first == version); | |
365 | return ObjectContents(bp); | |
366 | } | |
367 | ||
368 | pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads( | |
369 | const pair<coll_t, string> &obj) | |
370 | { | |
371 | pair<uint64_t, uint64_t> bounds = make_pair(0,1); | |
372 | KeyValueDB::Iterator iter = db->get_iterator( | |
373 | obj_to_prefix(obj)); | |
374 | iter->seek_to_last(); | |
375 | if (iter->valid()) { | |
376 | pair<uint64_t, bufferlist> val; | |
377 | bufferlist bl = iter->value(); | |
378 | bufferlist::iterator bp = bl.begin(); | |
379 | ::decode(val, bp); | |
380 | bounds.second = val.first + 1; | |
381 | } | |
382 | ||
383 | ObjStatus obj_status = get_obj_status(obj, db); | |
384 | bounds.first = obj_status.get_last_applied(restart_seq); | |
385 | return bounds; | |
386 | } | |
387 | ||
388 | void clear_obsolete(const pair<coll_t, string> &obj, | |
389 | const ObjStatus &status, | |
390 | KeyValueDB *db, | |
391 | KeyValueDB::Transaction t) | |
392 | { | |
393 | KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj)); | |
394 | set<string> to_remove; | |
395 | iter->seek_to_first(); | |
396 | for (; iter->valid() && iter->key() < seq_to_key(status.trim_to()); | |
397 | iter->next()) | |
398 | to_remove.insert(iter->key()); | |
399 | t->rmkeys(obj_to_prefix(obj), to_remove); | |
400 | } | |
401 | ||
402 | void FileStoreTracker::committed(const pair<coll_t, string> &obj, | |
403 | uint64_t seq) { | |
404 | Mutex::Locker l(lock); | |
405 | ObjStatus status = get_obj_status(obj, db); | |
406 | assert(status.last_committed < seq); | |
407 | status.last_committed = seq; | |
408 | KeyValueDB::Transaction t = db->get_transaction(); | |
409 | clear_obsolete(obj, status, db, t); | |
410 | set_obj_status(obj, status, t); | |
411 | db->submit_transaction(t); | |
412 | } | |
413 | ||
414 | void FileStoreTracker::applied(const pair<coll_t, string> &obj, | |
415 | uint64_t seq) { | |
416 | Mutex::Locker l(lock); | |
417 | std::cerr << "Applied " << obj << " version " << seq << std::endl; | |
418 | ObjStatus status = get_obj_status(obj, db); | |
419 | assert(status.last_applied < seq); | |
420 | status.set_last_applied(seq, restart_seq); | |
421 | KeyValueDB::Transaction t = db->get_transaction(); | |
422 | clear_obsolete(obj, status, db, t); | |
423 | set_obj_status(obj, status, t); | |
424 | db->submit_transaction(t); | |
425 | } | |
426 | ||
427 | ||
428 | uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj, | |
429 | ObjectContents &content) { | |
430 | KeyValueDB::Transaction t = db->get_transaction(); | |
431 | KeyValueDB::Iterator iter = db->get_iterator( | |
432 | obj_to_prefix(obj)); | |
433 | iter->seek_to_last(); | |
434 | uint64_t most_recent = 0; | |
435 | if (iter->valid()) { | |
436 | pair<uint64_t, bufferlist> val; | |
437 | bufferlist bl = iter->value(); | |
438 | bufferlist::iterator bp = bl.begin(); | |
439 | ::decode(val, bp); | |
440 | most_recent = val.first; | |
441 | } | |
442 | bufferlist buf_content; | |
443 | content.encode(buf_content); | |
444 | map<string, bufferlist> to_set; | |
445 | ::encode(make_pair(most_recent + 1, buf_content), | |
446 | to_set[seq_to_key(most_recent + 1)]); | |
447 | t->set(obj_to_prefix(obj), to_set); | |
448 | db->submit_transaction(t); | |
449 | return most_recent + 1; | |
450 | } |