]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/memstore/MemStore.cc
update sources to v12.1.1
[ceph.git] / ceph / src / os / memstore / MemStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "acconfig.h"
15
16#ifdef HAVE_SYS_MOUNT_H
17#include <sys/mount.h>
18#endif
19
20#ifdef HAVE_SYS_PARAM_H
21#include <sys/param.h>
22#endif
23
24#include "include/types.h"
25#include "include/stringify.h"
26#include "include/unordered_map.h"
27#include "include/memory.h"
28#include "common/errno.h"
29#include "MemStore.h"
30#include "include/compat.h"
31
32#define dout_context cct
33#define dout_subsys ceph_subsys_filestore
34#undef dout_prefix
35#define dout_prefix *_dout << "memstore(" << path << ") "
36
37// for comparing collections for lock ordering
38bool operator>(const MemStore::CollectionRef& l,
39 const MemStore::CollectionRef& r)
40{
41 return (unsigned long)l.get() > (unsigned long)r.get();
42}
43
44
45int MemStore::mount()
46{
47 int r = _load();
48 if (r < 0)
49 return r;
50 finisher.start();
51 return 0;
52}
53
54int MemStore::umount()
55{
56 finisher.wait_for_empty();
57 finisher.stop();
58 return _save();
59}
60
61int MemStore::_save()
62{
63 dout(10) << __func__ << dendl;
64 dump_all();
65 set<coll_t> collections;
66 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
67 p != coll_map.end();
68 ++p) {
69 dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
70 collections.insert(p->first);
71 bufferlist bl;
72 assert(p->second);
73 p->second->encode(bl);
74 string fn = path + "/" + stringify(p->first);
75 int r = bl.write_file(fn.c_str());
76 if (r < 0)
77 return r;
78 }
79
80 string fn = path + "/collections";
81 bufferlist bl;
82 ::encode(collections, bl);
83 int r = bl.write_file(fn.c_str());
84 if (r < 0)
85 return r;
86
87 return 0;
88}
89
90void MemStore::dump_all()
91{
92 Formatter *f = Formatter::create("json-pretty");
93 f->open_object_section("store");
94 dump(f);
95 f->close_section();
96 dout(0) << "dump:";
97 f->flush(*_dout);
98 *_dout << dendl;
99 delete f;
100}
101
102void MemStore::dump(Formatter *f)
103{
104 f->open_array_section("collections");
105 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
106 p != coll_map.end();
107 ++p) {
108 f->open_object_section("collection");
109 f->dump_string("name", stringify(p->first));
110
111 f->open_array_section("xattrs");
112 for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
113 q != p->second->xattr.end();
114 ++q) {
115 f->open_object_section("xattr");
116 f->dump_string("name", q->first);
117 f->dump_int("length", q->second.length());
118 f->close_section();
119 }
120 f->close_section();
121
122 f->open_array_section("objects");
123 for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
124 q != p->second->object_map.end();
125 ++q) {
126 f->open_object_section("object");
127 f->dump_string("name", stringify(q->first));
128 if (q->second)
129 q->second->dump(f);
130 f->close_section();
131 }
132 f->close_section();
133
134 f->close_section();
135 }
136 f->close_section();
137}
138
139int MemStore::_load()
140{
141 dout(10) << __func__ << dendl;
142 bufferlist bl;
143 string fn = path + "/collections";
144 string err;
145 int r = bl.read_file(fn.c_str(), &err);
146 if (r < 0)
147 return r;
148
149 set<coll_t> collections;
150 bufferlist::iterator p = bl.begin();
151 ::decode(collections, p);
152
153 for (set<coll_t>::iterator q = collections.begin();
154 q != collections.end();
155 ++q) {
156 string fn = path + "/" + stringify(*q);
157 bufferlist cbl;
158 int r = cbl.read_file(fn.c_str(), &err);
159 if (r < 0)
160 return r;
161 CollectionRef c(new Collection(cct, *q));
162 bufferlist::iterator p = cbl.begin();
163 c->decode(p);
164 coll_map[*q] = c;
165 used_bytes += c->used_bytes();
166 }
167
168 dump_all();
169
170 return 0;
171}
172
173void MemStore::set_fsid(uuid_d u)
174{
175 int r = write_meta("fs_fsid", stringify(u));
176 assert(r >= 0);
177}
178
179uuid_d MemStore::get_fsid()
180{
181 string fsid_str;
182 int r = read_meta("fs_fsid", &fsid_str);
183 assert(r >= 0);
184 uuid_d uuid;
185 bool b = uuid.parse(fsid_str.c_str());
186 assert(b);
187 return uuid;
188}
189
190int MemStore::mkfs()
191{
192 string fsid_str;
193 int r = read_meta("fs_fsid", &fsid_str);
194 if (r == -ENOENT) {
195 uuid_d fsid;
196 fsid.generate_random();
197 fsid_str = stringify(fsid);
198 r = write_meta("fs_fsid", fsid_str);
199 if (r < 0)
200 return r;
201 dout(1) << __func__ << " new fsid " << fsid_str << dendl;
202 } else if (r < 0) {
203 return r;
204 } else {
205 dout(1) << __func__ << " had fsid " << fsid_str << dendl;
206 }
207
208 string fn = path + "/collections";
209 derr << path << dendl;
210 bufferlist bl;
211 set<coll_t> collections;
212 ::encode(collections, bl);
213 r = bl.write_file(fn.c_str());
214 if (r < 0)
215 return r;
216
217 r = write_meta("type", "memstore");
218 if (r < 0)
219 return r;
220
221 return 0;
222}
223
224int MemStore::statfs(struct store_statfs_t *st)
225{
226 dout(10) << __func__ << dendl;
227 st->reset();
228 st->total = cct->_conf->memstore_device_bytes;
229 st->available = MAX(int64_t(st->total) - int64_t(used_bytes), 0ll);
230 dout(10) << __func__ << ": used_bytes: " << used_bytes
231 << "/" << cct->_conf->memstore_device_bytes << dendl;
232 return 0;
233}
234
235objectstore_perf_stat_t MemStore::get_cur_stats()
236{
237 // fixme
238 return objectstore_perf_stat_t();
239}
240
241MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
242{
243 RWLock::RLocker l(coll_lock);
244 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
245 if (cp == coll_map.end())
246 return CollectionRef();
247 return cp->second;
248}
249
250
251// ---------------
252// read operations
253
254bool MemStore::exists(const coll_t& cid, const ghobject_t& oid)
255{
256 CollectionHandle c = get_collection(cid);
257 if (!c)
258 return false;
259 return exists(c, oid);
260}
261
262bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
263{
264 Collection *c = static_cast<Collection*>(c_.get());
265 dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
266 if (!c->exists)
267 return false;
268
269 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
270 // shared_ptr needs to be compared to nullptr.
271 return (bool)c->get_object(oid);
272}
273
274int MemStore::stat(
275 const coll_t& cid,
276 const ghobject_t& oid,
277 struct stat *st,
278 bool allow_eio)
279{
280 CollectionHandle c = get_collection(cid);
281 if (!c)
282 return -ENOENT;
283 return stat(c, oid, st, allow_eio);
284}
285
286int MemStore::stat(
287 CollectionHandle &c_,
288 const ghobject_t& oid,
289 struct stat *st,
290 bool allow_eio)
291{
292 Collection *c = static_cast<Collection*>(c_.get());
293 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
294 if (!c->exists)
295 return -ENOENT;
296 ObjectRef o = c->get_object(oid);
297 if (!o)
298 return -ENOENT;
299 st->st_size = o->get_size();
300 st->st_blksize = 4096;
301 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
302 st->st_nlink = 1;
303 return 0;
304}
305
306int MemStore::set_collection_opts(
307 const coll_t& cid,
308 const pool_opts_t& opts)
309{
310 return -EOPNOTSUPP;
311}
312
313int MemStore::read(
314 const coll_t& cid,
315 const ghobject_t& oid,
316 uint64_t offset,
317 size_t len,
318 bufferlist& bl,
224ce89b 319 uint32_t op_flags)
7c673cae
FG
320{
321 CollectionHandle c = get_collection(cid);
322 if (!c)
323 return -ENOENT;
224ce89b 324 return read(c, oid, offset, len, bl, op_flags);
7c673cae
FG
325}
326
327int MemStore::read(
328 CollectionHandle &c_,
329 const ghobject_t& oid,
330 uint64_t offset,
331 size_t len,
332 bufferlist& bl,
224ce89b 333 uint32_t op_flags)
7c673cae
FG
334{
335 Collection *c = static_cast<Collection*>(c_.get());
336 dout(10) << __func__ << " " << c->cid << " " << oid << " "
337 << offset << "~" << len << dendl;
338 if (!c->exists)
339 return -ENOENT;
340 ObjectRef o = c->get_object(oid);
341 if (!o)
342 return -ENOENT;
343 if (offset >= o->get_size())
344 return 0;
345 size_t l = len;
346 if (l == 0 && offset == 0) // note: len == 0 means read the entire object
347 l = o->get_size();
348 else if (offset + l > o->get_size())
349 l = o->get_size() - offset;
350 bl.clear();
351 return o->read(offset, l, bl);
352}
353
354int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
355 uint64_t offset, size_t len, bufferlist& bl)
356{
357 map<uint64_t, uint64_t> destmap;
358 int r = fiemap(cid, oid, offset, len, destmap);
359 if (r >= 0)
360 ::encode(destmap, bl);
361 return r;
362}
363
364int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
365 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap)
366{
367 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
368 << len << dendl;
369 CollectionRef c = get_collection(cid);
370 if (!c)
371 return -ENOENT;
372
373 ObjectRef o = c->get_object(oid);
374 if (!o)
375 return -ENOENT;
376 size_t l = len;
377 if (offset + l > o->get_size())
378 l = o->get_size() - offset;
379 if (offset >= o->get_size())
380 goto out;
381 destmap[offset] = l;
382 out:
383 return 0;
384}
385
386int MemStore::getattr(const coll_t& cid, const ghobject_t& oid,
387 const char *name, bufferptr& value)
388{
389 CollectionHandle c = get_collection(cid);
390 if (!c)
391 return -ENOENT;
392 return getattr(c, oid, name, value);
393}
394
395int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
396 const char *name, bufferptr& value)
397{
398 Collection *c = static_cast<Collection*>(c_.get());
399 dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
400 if (!c->exists)
401 return -ENOENT;
402 ObjectRef o = c->get_object(oid);
403 if (!o)
404 return -ENOENT;
405 string k(name);
406 std::lock_guard<std::mutex> lock(o->xattr_mutex);
407 if (!o->xattr.count(k)) {
408 return -ENODATA;
409 }
410 value = o->xattr[k];
411 return 0;
412}
413
414int MemStore::getattrs(const coll_t& cid, const ghobject_t& oid,
415 map<string,bufferptr>& aset)
416{
417 CollectionHandle c = get_collection(cid);
418 if (!c)
419 return -ENOENT;
420 return getattrs(c, oid, aset);
421}
422
423int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
424 map<string,bufferptr>& aset)
425{
426 Collection *c = static_cast<Collection*>(c_.get());
427 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
428 if (!c->exists)
429 return -ENOENT;
430
431 ObjectRef o = c->get_object(oid);
432 if (!o)
433 return -ENOENT;
434 std::lock_guard<std::mutex> lock(o->xattr_mutex);
435 aset = o->xattr;
436 return 0;
437}
438
439int MemStore::list_collections(vector<coll_t>& ls)
440{
441 dout(10) << __func__ << dendl;
442 RWLock::RLocker l(coll_lock);
443 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
444 p != coll_map.end();
445 ++p) {
446 ls.push_back(p->first);
447 }
448 return 0;
449}
450
451bool MemStore::collection_exists(const coll_t& cid)
452{
453 dout(10) << __func__ << " " << cid << dendl;
454 RWLock::RLocker l(coll_lock);
455 return coll_map.count(cid);
456}
457
458int MemStore::collection_empty(const coll_t& cid, bool *empty)
459{
460 dout(10) << __func__ << " " << cid << dendl;
461 CollectionRef c = get_collection(cid);
462 if (!c)
463 return -ENOENT;
464 RWLock::RLocker l(c->lock);
465 *empty = c->object_map.empty();
466 return 0;
467}
468
469int MemStore::collection_bits(const coll_t& cid)
470{
471 dout(10) << __func__ << " " << cid << dendl;
472 CollectionRef c = get_collection(cid);
473 if (!c)
474 return -ENOENT;
475 RWLock::RLocker l(c->lock);
476 return c->bits;
477}
478
479int MemStore::collection_list(const coll_t& cid,
480 const ghobject_t& start,
481 const ghobject_t& end,
482 int max,
483 vector<ghobject_t> *ls, ghobject_t *next)
484{
485 CollectionRef c = get_collection(cid);
486 if (!c)
487 return -ENOENT;
488 RWLock::RLocker l(c->lock);
489
490 dout(10) << __func__ << " cid " << cid << " start " << start
491 << " end " << end << dendl;
492 map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
493 while (p != c->object_map.end() &&
494 ls->size() < (unsigned)max &&
495 p->first < end) {
496 ls->push_back(p->first);
497 ++p;
498 }
499 if (next != NULL) {
500 if (p == c->object_map.end())
501 *next = ghobject_t::get_max();
502 else
503 *next = p->first;
504 }
505 dout(10) << __func__ << " cid " << cid << " got " << ls->size() << dendl;
506 return 0;
507}
508
509int MemStore::omap_get(
510 const coll_t& cid, ///< [in] Collection containing oid
511 const ghobject_t &oid, ///< [in] Object containing omap
512 bufferlist *header, ///< [out] omap header
513 map<string, bufferlist> *out /// < [out] Key to value map
514 )
515{
516 dout(10) << __func__ << " " << cid << " " << oid << dendl;
517 CollectionRef c = get_collection(cid);
518 if (!c)
519 return -ENOENT;
520
521 ObjectRef o = c->get_object(oid);
522 if (!o)
523 return -ENOENT;
524 std::lock_guard<std::mutex> lock(o->omap_mutex);
525 *header = o->omap_header;
526 *out = o->omap;
527 return 0;
528}
529
530int MemStore::omap_get_header(
531 const coll_t& cid, ///< [in] Collection containing oid
532 const ghobject_t &oid, ///< [in] Object containing omap
533 bufferlist *header, ///< [out] omap header
534 bool allow_eio ///< [in] don't assert on eio
535 )
536{
537 dout(10) << __func__ << " " << cid << " " << oid << dendl;
538 CollectionRef c = get_collection(cid);
539 if (!c)
540 return -ENOENT;
541
542 ObjectRef o = c->get_object(oid);
543 if (!o)
544 return -ENOENT;
545 std::lock_guard<std::mutex> lock(o->omap_mutex);
546 *header = o->omap_header;
547 return 0;
548}
549
550int MemStore::omap_get_keys(
551 const coll_t& cid, ///< [in] Collection containing oid
552 const ghobject_t &oid, ///< [in] Object containing omap
553 set<string> *keys ///< [out] Keys defined on oid
554 )
555{
556 dout(10) << __func__ << " " << cid << " " << oid << dendl;
557 CollectionRef c = get_collection(cid);
558 if (!c)
559 return -ENOENT;
560
561 ObjectRef o = c->get_object(oid);
562 if (!o)
563 return -ENOENT;
564 std::lock_guard<std::mutex> lock(o->omap_mutex);
565 for (map<string,bufferlist>::iterator p = o->omap.begin();
566 p != o->omap.end();
567 ++p)
568 keys->insert(p->first);
569 return 0;
570}
571
572int MemStore::omap_get_values(
573 const coll_t& cid, ///< [in] Collection containing oid
574 const ghobject_t &oid, ///< [in] Object containing omap
575 const set<string> &keys, ///< [in] Keys to get
576 map<string, bufferlist> *out ///< [out] Returned keys and values
577 )
578{
579 dout(10) << __func__ << " " << cid << " " << oid << dendl;
580 CollectionRef c = get_collection(cid);
581 if (!c)
582 return -ENOENT;
583
584 ObjectRef o = c->get_object(oid);
585 if (!o)
586 return -ENOENT;
587 std::lock_guard<std::mutex> lock(o->omap_mutex);
588 for (set<string>::const_iterator p = keys.begin();
589 p != keys.end();
590 ++p) {
591 map<string,bufferlist>::iterator q = o->omap.find(*p);
592 if (q != o->omap.end())
593 out->insert(*q);
594 }
595 return 0;
596}
597
598int MemStore::omap_check_keys(
599 const coll_t& cid, ///< [in] Collection containing oid
600 const ghobject_t &oid, ///< [in] Object containing omap
601 const set<string> &keys, ///< [in] Keys to check
602 set<string> *out ///< [out] Subset of keys defined on oid
603 )
604{
605 dout(10) << __func__ << " " << cid << " " << oid << dendl;
606 CollectionRef c = get_collection(cid);
607 if (!c)
608 return -ENOENT;
609
610 ObjectRef o = c->get_object(oid);
611 if (!o)
612 return -ENOENT;
613 std::lock_guard<std::mutex> lock(o->omap_mutex);
614 for (set<string>::const_iterator p = keys.begin();
615 p != keys.end();
616 ++p) {
617 map<string,bufferlist>::iterator q = o->omap.find(*p);
618 if (q != o->omap.end())
619 out->insert(*p);
620 }
621 return 0;
622}
623
624class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
625 CollectionRef c;
626 ObjectRef o;
627 map<string,bufferlist>::iterator it;
628public:
629 OmapIteratorImpl(CollectionRef c, ObjectRef o)
630 : c(c), o(o), it(o->omap.begin()) {}
631
632 int seek_to_first() override {
633 std::lock_guard<std::mutex>(o->omap_mutex);
634 it = o->omap.begin();
635 return 0;
636 }
637 int upper_bound(const string &after) override {
638 std::lock_guard<std::mutex>(o->omap_mutex);
639 it = o->omap.upper_bound(after);
640 return 0;
641 }
642 int lower_bound(const string &to) override {
643 std::lock_guard<std::mutex>(o->omap_mutex);
644 it = o->omap.lower_bound(to);
645 return 0;
646 }
647 bool valid() override {
648 std::lock_guard<std::mutex>(o->omap_mutex);
649 return it != o->omap.end();
650 }
651 int next(bool validate=true) override {
652 std::lock_guard<std::mutex>(o->omap_mutex);
653 ++it;
654 return 0;
655 }
656 string key() override {
657 std::lock_guard<std::mutex>(o->omap_mutex);
658 return it->first;
659 }
660 bufferlist value() override {
661 std::lock_guard<std::mutex>(o->omap_mutex);
662 return it->second;
663 }
664 int status() override {
665 return 0;
666 }
667};
668
669ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(const coll_t& cid,
670 const ghobject_t& oid)
671{
672 dout(10) << __func__ << " " << cid << " " << oid << dendl;
673 CollectionRef c = get_collection(cid);
674 if (!c)
675 return ObjectMap::ObjectMapIterator();
676
677 ObjectRef o = c->get_object(oid);
678 if (!o)
679 return ObjectMap::ObjectMapIterator();
680 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
681}
682
683
684// ---------------
685// write operations
686
687int MemStore::queue_transactions(Sequencer *osr,
688 vector<Transaction>& tls,
689 TrackedOpRef op,
690 ThreadPool::TPHandle *handle)
691{
692 // because memstore operations are synchronous, we can implement the
693 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
694 // while allowing operations on different sequencers to happen in parallel
695 struct OpSequencer : public Sequencer_impl {
696 OpSequencer(CephContext* cct) :
697 Sequencer_impl(cct) {}
698 std::mutex mutex;
699 void flush() override {}
700 bool flush_commit(Context*) override { return true; }
701 };
702
703 std::unique_lock<std::mutex> lock;
704 if (osr) {
705 if (!osr->p) {
706 osr->p = new OpSequencer(cct);
707 }
708 auto seq = static_cast<OpSequencer*>(osr->p.get());
709 lock = std::unique_lock<std::mutex>(seq->mutex);
710 }
711
712 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
713 // poke the TPHandle heartbeat just to exercise that code path
714 if (handle)
715 handle->reset_tp_timeout();
716
717 _do_transaction(*p);
718 }
719
720 Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
721 ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
722 &on_apply_sync);
723 if (on_apply_sync)
724 on_apply_sync->complete(0);
725 if (on_apply)
726 finisher.queue(on_apply);
727 if (on_commit)
728 finisher.queue(on_commit);
729 return 0;
730}
731
732void MemStore::_do_transaction(Transaction& t)
733{
734 Transaction::iterator i = t.begin();
735 int pos = 0;
736
737 while (i.have_op()) {
738 Transaction::Op *op = i.decode_op();
739 int r = 0;
740
741 switch (op->op) {
742 case Transaction::OP_NOP:
743 break;
744 case Transaction::OP_TOUCH:
745 {
746 coll_t cid = i.get_cid(op->cid);
747 ghobject_t oid = i.get_oid(op->oid);
748 r = _touch(cid, oid);
749 }
750 break;
751
752 case Transaction::OP_WRITE:
753 {
754 coll_t cid = i.get_cid(op->cid);
755 ghobject_t oid = i.get_oid(op->oid);
756 uint64_t off = op->off;
757 uint64_t len = op->len;
758 uint32_t fadvise_flags = i.get_fadvise_flags();
759 bufferlist bl;
760 i.decode_bl(bl);
761 r = _write(cid, oid, off, len, bl, fadvise_flags);
762 }
763 break;
764
765 case Transaction::OP_ZERO:
766 {
767 coll_t cid = i.get_cid(op->cid);
768 ghobject_t oid = i.get_oid(op->oid);
769 uint64_t off = op->off;
770 uint64_t len = op->len;
771 r = _zero(cid, oid, off, len);
772 }
773 break;
774
775 case Transaction::OP_TRIMCACHE:
776 {
777 // deprecated, no-op
778 }
779 break;
780
781 case Transaction::OP_TRUNCATE:
782 {
783 coll_t cid = i.get_cid(op->cid);
784 ghobject_t oid = i.get_oid(op->oid);
785 uint64_t off = op->off;
786 r = _truncate(cid, oid, off);
787 }
788 break;
789
790 case Transaction::OP_REMOVE:
791 {
792 coll_t cid = i.get_cid(op->cid);
793 ghobject_t oid = i.get_oid(op->oid);
794 r = _remove(cid, oid);
795 }
796 break;
797
798 case Transaction::OP_SETATTR:
799 {
800 coll_t cid = i.get_cid(op->cid);
801 ghobject_t oid = i.get_oid(op->oid);
802 string name = i.decode_string();
803 bufferlist bl;
804 i.decode_bl(bl);
805 map<string, bufferptr> to_set;
806 to_set[name] = bufferptr(bl.c_str(), bl.length());
807 r = _setattrs(cid, oid, to_set);
808 }
809 break;
810
811 case Transaction::OP_SETATTRS:
812 {
813 coll_t cid = i.get_cid(op->cid);
814 ghobject_t oid = i.get_oid(op->oid);
815 map<string, bufferptr> aset;
816 i.decode_attrset(aset);
817 r = _setattrs(cid, oid, aset);
818 }
819 break;
820
821 case Transaction::OP_RMATTR:
822 {
823 coll_t cid = i.get_cid(op->cid);
824 ghobject_t oid = i.get_oid(op->oid);
825 string name = i.decode_string();
826 r = _rmattr(cid, oid, name.c_str());
827 }
828 break;
829
830 case Transaction::OP_RMATTRS:
831 {
832 coll_t cid = i.get_cid(op->cid);
833 ghobject_t oid = i.get_oid(op->oid);
834 r = _rmattrs(cid, oid);
835 }
836 break;
837
838 case Transaction::OP_CLONE:
839 {
840 coll_t cid = i.get_cid(op->cid);
841 ghobject_t oid = i.get_oid(op->oid);
842 ghobject_t noid = i.get_oid(op->dest_oid);
843 r = _clone(cid, oid, noid);
844 }
845 break;
846
847 case Transaction::OP_CLONERANGE:
848 {
849 coll_t cid = i.get_cid(op->cid);
850 ghobject_t oid = i.get_oid(op->oid);
851 ghobject_t noid = i.get_oid(op->dest_oid);
852 uint64_t off = op->off;
853 uint64_t len = op->len;
854 r = _clone_range(cid, oid, noid, off, len, off);
855 }
856 break;
857
858 case Transaction::OP_CLONERANGE2:
859 {
860 coll_t cid = i.get_cid(op->cid);
861 ghobject_t oid = i.get_oid(op->oid);
862 ghobject_t noid = i.get_oid(op->dest_oid);
863 uint64_t srcoff = op->off;
864 uint64_t len = op->len;
865 uint64_t dstoff = op->dest_off;
866 r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
867 }
868 break;
869
870 case Transaction::OP_MKCOLL:
871 {
872 coll_t cid = i.get_cid(op->cid);
873 r = _create_collection(cid, op->split_bits);
874 }
875 break;
876
877 case Transaction::OP_COLL_HINT:
878 {
879 coll_t cid = i.get_cid(op->cid);
880 uint32_t type = op->hint_type;
881 bufferlist hint;
882 i.decode_bl(hint);
883 bufferlist::iterator hiter = hint.begin();
884 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
885 uint32_t pg_num;
886 uint64_t num_objs;
887 ::decode(pg_num, hiter);
888 ::decode(num_objs, hiter);
889 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
890 } else {
891 // Ignore the hint
892 dout(10) << "Unrecognized collection hint type: " << type << dendl;
893 }
894 }
895 break;
896
897 case Transaction::OP_RMCOLL:
898 {
899 coll_t cid = i.get_cid(op->cid);
900 r = _destroy_collection(cid);
901 }
902 break;
903
904 case Transaction::OP_COLL_ADD:
905 {
906 coll_t ocid = i.get_cid(op->cid);
907 coll_t ncid = i.get_cid(op->dest_cid);
908 ghobject_t oid = i.get_oid(op->oid);
909 r = _collection_add(ncid, ocid, oid);
910 }
911 break;
912
913 case Transaction::OP_COLL_REMOVE:
914 {
915 coll_t cid = i.get_cid(op->cid);
916 ghobject_t oid = i.get_oid(op->oid);
917 r = _remove(cid, oid);
918 }
919 break;
920
921 case Transaction::OP_COLL_MOVE:
922 assert(0 == "deprecated");
923 break;
924
925 case Transaction::OP_COLL_MOVE_RENAME:
926 {
927 coll_t oldcid = i.get_cid(op->cid);
928 ghobject_t oldoid = i.get_oid(op->oid);
929 coll_t newcid = i.get_cid(op->dest_cid);
930 ghobject_t newoid = i.get_oid(op->dest_oid);
931 r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
932 if (r == -ENOENT)
933 r = 0;
934 }
935 break;
936
937 case Transaction::OP_TRY_RENAME:
938 {
939 coll_t cid = i.get_cid(op->cid);
940 ghobject_t oldoid = i.get_oid(op->oid);
941 ghobject_t newoid = i.get_oid(op->dest_oid);
942 r = _collection_move_rename(cid, oldoid, cid, newoid);
943 if (r == -ENOENT)
944 r = 0;
945 }
946 break;
947
948 case Transaction::OP_COLL_SETATTR:
949 {
950 assert(0 == "not implemented");
951 }
952 break;
953
954 case Transaction::OP_COLL_RMATTR:
955 {
956 assert(0 == "not implemented");
957 }
958 break;
959
960 case Transaction::OP_COLL_RENAME:
961 {
962 assert(0 == "not implemented");
963 }
964 break;
965
966 case Transaction::OP_OMAP_CLEAR:
967 {
968 coll_t cid = i.get_cid(op->cid);
969 ghobject_t oid = i.get_oid(op->oid);
970 r = _omap_clear(cid, oid);
971 }
972 break;
973 case Transaction::OP_OMAP_SETKEYS:
974 {
975 coll_t cid = i.get_cid(op->cid);
976 ghobject_t oid = i.get_oid(op->oid);
977 bufferlist aset_bl;
978 i.decode_attrset_bl(&aset_bl);
979 r = _omap_setkeys(cid, oid, aset_bl);
980 }
981 break;
982 case Transaction::OP_OMAP_RMKEYS:
983 {
984 coll_t cid = i.get_cid(op->cid);
985 ghobject_t oid = i.get_oid(op->oid);
986 bufferlist keys_bl;
987 i.decode_keyset_bl(&keys_bl);
988 r = _omap_rmkeys(cid, oid, keys_bl);
989 }
990 break;
991 case Transaction::OP_OMAP_RMKEYRANGE:
992 {
993 coll_t cid = i.get_cid(op->cid);
994 ghobject_t oid = i.get_oid(op->oid);
995 string first, last;
996 first = i.decode_string();
997 last = i.decode_string();
998 r = _omap_rmkeyrange(cid, oid, first, last);
999 }
1000 break;
1001 case Transaction::OP_OMAP_SETHEADER:
1002 {
1003 coll_t cid = i.get_cid(op->cid);
1004 ghobject_t oid = i.get_oid(op->oid);
1005 bufferlist bl;
1006 i.decode_bl(bl);
1007 r = _omap_setheader(cid, oid, bl);
1008 }
1009 break;
1010 case Transaction::OP_SPLIT_COLLECTION:
1011 assert(0 == "deprecated");
1012 break;
1013 case Transaction::OP_SPLIT_COLLECTION2:
1014 {
1015 coll_t cid = i.get_cid(op->cid);
1016 uint32_t bits = op->split_bits;
1017 uint32_t rem = op->split_rem;
1018 coll_t dest = i.get_cid(op->dest_cid);
1019 r = _split_collection(cid, bits, rem, dest);
1020 }
1021 break;
1022
1023 case Transaction::OP_SETALLOCHINT:
1024 {
1025 r = 0;
1026 }
1027 break;
1028
1029 default:
1030 derr << "bad op " << op->op << dendl;
1031 ceph_abort();
1032 }
1033
1034 if (r < 0) {
1035 bool ok = false;
1036
1037 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
1038 op->op == Transaction::OP_CLONE ||
1039 op->op == Transaction::OP_CLONERANGE2 ||
1040 op->op == Transaction::OP_COLL_ADD))
1041 // -ENOENT is usually okay
1042 ok = true;
1043 if (r == -ENODATA)
1044 ok = true;
1045
1046 if (!ok) {
1047 const char *msg = "unexpected error code";
1048
1049 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
1050 op->op == Transaction::OP_CLONE ||
1051 op->op == Transaction::OP_CLONERANGE2))
1052 msg = "ENOENT on clone suggests osd bug";
1053
1054 if (r == -ENOSPC)
1055 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1056 // by partially applying transactions.
1057 msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1058
1059 if (r == -ENOTEMPTY) {
1060 msg = "ENOTEMPTY suggests garbage data in osd data dir";
1061 dump_all();
1062 }
1063
1064 derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1065 << " (op " << pos << ", counting from 0)" << dendl;
1066 dout(0) << msg << dendl;
1067 dout(0) << " transaction dump:\n";
1068 JSONFormatter f(true);
1069 f.open_object_section("transaction");
1070 t.dump(&f);
1071 f.close_section();
1072 f.flush(*_dout);
1073 *_dout << dendl;
1074 assert(0 == "unexpected error");
1075 }
1076 }
1077
1078 ++pos;
1079 }
1080}
1081
1082int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1083{
1084 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1085 CollectionRef c = get_collection(cid);
1086 if (!c)
1087 return -ENOENT;
1088
1089 c->get_or_create_object(oid);
1090 return 0;
1091}
1092
1093int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1094 uint64_t offset, size_t len, const bufferlist& bl,
1095 uint32_t fadvise_flags)
1096{
1097 dout(10) << __func__ << " " << cid << " " << oid << " "
1098 << offset << "~" << len << dendl;
1099 assert(len == bl.length());
1100
1101 CollectionRef c = get_collection(cid);
1102 if (!c)
1103 return -ENOENT;
1104
1105 ObjectRef o = c->get_or_create_object(oid);
1106 if (len > 0) {
1107 const ssize_t old_size = o->get_size();
1108 o->write(offset, bl);
1109 used_bytes += (o->get_size() - old_size);
1110 }
1111
1112 return 0;
1113}
1114
1115int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1116 uint64_t offset, size_t len)
1117{
1118 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1119 << len << dendl;
1120 bufferlist bl;
1121 bl.append_zero(len);
1122 return _write(cid, oid, offset, len, bl);
1123}
1124
1125int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1126{
1127 dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1128 CollectionRef c = get_collection(cid);
1129 if (!c)
1130 return -ENOENT;
1131
1132 ObjectRef o = c->get_object(oid);
1133 if (!o)
1134 return -ENOENT;
1135 const ssize_t old_size = o->get_size();
1136 int r = o->truncate(size);
1137 used_bytes += (o->get_size() - old_size);
1138 return r;
1139}
1140
1141int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1142{
1143 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1144 CollectionRef c = get_collection(cid);
1145 if (!c)
1146 return -ENOENT;
1147 RWLock::WLocker l(c->lock);
1148
1149 auto i = c->object_hash.find(oid);
1150 if (i == c->object_hash.end())
1151 return -ENOENT;
1152 used_bytes -= i->second->get_size();
1153 c->object_hash.erase(i);
1154 c->object_map.erase(oid);
1155
1156 return 0;
1157}
1158
1159int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1160 map<string,bufferptr>& aset)
1161{
1162 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1163 CollectionRef c = get_collection(cid);
1164 if (!c)
1165 return -ENOENT;
1166
1167 ObjectRef o = c->get_object(oid);
1168 if (!o)
1169 return -ENOENT;
1170 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1171 for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
1172 o->xattr[p->first] = p->second;
1173 return 0;
1174}
1175
1176int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1177{
1178 dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1179 CollectionRef c = get_collection(cid);
1180 if (!c)
1181 return -ENOENT;
1182
1183 ObjectRef o = c->get_object(oid);
1184 if (!o)
1185 return -ENOENT;
1186 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1187 auto i = o->xattr.find(name);
1188 if (i == o->xattr.end())
1189 return -ENODATA;
1190 o->xattr.erase(i);
1191 return 0;
1192}
1193
1194int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1195{
1196 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1197 CollectionRef c = get_collection(cid);
1198 if (!c)
1199 return -ENOENT;
1200
1201 ObjectRef o = c->get_object(oid);
1202 if (!o)
1203 return -ENOENT;
1204 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1205 o->xattr.clear();
1206 return 0;
1207}
1208
1209int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1210 const ghobject_t& newoid)
1211{
1212 dout(10) << __func__ << " " << cid << " " << oldoid
1213 << " -> " << newoid << dendl;
1214 CollectionRef c = get_collection(cid);
1215 if (!c)
1216 return -ENOENT;
1217
1218 ObjectRef oo = c->get_object(oldoid);
1219 if (!oo)
1220 return -ENOENT;
1221 ObjectRef no = c->get_or_create_object(newoid);
1222 used_bytes += oo->get_size() - no->get_size();
1223 no->clone(oo.get(), 0, oo->get_size(), 0);
1224
1225 // take xattr and omap locks with std::lock()
1226 std::unique_lock<std::mutex>
1227 ox_lock(oo->xattr_mutex, std::defer_lock),
1228 nx_lock(no->xattr_mutex, std::defer_lock),
1229 oo_lock(oo->omap_mutex, std::defer_lock),
1230 no_lock(no->omap_mutex, std::defer_lock);
1231 std::lock(ox_lock, nx_lock, oo_lock, no_lock);
1232
1233 no->omap_header = oo->omap_header;
1234 no->omap = oo->omap;
1235 no->xattr = oo->xattr;
1236 return 0;
1237}
1238
1239int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1240 const ghobject_t& newoid,
1241 uint64_t srcoff, uint64_t len, uint64_t dstoff)
1242{
1243 dout(10) << __func__ << " " << cid << " "
1244 << oldoid << " " << srcoff << "~" << len << " -> "
1245 << newoid << " " << dstoff << "~" << len
1246 << dendl;
1247 CollectionRef c = get_collection(cid);
1248 if (!c)
1249 return -ENOENT;
1250
1251 ObjectRef oo = c->get_object(oldoid);
1252 if (!oo)
1253 return -ENOENT;
1254 ObjectRef no = c->get_or_create_object(newoid);
1255 if (srcoff >= oo->get_size())
1256 return 0;
1257 if (srcoff + len >= oo->get_size())
1258 len = oo->get_size() - srcoff;
1259
1260 const ssize_t old_size = no->get_size();
1261 no->clone(oo.get(), srcoff, len, dstoff);
1262 used_bytes += (no->get_size() - old_size);
1263
1264 return len;
1265}
1266
1267int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1268{
1269 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1270 CollectionRef c = get_collection(cid);
1271 if (!c)
1272 return -ENOENT;
1273
1274 ObjectRef o = c->get_object(oid);
1275 if (!o)
1276 return -ENOENT;
1277 std::lock_guard<std::mutex> lock(o->omap_mutex);
1278 o->omap.clear();
1279 o->omap_header.clear();
1280 return 0;
1281}
1282
1283int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1284 bufferlist& aset_bl)
1285{
1286 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1287 CollectionRef c = get_collection(cid);
1288 if (!c)
1289 return -ENOENT;
1290
1291 ObjectRef o = c->get_object(oid);
1292 if (!o)
1293 return -ENOENT;
1294 std::lock_guard<std::mutex> lock(o->omap_mutex);
1295 bufferlist::iterator p = aset_bl.begin();
1296 __u32 num;
1297 ::decode(num, p);
1298 while (num--) {
1299 string key;
1300 ::decode(key, p);
1301 ::decode(o->omap[key], p);
1302 }
1303 return 0;
1304}
1305
1306int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1307 bufferlist& keys_bl)
1308{
1309 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1310 CollectionRef c = get_collection(cid);
1311 if (!c)
1312 return -ENOENT;
1313
1314 ObjectRef o = c->get_object(oid);
1315 if (!o)
1316 return -ENOENT;
1317 std::lock_guard<std::mutex> lock(o->omap_mutex);
1318 bufferlist::iterator p = keys_bl.begin();
1319 __u32 num;
1320 ::decode(num, p);
1321 while (num--) {
1322 string key;
1323 ::decode(key, p);
1324 o->omap.erase(key);
1325 }
1326 return 0;
1327}
1328
1329int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1330 const string& first, const string& last)
1331{
1332 dout(10) << __func__ << " " << cid << " " << oid << " " << first
1333 << " " << last << dendl;
1334 CollectionRef c = get_collection(cid);
1335 if (!c)
1336 return -ENOENT;
1337
1338 ObjectRef o = c->get_object(oid);
1339 if (!o)
1340 return -ENOENT;
1341 std::lock_guard<std::mutex> lock(o->omap_mutex);
1342 map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
1343 map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
1344 o->omap.erase(p, e);
1345 return 0;
1346}
1347
1348int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1349 const bufferlist &bl)
1350{
1351 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1352 CollectionRef c = get_collection(cid);
1353 if (!c)
1354 return -ENOENT;
1355
1356 ObjectRef o = c->get_object(oid);
1357 if (!o)
1358 return -ENOENT;
1359 std::lock_guard<std::mutex> lock(o->omap_mutex);
1360 o->omap_header = bl;
1361 return 0;
1362}
1363
1364int MemStore::_create_collection(const coll_t& cid, int bits)
1365{
1366 dout(10) << __func__ << " " << cid << dendl;
1367 RWLock::WLocker l(coll_lock);
1368 auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1369 if (!result.second)
1370 return -EEXIST;
1371 result.first->second.reset(new Collection(cct, cid));
1372 result.first->second->bits = bits;
1373 return 0;
1374}
1375
1376int MemStore::_destroy_collection(const coll_t& cid)
1377{
1378 dout(10) << __func__ << " " << cid << dendl;
1379 RWLock::WLocker l(coll_lock);
1380 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1381 if (cp == coll_map.end())
1382 return -ENOENT;
1383 {
1384 RWLock::RLocker l2(cp->second->lock);
1385 if (!cp->second->object_map.empty())
1386 return -ENOTEMPTY;
1387 cp->second->exists = false;
1388 }
1389 used_bytes -= cp->second->used_bytes();
1390 coll_map.erase(cp);
1391 return 0;
1392}
1393
1394int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1395{
1396 dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1397 CollectionRef c = get_collection(cid);
1398 if (!c)
1399 return -ENOENT;
1400 CollectionRef oc = get_collection(ocid);
1401 if (!oc)
1402 return -ENOENT;
1403 RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock);
1404 RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock);
1405
1406 if (c->object_hash.count(oid))
1407 return -EEXIST;
1408 if (oc->object_hash.count(oid) == 0)
1409 return -ENOENT;
1410 ObjectRef o = oc->object_hash[oid];
1411 c->object_map[oid] = o;
1412 c->object_hash[oid] = o;
1413 return 0;
1414}
1415
1416int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1417 coll_t cid, const ghobject_t& oid)
1418{
1419 dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1420 << cid << " " << oid << dendl;
1421 CollectionRef c = get_collection(cid);
1422 if (!c)
1423 return -ENOENT;
1424 CollectionRef oc = get_collection(oldcid);
1425 if (!oc)
1426 return -ENOENT;
1427
1428 // note: c and oc may be the same
1429 assert(&(*c) == &(*oc));
1430 c->lock.get_write();
1431
1432 int r = -EEXIST;
1433 if (c->object_hash.count(oid))
1434 goto out;
1435 r = -ENOENT;
1436 if (oc->object_hash.count(oldoid) == 0)
1437 goto out;
1438 {
1439 ObjectRef o = oc->object_hash[oldoid];
1440 c->object_map[oid] = o;
1441 c->object_hash[oid] = o;
1442 oc->object_map.erase(oldoid);
1443 oc->object_hash.erase(oldoid);
1444 }
1445 r = 0;
1446 out:
1447 c->lock.put_write();
1448 return r;
1449}
1450
1451int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1452 coll_t dest)
1453{
1454 dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1455 << dest << dendl;
1456 CollectionRef sc = get_collection(cid);
1457 if (!sc)
1458 return -ENOENT;
1459 CollectionRef dc = get_collection(dest);
1460 if (!dc)
1461 return -ENOENT;
1462 RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock);
1463 RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock);
1464
1465 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1466 while (p != sc->object_map.end()) {
1467 if (p->first.match(bits, match)) {
1468 dout(20) << " moving " << p->first << dendl;
1469 dc->object_map.insert(make_pair(p->first, p->second));
1470 dc->object_hash.insert(make_pair(p->first, p->second));
1471 sc->object_hash.erase(p->first);
1472 sc->object_map.erase(p++);
1473 } else {
1474 ++p;
1475 }
1476 }
1477
1478 sc->bits = bits;
1479 assert(dc->bits == (int)bits);
1480
1481 return 0;
1482}
1483namespace {
1484struct BufferlistObject : public MemStore::Object {
1485 Spinlock mutex;
1486 bufferlist data;
1487
1488 size_t get_size() const override { return data.length(); }
1489
1490 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1491 int write(uint64_t offset, const bufferlist &bl) override;
1492 int clone(Object *src, uint64_t srcoff, uint64_t len,
1493 uint64_t dstoff) override;
1494 int truncate(uint64_t offset) override;
1495
1496 void encode(bufferlist& bl) const override {
1497 ENCODE_START(1, 1, bl);
1498 ::encode(data, bl);
1499 encode_base(bl);
1500 ENCODE_FINISH(bl);
1501 }
1502 void decode(bufferlist::iterator& p) override {
1503 DECODE_START(1, p);
1504 ::decode(data, p);
1505 decode_base(p);
1506 DECODE_FINISH(p);
1507 }
1508};
1509}
1510// BufferlistObject
1511int BufferlistObject::read(uint64_t offset, uint64_t len,
1512 bufferlist &bl)
1513{
1514 std::lock_guard<Spinlock> lock(mutex);
1515 bl.substr_of(data, offset, len);
1516 return bl.length();
1517}
1518
1519int BufferlistObject::write(uint64_t offset, const bufferlist &src)
1520{
1521 unsigned len = src.length();
1522
1523 std::lock_guard<Spinlock> lock(mutex);
1524
1525 // before
1526 bufferlist newdata;
1527 if (get_size() >= offset) {
1528 newdata.substr_of(data, 0, offset);
1529 } else {
1530 if (get_size()) {
1531 newdata.substr_of(data, 0, get_size());
1532 }
1533 newdata.append_zero(offset - get_size());
1534 }
1535
1536 newdata.append(src);
1537
1538 // after
1539 if (get_size() > offset + len) {
1540 bufferlist tail;
1541 tail.substr_of(data, offset + len, get_size() - (offset + len));
1542 newdata.append(tail);
1543 }
1544
1545 data.claim(newdata);
1546 return 0;
1547}
1548
1549int BufferlistObject::clone(Object *src, uint64_t srcoff,
1550 uint64_t len, uint64_t dstoff)
1551{
1552 auto srcbl = dynamic_cast<BufferlistObject*>(src);
1553 if (srcbl == nullptr)
1554 return -ENOTSUP;
1555
1556 bufferlist bl;
1557 {
1558 std::lock_guard<Spinlock> lock(srcbl->mutex);
1559 if (srcoff == dstoff && len == src->get_size()) {
1560 data = srcbl->data;
1561 return 0;
1562 }
1563 bl.substr_of(srcbl->data, srcoff, len);
1564 }
1565 return write(dstoff, bl);
1566}
1567
1568int BufferlistObject::truncate(uint64_t size)
1569{
1570 std::lock_guard<Spinlock> lock(mutex);
1571 if (get_size() > size) {
1572 bufferlist bl;
1573 bl.substr_of(data, 0, size);
1574 data.claim(bl);
1575 } else if (get_size() == size) {
1576 // do nothing
1577 } else {
1578 data.append_zero(size - get_size());
1579 }
1580 return 0;
1581}
1582
1583// PageSetObject
1584
1585struct MemStore::PageSetObject : public Object {
1586 PageSet data;
1587 uint64_t data_len;
1588#if defined(__GLIBCXX__)
1589 // use a thread-local vector for the pages returned by PageSet, so we
1590 // can avoid allocations in read/write()
1591 static thread_local PageSet::page_vector tls_pages;
1592#endif
1593
1594 explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
1595
1596 size_t get_size() const override { return data_len; }
1597
1598 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1599 int write(uint64_t offset, const bufferlist &bl) override;
1600 int clone(Object *src, uint64_t srcoff, uint64_t len,
1601 uint64_t dstoff) override;
1602 int truncate(uint64_t offset) override;
1603
1604 void encode(bufferlist& bl) const override {
1605 ENCODE_START(1, 1, bl);
1606 ::encode(data_len, bl);
1607 data.encode(bl);
1608 encode_base(bl);
1609 ENCODE_FINISH(bl);
1610 }
1611 void decode(bufferlist::iterator& p) override {
1612 DECODE_START(1, p);
1613 ::decode(data_len, p);
1614 data.decode(p);
1615 decode_base(p);
1616 DECODE_FINISH(p);
1617 }
1618};
1619
// DEFINE_PAGE_VECTOR(name) gives a function a page vector called `name`.
// With libstdc++ the shared thread-local PageSetObject::tls_pages is used,
// so the macro expands to nothing (callers must pass `tls_pages`); on other
// standard libraries it declares a function-local vector instead.
#ifdef __GLIBCXX__
thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
#define DEFINE_PAGE_VECTOR(name)
#else
#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
#endif
1628
1629int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
1630{
1631 const auto start = offset;
1632 const auto end = offset + len;
1633 auto remaining = len;
1634
1635 DEFINE_PAGE_VECTOR(tls_pages);
1636 data.get_range(offset, len, tls_pages);
1637
1638 // allocate a buffer for the data
1639 buffer::ptr buf(len);
1640
1641 auto p = tls_pages.begin();
1642 while (remaining) {
1643 // no more pages in range
1644 if (p == tls_pages.end() || (*p)->offset >= end) {
1645 buf.zero(offset - start, remaining);
1646 break;
1647 }
1648 auto page = *p;
1649
1650 // fill any holes between pages with zeroes
1651 if (page->offset > offset) {
1652 const auto count = std::min(remaining, page->offset - offset);
1653 buf.zero(offset - start, count);
1654 remaining -= count;
1655 offset = page->offset;
1656 if (!remaining)
1657 break;
1658 }
1659
1660 // read from page
1661 const auto page_offset = offset - page->offset;
1662 const auto count = min(remaining, data.get_page_size() - page_offset);
1663
1664 buf.copy_in(offset - start, count, page->data + page_offset);
1665
1666 remaining -= count;
1667 offset += count;
1668
1669 ++p;
1670 }
1671
1672 tls_pages.clear(); // drop page refs
1673
1674 bl.append(std::move(buf));
1675 return len;
1676}
1677
1678int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
1679{
1680 unsigned len = src.length();
1681
1682 DEFINE_PAGE_VECTOR(tls_pages);
1683 // make sure the page range is allocated
1684 data.alloc_range(offset, src.length(), tls_pages);
1685
1686 auto page = tls_pages.begin();
1687
1688 auto p = src.begin();
1689 while (len > 0) {
1690 unsigned page_offset = offset - (*page)->offset;
1691 unsigned pageoff = data.get_page_size() - page_offset;
1692 unsigned count = min(len, pageoff);
1693 p.copy(count, (*page)->data + page_offset);
1694 offset += count;
1695 len -= count;
1696 if (count == pageoff)
1697 ++page;
1698 }
1699 if (data_len < offset)
1700 data_len = offset;
1701 tls_pages.clear(); // drop page refs
1702 return 0;
1703}
1704
1705int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1706 uint64_t len, uint64_t dstoff)
1707{
1708 const int64_t delta = dstoff - srcoff;
1709
1710 auto &src_data = static_cast<PageSetObject*>(src)->data;
1711 const uint64_t src_page_size = src_data.get_page_size();
1712
1713 auto &dst_data = data;
1714 const auto dst_page_size = dst_data.get_page_size();
1715
1716 DEFINE_PAGE_VECTOR(tls_pages);
1717 PageSet::page_vector dst_pages;
1718
1719 while (len) {
1720 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1721 auto count = std::min(len, (uint64_t)src_page_size * 16);
1722 src_data.get_range(srcoff, count, tls_pages);
1723
1724 // allocate the destination range
1725 // TODO: avoid allocating pages for holes in the source range
1726 dst_data.alloc_range(srcoff + delta, count, dst_pages);
1727 auto dst_iter = dst_pages.begin();
1728
1729 for (auto &src_page : tls_pages) {
1730 auto sbegin = std::max(srcoff, src_page->offset);
1731 auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1732
1733 // zero-fill holes before src_page
1734 if (srcoff < sbegin) {
1735 while (dst_iter != dst_pages.end()) {
1736 auto &dst_page = *dst_iter;
1737 auto dbegin = std::max(srcoff + delta, dst_page->offset);
1738 auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1739 std::fill(dst_page->data + dbegin - dst_page->offset,
1740 dst_page->data + dend - dst_page->offset, 0);
1741 if (dend < dst_page->offset + dst_page_size)
1742 break;
1743 ++dst_iter;
1744 }
1745 const auto c = sbegin - srcoff;
1746 count -= c;
1747 len -= c;
1748 }
1749
1750 // copy data from src page to dst pages
1751 while (dst_iter != dst_pages.end()) {
1752 auto &dst_page = *dst_iter;
1753 auto dbegin = std::max(sbegin + delta, dst_page->offset);
1754 auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1755
1756 std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1757 src_page->data + (dend - delta) - src_page->offset,
1758 dst_page->data + dbegin - dst_page->offset);
1759 if (dend < dst_page->offset + dst_page_size)
1760 break;
1761 ++dst_iter;
1762 }
1763
1764 const auto c = send - sbegin;
1765 count -= c;
1766 len -= c;
1767 srcoff = send;
1768 dstoff = send + delta;
1769 }
1770 tls_pages.clear(); // drop page refs
1771
1772 // zero-fill holes after the last src_page
1773 if (count > 0) {
1774 while (dst_iter != dst_pages.end()) {
1775 auto &dst_page = *dst_iter;
1776 auto dbegin = std::max(dstoff, dst_page->offset);
1777 auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1778 std::fill(dst_page->data + dbegin - dst_page->offset,
1779 dst_page->data + dend - dst_page->offset, 0);
1780 ++dst_iter;
1781 }
1782 srcoff += count;
1783 dstoff += count;
1784 len -= count;
1785 }
1786 dst_pages.clear(); // drop page refs
1787 }
1788
1789 // update object size
1790 if (data_len < dstoff)
1791 data_len = dstoff;
1792 return 0;
1793}
1794
1795int MemStore::PageSetObject::truncate(uint64_t size)
1796{
1797 data.free_pages_after(size);
1798 data_len = size;
1799
1800 const auto page_size = data.get_page_size();
1801 const auto page_offset = size & ~(page_size-1);
1802 if (page_offset == size)
1803 return 0;
1804
1805 DEFINE_PAGE_VECTOR(tls_pages);
1806 // write zeroes to the rest of the last page
1807 data.get_range(page_offset, page_size, tls_pages);
1808 if (tls_pages.empty())
1809 return 0;
1810
1811 auto page = tls_pages.begin();
1812 auto data = (*page)->data;
1813 std::fill(data + (size - page_offset), data + page_size, 0);
1814 tls_pages.clear(); // drop page ref
1815 return 0;
1816}
1817
1818
1819MemStore::ObjectRef MemStore::Collection::create_object() const {
1820 if (use_page_set)
1821 return new PageSetObject(cct->_conf->memstore_page_size);
1822 return new BufferlistObject();
1823}