]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/objectstore/FileStoreTracker.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / objectstore / FileStoreTracker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "FileStoreTracker.h"
3#include <stdlib.h>
4#include <iostream>
5#include <boost/scoped_ptr.hpp>
6#include "include/Context.h"
7c673cae 7
20effc67
TL
8using namespace std;
9
7c673cae
FG
10class OnApplied : public Context {
11 FileStoreTracker *tracker;
12 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
13public:
14 OnApplied(FileStoreTracker *tracker,
15 list<pair<pair<coll_t, string>, uint64_t> > in_flight)
16 : tracker(tracker), in_flight(in_flight) {}
17
18 void finish(int r) override {
19 for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
20 in_flight.begin();
21 i != in_flight.end();
22 ++i) {
23 tracker->applied(i->first, i->second);
24 }
25 }
26};
27
28class OnCommitted : public Context {
29 FileStoreTracker *tracker;
30 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
31public:
32 OnCommitted(FileStoreTracker *tracker,
33 list<pair<pair<coll_t, string>, uint64_t> > in_flight)
34 : tracker(tracker), in_flight(in_flight) {}
35
36 void finish(int r) override {
37 for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
38 in_flight.begin();
39 i != in_flight.end();
40 ++i) {
41 tracker->committed(i->first, i->second);
42 }
43 }
44};
45
46int FileStoreTracker::init()
47{
48 set<string> to_get;
49 to_get.insert("STATUS");
50 map<string, bufferlist> got;
51 db->get("STATUS", to_get, &got);
52 restart_seq = 0;
53 if (!got.empty()) {
11fdf7f2
TL
54 auto bp = got.begin()->second.cbegin();
55 decode(restart_seq, bp);
7c673cae
FG
56 }
57 ++restart_seq;
58 KeyValueDB::Transaction t = db->get_transaction();
59 got.clear();
11fdf7f2 60 encode(restart_seq, got["STATUS"]);
7c673cae
FG
61 t->set("STATUS", got);
62 db->submit_transaction(t);
63 return 0;
64}
65
66void FileStoreTracker::submit_transaction(Transaction &t)
67{
68 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
69 OutTransaction out;
70 out.t = new ObjectStore::Transaction;
71 out.in_flight = &in_flight;
72 for (list<Transaction::Op*>::iterator i = t.ops.begin();
73 i != t.ops.end();
74 ++i) {
75 (**i)(this, &out);
76 }
11fdf7f2
TL
77 out.t->register_on_applied(new OnApplied(this, in_flight));
78 out.t->register_on_commit(new OnCommitted(this, in_flight));
79 auto ch = store->open_collection(coll_t());
80 store->queue_transaction(ch, std::move(*out.t), nullptr);
7c673cae
FG
81 delete out.t;
82}
83
84void FileStoreTracker::write(const pair<coll_t, string> &obj,
85 OutTransaction *out)
86{
9f95a23c 87 std::lock_guard l{lock};
7c673cae
FG
88 std::cerr << "Writing " << obj << std::endl;
89 ObjectContents contents = get_current_content(obj);
90
91 uint64_t offset = rand() % (SIZE/2);
92 uint64_t len = rand() % (SIZE/2);
93 if (!len) len = 10;
94 contents.write(rand(), offset, len);
95
96 bufferlist to_write;
97 ObjectContents::Iterator iter = contents.get_iterator();
98 iter.seek_to(offset);
99 for (uint64_t i = offset;
100 i < offset + len;
101 ++i, ++iter) {
11fdf7f2 102 ceph_assert(iter.valid());
7c673cae
FG
103 to_write.append(*iter);
104 }
105 out->t->write(coll_t(obj.first),
106 ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))),
107 offset,
108 len,
109 to_write);
110 out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
111}
112
113void FileStoreTracker::remove(const pair<coll_t, string> &obj,
114 OutTransaction *out)
115{
116 std::cerr << "Deleting " << obj << std::endl;
9f95a23c 117 std::lock_guard l{lock};
7c673cae
FG
118 ObjectContents old_contents = get_current_content(obj);
119 if (!old_contents.exists())
120 return;
121 out->t->remove(coll_t(obj.first),
122 ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))));
123 ObjectContents contents;
124 out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
125}
126
127void FileStoreTracker::clone_range(const pair<coll_t, string> &from,
128 const pair<coll_t, string> &to,
129 OutTransaction *out) {
9f95a23c 130 std::lock_guard l{lock};
7c673cae 131 std::cerr << "CloningRange " << from << " to " << to << std::endl;
11fdf7f2 132 ceph_assert(from.first == to.first);
7c673cae
FG
133 ObjectContents from_contents = get_current_content(from);
134 ObjectContents to_contents = get_current_content(to);
135 if (!from_contents.exists()) {
136 return;
137 }
138 if (from.second == to.second) {
139 return;
140 }
141
142 uint64_t new_size = from_contents.size();
143 interval_set<uint64_t> interval_to_clone;
144 uint64_t offset = rand() % (new_size/2);
145 uint64_t len = rand() % (new_size/2);
146 if (!len) len = 10;
147 interval_to_clone.insert(offset, len);
148 to_contents.clone_range(from_contents, interval_to_clone);
149 out->t->clone_range(coll_t(from.first),
150 ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
151 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))),
152 offset,
153 len,
154 offset);
155 out->in_flight->push_back(make_pair(to, set_content(to, to_contents)));
156}
157
158void FileStoreTracker::clone(const pair<coll_t, string> &from,
159 const pair<coll_t, string> &to,
160 OutTransaction *out) {
9f95a23c 161 std::lock_guard l{lock};
7c673cae 162 std::cerr << "Cloning " << from << " to " << to << std::endl;
11fdf7f2 163 ceph_assert(from.first == to.first);
7c673cae
FG
164 if (from.second == to.second) {
165 return;
166 }
167 ObjectContents from_contents = get_current_content(from);
168 ObjectContents to_contents = get_current_content(to);
169 if (!from_contents.exists()) {
170 return;
171 }
172
173 if (to_contents.exists())
174 out->t->remove(coll_t(to.first),
175 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
176 out->t->clone(coll_t(from.first),
177 ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
178 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
179 out->in_flight->push_back(make_pair(to, set_content(to, from_contents)));
180}
181
182
183string obj_to_prefix(const pair<coll_t, string> &obj) {
184 string sep;
185 sep.push_back('^');
186 return obj.first.to_str() + sep + obj.second + "_CONTENTS_";
187}
188
189string obj_to_meta_prefix(const pair<coll_t, string> &obj) {
190 string sep;
191 sep.push_back('^');
192 return obj.first.to_str() + sep + obj.second;
193}
194
// Formats seq as a decimal number right-aligned in a field of 20 characters
// (padded on the left with spaces). 20 is the number of digits in the
// largest uint64_t, so every key has the same length.
std::string seq_to_key(uint64_t seq) {
  const int width = 20;
  char key[50];
  snprintf(key, sizeof(key), "%*llu", width,
           static_cast<unsigned long long int>(seq));
  return std::string(key);
}
200
// Per-object bookkeeping: the newest sequence seen by the applied and
// committed callbacks, plus the restart sequence passed in when
// last_applied was last updated.
struct ObjStatus {
  uint64_t last_applied = 0;
  uint64_t last_committed = 0;
  uint64_t restart_seq = 0;

  // Returns last_applied when seq is not newer than the recorded
  // restart_seq; otherwise returns last_committed.
  // NOTE(review): presumably an applied-but-uncommitted version is not
  // trusted after a restart -- confirm against the test design.
  uint64_t get_last_applied(uint64_t seq) const {
    return (seq > restart_seq) ? last_committed : last_applied;
  }

  // Records a new applied sequence together with the restart sequence it
  // was observed under.
  void set_last_applied(uint64_t _last_applied, uint64_t seq) {
    restart_seq = seq;
    last_applied = _last_applied;
  }

  // Returns the smaller of last_applied and last_committed.
  uint64_t trim_to() const {
    if (last_applied < last_committed) {
      return last_applied;
    }
    return last_committed;
  }
};
222void encode(const ObjStatus &obj, bufferlist &bl) {
11fdf7f2
TL
223 encode(obj.last_applied, bl);
224 encode(obj.last_committed, bl);
225 encode(obj.restart_seq, bl);
7c673cae 226}
11fdf7f2
TL
227void decode(ObjStatus &obj, bufferlist::const_iterator &bl) {
228 decode(obj.last_applied, bl);
229 decode(obj.last_committed, bl);
230 decode(obj.restart_seq, bl);
7c673cae
FG
231}
232
233
234ObjStatus get_obj_status(const pair<coll_t, string> &obj,
235 KeyValueDB *db)
236{
237 set<string> to_get;
238 to_get.insert("META");
239 map<string, bufferlist> got;
240 db->get(obj_to_meta_prefix(obj), to_get, &got);
241 ObjStatus retval;
242 if (!got.empty()) {
11fdf7f2
TL
243 auto bp = got.begin()->second.cbegin();
244 decode(retval, bp);
7c673cae
FG
245 }
246 return retval;
247}
248
249void set_obj_status(const pair<coll_t, string> &obj,
250 const ObjStatus &status,
251 KeyValueDB::Transaction t)
252{
253 map<string, bufferlist> to_set;
11fdf7f2 254 encode(status, to_set["META"]);
7c673cae
FG
255 t->set(obj_to_meta_prefix(obj), to_set);
256}
257
258void _clean_forward(const pair<coll_t, string> &obj,
259 uint64_t last_valid,
260 KeyValueDB *db)
261{
262 KeyValueDB::Transaction t = db->get_transaction();
263 KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj));
264 set<string> to_remove;
265 i->upper_bound(seq_to_key(last_valid));
266 for (; i->valid(); i->next()) {
267 to_remove.insert(i->key());
268 }
269 t->rmkeys(obj_to_prefix(obj), to_remove);
270 db->submit_transaction(t);
271}
272
273
274void FileStoreTracker::verify(const coll_t &coll, const string &obj,
275 bool on_start) {
9f95a23c 276 std::lock_guard l{lock};
7c673cae
FG
277 std::cerr << "Verifying " << make_pair(coll, obj) << std::endl;
278
279 pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj));
280 std::cerr << "valid_reads is " << valid_reads << std::endl;
281 bufferlist contents;
11fdf7f2
TL
282 auto ch = store->open_collection(coll_t(coll));
283 int r = store->read(ch,
7c673cae
FG
284 ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))),
285 0,
286 2*SIZE,
287 contents);
288 std::cerr << "exists: " << r << std::endl;
289
290
291 for (uint64_t i = valid_reads.first;
292 i < valid_reads.second;
293 ++i) {
294 ObjectContents old_contents = get_content(make_pair(coll, obj), i);
295
296 std::cerr << "old_contents exists " << old_contents.exists() << std::endl;
297 if (!old_contents.exists() && (r == -ENOENT))
298 return;
299
300 if (old_contents.exists() && (r == -ENOENT))
301 continue;
302
303 if (!old_contents.exists() && (r != -ENOENT))
304 continue;
305
306 if (contents.length() != old_contents.size()) {
307 std::cerr << "old_contents.size() is "
308 << old_contents.size() << std::endl;
309 continue;
310 }
311
312 bufferlist::iterator bp = contents.begin();
313 ObjectContents::Iterator iter = old_contents.get_iterator();
314 iter.seek_to_first();
315 bool matches = true;
316 uint64_t pos = 0;
317 for (; !bp.end() && iter.valid();
318 ++iter, ++bp, ++pos) {
319 if (*iter != *bp) {
320 std::cerr << "does not match at pos " << pos << std::endl;
321 matches = false;
322 break;
323 }
324 }
325 if (matches) {
326 if (on_start)
327 _clean_forward(make_pair(coll, obj), i, db);
328 return;
329 }
330 }
331 std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl;
332 ceph_abort();
333}
334
335ObjectContents FileStoreTracker::get_current_content(
336 const pair<coll_t, string> &obj)
337{
338 KeyValueDB::Iterator iter = db->get_iterator(
339 obj_to_prefix(obj));
340 iter->seek_to_last();
341 if (iter->valid()) {
342 bufferlist bl = iter->value();
11fdf7f2 343 auto bp = bl.cbegin();
7c673cae 344 pair<uint64_t, bufferlist> val;
11fdf7f2
TL
345 decode(val, bp);
346 ceph_assert(seq_to_key(val.first) == iter->key());
7c673cae
FG
347 bp = val.second.begin();
348 return ObjectContents(bp);
349 }
350 return ObjectContents();
351}
352
353ObjectContents FileStoreTracker::get_content(
354 const pair<coll_t, string> &obj, uint64_t version)
355{
356 set<string> to_get;
357 map<string, bufferlist> got;
358 to_get.insert(seq_to_key(version));
359 db->get(obj_to_prefix(obj), to_get, &got);
360 if (got.empty())
361 return ObjectContents();
362 pair<uint64_t, bufferlist> val;
11fdf7f2
TL
363 auto bp = got.begin()->second.cbegin();
364 decode(val, bp);
7c673cae 365 bp = val.second.begin();
11fdf7f2 366 ceph_assert(val.first == version);
7c673cae
FG
367 return ObjectContents(bp);
368}
369
370pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads(
371 const pair<coll_t, string> &obj)
372{
373 pair<uint64_t, uint64_t> bounds = make_pair(0,1);
374 KeyValueDB::Iterator iter = db->get_iterator(
375 obj_to_prefix(obj));
376 iter->seek_to_last();
377 if (iter->valid()) {
378 pair<uint64_t, bufferlist> val;
379 bufferlist bl = iter->value();
11fdf7f2
TL
380 auto bp = bl.cbegin();
381 decode(val, bp);
7c673cae
FG
382 bounds.second = val.first + 1;
383 }
384
385 ObjStatus obj_status = get_obj_status(obj, db);
386 bounds.first = obj_status.get_last_applied(restart_seq);
387 return bounds;
388}
389
390void clear_obsolete(const pair<coll_t, string> &obj,
391 const ObjStatus &status,
392 KeyValueDB *db,
393 KeyValueDB::Transaction t)
394{
395 KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj));
396 set<string> to_remove;
397 iter->seek_to_first();
398 for (; iter->valid() && iter->key() < seq_to_key(status.trim_to());
399 iter->next())
400 to_remove.insert(iter->key());
401 t->rmkeys(obj_to_prefix(obj), to_remove);
402}
403
404void FileStoreTracker::committed(const pair<coll_t, string> &obj,
405 uint64_t seq) {
9f95a23c 406 std::lock_guard l{lock};
7c673cae 407 ObjStatus status = get_obj_status(obj, db);
11fdf7f2 408 ceph_assert(status.last_committed < seq);
7c673cae
FG
409 status.last_committed = seq;
410 KeyValueDB::Transaction t = db->get_transaction();
411 clear_obsolete(obj, status, db, t);
412 set_obj_status(obj, status, t);
413 db->submit_transaction(t);
414}
415
416void FileStoreTracker::applied(const pair<coll_t, string> &obj,
417 uint64_t seq) {
9f95a23c 418 std::lock_guard l{lock};
7c673cae
FG
419 std::cerr << "Applied " << obj << " version " << seq << std::endl;
420 ObjStatus status = get_obj_status(obj, db);
11fdf7f2 421 ceph_assert(status.last_applied < seq);
7c673cae
FG
422 status.set_last_applied(seq, restart_seq);
423 KeyValueDB::Transaction t = db->get_transaction();
424 clear_obsolete(obj, status, db, t);
425 set_obj_status(obj, status, t);
426 db->submit_transaction(t);
427}
428
429
430uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj,
431 ObjectContents &content) {
432 KeyValueDB::Transaction t = db->get_transaction();
433 KeyValueDB::Iterator iter = db->get_iterator(
434 obj_to_prefix(obj));
435 iter->seek_to_last();
436 uint64_t most_recent = 0;
437 if (iter->valid()) {
438 pair<uint64_t, bufferlist> val;
439 bufferlist bl = iter->value();
11fdf7f2
TL
440 auto bp = bl.cbegin();
441 decode(val, bp);
7c673cae
FG
442 most_recent = val.first;
443 }
444 bufferlist buf_content;
445 content.encode(buf_content);
446 map<string, bufferlist> to_set;
11fdf7f2 447 encode(make_pair(most_recent + 1, buf_content),
7c673cae
FG
448 to_set[seq_to_key(most_recent + 1)]);
449 t->set(obj_to_prefix(obj), to_set);
450 db->submit_transaction(t);
451 return most_recent + 1;
452}