]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/objectstore/FileStoreTracker.cc
bump version to 12.2.5-pve1
[ceph.git] / ceph / src / test / objectstore / FileStoreTracker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2#include "FileStoreTracker.h"
3#include <stdlib.h>
4#include <iostream>
5#include <boost/scoped_ptr.hpp>
6#include "include/Context.h"
7#include "common/Mutex.h"
8
9class OnApplied : public Context {
10 FileStoreTracker *tracker;
11 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
12public:
13 OnApplied(FileStoreTracker *tracker,
14 list<pair<pair<coll_t, string>, uint64_t> > in_flight)
15 : tracker(tracker), in_flight(in_flight) {}
16
17 void finish(int r) override {
18 for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
19 in_flight.begin();
20 i != in_flight.end();
21 ++i) {
22 tracker->applied(i->first, i->second);
23 }
24 }
25};
26
27class OnCommitted : public Context {
28 FileStoreTracker *tracker;
29 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
30public:
31 OnCommitted(FileStoreTracker *tracker,
32 list<pair<pair<coll_t, string>, uint64_t> > in_flight)
33 : tracker(tracker), in_flight(in_flight) {}
34
35 void finish(int r) override {
36 for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
37 in_flight.begin();
38 i != in_flight.end();
39 ++i) {
40 tracker->committed(i->first, i->second);
41 }
42 }
43};
44
45int FileStoreTracker::init()
46{
47 set<string> to_get;
48 to_get.insert("STATUS");
49 map<string, bufferlist> got;
50 db->get("STATUS", to_get, &got);
51 restart_seq = 0;
52 if (!got.empty()) {
53 bufferlist::iterator bp = got.begin()->second.begin();
54 ::decode(restart_seq, bp);
55 }
56 ++restart_seq;
57 KeyValueDB::Transaction t = db->get_transaction();
58 got.clear();
59 ::encode(restart_seq, got["STATUS"]);
60 t->set("STATUS", got);
61 db->submit_transaction(t);
62 return 0;
63}
64
65void FileStoreTracker::submit_transaction(Transaction &t)
66{
67 list<pair<pair<coll_t, string>, uint64_t> > in_flight;
68 OutTransaction out;
69 out.t = new ObjectStore::Transaction;
70 out.in_flight = &in_flight;
71 for (list<Transaction::Op*>::iterator i = t.ops.begin();
72 i != t.ops.end();
73 ++i) {
74 (**i)(this, &out);
75 }
76 store->queue_transaction(
77 0, std::move(*out.t),
78 new OnApplied(this, in_flight),
79 new OnCommitted(this, in_flight));
80 delete out.t;
81}
82
83void FileStoreTracker::write(const pair<coll_t, string> &obj,
84 OutTransaction *out)
85{
86 Mutex::Locker l(lock);
87 std::cerr << "Writing " << obj << std::endl;
88 ObjectContents contents = get_current_content(obj);
89
90 uint64_t offset = rand() % (SIZE/2);
91 uint64_t len = rand() % (SIZE/2);
92 if (!len) len = 10;
93 contents.write(rand(), offset, len);
94
95 bufferlist to_write;
96 ObjectContents::Iterator iter = contents.get_iterator();
97 iter.seek_to(offset);
98 for (uint64_t i = offset;
99 i < offset + len;
100 ++i, ++iter) {
101 assert(iter.valid());
102 to_write.append(*iter);
103 }
104 out->t->write(coll_t(obj.first),
105 ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))),
106 offset,
107 len,
108 to_write);
109 out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
110}
111
112void FileStoreTracker::remove(const pair<coll_t, string> &obj,
113 OutTransaction *out)
114{
115 std::cerr << "Deleting " << obj << std::endl;
116 Mutex::Locker l(lock);
117 ObjectContents old_contents = get_current_content(obj);
118 if (!old_contents.exists())
119 return;
120 out->t->remove(coll_t(obj.first),
121 ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))));
122 ObjectContents contents;
123 out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
124}
125
126void FileStoreTracker::clone_range(const pair<coll_t, string> &from,
127 const pair<coll_t, string> &to,
128 OutTransaction *out) {
129 Mutex::Locker l(lock);
130 std::cerr << "CloningRange " << from << " to " << to << std::endl;
131 assert(from.first == to.first);
132 ObjectContents from_contents = get_current_content(from);
133 ObjectContents to_contents = get_current_content(to);
134 if (!from_contents.exists()) {
135 return;
136 }
137 if (from.second == to.second) {
138 return;
139 }
140
141 uint64_t new_size = from_contents.size();
142 interval_set<uint64_t> interval_to_clone;
143 uint64_t offset = rand() % (new_size/2);
144 uint64_t len = rand() % (new_size/2);
145 if (!len) len = 10;
146 interval_to_clone.insert(offset, len);
147 to_contents.clone_range(from_contents, interval_to_clone);
148 out->t->clone_range(coll_t(from.first),
149 ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
150 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))),
151 offset,
152 len,
153 offset);
154 out->in_flight->push_back(make_pair(to, set_content(to, to_contents)));
155}
156
157void FileStoreTracker::clone(const pair<coll_t, string> &from,
158 const pair<coll_t, string> &to,
159 OutTransaction *out) {
160 Mutex::Locker l(lock);
161 std::cerr << "Cloning " << from << " to " << to << std::endl;
162 assert(from.first == to.first);
163 if (from.second == to.second) {
164 return;
165 }
166 ObjectContents from_contents = get_current_content(from);
167 ObjectContents to_contents = get_current_content(to);
168 if (!from_contents.exists()) {
169 return;
170 }
171
172 if (to_contents.exists())
173 out->t->remove(coll_t(to.first),
174 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
175 out->t->clone(coll_t(from.first),
176 ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
177 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
178 out->in_flight->push_back(make_pair(to, set_content(to, from_contents)));
179}
180
181
182string obj_to_prefix(const pair<coll_t, string> &obj) {
183 string sep;
184 sep.push_back('^');
185 return obj.first.to_str() + sep + obj.second + "_CONTENTS_";
186}
187
188string obj_to_meta_prefix(const pair<coll_t, string> &obj) {
189 string sep;
190 sep.push_back('^');
191 return obj.first.to_str() + sep + obj.second;
192}
193
// Render a version number as a 20-column, right-aligned, space-padded
// decimal key.  20 columns holds any uint64_t, and ' ' sorts before '0',
// so lexicographic key order matches numeric order.
string seq_to_key(uint64_t seq) {
  char text[50];
  snprintf(text, sizeof(text), "%20llu", static_cast<unsigned long long>(seq));
  return string(text);
}
199
// Per-object bookkeeping persisted under the "META" key: the newest
// applied and committed versions, plus the tracker restart counter that
// was current when last_applied was recorded.
struct ObjStatus {
  uint64_t last_applied = 0;
  uint64_t last_committed = 0;
  uint64_t restart_seq = 0;

  ObjStatus() = default;

  // Oldest version the object may legitimately be at, given the current
  // restart counter `seq`.  If the tracker has restarted since last_applied
  // was recorded, only last_committed is trusted (presumably because an
  // applied-but-uncommitted write may not survive a restart).
  uint64_t get_last_applied(uint64_t seq) const {
    return seq > restart_seq ? last_committed : last_applied;
  }

  // Record a new applied version together with the restart counter in
  // effect at the time.
  void set_last_applied(uint64_t _last_applied, uint64_t seq) {
    restart_seq = seq;
    last_applied = _last_applied;
  }

  // Versions strictly below this value can be discarded: the smaller of
  // last_applied and last_committed.
  uint64_t trim_to() const {
    if (last_committed <= last_applied)
      return last_committed;
    return last_applied;
  }
};
221void encode(const ObjStatus &obj, bufferlist &bl) {
222 ::encode(obj.last_applied, bl);
223 ::encode(obj.last_committed, bl);
224 ::encode(obj.restart_seq, bl);
225}
226void decode(ObjStatus &obj, bufferlist::iterator &bl) {
227 ::decode(obj.last_applied, bl);
228 ::decode(obj.last_committed, bl);
229 ::decode(obj.restart_seq, bl);
230}
231
232
233ObjStatus get_obj_status(const pair<coll_t, string> &obj,
234 KeyValueDB *db)
235{
236 set<string> to_get;
237 to_get.insert("META");
238 map<string, bufferlist> got;
239 db->get(obj_to_meta_prefix(obj), to_get, &got);
240 ObjStatus retval;
241 if (!got.empty()) {
242 bufferlist::iterator bp = got.begin()->second.begin();
243 ::decode(retval, bp);
244 }
245 return retval;
246}
247
248void set_obj_status(const pair<coll_t, string> &obj,
249 const ObjStatus &status,
250 KeyValueDB::Transaction t)
251{
252 map<string, bufferlist> to_set;
253 ::encode(status, to_set["META"]);
254 t->set(obj_to_meta_prefix(obj), to_set);
255}
256
257void _clean_forward(const pair<coll_t, string> &obj,
258 uint64_t last_valid,
259 KeyValueDB *db)
260{
261 KeyValueDB::Transaction t = db->get_transaction();
262 KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj));
263 set<string> to_remove;
264 i->upper_bound(seq_to_key(last_valid));
265 for (; i->valid(); i->next()) {
266 to_remove.insert(i->key());
267 }
268 t->rmkeys(obj_to_prefix(obj), to_remove);
269 db->submit_transaction(t);
270}
271
272
273void FileStoreTracker::verify(const coll_t &coll, const string &obj,
274 bool on_start) {
275 Mutex::Locker l(lock);
276 std::cerr << "Verifying " << make_pair(coll, obj) << std::endl;
277
278 pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj));
279 std::cerr << "valid_reads is " << valid_reads << std::endl;
280 bufferlist contents;
281 int r = store->read(coll_t(coll),
282 ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))),
283 0,
284 2*SIZE,
285 contents);
286 std::cerr << "exists: " << r << std::endl;
287
288
289 for (uint64_t i = valid_reads.first;
290 i < valid_reads.second;
291 ++i) {
292 ObjectContents old_contents = get_content(make_pair(coll, obj), i);
293
294 std::cerr << "old_contents exists " << old_contents.exists() << std::endl;
295 if (!old_contents.exists() && (r == -ENOENT))
296 return;
297
298 if (old_contents.exists() && (r == -ENOENT))
299 continue;
300
301 if (!old_contents.exists() && (r != -ENOENT))
302 continue;
303
304 if (contents.length() != old_contents.size()) {
305 std::cerr << "old_contents.size() is "
306 << old_contents.size() << std::endl;
307 continue;
308 }
309
310 bufferlist::iterator bp = contents.begin();
311 ObjectContents::Iterator iter = old_contents.get_iterator();
312 iter.seek_to_first();
313 bool matches = true;
314 uint64_t pos = 0;
315 for (; !bp.end() && iter.valid();
316 ++iter, ++bp, ++pos) {
317 if (*iter != *bp) {
318 std::cerr << "does not match at pos " << pos << std::endl;
319 matches = false;
320 break;
321 }
322 }
323 if (matches) {
324 if (on_start)
325 _clean_forward(make_pair(coll, obj), i, db);
326 return;
327 }
328 }
329 std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl;
330 ceph_abort();
331}
332
333ObjectContents FileStoreTracker::get_current_content(
334 const pair<coll_t, string> &obj)
335{
336 KeyValueDB::Iterator iter = db->get_iterator(
337 obj_to_prefix(obj));
338 iter->seek_to_last();
339 if (iter->valid()) {
340 bufferlist bl = iter->value();
341 bufferlist::iterator bp = bl.begin();
342 pair<uint64_t, bufferlist> val;
343 ::decode(val, bp);
344 assert(seq_to_key(val.first) == iter->key());
345 bp = val.second.begin();
346 return ObjectContents(bp);
347 }
348 return ObjectContents();
349}
350
351ObjectContents FileStoreTracker::get_content(
352 const pair<coll_t, string> &obj, uint64_t version)
353{
354 set<string> to_get;
355 map<string, bufferlist> got;
356 to_get.insert(seq_to_key(version));
357 db->get(obj_to_prefix(obj), to_get, &got);
358 if (got.empty())
359 return ObjectContents();
360 pair<uint64_t, bufferlist> val;
361 bufferlist::iterator bp = got.begin()->second.begin();
362 ::decode(val, bp);
363 bp = val.second.begin();
364 assert(val.first == version);
365 return ObjectContents(bp);
366}
367
368pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads(
369 const pair<coll_t, string> &obj)
370{
371 pair<uint64_t, uint64_t> bounds = make_pair(0,1);
372 KeyValueDB::Iterator iter = db->get_iterator(
373 obj_to_prefix(obj));
374 iter->seek_to_last();
375 if (iter->valid()) {
376 pair<uint64_t, bufferlist> val;
377 bufferlist bl = iter->value();
378 bufferlist::iterator bp = bl.begin();
379 ::decode(val, bp);
380 bounds.second = val.first + 1;
381 }
382
383 ObjStatus obj_status = get_obj_status(obj, db);
384 bounds.first = obj_status.get_last_applied(restart_seq);
385 return bounds;
386}
387
388void clear_obsolete(const pair<coll_t, string> &obj,
389 const ObjStatus &status,
390 KeyValueDB *db,
391 KeyValueDB::Transaction t)
392{
393 KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj));
394 set<string> to_remove;
395 iter->seek_to_first();
396 for (; iter->valid() && iter->key() < seq_to_key(status.trim_to());
397 iter->next())
398 to_remove.insert(iter->key());
399 t->rmkeys(obj_to_prefix(obj), to_remove);
400}
401
402void FileStoreTracker::committed(const pair<coll_t, string> &obj,
403 uint64_t seq) {
404 Mutex::Locker l(lock);
405 ObjStatus status = get_obj_status(obj, db);
406 assert(status.last_committed < seq);
407 status.last_committed = seq;
408 KeyValueDB::Transaction t = db->get_transaction();
409 clear_obsolete(obj, status, db, t);
410 set_obj_status(obj, status, t);
411 db->submit_transaction(t);
412}
413
414void FileStoreTracker::applied(const pair<coll_t, string> &obj,
415 uint64_t seq) {
416 Mutex::Locker l(lock);
417 std::cerr << "Applied " << obj << " version " << seq << std::endl;
418 ObjStatus status = get_obj_status(obj, db);
419 assert(status.last_applied < seq);
420 status.set_last_applied(seq, restart_seq);
421 KeyValueDB::Transaction t = db->get_transaction();
422 clear_obsolete(obj, status, db, t);
423 set_obj_status(obj, status, t);
424 db->submit_transaction(t);
425}
426
427
428uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj,
429 ObjectContents &content) {
430 KeyValueDB::Transaction t = db->get_transaction();
431 KeyValueDB::Iterator iter = db->get_iterator(
432 obj_to_prefix(obj));
433 iter->seek_to_last();
434 uint64_t most_recent = 0;
435 if (iter->valid()) {
436 pair<uint64_t, bufferlist> val;
437 bufferlist bl = iter->value();
438 bufferlist::iterator bp = bl.begin();
439 ::decode(val, bp);
440 most_recent = val.first;
441 }
442 bufferlist buf_content;
443 content.encode(buf_content);
444 map<string, bufferlist> to_set;
445 ::encode(make_pair(most_recent + 1, buf_content),
446 to_set[seq_to_key(most_recent + 1)]);
447 t->set(obj_to_prefix(obj), to_set);
448 db->submit_transaction(t);
449 return most_recent + 1;
450}