]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/memstore/MemStore.cc
4b1ca248a4353dec354eab5310b45f31ee343b73
[ceph.git] / ceph / src / os / memstore / MemStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "acconfig.h"
15
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
18 #endif
19
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
22 #endif
23
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "include/memory.h"
28 #include "common/errno.h"
29 #include "MemStore.h"
30 #include "include/compat.h"
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_filestore
34 #undef dout_prefix
35 #define dout_prefix *_dout << "memstore(" << path << ") "
36
37 // for comparing collections for lock ordering
38 bool operator>(const MemStore::CollectionRef& l,
39 const MemStore::CollectionRef& r)
40 {
41 return (unsigned long)l.get() > (unsigned long)r.get();
42 }
43
44
45 int MemStore::mount()
46 {
47 int r = _load();
48 if (r < 0)
49 return r;
50 finisher.start();
51 return 0;
52 }
53
54 int MemStore::umount()
55 {
56 finisher.wait_for_empty();
57 finisher.stop();
58 return _save();
59 }
60
61 int MemStore::_save()
62 {
63 dout(10) << __func__ << dendl;
64 dump_all();
65 set<coll_t> collections;
66 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
67 p != coll_map.end();
68 ++p) {
69 dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
70 collections.insert(p->first);
71 bufferlist bl;
72 assert(p->second);
73 p->second->encode(bl);
74 string fn = path + "/" + stringify(p->first);
75 int r = bl.write_file(fn.c_str());
76 if (r < 0)
77 return r;
78 }
79
80 string fn = path + "/collections";
81 bufferlist bl;
82 ::encode(collections, bl);
83 int r = bl.write_file(fn.c_str());
84 if (r < 0)
85 return r;
86
87 return 0;
88 }
89
90 void MemStore::dump_all()
91 {
92 Formatter *f = Formatter::create("json-pretty");
93 f->open_object_section("store");
94 dump(f);
95 f->close_section();
96 dout(0) << "dump:";
97 f->flush(*_dout);
98 *_dout << dendl;
99 delete f;
100 }
101
102 void MemStore::dump(Formatter *f)
103 {
104 f->open_array_section("collections");
105 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
106 p != coll_map.end();
107 ++p) {
108 f->open_object_section("collection");
109 f->dump_string("name", stringify(p->first));
110
111 f->open_array_section("xattrs");
112 for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
113 q != p->second->xattr.end();
114 ++q) {
115 f->open_object_section("xattr");
116 f->dump_string("name", q->first);
117 f->dump_int("length", q->second.length());
118 f->close_section();
119 }
120 f->close_section();
121
122 f->open_array_section("objects");
123 for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
124 q != p->second->object_map.end();
125 ++q) {
126 f->open_object_section("object");
127 f->dump_string("name", stringify(q->first));
128 if (q->second)
129 q->second->dump(f);
130 f->close_section();
131 }
132 f->close_section();
133
134 f->close_section();
135 }
136 f->close_section();
137 }
138
139 int MemStore::_load()
140 {
141 dout(10) << __func__ << dendl;
142 bufferlist bl;
143 string fn = path + "/collections";
144 string err;
145 int r = bl.read_file(fn.c_str(), &err);
146 if (r < 0)
147 return r;
148
149 set<coll_t> collections;
150 bufferlist::iterator p = bl.begin();
151 ::decode(collections, p);
152
153 for (set<coll_t>::iterator q = collections.begin();
154 q != collections.end();
155 ++q) {
156 string fn = path + "/" + stringify(*q);
157 bufferlist cbl;
158 int r = cbl.read_file(fn.c_str(), &err);
159 if (r < 0)
160 return r;
161 CollectionRef c(new Collection(cct, *q));
162 bufferlist::iterator p = cbl.begin();
163 c->decode(p);
164 coll_map[*q] = c;
165 used_bytes += c->used_bytes();
166 }
167
168 dump_all();
169
170 return 0;
171 }
172
173 void MemStore::set_fsid(uuid_d u)
174 {
175 int r = write_meta("fs_fsid", stringify(u));
176 assert(r >= 0);
177 }
178
179 uuid_d MemStore::get_fsid()
180 {
181 string fsid_str;
182 int r = read_meta("fs_fsid", &fsid_str);
183 assert(r >= 0);
184 uuid_d uuid;
185 bool b = uuid.parse(fsid_str.c_str());
186 assert(b);
187 return uuid;
188 }
189
190 int MemStore::mkfs()
191 {
192 string fsid_str;
193 int r = read_meta("fs_fsid", &fsid_str);
194 if (r == -ENOENT) {
195 uuid_d fsid;
196 fsid.generate_random();
197 fsid_str = stringify(fsid);
198 r = write_meta("fs_fsid", fsid_str);
199 if (r < 0)
200 return r;
201 dout(1) << __func__ << " new fsid " << fsid_str << dendl;
202 } else if (r < 0) {
203 return r;
204 } else {
205 dout(1) << __func__ << " had fsid " << fsid_str << dendl;
206 }
207
208 string fn = path + "/collections";
209 derr << path << dendl;
210 bufferlist bl;
211 set<coll_t> collections;
212 ::encode(collections, bl);
213 r = bl.write_file(fn.c_str());
214 if (r < 0)
215 return r;
216
217 r = write_meta("type", "memstore");
218 if (r < 0)
219 return r;
220
221 return 0;
222 }
223
224 int MemStore::statfs(struct store_statfs_t *st)
225 {
226 dout(10) << __func__ << dendl;
227 st->reset();
228 st->total = cct->_conf->memstore_device_bytes;
229 st->available = MAX(int64_t(st->total) - int64_t(used_bytes), 0ll);
230 dout(10) << __func__ << ": used_bytes: " << used_bytes
231 << "/" << cct->_conf->memstore_device_bytes << dendl;
232 return 0;
233 }
234
235 objectstore_perf_stat_t MemStore::get_cur_stats()
236 {
237 // fixme
238 return objectstore_perf_stat_t();
239 }
240
241 MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
242 {
243 RWLock::RLocker l(coll_lock);
244 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
245 if (cp == coll_map.end())
246 return CollectionRef();
247 return cp->second;
248 }
249
250
251 // ---------------
252 // read operations
253
254 bool MemStore::exists(const coll_t& cid, const ghobject_t& oid)
255 {
256 CollectionHandle c = get_collection(cid);
257 if (!c)
258 return false;
259 return exists(c, oid);
260 }
261
262 bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
263 {
264 Collection *c = static_cast<Collection*>(c_.get());
265 dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
266 if (!c->exists)
267 return false;
268
269 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
270 // shared_ptr needs to be compared to nullptr.
271 return (bool)c->get_object(oid);
272 }
273
274 int MemStore::stat(
275 const coll_t& cid,
276 const ghobject_t& oid,
277 struct stat *st,
278 bool allow_eio)
279 {
280 CollectionHandle c = get_collection(cid);
281 if (!c)
282 return -ENOENT;
283 return stat(c, oid, st, allow_eio);
284 }
285
286 int MemStore::stat(
287 CollectionHandle &c_,
288 const ghobject_t& oid,
289 struct stat *st,
290 bool allow_eio)
291 {
292 Collection *c = static_cast<Collection*>(c_.get());
293 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
294 if (!c->exists)
295 return -ENOENT;
296 ObjectRef o = c->get_object(oid);
297 if (!o)
298 return -ENOENT;
299 st->st_size = o->get_size();
300 st->st_blksize = 4096;
301 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
302 st->st_nlink = 1;
303 return 0;
304 }
305
306 int MemStore::set_collection_opts(
307 const coll_t& cid,
308 const pool_opts_t& opts)
309 {
310 return -EOPNOTSUPP;
311 }
312
313 int MemStore::read(
314 const coll_t& cid,
315 const ghobject_t& oid,
316 uint64_t offset,
317 size_t len,
318 bufferlist& bl,
319 uint32_t op_flags,
320 bool allow_eio)
321 {
322 CollectionHandle c = get_collection(cid);
323 if (!c)
324 return -ENOENT;
325 return read(c, oid, offset, len, bl, op_flags, allow_eio);
326 }
327
328 int MemStore::read(
329 CollectionHandle &c_,
330 const ghobject_t& oid,
331 uint64_t offset,
332 size_t len,
333 bufferlist& bl,
334 uint32_t op_flags,
335 bool allow_eio)
336 {
337 Collection *c = static_cast<Collection*>(c_.get());
338 dout(10) << __func__ << " " << c->cid << " " << oid << " "
339 << offset << "~" << len << dendl;
340 if (!c->exists)
341 return -ENOENT;
342 ObjectRef o = c->get_object(oid);
343 if (!o)
344 return -ENOENT;
345 if (offset >= o->get_size())
346 return 0;
347 size_t l = len;
348 if (l == 0 && offset == 0) // note: len == 0 means read the entire object
349 l = o->get_size();
350 else if (offset + l > o->get_size())
351 l = o->get_size() - offset;
352 bl.clear();
353 return o->read(offset, l, bl);
354 }
355
356 int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
357 uint64_t offset, size_t len, bufferlist& bl)
358 {
359 map<uint64_t, uint64_t> destmap;
360 int r = fiemap(cid, oid, offset, len, destmap);
361 if (r >= 0)
362 ::encode(destmap, bl);
363 return r;
364 }
365
366 int MemStore::fiemap(const coll_t& cid, const ghobject_t& oid,
367 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap)
368 {
369 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
370 << len << dendl;
371 CollectionRef c = get_collection(cid);
372 if (!c)
373 return -ENOENT;
374
375 ObjectRef o = c->get_object(oid);
376 if (!o)
377 return -ENOENT;
378 size_t l = len;
379 if (offset + l > o->get_size())
380 l = o->get_size() - offset;
381 if (offset >= o->get_size())
382 goto out;
383 destmap[offset] = l;
384 out:
385 return 0;
386 }
387
388 int MemStore::getattr(const coll_t& cid, const ghobject_t& oid,
389 const char *name, bufferptr& value)
390 {
391 CollectionHandle c = get_collection(cid);
392 if (!c)
393 return -ENOENT;
394 return getattr(c, oid, name, value);
395 }
396
397 int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
398 const char *name, bufferptr& value)
399 {
400 Collection *c = static_cast<Collection*>(c_.get());
401 dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
402 if (!c->exists)
403 return -ENOENT;
404 ObjectRef o = c->get_object(oid);
405 if (!o)
406 return -ENOENT;
407 string k(name);
408 std::lock_guard<std::mutex> lock(o->xattr_mutex);
409 if (!o->xattr.count(k)) {
410 return -ENODATA;
411 }
412 value = o->xattr[k];
413 return 0;
414 }
415
416 int MemStore::getattrs(const coll_t& cid, const ghobject_t& oid,
417 map<string,bufferptr>& aset)
418 {
419 CollectionHandle c = get_collection(cid);
420 if (!c)
421 return -ENOENT;
422 return getattrs(c, oid, aset);
423 }
424
425 int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
426 map<string,bufferptr>& aset)
427 {
428 Collection *c = static_cast<Collection*>(c_.get());
429 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
430 if (!c->exists)
431 return -ENOENT;
432
433 ObjectRef o = c->get_object(oid);
434 if (!o)
435 return -ENOENT;
436 std::lock_guard<std::mutex> lock(o->xattr_mutex);
437 aset = o->xattr;
438 return 0;
439 }
440
441 int MemStore::list_collections(vector<coll_t>& ls)
442 {
443 dout(10) << __func__ << dendl;
444 RWLock::RLocker l(coll_lock);
445 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
446 p != coll_map.end();
447 ++p) {
448 ls.push_back(p->first);
449 }
450 return 0;
451 }
452
453 bool MemStore::collection_exists(const coll_t& cid)
454 {
455 dout(10) << __func__ << " " << cid << dendl;
456 RWLock::RLocker l(coll_lock);
457 return coll_map.count(cid);
458 }
459
460 int MemStore::collection_empty(const coll_t& cid, bool *empty)
461 {
462 dout(10) << __func__ << " " << cid << dendl;
463 CollectionRef c = get_collection(cid);
464 if (!c)
465 return -ENOENT;
466 RWLock::RLocker l(c->lock);
467 *empty = c->object_map.empty();
468 return 0;
469 }
470
471 int MemStore::collection_bits(const coll_t& cid)
472 {
473 dout(10) << __func__ << " " << cid << dendl;
474 CollectionRef c = get_collection(cid);
475 if (!c)
476 return -ENOENT;
477 RWLock::RLocker l(c->lock);
478 return c->bits;
479 }
480
481 int MemStore::collection_list(const coll_t& cid,
482 const ghobject_t& start,
483 const ghobject_t& end,
484 int max,
485 vector<ghobject_t> *ls, ghobject_t *next)
486 {
487 CollectionRef c = get_collection(cid);
488 if (!c)
489 return -ENOENT;
490 RWLock::RLocker l(c->lock);
491
492 dout(10) << __func__ << " cid " << cid << " start " << start
493 << " end " << end << dendl;
494 map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
495 while (p != c->object_map.end() &&
496 ls->size() < (unsigned)max &&
497 p->first < end) {
498 ls->push_back(p->first);
499 ++p;
500 }
501 if (next != NULL) {
502 if (p == c->object_map.end())
503 *next = ghobject_t::get_max();
504 else
505 *next = p->first;
506 }
507 dout(10) << __func__ << " cid " << cid << " got " << ls->size() << dendl;
508 return 0;
509 }
510
511 int MemStore::omap_get(
512 const coll_t& cid, ///< [in] Collection containing oid
513 const ghobject_t &oid, ///< [in] Object containing omap
514 bufferlist *header, ///< [out] omap header
515 map<string, bufferlist> *out /// < [out] Key to value map
516 )
517 {
518 dout(10) << __func__ << " " << cid << " " << oid << dendl;
519 CollectionRef c = get_collection(cid);
520 if (!c)
521 return -ENOENT;
522
523 ObjectRef o = c->get_object(oid);
524 if (!o)
525 return -ENOENT;
526 std::lock_guard<std::mutex> lock(o->omap_mutex);
527 *header = o->omap_header;
528 *out = o->omap;
529 return 0;
530 }
531
532 int MemStore::omap_get_header(
533 const coll_t& cid, ///< [in] Collection containing oid
534 const ghobject_t &oid, ///< [in] Object containing omap
535 bufferlist *header, ///< [out] omap header
536 bool allow_eio ///< [in] don't assert on eio
537 )
538 {
539 dout(10) << __func__ << " " << cid << " " << oid << dendl;
540 CollectionRef c = get_collection(cid);
541 if (!c)
542 return -ENOENT;
543
544 ObjectRef o = c->get_object(oid);
545 if (!o)
546 return -ENOENT;
547 std::lock_guard<std::mutex> lock(o->omap_mutex);
548 *header = o->omap_header;
549 return 0;
550 }
551
552 int MemStore::omap_get_keys(
553 const coll_t& cid, ///< [in] Collection containing oid
554 const ghobject_t &oid, ///< [in] Object containing omap
555 set<string> *keys ///< [out] Keys defined on oid
556 )
557 {
558 dout(10) << __func__ << " " << cid << " " << oid << dendl;
559 CollectionRef c = get_collection(cid);
560 if (!c)
561 return -ENOENT;
562
563 ObjectRef o = c->get_object(oid);
564 if (!o)
565 return -ENOENT;
566 std::lock_guard<std::mutex> lock(o->omap_mutex);
567 for (map<string,bufferlist>::iterator p = o->omap.begin();
568 p != o->omap.end();
569 ++p)
570 keys->insert(p->first);
571 return 0;
572 }
573
574 int MemStore::omap_get_values(
575 const coll_t& cid, ///< [in] Collection containing oid
576 const ghobject_t &oid, ///< [in] Object containing omap
577 const set<string> &keys, ///< [in] Keys to get
578 map<string, bufferlist> *out ///< [out] Returned keys and values
579 )
580 {
581 dout(10) << __func__ << " " << cid << " " << oid << dendl;
582 CollectionRef c = get_collection(cid);
583 if (!c)
584 return -ENOENT;
585
586 ObjectRef o = c->get_object(oid);
587 if (!o)
588 return -ENOENT;
589 std::lock_guard<std::mutex> lock(o->omap_mutex);
590 for (set<string>::const_iterator p = keys.begin();
591 p != keys.end();
592 ++p) {
593 map<string,bufferlist>::iterator q = o->omap.find(*p);
594 if (q != o->omap.end())
595 out->insert(*q);
596 }
597 return 0;
598 }
599
600 int MemStore::omap_check_keys(
601 const coll_t& cid, ///< [in] Collection containing oid
602 const ghobject_t &oid, ///< [in] Object containing omap
603 const set<string> &keys, ///< [in] Keys to check
604 set<string> *out ///< [out] Subset of keys defined on oid
605 )
606 {
607 dout(10) << __func__ << " " << cid << " " << oid << dendl;
608 CollectionRef c = get_collection(cid);
609 if (!c)
610 return -ENOENT;
611
612 ObjectRef o = c->get_object(oid);
613 if (!o)
614 return -ENOENT;
615 std::lock_guard<std::mutex> lock(o->omap_mutex);
616 for (set<string>::const_iterator p = keys.begin();
617 p != keys.end();
618 ++p) {
619 map<string,bufferlist>::iterator q = o->omap.find(*p);
620 if (q != o->omap.end())
621 out->insert(*p);
622 }
623 return 0;
624 }
625
626 class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
627 CollectionRef c;
628 ObjectRef o;
629 map<string,bufferlist>::iterator it;
630 public:
631 OmapIteratorImpl(CollectionRef c, ObjectRef o)
632 : c(c), o(o), it(o->omap.begin()) {}
633
634 int seek_to_first() override {
635 std::lock_guard<std::mutex>(o->omap_mutex);
636 it = o->omap.begin();
637 return 0;
638 }
639 int upper_bound(const string &after) override {
640 std::lock_guard<std::mutex>(o->omap_mutex);
641 it = o->omap.upper_bound(after);
642 return 0;
643 }
644 int lower_bound(const string &to) override {
645 std::lock_guard<std::mutex>(o->omap_mutex);
646 it = o->omap.lower_bound(to);
647 return 0;
648 }
649 bool valid() override {
650 std::lock_guard<std::mutex>(o->omap_mutex);
651 return it != o->omap.end();
652 }
653 int next(bool validate=true) override {
654 std::lock_guard<std::mutex>(o->omap_mutex);
655 ++it;
656 return 0;
657 }
658 string key() override {
659 std::lock_guard<std::mutex>(o->omap_mutex);
660 return it->first;
661 }
662 bufferlist value() override {
663 std::lock_guard<std::mutex>(o->omap_mutex);
664 return it->second;
665 }
666 int status() override {
667 return 0;
668 }
669 };
670
671 ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(const coll_t& cid,
672 const ghobject_t& oid)
673 {
674 dout(10) << __func__ << " " << cid << " " << oid << dendl;
675 CollectionRef c = get_collection(cid);
676 if (!c)
677 return ObjectMap::ObjectMapIterator();
678
679 ObjectRef o = c->get_object(oid);
680 if (!o)
681 return ObjectMap::ObjectMapIterator();
682 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
683 }
684
685
686 // ---------------
687 // write operations
688
689 int MemStore::queue_transactions(Sequencer *osr,
690 vector<Transaction>& tls,
691 TrackedOpRef op,
692 ThreadPool::TPHandle *handle)
693 {
694 // because memstore operations are synchronous, we can implement the
695 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
696 // while allowing operations on different sequencers to happen in parallel
697 struct OpSequencer : public Sequencer_impl {
698 OpSequencer(CephContext* cct) :
699 Sequencer_impl(cct) {}
700 std::mutex mutex;
701 void flush() override {}
702 bool flush_commit(Context*) override { return true; }
703 };
704
705 std::unique_lock<std::mutex> lock;
706 if (osr) {
707 if (!osr->p) {
708 osr->p = new OpSequencer(cct);
709 }
710 auto seq = static_cast<OpSequencer*>(osr->p.get());
711 lock = std::unique_lock<std::mutex>(seq->mutex);
712 }
713
714 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
715 // poke the TPHandle heartbeat just to exercise that code path
716 if (handle)
717 handle->reset_tp_timeout();
718
719 _do_transaction(*p);
720 }
721
722 Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
723 ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
724 &on_apply_sync);
725 if (on_apply_sync)
726 on_apply_sync->complete(0);
727 if (on_apply)
728 finisher.queue(on_apply);
729 if (on_commit)
730 finisher.queue(on_commit);
731 return 0;
732 }
733
734 void MemStore::_do_transaction(Transaction& t)
735 {
736 Transaction::iterator i = t.begin();
737 int pos = 0;
738
739 while (i.have_op()) {
740 Transaction::Op *op = i.decode_op();
741 int r = 0;
742
743 switch (op->op) {
744 case Transaction::OP_NOP:
745 break;
746 case Transaction::OP_TOUCH:
747 {
748 coll_t cid = i.get_cid(op->cid);
749 ghobject_t oid = i.get_oid(op->oid);
750 r = _touch(cid, oid);
751 }
752 break;
753
754 case Transaction::OP_WRITE:
755 {
756 coll_t cid = i.get_cid(op->cid);
757 ghobject_t oid = i.get_oid(op->oid);
758 uint64_t off = op->off;
759 uint64_t len = op->len;
760 uint32_t fadvise_flags = i.get_fadvise_flags();
761 bufferlist bl;
762 i.decode_bl(bl);
763 r = _write(cid, oid, off, len, bl, fadvise_flags);
764 }
765 break;
766
767 case Transaction::OP_ZERO:
768 {
769 coll_t cid = i.get_cid(op->cid);
770 ghobject_t oid = i.get_oid(op->oid);
771 uint64_t off = op->off;
772 uint64_t len = op->len;
773 r = _zero(cid, oid, off, len);
774 }
775 break;
776
777 case Transaction::OP_TRIMCACHE:
778 {
779 // deprecated, no-op
780 }
781 break;
782
783 case Transaction::OP_TRUNCATE:
784 {
785 coll_t cid = i.get_cid(op->cid);
786 ghobject_t oid = i.get_oid(op->oid);
787 uint64_t off = op->off;
788 r = _truncate(cid, oid, off);
789 }
790 break;
791
792 case Transaction::OP_REMOVE:
793 {
794 coll_t cid = i.get_cid(op->cid);
795 ghobject_t oid = i.get_oid(op->oid);
796 r = _remove(cid, oid);
797 }
798 break;
799
800 case Transaction::OP_SETATTR:
801 {
802 coll_t cid = i.get_cid(op->cid);
803 ghobject_t oid = i.get_oid(op->oid);
804 string name = i.decode_string();
805 bufferlist bl;
806 i.decode_bl(bl);
807 map<string, bufferptr> to_set;
808 to_set[name] = bufferptr(bl.c_str(), bl.length());
809 r = _setattrs(cid, oid, to_set);
810 }
811 break;
812
813 case Transaction::OP_SETATTRS:
814 {
815 coll_t cid = i.get_cid(op->cid);
816 ghobject_t oid = i.get_oid(op->oid);
817 map<string, bufferptr> aset;
818 i.decode_attrset(aset);
819 r = _setattrs(cid, oid, aset);
820 }
821 break;
822
823 case Transaction::OP_RMATTR:
824 {
825 coll_t cid = i.get_cid(op->cid);
826 ghobject_t oid = i.get_oid(op->oid);
827 string name = i.decode_string();
828 r = _rmattr(cid, oid, name.c_str());
829 }
830 break;
831
832 case Transaction::OP_RMATTRS:
833 {
834 coll_t cid = i.get_cid(op->cid);
835 ghobject_t oid = i.get_oid(op->oid);
836 r = _rmattrs(cid, oid);
837 }
838 break;
839
840 case Transaction::OP_CLONE:
841 {
842 coll_t cid = i.get_cid(op->cid);
843 ghobject_t oid = i.get_oid(op->oid);
844 ghobject_t noid = i.get_oid(op->dest_oid);
845 r = _clone(cid, oid, noid);
846 }
847 break;
848
849 case Transaction::OP_CLONERANGE:
850 {
851 coll_t cid = i.get_cid(op->cid);
852 ghobject_t oid = i.get_oid(op->oid);
853 ghobject_t noid = i.get_oid(op->dest_oid);
854 uint64_t off = op->off;
855 uint64_t len = op->len;
856 r = _clone_range(cid, oid, noid, off, len, off);
857 }
858 break;
859
860 case Transaction::OP_CLONERANGE2:
861 {
862 coll_t cid = i.get_cid(op->cid);
863 ghobject_t oid = i.get_oid(op->oid);
864 ghobject_t noid = i.get_oid(op->dest_oid);
865 uint64_t srcoff = op->off;
866 uint64_t len = op->len;
867 uint64_t dstoff = op->dest_off;
868 r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
869 }
870 break;
871
872 case Transaction::OP_MKCOLL:
873 {
874 coll_t cid = i.get_cid(op->cid);
875 r = _create_collection(cid, op->split_bits);
876 }
877 break;
878
879 case Transaction::OP_COLL_HINT:
880 {
881 coll_t cid = i.get_cid(op->cid);
882 uint32_t type = op->hint_type;
883 bufferlist hint;
884 i.decode_bl(hint);
885 bufferlist::iterator hiter = hint.begin();
886 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
887 uint32_t pg_num;
888 uint64_t num_objs;
889 ::decode(pg_num, hiter);
890 ::decode(num_objs, hiter);
891 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
892 } else {
893 // Ignore the hint
894 dout(10) << "Unrecognized collection hint type: " << type << dendl;
895 }
896 }
897 break;
898
899 case Transaction::OP_RMCOLL:
900 {
901 coll_t cid = i.get_cid(op->cid);
902 r = _destroy_collection(cid);
903 }
904 break;
905
906 case Transaction::OP_COLL_ADD:
907 {
908 coll_t ocid = i.get_cid(op->cid);
909 coll_t ncid = i.get_cid(op->dest_cid);
910 ghobject_t oid = i.get_oid(op->oid);
911 r = _collection_add(ncid, ocid, oid);
912 }
913 break;
914
915 case Transaction::OP_COLL_REMOVE:
916 {
917 coll_t cid = i.get_cid(op->cid);
918 ghobject_t oid = i.get_oid(op->oid);
919 r = _remove(cid, oid);
920 }
921 break;
922
923 case Transaction::OP_COLL_MOVE:
924 assert(0 == "deprecated");
925 break;
926
927 case Transaction::OP_COLL_MOVE_RENAME:
928 {
929 coll_t oldcid = i.get_cid(op->cid);
930 ghobject_t oldoid = i.get_oid(op->oid);
931 coll_t newcid = i.get_cid(op->dest_cid);
932 ghobject_t newoid = i.get_oid(op->dest_oid);
933 r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
934 if (r == -ENOENT)
935 r = 0;
936 }
937 break;
938
939 case Transaction::OP_TRY_RENAME:
940 {
941 coll_t cid = i.get_cid(op->cid);
942 ghobject_t oldoid = i.get_oid(op->oid);
943 ghobject_t newoid = i.get_oid(op->dest_oid);
944 r = _collection_move_rename(cid, oldoid, cid, newoid);
945 if (r == -ENOENT)
946 r = 0;
947 }
948 break;
949
950 case Transaction::OP_COLL_SETATTR:
951 {
952 assert(0 == "not implemented");
953 }
954 break;
955
956 case Transaction::OP_COLL_RMATTR:
957 {
958 assert(0 == "not implemented");
959 }
960 break;
961
962 case Transaction::OP_COLL_RENAME:
963 {
964 assert(0 == "not implemented");
965 }
966 break;
967
968 case Transaction::OP_OMAP_CLEAR:
969 {
970 coll_t cid = i.get_cid(op->cid);
971 ghobject_t oid = i.get_oid(op->oid);
972 r = _omap_clear(cid, oid);
973 }
974 break;
975 case Transaction::OP_OMAP_SETKEYS:
976 {
977 coll_t cid = i.get_cid(op->cid);
978 ghobject_t oid = i.get_oid(op->oid);
979 bufferlist aset_bl;
980 i.decode_attrset_bl(&aset_bl);
981 r = _omap_setkeys(cid, oid, aset_bl);
982 }
983 break;
984 case Transaction::OP_OMAP_RMKEYS:
985 {
986 coll_t cid = i.get_cid(op->cid);
987 ghobject_t oid = i.get_oid(op->oid);
988 bufferlist keys_bl;
989 i.decode_keyset_bl(&keys_bl);
990 r = _omap_rmkeys(cid, oid, keys_bl);
991 }
992 break;
993 case Transaction::OP_OMAP_RMKEYRANGE:
994 {
995 coll_t cid = i.get_cid(op->cid);
996 ghobject_t oid = i.get_oid(op->oid);
997 string first, last;
998 first = i.decode_string();
999 last = i.decode_string();
1000 r = _omap_rmkeyrange(cid, oid, first, last);
1001 }
1002 break;
1003 case Transaction::OP_OMAP_SETHEADER:
1004 {
1005 coll_t cid = i.get_cid(op->cid);
1006 ghobject_t oid = i.get_oid(op->oid);
1007 bufferlist bl;
1008 i.decode_bl(bl);
1009 r = _omap_setheader(cid, oid, bl);
1010 }
1011 break;
1012 case Transaction::OP_SPLIT_COLLECTION:
1013 assert(0 == "deprecated");
1014 break;
1015 case Transaction::OP_SPLIT_COLLECTION2:
1016 {
1017 coll_t cid = i.get_cid(op->cid);
1018 uint32_t bits = op->split_bits;
1019 uint32_t rem = op->split_rem;
1020 coll_t dest = i.get_cid(op->dest_cid);
1021 r = _split_collection(cid, bits, rem, dest);
1022 }
1023 break;
1024
1025 case Transaction::OP_SETALLOCHINT:
1026 {
1027 r = 0;
1028 }
1029 break;
1030
1031 default:
1032 derr << "bad op " << op->op << dendl;
1033 ceph_abort();
1034 }
1035
1036 if (r < 0) {
1037 bool ok = false;
1038
1039 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
1040 op->op == Transaction::OP_CLONE ||
1041 op->op == Transaction::OP_CLONERANGE2 ||
1042 op->op == Transaction::OP_COLL_ADD))
1043 // -ENOENT is usually okay
1044 ok = true;
1045 if (r == -ENODATA)
1046 ok = true;
1047
1048 if (!ok) {
1049 const char *msg = "unexpected error code";
1050
1051 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
1052 op->op == Transaction::OP_CLONE ||
1053 op->op == Transaction::OP_CLONERANGE2))
1054 msg = "ENOENT on clone suggests osd bug";
1055
1056 if (r == -ENOSPC)
1057 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1058 // by partially applying transactions.
1059 msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1060
1061 if (r == -ENOTEMPTY) {
1062 msg = "ENOTEMPTY suggests garbage data in osd data dir";
1063 dump_all();
1064 }
1065
1066 derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1067 << " (op " << pos << ", counting from 0)" << dendl;
1068 dout(0) << msg << dendl;
1069 dout(0) << " transaction dump:\n";
1070 JSONFormatter f(true);
1071 f.open_object_section("transaction");
1072 t.dump(&f);
1073 f.close_section();
1074 f.flush(*_dout);
1075 *_dout << dendl;
1076 assert(0 == "unexpected error");
1077 }
1078 }
1079
1080 ++pos;
1081 }
1082 }
1083
1084 int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1085 {
1086 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1087 CollectionRef c = get_collection(cid);
1088 if (!c)
1089 return -ENOENT;
1090
1091 c->get_or_create_object(oid);
1092 return 0;
1093 }
1094
1095 int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1096 uint64_t offset, size_t len, const bufferlist& bl,
1097 uint32_t fadvise_flags)
1098 {
1099 dout(10) << __func__ << " " << cid << " " << oid << " "
1100 << offset << "~" << len << dendl;
1101 assert(len == bl.length());
1102
1103 CollectionRef c = get_collection(cid);
1104 if (!c)
1105 return -ENOENT;
1106
1107 ObjectRef o = c->get_or_create_object(oid);
1108 if (len > 0) {
1109 const ssize_t old_size = o->get_size();
1110 o->write(offset, bl);
1111 used_bytes += (o->get_size() - old_size);
1112 }
1113
1114 return 0;
1115 }
1116
1117 int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1118 uint64_t offset, size_t len)
1119 {
1120 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1121 << len << dendl;
1122 bufferlist bl;
1123 bl.append_zero(len);
1124 return _write(cid, oid, offset, len, bl);
1125 }
1126
1127 int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1128 {
1129 dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1130 CollectionRef c = get_collection(cid);
1131 if (!c)
1132 return -ENOENT;
1133
1134 ObjectRef o = c->get_object(oid);
1135 if (!o)
1136 return -ENOENT;
1137 const ssize_t old_size = o->get_size();
1138 int r = o->truncate(size);
1139 used_bytes += (o->get_size() - old_size);
1140 return r;
1141 }
1142
1143 int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1144 {
1145 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1146 CollectionRef c = get_collection(cid);
1147 if (!c)
1148 return -ENOENT;
1149 RWLock::WLocker l(c->lock);
1150
1151 auto i = c->object_hash.find(oid);
1152 if (i == c->object_hash.end())
1153 return -ENOENT;
1154 used_bytes -= i->second->get_size();
1155 c->object_hash.erase(i);
1156 c->object_map.erase(oid);
1157
1158 return 0;
1159 }
1160
1161 int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1162 map<string,bufferptr>& aset)
1163 {
1164 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1165 CollectionRef c = get_collection(cid);
1166 if (!c)
1167 return -ENOENT;
1168
1169 ObjectRef o = c->get_object(oid);
1170 if (!o)
1171 return -ENOENT;
1172 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1173 for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
1174 o->xattr[p->first] = p->second;
1175 return 0;
1176 }
1177
1178 int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1179 {
1180 dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1181 CollectionRef c = get_collection(cid);
1182 if (!c)
1183 return -ENOENT;
1184
1185 ObjectRef o = c->get_object(oid);
1186 if (!o)
1187 return -ENOENT;
1188 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1189 auto i = o->xattr.find(name);
1190 if (i == o->xattr.end())
1191 return -ENODATA;
1192 o->xattr.erase(i);
1193 return 0;
1194 }
1195
1196 int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1197 {
1198 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1199 CollectionRef c = get_collection(cid);
1200 if (!c)
1201 return -ENOENT;
1202
1203 ObjectRef o = c->get_object(oid);
1204 if (!o)
1205 return -ENOENT;
1206 std::lock_guard<std::mutex> lock(o->xattr_mutex);
1207 o->xattr.clear();
1208 return 0;
1209 }
1210
1211 int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1212 const ghobject_t& newoid)
1213 {
1214 dout(10) << __func__ << " " << cid << " " << oldoid
1215 << " -> " << newoid << dendl;
1216 CollectionRef c = get_collection(cid);
1217 if (!c)
1218 return -ENOENT;
1219
1220 ObjectRef oo = c->get_object(oldoid);
1221 if (!oo)
1222 return -ENOENT;
1223 ObjectRef no = c->get_or_create_object(newoid);
1224 used_bytes += oo->get_size() - no->get_size();
1225 no->clone(oo.get(), 0, oo->get_size(), 0);
1226
1227 // take xattr and omap locks with std::lock()
1228 std::unique_lock<std::mutex>
1229 ox_lock(oo->xattr_mutex, std::defer_lock),
1230 nx_lock(no->xattr_mutex, std::defer_lock),
1231 oo_lock(oo->omap_mutex, std::defer_lock),
1232 no_lock(no->omap_mutex, std::defer_lock);
1233 std::lock(ox_lock, nx_lock, oo_lock, no_lock);
1234
1235 no->omap_header = oo->omap_header;
1236 no->omap = oo->omap;
1237 no->xattr = oo->xattr;
1238 return 0;
1239 }
1240
1241 int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1242 const ghobject_t& newoid,
1243 uint64_t srcoff, uint64_t len, uint64_t dstoff)
1244 {
1245 dout(10) << __func__ << " " << cid << " "
1246 << oldoid << " " << srcoff << "~" << len << " -> "
1247 << newoid << " " << dstoff << "~" << len
1248 << dendl;
1249 CollectionRef c = get_collection(cid);
1250 if (!c)
1251 return -ENOENT;
1252
1253 ObjectRef oo = c->get_object(oldoid);
1254 if (!oo)
1255 return -ENOENT;
1256 ObjectRef no = c->get_or_create_object(newoid);
1257 if (srcoff >= oo->get_size())
1258 return 0;
1259 if (srcoff + len >= oo->get_size())
1260 len = oo->get_size() - srcoff;
1261
1262 const ssize_t old_size = no->get_size();
1263 no->clone(oo.get(), srcoff, len, dstoff);
1264 used_bytes += (no->get_size() - old_size);
1265
1266 return len;
1267 }
1268
1269 int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1270 {
1271 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1272 CollectionRef c = get_collection(cid);
1273 if (!c)
1274 return -ENOENT;
1275
1276 ObjectRef o = c->get_object(oid);
1277 if (!o)
1278 return -ENOENT;
1279 std::lock_guard<std::mutex> lock(o->omap_mutex);
1280 o->omap.clear();
1281 o->omap_header.clear();
1282 return 0;
1283 }
1284
1285 int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1286 bufferlist& aset_bl)
1287 {
1288 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1289 CollectionRef c = get_collection(cid);
1290 if (!c)
1291 return -ENOENT;
1292
1293 ObjectRef o = c->get_object(oid);
1294 if (!o)
1295 return -ENOENT;
1296 std::lock_guard<std::mutex> lock(o->omap_mutex);
1297 bufferlist::iterator p = aset_bl.begin();
1298 __u32 num;
1299 ::decode(num, p);
1300 while (num--) {
1301 string key;
1302 ::decode(key, p);
1303 ::decode(o->omap[key], p);
1304 }
1305 return 0;
1306 }
1307
1308 int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1309 bufferlist& keys_bl)
1310 {
1311 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1312 CollectionRef c = get_collection(cid);
1313 if (!c)
1314 return -ENOENT;
1315
1316 ObjectRef o = c->get_object(oid);
1317 if (!o)
1318 return -ENOENT;
1319 std::lock_guard<std::mutex> lock(o->omap_mutex);
1320 bufferlist::iterator p = keys_bl.begin();
1321 __u32 num;
1322 ::decode(num, p);
1323 while (num--) {
1324 string key;
1325 ::decode(key, p);
1326 o->omap.erase(key);
1327 }
1328 return 0;
1329 }
1330
1331 int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1332 const string& first, const string& last)
1333 {
1334 dout(10) << __func__ << " " << cid << " " << oid << " " << first
1335 << " " << last << dendl;
1336 CollectionRef c = get_collection(cid);
1337 if (!c)
1338 return -ENOENT;
1339
1340 ObjectRef o = c->get_object(oid);
1341 if (!o)
1342 return -ENOENT;
1343 std::lock_guard<std::mutex> lock(o->omap_mutex);
1344 map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
1345 map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
1346 o->omap.erase(p, e);
1347 return 0;
1348 }
1349
1350 int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1351 const bufferlist &bl)
1352 {
1353 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1354 CollectionRef c = get_collection(cid);
1355 if (!c)
1356 return -ENOENT;
1357
1358 ObjectRef o = c->get_object(oid);
1359 if (!o)
1360 return -ENOENT;
1361 std::lock_guard<std::mutex> lock(o->omap_mutex);
1362 o->omap_header = bl;
1363 return 0;
1364 }
1365
1366 int MemStore::_create_collection(const coll_t& cid, int bits)
1367 {
1368 dout(10) << __func__ << " " << cid << dendl;
1369 RWLock::WLocker l(coll_lock);
1370 auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1371 if (!result.second)
1372 return -EEXIST;
1373 result.first->second.reset(new Collection(cct, cid));
1374 result.first->second->bits = bits;
1375 return 0;
1376 }
1377
1378 int MemStore::_destroy_collection(const coll_t& cid)
1379 {
1380 dout(10) << __func__ << " " << cid << dendl;
1381 RWLock::WLocker l(coll_lock);
1382 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1383 if (cp == coll_map.end())
1384 return -ENOENT;
1385 {
1386 RWLock::RLocker l2(cp->second->lock);
1387 if (!cp->second->object_map.empty())
1388 return -ENOTEMPTY;
1389 cp->second->exists = false;
1390 }
1391 used_bytes -= cp->second->used_bytes();
1392 coll_map.erase(cp);
1393 return 0;
1394 }
1395
1396 int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1397 {
1398 dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1399 CollectionRef c = get_collection(cid);
1400 if (!c)
1401 return -ENOENT;
1402 CollectionRef oc = get_collection(ocid);
1403 if (!oc)
1404 return -ENOENT;
1405 RWLock::WLocker l1(MIN(&(*c), &(*oc))->lock);
1406 RWLock::WLocker l2(MAX(&(*c), &(*oc))->lock);
1407
1408 if (c->object_hash.count(oid))
1409 return -EEXIST;
1410 if (oc->object_hash.count(oid) == 0)
1411 return -ENOENT;
1412 ObjectRef o = oc->object_hash[oid];
1413 c->object_map[oid] = o;
1414 c->object_hash[oid] = o;
1415 return 0;
1416 }
1417
1418 int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1419 coll_t cid, const ghobject_t& oid)
1420 {
1421 dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1422 << cid << " " << oid << dendl;
1423 CollectionRef c = get_collection(cid);
1424 if (!c)
1425 return -ENOENT;
1426 CollectionRef oc = get_collection(oldcid);
1427 if (!oc)
1428 return -ENOENT;
1429
1430 // note: c and oc may be the same
1431 assert(&(*c) == &(*oc));
1432 c->lock.get_write();
1433
1434 int r = -EEXIST;
1435 if (c->object_hash.count(oid))
1436 goto out;
1437 r = -ENOENT;
1438 if (oc->object_hash.count(oldoid) == 0)
1439 goto out;
1440 {
1441 ObjectRef o = oc->object_hash[oldoid];
1442 c->object_map[oid] = o;
1443 c->object_hash[oid] = o;
1444 oc->object_map.erase(oldoid);
1445 oc->object_hash.erase(oldoid);
1446 }
1447 r = 0;
1448 out:
1449 c->lock.put_write();
1450 return r;
1451 }
1452
1453 int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1454 coll_t dest)
1455 {
1456 dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1457 << dest << dendl;
1458 CollectionRef sc = get_collection(cid);
1459 if (!sc)
1460 return -ENOENT;
1461 CollectionRef dc = get_collection(dest);
1462 if (!dc)
1463 return -ENOENT;
1464 RWLock::WLocker l1(MIN(&(*sc), &(*dc))->lock);
1465 RWLock::WLocker l2(MAX(&(*sc), &(*dc))->lock);
1466
1467 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1468 while (p != sc->object_map.end()) {
1469 if (p->first.match(bits, match)) {
1470 dout(20) << " moving " << p->first << dendl;
1471 dc->object_map.insert(make_pair(p->first, p->second));
1472 dc->object_hash.insert(make_pair(p->first, p->second));
1473 sc->object_hash.erase(p->first);
1474 sc->object_map.erase(p++);
1475 } else {
1476 ++p;
1477 }
1478 }
1479
1480 sc->bits = bits;
1481 assert(dc->bits == (int)bits);
1482
1483 return 0;
1484 }
1485 namespace {
1486 struct BufferlistObject : public MemStore::Object {
1487 Spinlock mutex;
1488 bufferlist data;
1489
1490 size_t get_size() const override { return data.length(); }
1491
1492 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1493 int write(uint64_t offset, const bufferlist &bl) override;
1494 int clone(Object *src, uint64_t srcoff, uint64_t len,
1495 uint64_t dstoff) override;
1496 int truncate(uint64_t offset) override;
1497
1498 void encode(bufferlist& bl) const override {
1499 ENCODE_START(1, 1, bl);
1500 ::encode(data, bl);
1501 encode_base(bl);
1502 ENCODE_FINISH(bl);
1503 }
1504 void decode(bufferlist::iterator& p) override {
1505 DECODE_START(1, p);
1506 ::decode(data, p);
1507 decode_base(p);
1508 DECODE_FINISH(p);
1509 }
1510 };
1511 }
1512 // BufferlistObject
1513 int BufferlistObject::read(uint64_t offset, uint64_t len,
1514 bufferlist &bl)
1515 {
1516 std::lock_guard<Spinlock> lock(mutex);
1517 bl.substr_of(data, offset, len);
1518 return bl.length();
1519 }
1520
1521 int BufferlistObject::write(uint64_t offset, const bufferlist &src)
1522 {
1523 unsigned len = src.length();
1524
1525 std::lock_guard<Spinlock> lock(mutex);
1526
1527 // before
1528 bufferlist newdata;
1529 if (get_size() >= offset) {
1530 newdata.substr_of(data, 0, offset);
1531 } else {
1532 if (get_size()) {
1533 newdata.substr_of(data, 0, get_size());
1534 }
1535 newdata.append_zero(offset - get_size());
1536 }
1537
1538 newdata.append(src);
1539
1540 // after
1541 if (get_size() > offset + len) {
1542 bufferlist tail;
1543 tail.substr_of(data, offset + len, get_size() - (offset + len));
1544 newdata.append(tail);
1545 }
1546
1547 data.claim(newdata);
1548 return 0;
1549 }
1550
1551 int BufferlistObject::clone(Object *src, uint64_t srcoff,
1552 uint64_t len, uint64_t dstoff)
1553 {
1554 auto srcbl = dynamic_cast<BufferlistObject*>(src);
1555 if (srcbl == nullptr)
1556 return -ENOTSUP;
1557
1558 bufferlist bl;
1559 {
1560 std::lock_guard<Spinlock> lock(srcbl->mutex);
1561 if (srcoff == dstoff && len == src->get_size()) {
1562 data = srcbl->data;
1563 return 0;
1564 }
1565 bl.substr_of(srcbl->data, srcoff, len);
1566 }
1567 return write(dstoff, bl);
1568 }
1569
1570 int BufferlistObject::truncate(uint64_t size)
1571 {
1572 std::lock_guard<Spinlock> lock(mutex);
1573 if (get_size() > size) {
1574 bufferlist bl;
1575 bl.substr_of(data, 0, size);
1576 data.claim(bl);
1577 } else if (get_size() == size) {
1578 // do nothing
1579 } else {
1580 data.append_zero(size - get_size());
1581 }
1582 return 0;
1583 }
1584
1585 // PageSetObject
1586
1587 struct MemStore::PageSetObject : public Object {
1588 PageSet data;
1589 uint64_t data_len;
1590 #if defined(__GLIBCXX__)
1591 // use a thread-local vector for the pages returned by PageSet, so we
1592 // can avoid allocations in read/write()
1593 static thread_local PageSet::page_vector tls_pages;
1594 #endif
1595
1596 explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
1597
1598 size_t get_size() const override { return data_len; }
1599
1600 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1601 int write(uint64_t offset, const bufferlist &bl) override;
1602 int clone(Object *src, uint64_t srcoff, uint64_t len,
1603 uint64_t dstoff) override;
1604 int truncate(uint64_t offset) override;
1605
1606 void encode(bufferlist& bl) const override {
1607 ENCODE_START(1, 1, bl);
1608 ::encode(data_len, bl);
1609 data.encode(bl);
1610 encode_base(bl);
1611 ENCODE_FINISH(bl);
1612 }
1613 void decode(bufferlist::iterator& p) override {
1614 DECODE_START(1, p);
1615 ::decode(data_len, p);
1616 data.decode(p);
1617 decode_base(p);
1618 DECODE_FINISH(p);
1619 }
1620 };
1621
// DEFINE_PAGE_VECTOR(name) gives a function a page vector called `name`.
#ifndef __GLIBCXX__
// no thread-local member available: declare a fresh local vector per call
#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
#else
// use a thread-local vector for the pages returned by PageSet, so we
// can avoid allocations in read/write(); the macro expands to nothing
// because `name` then resolves to the static member
thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
#define DEFINE_PAGE_VECTOR(name)
#endif
1630
1631 int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
1632 {
1633 const auto start = offset;
1634 const auto end = offset + len;
1635 auto remaining = len;
1636
1637 DEFINE_PAGE_VECTOR(tls_pages);
1638 data.get_range(offset, len, tls_pages);
1639
1640 // allocate a buffer for the data
1641 buffer::ptr buf(len);
1642
1643 auto p = tls_pages.begin();
1644 while (remaining) {
1645 // no more pages in range
1646 if (p == tls_pages.end() || (*p)->offset >= end) {
1647 buf.zero(offset - start, remaining);
1648 break;
1649 }
1650 auto page = *p;
1651
1652 // fill any holes between pages with zeroes
1653 if (page->offset > offset) {
1654 const auto count = std::min(remaining, page->offset - offset);
1655 buf.zero(offset - start, count);
1656 remaining -= count;
1657 offset = page->offset;
1658 if (!remaining)
1659 break;
1660 }
1661
1662 // read from page
1663 const auto page_offset = offset - page->offset;
1664 const auto count = min(remaining, data.get_page_size() - page_offset);
1665
1666 buf.copy_in(offset - start, count, page->data + page_offset);
1667
1668 remaining -= count;
1669 offset += count;
1670
1671 ++p;
1672 }
1673
1674 tls_pages.clear(); // drop page refs
1675
1676 bl.append(std::move(buf));
1677 return len;
1678 }
1679
1680 int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
1681 {
1682 unsigned len = src.length();
1683
1684 DEFINE_PAGE_VECTOR(tls_pages);
1685 // make sure the page range is allocated
1686 data.alloc_range(offset, src.length(), tls_pages);
1687
1688 auto page = tls_pages.begin();
1689
1690 auto p = src.begin();
1691 while (len > 0) {
1692 unsigned page_offset = offset - (*page)->offset;
1693 unsigned pageoff = data.get_page_size() - page_offset;
1694 unsigned count = min(len, pageoff);
1695 p.copy(count, (*page)->data + page_offset);
1696 offset += count;
1697 len -= count;
1698 if (count == pageoff)
1699 ++page;
1700 }
1701 if (data_len < offset)
1702 data_len = offset;
1703 tls_pages.clear(); // drop page refs
1704 return 0;
1705 }
1706
1707 int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1708 uint64_t len, uint64_t dstoff)
1709 {
1710 const int64_t delta = dstoff - srcoff;
1711
1712 auto &src_data = static_cast<PageSetObject*>(src)->data;
1713 const uint64_t src_page_size = src_data.get_page_size();
1714
1715 auto &dst_data = data;
1716 const auto dst_page_size = dst_data.get_page_size();
1717
1718 DEFINE_PAGE_VECTOR(tls_pages);
1719 PageSet::page_vector dst_pages;
1720
1721 while (len) {
1722 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1723 auto count = std::min(len, (uint64_t)src_page_size * 16);
1724 src_data.get_range(srcoff, count, tls_pages);
1725
1726 // allocate the destination range
1727 // TODO: avoid allocating pages for holes in the source range
1728 dst_data.alloc_range(srcoff + delta, count, dst_pages);
1729 auto dst_iter = dst_pages.begin();
1730
1731 for (auto &src_page : tls_pages) {
1732 auto sbegin = std::max(srcoff, src_page->offset);
1733 auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1734
1735 // zero-fill holes before src_page
1736 if (srcoff < sbegin) {
1737 while (dst_iter != dst_pages.end()) {
1738 auto &dst_page = *dst_iter;
1739 auto dbegin = std::max(srcoff + delta, dst_page->offset);
1740 auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1741 std::fill(dst_page->data + dbegin - dst_page->offset,
1742 dst_page->data + dend - dst_page->offset, 0);
1743 if (dend < dst_page->offset + dst_page_size)
1744 break;
1745 ++dst_iter;
1746 }
1747 const auto c = sbegin - srcoff;
1748 count -= c;
1749 len -= c;
1750 }
1751
1752 // copy data from src page to dst pages
1753 while (dst_iter != dst_pages.end()) {
1754 auto &dst_page = *dst_iter;
1755 auto dbegin = std::max(sbegin + delta, dst_page->offset);
1756 auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1757
1758 std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1759 src_page->data + (dend - delta) - src_page->offset,
1760 dst_page->data + dbegin - dst_page->offset);
1761 if (dend < dst_page->offset + dst_page_size)
1762 break;
1763 ++dst_iter;
1764 }
1765
1766 const auto c = send - sbegin;
1767 count -= c;
1768 len -= c;
1769 srcoff = send;
1770 dstoff = send + delta;
1771 }
1772 tls_pages.clear(); // drop page refs
1773
1774 // zero-fill holes after the last src_page
1775 if (count > 0) {
1776 while (dst_iter != dst_pages.end()) {
1777 auto &dst_page = *dst_iter;
1778 auto dbegin = std::max(dstoff, dst_page->offset);
1779 auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1780 std::fill(dst_page->data + dbegin - dst_page->offset,
1781 dst_page->data + dend - dst_page->offset, 0);
1782 ++dst_iter;
1783 }
1784 srcoff += count;
1785 dstoff += count;
1786 len -= count;
1787 }
1788 dst_pages.clear(); // drop page refs
1789 }
1790
1791 // update object size
1792 if (data_len < dstoff)
1793 data_len = dstoff;
1794 return 0;
1795 }
1796
1797 int MemStore::PageSetObject::truncate(uint64_t size)
1798 {
1799 data.free_pages_after(size);
1800 data_len = size;
1801
1802 const auto page_size = data.get_page_size();
1803 const auto page_offset = size & ~(page_size-1);
1804 if (page_offset == size)
1805 return 0;
1806
1807 DEFINE_PAGE_VECTOR(tls_pages);
1808 // write zeroes to the rest of the last page
1809 data.get_range(page_offset, page_size, tls_pages);
1810 if (tls_pages.empty())
1811 return 0;
1812
1813 auto page = tls_pages.begin();
1814 auto data = (*page)->data;
1815 std::fill(data + (size - page_offset), data + page_size, 0);
1816 tls_pages.clear(); // drop page ref
1817 return 0;
1818 }
1819
1820
1821 MemStore::ObjectRef MemStore::Collection::create_object() const {
1822 if (use_page_set)
1823 return new PageSetObject(cct->_conf->memstore_page_size);
1824 return new BufferlistObject();
1825 }