]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/osd/RadosModel.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / osd / RadosModel.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "include/int_types.h"
4
5 #include "common/ceph_mutex.h"
6 #include "include/rados/librados.hpp"
7
8 #include <iostream>
9 #include <iterator>
10 #include <sstream>
11 #include <map>
12 #include <set>
13 #include <list>
14 #include <string>
15 #include <string.h>
16 #include <stdlib.h>
17 #include <errno.h>
18 #include <time.h>
19 #include "Object.h"
20 #include "TestOpStat.h"
21 #include "test/librados/test.h"
22 #include "common/sharedptr_registry.hpp"
23 #include "common/errno.h"
24 #include "osd/HitSet.h"
25
26 #ifndef RADOSMODEL_H
27 #define RADOSMODEL_H
28
29 using namespace std;
30
31 class RadosTestContext;
32 class TestOpStat;
33
/**
 * Pick a pseudo-random element of a container using rand().
 *
 * The choice is reproducible for a given srand() seed. The index is taken
 * modulo the container size, so it is subject to the usual modulo bias.
 *
 * @param cont container to choose from
 * @return iterator to the chosen element, or std::end(cont) when the
 *         container is empty
 */
template <typename T>
typename T::iterator rand_choose(T &cont)
{
  if (!std::empty(cont)) {
    const auto steps = rand() % cont.size();
    return std::next(std::begin(cont), steps);
  }
  return std::end(cont);
}
41
/// Kinds of operations the model test can generate.
/// Enumerators rely on implicit numbering (TEST_OP_READ == 0, ...), so new
/// entries should be appended to keep existing values stable.
enum TestOpType {
  // object data
  TEST_OP_READ,
  TEST_OP_WRITE,
  TEST_OP_WRITE_EXCL,
  TEST_OP_WRITESAME,
  TEST_OP_DELETE,
  // snapshots
  TEST_OP_SNAP_CREATE,
  TEST_OP_SNAP_REMOVE,
  TEST_OP_ROLLBACK,
  // xattrs / omap
  TEST_OP_SETATTR,
  TEST_OP_RMATTR,
  // watch / copy / hit sets
  TEST_OP_WATCH,
  TEST_OP_COPY_FROM,
  TEST_OP_HIT_SET_LIST,
  // dirty state and cache tiering
  TEST_OP_UNDIRTY,
  TEST_OP_IS_DIRTY,
  TEST_OP_CACHE_FLUSH,
  TEST_OP_CACHE_TRY_FLUSH,
  TEST_OP_CACHE_EVICT,
  // appends
  TEST_OP_APPEND,
  TEST_OP_APPEND_EXCL,
  // redirects, chunks and tiers
  TEST_OP_SET_REDIRECT,
  TEST_OP_UNSET_REDIRECT,
  TEST_OP_CHUNK_READ,
  TEST_OP_TIER_PROMOTE,
  TEST_OP_TIER_FLUSH,
};
69
70 class TestWatchContext : public librados::WatchCtx2 {
71 TestWatchContext(const TestWatchContext&);
72 public:
73 ceph::condition_variable cond;
74 uint64_t handle = 0;
75 bool waiting = false;
76 ceph::mutex lock = ceph::make_mutex("watch lock");
77 TestWatchContext() = default;
78 void handle_notify(uint64_t notify_id, uint64_t cookie,
79 uint64_t notifier_id,
80 bufferlist &bl) override {
81 std::lock_guard l{lock};
82 waiting = false;
83 cond.notify_all();
84 }
85 void handle_error(uint64_t cookie, int err) override {
86 std::lock_guard l{lock};
87 cout << "watch handle_error " << err << std::endl;
88 }
89 void start() {
90 std::lock_guard l{lock};
91 waiting = true;
92 }
93 void wait() {
94 std::unique_lock l{lock};
95 cond.wait(l, [this] { return !waiting; });
96 }
97 uint64_t &get_handle() {
98 return handle;
99 }
100 };
101
102 class TestOp {
103 public:
104 const int num;
105 RadosTestContext *context;
106 TestOpStat *stat;
107 bool done = false;
108 TestOp(int n, RadosTestContext *context,
109 TestOpStat *stat = 0)
110 : num(n),
111 context(context),
112 stat(stat)
113 {}
114
115 virtual ~TestOp() {};
116
117 /**
118 * This struct holds data to be passed by a callback
119 * to a TestOp::finish method.
120 */
121 struct CallbackInfo {
122 uint64_t id;
123 explicit CallbackInfo(uint64_t id) : id(id) {}
124 virtual ~CallbackInfo() {};
125 };
126
127 virtual void _begin() = 0;
128
129 /**
130 * Called when the operation completes.
131 * This should be overridden by asynchronous operations.
132 *
133 * @param info information stored by a callback, or NULL -
134 * useful for multi-operation TestOps
135 */
136 virtual void _finish(CallbackInfo *info)
137 {
138 return;
139 }
140 virtual string getType() = 0;
141 virtual bool finished()
142 {
143 return true;
144 }
145
146 void begin();
147 void finish(CallbackInfo *info);
148 virtual bool must_quiesce_other_ops() { return false; }
149 };
150
151 class TestOpGenerator {
152 public:
153 virtual ~TestOpGenerator() {};
154 virtual TestOp *next(RadosTestContext &context) = 0;
155 };
156
157 class RadosTestContext {
158 public:
159 ceph::mutex state_lock = ceph::make_mutex("Context Lock");
160 ceph::condition_variable wait_cond;
161 // snap => {oid => desc}
162 map<int, map<string,ObjectDesc> > pool_obj_cont;
163 set<string> oid_in_use;
164 set<string> oid_not_in_use;
165 set<string> oid_flushing;
166 set<string> oid_not_flushing;
167 set<string> oid_redirect_not_in_use;
168 set<string> oid_redirect_in_use;
169 SharedPtrRegistry<int, int> snaps_in_use;
170 int current_snap;
171 string pool_name;
172 librados::IoCtx io_ctx;
173 librados::Rados rados;
174 int next_oid;
175 string prefix;
176 int errors;
177 int max_in_flight;
178 int seq_num;
179 map<int,uint64_t> snaps;
180 uint64_t seq;
181 const char *rados_id;
182 bool initialized;
183 map<string, TestWatchContext*> watches;
184 const uint64_t max_size;
185 const uint64_t min_stride_size;
186 const uint64_t max_stride_size;
187 AttrGenerator attr_gen;
188 const bool no_omap;
189 const bool no_sparse;
190 bool pool_snaps;
191 bool write_fadvise_dontneed;
192 string low_tier_pool_name;
193 librados::IoCtx low_tier_io_ctx;
194 int snapname_num;
195 map<string,string > redirect_objs;
196 bool enable_dedup;
197
198 RadosTestContext(const string &pool_name,
199 int max_in_flight,
200 uint64_t max_size,
201 uint64_t min_stride_size,
202 uint64_t max_stride_size,
203 bool no_omap,
204 bool no_sparse,
205 bool pool_snaps,
206 bool write_fadvise_dontneed,
207 const string &low_tier_pool_name,
208 bool enable_dedup,
209 const char *id = 0) :
210 pool_obj_cont(),
211 current_snap(0),
212 pool_name(pool_name),
213 next_oid(0),
214 errors(0),
215 max_in_flight(max_in_flight),
216 seq_num(0), seq(0),
217 rados_id(id), initialized(false),
218 max_size(max_size),
219 min_stride_size(min_stride_size), max_stride_size(max_stride_size),
220 attr_gen(2000, 20000),
221 no_omap(no_omap),
222 no_sparse(no_sparse),
223 pool_snaps(pool_snaps),
224 write_fadvise_dontneed(write_fadvise_dontneed),
225 low_tier_pool_name(low_tier_pool_name),
226 snapname_num(0),
227 enable_dedup(enable_dedup)
228 {
229 }
230
231 int init()
232 {
233 int r = rados.init(rados_id);
234 if (r < 0)
235 return r;
236 r = rados.conf_read_file(NULL);
237 if (r < 0)
238 return r;
239 r = rados.conf_parse_env(NULL);
240 if (r < 0)
241 return r;
242 r = rados.connect();
243 if (r < 0)
244 return r;
245 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
246 if (r < 0) {
247 rados.shutdown();
248 return r;
249 }
250 if (!low_tier_pool_name.empty()) {
251 r = rados.ioctx_create(low_tier_pool_name.c_str(), low_tier_io_ctx);
252 if (r < 0) {
253 rados.shutdown();
254 return r;
255 }
256 }
257 bufferlist inbl;
258 r = rados.mon_command(
259 "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool_name +
260 "\", \"var\": \"write_fadvise_dontneed\", \"val\": \"" + (write_fadvise_dontneed ? "true" : "false") + "\"}",
261 inbl, NULL, NULL);
262 if (r < 0) {
263 rados.shutdown();
264 return r;
265 }
266 if (enable_dedup) {
267 r = rados.mon_command(
268 "{\"prefix\": \"osd pool set\", \"pool\": \"" + pool_name +
269 "\", \"var\": \"fingerprint_algorithm\", \"val\": \"" + "sha256" + "\"}",
270 inbl, NULL, NULL);
271 if (r < 0) {
272 rados.shutdown();
273 return r;
274 }
275 }
276
277 char hostname_cstr[100];
278 gethostname(hostname_cstr, 100);
279 stringstream hostpid;
280 hostpid << hostname_cstr << getpid() << "-";
281 prefix = hostpid.str();
282 ceph_assert(!initialized);
283 initialized = true;
284 return 0;
285 }
286
287 void shutdown()
288 {
289 if (initialized) {
290 rados.shutdown();
291 }
292 }
293
294 void loop(TestOpGenerator *gen)
295 {
296 ceph_assert(initialized);
297 list<TestOp*> inflight;
298 std::unique_lock state_locker{state_lock};
299
300 TestOp *next = gen->next(*this);
301 TestOp *waiting = NULL;
302
303 while (next || !inflight.empty()) {
304 if (next && next->must_quiesce_other_ops() && !inflight.empty()) {
305 waiting = next;
306 next = NULL; // Force to wait for inflight to drain
307 }
308 if (next) {
309 inflight.push_back(next);
310 }
311 state_lock.unlock();
312 if (next) {
313 (*inflight.rbegin())->begin();
314 }
315 state_lock.lock();
316 while (1) {
317 for (list<TestOp*>::iterator i = inflight.begin();
318 i != inflight.end();) {
319 if ((*i)->finished()) {
320 cout << (*i)->num << ": done (" << (inflight.size()-1) << " left)" << std::endl;
321 delete *i;
322 inflight.erase(i++);
323 } else {
324 ++i;
325 }
326 }
327
328 if (inflight.size() >= (unsigned) max_in_flight || (!next && !inflight.empty())) {
329 cout << " waiting on " << inflight.size() << std::endl;
330 wait_cond.wait(state_locker);
331 } else {
332 break;
333 }
334 }
335 if (waiting) {
336 next = waiting;
337 waiting = NULL;
338 } else {
339 next = gen->next(*this);
340 }
341 }
342 }
343
344 void kick()
345 {
346 wait_cond.notify_all();
347 }
348
349 TestWatchContext *get_watch_context(const string &oid) {
350 return watches.count(oid) ? watches[oid] : 0;
351 }
352
353 TestWatchContext *watch(const string &oid) {
354 ceph_assert(!watches.count(oid));
355 return (watches[oid] = new TestWatchContext);
356 }
357
358 void unwatch(const string &oid) {
359 ceph_assert(watches.count(oid));
360 delete watches[oid];
361 watches.erase(oid);
362 }
363
364 ObjectDesc get_most_recent(const string &oid) {
365 ObjectDesc new_obj;
366 for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
367 pool_obj_cont.rbegin();
368 i != pool_obj_cont.rend();
369 ++i) {
370 map<string,ObjectDesc>::iterator j = i->second.find(oid);
371 if (j != i->second.end()) {
372 new_obj = j->second;
373 break;
374 }
375 }
376 return new_obj;
377 }
378
379 void rm_object_attrs(const string &oid, const set<string> &attrs)
380 {
381 ObjectDesc new_obj = get_most_recent(oid);
382 for (set<string>::const_iterator i = attrs.begin();
383 i != attrs.end();
384 ++i) {
385 new_obj.attrs.erase(*i);
386 }
387 new_obj.dirty = true;
388 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
389 }
390
391 void remove_object_header(const string &oid)
392 {
393 ObjectDesc new_obj = get_most_recent(oid);
394 new_obj.header = bufferlist();
395 new_obj.dirty = true;
396 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
397 }
398
399
400 void update_object_header(const string &oid, const bufferlist &bl)
401 {
402 ObjectDesc new_obj = get_most_recent(oid);
403 new_obj.header = bl;
404 new_obj.exists = true;
405 new_obj.dirty = true;
406 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
407 }
408
409 void update_object_attrs(const string &oid, const map<string, ContDesc> &attrs)
410 {
411 ObjectDesc new_obj = get_most_recent(oid);
412 for (map<string, ContDesc>::const_iterator i = attrs.begin();
413 i != attrs.end();
414 ++i) {
415 new_obj.attrs[i->first] = i->second;
416 }
417 new_obj.exists = true;
418 new_obj.dirty = true;
419 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
420 }
421
422 void update_object(ContentsGenerator *cont_gen,
423 const string &oid, const ContDesc &contents)
424 {
425 ObjectDesc new_obj = get_most_recent(oid);
426 new_obj.exists = true;
427 new_obj.dirty = true;
428 new_obj.update(cont_gen,
429 contents);
430 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
431 }
432
433 void update_object_full(const string &oid, const ObjectDesc &contents)
434 {
435 pool_obj_cont[current_snap].insert_or_assign(oid, contents);
436 pool_obj_cont[current_snap][oid].dirty = true;
437 }
438
439 void update_object_undirty(const string &oid)
440 {
441 ObjectDesc new_obj = get_most_recent(oid);
442 new_obj.dirty = false;
443 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
444 }
445
446 void update_object_version(const string &oid, uint64_t version,
447 int snap = -1)
448 {
449 for (map<int, map<string,ObjectDesc> >::reverse_iterator i =
450 pool_obj_cont.rbegin();
451 i != pool_obj_cont.rend();
452 ++i) {
453 if (snap != -1 && snap < i->first)
454 continue;
455 map<string,ObjectDesc>::iterator j = i->second.find(oid);
456 if (j != i->second.end()) {
457 if (version)
458 j->second.version = version;
459 cout << __func__ << " oid " << oid
460 << " v " << version << " " << j->second.most_recent()
461 << " " << (j->second.dirty ? "dirty" : "clean")
462 << " " << (j->second.exists ? "exists" : "dne")
463 << std::endl;
464 break;
465 }
466 }
467 }
468
469 void remove_object(const string &oid)
470 {
471 ceph_assert(!get_watch_context(oid));
472 ObjectDesc new_obj;
473 pool_obj_cont[current_snap].insert_or_assign(oid, new_obj);
474 }
475
476 bool find_object(const string &oid, ObjectDesc *contents, int snap = -1) const
477 {
478 for (map<int, map<string,ObjectDesc> >::const_reverse_iterator i =
479 pool_obj_cont.rbegin();
480 i != pool_obj_cont.rend();
481 ++i) {
482 if (snap != -1 && snap < i->first) continue;
483 if (i->second.count(oid) != 0) {
484 *contents = i->second.find(oid)->second;
485 return true;
486 }
487 }
488 return false;
489 }
490
491 void update_object_redirect_target(const string &oid, const string &target)
492 {
493 redirect_objs[oid] = target;
494 }
495
496 void update_object_chunk_target(const string &oid, uint64_t offset, const ChunkDesc &info)
497 {
498 for (map<int, map<string,ObjectDesc> >::const_reverse_iterator i =
499 pool_obj_cont.rbegin();
500 i != pool_obj_cont.rend();
501 ++i) {
502 if (i->second.count(oid) != 0) {
503 ObjectDesc obj_desc = i->second.find(oid)->second;
504 obj_desc.chunk_info[offset] = info;
505 update_object_full(oid, obj_desc);
506 return ;
507 }
508 }
509 return;
510 }
511
512 bool object_existed_at(const string &oid, int snap = -1) const
513 {
514 ObjectDesc contents;
515 bool found = find_object(oid, &contents, snap);
516 return found && contents.exists;
517 }
518
519 void remove_snap(int snap)
520 {
521 map<int, map<string,ObjectDesc> >::iterator next_iter = pool_obj_cont.find(snap);
522 ceph_assert(next_iter != pool_obj_cont.end());
523 map<int, map<string,ObjectDesc> >::iterator current_iter = next_iter++;
524 ceph_assert(current_iter != pool_obj_cont.end());
525 map<string,ObjectDesc> &current = current_iter->second;
526 map<string,ObjectDesc> &next = next_iter->second;
527 for (map<string,ObjectDesc>::iterator i = current.begin();
528 i != current.end();
529 ++i) {
530 if (next.count(i->first) == 0) {
531 next.insert(pair<string,ObjectDesc>(i->first, i->second));
532 }
533 }
534 pool_obj_cont.erase(current_iter);
535 snaps.erase(snap);
536 }
537
538 void add_snap(uint64_t snap)
539 {
540 snaps[current_snap] = snap;
541 current_snap++;
542 pool_obj_cont[current_snap];
543 seq = snap;
544 }
545
546 void roll_back(const string &oid, int snap)
547 {
548 ceph_assert(!get_watch_context(oid));
549 ObjectDesc contents;
550 find_object(oid, &contents, snap);
551 contents.dirty = true;
552 pool_obj_cont.rbegin()->second.insert_or_assign(oid, contents);
553 }
554 };
555
556 void read_callback(librados::completion_t comp, void *arg);
557 void write_callback(librados::completion_t comp, void *arg);
558
559 /// remove random xattrs from given object, and optionally remove omap
560 /// entries if @c no_omap is not specified in context
561 class RemoveAttrsOp : public TestOp {
562 public:
563 string oid;
564 librados::ObjectWriteOperation op;
565 librados::AioCompletion *comp;
566 RemoveAttrsOp(int n, RadosTestContext *context,
567 const string &oid,
568 TestOpStat *stat)
569 : TestOp(n, context, stat), oid(oid), comp(NULL)
570 {}
571
572 void _begin() override
573 {
574 ContDesc cont;
575 set<string> to_remove;
576 {
577 std::lock_guard l{context->state_lock};
578 ObjectDesc obj;
579 if (!context->find_object(oid, &obj)) {
580 context->kick();
581 done = true;
582 return;
583 }
584 cont = ContDesc(context->seq_num, context->current_snap,
585 context->seq_num, "");
586 context->oid_in_use.insert(oid);
587 context->oid_not_in_use.erase(oid);
588
589 if (rand() % 30) {
590 ContentsGenerator::iterator iter = context->attr_gen.get_iterator(cont);
591 for (map<string, ContDesc>::iterator i = obj.attrs.begin();
592 i != obj.attrs.end();
593 ++i, ++iter) {
594 if (!(*iter % 3)) {
595 to_remove.insert(i->first);
596 op.rmxattr(i->first.c_str());
597 }
598 }
599 if (to_remove.empty()) {
600 context->kick();
601 context->oid_in_use.erase(oid);
602 context->oid_not_in_use.insert(oid);
603 done = true;
604 return;
605 }
606 if (!context->no_omap) {
607 op.omap_rm_keys(to_remove);
608 }
609 } else {
610 if (!context->no_omap) {
611 op.omap_clear();
612 }
613 for (map<string, ContDesc>::iterator i = obj.attrs.begin();
614 i != obj.attrs.end();
615 ++i) {
616 op.rmxattr(i->first.c_str());
617 to_remove.insert(i->first);
618 }
619 context->remove_object_header(oid);
620 }
621 context->rm_object_attrs(oid, to_remove);
622 }
623
624 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
625 new pair<TestOp*, TestOp::CallbackInfo*>(this,
626 new TestOp::CallbackInfo(0));
627 comp = context->rados.aio_create_completion((void*) cb_arg,
628 &write_callback);
629 context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
630 }
631
632 void _finish(CallbackInfo *info) override
633 {
634 std::lock_guard l{context->state_lock};
635 done = true;
636 context->update_object_version(oid, comp->get_version64());
637 context->oid_in_use.erase(oid);
638 context->oid_not_in_use.insert(oid);
639 context->kick();
640 }
641
642 bool finished() override
643 {
644 return done;
645 }
646
647 string getType() override
648 {
649 return "RemoveAttrsOp";
650 }
651 };
652
653 /// add random xattrs to given object, and optionally add omap
654 /// entries if @c no_omap is not specified in context
655 class SetAttrsOp : public TestOp {
656 public:
657 string oid;
658 librados::ObjectWriteOperation op;
659 librados::AioCompletion *comp;
660 SetAttrsOp(int n,
661 RadosTestContext *context,
662 const string &oid,
663 TestOpStat *stat)
664 : TestOp(n, context, stat),
665 oid(oid), comp(NULL)
666 {}
667
668 void _begin() override
669 {
670 ContDesc cont;
671 {
672 std::lock_guard l{context->state_lock};
673 cont = ContDesc(context->seq_num, context->current_snap,
674 context->seq_num, "");
675 context->oid_in_use.insert(oid);
676 context->oid_not_in_use.erase(oid);
677 }
678
679 map<string, bufferlist> omap_contents;
680 map<string, ContDesc> omap;
681 bufferlist header;
682 ContentsGenerator::iterator keygen = context->attr_gen.get_iterator(cont);
683 op.create(false);
684 while (!*keygen) ++keygen;
685 while (*keygen) {
686 if (*keygen != '_')
687 header.append(*keygen);
688 ++keygen;
689 }
690 for (int i = 0; i < 20; ++i) {
691 string key;
692 while (!*keygen) ++keygen;
693 while (*keygen && key.size() < 40) {
694 key.push_back((*keygen % 20) + 'a');
695 ++keygen;
696 }
697 ContDesc val(cont);
698 val.seqnum += (unsigned)(*keygen);
699 val.prefix = ("oid: " + oid);
700 omap[key] = val;
701 bufferlist val_buffer = context->attr_gen.gen_bl(val);
702 omap_contents[key] = val_buffer;
703 op.setxattr(key.c_str(), val_buffer);
704 }
705 if (!context->no_omap) {
706 op.omap_set_header(header);
707 op.omap_set(omap_contents);
708 }
709
710 {
711 std::lock_guard l{context->state_lock};
712 context->update_object_header(oid, header);
713 context->update_object_attrs(oid, omap);
714 }
715
716 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
717 new pair<TestOp*, TestOp::CallbackInfo*>(this,
718 new TestOp::CallbackInfo(0));
719 comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback);
720 context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
721 }
722
723 void _finish(CallbackInfo *info) override
724 {
725 std::lock_guard l{context->state_lock};
726 int r;
727 if ((r = comp->get_return_value())) {
728 cerr << "err " << r << std::endl;
729 ceph_abort();
730 }
731 done = true;
732 context->update_object_version(oid, comp->get_version64());
733 context->oid_in_use.erase(oid);
734 context->oid_not_in_use.insert(oid);
735 context->kick();
736 }
737
738 bool finished() override
739 {
740 return done;
741 }
742
743 string getType() override
744 {
745 return "SetAttrsOp";
746 }
747 };
748
749 class WriteOp : public TestOp {
750 public:
751 const string oid;
752 ContDesc cont;
753 set<librados::AioCompletion *> waiting;
754 librados::AioCompletion *rcompletion = nullptr;
755 // numbers of async ops submitted
756 uint64_t waiting_on = 0;
757 uint64_t last_acked_tid = 0;
758
759 librados::ObjectReadOperation read_op;
760 librados::ObjectWriteOperation write_op;
761 bufferlist rbuffer;
762
763 const bool do_append;
764 const bool do_excl;
765
766 WriteOp(int n,
767 RadosTestContext *context,
768 const string &oid,
769 bool do_append,
770 bool do_excl,
771 TestOpStat *stat = 0)
772 : TestOp(n, context, stat),
773 oid(oid),
774 do_append(do_append),
775 do_excl(do_excl)
776 {}
777
778 void _begin() override
779 {
780 assert(!done);
781 stringstream acc;
782 std::lock_guard state_locker{context->state_lock};
783 acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
784 string prefix = acc.str();
785
786 cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
787
788 ContentsGenerator *cont_gen;
789 if (do_append) {
790 ObjectDesc old_value;
791 bool found = context->find_object(oid, &old_value);
792 uint64_t prev_length = found && old_value.has_contents() ?
793 old_value.most_recent_gen()->get_length(old_value.most_recent()) :
794 0;
795 bool requires_alignment;
796 int r = context->io_ctx.pool_requires_alignment2(&requires_alignment);
797 ceph_assert(r == 0);
798 uint64_t alignment = 0;
799 if (requires_alignment) {
800 r = context->io_ctx.pool_required_alignment2(&alignment);
801 ceph_assert(r == 0);
802 ceph_assert(alignment != 0);
803 }
804 cont_gen = new AppendGenerator(
805 prev_length,
806 alignment,
807 context->min_stride_size,
808 context->max_stride_size,
809 3);
810 } else {
811 cont_gen = new VarLenGenerator(
812 context->max_size, context->min_stride_size, context->max_stride_size);
813 }
814 context->update_object(cont_gen, oid, cont);
815
816 context->oid_in_use.insert(oid);
817 context->oid_not_in_use.erase(oid);
818
819 map<uint64_t, uint64_t> ranges;
820
821 cont_gen->get_ranges_map(cont, ranges);
822 std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl;
823 context->seq_num++;
824
825 waiting_on = ranges.size();
826 ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
827 // assure that tid is greater than last_acked_tid
828 uint64_t tid = last_acked_tid + 1;
829 for (auto [offset, len] : ranges) {
830 gen_pos.seek(offset);
831 bufferlist to_write = gen_pos.gen_bl_advance(len);
832 ceph_assert(to_write.length() == len);
833 ceph_assert(to_write.length() > 0);
834 std::cout << num << ": writing " << context->prefix+oid
835 << " from " << offset
836 << " to " << len + offset << " tid " << tid << std::endl;
837 auto cb_arg =
838 new pair<TestOp*, TestOp::CallbackInfo*>(this,
839 new TestOp::CallbackInfo(tid++));
840 librados::AioCompletion *completion =
841 context->rados.aio_create_completion((void*) cb_arg, &write_callback);
842 waiting.insert(completion);
843 librados::ObjectWriteOperation op;
844 if (do_append) {
845 op.append(to_write);
846 } else {
847 op.write(offset, to_write);
848 }
849 if (do_excl && cb_arg->second->id == last_acked_tid + 1)
850 op.assert_exists();
851 context->io_ctx.aio_operate(
852 context->prefix+oid, completion,
853 &op);
854 }
855
856 bufferlist contbl;
857 encode(cont, contbl);
858 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
859 new pair<TestOp*, TestOp::CallbackInfo*>(
860 this,
861 new TestOp::CallbackInfo(tid++));
862 librados::AioCompletion *completion = context->rados.aio_create_completion(
863 (void*) cb_arg, &write_callback);
864 waiting.insert(completion);
865 waiting_on++;
866 write_op.setxattr("_header", contbl);
867 if (!do_append) {
868 write_op.truncate(cont_gen->get_length(cont));
869 }
870 context->io_ctx.aio_operate(
871 context->prefix+oid, completion, &write_op);
872
873 cb_arg =
874 new pair<TestOp*, TestOp::CallbackInfo*>(
875 this,
876 new TestOp::CallbackInfo(tid++));
877 rcompletion = context->rados.aio_create_completion(
878 (void*) cb_arg, &write_callback);
879 waiting_on++;
880 read_op.read(0, 1, &rbuffer, 0);
881 context->io_ctx.aio_operate(
882 context->prefix+oid, rcompletion,
883 &read_op,
884 librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update
885 0);
886 }
887
888 void _finish(CallbackInfo *info) override
889 {
890 ceph_assert(info);
891 std::lock_guard state_locker{context->state_lock};
892 uint64_t tid = info->id;
893
894 cout << num << ": finishing write tid " << tid << " to " << context->prefix + oid << std::endl;
895
896 if (tid <= last_acked_tid) {
897 cerr << "Error: finished tid " << tid
898 << " when last_acked_tid was " << last_acked_tid << std::endl;
899 ceph_abort();
900 }
901 last_acked_tid = tid;
902
903 ceph_assert(!done);
904 waiting_on--;
905 if (waiting_on == 0) {
906 uint64_t version = 0;
907 for (set<librados::AioCompletion *>::iterator i = waiting.begin();
908 i != waiting.end();
909 ) {
910 ceph_assert((*i)->is_complete());
911 if (int err = (*i)->get_return_value()) {
912 cerr << "Error: oid " << oid << " write returned error code "
913 << err << std::endl;
914 }
915 if ((*i)->get_version64() > version)
916 version = (*i)->get_version64();
917 (*i)->release();
918 waiting.erase(i++);
919 }
920
921 context->update_object_version(oid, version);
922 if (rcompletion->get_version64() != version) {
923 cerr << "Error: racing read on " << oid << " returned version "
924 << rcompletion->get_version64() << " rather than version "
925 << version << std::endl;
926 ceph_abort_msg("racing read got wrong version");
927 }
928
929 {
930 ObjectDesc old_value;
931 ceph_assert(context->find_object(oid, &old_value, -1));
932 if (old_value.deleted())
933 std::cout << num << ": left oid " << oid << " deleted" << std::endl;
934 else
935 std::cout << num << ": left oid " << oid << " "
936 << old_value.most_recent() << std::endl;
937 }
938
939 rcompletion->release();
940 context->oid_in_use.erase(oid);
941 context->oid_not_in_use.insert(oid);
942 context->kick();
943 done = true;
944 }
945 }
946
947 bool finished() override
948 {
949 return done;
950 }
951
952 string getType() override
953 {
954 return "WriteOp";
955 }
956 };
957
/**
 * Rewrite an object with writesame ops and cross-check versions.
 *
 * _begin() records new contents in the model, then issues one writesame
 * per generated extent (each one repeating its buffer exactly once), an op
 * setting the "_header" xattr and truncating to the generated length, and a
 * 1-byte read ordered after them. _finish() requires callbacks in tid order.
 * Once every op is in, it requires the read to have returned 1 byte at the
 * highest write version.
 */
class WriteSameOp : public TestOp {
public:
  string oid;
  ContDesc cont;
  set<librados::AioCompletion *> waiting;
  librados::AioCompletion *rcompletion = nullptr;
  uint64_t waiting_on = 0;
  uint64_t last_acked_tid = 0;

  librados::ObjectReadOperation read_op;
  librados::ObjectWriteOperation write_op;
  bufferlist rbuffer;

  WriteSameOp(int n,
              RadosTestContext *context,
              const string &oid,
              TestOpStat *stat = 0)
    : TestOp(n, context, stat), oid(oid)
  {}

  void _begin() override
  {
    std::lock_guard state_locker{context->state_lock};
    done = false;

    stringstream prefix_stream;
    prefix_stream << context->prefix << "OID: " << oid << " snap "
                  << context->current_snap << std::endl;
    const string prefix = prefix_stream.str();

    cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);

    ContentsGenerator *generator = new VarLenGenerator(
      context->max_size, context->min_stride_size, context->max_stride_size);
    context->update_object(generator, oid, cont);

    context->oid_in_use.insert(oid);
    context->oid_not_in_use.erase(oid);

    map<uint64_t, uint64_t> extents;
    generator->get_ranges_map(cont, extents);
    std::cout << num << ": seq_num " << context->seq_num << " ranges " << extents << std::endl;
    context->seq_num++;

    waiting_on = extents.size();
    ContentsGenerator::iterator cursor = generator->get_iterator(cont);
    // assure that tid is greater than last_acked_tid
    uint64_t next_tid = last_acked_tid + 1;
    for (const auto &[offset, len] : extents) {
      cursor.seek(offset);
      bufferlist payload = cursor.gen_bl_advance(len);
      ceph_assert(payload.length() == len);
      ceph_assert(payload.length() > 0);

      const uint64_t this_tid = next_tid++;
      std::cout << num << ": writing " << context->prefix+oid
                << " from " << offset
                << " to " << offset + len << " tid " << this_tid << std::endl;
      auto *extent_arg = new pair<TestOp*, TestOp::CallbackInfo*>(
        this, new TestOp::CallbackInfo(this_tid));
      librados::AioCompletion *extent_comp =
        context->rados.aio_create_completion((void*) extent_arg, &write_callback);
      waiting.insert(extent_comp);

      librados::ObjectWriteOperation extent_op;
      /* no writesame multiplication factor for now */
      extent_op.writesame(offset, payload.length(), payload);
      context->io_ctx.aio_operate(context->prefix+oid, extent_comp, &extent_op);
    }

    // Header xattr plus truncate to the generated length.
    bufferlist encoded_cont;
    encode(cont, encoded_cont);
    auto *header_arg = new pair<TestOp*, TestOp::CallbackInfo*>(
      this, new TestOp::CallbackInfo(next_tid++));
    librados::AioCompletion *header_comp = context->rados.aio_create_completion(
      (void*) header_arg, &write_callback);
    waiting.insert(header_comp);
    ++waiting_on;
    write_op.setxattr("_header", encoded_cont);
    write_op.truncate(generator->get_length(cont));
    context->io_ctx.aio_operate(context->prefix+oid, header_comp, &write_op);

    // Racing 1-byte read, ordered after the writes above.
    auto *read_arg = new pair<TestOp*, TestOp::CallbackInfo*>(
      this, new TestOp::CallbackInfo(next_tid++));
    rcompletion = context->rados.aio_create_completion(
      (void*) read_arg, &write_callback);
    ++waiting_on;
    read_op.read(0, 1, &rbuffer, 0);
    context->io_ctx.aio_operate(
      context->prefix+oid, rcompletion,
      &read_op,
      librados::OPERATION_ORDER_READS_WRITES,  // order wrt previous write/update
      0);
  }

  void _finish(CallbackInfo *info) override
  {
    ceph_assert(info);
    std::lock_guard state_locker{context->state_lock};
    const uint64_t tid = info->id;

    cout << num << ": finishing writesame tid " << tid << " to " << context->prefix + oid << std::endl;

    if (tid <= last_acked_tid) {
      cerr << "Error: finished tid " << tid
           << " when last_acked_tid was " << last_acked_tid << std::endl;
      ceph_abort();
    }
    last_acked_tid = tid;

    ceph_assert(!done);
    if (--waiting_on != 0)
      return;

    // Every op has called back: harvest write results and the max version.
    uint64_t version = 0;
    for (librados::AioCompletion *c : waiting) {
      ceph_assert(c->is_complete());
      if (int err = c->get_return_value()) {
        cerr << "Error: oid " << oid << " writesame returned error code "
             << err << std::endl;
      }
      if (c->get_version64() > version)
        version = c->get_version64();
      c->release();
    }
    waiting.clear();

    context->update_object_version(oid, version);
    ceph_assert(rcompletion->is_complete());
    ceph_assert(rcompletion->get_return_value() == 1);
    if (rcompletion->get_version64() != version) {
      cerr << "Error: racing read on " << oid << " returned version "
           << rcompletion->get_version64() << " rather than version "
           << version << std::endl;
      ceph_abort_msg("racing read got wrong version");
    }
    rcompletion->release();

    ObjectDesc latest;
    ceph_assert(context->find_object(oid, &latest, -1));
    if (latest.deleted()) {
      std::cout << num << ": left oid " << oid << " deleted" << std::endl;
    } else {
      std::cout << num << ": left oid " << oid << " "
                << latest.most_recent() << std::endl;
    }

    context->oid_in_use.erase(oid);
    context->oid_not_in_use.insert(oid);
    context->kick();
    done = true;
  }

  bool finished() override
  {
    return done;
  }

  string getType() override
  {
    return "WriteSameOp";
  }
};
1133
1134 class DeleteOp : public TestOp {
1135 public:
1136 string oid;
1137
1138 DeleteOp(int n,
1139 RadosTestContext *context,
1140 const string &oid,
1141 TestOpStat *stat = 0)
1142 : TestOp(n, context, stat), oid(oid)
1143 {}
1144
1145 void _begin() override
1146 {
1147 std::unique_lock state_locker{context->state_lock};
1148 if (context->get_watch_context(oid)) {
1149 context->kick();
1150 return;
1151 }
1152
1153 ObjectDesc contents;
1154 context->find_object(oid, &contents);
1155 bool present = !contents.deleted();
1156
1157 context->oid_in_use.insert(oid);
1158 context->oid_not_in_use.erase(oid);
1159 context->seq_num++;
1160
1161 context->remove_object(oid);
1162
1163 interval_set<uint64_t> ranges;
1164 state_locker.unlock();
1165
1166 int r = 0;
1167 if (rand() % 2) {
1168 librados::ObjectWriteOperation op;
1169 op.assert_exists();
1170 op.remove();
1171 r = context->io_ctx.operate(context->prefix+oid, &op);
1172 } else {
1173 r = context->io_ctx.remove(context->prefix+oid);
1174 }
1175 if (r && !(r == -ENOENT && !present)) {
1176 cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
1177 ceph_abort();
1178 }
1179
1180 state_locker.lock();
1181 context->oid_in_use.erase(oid);
1182 context->oid_not_in_use.insert(oid);
1183 context->kick();
1184 }
1185
1186 string getType() override
1187 {
1188 return "DeleteOp";
1189 }
1190 };
1191
1192 class ReadOp : public TestOp {
1193 public:
1194 vector<librados::AioCompletion *> completions;
1195 librados::ObjectReadOperation op;
1196 string oid;
1197 ObjectDesc old_value;
1198 int snap;
1199 bool balance_reads;
1200 bool localize_reads;
1201
1202 std::shared_ptr<int> in_use;
1203
1204 vector<bufferlist> results;
1205 vector<int> retvals;
1206 vector<std::map<uint64_t, uint64_t>> extent_results;
1207 vector<bool> is_sparse_read;
1208 uint64_t waiting_on;
1209
1210 vector<bufferlist> checksums;
1211 vector<int> checksum_retvals;
1212
1213 map<string, bufferlist> attrs;
1214 int attrretval;
1215
1216 set<string> omap_requested_keys;
1217 map<string, bufferlist> omap_returned_values;
1218 set<string> omap_keys;
1219 map<string, bufferlist> omap;
1220 bufferlist header;
1221
1222 map<string, bufferlist> xattrs;
1223 ReadOp(int n,
1224 RadosTestContext *context,
1225 const string &oid,
1226 bool balance_reads,
1227 bool localize_reads,
1228 TestOpStat *stat = 0)
1229 : TestOp(n, context, stat),
1230 completions(3),
1231 oid(oid),
1232 snap(0),
1233 balance_reads(balance_reads),
1234 localize_reads(localize_reads),
1235 results(3),
1236 retvals(3),
1237 extent_results(3),
1238 is_sparse_read(3, false),
1239 waiting_on(0),
1240 checksums(3),
1241 checksum_retvals(3),
1242 attrretval(0)
1243 {}
1244
1245 void _do_read(librados::ObjectReadOperation& read_op, int index) {
1246 uint64_t len = 0;
1247 if (old_value.has_contents())
1248 len = old_value.most_recent_gen()->get_length(old_value.most_recent());
1249 if (context->no_sparse || rand() % 2) {
1250 is_sparse_read[index] = false;
1251 read_op.read(0,
1252 len,
1253 &results[index],
1254 &retvals[index]);
1255 bufferlist init_value_bl;
1256 encode(static_cast<uint32_t>(-1), init_value_bl);
1257 read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, 0, len,
1258 0, &checksums[index], &checksum_retvals[index]);
1259 } else {
1260 is_sparse_read[index] = true;
1261 read_op.sparse_read(0,
1262 len,
1263 &extent_results[index],
1264 &results[index],
1265 &retvals[index]);
1266 }
1267 }
1268
1269 void _begin() override
1270 {
1271 std::unique_lock state_locker{context->state_lock};
1272 if (!(rand() % 4) && !context->snaps.empty()) {
1273 snap = rand_choose(context->snaps)->first;
1274 in_use = context->snaps_in_use.lookup_or_create(snap, snap);
1275 } else {
1276 snap = -1;
1277 }
1278 std::cout << num << ": read oid " << oid << " snap " << snap << std::endl;
1279 done = 0;
1280 for (uint32_t i = 0; i < 3; i++) {
1281 completions[i] = context->rados.aio_create_completion((void *) this, &read_callback);
1282 }
1283
1284 context->oid_in_use.insert(oid);
1285 context->oid_not_in_use.erase(oid);
1286 ceph_assert(context->find_object(oid, &old_value, snap));
1287 if (old_value.deleted())
1288 std::cout << num << ": expect deleted" << std::endl;
1289 else
1290 std::cout << num << ": expect " << old_value.most_recent() << std::endl;
1291
1292 TestWatchContext *ctx = context->get_watch_context(oid);
1293 state_locker.unlock();
1294 if (ctx) {
1295 ceph_assert(old_value.exists);
1296 TestAlarm alarm;
1297 std::cerr << num << ": about to start" << std::endl;
1298 ctx->start();
1299 std::cerr << num << ": started" << std::endl;
1300 bufferlist bl;
1301 context->io_ctx.set_notify_timeout(600);
1302 int r = context->io_ctx.notify2(context->prefix+oid, bl, 0, NULL);
1303 if (r < 0) {
1304 std::cerr << "r is " << r << std::endl;
1305 ceph_abort();
1306 }
1307 std::cerr << num << ": notified, waiting" << std::endl;
1308 ctx->wait();
1309 }
1310 state_locker.lock();
1311 if (snap >= 0) {
1312 context->io_ctx.snap_set_read(context->snaps[snap]);
1313 }
1314 _do_read(op, 0);
1315 for (map<string, ContDesc>::iterator i = old_value.attrs.begin();
1316 i != old_value.attrs.end();
1317 ++i) {
1318 if (rand() % 2) {
1319 string key = i->first;
1320 if (rand() % 2)
1321 key.push_back((rand() % 26) + 'a');
1322 omap_requested_keys.insert(key);
1323 }
1324 }
1325 if (!context->no_omap) {
1326 op.omap_get_vals_by_keys(omap_requested_keys, &omap_returned_values, 0);
1327 // NOTE: we're ignore pmore here, which assumes the OSD limit is high
1328 // enough for us.
1329 op.omap_get_keys2("", -1, &omap_keys, nullptr, nullptr);
1330 op.omap_get_vals2("", -1, &omap, nullptr, nullptr);
1331 op.omap_get_header(&header, 0);
1332 }
1333 op.getxattrs(&xattrs, 0);
1334
1335 unsigned flags = 0;
1336 if (balance_reads)
1337 flags |= librados::OPERATION_BALANCE_READS;
1338 if (localize_reads)
1339 flags |= librados::OPERATION_LOCALIZE_READS;
1340
1341 ceph_assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
1342 flags, NULL));
1343 waiting_on++;
1344
1345 // send 2 pipelined reads on the same object/snap. This can help testing
1346 // OSD's read behavior in some scenarios
1347 for (uint32_t i = 1; i < 3; ++i) {
1348 librados::ObjectReadOperation pipeline_op;
1349 _do_read(pipeline_op, i);
1350 ceph_assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[i], &pipeline_op, 0));
1351 waiting_on++;
1352 }
1353
1354 if (snap >= 0) {
1355 context->io_ctx.snap_set_read(0);
1356 }
1357 }
1358
1359 void _finish(CallbackInfo *info) override
1360 {
1361 std::unique_lock state_locker{context->state_lock};
1362 ceph_assert(!done);
1363 ceph_assert(waiting_on > 0);
1364 if (--waiting_on) {
1365 return;
1366 }
1367
1368 context->oid_in_use.erase(oid);
1369 context->oid_not_in_use.insert(oid);
1370 int retval = completions[0]->get_return_value();
1371 for (vector<librados::AioCompletion *>::iterator it = completions.begin();
1372 it != completions.end(); ++it) {
1373 ceph_assert((*it)->is_complete());
1374 uint64_t version = (*it)->get_version64();
1375 int err = (*it)->get_return_value();
1376 if (err != retval) {
1377 cerr << num << ": Error: oid " << oid << " read returned different error codes: "
1378 << retval << " and " << err << std::endl;
1379 ceph_abort();
1380 }
1381 if (err) {
1382 if (!(err == -ENOENT && old_value.deleted())) {
1383 cerr << num << ": Error: oid " << oid << " read returned error code "
1384 << err << std::endl;
1385 ceph_abort();
1386 }
1387 } else if (version != old_value.version) {
1388 cerr << num << ": oid " << oid << " version is " << version
1389 << " and expected " << old_value.version << std::endl;
1390 ceph_assert(version == old_value.version);
1391 }
1392 }
1393 if (!retval) {
1394 map<string, bufferlist>::iterator iter = xattrs.find("_header");
1395 bufferlist headerbl;
1396 if (iter == xattrs.end()) {
1397 if (old_value.has_contents()) {
1398 cerr << num << ": Error: did not find header attr, has_contents: "
1399 << old_value.has_contents()
1400 << std::endl;
1401 ceph_assert(!old_value.has_contents());
1402 }
1403 } else {
1404 headerbl = iter->second;
1405 xattrs.erase(iter);
1406 }
1407 if (old_value.deleted()) {
1408 std::cout << num << ": expect deleted" << std::endl;
1409 ceph_abort_msg("expected deleted");
1410 } else {
1411 std::cout << num << ": expect " << old_value.most_recent() << std::endl;
1412 }
1413 if (old_value.has_contents()) {
1414 ContDesc to_check;
1415 auto p = headerbl.cbegin();
1416 decode(to_check, p);
1417 if (to_check != old_value.most_recent()) {
1418 cerr << num << ": oid " << oid << " found incorrect object contents " << to_check
1419 << ", expected " << old_value.most_recent() << std::endl;
1420 context->errors++;
1421 }
1422 for (unsigned i = 0; i < results.size(); i++) {
1423 if (is_sparse_read[i]) {
1424 if (!old_value.check_sparse(extent_results[i], results[i])) {
1425 cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
1426 context->errors++;
1427 }
1428 } else {
1429 if (!old_value.check(results[i])) {
1430 cerr << num << ": oid " << oid << " contents " << to_check << " corrupt" << std::endl;
1431 context->errors++;
1432 }
1433
1434 uint32_t checksum = 0;
1435 if (checksum_retvals[i] == 0) {
1436 try {
1437 auto bl_it = checksums[i].cbegin();
1438 uint32_t csum_count;
1439 decode(csum_count, bl_it);
1440 decode(checksum, bl_it);
1441 } catch (const buffer::error &err) {
1442 checksum_retvals[i] = -EBADMSG;
1443 }
1444 }
1445 if (checksum_retvals[i] != 0 || checksum != results[i].crc32c(-1)) {
1446 cerr << num << ": oid " << oid << " checksum " << checksums[i]
1447 << " incorrect, expecting " << results[i].crc32c(-1)
1448 << std::endl;
1449 context->errors++;
1450 }
1451 }
1452 }
1453 if (context->errors) ceph_abort();
1454 }
1455
1456 // Attributes
1457 if (!context->no_omap) {
1458 if (!(old_value.header == header)) {
1459 cerr << num << ": oid " << oid << " header does not match, old size: "
1460 << old_value.header.length() << " new size " << header.length()
1461 << std::endl;
1462 ceph_assert(old_value.header == header);
1463 }
1464 if (omap.size() != old_value.attrs.size()) {
1465 cerr << num << ": oid " << oid << " omap.size() is " << omap.size()
1466 << " and old is " << old_value.attrs.size() << std::endl;
1467 ceph_assert(omap.size() == old_value.attrs.size());
1468 }
1469 if (omap_keys.size() != old_value.attrs.size()) {
1470 cerr << num << ": oid " << oid << " omap.size() is " << omap_keys.size()
1471 << " and old is " << old_value.attrs.size() << std::endl;
1472 ceph_assert(omap_keys.size() == old_value.attrs.size());
1473 }
1474 }
1475 if (xattrs.size() != old_value.attrs.size()) {
1476 cerr << num << ": oid " << oid << " xattrs.size() is " << xattrs.size()
1477 << " and old is " << old_value.attrs.size() << std::endl;
1478 ceph_assert(xattrs.size() == old_value.attrs.size());
1479 }
1480 for (map<string, ContDesc>::iterator iter = old_value.attrs.begin();
1481 iter != old_value.attrs.end();
1482 ++iter) {
1483 bufferlist bl = context->attr_gen.gen_bl(
1484 iter->second);
1485 if (!context->no_omap) {
1486 map<string, bufferlist>::iterator omap_iter = omap.find(iter->first);
1487 ceph_assert(omap_iter != omap.end());
1488 ceph_assert(bl.length() == omap_iter->second.length());
1489 bufferlist::iterator k = bl.begin();
1490 for(bufferlist::iterator l = omap_iter->second.begin();
1491 !k.end() && !l.end();
1492 ++k, ++l) {
1493 ceph_assert(*l == *k);
1494 }
1495 }
1496 map<string, bufferlist>::iterator xattr_iter = xattrs.find(iter->first);
1497 ceph_assert(xattr_iter != xattrs.end());
1498 ceph_assert(bl.length() == xattr_iter->second.length());
1499 bufferlist::iterator k = bl.begin();
1500 for (bufferlist::iterator j = xattr_iter->second.begin();
1501 !k.end() && !j.end();
1502 ++j, ++k) {
1503 ceph_assert(*j == *k);
1504 }
1505 }
1506 if (!context->no_omap) {
1507 for (set<string>::iterator i = omap_requested_keys.begin();
1508 i != omap_requested_keys.end();
1509 ++i) {
1510 if (!omap_returned_values.count(*i))
1511 ceph_assert(!old_value.attrs.count(*i));
1512 if (!old_value.attrs.count(*i))
1513 ceph_assert(!omap_returned_values.count(*i));
1514 }
1515 for (map<string, bufferlist>::iterator i = omap_returned_values.begin();
1516 i != omap_returned_values.end();
1517 ++i) {
1518 ceph_assert(omap_requested_keys.count(i->first));
1519 ceph_assert(omap.count(i->first));
1520 ceph_assert(old_value.attrs.count(i->first));
1521 ceph_assert(i->second == omap[i->first]);
1522 }
1523 }
1524 }
1525 for (vector<librados::AioCompletion *>::iterator it = completions.begin();
1526 it != completions.end(); ++it) {
1527 (*it)->release();
1528 }
1529 context->kick();
1530 done = true;
1531 }
1532
1533 bool finished() override
1534 {
1535 return done;
1536 }
1537
1538 string getType() override
1539 {
1540 return "ReadOp";
1541 }
1542 };
1543
1544 class SnapCreateOp : public TestOp {
1545 public:
1546 SnapCreateOp(int n,
1547 RadosTestContext *context,
1548 TestOpStat *stat = 0)
1549 : TestOp(n, context, stat)
1550 {}
1551
1552 void _begin() override
1553 {
1554 uint64_t snap;
1555 string snapname;
1556
1557 if (context->pool_snaps) {
1558 stringstream ss;
1559
1560 ss << context->prefix << "snap" << ++context->snapname_num;
1561 snapname = ss.str();
1562
1563 int ret = context->io_ctx.snap_create(snapname.c_str());
1564 if (ret) {
1565 cerr << "snap_create returned " << ret << std::endl;
1566 ceph_abort();
1567 }
1568 ceph_assert(!context->io_ctx.snap_lookup(snapname.c_str(), &snap));
1569
1570 } else {
1571 ceph_assert(!context->io_ctx.selfmanaged_snap_create(&snap));
1572 }
1573
1574 std::unique_lock state_locker{context->state_lock};
1575 context->add_snap(snap);
1576
1577 if (!context->pool_snaps) {
1578 vector<uint64_t> snapset(context->snaps.size());
1579
1580 int j = 0;
1581 for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
1582 i != context->snaps.rend();
1583 ++i, ++j) {
1584 snapset[j] = i->second;
1585 }
1586
1587 state_locker.unlock();
1588
1589 int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
1590 if (r) {
1591 cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
1592 ceph_abort();
1593 }
1594 }
1595 }
1596
1597 string getType() override
1598 {
1599 return "SnapCreateOp";
1600 }
1601 bool must_quiesce_other_ops() override { return context->pool_snaps; }
1602 };
1603
1604 class SnapRemoveOp : public TestOp {
1605 public:
1606 int to_remove;
1607 SnapRemoveOp(int n, RadosTestContext *context,
1608 int snap,
1609 TestOpStat *stat = 0)
1610 : TestOp(n, context, stat),
1611 to_remove(snap)
1612 {}
1613
1614 void _begin() override
1615 {
1616 std::unique_lock state_locker{context->state_lock};
1617 uint64_t snap = context->snaps[to_remove];
1618 context->remove_snap(to_remove);
1619
1620 if (context->pool_snaps) {
1621 string snapname;
1622
1623 ceph_assert(!context->io_ctx.snap_get_name(snap, &snapname));
1624 ceph_assert(!context->io_ctx.snap_remove(snapname.c_str()));
1625 } else {
1626 ceph_assert(!context->io_ctx.selfmanaged_snap_remove(snap));
1627
1628 vector<uint64_t> snapset(context->snaps.size());
1629 int j = 0;
1630 for (map<int,uint64_t>::reverse_iterator i = context->snaps.rbegin();
1631 i != context->snaps.rend();
1632 ++i, ++j) {
1633 snapset[j] = i->second;
1634 }
1635
1636 int r = context->io_ctx.selfmanaged_snap_set_write_ctx(context->seq, snapset);
1637 if (r) {
1638 cerr << "r is " << r << " snapset is " << snapset << " seq is " << context->seq << std::endl;
1639 ceph_abort();
1640 }
1641 }
1642 }
1643
1644 string getType() override
1645 {
1646 return "SnapRemoveOp";
1647 }
1648 };
1649
1650 class WatchOp : public TestOp {
1651 string oid;
1652 public:
1653 WatchOp(int n,
1654 RadosTestContext *context,
1655 const string &_oid,
1656 TestOpStat *stat = 0)
1657 : TestOp(n, context, stat),
1658 oid(_oid)
1659 {}
1660
1661 void _begin() override
1662 {
1663 std::unique_lock state_locker{context->state_lock};
1664 ObjectDesc contents;
1665 context->find_object(oid, &contents);
1666 if (contents.deleted()) {
1667 context->kick();
1668 return;
1669 }
1670 context->oid_in_use.insert(oid);
1671 context->oid_not_in_use.erase(oid);
1672
1673 TestWatchContext *ctx = context->get_watch_context(oid);
1674 state_locker.unlock();
1675 int r;
1676 if (!ctx) {
1677 {
1678 std::lock_guard l{context->state_lock};
1679 ctx = context->watch(oid);
1680 }
1681
1682 r = context->io_ctx.watch2(context->prefix+oid,
1683 &ctx->get_handle(),
1684 ctx);
1685 } else {
1686 r = context->io_ctx.unwatch2(ctx->get_handle());
1687 {
1688 std::lock_guard l{context->state_lock};
1689 context->unwatch(oid);
1690 }
1691 }
1692
1693 if (r) {
1694 cerr << "r is " << r << std::endl;
1695 ceph_abort();
1696 }
1697
1698 {
1699 std::lock_guard l{context->state_lock};
1700 context->oid_in_use.erase(oid);
1701 context->oid_not_in_use.insert(oid);
1702 }
1703 }
1704
1705 string getType() override
1706 {
1707 return "WatchOp";
1708 }
1709 };
1710
1711 class RollbackOp : public TestOp {
1712 public:
1713 string oid;
1714 int roll_back_to;
1715 librados::ObjectWriteOperation zero_write_op1;
1716 librados::ObjectWriteOperation zero_write_op2;
1717 librados::ObjectWriteOperation op;
1718 vector<librados::AioCompletion *> comps;
1719 std::shared_ptr<int> in_use;
1720 int last_finished;
1721 int outstanding;
1722
1723 RollbackOp(int n,
1724 RadosTestContext *context,
1725 const string &_oid,
1726 TestOpStat *stat = 0)
1727 : TestOp(n, context, stat),
1728 oid(_oid), roll_back_to(-1),
1729 comps(3, NULL),
1730 last_finished(-1), outstanding(3)
1731 {}
1732
1733 void _begin() override
1734 {
1735 context->state_lock.lock();
1736 if (context->get_watch_context(oid)) {
1737 context->kick();
1738 context->state_lock.unlock();
1739 return;
1740 }
1741
1742 if (context->snaps.empty()) {
1743 context->kick();
1744 context->state_lock.unlock();
1745 done = true;
1746 return;
1747 }
1748
1749 context->oid_in_use.insert(oid);
1750 context->oid_not_in_use.erase(oid);
1751
1752 roll_back_to = rand_choose(context->snaps)->first;
1753 in_use = context->snaps_in_use.lookup_or_create(
1754 roll_back_to,
1755 roll_back_to);
1756
1757
1758 cout << "rollback oid " << oid << " to " << roll_back_to << std::endl;
1759
1760 bool existed_before = context->object_existed_at(oid);
1761 bool existed_after = context->object_existed_at(oid, roll_back_to);
1762
1763 context->roll_back(oid, roll_back_to);
1764 uint64_t snap = context->snaps[roll_back_to];
1765
1766 outstanding -= (!existed_before) + (!existed_after);
1767
1768 context->state_lock.unlock();
1769
1770 bufferlist bl, bl2;
1771 zero_write_op1.append(bl);
1772 zero_write_op2.append(bl2);
1773
1774 if (context->pool_snaps) {
1775 op.snap_rollback(snap);
1776 } else {
1777 op.selfmanaged_snap_rollback(snap);
1778 }
1779
1780 if (existed_before) {
1781 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1782 new pair<TestOp*, TestOp::CallbackInfo*>(this,
1783 new TestOp::CallbackInfo(0));
1784 comps[0] =
1785 context->rados.aio_create_completion((void*) cb_arg,
1786 &write_callback);
1787 context->io_ctx.aio_operate(
1788 context->prefix+oid, comps[0], &zero_write_op1);
1789 }
1790 {
1791 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1792 new pair<TestOp*, TestOp::CallbackInfo*>(this,
1793 new TestOp::CallbackInfo(1));
1794 comps[1] =
1795 context->rados.aio_create_completion((void*) cb_arg,
1796 &write_callback);
1797 context->io_ctx.aio_operate(
1798 context->prefix+oid, comps[1], &op);
1799 }
1800 if (existed_after) {
1801 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1802 new pair<TestOp*, TestOp::CallbackInfo*>(this,
1803 new TestOp::CallbackInfo(2));
1804 comps[2] =
1805 context->rados.aio_create_completion((void*) cb_arg,
1806 &write_callback);
1807 context->io_ctx.aio_operate(
1808 context->prefix+oid, comps[2], &zero_write_op2);
1809 }
1810 }
1811
1812 void _finish(CallbackInfo *info) override
1813 {
1814 std::lock_guard l{context->state_lock};
1815 uint64_t tid = info->id;
1816 cout << num << ": finishing rollback tid " << tid
1817 << " to " << context->prefix + oid << std::endl;
1818 ceph_assert((int)(info->id) > last_finished);
1819 last_finished = info->id;
1820
1821 int r;
1822 if ((r = comps[last_finished]->get_return_value()) != 0) {
1823 cerr << "err " << r << std::endl;
1824 ceph_abort();
1825 }
1826 if (--outstanding == 0) {
1827 done = true;
1828 context->update_object_version(oid, comps[tid]->get_version64());
1829 context->oid_in_use.erase(oid);
1830 context->oid_not_in_use.insert(oid);
1831 in_use = std::shared_ptr<int>();
1832 context->kick();
1833 }
1834 }
1835
1836 bool finished() override
1837 {
1838 return done;
1839 }
1840
1841 string getType() override
1842 {
1843 return "RollBackOp";
1844 }
1845 };
1846
1847 class CopyFromOp : public TestOp {
1848 public:
1849 string oid, oid_src;
1850 ObjectDesc src_value;
1851 librados::ObjectWriteOperation op;
1852 librados::ObjectReadOperation rd_op;
1853 librados::AioCompletion *comp;
1854 librados::AioCompletion *comp_racing_read = nullptr;
1855 std::shared_ptr<int> in_use;
1856 int snap;
1857 int done;
1858 uint64_t version;
1859 int r;
1860 CopyFromOp(int n,
1861 RadosTestContext *context,
1862 const string &oid,
1863 const string &oid_src,
1864 TestOpStat *stat)
1865 : TestOp(n, context, stat),
1866 oid(oid), oid_src(oid_src),
1867 comp(NULL), snap(-1), done(0),
1868 version(0), r(0)
1869 {}
1870
1871 void _begin() override
1872 {
1873 ContDesc cont;
1874 {
1875 std::lock_guard l{context->state_lock};
1876 cont = ContDesc(context->seq_num, context->current_snap,
1877 context->seq_num, "");
1878 context->oid_in_use.insert(oid);
1879 context->oid_not_in_use.erase(oid);
1880 context->oid_in_use.insert(oid_src);
1881 context->oid_not_in_use.erase(oid_src);
1882
1883 // choose source snap
1884 if (0 && !(rand() % 4) && !context->snaps.empty()) {
1885 snap = rand_choose(context->snaps)->first;
1886 in_use = context->snaps_in_use.lookup_or_create(snap, snap);
1887 } else {
1888 snap = -1;
1889 }
1890 context->find_object(oid_src, &src_value, snap);
1891 if (!src_value.deleted())
1892 context->update_object_full(oid, src_value);
1893 }
1894
1895 string src = context->prefix+oid_src;
1896 op.copy_from(src.c_str(), context->io_ctx, src_value.version, 0);
1897
1898 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
1899 new pair<TestOp*, TestOp::CallbackInfo*>(this,
1900 new TestOp::CallbackInfo(0));
1901 comp = context->rados.aio_create_completion((void*) cb_arg,
1902 &write_callback);
1903 context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
1904
1905 // queue up a racing read, too.
1906 pair<TestOp*, TestOp::CallbackInfo*> *read_cb_arg =
1907 new pair<TestOp*, TestOp::CallbackInfo*>(this,
1908 new TestOp::CallbackInfo(1));
1909 comp_racing_read = context->rados.aio_create_completion((void*) read_cb_arg, &write_callback);
1910 rd_op.stat(NULL, NULL, NULL);
1911 context->io_ctx.aio_operate(context->prefix+oid, comp_racing_read, &rd_op,
1912 librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update
1913 NULL);
1914
1915 }
1916
1917 void _finish(CallbackInfo *info) override
1918 {
1919 std::lock_guard l{context->state_lock};
1920
1921 // note that the read can (and atm will) come back before the
1922 // write reply, but will reflect the update and the versions will
1923 // match.
1924
1925 if (info->id == 0) {
1926 // copy_from
1927 ceph_assert(comp->is_complete());
1928 cout << num << ": finishing copy_from to " << context->prefix + oid << std::endl;
1929 if ((r = comp->get_return_value())) {
1930 if (r == -ENOENT && src_value.deleted()) {
1931 cout << num << ": got expected ENOENT (src dne)" << std::endl;
1932 } else {
1933 cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
1934 << r << std::endl;
1935 ceph_abort();
1936 }
1937 } else {
1938 ceph_assert(!version || comp->get_version64() == version);
1939 version = comp->get_version64();
1940 context->update_object_version(oid, comp->get_version64());
1941 }
1942 } else if (info->id == 1) {
1943 // racing read
1944 ceph_assert(comp_racing_read->is_complete());
1945 cout << num << ": finishing copy_from racing read to " << context->prefix + oid << std::endl;
1946 if ((r = comp_racing_read->get_return_value())) {
1947 if (!(r == -ENOENT && src_value.deleted())) {
1948 cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code "
1949 << r << std::endl;
1950 }
1951 } else {
1952 ceph_assert(comp_racing_read->get_return_value() == 0);
1953 ceph_assert(!version || comp_racing_read->get_version64() == version);
1954 version = comp_racing_read->get_version64();
1955 }
1956 }
1957 if (++done == 2) {
1958 context->oid_in_use.erase(oid);
1959 context->oid_not_in_use.insert(oid);
1960 context->oid_in_use.erase(oid_src);
1961 context->oid_not_in_use.insert(oid_src);
1962 context->kick();
1963 }
1964 }
1965
1966 bool finished() override
1967 {
1968 return done == 2;
1969 }
1970
1971 string getType() override
1972 {
1973 return "CopyFromOp";
1974 }
1975 };
1976
1977 class ChunkReadOp : public TestOp {
1978 public:
1979 vector<librados::AioCompletion *> completions;
1980 librados::ObjectReadOperation op;
1981 string oid;
1982 ObjectDesc old_value;
1983 ObjectDesc tgt_value;
1984 int snap;
1985 bool balance_reads;
1986 bool localize_reads;
1987
1988 std::shared_ptr<int> in_use;
1989
1990 vector<bufferlist> results;
1991 vector<int> retvals;
1992 vector<bool> is_sparse_read;
1993 uint64_t waiting_on;
1994
1995 vector<bufferlist> checksums;
1996 vector<int> checksum_retvals;
1997 uint32_t offset = 0;
1998 uint32_t length = 0;
1999 string tgt_oid;
2000 string tgt_pool_name;
2001 uint32_t tgt_offset = 0;
2002
2003 ChunkReadOp(int n,
2004 RadosTestContext *context,
2005 const string &oid,
2006 const string &tgt_pool_name,
2007 bool balance_reads,
2008 bool localize_reads,
2009 TestOpStat *stat = 0)
2010 : TestOp(n, context, stat),
2011 completions(2),
2012 oid(oid),
2013 snap(0),
2014 balance_reads(balance_reads),
2015 localize_reads(localize_reads),
2016 results(2),
2017 retvals(2),
2018 waiting_on(0),
2019 checksums(2),
2020 checksum_retvals(2),
2021 tgt_pool_name(tgt_pool_name)
2022 {}
2023
2024 void _do_read(librados::ObjectReadOperation& read_op, uint32_t offset, uint32_t length, int index) {
2025 read_op.read(offset,
2026 length,
2027 &results[index],
2028 &retvals[index]);
2029 if (index != 0) {
2030 bufferlist init_value_bl;
2031 encode(static_cast<uint32_t>(-1), init_value_bl);
2032 read_op.checksum(LIBRADOS_CHECKSUM_TYPE_CRC32C, init_value_bl, offset, length,
2033 0, &checksums[index], &checksum_retvals[index]);
2034 }
2035
2036 }
2037
2038 void _begin() override
2039 {
2040 context->state_lock.lock();
2041 std::cout << num << ": chunk read oid " << oid << " snap " << snap << std::endl;
2042 done = 0;
2043 for (uint32_t i = 0; i < 2; i++) {
2044 completions[i] = context->rados.aio_create_completion((void *) this, &read_callback);
2045 }
2046
2047 context->find_object(oid, &old_value);
2048
2049 if (old_value.chunk_info.size() == 0) {
2050 std::cout << ": no chunks" << std::endl;
2051 context->kick();
2052 context->state_lock.unlock();
2053 done = true;
2054 return;
2055 }
2056
2057 context->oid_in_use.insert(oid);
2058 context->oid_not_in_use.erase(oid);
2059 if (old_value.deleted()) {
2060 std::cout << num << ": expect deleted" << std::endl;
2061 } else {
2062 std::cout << num << ": expect " << old_value.most_recent() << std::endl;
2063 }
2064
2065 int rand_index = rand() % old_value.chunk_info.size();
2066 auto iter = old_value.chunk_info.begin();
2067 for (int i = 0; i < rand_index; i++) {
2068 iter++;
2069 }
2070 offset = iter->first;
2071 offset += (rand() % iter->second.length)/2;
2072 uint32_t t_length = rand() % iter->second.length;
2073 while (t_length + offset > iter->first + iter->second.length) {
2074 t_length = rand() % iter->second.length;
2075 }
2076 length = t_length;
2077 tgt_offset = iter->second.offset + offset - iter->first;
2078 tgt_oid = iter->second.oid;
2079
2080 std::cout << num << ": ori offset " << iter->first << " req offset " << offset
2081 << " ori length " << iter->second.length << " req length " << length
2082 << " ori tgt_offset " << iter->second.offset << " req tgt_offset " << tgt_offset
2083 << " tgt_oid " << tgt_oid << std::endl;
2084
2085 TestWatchContext *ctx = context->get_watch_context(oid);
2086 context->state_lock.unlock();
2087 if (ctx) {
2088 ceph_assert(old_value.exists);
2089 TestAlarm alarm;
2090 std::cerr << num << ": about to start" << std::endl;
2091 ctx->start();
2092 std::cerr << num << ": started" << std::endl;
2093 bufferlist bl;
2094 context->io_ctx.set_notify_timeout(600);
2095 int r = context->io_ctx.notify2(context->prefix+oid, bl, 0, NULL);
2096 if (r < 0) {
2097 std::cerr << "r is " << r << std::endl;
2098 ceph_abort();
2099 }
2100 std::cerr << num << ": notified, waiting" << std::endl;
2101 ctx->wait();
2102 }
2103 std::lock_guard state_locker{context->state_lock};
2104
2105 _do_read(op, offset, length, 0);
2106
2107 unsigned flags = 0;
2108 if (balance_reads)
2109 flags |= librados::OPERATION_BALANCE_READS;
2110 if (localize_reads)
2111 flags |= librados::OPERATION_LOCALIZE_READS;
2112
2113 ceph_assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
2114 flags, NULL));
2115 waiting_on++;
2116
2117 _do_read(op, tgt_offset, length, 1);
2118 ceph_assert(!context->io_ctx.aio_operate(context->prefix+tgt_oid, completions[1], &op,
2119 flags, NULL));
2120
2121 waiting_on++;
2122 }
2123
2124 void _finish(CallbackInfo *info) override
2125 {
2126 std::lock_guard l{context->state_lock};
2127 ceph_assert(!done);
2128 ceph_assert(waiting_on > 0);
2129 if (--waiting_on) {
2130 return;
2131 }
2132
2133 context->oid_in_use.erase(oid);
2134 context->oid_not_in_use.insert(oid);
2135 int retval = completions[0]->get_return_value();
2136 std::cout << ": finish!! ret: " << retval << std::endl;
2137 context->find_object(tgt_oid, &tgt_value);
2138
2139 for (int i = 0; i < 2; i++) {
2140 ceph_assert(completions[i]->is_complete());
2141 int err = completions[i]->get_return_value();
2142 if (err != retval) {
2143 cerr << num << ": Error: oid " << oid << " read returned different error codes: "
2144 << retval << " and " << err << std::endl;
2145 ceph_abort();
2146 }
2147 if (err) {
2148 if (!(err == -ENOENT && old_value.deleted())) {
2149 cerr << num << ": Error: oid " << oid << " read returned error code "
2150 << err << std::endl;
2151 ceph_abort();
2152 }
2153 }
2154 }
2155
2156 if (!retval) {
2157 if (old_value.deleted()) {
2158 std::cout << num << ": expect deleted" << std::endl;
2159 ceph_abort_msg("expected deleted");
2160 } else {
2161 std::cout << num << ": expect " << old_value.most_recent() << std::endl;
2162 }
2163 if (tgt_value.has_contents()) {
2164 uint32_t checksum[2] = {0};
2165 if (checksum_retvals[1] == 0) {
2166 try {
2167 auto bl_it = checksums[1].cbegin();
2168 uint32_t csum_count;
2169 decode(csum_count, bl_it);
2170 decode(checksum[1], bl_it);
2171 } catch (const buffer::error &err) {
2172 checksum_retvals[1] = -EBADMSG;
2173 }
2174 }
2175
2176 if (checksum_retvals[1] != 0) {
2177 cerr << num << ": oid " << oid << " checksum retvals " << checksums[0]
2178 << " error " << std::endl;
2179 context->errors++;
2180 }
2181
2182 checksum[0] = results[0].crc32c(-1);
2183
2184 if (checksum[0] != checksum[1]) {
2185 cerr << num << ": oid " << oid << " checksum src " << checksum[0]
2186 << " chunksum tgt " << checksum[1] << " incorrect, expecting "
2187 << results[0].crc32c(-1)
2188 << std::endl;
2189 context->errors++;
2190 }
2191 if (context->errors) ceph_abort();
2192 }
2193 }
2194 for (vector<librados::AioCompletion *>::iterator it = completions.begin();
2195 it != completions.end(); ++it) {
2196 (*it)->release();
2197 }
2198 context->kick();
2199 done = true;
2200 }
2201
2202 bool finished() override
2203 {
2204 return done;
2205 }
2206
2207 string getType() override
2208 {
2209 return "ChunkReadOp";
2210 }
2211 };
2212
2213 class CopyOp : public TestOp {
2214 public:
2215 string oid, oid_src, tgt_pool_name;
2216 librados::ObjectWriteOperation op;
2217 librados::ObjectReadOperation rd_op;
2218 librados::AioCompletion *comp;
2219 ObjectDesc src_value, tgt_value;
2220 int done;
2221 int r;
2222 CopyOp(int n,
2223 RadosTestContext *context,
2224 const string &oid_src,
2225 const string &oid,
2226 const string &tgt_pool_name,
2227 TestOpStat *stat = 0)
2228 : TestOp(n, context, stat),
2229 oid(oid), oid_src(oid_src), tgt_pool_name(tgt_pool_name),
2230 comp(NULL), done(0), r(0)
2231 {}
2232
2233 void _begin() override
2234 {
2235 std::lock_guard l{context->state_lock};
2236 context->oid_in_use.insert(oid_src);
2237 context->oid_not_in_use.erase(oid_src);
2238
2239 string src = context->prefix+oid_src;
2240 context->find_object(oid_src, &src_value);
2241 op.copy_from(src.c_str(), context->io_ctx, src_value.version, 0);
2242
2243 cout << "copy op oid " << oid_src << " to " << oid << " tgt_pool_name " << tgt_pool_name << std::endl;
2244
2245 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2246 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2247 new TestOp::CallbackInfo(0));
2248 comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback);
2249 if (tgt_pool_name == context->low_tier_pool_name) {
2250 context->low_tier_io_ctx.aio_operate(context->prefix+oid, comp, &op);
2251 } else {
2252 context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
2253 }
2254 }
2255
2256 void _finish(CallbackInfo *info) override
2257 {
2258 std::lock_guard l{context->state_lock};
2259
2260 if (info->id == 0) {
2261 ceph_assert(comp->is_complete());
2262 cout << num << ": finishing copy op to oid " << oid << std::endl;
2263 if ((r = comp->get_return_value())) {
2264 cerr << "Error: oid " << oid << " write returned error code "
2265 << r << std::endl;
2266 ceph_abort();
2267 }
2268 }
2269
2270 if (++done == 1) {
2271 context->oid_in_use.erase(oid_src);
2272 context->oid_not_in_use.insert(oid_src);
2273 context->kick();
2274 }
2275 }
2276
2277 bool finished() override
2278 {
2279 return done == 1;
2280 }
2281
2282 string getType() override
2283 {
2284 return "CopyOp";
2285 }
2286 };
2287
2288 class SetChunkOp : public TestOp {
2289 public:
2290 string oid, oid_tgt, tgt_pool_name;
2291 ObjectDesc src_value, tgt_value;
2292 librados::ObjectReadOperation op;
2293 librados::ObjectReadOperation rd_op;
2294 librados::AioCompletion *comp;
2295 std::shared_ptr<int> in_use;
2296 int done;
2297 int r;
2298 uint64_t offset;
2299 uint32_t length;
2300 uint64_t tgt_offset;
2301 SetChunkOp(int n,
2302 RadosTestContext *context,
2303 const string &oid,
2304 uint64_t offset,
2305 uint32_t length,
2306 const string &oid_tgt,
2307 const string &tgt_pool_name,
2308 uint64_t tgt_offset,
2309 TestOpStat *stat = 0)
2310 : TestOp(n, context, stat),
2311 oid(oid), oid_tgt(oid_tgt), tgt_pool_name(tgt_pool_name),
2312 comp(NULL), done(0),
2313 r(0), offset(offset), length(length),
2314 tgt_offset(tgt_offset)
2315 {}
2316
2317 void _begin() override
2318 {
2319 std::lock_guard l{context->state_lock};
2320 context->oid_in_use.insert(oid);
2321 context->oid_not_in_use.erase(oid);
2322
2323 if (tgt_pool_name.empty()) ceph_abort();
2324
2325 context->find_object(oid, &src_value);
2326 context->find_object(oid_tgt, &tgt_value);
2327
2328 if (src_value.version != 0 && !src_value.deleted())
2329 op.assert_version(src_value.version);
2330 op.set_chunk(offset, length, context->low_tier_io_ctx,
2331 context->prefix+oid_tgt, tgt_offset, CEPH_OSD_OP_FLAG_WITH_REFERENCE);
2332
2333 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2334 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2335 new TestOp::CallbackInfo(0));
2336 comp = context->rados.aio_create_completion((void*) cb_arg,
2337 &write_callback);
2338 context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2339 librados::OPERATION_ORDER_READS_WRITES, NULL);
2340 }
2341
2342 void _finish(CallbackInfo *info) override
2343 {
2344 std::lock_guard l{context->state_lock};
2345
2346 if (info->id == 0) {
2347 ceph_assert(comp->is_complete());
2348 cout << num << ": finishing set_chunk to oid " << oid << std::endl;
2349 if ((r = comp->get_return_value())) {
2350 if (r == -ENOENT && src_value.deleted()) {
2351 cout << num << ": got expected ENOENT (src dne)" << std::endl;
2352 } else if (r == -EOPNOTSUPP) {
2353 bool is_overlapped = false;
2354 for (auto &p : src_value.chunk_info) {
2355 if ((p.first <= offset && p.first + p.second.length > offset) ||
2356 (p.first > offset && p.first <= offset + length)) {
2357 cout << " range is overlapped offset: " << offset << " length: " << length
2358 << " chunk_info offset: " << p.second.offset << " length "
2359 << p.second.length << std::endl;
2360 is_overlapped = true;
2361 context->update_object_version(oid, comp->get_version64());
2362 }
2363 }
2364 if (!is_overlapped) {
2365 cerr << "Error: oid " << oid << " set_chunk " << oid_tgt << " returned error code "
2366 << r << " offset: " << offset << " length: " << length << std::endl;
2367 ceph_abort();
2368 }
2369 } else {
2370 cerr << "Error: oid " << oid << " set_chunk " << oid_tgt << " returned error code "
2371 << r << std::endl;
2372 ceph_abort();
2373 }
2374 } else {
2375 ChunkDesc info;
2376 info.offset = tgt_offset;
2377 info.length = length;
2378 info.oid = oid_tgt;
2379 context->update_object_chunk_target(oid, offset, info);
2380 context->update_object_version(oid, comp->get_version64());
2381 }
2382 }
2383
2384 if (++done == 1) {
2385 context->oid_in_use.erase(oid);
2386 context->oid_not_in_use.insert(oid);
2387 context->kick();
2388 }
2389 }
2390
2391 bool finished() override
2392 {
2393 return done == 1;
2394 }
2395
2396 string getType() override
2397 {
2398 return "SetChunkOp";
2399 }
2400 };
2401
2402 class SetRedirectOp : public TestOp {
2403 public:
2404 string oid, oid_tgt, tgt_pool_name;
2405 ObjectDesc src_value, tgt_value;
2406 librados::ObjectWriteOperation op;
2407 librados::ObjectReadOperation rd_op;
2408 librados::AioCompletion *comp;
2409 std::shared_ptr<int> in_use;
2410 int done;
2411 int r;
2412 SetRedirectOp(int n,
2413 RadosTestContext *context,
2414 const string &oid,
2415 const string &oid_tgt,
2416 const string &tgt_pool_name,
2417 TestOpStat *stat = 0)
2418 : TestOp(n, context, stat),
2419 oid(oid), oid_tgt(oid_tgt), tgt_pool_name(tgt_pool_name),
2420 comp(NULL), done(0),
2421 r(0)
2422 {}
2423
2424 void _begin() override
2425 {
2426 std::lock_guard l{context->state_lock};
2427 context->oid_in_use.insert(oid);
2428 context->oid_not_in_use.erase(oid);
2429 context->oid_redirect_in_use.insert(oid_tgt);
2430 context->oid_redirect_not_in_use.erase(oid_tgt);
2431
2432 if (tgt_pool_name.empty()) ceph_abort();
2433
2434 context->find_object(oid, &src_value);
2435 if(!context->redirect_objs[oid].empty()) {
2436 /* copy_from oid --> oid_tgt */
2437 comp = context->rados.aio_create_completion();
2438 string src = context->prefix+oid;
2439 op.copy_from(src.c_str(), context->io_ctx, src_value.version, 0);
2440 context->low_tier_io_ctx.aio_operate(context->prefix+oid_tgt, comp, &op,
2441 librados::OPERATION_ORDER_READS_WRITES);
2442 comp->wait_for_complete();
2443 if ((r = comp->get_return_value())) {
2444 cerr << "Error: oid " << oid << " copy_from " << oid_tgt << " returned error code "
2445 << r << std::endl;
2446 ceph_abort();
2447 }
2448 comp->release();
2449
2450 /* unset redirect target */
2451 comp = context->rados.aio_create_completion();
2452 bool present = !src_value.deleted();
2453 op.unset_manifest();
2454 context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2455 librados::OPERATION_ORDER_READS_WRITES |
2456 librados::OPERATION_IGNORE_REDIRECT);
2457 comp->wait_for_complete();
2458 if ((r = comp->get_return_value())) {
2459 if (!(r == -ENOENT && !present) && r != -EOPNOTSUPP) {
2460 cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
2461 ceph_abort();
2462 }
2463 }
2464 comp->release();
2465
2466 context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
2467 context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
2468 }
2469
2470 comp = context->rados.aio_create_completion();
2471 rd_op.stat(NULL, NULL, NULL);
2472 context->io_ctx.aio_operate(context->prefix+oid, comp, &rd_op,
2473 librados::OPERATION_ORDER_READS_WRITES |
2474 librados::OPERATION_IGNORE_REDIRECT,
2475 NULL);
2476 comp->wait_for_complete();
2477 if ((r = comp->get_return_value()) && !src_value.deleted()) {
2478 cerr << "Error: oid " << oid << " stat returned error code "
2479 << r << std::endl;
2480 ceph_abort();
2481 }
2482 context->update_object_version(oid, comp->get_version64());
2483 comp->release();
2484
2485 comp = context->rados.aio_create_completion();
2486 rd_op.stat(NULL, NULL, NULL);
2487 context->low_tier_io_ctx.aio_operate(context->prefix+oid_tgt, comp, &rd_op,
2488 librados::OPERATION_ORDER_READS_WRITES |
2489 librados::OPERATION_IGNORE_REDIRECT,
2490 NULL);
2491 comp->wait_for_complete();
2492 if ((r = comp->get_return_value())) {
2493 cerr << "Error: oid " << oid_tgt << " stat returned error code "
2494 << r << std::endl;
2495 ceph_abort();
2496 }
2497 uint64_t tgt_version = comp->get_version64();
2498 comp->release();
2499
2500
2501 context->find_object(oid, &src_value);
2502
2503 if (src_value.version != 0 && !src_value.deleted())
2504 op.assert_version(src_value.version);
2505 op.set_redirect(context->prefix+oid_tgt, context->low_tier_io_ctx, tgt_version);
2506
2507 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2508 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2509 new TestOp::CallbackInfo(0));
2510 comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback);
2511 context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2512 librados::OPERATION_ORDER_READS_WRITES);
2513 }
2514
2515 void _finish(CallbackInfo *info) override
2516 {
2517 std::lock_guard l{context->state_lock};
2518
2519 if (info->id == 0) {
2520 ceph_assert(comp->is_complete());
2521 cout << num << ": finishing set_redirect to oid " << oid << std::endl;
2522 if ((r = comp->get_return_value())) {
2523 if (r == -ENOENT && src_value.deleted()) {
2524 cout << num << ": got expected ENOENT (src dne)" << std::endl;
2525 } else {
2526 cerr << "Error: oid " << oid << " set_redirect " << oid_tgt << " returned error code "
2527 << r << std::endl;
2528 ceph_abort();
2529 }
2530 } else {
2531 context->update_object_redirect_target(oid, oid_tgt);
2532 context->update_object_version(oid, comp->get_version64());
2533 }
2534 }
2535
2536 if (++done == 1) {
2537 context->oid_in_use.erase(oid);
2538 context->oid_not_in_use.insert(oid);
2539 context->kick();
2540 }
2541 }
2542
2543 bool finished() override
2544 {
2545 return done == 1;
2546 }
2547
2548 string getType() override
2549 {
2550 return "SetRedirectOp";
2551 }
2552 };
2553
2554 class UnsetRedirectOp : public TestOp {
2555 public:
2556 string oid;
2557 librados::ObjectWriteOperation op;
2558 librados::AioCompletion *comp = nullptr;
2559
2560 UnsetRedirectOp(int n,
2561 RadosTestContext *context,
2562 const string &oid,
2563 TestOpStat *stat = 0)
2564 : TestOp(n, context, stat), oid(oid)
2565 {}
2566
2567 void _begin() override
2568 {
2569 std::unique_lock state_locker{context->state_lock};
2570 if (context->get_watch_context(oid)) {
2571 context->kick();
2572 return;
2573 }
2574
2575 ObjectDesc contents;
2576 context->find_object(oid, &contents);
2577 bool present = !contents.deleted();
2578
2579 context->oid_in_use.insert(oid);
2580 context->oid_not_in_use.erase(oid);
2581 context->seq_num++;
2582
2583 context->remove_object(oid);
2584
2585 state_locker.unlock();
2586
2587 comp = context->rados.aio_create_completion();
2588 op.remove();
2589 context->io_ctx.aio_operate(context->prefix+oid, comp, &op,
2590 librados::OPERATION_ORDER_READS_WRITES |
2591 librados::OPERATION_IGNORE_REDIRECT);
2592 comp->wait_for_complete();
2593 int r = comp->get_return_value();
2594 if (r && !(r == -ENOENT && !present)) {
2595 cerr << "r is " << r << " while deleting " << oid << " and present is " << present << std::endl;
2596 ceph_abort();
2597 }
2598 state_locker.lock();
2599 context->oid_in_use.erase(oid);
2600 context->oid_not_in_use.insert(oid);
2601 if(!context->redirect_objs[oid].empty()) {
2602 context->oid_redirect_not_in_use.insert(context->redirect_objs[oid]);
2603 context->oid_redirect_in_use.erase(context->redirect_objs[oid]);
2604 context->update_object_redirect_target(oid, string());
2605 }
2606 context->kick();
2607 }
2608
2609 string getType() override
2610 {
2611 return "UnsetRedirectOp";
2612 }
2613 };
2614
2615 class TierPromoteOp : public TestOp {
2616 public:
2617 librados::AioCompletion *completion;
2618 librados::ObjectWriteOperation op;
2619 string oid;
2620 std::shared_ptr<int> in_use;
2621
2622 TierPromoteOp(int n,
2623 RadosTestContext *context,
2624 const string &oid,
2625 TestOpStat *stat)
2626 : TestOp(n, context, stat),
2627 completion(NULL),
2628 oid(oid)
2629 {}
2630
2631 void _begin() override
2632 {
2633 context->state_lock.lock();
2634
2635 context->oid_in_use.insert(oid);
2636 context->oid_not_in_use.erase(oid);
2637
2638 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2639 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2640 new TestOp::CallbackInfo(0));
2641 completion = context->rados.aio_create_completion((void *) cb_arg,
2642 &write_callback);
2643 context->state_lock.unlock();
2644
2645 op.tier_promote();
2646 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2647 &op);
2648 ceph_assert(!r);
2649 }
2650
2651 void _finish(CallbackInfo *info) override
2652 {
2653 std::lock_guard l{context->state_lock};
2654 ceph_assert(!done);
2655 ceph_assert(completion->is_complete());
2656
2657 ObjectDesc oid_value;
2658 context->find_object(oid, &oid_value);
2659 int r = completion->get_return_value();
2660 cout << num << ": got " << cpp_strerror(r) << std::endl;
2661 if (r == 0) {
2662 // sucess
2663 } else {
2664 ceph_abort_msg("shouldn't happen");
2665 }
2666 context->update_object_version(oid, completion->get_version64());
2667 context->find_object(oid, &oid_value);
2668 context->oid_in_use.erase(oid);
2669 context->oid_not_in_use.insert(oid);
2670 context->kick();
2671 done = true;
2672 }
2673
2674 bool finished() override
2675 {
2676 return done;
2677 }
2678
2679 string getType() override
2680 {
2681 return "TierPromoteOp";
2682 }
2683 };
2684
2685 class TierFlushOp : public TestOp {
2686 public:
2687 librados::AioCompletion *completion;
2688 librados::ObjectReadOperation op;
2689 string oid;
2690 std::shared_ptr<int> in_use;
2691
2692 TierFlushOp(int n,
2693 RadosTestContext *context,
2694 const string &oid,
2695 TestOpStat *stat)
2696 : TestOp(n, context, stat),
2697 completion(NULL),
2698 oid(oid)
2699 {}
2700
2701 void _begin() override
2702 {
2703 context->state_lock.lock();
2704
2705 context->oid_in_use.insert(oid);
2706 context->oid_not_in_use.erase(oid);
2707
2708 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2709 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2710 new TestOp::CallbackInfo(0));
2711 completion = context->rados.aio_create_completion((void *) cb_arg,
2712 &write_callback);
2713 context->state_lock.unlock();
2714
2715 op.tier_flush();
2716 unsigned flags = librados::OPERATION_IGNORE_CACHE;
2717 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2718 &op, flags, NULL);
2719 ceph_assert(!r);
2720 }
2721
2722 void _finish(CallbackInfo *info) override
2723 {
2724 context->state_lock.lock();
2725 ceph_assert(!done);
2726 ceph_assert(completion->is_complete());
2727
2728 int r = completion->get_return_value();
2729 cout << num << ": got " << cpp_strerror(r) << std::endl;
2730 if (r == 0) {
2731 // sucess
2732 } else {
2733 ceph_abort_msg("shouldn't happen");
2734 }
2735 context->update_object_version(oid, completion->get_version64());
2736 context->oid_in_use.erase(oid);
2737 context->oid_not_in_use.insert(oid);
2738 context->kick();
2739 done = true;
2740 context->state_lock.unlock();
2741 }
2742
2743 bool finished() override
2744 {
2745 return done;
2746 }
2747
2748 string getType() override
2749 {
2750 return "TierFlushOp";
2751 }
2752 };
2753
2754 class HitSetListOp : public TestOp {
2755 librados::AioCompletion *comp1, *comp2;
2756 uint32_t hash;
2757 std::list< std::pair<time_t, time_t> > ls;
2758 bufferlist bl;
2759
2760 public:
2761 HitSetListOp(int n,
2762 RadosTestContext *context,
2763 uint32_t hash,
2764 TestOpStat *stat = 0)
2765 : TestOp(n, context, stat),
2766 comp1(NULL), comp2(NULL),
2767 hash(hash)
2768 {}
2769
2770 void _begin() override
2771 {
2772 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2773 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2774 new TestOp::CallbackInfo(0));
2775 comp1 = context->rados.aio_create_completion((void*) cb_arg,
2776 &write_callback);
2777 int r = context->io_ctx.hit_set_list(hash, comp1, &ls);
2778 ceph_assert(r == 0);
2779 }
2780
2781 void _finish(CallbackInfo *info) override {
2782 std::lock_guard l{context->state_lock};
2783 if (!comp2) {
2784 if (ls.empty()) {
2785 cerr << num << ": no hitsets" << std::endl;
2786 done = true;
2787 } else {
2788 cerr << num << ": hitsets are " << ls << std::endl;
2789 int r = rand() % ls.size();
2790 std::list<pair<time_t,time_t> >::iterator p = ls.begin();
2791 while (r--)
2792 ++p;
2793 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2794 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2795 new TestOp::CallbackInfo(0));
2796 comp2 = context->rados.aio_create_completion((void*) cb_arg, &write_callback);
2797 r = context->io_ctx.hit_set_get(hash, comp2, p->second, &bl);
2798 ceph_assert(r == 0);
2799 }
2800 } else {
2801 int r = comp2->get_return_value();
2802 if (r == 0) {
2803 HitSet hitset;
2804 auto p = bl.cbegin();
2805 decode(hitset, p);
2806 cout << num << ": got hitset of type " << hitset.get_type_name()
2807 << " size " << bl.length()
2808 << std::endl;
2809 } else {
2810 // FIXME: we could verify that we did in fact race with a trim...
2811 ceph_assert(r == -ENOENT);
2812 }
2813 done = true;
2814 }
2815
2816 context->kick();
2817 }
2818
2819 bool finished() override {
2820 return done;
2821 }
2822
2823 string getType() override {
2824 return "HitSetListOp";
2825 }
2826 };
2827
2828 class UndirtyOp : public TestOp {
2829 public:
2830 librados::AioCompletion *completion;
2831 librados::ObjectWriteOperation op;
2832 string oid;
2833
2834 UndirtyOp(int n,
2835 RadosTestContext *context,
2836 const string &oid,
2837 TestOpStat *stat = 0)
2838 : TestOp(n, context, stat),
2839 completion(NULL),
2840 oid(oid)
2841 {}
2842
2843 void _begin() override
2844 {
2845 context->state_lock.lock();
2846 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2847 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2848 new TestOp::CallbackInfo(0));
2849 completion = context->rados.aio_create_completion((void *) cb_arg,
2850 &write_callback);
2851
2852 context->oid_in_use.insert(oid);
2853 context->oid_not_in_use.erase(oid);
2854 context->update_object_undirty(oid);
2855 context->state_lock.unlock();
2856
2857 op.undirty();
2858 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2859 &op, 0);
2860 ceph_assert(!r);
2861 }
2862
2863 void _finish(CallbackInfo *info) override
2864 {
2865 std::lock_guard state_locker{context->state_lock};
2866 ceph_assert(!done);
2867 ceph_assert(completion->is_complete());
2868 context->oid_in_use.erase(oid);
2869 context->oid_not_in_use.insert(oid);
2870 context->update_object_version(oid, completion->get_version64());
2871 context->kick();
2872 done = true;
2873 }
2874
2875 bool finished() override
2876 {
2877 return done;
2878 }
2879
2880 string getType() override
2881 {
2882 return "UndirtyOp";
2883 }
2884 };
2885
2886 class IsDirtyOp : public TestOp {
2887 public:
2888 librados::AioCompletion *completion;
2889 librados::ObjectReadOperation op;
2890 string oid;
2891 bool dirty;
2892 ObjectDesc old_value;
2893 int snap = 0;
2894 std::shared_ptr<int> in_use;
2895
2896 IsDirtyOp(int n,
2897 RadosTestContext *context,
2898 const string &oid,
2899 TestOpStat *stat = 0)
2900 : TestOp(n, context, stat),
2901 completion(NULL),
2902 oid(oid),
2903 dirty(false)
2904 {}
2905
2906 void _begin() override
2907 {
2908 context->state_lock.lock();
2909
2910 if (!(rand() % 4) && !context->snaps.empty()) {
2911 snap = rand_choose(context->snaps)->first;
2912 in_use = context->snaps_in_use.lookup_or_create(snap, snap);
2913 } else {
2914 snap = -1;
2915 }
2916 std::cout << num << ": is_dirty oid " << oid << " snap " << snap
2917 << std::endl;
2918
2919 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
2920 new pair<TestOp*, TestOp::CallbackInfo*>(this,
2921 new TestOp::CallbackInfo(0));
2922 completion = context->rados.aio_create_completion((void *) cb_arg,
2923 &write_callback);
2924
2925 context->oid_in_use.insert(oid);
2926 context->oid_not_in_use.erase(oid);
2927 context->state_lock.unlock();
2928
2929 if (snap >= 0) {
2930 context->io_ctx.snap_set_read(context->snaps[snap]);
2931 }
2932
2933 op.is_dirty(&dirty, NULL);
2934 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
2935 &op, 0);
2936 ceph_assert(!r);
2937
2938 if (snap >= 0) {
2939 context->io_ctx.snap_set_read(0);
2940 }
2941 }
2942
2943 void _finish(CallbackInfo *info) override
2944 {
2945 std::lock_guard state_locker{context->state_lock};
2946 ceph_assert(!done);
2947 ceph_assert(completion->is_complete());
2948 context->oid_in_use.erase(oid);
2949 context->oid_not_in_use.insert(oid);
2950
2951 ceph_assert(context->find_object(oid, &old_value, snap));
2952
2953 int r = completion->get_return_value();
2954 if (r == 0) {
2955 cout << num << ": " << (dirty ? "dirty" : "clean") << std::endl;
2956 ceph_assert(!old_value.deleted());
2957 ceph_assert(dirty == old_value.dirty);
2958 } else {
2959 cout << num << ": got " << r << std::endl;
2960 ceph_assert(r == -ENOENT);
2961 ceph_assert(old_value.deleted());
2962 }
2963 context->kick();
2964 done = true;
2965 }
2966
2967 bool finished() override
2968 {
2969 return done;
2970 }
2971
2972 string getType() override
2973 {
2974 return "IsDirtyOp";
2975 }
2976 };
2977
2978
2979
2980 class CacheFlushOp : public TestOp {
2981 public:
2982 librados::AioCompletion *completion;
2983 librados::ObjectReadOperation op;
2984 string oid;
2985 bool blocking;
2986 int snap;
2987 bool can_fail;
2988 std::shared_ptr<int> in_use;
2989
2990 CacheFlushOp(int n,
2991 RadosTestContext *context,
2992 const string &oid,
2993 TestOpStat *stat,
2994 bool b)
2995 : TestOp(n, context, stat),
2996 completion(NULL),
2997 oid(oid),
2998 blocking(b),
2999 snap(0),
3000 can_fail(false)
3001 {}
3002
3003 void _begin() override
3004 {
3005 context->state_lock.lock();
3006
3007 if (!(rand() % 4) && !context->snaps.empty()) {
3008 snap = rand_choose(context->snaps)->first;
3009 in_use = context->snaps_in_use.lookup_or_create(snap, snap);
3010 } else {
3011 snap = -1;
3012 }
3013 // not being particularly specific here about knowing which
3014 // flushes are on the oldest clean snap and which ones are not.
3015 can_fail = !blocking || !context->snaps.empty();
3016 // FIXME: we could fail if we've ever removed a snap due to
3017 // the async snap trimming.
3018 can_fail = true;
3019 cout << num << ": " << (blocking ? "cache_flush" : "cache_try_flush")
3020 << " oid " << oid << " snap " << snap << std::endl;
3021
3022 if (snap >= 0) {
3023 context->io_ctx.snap_set_read(context->snaps[snap]);
3024 }
3025
3026 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
3027 new pair<TestOp*, TestOp::CallbackInfo*>(this,
3028 new TestOp::CallbackInfo(0));
3029 completion = context->rados.aio_create_completion((void *) cb_arg,
3030 &write_callback);
3031 context->oid_flushing.insert(oid);
3032 context->oid_not_flushing.erase(oid);
3033 context->state_lock.unlock();
3034
3035 unsigned flags = librados::OPERATION_IGNORE_CACHE;
3036 if (blocking) {
3037 op.cache_flush();
3038 } else {
3039 op.cache_try_flush();
3040 flags = librados::OPERATION_SKIPRWLOCKS;
3041 }
3042 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
3043 &op, flags, NULL);
3044 ceph_assert(!r);
3045
3046 if (snap >= 0) {
3047 context->io_ctx.snap_set_read(0);
3048 }
3049 }
3050
3051 void _finish(CallbackInfo *info) override
3052 {
3053 std::lock_guard state_locker{context->state_lock};
3054 ceph_assert(!done);
3055 ceph_assert(completion->is_complete());
3056 context->oid_flushing.erase(oid);
3057 context->oid_not_flushing.insert(oid);
3058 int r = completion->get_return_value();
3059 cout << num << ": got " << cpp_strerror(r) << std::endl;
3060 if (r == 0) {
3061 context->update_object_version(oid, 0, snap);
3062 } else if (r == -EBUSY) {
3063 ceph_assert(can_fail);
3064 } else if (r == -EINVAL) {
3065 // caching not enabled?
3066 } else if (r == -ENOENT) {
3067 // may have raced with a remove?
3068 } else {
3069 ceph_abort_msg("shouldn't happen");
3070 }
3071 context->kick();
3072 done = true;
3073 }
3074
3075 bool finished() override
3076 {
3077 return done;
3078 }
3079
3080 string getType() override
3081 {
3082 return "CacheFlushOp";
3083 }
3084 };
3085
3086 class CacheEvictOp : public TestOp {
3087 public:
3088 librados::AioCompletion *completion;
3089 librados::ObjectReadOperation op;
3090 string oid;
3091 std::shared_ptr<int> in_use;
3092
3093 CacheEvictOp(int n,
3094 RadosTestContext *context,
3095 const string &oid,
3096 TestOpStat *stat)
3097 : TestOp(n, context, stat),
3098 completion(NULL),
3099 oid(oid)
3100 {}
3101
3102 void _begin() override
3103 {
3104 context->state_lock.lock();
3105
3106 int snap;
3107 if (!(rand() % 4) && !context->snaps.empty()) {
3108 snap = rand_choose(context->snaps)->first;
3109 in_use = context->snaps_in_use.lookup_or_create(snap, snap);
3110 } else {
3111 snap = -1;
3112 }
3113 cout << num << ": cache_evict oid " << oid << " snap " << snap << std::endl;
3114
3115 if (snap >= 0) {
3116 context->io_ctx.snap_set_read(context->snaps[snap]);
3117 }
3118
3119 pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
3120 new pair<TestOp*, TestOp::CallbackInfo*>(this,
3121 new TestOp::CallbackInfo(0));
3122 completion = context->rados.aio_create_completion((void *) cb_arg,
3123 &write_callback);
3124 context->state_lock.unlock();
3125
3126 op.cache_evict();
3127 int r = context->io_ctx.aio_operate(context->prefix+oid, completion,
3128 &op, librados::OPERATION_IGNORE_CACHE,
3129 NULL);
3130 ceph_assert(!r);
3131
3132 if (snap >= 0) {
3133 context->io_ctx.snap_set_read(0);
3134 }
3135 }
3136
3137 void _finish(CallbackInfo *info) override
3138 {
3139 std::lock_guard state_locker{context->state_lock};
3140 ceph_assert(!done);
3141 ceph_assert(completion->is_complete());
3142
3143 int r = completion->get_return_value();
3144 cout << num << ": got " << cpp_strerror(r) << std::endl;
3145 if (r == 0) {
3146 // yay!
3147 } else if (r == -EBUSY) {
3148 // raced with something that dirtied the object
3149 } else if (r == -EINVAL) {
3150 // caching not enabled?
3151 } else if (r == -ENOENT) {
3152 // may have raced with a remove?
3153 } else {
3154 ceph_abort_msg("shouldn't happen");
3155 }
3156 context->kick();
3157 done = true;
3158 }
3159
3160 bool finished() override
3161 {
3162 return done;
3163 }
3164
3165 string getType() override
3166 {
3167 return "CacheEvictOp";
3168 }
3169 };
3170
3171
3172 #endif