]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/memstore/MemStore.cc
5609db360c702fdcbeaab98f9a91dbe021883929
[ceph.git] / ceph / src / os / memstore / MemStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "acconfig.h"
15
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
18 #endif
19
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
22 #endif
23
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "common/errno.h"
28 #include "MemStore.h"
29 #include "include/compat.h"
30
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_filestore
33 #undef dout_prefix
34 #define dout_prefix *_dout << "memstore(" << path << ") "
35
36 using ceph::decode;
37 using ceph::encode;
38
39 // for comparing collections for lock ordering
40 bool operator>(const MemStore::CollectionRef& l,
41 const MemStore::CollectionRef& r)
42 {
43 return (unsigned long)l.get() > (unsigned long)r.get();
44 }
45
46
47 int MemStore::mount()
48 {
49 int r = _load();
50 if (r < 0)
51 return r;
52 finisher.start();
53 return 0;
54 }
55
56 int MemStore::umount()
57 {
58 finisher.wait_for_empty();
59 finisher.stop();
60 return _save();
61 }
62
63 int MemStore::_save()
64 {
65 dout(10) << __func__ << dendl;
66 dump_all();
67 std::set<coll_t> collections;
68 for (auto p = coll_map.begin(); p != coll_map.end(); ++p) {
69 dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
70 collections.insert(p->first);
71 ceph::buffer::list bl;
72 ceph_assert(p->second);
73 p->second->encode(bl);
74 std::string fn = path + "/" + stringify(p->first);
75 int r = bl.write_file(fn.c_str());
76 if (r < 0)
77 return r;
78 }
79
80 std::string fn = path + "/collections";
81 ceph::buffer::list bl;
82 encode(collections, bl);
83 int r = bl.write_file(fn.c_str());
84 if (r < 0)
85 return r;
86
87 return 0;
88 }
89
90 void MemStore::dump_all()
91 {
92 auto f = ceph::Formatter::create("json-pretty");
93 f->open_object_section("store");
94 dump(f);
95 f->close_section();
96 dout(0) << "dump:";
97 f->flush(*_dout);
98 *_dout << dendl;
99 delete f;
100 }
101
102 void MemStore::dump(ceph::Formatter *f)
103 {
104 f->open_array_section("collections");
105 for (auto p = coll_map.begin(); p != coll_map.end(); ++p) {
106 f->open_object_section("collection");
107 f->dump_string("name", stringify(p->first));
108
109 f->open_array_section("xattrs");
110 for (auto q = p->second->xattr.begin();
111 q != p->second->xattr.end();
112 ++q) {
113 f->open_object_section("xattr");
114 f->dump_string("name", q->first);
115 f->dump_int("length", q->second.length());
116 f->close_section();
117 }
118 f->close_section();
119
120 f->open_array_section("objects");
121 for (auto q = p->second->object_map.begin();
122 q != p->second->object_map.end();
123 ++q) {
124 f->open_object_section("object");
125 f->dump_string("name", stringify(q->first));
126 if (q->second)
127 q->second->dump(f);
128 f->close_section();
129 }
130 f->close_section();
131
132 f->close_section();
133 }
134 f->close_section();
135 }
136
137 int MemStore::_load()
138 {
139 dout(10) << __func__ << dendl;
140 ceph::buffer::list bl;
141 std::string fn = path + "/collections";
142 std::string err;
143 int r = bl.read_file(fn.c_str(), &err);
144 if (r < 0)
145 return r;
146
147 std::set<coll_t> collections;
148 auto p = bl.cbegin();
149 decode(collections, p);
150
151 for (auto q = collections.begin();
152 q != collections.end();
153 ++q) {
154 std::string fn = path + "/" + stringify(*q);
155 ceph::buffer::list cbl;
156 int r = cbl.read_file(fn.c_str(), &err);
157 if (r < 0)
158 return r;
159 auto c = ceph::make_ref<Collection>(cct, *q);
160 auto p = cbl.cbegin();
161 c->decode(p);
162 coll_map[*q] = c;
163 used_bytes += c->used_bytes();
164 }
165
166 dump_all();
167
168 return 0;
169 }
170
171 void MemStore::set_fsid(uuid_d u)
172 {
173 int r = write_meta("fsid", stringify(u));
174 ceph_assert(r >= 0);
175 }
176
177 uuid_d MemStore::get_fsid()
178 {
179 std::string fsid_str;
180 int r = read_meta("fsid", &fsid_str);
181 ceph_assert(r >= 0);
182 uuid_d uuid;
183 bool b = uuid.parse(fsid_str.c_str());
184 ceph_assert(b);
185 return uuid;
186 }
187
188 int MemStore::mkfs()
189 {
190 std::string fsid_str;
191 int r = read_meta("fsid", &fsid_str);
192 if (r == -ENOENT) {
193 uuid_d fsid;
194 fsid.generate_random();
195 fsid_str = stringify(fsid);
196 r = write_meta("fsid", fsid_str);
197 if (r < 0)
198 return r;
199 dout(1) << __func__ << " new fsid " << fsid_str << dendl;
200 } else if (r < 0) {
201 return r;
202 } else {
203 dout(1) << __func__ << " had fsid " << fsid_str << dendl;
204 }
205
206 std::string fn = path + "/collections";
207 derr << path << dendl;
208 ceph::buffer::list bl;
209 std::set<coll_t> collections;
210 encode(collections, bl);
211 r = bl.write_file(fn.c_str());
212 if (r < 0)
213 return r;
214
215 r = write_meta("type", "memstore");
216 if (r < 0)
217 return r;
218
219 return 0;
220 }
221
222 int MemStore::statfs(struct store_statfs_t *st, osd_alert_list_t* alerts)
223 {
224 dout(10) << __func__ << dendl;
225 if (alerts) {
226 alerts->clear(); // returns nothing for now
227 }
228 st->reset();
229 st->total = cct->_conf->memstore_device_bytes;
230 st->available = std::max<int64_t>(st->total - used_bytes, 0);
231 dout(10) << __func__ << ": used_bytes: " << used_bytes
232 << "/" << cct->_conf->memstore_device_bytes << dendl;
233 return 0;
234 }
235
236 int MemStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
237 bool *per_pool_omap)
238 {
239 return -ENOTSUP;
240 }
241
242 objectstore_perf_stat_t MemStore::get_cur_stats()
243 {
244 // fixme
245 return objectstore_perf_stat_t();
246 }
247
248 MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
249 {
250 std::shared_lock l{coll_lock};
251 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
252 if (cp == coll_map.end())
253 return CollectionRef();
254 return cp->second;
255 }
256
257 ObjectStore::CollectionHandle MemStore::create_new_collection(const coll_t& cid)
258 {
259 std::lock_guard l{coll_lock};
260 auto c = ceph::make_ref<Collection>(cct, cid);
261 new_coll_map[cid] = c;
262 return c;
263 }
264
265
266 // ---------------
267 // read operations
268
269 bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
270 {
271 Collection *c = static_cast<Collection*>(c_.get());
272 dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
273 if (!c->exists)
274 return false;
275
276 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
277 // shared_ptr needs to be compared to nullptr.
278 return (bool)c->get_object(oid);
279 }
280
281 int MemStore::stat(
282 CollectionHandle &c_,
283 const ghobject_t& oid,
284 struct stat *st,
285 bool allow_eio)
286 {
287 Collection *c = static_cast<Collection*>(c_.get());
288 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
289 if (!c->exists)
290 return -ENOENT;
291 ObjectRef o = c->get_object(oid);
292 if (!o)
293 return -ENOENT;
294 st->st_size = o->get_size();
295 st->st_blksize = 4096;
296 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
297 st->st_nlink = 1;
298 return 0;
299 }
300
301 int MemStore::set_collection_opts(
302 CollectionHandle& ch,
303 const pool_opts_t& opts)
304 {
305 return -EOPNOTSUPP;
306 }
307
308 int MemStore::read(
309 CollectionHandle &c_,
310 const ghobject_t& oid,
311 uint64_t offset,
312 size_t len,
313 ceph::buffer::list& bl,
314 uint32_t op_flags)
315 {
316 Collection *c = static_cast<Collection*>(c_.get());
317 dout(10) << __func__ << " " << c->cid << " " << oid << " "
318 << offset << "~" << len << dendl;
319 if (!c->exists)
320 return -ENOENT;
321 ObjectRef o = c->get_object(oid);
322 if (!o)
323 return -ENOENT;
324 if (offset >= o->get_size())
325 return 0;
326 size_t l = len;
327 if (l == 0 && offset == 0) // note: len == 0 means read the entire object
328 l = o->get_size();
329 else if (offset + l > o->get_size())
330 l = o->get_size() - offset;
331 bl.clear();
332 return o->read(offset, l, bl);
333 }
334
335 int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
336 uint64_t offset, size_t len, ceph::buffer::list& bl)
337 {
338 std::map<uint64_t, uint64_t> destmap;
339 int r = fiemap(ch, oid, offset, len, destmap);
340 if (r >= 0)
341 encode(destmap, bl);
342 return r;
343 }
344
345 int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
346 uint64_t offset, size_t len, std::map<uint64_t, uint64_t>& destmap)
347 {
348 dout(10) << __func__ << " " << ch->cid << " " << oid << " " << offset << "~"
349 << len << dendl;
350 Collection *c = static_cast<Collection*>(ch.get());
351 if (!c)
352 return -ENOENT;
353
354 ObjectRef o = c->get_object(oid);
355 if (!o)
356 return -ENOENT;
357 size_t l = len;
358 if (offset + l > o->get_size())
359 l = o->get_size() - offset;
360 if (offset >= o->get_size())
361 goto out;
362 destmap[offset] = l;
363 out:
364 return 0;
365 }
366
367 int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
368 const char *name, ceph::buffer::ptr& value)
369 {
370 Collection *c = static_cast<Collection*>(c_.get());
371 dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
372 if (!c->exists)
373 return -ENOENT;
374 ObjectRef o = c->get_object(oid);
375 if (!o)
376 return -ENOENT;
377 std::string k(name);
378 std::lock_guard lock{o->xattr_mutex};
379 if (!o->xattr.count(k)) {
380 return -ENODATA;
381 }
382 value = o->xattr[k];
383 return 0;
384 }
385
386 int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
387 std::map<std::string,ceph::buffer::ptr,std::less<>>& aset)
388 {
389 Collection *c = static_cast<Collection*>(c_.get());
390 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
391 if (!c->exists)
392 return -ENOENT;
393
394 ObjectRef o = c->get_object(oid);
395 if (!o)
396 return -ENOENT;
397 std::lock_guard lock{o->xattr_mutex};
398 aset = o->xattr;
399 return 0;
400 }
401
402 int MemStore::list_collections(std::vector<coll_t>& ls)
403 {
404 dout(10) << __func__ << dendl;
405 std::shared_lock l{coll_lock};
406 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
407 p != coll_map.end();
408 ++p) {
409 ls.push_back(p->first);
410 }
411 return 0;
412 }
413
414 bool MemStore::collection_exists(const coll_t& cid)
415 {
416 dout(10) << __func__ << " " << cid << dendl;
417 std::shared_lock l{coll_lock};
418 return coll_map.count(cid);
419 }
420
421 int MemStore::collection_empty(CollectionHandle& ch, bool *empty)
422 {
423 dout(10) << __func__ << " " << ch->cid << dendl;
424 CollectionRef c = static_cast<Collection*>(ch.get());
425 std::shared_lock l{c->lock};
426 *empty = c->object_map.empty();
427 return 0;
428 }
429
430 int MemStore::collection_bits(CollectionHandle& ch)
431 {
432 dout(10) << __func__ << " " << ch->cid << dendl;
433 Collection *c = static_cast<Collection*>(ch.get());
434 std::shared_lock l{c->lock};
435 return c->bits;
436 }
437
438 int MemStore::collection_list(CollectionHandle& ch,
439 const ghobject_t& start,
440 const ghobject_t& end,
441 int max,
442 std::vector<ghobject_t> *ls, ghobject_t *next)
443 {
444 Collection *c = static_cast<Collection*>(ch.get());
445 std::shared_lock l{c->lock};
446
447 dout(10) << __func__ << " cid " << ch->cid << " start " << start
448 << " end " << end << dendl;
449 auto p = c->object_map.lower_bound(start);
450 while (p != c->object_map.end() &&
451 ls->size() < (unsigned)max &&
452 p->first < end) {
453 ls->push_back(p->first);
454 ++p;
455 }
456 if (next != NULL) {
457 if (p == c->object_map.end())
458 *next = ghobject_t::get_max();
459 else
460 *next = p->first;
461 }
462 dout(10) << __func__ << " cid " << ch->cid << " got " << ls->size() << dendl;
463 return 0;
464 }
465
466 int MemStore::omap_get(
467 CollectionHandle& ch, ///< [in] Collection containing oid
468 const ghobject_t &oid, ///< [in] Object containing omap
469 ceph::buffer::list *header, ///< [out] omap header
470 std::map<std::string, ceph::buffer::list> *out /// < [out] Key to value map
471 )
472 {
473 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
474 Collection *c = static_cast<Collection*>(ch.get());
475
476 ObjectRef o = c->get_object(oid);
477 if (!o)
478 return -ENOENT;
479 std::lock_guard lock{o->omap_mutex};
480 *header = o->omap_header;
481 *out = o->omap;
482 return 0;
483 }
484
485 int MemStore::omap_get_header(
486 CollectionHandle& ch, ///< [in] Collection containing oid
487 const ghobject_t &oid, ///< [in] Object containing omap
488 ceph::buffer::list *header, ///< [out] omap header
489 bool allow_eio ///< [in] don't assert on eio
490 )
491 {
492 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
493 Collection *c = static_cast<Collection*>(ch.get());
494 ObjectRef o = c->get_object(oid);
495 if (!o)
496 return -ENOENT;
497 std::lock_guard lock{o->omap_mutex};
498 *header = o->omap_header;
499 return 0;
500 }
501
502 int MemStore::omap_get_keys(
503 CollectionHandle& ch, ///< [in] Collection containing oid
504 const ghobject_t &oid, ///< [in] Object containing omap
505 std::set<std::string> *keys ///< [out] Keys defined on oid
506 )
507 {
508 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
509 Collection *c = static_cast<Collection*>(ch.get());
510 ObjectRef o = c->get_object(oid);
511 if (!o)
512 return -ENOENT;
513 std::lock_guard lock{o->omap_mutex};
514 for (auto p = o->omap.begin(); p != o->omap.end(); ++p)
515 keys->insert(p->first);
516 return 0;
517 }
518
519 int MemStore::omap_get_values(
520 CollectionHandle& ch, ///< [in] Collection containing oid
521 const ghobject_t &oid, ///< [in] Object containing omap
522 const std::set<std::string> &keys, ///< [in] Keys to get
523 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
524 )
525 {
526 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
527 Collection *c = static_cast<Collection*>(ch.get());
528 ObjectRef o = c->get_object(oid);
529 if (!o)
530 return -ENOENT;
531 std::lock_guard lock{o->omap_mutex};
532 for (auto p = keys.begin(); p != keys.end(); ++p) {
533 auto q = o->omap.find(*p);
534 if (q != o->omap.end())
535 out->insert(*q);
536 }
537 return 0;
538 }
539
#ifdef WITH_SEASTAR
// Inserts into `*out` every omap entry of object `oid` whose key sorts
// strictly after `*start_after`. `start_after` must hold a value (asserted).
// Returns -ENOENT when the object does not exist, 0 on success.
int MemStore::omap_get_values(
  CollectionHandle& ch,                ///< [in] Collection containing oid
  const ghobject_t &oid,               ///< [in] Object containing omap
  const std::optional<std::string> &start_after, ///< [in] Keys to get
  std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
  )
{
  dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
  Collection *coll = static_cast<Collection*>(ch.get());

  ObjectRef obj = coll->get_object(oid);
  if (!obj) {
    return -ENOENT;
  }
  assert(start_after);

  std::lock_guard lock{obj->omap_mutex};
  auto cursor = obj->omap.upper_bound(*start_after);
  while (cursor != std::end(obj->omap)) {
    out->insert(*cursor);
    ++cursor;
  }
  return 0;
}
#endif
563
564 int MemStore::omap_check_keys(
565 CollectionHandle& ch, ///< [in] Collection containing oid
566 const ghobject_t &oid, ///< [in] Object containing omap
567 const std::set<std::string> &keys, ///< [in] Keys to check
568 std::set<std::string> *out ///< [out] Subset of keys defined on oid
569 )
570 {
571 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
572 Collection *c = static_cast<Collection*>(ch.get());
573 ObjectRef o = c->get_object(oid);
574 if (!o)
575 return -ENOENT;
576 std::lock_guard lock{o->omap_mutex};
577 for (auto p = keys.begin(); p != keys.end(); ++p) {
578 auto q = o->omap.find(*p);
579 if (q != o->omap.end())
580 out->insert(*p);
581 }
582 return 0;
583 }
584
585 class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
586 CollectionRef c;
587 ObjectRef o;
588 std::map<std::string,ceph::buffer::list>::iterator it;
589 public:
590 OmapIteratorImpl(CollectionRef c, ObjectRef o)
591 : c(c), o(o), it(o->omap.begin()) {}
592
593 int seek_to_first() override {
594 std::lock_guard lock{o->omap_mutex};
595 it = o->omap.begin();
596 return 0;
597 }
598 int upper_bound(const std::string &after) override {
599 std::lock_guard lock{o->omap_mutex};
600 it = o->omap.upper_bound(after);
601 return 0;
602 }
603 int lower_bound(const std::string &to) override {
604 std::lock_guard lock{o->omap_mutex};
605 it = o->omap.lower_bound(to);
606 return 0;
607 }
608 bool valid() override {
609 std::lock_guard lock{o->omap_mutex};
610 return it != o->omap.end();
611 }
612 int next() override {
613 std::lock_guard lock{o->omap_mutex};
614 ++it;
615 return 0;
616 }
617 std::string key() override {
618 std::lock_guard lock{o->omap_mutex};
619 return it->first;
620 }
621 ceph::buffer::list value() override {
622 std::lock_guard lock{o->omap_mutex};
623 return it->second;
624 }
625 int status() override {
626 return 0;
627 }
628 };
629
630 ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(
631 CollectionHandle& ch,
632 const ghobject_t& oid)
633 {
634 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
635 Collection *c = static_cast<Collection*>(ch.get());
636 ObjectRef o = c->get_object(oid);
637 if (!o)
638 return ObjectMap::ObjectMapIterator();
639 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
640 }
641
642
643 // ---------------
644 // write operations
645
646 int MemStore::queue_transactions(
647 CollectionHandle& ch,
648 std::vector<Transaction>& tls,
649 TrackedOpRef op,
650 ThreadPool::TPHandle *handle)
651 {
652 // because memstore operations are synchronous, we can implement the
653 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
654 // while allowing operations on different sequencers to happen in parallel
655 Collection *c = static_cast<Collection*>(ch.get());
656 std::unique_lock lock{c->sequencer_mutex};
657
658 for (auto p = tls.begin(); p != tls.end(); ++p) {
659 // poke the TPHandle heartbeat just to exercise that code path
660 if (handle)
661 handle->reset_tp_timeout();
662
663 _do_transaction(*p);
664 }
665
666 Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
667 ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
668 &on_apply_sync);
669 if (on_apply_sync)
670 on_apply_sync->complete(0);
671 if (on_apply)
672 finisher.queue(on_apply);
673 if (on_commit)
674 finisher.queue(on_commit);
675 return 0;
676 }
677
678 void MemStore::_do_transaction(Transaction& t)
679 {
680 Transaction::iterator i = t.begin();
681 int pos = 0;
682
683 while (i.have_op()) {
684 Transaction::Op *op = i.decode_op();
685 int r = 0;
686
687 switch (op->op) {
688 case Transaction::OP_NOP:
689 break;
690 case Transaction::OP_TOUCH:
691 case Transaction::OP_CREATE:
692 {
693 coll_t cid = i.get_cid(op->cid);
694 ghobject_t oid = i.get_oid(op->oid);
695 r = _touch(cid, oid);
696 }
697 break;
698
699 case Transaction::OP_WRITE:
700 {
701 coll_t cid = i.get_cid(op->cid);
702 ghobject_t oid = i.get_oid(op->oid);
703 uint64_t off = op->off;
704 uint64_t len = op->len;
705 uint32_t fadvise_flags = i.get_fadvise_flags();
706 ceph::buffer::list bl;
707 i.decode_bl(bl);
708 r = _write(cid, oid, off, len, bl, fadvise_flags);
709 }
710 break;
711
712 case Transaction::OP_ZERO:
713 {
714 coll_t cid = i.get_cid(op->cid);
715 ghobject_t oid = i.get_oid(op->oid);
716 uint64_t off = op->off;
717 uint64_t len = op->len;
718 r = _zero(cid, oid, off, len);
719 }
720 break;
721
722 case Transaction::OP_TRIMCACHE:
723 {
724 // deprecated, no-op
725 }
726 break;
727
728 case Transaction::OP_TRUNCATE:
729 {
730 coll_t cid = i.get_cid(op->cid);
731 ghobject_t oid = i.get_oid(op->oid);
732 uint64_t off = op->off;
733 r = _truncate(cid, oid, off);
734 }
735 break;
736
737 case Transaction::OP_REMOVE:
738 {
739 coll_t cid = i.get_cid(op->cid);
740 ghobject_t oid = i.get_oid(op->oid);
741 r = _remove(cid, oid);
742 }
743 break;
744
745 case Transaction::OP_SETATTR:
746 {
747 coll_t cid = i.get_cid(op->cid);
748 ghobject_t oid = i.get_oid(op->oid);
749 std::string name = i.decode_string();
750 ceph::buffer::list bl;
751 i.decode_bl(bl);
752 std::map<std::string, ceph::buffer::ptr> to_set;
753 to_set[name] = ceph::buffer::ptr(bl.c_str(), bl.length());
754 r = _setattrs(cid, oid, to_set);
755 }
756 break;
757
758 case Transaction::OP_SETATTRS:
759 {
760 coll_t cid = i.get_cid(op->cid);
761 ghobject_t oid = i.get_oid(op->oid);
762 std::map<std::string, ceph::buffer::ptr> aset;
763 i.decode_attrset(aset);
764 r = _setattrs(cid, oid, aset);
765 }
766 break;
767
768 case Transaction::OP_RMATTR:
769 {
770 coll_t cid = i.get_cid(op->cid);
771 ghobject_t oid = i.get_oid(op->oid);
772 std::string name = i.decode_string();
773 r = _rmattr(cid, oid, name.c_str());
774 }
775 break;
776
777 case Transaction::OP_RMATTRS:
778 {
779 coll_t cid = i.get_cid(op->cid);
780 ghobject_t oid = i.get_oid(op->oid);
781 r = _rmattrs(cid, oid);
782 }
783 break;
784
785 case Transaction::OP_CLONE:
786 {
787 coll_t cid = i.get_cid(op->cid);
788 ghobject_t oid = i.get_oid(op->oid);
789 ghobject_t noid = i.get_oid(op->dest_oid);
790 r = _clone(cid, oid, noid);
791 }
792 break;
793
794 case Transaction::OP_CLONERANGE:
795 {
796 coll_t cid = i.get_cid(op->cid);
797 ghobject_t oid = i.get_oid(op->oid);
798 ghobject_t noid = i.get_oid(op->dest_oid);
799 uint64_t off = op->off;
800 uint64_t len = op->len;
801 r = _clone_range(cid, oid, noid, off, len, off);
802 }
803 break;
804
805 case Transaction::OP_CLONERANGE2:
806 {
807 coll_t cid = i.get_cid(op->cid);
808 ghobject_t oid = i.get_oid(op->oid);
809 ghobject_t noid = i.get_oid(op->dest_oid);
810 uint64_t srcoff = op->off;
811 uint64_t len = op->len;
812 uint64_t dstoff = op->dest_off;
813 r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
814 }
815 break;
816
817 case Transaction::OP_MKCOLL:
818 {
819 coll_t cid = i.get_cid(op->cid);
820 r = _create_collection(cid, op->split_bits);
821 }
822 break;
823
824 case Transaction::OP_COLL_HINT:
825 {
826 coll_t cid = i.get_cid(op->cid);
827 uint32_t type = op->hint;
828 ceph::buffer::list hint;
829 i.decode_bl(hint);
830 auto hiter = hint.cbegin();
831 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
832 uint32_t pg_num;
833 uint64_t num_objs;
834 decode(pg_num, hiter);
835 decode(num_objs, hiter);
836 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
837 } else {
838 // Ignore the hint
839 dout(10) << "Unrecognized collection hint type: " << type << dendl;
840 }
841 }
842 break;
843
844 case Transaction::OP_RMCOLL:
845 {
846 coll_t cid = i.get_cid(op->cid);
847 r = _destroy_collection(cid);
848 }
849 break;
850
851 case Transaction::OP_COLL_ADD:
852 {
853 coll_t ocid = i.get_cid(op->cid);
854 coll_t ncid = i.get_cid(op->dest_cid);
855 ghobject_t oid = i.get_oid(op->oid);
856 r = _collection_add(ncid, ocid, oid);
857 }
858 break;
859
860 case Transaction::OP_COLL_REMOVE:
861 {
862 coll_t cid = i.get_cid(op->cid);
863 ghobject_t oid = i.get_oid(op->oid);
864 r = _remove(cid, oid);
865 }
866 break;
867
868 case Transaction::OP_COLL_MOVE:
869 ceph_abort_msg("deprecated");
870 break;
871
872 case Transaction::OP_COLL_MOVE_RENAME:
873 {
874 coll_t oldcid = i.get_cid(op->cid);
875 ghobject_t oldoid = i.get_oid(op->oid);
876 coll_t newcid = i.get_cid(op->dest_cid);
877 ghobject_t newoid = i.get_oid(op->dest_oid);
878 r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
879 if (r == -ENOENT)
880 r = 0;
881 }
882 break;
883
884 case Transaction::OP_TRY_RENAME:
885 {
886 coll_t cid = i.get_cid(op->cid);
887 ghobject_t oldoid = i.get_oid(op->oid);
888 ghobject_t newoid = i.get_oid(op->dest_oid);
889 r = _collection_move_rename(cid, oldoid, cid, newoid);
890 if (r == -ENOENT)
891 r = 0;
892 }
893 break;
894
895 case Transaction::OP_COLL_SETATTR:
896 {
897 ceph_abort_msg("not implemented");
898 }
899 break;
900
901 case Transaction::OP_COLL_RMATTR:
902 {
903 ceph_abort_msg("not implemented");
904 }
905 break;
906
907 case Transaction::OP_COLL_RENAME:
908 {
909 ceph_abort_msg("not implemented");
910 }
911 break;
912
913 case Transaction::OP_OMAP_CLEAR:
914 {
915 coll_t cid = i.get_cid(op->cid);
916 ghobject_t oid = i.get_oid(op->oid);
917 r = _omap_clear(cid, oid);
918 }
919 break;
920 case Transaction::OP_OMAP_SETKEYS:
921 {
922 coll_t cid = i.get_cid(op->cid);
923 ghobject_t oid = i.get_oid(op->oid);
924 ceph::buffer::list aset_bl;
925 i.decode_attrset_bl(&aset_bl);
926 r = _omap_setkeys(cid, oid, aset_bl);
927 }
928 break;
929 case Transaction::OP_OMAP_RMKEYS:
930 {
931 coll_t cid = i.get_cid(op->cid);
932 ghobject_t oid = i.get_oid(op->oid);
933 ceph::buffer::list keys_bl;
934 i.decode_keyset_bl(&keys_bl);
935 r = _omap_rmkeys(cid, oid, keys_bl);
936 }
937 break;
938 case Transaction::OP_OMAP_RMKEYRANGE:
939 {
940 coll_t cid = i.get_cid(op->cid);
941 ghobject_t oid = i.get_oid(op->oid);
942 std::string first, last;
943 first = i.decode_string();
944 last = i.decode_string();
945 r = _omap_rmkeyrange(cid, oid, first, last);
946 }
947 break;
948 case Transaction::OP_OMAP_SETHEADER:
949 {
950 coll_t cid = i.get_cid(op->cid);
951 ghobject_t oid = i.get_oid(op->oid);
952 ceph::buffer::list bl;
953 i.decode_bl(bl);
954 r = _omap_setheader(cid, oid, bl);
955 }
956 break;
957 case Transaction::OP_SPLIT_COLLECTION:
958 ceph_abort_msg("deprecated");
959 break;
960 case Transaction::OP_SPLIT_COLLECTION2:
961 {
962 coll_t cid = i.get_cid(op->cid);
963 uint32_t bits = op->split_bits;
964 uint32_t rem = op->split_rem;
965 coll_t dest = i.get_cid(op->dest_cid);
966 r = _split_collection(cid, bits, rem, dest);
967 }
968 break;
969 case Transaction::OP_MERGE_COLLECTION:
970 {
971 coll_t cid = i.get_cid(op->cid);
972 uint32_t bits = op->split_bits;
973 coll_t dest = i.get_cid(op->dest_cid);
974 r = _merge_collection(cid, bits, dest);
975 }
976 break;
977
978 case Transaction::OP_SETALLOCHINT:
979 {
980 r = 0;
981 }
982 break;
983
984 case Transaction::OP_COLL_SET_BITS:
985 {
986 r = 0;
987 }
988 break;
989
990 default:
991 derr << "bad op " << op->op << dendl;
992 ceph_abort();
993 }
994
995 if (r < 0) {
996 bool ok = false;
997
998 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
999 op->op == Transaction::OP_CLONE ||
1000 op->op == Transaction::OP_CLONERANGE2 ||
1001 op->op == Transaction::OP_COLL_ADD))
1002 // -ENOENT is usually okay
1003 ok = true;
1004 if (r == -ENODATA)
1005 ok = true;
1006
1007 if (!ok) {
1008 const char *msg = "unexpected error code";
1009
1010 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
1011 op->op == Transaction::OP_CLONE ||
1012 op->op == Transaction::OP_CLONERANGE2))
1013 msg = "ENOENT on clone suggests osd bug";
1014
1015 if (r == -ENOSPC)
1016 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1017 // by partially applying transactions.
1018 msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1019
1020 if (r == -ENOTEMPTY) {
1021 msg = "ENOTEMPTY suggests garbage data in osd data dir";
1022 dump_all();
1023 }
1024
1025 derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1026 << " (op " << pos << ", counting from 0)" << dendl;
1027 dout(0) << msg << dendl;
1028 dout(0) << " transaction dump:\n";
1029 ceph::JSONFormatter f(true);
1030 f.open_object_section("transaction");
1031 t.dump(&f);
1032 f.close_section();
1033 f.flush(*_dout);
1034 *_dout << dendl;
1035 ceph_abort_msg("unexpected error");
1036 }
1037 }
1038
1039 ++pos;
1040 }
1041 }
1042
1043 int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1044 {
1045 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1046 CollectionRef c = get_collection(cid);
1047 if (!c)
1048 return -ENOENT;
1049
1050 c->get_or_create_object(oid);
1051 return 0;
1052 }
1053
1054 int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1055 uint64_t offset, size_t len, const ceph::buffer::list& bl,
1056 uint32_t fadvise_flags)
1057 {
1058 dout(10) << __func__ << " " << cid << " " << oid << " "
1059 << offset << "~" << len << dendl;
1060 ceph_assert(len == bl.length());
1061
1062 CollectionRef c = get_collection(cid);
1063 if (!c)
1064 return -ENOENT;
1065
1066 ObjectRef o = c->get_or_create_object(oid);
1067 if (len > 0 && !cct->_conf->memstore_debug_omit_block_device_write) {
1068 const ssize_t old_size = o->get_size();
1069 o->write(offset, bl);
1070 used_bytes += (o->get_size() - old_size);
1071 }
1072
1073 return 0;
1074 }
1075
1076 int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1077 uint64_t offset, size_t len)
1078 {
1079 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1080 << len << dendl;
1081 ceph::buffer::list bl;
1082 bl.append_zero(len);
1083 return _write(cid, oid, offset, len, bl);
1084 }
1085
1086 int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1087 {
1088 dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1089 CollectionRef c = get_collection(cid);
1090 if (!c)
1091 return -ENOENT;
1092
1093 ObjectRef o = c->get_object(oid);
1094 if (!o)
1095 return -ENOENT;
1096 if (cct->_conf->memstore_debug_omit_block_device_write)
1097 return 0;
1098 const ssize_t old_size = o->get_size();
1099 int r = o->truncate(size);
1100 used_bytes += (o->get_size() - old_size);
1101 return r;
1102 }
1103
1104 int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1105 {
1106 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1107 CollectionRef c = get_collection(cid);
1108 if (!c)
1109 return -ENOENT;
1110 std::lock_guard l{c->lock};
1111
1112 auto i = c->object_hash.find(oid);
1113 if (i == c->object_hash.end())
1114 return -ENOENT;
1115 used_bytes -= i->second->get_size();
1116 c->object_hash.erase(i);
1117 c->object_map.erase(oid);
1118
1119 return 0;
1120 }
1121
1122 int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1123 std::map<std::string,ceph::buffer::ptr>& aset)
1124 {
1125 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1126 CollectionRef c = get_collection(cid);
1127 if (!c)
1128 return -ENOENT;
1129
1130 ObjectRef o = c->get_object(oid);
1131 if (!o)
1132 return -ENOENT;
1133 std::lock_guard lock{o->xattr_mutex};
1134 for (auto p = aset.begin(); p != aset.end(); ++p)
1135 o->xattr[p->first] = p->second;
1136 return 0;
1137 }
1138
1139 int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1140 {
1141 dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1142 CollectionRef c = get_collection(cid);
1143 if (!c)
1144 return -ENOENT;
1145
1146 ObjectRef o = c->get_object(oid);
1147 if (!o)
1148 return -ENOENT;
1149 std::lock_guard lock{o->xattr_mutex};
1150 auto i = o->xattr.find(name);
1151 if (i == o->xattr.end())
1152 return -ENODATA;
1153 o->xattr.erase(i);
1154 return 0;
1155 }
1156
1157 int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1158 {
1159 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1160 CollectionRef c = get_collection(cid);
1161 if (!c)
1162 return -ENOENT;
1163
1164 ObjectRef o = c->get_object(oid);
1165 if (!o)
1166 return -ENOENT;
1167 std::lock_guard lock{o->xattr_mutex};
1168 o->xattr.clear();
1169 return 0;
1170 }
1171
1172 int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1173 const ghobject_t& newoid)
1174 {
1175 dout(10) << __func__ << " " << cid << " " << oldoid
1176 << " -> " << newoid << dendl;
1177 CollectionRef c = get_collection(cid);
1178 if (!c)
1179 return -ENOENT;
1180
1181 ObjectRef oo = c->get_object(oldoid);
1182 if (!oo)
1183 return -ENOENT;
1184 ObjectRef no = c->get_or_create_object(newoid);
1185 used_bytes += oo->get_size() - no->get_size();
1186 no->clone(oo.get(), 0, oo->get_size(), 0);
1187
1188 // take xattr and omap locks with std::lock()
1189 std::scoped_lock l{oo->xattr_mutex,
1190 no->xattr_mutex,
1191 oo->omap_mutex,
1192 no->omap_mutex};
1193
1194 no->omap_header = oo->omap_header;
1195 no->omap = oo->omap;
1196 no->xattr = oo->xattr;
1197 return 0;
1198 }
1199
1200 int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1201 const ghobject_t& newoid,
1202 uint64_t srcoff, uint64_t len, uint64_t dstoff)
1203 {
1204 dout(10) << __func__ << " " << cid << " "
1205 << oldoid << " " << srcoff << "~" << len << " -> "
1206 << newoid << " " << dstoff << "~" << len
1207 << dendl;
1208 CollectionRef c = get_collection(cid);
1209 if (!c)
1210 return -ENOENT;
1211
1212 ObjectRef oo = c->get_object(oldoid);
1213 if (!oo)
1214 return -ENOENT;
1215 ObjectRef no = c->get_or_create_object(newoid);
1216 if (srcoff >= oo->get_size())
1217 return 0;
1218 if (srcoff + len >= oo->get_size())
1219 len = oo->get_size() - srcoff;
1220
1221 const ssize_t old_size = no->get_size();
1222 no->clone(oo.get(), srcoff, len, dstoff);
1223 used_bytes += (no->get_size() - old_size);
1224
1225 return len;
1226 }
1227
1228 int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1229 {
1230 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1231 CollectionRef c = get_collection(cid);
1232 if (!c)
1233 return -ENOENT;
1234
1235 ObjectRef o = c->get_object(oid);
1236 if (!o)
1237 return -ENOENT;
1238 std::lock_guard lock{o->omap_mutex};
1239 o->omap.clear();
1240 o->omap_header.clear();
1241 return 0;
1242 }
1243
1244 int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1245 ceph::buffer::list& aset_bl)
1246 {
1247 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1248 CollectionRef c = get_collection(cid);
1249 if (!c)
1250 return -ENOENT;
1251
1252 ObjectRef o = c->get_object(oid);
1253 if (!o)
1254 return -ENOENT;
1255 std::lock_guard lock{o->omap_mutex};
1256 auto p = aset_bl.cbegin();
1257 __u32 num;
1258 decode(num, p);
1259 while (num--) {
1260 std::string key;
1261 decode(key, p);
1262 decode(o->omap[key], p);
1263 }
1264 return 0;
1265 }
1266
1267 int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1268 ceph::buffer::list& keys_bl)
1269 {
1270 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1271 CollectionRef c = get_collection(cid);
1272 if (!c)
1273 return -ENOENT;
1274
1275 ObjectRef o = c->get_object(oid);
1276 if (!o)
1277 return -ENOENT;
1278 std::lock_guard lock{o->omap_mutex};
1279 auto p = keys_bl.cbegin();
1280 __u32 num;
1281 decode(num, p);
1282 while (num--) {
1283 std::string key;
1284 decode(key, p);
1285 o->omap.erase(key);
1286 }
1287 return 0;
1288 }
1289
1290 int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1291 const std::string& first, const std::string& last)
1292 {
1293 dout(10) << __func__ << " " << cid << " " << oid << " " << first
1294 << " " << last << dendl;
1295 CollectionRef c = get_collection(cid);
1296 if (!c)
1297 return -ENOENT;
1298
1299 ObjectRef o = c->get_object(oid);
1300 if (!o)
1301 return -ENOENT;
1302 std::lock_guard lock{o->omap_mutex};
1303 auto p = o->omap.lower_bound(first);
1304 auto e = o->omap.lower_bound(last);
1305 o->omap.erase(p, e);
1306 return 0;
1307 }
1308
1309 int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1310 const ceph::buffer::list &bl)
1311 {
1312 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1313 CollectionRef c = get_collection(cid);
1314 if (!c)
1315 return -ENOENT;
1316
1317 ObjectRef o = c->get_object(oid);
1318 if (!o)
1319 return -ENOENT;
1320 std::lock_guard lock{o->omap_mutex};
1321 o->omap_header = bl;
1322 return 0;
1323 }
1324
1325 int MemStore::_create_collection(const coll_t& cid, int bits)
1326 {
1327 dout(10) << __func__ << " " << cid << dendl;
1328 std::lock_guard l{coll_lock};
1329 auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1330 if (!result.second)
1331 return -EEXIST;
1332 auto p = new_coll_map.find(cid);
1333 ceph_assert(p != new_coll_map.end());
1334 result.first->second = p->second;
1335 result.first->second->bits = bits;
1336 new_coll_map.erase(p);
1337 return 0;
1338 }
1339
1340 int MemStore::_destroy_collection(const coll_t& cid)
1341 {
1342 dout(10) << __func__ << " " << cid << dendl;
1343 std::lock_guard l{coll_lock};
1344 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1345 if (cp == coll_map.end())
1346 return -ENOENT;
1347 {
1348 std::shared_lock l2{cp->second->lock};
1349 if (!cp->second->object_map.empty())
1350 return -ENOTEMPTY;
1351 cp->second->exists = false;
1352 }
1353 used_bytes -= cp->second->used_bytes();
1354 coll_map.erase(cp);
1355 return 0;
1356 }
1357
1358 int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1359 {
1360 dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1361 CollectionRef c = get_collection(cid);
1362 if (!c)
1363 return -ENOENT;
1364 CollectionRef oc = get_collection(ocid);
1365 if (!oc)
1366 return -ENOENT;
1367
1368 std::scoped_lock l{std::min(&(*c), &(*oc))->lock,
1369 std::max(&(*c), &(*oc))->lock};
1370
1371 if (c->object_hash.count(oid))
1372 return -EEXIST;
1373 if (oc->object_hash.count(oid) == 0)
1374 return -ENOENT;
1375 ObjectRef o = oc->object_hash[oid];
1376 c->object_map[oid] = o;
1377 c->object_hash[oid] = o;
1378 return 0;
1379 }
1380
1381 int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1382 coll_t cid, const ghobject_t& oid)
1383 {
1384 dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1385 << cid << " " << oid << dendl;
1386 CollectionRef c = get_collection(cid);
1387 if (!c)
1388 return -ENOENT;
1389 CollectionRef oc = get_collection(oldcid);
1390 if (!oc)
1391 return -ENOENT;
1392
1393 // note: c and oc may be the same
1394 ceph_assert(&(*c) == &(*oc));
1395
1396 std::lock_guard l{c->lock};
1397 if (c->object_hash.count(oid))
1398 return -EEXIST;
1399 if (oc->object_hash.count(oldoid) == 0)
1400 return -ENOENT;
1401 {
1402 ObjectRef o = oc->object_hash[oldoid];
1403 c->object_map[oid] = o;
1404 c->object_hash[oid] = o;
1405 oc->object_map.erase(oldoid);
1406 oc->object_hash.erase(oldoid);
1407 }
1408 return 0;
1409 }
1410
1411 int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1412 coll_t dest)
1413 {
1414 dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1415 << dest << dendl;
1416 CollectionRef sc = get_collection(cid);
1417 if (!sc)
1418 return -ENOENT;
1419 CollectionRef dc = get_collection(dest);
1420 if (!dc)
1421 return -ENOENT;
1422
1423 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1424 std::max(&(*sc), &(*dc))->lock};
1425
1426 auto p = sc->object_map.begin();
1427 while (p != sc->object_map.end()) {
1428 if (p->first.match(bits, match)) {
1429 dout(20) << " moving " << p->first << dendl;
1430 dc->object_map.insert(std::make_pair(p->first, p->second));
1431 dc->object_hash.insert(std::make_pair(p->first, p->second));
1432 sc->object_hash.erase(p->first);
1433 sc->object_map.erase(p++);
1434 } else {
1435 ++p;
1436 }
1437 }
1438
1439 sc->bits = bits;
1440 ceph_assert(dc->bits == (int)bits);
1441
1442 return 0;
1443 }
1444
1445 int MemStore::_merge_collection(const coll_t& cid, uint32_t bits, coll_t dest)
1446 {
1447 dout(10) << __func__ << " " << cid << " " << bits << " "
1448 << dest << dendl;
1449 CollectionRef sc = get_collection(cid);
1450 if (!sc)
1451 return -ENOENT;
1452 CollectionRef dc = get_collection(dest);
1453 if (!dc)
1454 return -ENOENT;
1455 {
1456 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1457 std::max(&(*sc), &(*dc))->lock};
1458
1459 auto p = sc->object_map.begin();
1460 while (p != sc->object_map.end()) {
1461 dout(20) << " moving " << p->first << dendl;
1462 dc->object_map.insert(std::make_pair(p->first, p->second));
1463 dc->object_hash.insert(std::make_pair(p->first, p->second));
1464 sc->object_hash.erase(p->first);
1465 sc->object_map.erase(p++);
1466 }
1467
1468 dc->bits = bits;
1469 }
1470
1471 {
1472 std::lock_guard l{coll_lock};
1473 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1474 ceph_assert(cp != coll_map.end());
1475 used_bytes -= cp->second->used_bytes();
1476 coll_map.erase(cp);
1477 }
1478
1479 return 0;
1480 }
1481
1482 namespace {
1483 struct BufferlistObject : public MemStore::Object {
1484 ceph::spinlock mutex;
1485 ceph::buffer::list data;
1486
1487 size_t get_size() const override { return data.length(); }
1488
1489 int read(uint64_t offset, uint64_t len, ceph::buffer::list &bl) override;
1490 int write(uint64_t offset, const ceph::buffer::list &bl) override;
1491 int clone(Object *src, uint64_t srcoff, uint64_t len,
1492 uint64_t dstoff) override;
1493 int truncate(uint64_t offset) override;
1494
1495 void encode(ceph::buffer::list& bl) const override {
1496 ENCODE_START(1, 1, bl);
1497 encode(data, bl);
1498 encode_base(bl);
1499 ENCODE_FINISH(bl);
1500 }
1501 void decode(ceph::buffer::list::const_iterator& p) override {
1502 DECODE_START(1, p);
1503 decode(data, p);
1504 decode_base(p);
1505 DECODE_FINISH(p);
1506 }
1507 };
1508 }
1509 // BufferlistObject
1510 int BufferlistObject::read(uint64_t offset, uint64_t len,
1511 ceph::buffer::list &bl)
1512 {
1513 std::lock_guard<decltype(mutex)> lock(mutex);
1514 bl.substr_of(data, offset, len);
1515 return bl.length();
1516 }
1517
1518 int BufferlistObject::write(uint64_t offset, const ceph::buffer::list &src)
1519 {
1520 unsigned len = src.length();
1521
1522 std::lock_guard<decltype(mutex)> lock(mutex);
1523
1524 // before
1525 ceph::buffer::list newdata;
1526 if (get_size() >= offset) {
1527 newdata.substr_of(data, 0, offset);
1528 } else {
1529 if (get_size()) {
1530 newdata.substr_of(data, 0, get_size());
1531 }
1532 newdata.append_zero(offset - get_size());
1533 }
1534
1535 newdata.append(src);
1536
1537 // after
1538 if (get_size() > offset + len) {
1539 ceph::buffer::list tail;
1540 tail.substr_of(data, offset + len, get_size() - (offset + len));
1541 newdata.append(tail);
1542 }
1543
1544 data = std::move(newdata);
1545 return 0;
1546 }
1547
1548 int BufferlistObject::clone(Object *src, uint64_t srcoff,
1549 uint64_t len, uint64_t dstoff)
1550 {
1551 auto srcbl = dynamic_cast<BufferlistObject*>(src);
1552 if (srcbl == nullptr)
1553 return -ENOTSUP;
1554
1555 ceph::buffer::list bl;
1556 {
1557 std::lock_guard<decltype(srcbl->mutex)> lock(srcbl->mutex);
1558 if (srcoff == dstoff && len == src->get_size()) {
1559 data = srcbl->data;
1560 return 0;
1561 }
1562 bl.substr_of(srcbl->data, srcoff, len);
1563 }
1564 return write(dstoff, bl);
1565 }
1566
1567 int BufferlistObject::truncate(uint64_t size)
1568 {
1569 std::lock_guard<decltype(mutex)> lock(mutex);
1570 if (get_size() > size) {
1571 ceph::buffer::list bl;
1572 bl.substr_of(data, 0, size);
1573 data = std::move(bl);
1574 } else if (get_size() == size) {
1575 // do nothing
1576 } else {
1577 data.append_zero(size - get_size());
1578 }
1579 return 0;
1580 }
1581
1582 // PageSetObject
1583
1584 struct MemStore::PageSetObject : public Object {
1585 PageSet data;
1586 uint64_t data_len;
1587 #if defined(__GLIBCXX__)
1588 // use a thread-local vector for the pages returned by PageSet, so we
1589 // can avoid allocations in read/write()
1590 static thread_local PageSet::page_vector tls_pages;
1591 #endif
1592
1593 size_t get_size() const override { return data_len; }
1594
1595 int read(uint64_t offset, uint64_t len, ceph::buffer::list &bl) override;
1596 int write(uint64_t offset, const ceph::buffer::list &bl) override;
1597 int clone(Object *src, uint64_t srcoff, uint64_t len,
1598 uint64_t dstoff) override;
1599 int truncate(uint64_t offset) override;
1600
1601 void encode(ceph::buffer::list& bl) const override {
1602 ENCODE_START(1, 1, bl);
1603 encode(data_len, bl);
1604 data.encode(bl);
1605 encode_base(bl);
1606 ENCODE_FINISH(bl);
1607 }
1608 void decode(ceph::buffer::list::const_iterator& p) override {
1609 DECODE_START(1, p);
1610 decode(data_len, p);
1611 data.decode(p);
1612 decode_base(p);
1613 DECODE_FINISH(p);
1614 }
1615
1616 private:
1617 FRIEND_MAKE_REF(PageSetObject);
1618 explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
1619 };
1620
// DEFINE_PAGE_VECTOR(name) gives the PageSetObject methods a page vector
// called `name`.  With libstdc++ the vector is the thread-local static
// member defined here, which avoids allocations in read/write(), and the
// macro expands to nothing; otherwise the macro declares a function-local
// vector.
#ifdef __GLIBCXX__
thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
#define DEFINE_PAGE_VECTOR(name)
#else
#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
#endif
1629
1630 int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, ceph::buffer::list& bl)
1631 {
1632 const auto start = offset;
1633 const auto end = offset + len;
1634 auto remaining = len;
1635
1636 DEFINE_PAGE_VECTOR(tls_pages);
1637 data.get_range(offset, len, tls_pages);
1638
1639 // allocate a buffer for the data
1640 ceph::buffer::ptr buf(len);
1641
1642 auto p = tls_pages.begin();
1643 while (remaining) {
1644 // no more pages in range
1645 if (p == tls_pages.end() || (*p)->offset >= end) {
1646 buf.zero(offset - start, remaining);
1647 break;
1648 }
1649 auto page = *p;
1650
1651 // fill any holes between pages with zeroes
1652 if (page->offset > offset) {
1653 const auto count = std::min(remaining, page->offset - offset);
1654 buf.zero(offset - start, count);
1655 remaining -= count;
1656 offset = page->offset;
1657 if (!remaining)
1658 break;
1659 }
1660
1661 // read from page
1662 const auto page_offset = offset - page->offset;
1663 const auto count = std::min(remaining, data.get_page_size() - page_offset);
1664
1665 buf.copy_in(offset - start, count, page->data + page_offset);
1666
1667 remaining -= count;
1668 offset += count;
1669
1670 ++p;
1671 }
1672
1673 tls_pages.clear(); // drop page refs
1674
1675 bl.append(std::move(buf));
1676 return len;
1677 }
1678
1679 int MemStore::PageSetObject::write(uint64_t offset, const ceph::buffer::list &src)
1680 {
1681 unsigned len = src.length();
1682
1683 DEFINE_PAGE_VECTOR(tls_pages);
1684 // make sure the page range is allocated
1685 data.alloc_range(offset, src.length(), tls_pages);
1686
1687 auto page = tls_pages.begin();
1688
1689 auto p = src.begin();
1690 while (len > 0) {
1691 unsigned page_offset = offset - (*page)->offset;
1692 unsigned pageoff = data.get_page_size() - page_offset;
1693 unsigned count = std::min(len, pageoff);
1694 p.copy(count, (*page)->data + page_offset);
1695 offset += count;
1696 len -= count;
1697 if (count == pageoff)
1698 ++page;
1699 }
1700 if (data_len < offset)
1701 data_len = offset;
1702 tls_pages.clear(); // drop page refs
1703 return 0;
1704 }
1705
1706 int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1707 uint64_t len, uint64_t dstoff)
1708 {
1709 const int64_t delta = dstoff - srcoff;
1710
1711 auto &src_data = static_cast<PageSetObject*>(src)->data;
1712 const uint64_t src_page_size = src_data.get_page_size();
1713
1714 auto &dst_data = data;
1715 const auto dst_page_size = dst_data.get_page_size();
1716
1717 DEFINE_PAGE_VECTOR(tls_pages);
1718 PageSet::page_vector dst_pages;
1719
1720 while (len) {
1721 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1722 auto count = std::min(len, (uint64_t)src_page_size * 16);
1723 src_data.get_range(srcoff, count, tls_pages);
1724
1725 // allocate the destination range
1726 // TODO: avoid allocating pages for holes in the source range
1727 dst_data.alloc_range(srcoff + delta, count, dst_pages);
1728 auto dst_iter = dst_pages.begin();
1729
1730 for (auto &src_page : tls_pages) {
1731 auto sbegin = std::max(srcoff, src_page->offset);
1732 auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1733
1734 // zero-fill holes before src_page
1735 if (srcoff < sbegin) {
1736 while (dst_iter != dst_pages.end()) {
1737 auto &dst_page = *dst_iter;
1738 auto dbegin = std::max(srcoff + delta, dst_page->offset);
1739 auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1740 std::fill(dst_page->data + dbegin - dst_page->offset,
1741 dst_page->data + dend - dst_page->offset, 0);
1742 if (dend < dst_page->offset + dst_page_size)
1743 break;
1744 ++dst_iter;
1745 }
1746 const auto c = sbegin - srcoff;
1747 count -= c;
1748 len -= c;
1749 }
1750
1751 // copy data from src page to dst pages
1752 while (dst_iter != dst_pages.end()) {
1753 auto &dst_page = *dst_iter;
1754 auto dbegin = std::max(sbegin + delta, dst_page->offset);
1755 auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1756
1757 std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1758 src_page->data + (dend - delta) - src_page->offset,
1759 dst_page->data + dbegin - dst_page->offset);
1760 if (dend < dst_page->offset + dst_page_size)
1761 break;
1762 ++dst_iter;
1763 }
1764
1765 const auto c = send - sbegin;
1766 count -= c;
1767 len -= c;
1768 srcoff = send;
1769 dstoff = send + delta;
1770 }
1771 tls_pages.clear(); // drop page refs
1772
1773 // zero-fill holes after the last src_page
1774 if (count > 0) {
1775 while (dst_iter != dst_pages.end()) {
1776 auto &dst_page = *dst_iter;
1777 auto dbegin = std::max(dstoff, dst_page->offset);
1778 auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1779 std::fill(dst_page->data + dbegin - dst_page->offset,
1780 dst_page->data + dend - dst_page->offset, 0);
1781 ++dst_iter;
1782 }
1783 srcoff += count;
1784 dstoff += count;
1785 len -= count;
1786 }
1787 dst_pages.clear(); // drop page refs
1788 }
1789
1790 // update object size
1791 if (data_len < dstoff)
1792 data_len = dstoff;
1793 return 0;
1794 }
1795
1796 int MemStore::PageSetObject::truncate(uint64_t size)
1797 {
1798 data.free_pages_after(size);
1799 data_len = size;
1800
1801 const auto page_size = data.get_page_size();
1802 const auto page_offset = size & ~(page_size-1);
1803 if (page_offset == size)
1804 return 0;
1805
1806 DEFINE_PAGE_VECTOR(tls_pages);
1807 // write zeroes to the rest of the last page
1808 data.get_range(page_offset, page_size, tls_pages);
1809 if (tls_pages.empty())
1810 return 0;
1811
1812 auto page = tls_pages.begin();
1813 auto data = (*page)->data;
1814 std::fill(data + (size - page_offset), data + page_size, 0);
1815 tls_pages.clear(); // drop page ref
1816 return 0;
1817 }
1818
1819
1820 MemStore::ObjectRef MemStore::Collection::create_object() const {
1821 if (use_page_set)
1822 return ceph::make_ref<PageSetObject>(cct->_conf->memstore_page_size);
1823 return new BufferlistObject();
1824 }