]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/memstore/MemStore.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / os / memstore / MemStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "acconfig.h"
15
16#ifdef HAVE_SYS_MOUNT_H
17#include <sys/mount.h>
18#endif
19
20#ifdef HAVE_SYS_PARAM_H
21#include <sys/param.h>
22#endif
23
24#include "include/types.h"
25#include "include/stringify.h"
26#include "include/unordered_map.h"
7c673cae
FG
27#include "common/errno.h"
28#include "MemStore.h"
29#include "include/compat.h"
30
31#define dout_context cct
32#define dout_subsys ceph_subsys_filestore
33#undef dout_prefix
34#define dout_prefix *_dout << "memstore(" << path << ") "
35
36// for comparing collections for lock ordering
37bool operator>(const MemStore::CollectionRef& l,
38 const MemStore::CollectionRef& r)
39{
40 return (unsigned long)l.get() > (unsigned long)r.get();
41}
42
43
44int MemStore::mount()
45{
46 int r = _load();
47 if (r < 0)
48 return r;
49 finisher.start();
50 return 0;
51}
52
53int MemStore::umount()
54{
55 finisher.wait_for_empty();
56 finisher.stop();
57 return _save();
58}
59
60int MemStore::_save()
61{
62 dout(10) << __func__ << dendl;
63 dump_all();
64 set<coll_t> collections;
65 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
66 p != coll_map.end();
67 ++p) {
68 dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
69 collections.insert(p->first);
70 bufferlist bl;
11fdf7f2 71 ceph_assert(p->second);
7c673cae
FG
72 p->second->encode(bl);
73 string fn = path + "/" + stringify(p->first);
74 int r = bl.write_file(fn.c_str());
75 if (r < 0)
76 return r;
77 }
78
79 string fn = path + "/collections";
80 bufferlist bl;
11fdf7f2 81 encode(collections, bl);
7c673cae
FG
82 int r = bl.write_file(fn.c_str());
83 if (r < 0)
84 return r;
85
86 return 0;
87}
88
89void MemStore::dump_all()
90{
91 Formatter *f = Formatter::create("json-pretty");
92 f->open_object_section("store");
93 dump(f);
94 f->close_section();
95 dout(0) << "dump:";
96 f->flush(*_dout);
97 *_dout << dendl;
98 delete f;
99}
100
101void MemStore::dump(Formatter *f)
102{
103 f->open_array_section("collections");
104 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
105 p != coll_map.end();
106 ++p) {
107 f->open_object_section("collection");
108 f->dump_string("name", stringify(p->first));
109
110 f->open_array_section("xattrs");
111 for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
112 q != p->second->xattr.end();
113 ++q) {
114 f->open_object_section("xattr");
115 f->dump_string("name", q->first);
116 f->dump_int("length", q->second.length());
117 f->close_section();
118 }
119 f->close_section();
120
121 f->open_array_section("objects");
122 for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
123 q != p->second->object_map.end();
124 ++q) {
125 f->open_object_section("object");
126 f->dump_string("name", stringify(q->first));
127 if (q->second)
128 q->second->dump(f);
129 f->close_section();
130 }
131 f->close_section();
132
133 f->close_section();
134 }
135 f->close_section();
136}
137
138int MemStore::_load()
139{
140 dout(10) << __func__ << dendl;
141 bufferlist bl;
142 string fn = path + "/collections";
143 string err;
144 int r = bl.read_file(fn.c_str(), &err);
145 if (r < 0)
146 return r;
147
148 set<coll_t> collections;
11fdf7f2
TL
149 auto p = bl.cbegin();
150 decode(collections, p);
7c673cae
FG
151
152 for (set<coll_t>::iterator q = collections.begin();
153 q != collections.end();
154 ++q) {
155 string fn = path + "/" + stringify(*q);
156 bufferlist cbl;
157 int r = cbl.read_file(fn.c_str(), &err);
158 if (r < 0)
159 return r;
9f95a23c 160 auto c = ceph::make_ref<Collection>(cct, *q);
11fdf7f2 161 auto p = cbl.cbegin();
7c673cae
FG
162 c->decode(p);
163 coll_map[*q] = c;
164 used_bytes += c->used_bytes();
165 }
166
167 dump_all();
168
169 return 0;
170}
171
172void MemStore::set_fsid(uuid_d u)
173{
b32b8144 174 int r = write_meta("fsid", stringify(u));
11fdf7f2 175 ceph_assert(r >= 0);
7c673cae
FG
176}
177
178uuid_d MemStore::get_fsid()
179{
180 string fsid_str;
b32b8144 181 int r = read_meta("fsid", &fsid_str);
11fdf7f2 182 ceph_assert(r >= 0);
7c673cae
FG
183 uuid_d uuid;
184 bool b = uuid.parse(fsid_str.c_str());
11fdf7f2 185 ceph_assert(b);
7c673cae
FG
186 return uuid;
187}
188
189int MemStore::mkfs()
190{
191 string fsid_str;
b32b8144 192 int r = read_meta("fsid", &fsid_str);
7c673cae
FG
193 if (r == -ENOENT) {
194 uuid_d fsid;
195 fsid.generate_random();
196 fsid_str = stringify(fsid);
b32b8144 197 r = write_meta("fsid", fsid_str);
7c673cae
FG
198 if (r < 0)
199 return r;
200 dout(1) << __func__ << " new fsid " << fsid_str << dendl;
201 } else if (r < 0) {
202 return r;
203 } else {
204 dout(1) << __func__ << " had fsid " << fsid_str << dendl;
205 }
206
207 string fn = path + "/collections";
208 derr << path << dendl;
209 bufferlist bl;
210 set<coll_t> collections;
11fdf7f2 211 encode(collections, bl);
7c673cae
FG
212 r = bl.write_file(fn.c_str());
213 if (r < 0)
214 return r;
215
216 r = write_meta("type", "memstore");
217 if (r < 0)
218 return r;
219
220 return 0;
221}
222
11fdf7f2 223int MemStore::statfs(struct store_statfs_t *st, osd_alert_list_t* alerts)
7c673cae 224{
11fdf7f2
TL
225 dout(10) << __func__ << dendl;
226 if (alerts) {
227 alerts->clear(); // returns nothing for now
228 }
7c673cae
FG
229 st->reset();
230 st->total = cct->_conf->memstore_device_bytes;
11fdf7f2 231 st->available = std::max<int64_t>(st->total - used_bytes, 0);
7c673cae
FG
232 dout(10) << __func__ << ": used_bytes: " << used_bytes
233 << "/" << cct->_conf->memstore_device_bytes << dendl;
234 return 0;
235}
236
9f95a23c
TL
237int MemStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
238 bool *per_pool_omap)
11fdf7f2
TL
239{
240 return -ENOTSUP;
241}
242
7c673cae
FG
243objectstore_perf_stat_t MemStore::get_cur_stats()
244{
245 // fixme
246 return objectstore_perf_stat_t();
247}
248
249MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
250{
11fdf7f2 251 std::shared_lock l{coll_lock};
7c673cae
FG
252 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
253 if (cp == coll_map.end())
254 return CollectionRef();
255 return cp->second;
256}
257
11fdf7f2
TL
258ObjectStore::CollectionHandle MemStore::create_new_collection(const coll_t& cid)
259{
260 std::lock_guard l{coll_lock};
9f95a23c 261 auto c = ceph::make_ref<Collection>(cct, cid);
11fdf7f2
TL
262 new_coll_map[cid] = c;
263 return c;
264}
265
7c673cae
FG
266
267// ---------------
268// read operations
269
7c673cae
FG
270bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
271{
272 Collection *c = static_cast<Collection*>(c_.get());
273 dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
274 if (!c->exists)
275 return false;
276
277 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
278 // shared_ptr needs to be compared to nullptr.
279 return (bool)c->get_object(oid);
280}
281
7c673cae
FG
282int MemStore::stat(
283 CollectionHandle &c_,
284 const ghobject_t& oid,
285 struct stat *st,
286 bool allow_eio)
287{
288 Collection *c = static_cast<Collection*>(c_.get());
289 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
290 if (!c->exists)
291 return -ENOENT;
292 ObjectRef o = c->get_object(oid);
293 if (!o)
294 return -ENOENT;
295 st->st_size = o->get_size();
296 st->st_blksize = 4096;
297 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
298 st->st_nlink = 1;
299 return 0;
300}
301
302int MemStore::set_collection_opts(
11fdf7f2 303 CollectionHandle& ch,
7c673cae
FG
304 const pool_opts_t& opts)
305{
306 return -EOPNOTSUPP;
307}
308
7c673cae
FG
309int MemStore::read(
310 CollectionHandle &c_,
311 const ghobject_t& oid,
312 uint64_t offset,
313 size_t len,
314 bufferlist& bl,
224ce89b 315 uint32_t op_flags)
7c673cae
FG
316{
317 Collection *c = static_cast<Collection*>(c_.get());
318 dout(10) << __func__ << " " << c->cid << " " << oid << " "
319 << offset << "~" << len << dendl;
320 if (!c->exists)
321 return -ENOENT;
322 ObjectRef o = c->get_object(oid);
323 if (!o)
324 return -ENOENT;
325 if (offset >= o->get_size())
326 return 0;
327 size_t l = len;
328 if (l == 0 && offset == 0) // note: len == 0 means read the entire object
329 l = o->get_size();
330 else if (offset + l > o->get_size())
331 l = o->get_size() - offset;
332 bl.clear();
333 return o->read(offset, l, bl);
334}
335
11fdf7f2 336int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
337 uint64_t offset, size_t len, bufferlist& bl)
338{
339 map<uint64_t, uint64_t> destmap;
11fdf7f2 340 int r = fiemap(ch, oid, offset, len, destmap);
7c673cae 341 if (r >= 0)
11fdf7f2 342 encode(destmap, bl);
7c673cae
FG
343 return r;
344}
345
11fdf7f2 346int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
347 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap)
348{
11fdf7f2 349 dout(10) << __func__ << " " << ch->cid << " " << oid << " " << offset << "~"
7c673cae 350 << len << dendl;
11fdf7f2 351 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
352 if (!c)
353 return -ENOENT;
354
355 ObjectRef o = c->get_object(oid);
356 if (!o)
357 return -ENOENT;
358 size_t l = len;
359 if (offset + l > o->get_size())
360 l = o->get_size() - offset;
361 if (offset >= o->get_size())
362 goto out;
363 destmap[offset] = l;
364 out:
365 return 0;
366}
367
7c673cae
FG
368int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
369 const char *name, bufferptr& value)
370{
371 Collection *c = static_cast<Collection*>(c_.get());
372 dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
373 if (!c->exists)
374 return -ENOENT;
375 ObjectRef o = c->get_object(oid);
376 if (!o)
377 return -ENOENT;
378 string k(name);
11fdf7f2 379 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
380 if (!o->xattr.count(k)) {
381 return -ENODATA;
382 }
383 value = o->xattr[k];
384 return 0;
385}
386
7c673cae
FG
387int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
388 map<string,bufferptr>& aset)
389{
390 Collection *c = static_cast<Collection*>(c_.get());
391 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
392 if (!c->exists)
393 return -ENOENT;
394
395 ObjectRef o = c->get_object(oid);
396 if (!o)
397 return -ENOENT;
11fdf7f2 398 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
399 aset = o->xattr;
400 return 0;
401}
402
403int MemStore::list_collections(vector<coll_t>& ls)
404{
405 dout(10) << __func__ << dendl;
11fdf7f2 406 std::shared_lock l{coll_lock};
7c673cae
FG
407 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
408 p != coll_map.end();
409 ++p) {
410 ls.push_back(p->first);
411 }
412 return 0;
413}
414
415bool MemStore::collection_exists(const coll_t& cid)
416{
417 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 418 std::shared_lock l{coll_lock};
7c673cae
FG
419 return coll_map.count(cid);
420}
421
11fdf7f2 422int MemStore::collection_empty(CollectionHandle& ch, bool *empty)
7c673cae 423{
11fdf7f2
TL
424 dout(10) << __func__ << " " << ch->cid << dendl;
425 CollectionRef c = static_cast<Collection*>(ch.get());
426 std::shared_lock l{c->lock};
7c673cae
FG
427 *empty = c->object_map.empty();
428 return 0;
429}
430
11fdf7f2 431int MemStore::collection_bits(CollectionHandle& ch)
7c673cae 432{
11fdf7f2
TL
433 dout(10) << __func__ << " " << ch->cid << dendl;
434 Collection *c = static_cast<Collection*>(ch.get());
435 std::shared_lock l{c->lock};
7c673cae
FG
436 return c->bits;
437}
438
11fdf7f2 439int MemStore::collection_list(CollectionHandle& ch,
7c673cae
FG
440 const ghobject_t& start,
441 const ghobject_t& end,
442 int max,
443 vector<ghobject_t> *ls, ghobject_t *next)
444{
11fdf7f2
TL
445 Collection *c = static_cast<Collection*>(ch.get());
446 std::shared_lock l{c->lock};
7c673cae 447
11fdf7f2 448 dout(10) << __func__ << " cid " << ch->cid << " start " << start
7c673cae
FG
449 << " end " << end << dendl;
450 map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
451 while (p != c->object_map.end() &&
452 ls->size() < (unsigned)max &&
453 p->first < end) {
454 ls->push_back(p->first);
455 ++p;
456 }
457 if (next != NULL) {
458 if (p == c->object_map.end())
459 *next = ghobject_t::get_max();
460 else
461 *next = p->first;
462 }
11fdf7f2 463 dout(10) << __func__ << " cid " << ch->cid << " got " << ls->size() << dendl;
7c673cae
FG
464 return 0;
465}
466
467int MemStore::omap_get(
11fdf7f2
TL
468 CollectionHandle& ch, ///< [in] Collection containing oid
469 const ghobject_t &oid, ///< [in] Object containing omap
470 bufferlist *header, ///< [out] omap header
471 map<string, bufferlist> *out /// < [out] Key to value map
472 )
7c673cae 473{
11fdf7f2
TL
474 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
475 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
476
477 ObjectRef o = c->get_object(oid);
478 if (!o)
479 return -ENOENT;
11fdf7f2 480 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
481 *header = o->omap_header;
482 *out = o->omap;
483 return 0;
484}
485
486int MemStore::omap_get_header(
11fdf7f2
TL
487 CollectionHandle& ch, ///< [in] Collection containing oid
488 const ghobject_t &oid, ///< [in] Object containing omap
489 bufferlist *header, ///< [out] omap header
490 bool allow_eio ///< [in] don't assert on eio
491 )
492{
493 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
494 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
495 ObjectRef o = c->get_object(oid);
496 if (!o)
497 return -ENOENT;
11fdf7f2 498 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
499 *header = o->omap_header;
500 return 0;
501}
502
503int MemStore::omap_get_keys(
11fdf7f2
TL
504 CollectionHandle& ch, ///< [in] Collection containing oid
505 const ghobject_t &oid, ///< [in] Object containing omap
506 set<string> *keys ///< [out] Keys defined on oid
507 )
7c673cae 508{
11fdf7f2
TL
509 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
510 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
511 ObjectRef o = c->get_object(oid);
512 if (!o)
513 return -ENOENT;
11fdf7f2 514 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
515 for (map<string,bufferlist>::iterator p = o->omap.begin();
516 p != o->omap.end();
517 ++p)
518 keys->insert(p->first);
519 return 0;
520}
521
522int MemStore::omap_get_values(
11fdf7f2
TL
523 CollectionHandle& ch, ///< [in] Collection containing oid
524 const ghobject_t &oid, ///< [in] Object containing omap
525 const set<string> &keys, ///< [in] Keys to get
526 map<string, bufferlist> *out ///< [out] Returned keys and values
527 )
528{
529 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
530 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
531 ObjectRef o = c->get_object(oid);
532 if (!o)
533 return -ENOENT;
11fdf7f2 534 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
535 for (set<string>::const_iterator p = keys.begin();
536 p != keys.end();
537 ++p) {
538 map<string,bufferlist>::iterator q = o->omap.find(*p);
539 if (q != o->omap.end())
540 out->insert(*q);
541 }
542 return 0;
543}
544
545int MemStore::omap_check_keys(
11fdf7f2
TL
546 CollectionHandle& ch, ///< [in] Collection containing oid
547 const ghobject_t &oid, ///< [in] Object containing omap
548 const set<string> &keys, ///< [in] Keys to check
549 set<string> *out ///< [out] Subset of keys defined on oid
550 )
551{
552 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
553 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
554 ObjectRef o = c->get_object(oid);
555 if (!o)
556 return -ENOENT;
11fdf7f2 557 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
558 for (set<string>::const_iterator p = keys.begin();
559 p != keys.end();
560 ++p) {
561 map<string,bufferlist>::iterator q = o->omap.find(*p);
562 if (q != o->omap.end())
563 out->insert(*p);
564 }
565 return 0;
566}
567
568class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
569 CollectionRef c;
570 ObjectRef o;
571 map<string,bufferlist>::iterator it;
572public:
573 OmapIteratorImpl(CollectionRef c, ObjectRef o)
574 : c(c), o(o), it(o->omap.begin()) {}
575
576 int seek_to_first() override {
11fdf7f2 577 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
578 it = o->omap.begin();
579 return 0;
580 }
581 int upper_bound(const string &after) override {
11fdf7f2 582 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
583 it = o->omap.upper_bound(after);
584 return 0;
585 }
586 int lower_bound(const string &to) override {
11fdf7f2 587 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
588 it = o->omap.lower_bound(to);
589 return 0;
590 }
591 bool valid() override {
11fdf7f2 592 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
593 return it != o->omap.end();
594 }
11fdf7f2
TL
595 int next() override {
596 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
597 ++it;
598 return 0;
599 }
600 string key() override {
11fdf7f2 601 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
602 return it->first;
603 }
604 bufferlist value() override {
11fdf7f2 605 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
606 return it->second;
607 }
608 int status() override {
609 return 0;
610 }
611};
612
11fdf7f2
TL
613ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(
614 CollectionHandle& ch,
615 const ghobject_t& oid)
7c673cae 616{
11fdf7f2
TL
617 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
618 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
619 ObjectRef o = c->get_object(oid);
620 if (!o)
621 return ObjectMap::ObjectMapIterator();
622 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
623}
624
625
626// ---------------
627// write operations
628
11fdf7f2
TL
629int MemStore::queue_transactions(
630 CollectionHandle& ch,
631 vector<Transaction>& tls,
632 TrackedOpRef op,
633 ThreadPool::TPHandle *handle)
7c673cae
FG
634{
635 // because memstore operations are synchronous, we can implement the
636 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
637 // while allowing operations on different sequencers to happen in parallel
11fdf7f2
TL
638 Collection *c = static_cast<Collection*>(ch.get());
639 std::unique_lock lock{c->sequencer_mutex};
7c673cae
FG
640
641 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
642 // poke the TPHandle heartbeat just to exercise that code path
643 if (handle)
644 handle->reset_tp_timeout();
645
646 _do_transaction(*p);
647 }
648
649 Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
650 ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
651 &on_apply_sync);
652 if (on_apply_sync)
653 on_apply_sync->complete(0);
654 if (on_apply)
655 finisher.queue(on_apply);
656 if (on_commit)
657 finisher.queue(on_commit);
658 return 0;
659}
660
661void MemStore::_do_transaction(Transaction& t)
662{
663 Transaction::iterator i = t.begin();
664 int pos = 0;
665
666 while (i.have_op()) {
667 Transaction::Op *op = i.decode_op();
668 int r = 0;
669
670 switch (op->op) {
671 case Transaction::OP_NOP:
672 break;
673 case Transaction::OP_TOUCH:
9f95a23c 674 case Transaction::OP_CREATE:
7c673cae
FG
675 {
676 coll_t cid = i.get_cid(op->cid);
677 ghobject_t oid = i.get_oid(op->oid);
678 r = _touch(cid, oid);
679 }
680 break;
681
682 case Transaction::OP_WRITE:
683 {
684 coll_t cid = i.get_cid(op->cid);
685 ghobject_t oid = i.get_oid(op->oid);
686 uint64_t off = op->off;
687 uint64_t len = op->len;
688 uint32_t fadvise_flags = i.get_fadvise_flags();
689 bufferlist bl;
690 i.decode_bl(bl);
691 r = _write(cid, oid, off, len, bl, fadvise_flags);
692 }
693 break;
694
695 case Transaction::OP_ZERO:
696 {
697 coll_t cid = i.get_cid(op->cid);
698 ghobject_t oid = i.get_oid(op->oid);
699 uint64_t off = op->off;
700 uint64_t len = op->len;
701 r = _zero(cid, oid, off, len);
702 }
703 break;
704
705 case Transaction::OP_TRIMCACHE:
706 {
707 // deprecated, no-op
708 }
709 break;
710
711 case Transaction::OP_TRUNCATE:
712 {
713 coll_t cid = i.get_cid(op->cid);
714 ghobject_t oid = i.get_oid(op->oid);
715 uint64_t off = op->off;
716 r = _truncate(cid, oid, off);
717 }
718 break;
719
720 case Transaction::OP_REMOVE:
721 {
722 coll_t cid = i.get_cid(op->cid);
723 ghobject_t oid = i.get_oid(op->oid);
724 r = _remove(cid, oid);
725 }
726 break;
727
728 case Transaction::OP_SETATTR:
729 {
730 coll_t cid = i.get_cid(op->cid);
731 ghobject_t oid = i.get_oid(op->oid);
732 string name = i.decode_string();
733 bufferlist bl;
734 i.decode_bl(bl);
735 map<string, bufferptr> to_set;
736 to_set[name] = bufferptr(bl.c_str(), bl.length());
737 r = _setattrs(cid, oid, to_set);
738 }
739 break;
740
741 case Transaction::OP_SETATTRS:
742 {
743 coll_t cid = i.get_cid(op->cid);
744 ghobject_t oid = i.get_oid(op->oid);
745 map<string, bufferptr> aset;
746 i.decode_attrset(aset);
747 r = _setattrs(cid, oid, aset);
748 }
749 break;
750
751 case Transaction::OP_RMATTR:
752 {
753 coll_t cid = i.get_cid(op->cid);
754 ghobject_t oid = i.get_oid(op->oid);
755 string name = i.decode_string();
756 r = _rmattr(cid, oid, name.c_str());
757 }
758 break;
759
760 case Transaction::OP_RMATTRS:
761 {
762 coll_t cid = i.get_cid(op->cid);
763 ghobject_t oid = i.get_oid(op->oid);
764 r = _rmattrs(cid, oid);
765 }
766 break;
767
768 case Transaction::OP_CLONE:
769 {
770 coll_t cid = i.get_cid(op->cid);
771 ghobject_t oid = i.get_oid(op->oid);
772 ghobject_t noid = i.get_oid(op->dest_oid);
773 r = _clone(cid, oid, noid);
774 }
775 break;
776
777 case Transaction::OP_CLONERANGE:
778 {
779 coll_t cid = i.get_cid(op->cid);
780 ghobject_t oid = i.get_oid(op->oid);
781 ghobject_t noid = i.get_oid(op->dest_oid);
782 uint64_t off = op->off;
783 uint64_t len = op->len;
784 r = _clone_range(cid, oid, noid, off, len, off);
785 }
786 break;
787
788 case Transaction::OP_CLONERANGE2:
789 {
790 coll_t cid = i.get_cid(op->cid);
791 ghobject_t oid = i.get_oid(op->oid);
792 ghobject_t noid = i.get_oid(op->dest_oid);
793 uint64_t srcoff = op->off;
794 uint64_t len = op->len;
795 uint64_t dstoff = op->dest_off;
796 r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
797 }
798 break;
799
800 case Transaction::OP_MKCOLL:
801 {
802 coll_t cid = i.get_cid(op->cid);
803 r = _create_collection(cid, op->split_bits);
804 }
805 break;
806
807 case Transaction::OP_COLL_HINT:
808 {
809 coll_t cid = i.get_cid(op->cid);
810 uint32_t type = op->hint_type;
811 bufferlist hint;
812 i.decode_bl(hint);
11fdf7f2 813 auto hiter = hint.cbegin();
7c673cae
FG
814 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
815 uint32_t pg_num;
816 uint64_t num_objs;
11fdf7f2
TL
817 decode(pg_num, hiter);
818 decode(num_objs, hiter);
7c673cae
FG
819 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
820 } else {
821 // Ignore the hint
822 dout(10) << "Unrecognized collection hint type: " << type << dendl;
823 }
824 }
825 break;
826
827 case Transaction::OP_RMCOLL:
828 {
829 coll_t cid = i.get_cid(op->cid);
830 r = _destroy_collection(cid);
831 }
832 break;
833
834 case Transaction::OP_COLL_ADD:
835 {
836 coll_t ocid = i.get_cid(op->cid);
837 coll_t ncid = i.get_cid(op->dest_cid);
838 ghobject_t oid = i.get_oid(op->oid);
839 r = _collection_add(ncid, ocid, oid);
840 }
841 break;
842
843 case Transaction::OP_COLL_REMOVE:
844 {
845 coll_t cid = i.get_cid(op->cid);
846 ghobject_t oid = i.get_oid(op->oid);
847 r = _remove(cid, oid);
848 }
849 break;
850
851 case Transaction::OP_COLL_MOVE:
11fdf7f2 852 ceph_abort_msg("deprecated");
7c673cae
FG
853 break;
854
855 case Transaction::OP_COLL_MOVE_RENAME:
856 {
857 coll_t oldcid = i.get_cid(op->cid);
858 ghobject_t oldoid = i.get_oid(op->oid);
859 coll_t newcid = i.get_cid(op->dest_cid);
860 ghobject_t newoid = i.get_oid(op->dest_oid);
861 r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
862 if (r == -ENOENT)
863 r = 0;
864 }
865 break;
866
867 case Transaction::OP_TRY_RENAME:
868 {
869 coll_t cid = i.get_cid(op->cid);
870 ghobject_t oldoid = i.get_oid(op->oid);
871 ghobject_t newoid = i.get_oid(op->dest_oid);
872 r = _collection_move_rename(cid, oldoid, cid, newoid);
873 if (r == -ENOENT)
874 r = 0;
875 }
876 break;
877
878 case Transaction::OP_COLL_SETATTR:
879 {
11fdf7f2 880 ceph_abort_msg("not implemented");
7c673cae
FG
881 }
882 break;
883
884 case Transaction::OP_COLL_RMATTR:
885 {
11fdf7f2 886 ceph_abort_msg("not implemented");
7c673cae
FG
887 }
888 break;
889
890 case Transaction::OP_COLL_RENAME:
891 {
11fdf7f2 892 ceph_abort_msg("not implemented");
7c673cae
FG
893 }
894 break;
895
896 case Transaction::OP_OMAP_CLEAR:
897 {
898 coll_t cid = i.get_cid(op->cid);
899 ghobject_t oid = i.get_oid(op->oid);
900 r = _omap_clear(cid, oid);
901 }
902 break;
903 case Transaction::OP_OMAP_SETKEYS:
904 {
905 coll_t cid = i.get_cid(op->cid);
906 ghobject_t oid = i.get_oid(op->oid);
907 bufferlist aset_bl;
908 i.decode_attrset_bl(&aset_bl);
909 r = _omap_setkeys(cid, oid, aset_bl);
910 }
911 break;
912 case Transaction::OP_OMAP_RMKEYS:
913 {
914 coll_t cid = i.get_cid(op->cid);
915 ghobject_t oid = i.get_oid(op->oid);
916 bufferlist keys_bl;
917 i.decode_keyset_bl(&keys_bl);
918 r = _omap_rmkeys(cid, oid, keys_bl);
919 }
920 break;
921 case Transaction::OP_OMAP_RMKEYRANGE:
922 {
923 coll_t cid = i.get_cid(op->cid);
924 ghobject_t oid = i.get_oid(op->oid);
925 string first, last;
926 first = i.decode_string();
927 last = i.decode_string();
928 r = _omap_rmkeyrange(cid, oid, first, last);
929 }
930 break;
931 case Transaction::OP_OMAP_SETHEADER:
932 {
933 coll_t cid = i.get_cid(op->cid);
934 ghobject_t oid = i.get_oid(op->oid);
935 bufferlist bl;
936 i.decode_bl(bl);
937 r = _omap_setheader(cid, oid, bl);
938 }
939 break;
940 case Transaction::OP_SPLIT_COLLECTION:
11fdf7f2 941 ceph_abort_msg("deprecated");
7c673cae
FG
942 break;
943 case Transaction::OP_SPLIT_COLLECTION2:
944 {
945 coll_t cid = i.get_cid(op->cid);
946 uint32_t bits = op->split_bits;
947 uint32_t rem = op->split_rem;
948 coll_t dest = i.get_cid(op->dest_cid);
949 r = _split_collection(cid, bits, rem, dest);
950 }
951 break;
11fdf7f2
TL
952 case Transaction::OP_MERGE_COLLECTION:
953 {
954 coll_t cid = i.get_cid(op->cid);
955 uint32_t bits = op->split_bits;
956 coll_t dest = i.get_cid(op->dest_cid);
957 r = _merge_collection(cid, bits, dest);
958 }
959 break;
7c673cae
FG
960
961 case Transaction::OP_SETALLOCHINT:
962 {
963 r = 0;
964 }
965 break;
966
11fdf7f2
TL
967 case Transaction::OP_COLL_SET_BITS:
968 {
969 r = 0;
970 }
971 break;
972
7c673cae
FG
973 default:
974 derr << "bad op " << op->op << dendl;
975 ceph_abort();
976 }
977
978 if (r < 0) {
979 bool ok = false;
980
981 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
982 op->op == Transaction::OP_CLONE ||
983 op->op == Transaction::OP_CLONERANGE2 ||
984 op->op == Transaction::OP_COLL_ADD))
985 // -ENOENT is usually okay
986 ok = true;
987 if (r == -ENODATA)
988 ok = true;
989
990 if (!ok) {
991 const char *msg = "unexpected error code";
992
993 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
994 op->op == Transaction::OP_CLONE ||
995 op->op == Transaction::OP_CLONERANGE2))
996 msg = "ENOENT on clone suggests osd bug";
997
998 if (r == -ENOSPC)
999 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1000 // by partially applying transactions.
1001 msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1002
1003 if (r == -ENOTEMPTY) {
1004 msg = "ENOTEMPTY suggests garbage data in osd data dir";
1005 dump_all();
1006 }
1007
1008 derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1009 << " (op " << pos << ", counting from 0)" << dendl;
1010 dout(0) << msg << dendl;
1011 dout(0) << " transaction dump:\n";
1012 JSONFormatter f(true);
1013 f.open_object_section("transaction");
1014 t.dump(&f);
1015 f.close_section();
1016 f.flush(*_dout);
1017 *_dout << dendl;
11fdf7f2 1018 ceph_abort_msg("unexpected error");
7c673cae
FG
1019 }
1020 }
1021
1022 ++pos;
1023 }
1024}
1025
1026int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1027{
1028 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1029 CollectionRef c = get_collection(cid);
1030 if (!c)
1031 return -ENOENT;
1032
1033 c->get_or_create_object(oid);
1034 return 0;
1035}
1036
1037int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1038 uint64_t offset, size_t len, const bufferlist& bl,
1039 uint32_t fadvise_flags)
1040{
1041 dout(10) << __func__ << " " << cid << " " << oid << " "
1042 << offset << "~" << len << dendl;
11fdf7f2 1043 ceph_assert(len == bl.length());
7c673cae
FG
1044
1045 CollectionRef c = get_collection(cid);
1046 if (!c)
1047 return -ENOENT;
1048
1049 ObjectRef o = c->get_or_create_object(oid);
9f95a23c 1050 if (len > 0 && !cct->_conf->memstore_debug_omit_block_device_write) {
7c673cae
FG
1051 const ssize_t old_size = o->get_size();
1052 o->write(offset, bl);
1053 used_bytes += (o->get_size() - old_size);
1054 }
1055
1056 return 0;
1057}
1058
1059int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1060 uint64_t offset, size_t len)
1061{
1062 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1063 << len << dendl;
1064 bufferlist bl;
1065 bl.append_zero(len);
1066 return _write(cid, oid, offset, len, bl);
1067}
1068
1069int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1070{
1071 dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1072 CollectionRef c = get_collection(cid);
1073 if (!c)
1074 return -ENOENT;
1075
1076 ObjectRef o = c->get_object(oid);
1077 if (!o)
1078 return -ENOENT;
9f95a23c
TL
1079 if (cct->_conf->memstore_debug_omit_block_device_write)
1080 return 0;
7c673cae
FG
1081 const ssize_t old_size = o->get_size();
1082 int r = o->truncate(size);
1083 used_bytes += (o->get_size() - old_size);
1084 return r;
1085}
1086
1087int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1088{
1089 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1090 CollectionRef c = get_collection(cid);
1091 if (!c)
1092 return -ENOENT;
11fdf7f2 1093 std::lock_guard l{c->lock};
7c673cae
FG
1094
1095 auto i = c->object_hash.find(oid);
1096 if (i == c->object_hash.end())
1097 return -ENOENT;
1098 used_bytes -= i->second->get_size();
1099 c->object_hash.erase(i);
1100 c->object_map.erase(oid);
1101
1102 return 0;
1103}
1104
1105int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1106 map<string,bufferptr>& aset)
1107{
1108 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1109 CollectionRef c = get_collection(cid);
1110 if (!c)
1111 return -ENOENT;
1112
1113 ObjectRef o = c->get_object(oid);
1114 if (!o)
1115 return -ENOENT;
11fdf7f2 1116 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1117 for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
1118 o->xattr[p->first] = p->second;
1119 return 0;
1120}
1121
1122int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1123{
1124 dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1125 CollectionRef c = get_collection(cid);
1126 if (!c)
1127 return -ENOENT;
1128
1129 ObjectRef o = c->get_object(oid);
1130 if (!o)
1131 return -ENOENT;
11fdf7f2 1132 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1133 auto i = o->xattr.find(name);
1134 if (i == o->xattr.end())
1135 return -ENODATA;
1136 o->xattr.erase(i);
1137 return 0;
1138}
1139
1140int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1141{
1142 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1143 CollectionRef c = get_collection(cid);
1144 if (!c)
1145 return -ENOENT;
1146
1147 ObjectRef o = c->get_object(oid);
1148 if (!o)
1149 return -ENOENT;
11fdf7f2 1150 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1151 o->xattr.clear();
1152 return 0;
1153}
1154
1155int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1156 const ghobject_t& newoid)
1157{
1158 dout(10) << __func__ << " " << cid << " " << oldoid
1159 << " -> " << newoid << dendl;
1160 CollectionRef c = get_collection(cid);
1161 if (!c)
1162 return -ENOENT;
1163
1164 ObjectRef oo = c->get_object(oldoid);
1165 if (!oo)
1166 return -ENOENT;
1167 ObjectRef no = c->get_or_create_object(newoid);
1168 used_bytes += oo->get_size() - no->get_size();
1169 no->clone(oo.get(), 0, oo->get_size(), 0);
1170
1171 // take xattr and omap locks with std::lock()
11fdf7f2
TL
1172 std::scoped_lock l{oo->xattr_mutex,
1173 no->xattr_mutex,
1174 oo->omap_mutex,
1175 no->omap_mutex};
7c673cae
FG
1176
1177 no->omap_header = oo->omap_header;
1178 no->omap = oo->omap;
1179 no->xattr = oo->xattr;
1180 return 0;
1181}
1182
1183int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1184 const ghobject_t& newoid,
1185 uint64_t srcoff, uint64_t len, uint64_t dstoff)
1186{
1187 dout(10) << __func__ << " " << cid << " "
1188 << oldoid << " " << srcoff << "~" << len << " -> "
1189 << newoid << " " << dstoff << "~" << len
1190 << dendl;
1191 CollectionRef c = get_collection(cid);
1192 if (!c)
1193 return -ENOENT;
1194
1195 ObjectRef oo = c->get_object(oldoid);
1196 if (!oo)
1197 return -ENOENT;
1198 ObjectRef no = c->get_or_create_object(newoid);
1199 if (srcoff >= oo->get_size())
1200 return 0;
1201 if (srcoff + len >= oo->get_size())
1202 len = oo->get_size() - srcoff;
1203
1204 const ssize_t old_size = no->get_size();
1205 no->clone(oo.get(), srcoff, len, dstoff);
1206 used_bytes += (no->get_size() - old_size);
1207
1208 return len;
1209}
1210
1211int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1212{
1213 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1214 CollectionRef c = get_collection(cid);
1215 if (!c)
1216 return -ENOENT;
1217
1218 ObjectRef o = c->get_object(oid);
1219 if (!o)
1220 return -ENOENT;
11fdf7f2 1221 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1222 o->omap.clear();
1223 o->omap_header.clear();
1224 return 0;
1225}
1226
1227int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1228 bufferlist& aset_bl)
1229{
1230 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1231 CollectionRef c = get_collection(cid);
1232 if (!c)
1233 return -ENOENT;
1234
1235 ObjectRef o = c->get_object(oid);
1236 if (!o)
1237 return -ENOENT;
11fdf7f2
TL
1238 std::lock_guard lock{o->omap_mutex};
1239 auto p = aset_bl.cbegin();
7c673cae 1240 __u32 num;
11fdf7f2 1241 decode(num, p);
7c673cae
FG
1242 while (num--) {
1243 string key;
11fdf7f2
TL
1244 decode(key, p);
1245 decode(o->omap[key], p);
7c673cae
FG
1246 }
1247 return 0;
1248}
1249
1250int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1251 bufferlist& keys_bl)
1252{
1253 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1254 CollectionRef c = get_collection(cid);
1255 if (!c)
1256 return -ENOENT;
1257
1258 ObjectRef o = c->get_object(oid);
1259 if (!o)
1260 return -ENOENT;
11fdf7f2
TL
1261 std::lock_guard lock{o->omap_mutex};
1262 auto p = keys_bl.cbegin();
7c673cae 1263 __u32 num;
11fdf7f2 1264 decode(num, p);
7c673cae
FG
1265 while (num--) {
1266 string key;
11fdf7f2 1267 decode(key, p);
7c673cae
FG
1268 o->omap.erase(key);
1269 }
1270 return 0;
1271}
1272
1273int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1274 const string& first, const string& last)
1275{
1276 dout(10) << __func__ << " " << cid << " " << oid << " " << first
1277 << " " << last << dendl;
1278 CollectionRef c = get_collection(cid);
1279 if (!c)
1280 return -ENOENT;
1281
1282 ObjectRef o = c->get_object(oid);
1283 if (!o)
1284 return -ENOENT;
11fdf7f2 1285 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1286 map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
1287 map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
1288 o->omap.erase(p, e);
1289 return 0;
1290}
1291
1292int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1293 const bufferlist &bl)
1294{
1295 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1296 CollectionRef c = get_collection(cid);
1297 if (!c)
1298 return -ENOENT;
1299
1300 ObjectRef o = c->get_object(oid);
1301 if (!o)
1302 return -ENOENT;
11fdf7f2 1303 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1304 o->omap_header = bl;
1305 return 0;
1306}
1307
1308int MemStore::_create_collection(const coll_t& cid, int bits)
1309{
1310 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 1311 std::lock_guard l{coll_lock};
7c673cae
FG
1312 auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1313 if (!result.second)
1314 return -EEXIST;
11fdf7f2
TL
1315 auto p = new_coll_map.find(cid);
1316 ceph_assert(p != new_coll_map.end());
1317 result.first->second = p->second;
7c673cae 1318 result.first->second->bits = bits;
11fdf7f2 1319 new_coll_map.erase(p);
7c673cae
FG
1320 return 0;
1321}
1322
1323int MemStore::_destroy_collection(const coll_t& cid)
1324{
1325 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 1326 std::lock_guard l{coll_lock};
7c673cae
FG
1327 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1328 if (cp == coll_map.end())
1329 return -ENOENT;
1330 {
11fdf7f2 1331 std::shared_lock l2{cp->second->lock};
7c673cae
FG
1332 if (!cp->second->object_map.empty())
1333 return -ENOTEMPTY;
1334 cp->second->exists = false;
1335 }
1336 used_bytes -= cp->second->used_bytes();
1337 coll_map.erase(cp);
1338 return 0;
1339}
1340
1341int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1342{
1343 dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1344 CollectionRef c = get_collection(cid);
1345 if (!c)
1346 return -ENOENT;
1347 CollectionRef oc = get_collection(ocid);
1348 if (!oc)
1349 return -ENOENT;
11fdf7f2
TL
1350
1351 std::scoped_lock l{std::min(&(*c), &(*oc))->lock,
1352 std::max(&(*c), &(*oc))->lock};
7c673cae
FG
1353
1354 if (c->object_hash.count(oid))
1355 return -EEXIST;
1356 if (oc->object_hash.count(oid) == 0)
1357 return -ENOENT;
1358 ObjectRef o = oc->object_hash[oid];
1359 c->object_map[oid] = o;
1360 c->object_hash[oid] = o;
1361 return 0;
1362}
1363
1364int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1365 coll_t cid, const ghobject_t& oid)
1366{
1367 dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1368 << cid << " " << oid << dendl;
1369 CollectionRef c = get_collection(cid);
1370 if (!c)
1371 return -ENOENT;
1372 CollectionRef oc = get_collection(oldcid);
1373 if (!oc)
1374 return -ENOENT;
1375
1376 // note: c and oc may be the same
11fdf7f2 1377 ceph_assert(&(*c) == &(*oc));
7c673cae 1378
11fdf7f2 1379 std::lock_guard l{c->lock};
7c673cae 1380 if (c->object_hash.count(oid))
11fdf7f2 1381 return -EEXIST;
7c673cae 1382 if (oc->object_hash.count(oldoid) == 0)
11fdf7f2 1383 return -ENOENT;
7c673cae
FG
1384 {
1385 ObjectRef o = oc->object_hash[oldoid];
1386 c->object_map[oid] = o;
1387 c->object_hash[oid] = o;
1388 oc->object_map.erase(oldoid);
1389 oc->object_hash.erase(oldoid);
1390 }
11fdf7f2 1391 return 0;
7c673cae
FG
1392}
1393
1394int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1395 coll_t dest)
1396{
1397 dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1398 << dest << dendl;
1399 CollectionRef sc = get_collection(cid);
1400 if (!sc)
1401 return -ENOENT;
1402 CollectionRef dc = get_collection(dest);
1403 if (!dc)
1404 return -ENOENT;
11fdf7f2
TL
1405
1406 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1407 std::max(&(*sc), &(*dc))->lock};
7c673cae
FG
1408
1409 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1410 while (p != sc->object_map.end()) {
1411 if (p->first.match(bits, match)) {
1412 dout(20) << " moving " << p->first << dendl;
1413 dc->object_map.insert(make_pair(p->first, p->second));
1414 dc->object_hash.insert(make_pair(p->first, p->second));
1415 sc->object_hash.erase(p->first);
1416 sc->object_map.erase(p++);
1417 } else {
1418 ++p;
1419 }
1420 }
1421
1422 sc->bits = bits;
11fdf7f2
TL
1423 ceph_assert(dc->bits == (int)bits);
1424
1425 return 0;
1426}
1427
1428int MemStore::_merge_collection(const coll_t& cid, uint32_t bits, coll_t dest)
1429{
1430 dout(10) << __func__ << " " << cid << " " << bits << " "
1431 << dest << dendl;
1432 CollectionRef sc = get_collection(cid);
1433 if (!sc)
1434 return -ENOENT;
1435 CollectionRef dc = get_collection(dest);
1436 if (!dc)
1437 return -ENOENT;
1438 {
1439 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1440 std::max(&(*sc), &(*dc))->lock};
1441
1442 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1443 while (p != sc->object_map.end()) {
1444 dout(20) << " moving " << p->first << dendl;
1445 dc->object_map.insert(make_pair(p->first, p->second));
1446 dc->object_hash.insert(make_pair(p->first, p->second));
1447 sc->object_hash.erase(p->first);
1448 sc->object_map.erase(p++);
1449 }
1450
1451 dc->bits = bits;
1452 }
1453
1454 {
1455 std::lock_guard l{coll_lock};
1456 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1457 ceph_assert(cp != coll_map.end());
1458 used_bytes -= cp->second->used_bytes();
1459 coll_map.erase(cp);
1460 }
7c673cae
FG
1461
1462 return 0;
1463}
11fdf7f2 1464
7c673cae
FG
1465namespace {
1466struct BufferlistObject : public MemStore::Object {
11fdf7f2 1467 ceph::spinlock mutex;
7c673cae
FG
1468 bufferlist data;
1469
1470 size_t get_size() const override { return data.length(); }
1471
1472 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1473 int write(uint64_t offset, const bufferlist &bl) override;
1474 int clone(Object *src, uint64_t srcoff, uint64_t len,
1475 uint64_t dstoff) override;
1476 int truncate(uint64_t offset) override;
1477
1478 void encode(bufferlist& bl) const override {
1479 ENCODE_START(1, 1, bl);
11fdf7f2 1480 encode(data, bl);
7c673cae
FG
1481 encode_base(bl);
1482 ENCODE_FINISH(bl);
1483 }
11fdf7f2 1484 void decode(bufferlist::const_iterator& p) override {
7c673cae 1485 DECODE_START(1, p);
11fdf7f2 1486 decode(data, p);
7c673cae
FG
1487 decode_base(p);
1488 DECODE_FINISH(p);
1489 }
1490};
1491}
1492// BufferlistObject
1493int BufferlistObject::read(uint64_t offset, uint64_t len,
1494 bufferlist &bl)
1495{
11fdf7f2 1496 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1497 bl.substr_of(data, offset, len);
1498 return bl.length();
1499}
1500
1501int BufferlistObject::write(uint64_t offset, const bufferlist &src)
1502{
1503 unsigned len = src.length();
1504
11fdf7f2 1505 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1506
1507 // before
1508 bufferlist newdata;
1509 if (get_size() >= offset) {
1510 newdata.substr_of(data, 0, offset);
1511 } else {
1512 if (get_size()) {
1513 newdata.substr_of(data, 0, get_size());
1514 }
1515 newdata.append_zero(offset - get_size());
1516 }
1517
1518 newdata.append(src);
1519
1520 // after
1521 if (get_size() > offset + len) {
1522 bufferlist tail;
1523 tail.substr_of(data, offset + len, get_size() - (offset + len));
1524 newdata.append(tail);
1525 }
1526
1527 data.claim(newdata);
1528 return 0;
1529}
1530
1531int BufferlistObject::clone(Object *src, uint64_t srcoff,
1532 uint64_t len, uint64_t dstoff)
1533{
1534 auto srcbl = dynamic_cast<BufferlistObject*>(src);
1535 if (srcbl == nullptr)
1536 return -ENOTSUP;
1537
1538 bufferlist bl;
1539 {
11fdf7f2 1540 std::lock_guard<decltype(srcbl->mutex)> lock(srcbl->mutex);
7c673cae
FG
1541 if (srcoff == dstoff && len == src->get_size()) {
1542 data = srcbl->data;
1543 return 0;
1544 }
1545 bl.substr_of(srcbl->data, srcoff, len);
1546 }
1547 return write(dstoff, bl);
1548}
1549
1550int BufferlistObject::truncate(uint64_t size)
1551{
11fdf7f2 1552 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1553 if (get_size() > size) {
1554 bufferlist bl;
1555 bl.substr_of(data, 0, size);
1556 data.claim(bl);
1557 } else if (get_size() == size) {
1558 // do nothing
1559 } else {
1560 data.append_zero(size - get_size());
1561 }
1562 return 0;
1563}
1564
1565// PageSetObject
1566
1567struct MemStore::PageSetObject : public Object {
1568 PageSet data;
1569 uint64_t data_len;
1570#if defined(__GLIBCXX__)
1571 // use a thread-local vector for the pages returned by PageSet, so we
1572 // can avoid allocations in read/write()
1573 static thread_local PageSet::page_vector tls_pages;
1574#endif
1575
7c673cae
FG
1576 size_t get_size() const override { return data_len; }
1577
1578 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1579 int write(uint64_t offset, const bufferlist &bl) override;
1580 int clone(Object *src, uint64_t srcoff, uint64_t len,
1581 uint64_t dstoff) override;
1582 int truncate(uint64_t offset) override;
1583
1584 void encode(bufferlist& bl) const override {
1585 ENCODE_START(1, 1, bl);
11fdf7f2 1586 encode(data_len, bl);
7c673cae
FG
1587 data.encode(bl);
1588 encode_base(bl);
1589 ENCODE_FINISH(bl);
1590 }
11fdf7f2 1591 void decode(bufferlist::const_iterator& p) override {
7c673cae 1592 DECODE_START(1, p);
11fdf7f2 1593 decode(data_len, p);
7c673cae
FG
1594 data.decode(p);
1595 decode_base(p);
1596 DECODE_FINISH(p);
1597 }
9f95a23c
TL
1598
1599private:
1600 FRIEND_MAKE_REF(PageSetObject);
1601 explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
7c673cae
FG
1602};
1603
// DEFINE_PAGE_VECTOR(name) gives the PageSetObject methods a page_vector
// called `name`.  With __GLIBCXX__ it expands to nothing, so `name` must be
// `tls_pages` and resolves to the static thread_local member defined here;
// otherwise it declares a fresh local PageSet::page_vector in the caller.
1604#if defined(__GLIBCXX__)
1605// use a thread-local vector for the pages returned by PageSet, so we
1606// can avoid allocations in read/write()
1607thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
1608#define DEFINE_PAGE_VECTOR(name)
1609#else
1610#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
1611#endif
1612
1613int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
1614{
1615 const auto start = offset;
1616 const auto end = offset + len;
1617 auto remaining = len;
1618
1619 DEFINE_PAGE_VECTOR(tls_pages);
1620 data.get_range(offset, len, tls_pages);
1621
1622 // allocate a buffer for the data
1623 buffer::ptr buf(len);
1624
1625 auto p = tls_pages.begin();
1626 while (remaining) {
1627 // no more pages in range
1628 if (p == tls_pages.end() || (*p)->offset >= end) {
1629 buf.zero(offset - start, remaining);
1630 break;
1631 }
1632 auto page = *p;
1633
1634 // fill any holes between pages with zeroes
1635 if (page->offset > offset) {
1636 const auto count = std::min(remaining, page->offset - offset);
1637 buf.zero(offset - start, count);
1638 remaining -= count;
1639 offset = page->offset;
1640 if (!remaining)
1641 break;
1642 }
1643
1644 // read from page
1645 const auto page_offset = offset - page->offset;
1646 const auto count = min(remaining, data.get_page_size() - page_offset);
1647
1648 buf.copy_in(offset - start, count, page->data + page_offset);
1649
1650 remaining -= count;
1651 offset += count;
1652
1653 ++p;
1654 }
1655
1656 tls_pages.clear(); // drop page refs
1657
1658 bl.append(std::move(buf));
1659 return len;
1660}
1661
1662int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
1663{
1664 unsigned len = src.length();
1665
1666 DEFINE_PAGE_VECTOR(tls_pages);
1667 // make sure the page range is allocated
1668 data.alloc_range(offset, src.length(), tls_pages);
1669
1670 auto page = tls_pages.begin();
1671
1672 auto p = src.begin();
1673 while (len > 0) {
1674 unsigned page_offset = offset - (*page)->offset;
1675 unsigned pageoff = data.get_page_size() - page_offset;
1676 unsigned count = min(len, pageoff);
1677 p.copy(count, (*page)->data + page_offset);
1678 offset += count;
1679 len -= count;
1680 if (count == pageoff)
1681 ++page;
1682 }
1683 if (data_len < offset)
1684 data_len = offset;
1685 tls_pages.clear(); // drop page refs
1686 return 0;
1687}
1688
// Copy `len` bytes starting at `srcoff` in `src` into this object starting at
// `dstoff`.  Source ranges that have no backing page are written as zeroes in
// the destination.  data_len is raised when the copy ends beyond it and is
// never lowered here.  Always returns 0.
// NOTE(review): `src` is static_cast to PageSetObject without a type check -
// presumably callers only pair objects of the same kind; confirm.
1689int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1690 uint64_t len, uint64_t dstoff)
1691{
// delta converts a source offset into the matching destination offset
1692 const int64_t delta = dstoff - srcoff;
1693
1694 auto &src_data = static_cast<PageSetObject*>(src)->data;
1695 const uint64_t src_page_size = src_data.get_page_size();
1696
1697 auto &dst_data = data;
1698 const auto dst_page_size = dst_data.get_page_size();
1699
1700 DEFINE_PAGE_VECTOR(tls_pages);
1701 PageSet::page_vector dst_pages;
1702
// each pass of the outer loop handles one chunk of at most 16 source pages
1703 while (len) {
1704 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1705 auto count = std::min(len, (uint64_t)src_page_size * 16);
1706 src_data.get_range(srcoff, count, tls_pages);
1707
1708 // allocate the destination range
1709 // TODO: avoid allocating pages for holes in the source range
1710 dst_data.alloc_range(srcoff + delta, count, dst_pages);
1711 auto dst_iter = dst_pages.begin();
1712
1713 for (auto &src_page : tls_pages) {
// [sbegin, send) is the part of this source page that lies in the chunk
1714 auto sbegin = std::max(srcoff, src_page->offset);
1715 auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1716
1717 // zero-fill holes before src_page
1718 if (srcoff < sbegin) {
1719 while (dst_iter != dst_pages.end()) {
1720 auto &dst_page = *dst_iter;
1721 auto dbegin = std::max(srcoff + delta, dst_page->offset);
1722 auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1723 std::fill(dst_page->data + dbegin - dst_page->offset,
1724 dst_page->data + dend - dst_page->offset, 0);
// stop on a destination page that was not filled to its end; the copy
// below continues on that same page
1725 if (dend < dst_page->offset + dst_page_size)
1726 break;
1727 ++dst_iter;
1728 }
1729 const auto c = sbegin - srcoff;
1730 count -= c;
1731 len -= c;
1732 }
1733
1734 // copy data from src page to dst pages
1735 while (dst_iter != dst_pages.end()) {
1736 auto &dst_page = *dst_iter;
1737 auto dbegin = std::max(sbegin + delta, dst_page->offset);
1738 auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1739
1740 std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1741 src_page->data + (dend - delta) - src_page->offset,
1742 dst_page->data + dbegin - dst_page->offset);
1743 if (dend < dst_page->offset + dst_page_size)
1744 break;
1745 ++dst_iter;
1746 }
1747
// advance both cursors past the source page just handled
1748 const auto c = send - sbegin;
1749 count -= c;
1750 len -= c;
1751 srcoff = send;
1752 dstoff = send + delta;
1753 }
1754 tls_pages.clear(); // drop page refs
1755
1756 // zero-fill holes after the last src_page
1757 if (count > 0) {
1758 while (dst_iter != dst_pages.end()) {
1759 auto &dst_page = *dst_iter;
1760 auto dbegin = std::max(dstoff, dst_page->offset);
1761 auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1762 std::fill(dst_page->data + dbegin - dst_page->offset,
1763 dst_page->data + dend - dst_page->offset, 0);
1764 ++dst_iter;
1765 }
1766 srcoff += count;
1767 dstoff += count;
1768 len -= count;
1769 }
1770 dst_pages.clear(); // drop page refs
1771 }
1772
// dstoff now points one past the last destination byte written
1773 // update object size
1774 if (data_len < dstoff)
1775 data_len = dstoff;
1776 return 0;
1777}
1778
1779int MemStore::PageSetObject::truncate(uint64_t size)
1780{
1781 data.free_pages_after(size);
1782 data_len = size;
1783
1784 const auto page_size = data.get_page_size();
1785 const auto page_offset = size & ~(page_size-1);
1786 if (page_offset == size)
1787 return 0;
1788
1789 DEFINE_PAGE_VECTOR(tls_pages);
1790 // write zeroes to the rest of the last page
1791 data.get_range(page_offset, page_size, tls_pages);
1792 if (tls_pages.empty())
1793 return 0;
1794
1795 auto page = tls_pages.begin();
1796 auto data = (*page)->data;
1797 std::fill(data + (size - page_offset), data + page_size, 0);
1798 tls_pages.clear(); // drop page ref
1799 return 0;
1800}
1801
1802
1803MemStore::ObjectRef MemStore::Collection::create_object() const {
1804 if (use_page_set)
9f95a23c 1805 return ceph::make_ref<PageSetObject>(cct->_conf->memstore_page_size);
7c673cae
FG
1806 return new BufferlistObject();
1807}