]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/memstore/MemStore.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / os / memstore / MemStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "acconfig.h"
15
16#ifdef HAVE_SYS_MOUNT_H
17#include <sys/mount.h>
18#endif
19
20#ifdef HAVE_SYS_PARAM_H
21#include <sys/param.h>
22#endif
23
#include <cstdint>
#include <memory>

#include "include/types.h"
#include "include/stringify.h"
#include "include/unordered_map.h"
#include "common/errno.h"
#include "MemStore.h"
#include "include/compat.h"
30
31#define dout_context cct
32#define dout_subsys ceph_subsys_filestore
33#undef dout_prefix
34#define dout_prefix *_dout << "memstore(" << path << ") "
35
36// for comparing collections for lock ordering
37bool operator>(const MemStore::CollectionRef& l,
38 const MemStore::CollectionRef& r)
39{
40 return (unsigned long)l.get() > (unsigned long)r.get();
41}
42
43
44int MemStore::mount()
45{
46 int r = _load();
47 if (r < 0)
48 return r;
49 finisher.start();
50 return 0;
51}
52
53int MemStore::umount()
54{
55 finisher.wait_for_empty();
56 finisher.stop();
57 return _save();
58}
59
60int MemStore::_save()
61{
62 dout(10) << __func__ << dendl;
63 dump_all();
64 set<coll_t> collections;
65 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
66 p != coll_map.end();
67 ++p) {
68 dout(20) << __func__ << " coll " << p->first << " " << p->second << dendl;
69 collections.insert(p->first);
70 bufferlist bl;
11fdf7f2 71 ceph_assert(p->second);
7c673cae
FG
72 p->second->encode(bl);
73 string fn = path + "/" + stringify(p->first);
74 int r = bl.write_file(fn.c_str());
75 if (r < 0)
76 return r;
77 }
78
79 string fn = path + "/collections";
80 bufferlist bl;
11fdf7f2 81 encode(collections, bl);
7c673cae
FG
82 int r = bl.write_file(fn.c_str());
83 if (r < 0)
84 return r;
85
86 return 0;
87}
88
89void MemStore::dump_all()
90{
91 Formatter *f = Formatter::create("json-pretty");
92 f->open_object_section("store");
93 dump(f);
94 f->close_section();
95 dout(0) << "dump:";
96 f->flush(*_dout);
97 *_dout << dendl;
98 delete f;
99}
100
101void MemStore::dump(Formatter *f)
102{
103 f->open_array_section("collections");
104 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
105 p != coll_map.end();
106 ++p) {
107 f->open_object_section("collection");
108 f->dump_string("name", stringify(p->first));
109
110 f->open_array_section("xattrs");
111 for (map<string,bufferptr>::iterator q = p->second->xattr.begin();
112 q != p->second->xattr.end();
113 ++q) {
114 f->open_object_section("xattr");
115 f->dump_string("name", q->first);
116 f->dump_int("length", q->second.length());
117 f->close_section();
118 }
119 f->close_section();
120
121 f->open_array_section("objects");
122 for (map<ghobject_t,ObjectRef>::iterator q = p->second->object_map.begin();
123 q != p->second->object_map.end();
124 ++q) {
125 f->open_object_section("object");
126 f->dump_string("name", stringify(q->first));
127 if (q->second)
128 q->second->dump(f);
129 f->close_section();
130 }
131 f->close_section();
132
133 f->close_section();
134 }
135 f->close_section();
136}
137
138int MemStore::_load()
139{
140 dout(10) << __func__ << dendl;
141 bufferlist bl;
142 string fn = path + "/collections";
143 string err;
144 int r = bl.read_file(fn.c_str(), &err);
145 if (r < 0)
146 return r;
147
148 set<coll_t> collections;
11fdf7f2
TL
149 auto p = bl.cbegin();
150 decode(collections, p);
7c673cae
FG
151
152 for (set<coll_t>::iterator q = collections.begin();
153 q != collections.end();
154 ++q) {
155 string fn = path + "/" + stringify(*q);
156 bufferlist cbl;
157 int r = cbl.read_file(fn.c_str(), &err);
158 if (r < 0)
159 return r;
160 CollectionRef c(new Collection(cct, *q));
11fdf7f2 161 auto p = cbl.cbegin();
7c673cae
FG
162 c->decode(p);
163 coll_map[*q] = c;
164 used_bytes += c->used_bytes();
165 }
166
167 dump_all();
168
169 return 0;
170}
171
172void MemStore::set_fsid(uuid_d u)
173{
b32b8144 174 int r = write_meta("fsid", stringify(u));
11fdf7f2 175 ceph_assert(r >= 0);
7c673cae
FG
176}
177
178uuid_d MemStore::get_fsid()
179{
180 string fsid_str;
b32b8144 181 int r = read_meta("fsid", &fsid_str);
11fdf7f2 182 ceph_assert(r >= 0);
7c673cae
FG
183 uuid_d uuid;
184 bool b = uuid.parse(fsid_str.c_str());
11fdf7f2 185 ceph_assert(b);
7c673cae
FG
186 return uuid;
187}
188
189int MemStore::mkfs()
190{
191 string fsid_str;
b32b8144 192 int r = read_meta("fsid", &fsid_str);
7c673cae
FG
193 if (r == -ENOENT) {
194 uuid_d fsid;
195 fsid.generate_random();
196 fsid_str = stringify(fsid);
b32b8144 197 r = write_meta("fsid", fsid_str);
7c673cae
FG
198 if (r < 0)
199 return r;
200 dout(1) << __func__ << " new fsid " << fsid_str << dendl;
201 } else if (r < 0) {
202 return r;
203 } else {
204 dout(1) << __func__ << " had fsid " << fsid_str << dendl;
205 }
206
207 string fn = path + "/collections";
208 derr << path << dendl;
209 bufferlist bl;
210 set<coll_t> collections;
11fdf7f2 211 encode(collections, bl);
7c673cae
FG
212 r = bl.write_file(fn.c_str());
213 if (r < 0)
214 return r;
215
216 r = write_meta("type", "memstore");
217 if (r < 0)
218 return r;
219
220 return 0;
221}
222
11fdf7f2 223int MemStore::statfs(struct store_statfs_t *st, osd_alert_list_t* alerts)
7c673cae 224{
11fdf7f2
TL
225 dout(10) << __func__ << dendl;
226 if (alerts) {
227 alerts->clear(); // returns nothing for now
228 }
7c673cae
FG
229 st->reset();
230 st->total = cct->_conf->memstore_device_bytes;
11fdf7f2 231 st->available = std::max<int64_t>(st->total - used_bytes, 0);
7c673cae
FG
232 dout(10) << __func__ << ": used_bytes: " << used_bytes
233 << "/" << cct->_conf->memstore_device_bytes << dendl;
234 return 0;
235}
236
11fdf7f2
TL
237int MemStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf)
238{
239 return -ENOTSUP;
240}
241
7c673cae
FG
242objectstore_perf_stat_t MemStore::get_cur_stats()
243{
244 // fixme
245 return objectstore_perf_stat_t();
246}
247
248MemStore::CollectionRef MemStore::get_collection(const coll_t& cid)
249{
11fdf7f2 250 std::shared_lock l{coll_lock};
7c673cae
FG
251 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
252 if (cp == coll_map.end())
253 return CollectionRef();
254 return cp->second;
255}
256
11fdf7f2
TL
257ObjectStore::CollectionHandle MemStore::create_new_collection(const coll_t& cid)
258{
259 std::lock_guard l{coll_lock};
260 Collection *c = new Collection(cct, cid);
261 new_coll_map[cid] = c;
262 return c;
263}
264
7c673cae
FG
265
266// ---------------
267// read operations
268
7c673cae
FG
269bool MemStore::exists(CollectionHandle &c_, const ghobject_t& oid)
270{
271 Collection *c = static_cast<Collection*>(c_.get());
272 dout(10) << __func__ << " " << c->get_cid() << " " << oid << dendl;
273 if (!c->exists)
274 return false;
275
276 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
277 // shared_ptr needs to be compared to nullptr.
278 return (bool)c->get_object(oid);
279}
280
7c673cae
FG
281int MemStore::stat(
282 CollectionHandle &c_,
283 const ghobject_t& oid,
284 struct stat *st,
285 bool allow_eio)
286{
287 Collection *c = static_cast<Collection*>(c_.get());
288 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
289 if (!c->exists)
290 return -ENOENT;
291 ObjectRef o = c->get_object(oid);
292 if (!o)
293 return -ENOENT;
294 st->st_size = o->get_size();
295 st->st_blksize = 4096;
296 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
297 st->st_nlink = 1;
298 return 0;
299}
300
301int MemStore::set_collection_opts(
11fdf7f2 302 CollectionHandle& ch,
7c673cae
FG
303 const pool_opts_t& opts)
304{
305 return -EOPNOTSUPP;
306}
307
7c673cae
FG
308int MemStore::read(
309 CollectionHandle &c_,
310 const ghobject_t& oid,
311 uint64_t offset,
312 size_t len,
313 bufferlist& bl,
224ce89b 314 uint32_t op_flags)
7c673cae
FG
315{
316 Collection *c = static_cast<Collection*>(c_.get());
317 dout(10) << __func__ << " " << c->cid << " " << oid << " "
318 << offset << "~" << len << dendl;
319 if (!c->exists)
320 return -ENOENT;
321 ObjectRef o = c->get_object(oid);
322 if (!o)
323 return -ENOENT;
324 if (offset >= o->get_size())
325 return 0;
326 size_t l = len;
327 if (l == 0 && offset == 0) // note: len == 0 means read the entire object
328 l = o->get_size();
329 else if (offset + l > o->get_size())
330 l = o->get_size() - offset;
331 bl.clear();
332 return o->read(offset, l, bl);
333}
334
11fdf7f2 335int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
336 uint64_t offset, size_t len, bufferlist& bl)
337{
338 map<uint64_t, uint64_t> destmap;
11fdf7f2 339 int r = fiemap(ch, oid, offset, len, destmap);
7c673cae 340 if (r >= 0)
11fdf7f2 341 encode(destmap, bl);
7c673cae
FG
342 return r;
343}
344
11fdf7f2 345int MemStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
346 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap)
347{
11fdf7f2 348 dout(10) << __func__ << " " << ch->cid << " " << oid << " " << offset << "~"
7c673cae 349 << len << dendl;
11fdf7f2 350 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
351 if (!c)
352 return -ENOENT;
353
354 ObjectRef o = c->get_object(oid);
355 if (!o)
356 return -ENOENT;
357 size_t l = len;
358 if (offset + l > o->get_size())
359 l = o->get_size() - offset;
360 if (offset >= o->get_size())
361 goto out;
362 destmap[offset] = l;
363 out:
364 return 0;
365}
366
7c673cae
FG
367int MemStore::getattr(CollectionHandle &c_, const ghobject_t& oid,
368 const char *name, bufferptr& value)
369{
370 Collection *c = static_cast<Collection*>(c_.get());
371 dout(10) << __func__ << " " << c->cid << " " << oid << " " << name << dendl;
372 if (!c->exists)
373 return -ENOENT;
374 ObjectRef o = c->get_object(oid);
375 if (!o)
376 return -ENOENT;
377 string k(name);
11fdf7f2 378 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
379 if (!o->xattr.count(k)) {
380 return -ENODATA;
381 }
382 value = o->xattr[k];
383 return 0;
384}
385
7c673cae
FG
386int MemStore::getattrs(CollectionHandle &c_, const ghobject_t& oid,
387 map<string,bufferptr>& aset)
388{
389 Collection *c = static_cast<Collection*>(c_.get());
390 dout(10) << __func__ << " " << c->cid << " " << oid << dendl;
391 if (!c->exists)
392 return -ENOENT;
393
394 ObjectRef o = c->get_object(oid);
395 if (!o)
396 return -ENOENT;
11fdf7f2 397 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
398 aset = o->xattr;
399 return 0;
400}
401
402int MemStore::list_collections(vector<coll_t>& ls)
403{
404 dout(10) << __func__ << dendl;
11fdf7f2 405 std::shared_lock l{coll_lock};
7c673cae
FG
406 for (ceph::unordered_map<coll_t,CollectionRef>::iterator p = coll_map.begin();
407 p != coll_map.end();
408 ++p) {
409 ls.push_back(p->first);
410 }
411 return 0;
412}
413
414bool MemStore::collection_exists(const coll_t& cid)
415{
416 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 417 std::shared_lock l{coll_lock};
7c673cae
FG
418 return coll_map.count(cid);
419}
420
11fdf7f2 421int MemStore::collection_empty(CollectionHandle& ch, bool *empty)
7c673cae 422{
11fdf7f2
TL
423 dout(10) << __func__ << " " << ch->cid << dendl;
424 CollectionRef c = static_cast<Collection*>(ch.get());
425 std::shared_lock l{c->lock};
7c673cae
FG
426 *empty = c->object_map.empty();
427 return 0;
428}
429
11fdf7f2 430int MemStore::collection_bits(CollectionHandle& ch)
7c673cae 431{
11fdf7f2
TL
432 dout(10) << __func__ << " " << ch->cid << dendl;
433 Collection *c = static_cast<Collection*>(ch.get());
434 std::shared_lock l{c->lock};
7c673cae
FG
435 return c->bits;
436}
437
11fdf7f2 438int MemStore::collection_list(CollectionHandle& ch,
7c673cae
FG
439 const ghobject_t& start,
440 const ghobject_t& end,
441 int max,
442 vector<ghobject_t> *ls, ghobject_t *next)
443{
11fdf7f2
TL
444 Collection *c = static_cast<Collection*>(ch.get());
445 std::shared_lock l{c->lock};
7c673cae 446
11fdf7f2 447 dout(10) << __func__ << " cid " << ch->cid << " start " << start
7c673cae
FG
448 << " end " << end << dendl;
449 map<ghobject_t,ObjectRef>::iterator p = c->object_map.lower_bound(start);
450 while (p != c->object_map.end() &&
451 ls->size() < (unsigned)max &&
452 p->first < end) {
453 ls->push_back(p->first);
454 ++p;
455 }
456 if (next != NULL) {
457 if (p == c->object_map.end())
458 *next = ghobject_t::get_max();
459 else
460 *next = p->first;
461 }
11fdf7f2 462 dout(10) << __func__ << " cid " << ch->cid << " got " << ls->size() << dendl;
7c673cae
FG
463 return 0;
464}
465
466int MemStore::omap_get(
11fdf7f2
TL
467 CollectionHandle& ch, ///< [in] Collection containing oid
468 const ghobject_t &oid, ///< [in] Object containing omap
469 bufferlist *header, ///< [out] omap header
470 map<string, bufferlist> *out /// < [out] Key to value map
471 )
7c673cae 472{
11fdf7f2
TL
473 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
474 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
475
476 ObjectRef o = c->get_object(oid);
477 if (!o)
478 return -ENOENT;
11fdf7f2 479 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
480 *header = o->omap_header;
481 *out = o->omap;
482 return 0;
483}
484
485int MemStore::omap_get_header(
11fdf7f2
TL
486 CollectionHandle& ch, ///< [in] Collection containing oid
487 const ghobject_t &oid, ///< [in] Object containing omap
488 bufferlist *header, ///< [out] omap header
489 bool allow_eio ///< [in] don't assert on eio
490 )
491{
492 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
493 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
494 ObjectRef o = c->get_object(oid);
495 if (!o)
496 return -ENOENT;
11fdf7f2 497 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
498 *header = o->omap_header;
499 return 0;
500}
501
502int MemStore::omap_get_keys(
11fdf7f2
TL
503 CollectionHandle& ch, ///< [in] Collection containing oid
504 const ghobject_t &oid, ///< [in] Object containing omap
505 set<string> *keys ///< [out] Keys defined on oid
506 )
7c673cae 507{
11fdf7f2
TL
508 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
509 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
510 ObjectRef o = c->get_object(oid);
511 if (!o)
512 return -ENOENT;
11fdf7f2 513 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
514 for (map<string,bufferlist>::iterator p = o->omap.begin();
515 p != o->omap.end();
516 ++p)
517 keys->insert(p->first);
518 return 0;
519}
520
521int MemStore::omap_get_values(
11fdf7f2
TL
522 CollectionHandle& ch, ///< [in] Collection containing oid
523 const ghobject_t &oid, ///< [in] Object containing omap
524 const set<string> &keys, ///< [in] Keys to get
525 map<string, bufferlist> *out ///< [out] Returned keys and values
526 )
527{
528 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
529 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
530 ObjectRef o = c->get_object(oid);
531 if (!o)
532 return -ENOENT;
11fdf7f2 533 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
534 for (set<string>::const_iterator p = keys.begin();
535 p != keys.end();
536 ++p) {
537 map<string,bufferlist>::iterator q = o->omap.find(*p);
538 if (q != o->omap.end())
539 out->insert(*q);
540 }
541 return 0;
542}
543
544int MemStore::omap_check_keys(
11fdf7f2
TL
545 CollectionHandle& ch, ///< [in] Collection containing oid
546 const ghobject_t &oid, ///< [in] Object containing omap
547 const set<string> &keys, ///< [in] Keys to check
548 set<string> *out ///< [out] Subset of keys defined on oid
549 )
550{
551 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
552 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
553 ObjectRef o = c->get_object(oid);
554 if (!o)
555 return -ENOENT;
11fdf7f2 556 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
557 for (set<string>::const_iterator p = keys.begin();
558 p != keys.end();
559 ++p) {
560 map<string,bufferlist>::iterator q = o->omap.find(*p);
561 if (q != o->omap.end())
562 out->insert(*p);
563 }
564 return 0;
565}
566
567class MemStore::OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
568 CollectionRef c;
569 ObjectRef o;
570 map<string,bufferlist>::iterator it;
571public:
572 OmapIteratorImpl(CollectionRef c, ObjectRef o)
573 : c(c), o(o), it(o->omap.begin()) {}
574
575 int seek_to_first() override {
11fdf7f2 576 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
577 it = o->omap.begin();
578 return 0;
579 }
580 int upper_bound(const string &after) override {
11fdf7f2 581 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
582 it = o->omap.upper_bound(after);
583 return 0;
584 }
585 int lower_bound(const string &to) override {
11fdf7f2 586 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
587 it = o->omap.lower_bound(to);
588 return 0;
589 }
590 bool valid() override {
11fdf7f2 591 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
592 return it != o->omap.end();
593 }
11fdf7f2
TL
594 int next() override {
595 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
596 ++it;
597 return 0;
598 }
599 string key() override {
11fdf7f2 600 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
601 return it->first;
602 }
603 bufferlist value() override {
11fdf7f2 604 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
605 return it->second;
606 }
607 int status() override {
608 return 0;
609 }
610};
611
11fdf7f2
TL
612ObjectMap::ObjectMapIterator MemStore::get_omap_iterator(
613 CollectionHandle& ch,
614 const ghobject_t& oid)
7c673cae 615{
11fdf7f2
TL
616 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
617 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
618 ObjectRef o = c->get_object(oid);
619 if (!o)
620 return ObjectMap::ObjectMapIterator();
621 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o));
622}
623
624
625// ---------------
626// write operations
627
11fdf7f2
TL
628int MemStore::queue_transactions(
629 CollectionHandle& ch,
630 vector<Transaction>& tls,
631 TrackedOpRef op,
632 ThreadPool::TPHandle *handle)
7c673cae
FG
633{
634 // because memstore operations are synchronous, we can implement the
635 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
636 // while allowing operations on different sequencers to happen in parallel
11fdf7f2
TL
637 Collection *c = static_cast<Collection*>(ch.get());
638 std::unique_lock lock{c->sequencer_mutex};
7c673cae
FG
639
640 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
641 // poke the TPHandle heartbeat just to exercise that code path
642 if (handle)
643 handle->reset_tp_timeout();
644
645 _do_transaction(*p);
646 }
647
648 Context *on_apply = NULL, *on_apply_sync = NULL, *on_commit = NULL;
649 ObjectStore::Transaction::collect_contexts(tls, &on_apply, &on_commit,
650 &on_apply_sync);
651 if (on_apply_sync)
652 on_apply_sync->complete(0);
653 if (on_apply)
654 finisher.queue(on_apply);
655 if (on_commit)
656 finisher.queue(on_commit);
657 return 0;
658}
659
660void MemStore::_do_transaction(Transaction& t)
661{
662 Transaction::iterator i = t.begin();
663 int pos = 0;
664
665 while (i.have_op()) {
666 Transaction::Op *op = i.decode_op();
667 int r = 0;
668
669 switch (op->op) {
670 case Transaction::OP_NOP:
671 break;
672 case Transaction::OP_TOUCH:
673 {
674 coll_t cid = i.get_cid(op->cid);
675 ghobject_t oid = i.get_oid(op->oid);
676 r = _touch(cid, oid);
677 }
678 break;
679
680 case Transaction::OP_WRITE:
681 {
682 coll_t cid = i.get_cid(op->cid);
683 ghobject_t oid = i.get_oid(op->oid);
684 uint64_t off = op->off;
685 uint64_t len = op->len;
686 uint32_t fadvise_flags = i.get_fadvise_flags();
687 bufferlist bl;
688 i.decode_bl(bl);
689 r = _write(cid, oid, off, len, bl, fadvise_flags);
690 }
691 break;
692
693 case Transaction::OP_ZERO:
694 {
695 coll_t cid = i.get_cid(op->cid);
696 ghobject_t oid = i.get_oid(op->oid);
697 uint64_t off = op->off;
698 uint64_t len = op->len;
699 r = _zero(cid, oid, off, len);
700 }
701 break;
702
703 case Transaction::OP_TRIMCACHE:
704 {
705 // deprecated, no-op
706 }
707 break;
708
709 case Transaction::OP_TRUNCATE:
710 {
711 coll_t cid = i.get_cid(op->cid);
712 ghobject_t oid = i.get_oid(op->oid);
713 uint64_t off = op->off;
714 r = _truncate(cid, oid, off);
715 }
716 break;
717
718 case Transaction::OP_REMOVE:
719 {
720 coll_t cid = i.get_cid(op->cid);
721 ghobject_t oid = i.get_oid(op->oid);
722 r = _remove(cid, oid);
723 }
724 break;
725
726 case Transaction::OP_SETATTR:
727 {
728 coll_t cid = i.get_cid(op->cid);
729 ghobject_t oid = i.get_oid(op->oid);
730 string name = i.decode_string();
731 bufferlist bl;
732 i.decode_bl(bl);
733 map<string, bufferptr> to_set;
734 to_set[name] = bufferptr(bl.c_str(), bl.length());
735 r = _setattrs(cid, oid, to_set);
736 }
737 break;
738
739 case Transaction::OP_SETATTRS:
740 {
741 coll_t cid = i.get_cid(op->cid);
742 ghobject_t oid = i.get_oid(op->oid);
743 map<string, bufferptr> aset;
744 i.decode_attrset(aset);
745 r = _setattrs(cid, oid, aset);
746 }
747 break;
748
749 case Transaction::OP_RMATTR:
750 {
751 coll_t cid = i.get_cid(op->cid);
752 ghobject_t oid = i.get_oid(op->oid);
753 string name = i.decode_string();
754 r = _rmattr(cid, oid, name.c_str());
755 }
756 break;
757
758 case Transaction::OP_RMATTRS:
759 {
760 coll_t cid = i.get_cid(op->cid);
761 ghobject_t oid = i.get_oid(op->oid);
762 r = _rmattrs(cid, oid);
763 }
764 break;
765
766 case Transaction::OP_CLONE:
767 {
768 coll_t cid = i.get_cid(op->cid);
769 ghobject_t oid = i.get_oid(op->oid);
770 ghobject_t noid = i.get_oid(op->dest_oid);
771 r = _clone(cid, oid, noid);
772 }
773 break;
774
775 case Transaction::OP_CLONERANGE:
776 {
777 coll_t cid = i.get_cid(op->cid);
778 ghobject_t oid = i.get_oid(op->oid);
779 ghobject_t noid = i.get_oid(op->dest_oid);
780 uint64_t off = op->off;
781 uint64_t len = op->len;
782 r = _clone_range(cid, oid, noid, off, len, off);
783 }
784 break;
785
786 case Transaction::OP_CLONERANGE2:
787 {
788 coll_t cid = i.get_cid(op->cid);
789 ghobject_t oid = i.get_oid(op->oid);
790 ghobject_t noid = i.get_oid(op->dest_oid);
791 uint64_t srcoff = op->off;
792 uint64_t len = op->len;
793 uint64_t dstoff = op->dest_off;
794 r = _clone_range(cid, oid, noid, srcoff, len, dstoff);
795 }
796 break;
797
798 case Transaction::OP_MKCOLL:
799 {
800 coll_t cid = i.get_cid(op->cid);
801 r = _create_collection(cid, op->split_bits);
802 }
803 break;
804
805 case Transaction::OP_COLL_HINT:
806 {
807 coll_t cid = i.get_cid(op->cid);
808 uint32_t type = op->hint_type;
809 bufferlist hint;
810 i.decode_bl(hint);
11fdf7f2 811 auto hiter = hint.cbegin();
7c673cae
FG
812 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
813 uint32_t pg_num;
814 uint64_t num_objs;
11fdf7f2
TL
815 decode(pg_num, hiter);
816 decode(num_objs, hiter);
7c673cae
FG
817 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs);
818 } else {
819 // Ignore the hint
820 dout(10) << "Unrecognized collection hint type: " << type << dendl;
821 }
822 }
823 break;
824
825 case Transaction::OP_RMCOLL:
826 {
827 coll_t cid = i.get_cid(op->cid);
828 r = _destroy_collection(cid);
829 }
830 break;
831
832 case Transaction::OP_COLL_ADD:
833 {
834 coll_t ocid = i.get_cid(op->cid);
835 coll_t ncid = i.get_cid(op->dest_cid);
836 ghobject_t oid = i.get_oid(op->oid);
837 r = _collection_add(ncid, ocid, oid);
838 }
839 break;
840
841 case Transaction::OP_COLL_REMOVE:
842 {
843 coll_t cid = i.get_cid(op->cid);
844 ghobject_t oid = i.get_oid(op->oid);
845 r = _remove(cid, oid);
846 }
847 break;
848
849 case Transaction::OP_COLL_MOVE:
11fdf7f2 850 ceph_abort_msg("deprecated");
7c673cae
FG
851 break;
852
853 case Transaction::OP_COLL_MOVE_RENAME:
854 {
855 coll_t oldcid = i.get_cid(op->cid);
856 ghobject_t oldoid = i.get_oid(op->oid);
857 coll_t newcid = i.get_cid(op->dest_cid);
858 ghobject_t newoid = i.get_oid(op->dest_oid);
859 r = _collection_move_rename(oldcid, oldoid, newcid, newoid);
860 if (r == -ENOENT)
861 r = 0;
862 }
863 break;
864
865 case Transaction::OP_TRY_RENAME:
866 {
867 coll_t cid = i.get_cid(op->cid);
868 ghobject_t oldoid = i.get_oid(op->oid);
869 ghobject_t newoid = i.get_oid(op->dest_oid);
870 r = _collection_move_rename(cid, oldoid, cid, newoid);
871 if (r == -ENOENT)
872 r = 0;
873 }
874 break;
875
876 case Transaction::OP_COLL_SETATTR:
877 {
11fdf7f2 878 ceph_abort_msg("not implemented");
7c673cae
FG
879 }
880 break;
881
882 case Transaction::OP_COLL_RMATTR:
883 {
11fdf7f2 884 ceph_abort_msg("not implemented");
7c673cae
FG
885 }
886 break;
887
888 case Transaction::OP_COLL_RENAME:
889 {
11fdf7f2 890 ceph_abort_msg("not implemented");
7c673cae
FG
891 }
892 break;
893
894 case Transaction::OP_OMAP_CLEAR:
895 {
896 coll_t cid = i.get_cid(op->cid);
897 ghobject_t oid = i.get_oid(op->oid);
898 r = _omap_clear(cid, oid);
899 }
900 break;
901 case Transaction::OP_OMAP_SETKEYS:
902 {
903 coll_t cid = i.get_cid(op->cid);
904 ghobject_t oid = i.get_oid(op->oid);
905 bufferlist aset_bl;
906 i.decode_attrset_bl(&aset_bl);
907 r = _omap_setkeys(cid, oid, aset_bl);
908 }
909 break;
910 case Transaction::OP_OMAP_RMKEYS:
911 {
912 coll_t cid = i.get_cid(op->cid);
913 ghobject_t oid = i.get_oid(op->oid);
914 bufferlist keys_bl;
915 i.decode_keyset_bl(&keys_bl);
916 r = _omap_rmkeys(cid, oid, keys_bl);
917 }
918 break;
919 case Transaction::OP_OMAP_RMKEYRANGE:
920 {
921 coll_t cid = i.get_cid(op->cid);
922 ghobject_t oid = i.get_oid(op->oid);
923 string first, last;
924 first = i.decode_string();
925 last = i.decode_string();
926 r = _omap_rmkeyrange(cid, oid, first, last);
927 }
928 break;
929 case Transaction::OP_OMAP_SETHEADER:
930 {
931 coll_t cid = i.get_cid(op->cid);
932 ghobject_t oid = i.get_oid(op->oid);
933 bufferlist bl;
934 i.decode_bl(bl);
935 r = _omap_setheader(cid, oid, bl);
936 }
937 break;
938 case Transaction::OP_SPLIT_COLLECTION:
11fdf7f2 939 ceph_abort_msg("deprecated");
7c673cae
FG
940 break;
941 case Transaction::OP_SPLIT_COLLECTION2:
942 {
943 coll_t cid = i.get_cid(op->cid);
944 uint32_t bits = op->split_bits;
945 uint32_t rem = op->split_rem;
946 coll_t dest = i.get_cid(op->dest_cid);
947 r = _split_collection(cid, bits, rem, dest);
948 }
949 break;
11fdf7f2
TL
950 case Transaction::OP_MERGE_COLLECTION:
951 {
952 coll_t cid = i.get_cid(op->cid);
953 uint32_t bits = op->split_bits;
954 coll_t dest = i.get_cid(op->dest_cid);
955 r = _merge_collection(cid, bits, dest);
956 }
957 break;
7c673cae
FG
958
959 case Transaction::OP_SETALLOCHINT:
960 {
961 r = 0;
962 }
963 break;
964
11fdf7f2
TL
965 case Transaction::OP_COLL_SET_BITS:
966 {
967 r = 0;
968 }
969 break;
970
7c673cae
FG
971 default:
972 derr << "bad op " << op->op << dendl;
973 ceph_abort();
974 }
975
976 if (r < 0) {
977 bool ok = false;
978
979 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
980 op->op == Transaction::OP_CLONE ||
981 op->op == Transaction::OP_CLONERANGE2 ||
982 op->op == Transaction::OP_COLL_ADD))
983 // -ENOENT is usually okay
984 ok = true;
985 if (r == -ENODATA)
986 ok = true;
987
988 if (!ok) {
989 const char *msg = "unexpected error code";
990
991 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
992 op->op == Transaction::OP_CLONE ||
993 op->op == Transaction::OP_CLONERANGE2))
994 msg = "ENOENT on clone suggests osd bug";
995
996 if (r == -ENOSPC)
997 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
998 // by partially applying transactions.
999 msg = "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1000
1001 if (r == -ENOTEMPTY) {
1002 msg = "ENOTEMPTY suggests garbage data in osd data dir";
1003 dump_all();
1004 }
1005
1006 derr << " error " << cpp_strerror(r) << " not handled on operation " << op->op
1007 << " (op " << pos << ", counting from 0)" << dendl;
1008 dout(0) << msg << dendl;
1009 dout(0) << " transaction dump:\n";
1010 JSONFormatter f(true);
1011 f.open_object_section("transaction");
1012 t.dump(&f);
1013 f.close_section();
1014 f.flush(*_dout);
1015 *_dout << dendl;
11fdf7f2 1016 ceph_abort_msg("unexpected error");
7c673cae
FG
1017 }
1018 }
1019
1020 ++pos;
1021 }
1022}
1023
1024int MemStore::_touch(const coll_t& cid, const ghobject_t& oid)
1025{
1026 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1027 CollectionRef c = get_collection(cid);
1028 if (!c)
1029 return -ENOENT;
1030
1031 c->get_or_create_object(oid);
1032 return 0;
1033}
1034
1035int MemStore::_write(const coll_t& cid, const ghobject_t& oid,
1036 uint64_t offset, size_t len, const bufferlist& bl,
1037 uint32_t fadvise_flags)
1038{
1039 dout(10) << __func__ << " " << cid << " " << oid << " "
1040 << offset << "~" << len << dendl;
11fdf7f2 1041 ceph_assert(len == bl.length());
7c673cae
FG
1042
1043 CollectionRef c = get_collection(cid);
1044 if (!c)
1045 return -ENOENT;
1046
1047 ObjectRef o = c->get_or_create_object(oid);
1048 if (len > 0) {
1049 const ssize_t old_size = o->get_size();
1050 o->write(offset, bl);
1051 used_bytes += (o->get_size() - old_size);
1052 }
1053
1054 return 0;
1055}
1056
1057int MemStore::_zero(const coll_t& cid, const ghobject_t& oid,
1058 uint64_t offset, size_t len)
1059{
1060 dout(10) << __func__ << " " << cid << " " << oid << " " << offset << "~"
1061 << len << dendl;
1062 bufferlist bl;
1063 bl.append_zero(len);
1064 return _write(cid, oid, offset, len, bl);
1065}
1066
1067int MemStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
1068{
1069 dout(10) << __func__ << " " << cid << " " << oid << " " << size << dendl;
1070 CollectionRef c = get_collection(cid);
1071 if (!c)
1072 return -ENOENT;
1073
1074 ObjectRef o = c->get_object(oid);
1075 if (!o)
1076 return -ENOENT;
1077 const ssize_t old_size = o->get_size();
1078 int r = o->truncate(size);
1079 used_bytes += (o->get_size() - old_size);
1080 return r;
1081}
1082
1083int MemStore::_remove(const coll_t& cid, const ghobject_t& oid)
1084{
1085 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1086 CollectionRef c = get_collection(cid);
1087 if (!c)
1088 return -ENOENT;
11fdf7f2 1089 std::lock_guard l{c->lock};
7c673cae
FG
1090
1091 auto i = c->object_hash.find(oid);
1092 if (i == c->object_hash.end())
1093 return -ENOENT;
1094 used_bytes -= i->second->get_size();
1095 c->object_hash.erase(i);
1096 c->object_map.erase(oid);
1097
1098 return 0;
1099}
1100
1101int MemStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
1102 map<string,bufferptr>& aset)
1103{
1104 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1105 CollectionRef c = get_collection(cid);
1106 if (!c)
1107 return -ENOENT;
1108
1109 ObjectRef o = c->get_object(oid);
1110 if (!o)
1111 return -ENOENT;
11fdf7f2 1112 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1113 for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
1114 o->xattr[p->first] = p->second;
1115 return 0;
1116}
1117
1118int MemStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name)
1119{
1120 dout(10) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1121 CollectionRef c = get_collection(cid);
1122 if (!c)
1123 return -ENOENT;
1124
1125 ObjectRef o = c->get_object(oid);
1126 if (!o)
1127 return -ENOENT;
11fdf7f2 1128 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1129 auto i = o->xattr.find(name);
1130 if (i == o->xattr.end())
1131 return -ENODATA;
1132 o->xattr.erase(i);
1133 return 0;
1134}
1135
1136int MemStore::_rmattrs(const coll_t& cid, const ghobject_t& oid)
1137{
1138 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1139 CollectionRef c = get_collection(cid);
1140 if (!c)
1141 return -ENOENT;
1142
1143 ObjectRef o = c->get_object(oid);
1144 if (!o)
1145 return -ENOENT;
11fdf7f2 1146 std::lock_guard lock{o->xattr_mutex};
7c673cae
FG
1147 o->xattr.clear();
1148 return 0;
1149}
1150
1151int MemStore::_clone(const coll_t& cid, const ghobject_t& oldoid,
1152 const ghobject_t& newoid)
1153{
1154 dout(10) << __func__ << " " << cid << " " << oldoid
1155 << " -> " << newoid << dendl;
1156 CollectionRef c = get_collection(cid);
1157 if (!c)
1158 return -ENOENT;
1159
1160 ObjectRef oo = c->get_object(oldoid);
1161 if (!oo)
1162 return -ENOENT;
1163 ObjectRef no = c->get_or_create_object(newoid);
1164 used_bytes += oo->get_size() - no->get_size();
1165 no->clone(oo.get(), 0, oo->get_size(), 0);
1166
1167 // take xattr and omap locks with std::lock()
11fdf7f2
TL
1168 std::scoped_lock l{oo->xattr_mutex,
1169 no->xattr_mutex,
1170 oo->omap_mutex,
1171 no->omap_mutex};
7c673cae
FG
1172
1173 no->omap_header = oo->omap_header;
1174 no->omap = oo->omap;
1175 no->xattr = oo->xattr;
1176 return 0;
1177}
1178
1179int MemStore::_clone_range(const coll_t& cid, const ghobject_t& oldoid,
1180 const ghobject_t& newoid,
1181 uint64_t srcoff, uint64_t len, uint64_t dstoff)
1182{
1183 dout(10) << __func__ << " " << cid << " "
1184 << oldoid << " " << srcoff << "~" << len << " -> "
1185 << newoid << " " << dstoff << "~" << len
1186 << dendl;
1187 CollectionRef c = get_collection(cid);
1188 if (!c)
1189 return -ENOENT;
1190
1191 ObjectRef oo = c->get_object(oldoid);
1192 if (!oo)
1193 return -ENOENT;
1194 ObjectRef no = c->get_or_create_object(newoid);
1195 if (srcoff >= oo->get_size())
1196 return 0;
1197 if (srcoff + len >= oo->get_size())
1198 len = oo->get_size() - srcoff;
1199
1200 const ssize_t old_size = no->get_size();
1201 no->clone(oo.get(), srcoff, len, dstoff);
1202 used_bytes += (no->get_size() - old_size);
1203
1204 return len;
1205}
1206
1207int MemStore::_omap_clear(const coll_t& cid, const ghobject_t &oid)
1208{
1209 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1210 CollectionRef c = get_collection(cid);
1211 if (!c)
1212 return -ENOENT;
1213
1214 ObjectRef o = c->get_object(oid);
1215 if (!o)
1216 return -ENOENT;
11fdf7f2 1217 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1218 o->omap.clear();
1219 o->omap_header.clear();
1220 return 0;
1221}
1222
1223int MemStore::_omap_setkeys(const coll_t& cid, const ghobject_t &oid,
1224 bufferlist& aset_bl)
1225{
1226 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1227 CollectionRef c = get_collection(cid);
1228 if (!c)
1229 return -ENOENT;
1230
1231 ObjectRef o = c->get_object(oid);
1232 if (!o)
1233 return -ENOENT;
11fdf7f2
TL
1234 std::lock_guard lock{o->omap_mutex};
1235 auto p = aset_bl.cbegin();
7c673cae 1236 __u32 num;
11fdf7f2 1237 decode(num, p);
7c673cae
FG
1238 while (num--) {
1239 string key;
11fdf7f2
TL
1240 decode(key, p);
1241 decode(o->omap[key], p);
7c673cae
FG
1242 }
1243 return 0;
1244}
1245
1246int MemStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &oid,
1247 bufferlist& keys_bl)
1248{
1249 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1250 CollectionRef c = get_collection(cid);
1251 if (!c)
1252 return -ENOENT;
1253
1254 ObjectRef o = c->get_object(oid);
1255 if (!o)
1256 return -ENOENT;
11fdf7f2
TL
1257 std::lock_guard lock{o->omap_mutex};
1258 auto p = keys_bl.cbegin();
7c673cae 1259 __u32 num;
11fdf7f2 1260 decode(num, p);
7c673cae
FG
1261 while (num--) {
1262 string key;
11fdf7f2 1263 decode(key, p);
7c673cae
FG
1264 o->omap.erase(key);
1265 }
1266 return 0;
1267}
1268
1269int MemStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
1270 const string& first, const string& last)
1271{
1272 dout(10) << __func__ << " " << cid << " " << oid << " " << first
1273 << " " << last << dendl;
1274 CollectionRef c = get_collection(cid);
1275 if (!c)
1276 return -ENOENT;
1277
1278 ObjectRef o = c->get_object(oid);
1279 if (!o)
1280 return -ENOENT;
11fdf7f2 1281 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1282 map<string,bufferlist>::iterator p = o->omap.lower_bound(first);
1283 map<string,bufferlist>::iterator e = o->omap.lower_bound(last);
1284 o->omap.erase(p, e);
1285 return 0;
1286}
1287
1288int MemStore::_omap_setheader(const coll_t& cid, const ghobject_t &oid,
1289 const bufferlist &bl)
1290{
1291 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1292 CollectionRef c = get_collection(cid);
1293 if (!c)
1294 return -ENOENT;
1295
1296 ObjectRef o = c->get_object(oid);
1297 if (!o)
1298 return -ENOENT;
11fdf7f2 1299 std::lock_guard lock{o->omap_mutex};
7c673cae
FG
1300 o->omap_header = bl;
1301 return 0;
1302}
1303
1304int MemStore::_create_collection(const coll_t& cid, int bits)
1305{
1306 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 1307 std::lock_guard l{coll_lock};
7c673cae
FG
1308 auto result = coll_map.insert(std::make_pair(cid, CollectionRef()));
1309 if (!result.second)
1310 return -EEXIST;
11fdf7f2
TL
1311 auto p = new_coll_map.find(cid);
1312 ceph_assert(p != new_coll_map.end());
1313 result.first->second = p->second;
7c673cae 1314 result.first->second->bits = bits;
11fdf7f2 1315 new_coll_map.erase(p);
7c673cae
FG
1316 return 0;
1317}
1318
1319int MemStore::_destroy_collection(const coll_t& cid)
1320{
1321 dout(10) << __func__ << " " << cid << dendl;
11fdf7f2 1322 std::lock_guard l{coll_lock};
7c673cae
FG
1323 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1324 if (cp == coll_map.end())
1325 return -ENOENT;
1326 {
11fdf7f2 1327 std::shared_lock l2{cp->second->lock};
7c673cae
FG
1328 if (!cp->second->object_map.empty())
1329 return -ENOTEMPTY;
1330 cp->second->exists = false;
1331 }
1332 used_bytes -= cp->second->used_bytes();
1333 coll_map.erase(cp);
1334 return 0;
1335}
1336
1337int MemStore::_collection_add(const coll_t& cid, const coll_t& ocid, const ghobject_t& oid)
1338{
1339 dout(10) << __func__ << " " << cid << " " << ocid << " " << oid << dendl;
1340 CollectionRef c = get_collection(cid);
1341 if (!c)
1342 return -ENOENT;
1343 CollectionRef oc = get_collection(ocid);
1344 if (!oc)
1345 return -ENOENT;
11fdf7f2
TL
1346
1347 std::scoped_lock l{std::min(&(*c), &(*oc))->lock,
1348 std::max(&(*c), &(*oc))->lock};
7c673cae
FG
1349
1350 if (c->object_hash.count(oid))
1351 return -EEXIST;
1352 if (oc->object_hash.count(oid) == 0)
1353 return -ENOENT;
1354 ObjectRef o = oc->object_hash[oid];
1355 c->object_map[oid] = o;
1356 c->object_hash[oid] = o;
1357 return 0;
1358}
1359
1360int MemStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
1361 coll_t cid, const ghobject_t& oid)
1362{
1363 dout(10) << __func__ << " " << oldcid << " " << oldoid << " -> "
1364 << cid << " " << oid << dendl;
1365 CollectionRef c = get_collection(cid);
1366 if (!c)
1367 return -ENOENT;
1368 CollectionRef oc = get_collection(oldcid);
1369 if (!oc)
1370 return -ENOENT;
1371
1372 // note: c and oc may be the same
11fdf7f2 1373 ceph_assert(&(*c) == &(*oc));
7c673cae 1374
11fdf7f2 1375 std::lock_guard l{c->lock};
7c673cae 1376 if (c->object_hash.count(oid))
11fdf7f2 1377 return -EEXIST;
7c673cae 1378 if (oc->object_hash.count(oldoid) == 0)
11fdf7f2 1379 return -ENOENT;
7c673cae
FG
1380 {
1381 ObjectRef o = oc->object_hash[oldoid];
1382 c->object_map[oid] = o;
1383 c->object_hash[oid] = o;
1384 oc->object_map.erase(oldoid);
1385 oc->object_hash.erase(oldoid);
1386 }
11fdf7f2 1387 return 0;
7c673cae
FG
1388}
1389
1390int MemStore::_split_collection(const coll_t& cid, uint32_t bits, uint32_t match,
1391 coll_t dest)
1392{
1393 dout(10) << __func__ << " " << cid << " " << bits << " " << match << " "
1394 << dest << dendl;
1395 CollectionRef sc = get_collection(cid);
1396 if (!sc)
1397 return -ENOENT;
1398 CollectionRef dc = get_collection(dest);
1399 if (!dc)
1400 return -ENOENT;
11fdf7f2
TL
1401
1402 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1403 std::max(&(*sc), &(*dc))->lock};
7c673cae
FG
1404
1405 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1406 while (p != sc->object_map.end()) {
1407 if (p->first.match(bits, match)) {
1408 dout(20) << " moving " << p->first << dendl;
1409 dc->object_map.insert(make_pair(p->first, p->second));
1410 dc->object_hash.insert(make_pair(p->first, p->second));
1411 sc->object_hash.erase(p->first);
1412 sc->object_map.erase(p++);
1413 } else {
1414 ++p;
1415 }
1416 }
1417
1418 sc->bits = bits;
11fdf7f2
TL
1419 ceph_assert(dc->bits == (int)bits);
1420
1421 return 0;
1422}
1423
1424int MemStore::_merge_collection(const coll_t& cid, uint32_t bits, coll_t dest)
1425{
1426 dout(10) << __func__ << " " << cid << " " << bits << " "
1427 << dest << dendl;
1428 CollectionRef sc = get_collection(cid);
1429 if (!sc)
1430 return -ENOENT;
1431 CollectionRef dc = get_collection(dest);
1432 if (!dc)
1433 return -ENOENT;
1434 {
1435 std::scoped_lock l{std::min(&(*sc), &(*dc))->lock,
1436 std::max(&(*sc), &(*dc))->lock};
1437
1438 map<ghobject_t,ObjectRef>::iterator p = sc->object_map.begin();
1439 while (p != sc->object_map.end()) {
1440 dout(20) << " moving " << p->first << dendl;
1441 dc->object_map.insert(make_pair(p->first, p->second));
1442 dc->object_hash.insert(make_pair(p->first, p->second));
1443 sc->object_hash.erase(p->first);
1444 sc->object_map.erase(p++);
1445 }
1446
1447 dc->bits = bits;
1448 }
1449
1450 {
1451 std::lock_guard l{coll_lock};
1452 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1453 ceph_assert(cp != coll_map.end());
1454 used_bytes -= cp->second->used_bytes();
1455 coll_map.erase(cp);
1456 }
7c673cae
FG
1457
1458 return 0;
1459}
11fdf7f2 1460
7c673cae
FG
1461namespace {
1462struct BufferlistObject : public MemStore::Object {
11fdf7f2 1463 ceph::spinlock mutex;
7c673cae
FG
1464 bufferlist data;
1465
1466 size_t get_size() const override { return data.length(); }
1467
1468 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1469 int write(uint64_t offset, const bufferlist &bl) override;
1470 int clone(Object *src, uint64_t srcoff, uint64_t len,
1471 uint64_t dstoff) override;
1472 int truncate(uint64_t offset) override;
1473
1474 void encode(bufferlist& bl) const override {
1475 ENCODE_START(1, 1, bl);
11fdf7f2 1476 encode(data, bl);
7c673cae
FG
1477 encode_base(bl);
1478 ENCODE_FINISH(bl);
1479 }
11fdf7f2 1480 void decode(bufferlist::const_iterator& p) override {
7c673cae 1481 DECODE_START(1, p);
11fdf7f2 1482 decode(data, p);
7c673cae
FG
1483 decode_base(p);
1484 DECODE_FINISH(p);
1485 }
1486};
1487}
1488// BufferlistObject
1489int BufferlistObject::read(uint64_t offset, uint64_t len,
1490 bufferlist &bl)
1491{
11fdf7f2 1492 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1493 bl.substr_of(data, offset, len);
1494 return bl.length();
1495}
1496
1497int BufferlistObject::write(uint64_t offset, const bufferlist &src)
1498{
1499 unsigned len = src.length();
1500
11fdf7f2 1501 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1502
1503 // before
1504 bufferlist newdata;
1505 if (get_size() >= offset) {
1506 newdata.substr_of(data, 0, offset);
1507 } else {
1508 if (get_size()) {
1509 newdata.substr_of(data, 0, get_size());
1510 }
1511 newdata.append_zero(offset - get_size());
1512 }
1513
1514 newdata.append(src);
1515
1516 // after
1517 if (get_size() > offset + len) {
1518 bufferlist tail;
1519 tail.substr_of(data, offset + len, get_size() - (offset + len));
1520 newdata.append(tail);
1521 }
1522
1523 data.claim(newdata);
1524 return 0;
1525}
1526
1527int BufferlistObject::clone(Object *src, uint64_t srcoff,
1528 uint64_t len, uint64_t dstoff)
1529{
1530 auto srcbl = dynamic_cast<BufferlistObject*>(src);
1531 if (srcbl == nullptr)
1532 return -ENOTSUP;
1533
1534 bufferlist bl;
1535 {
11fdf7f2 1536 std::lock_guard<decltype(srcbl->mutex)> lock(srcbl->mutex);
7c673cae
FG
1537 if (srcoff == dstoff && len == src->get_size()) {
1538 data = srcbl->data;
1539 return 0;
1540 }
1541 bl.substr_of(srcbl->data, srcoff, len);
1542 }
1543 return write(dstoff, bl);
1544}
1545
1546int BufferlistObject::truncate(uint64_t size)
1547{
11fdf7f2 1548 std::lock_guard<decltype(mutex)> lock(mutex);
7c673cae
FG
1549 if (get_size() > size) {
1550 bufferlist bl;
1551 bl.substr_of(data, 0, size);
1552 data.claim(bl);
1553 } else if (get_size() == size) {
1554 // do nothing
1555 } else {
1556 data.append_zero(size - get_size());
1557 }
1558 return 0;
1559}
1560
1561// PageSetObject
1562
1563struct MemStore::PageSetObject : public Object {
1564 PageSet data;
1565 uint64_t data_len;
1566#if defined(__GLIBCXX__)
1567 // use a thread-local vector for the pages returned by PageSet, so we
1568 // can avoid allocations in read/write()
1569 static thread_local PageSet::page_vector tls_pages;
1570#endif
1571
1572 explicit PageSetObject(size_t page_size) : data(page_size), data_len(0) {}
1573
1574 size_t get_size() const override { return data_len; }
1575
1576 int read(uint64_t offset, uint64_t len, bufferlist &bl) override;
1577 int write(uint64_t offset, const bufferlist &bl) override;
1578 int clone(Object *src, uint64_t srcoff, uint64_t len,
1579 uint64_t dstoff) override;
1580 int truncate(uint64_t offset) override;
1581
1582 void encode(bufferlist& bl) const override {
1583 ENCODE_START(1, 1, bl);
11fdf7f2 1584 encode(data_len, bl);
7c673cae
FG
1585 data.encode(bl);
1586 encode_base(bl);
1587 ENCODE_FINISH(bl);
1588 }
11fdf7f2 1589 void decode(bufferlist::const_iterator& p) override {
7c673cae 1590 DECODE_START(1, p);
11fdf7f2 1591 decode(data_len, p);
7c673cae
FG
1592 data.decode(p);
1593 decode_base(p);
1594 DECODE_FINISH(p);
1595 }
1596};
1597
// DEFINE_PAGE_VECTOR(name) provides the scratch page vector used by the
// PageSetObject methods below.  With libstdc++ the class-level thread-local
// vector is used and the macro expands to nothing; elsewhere it declares a
// plain local vector called `name`.
#ifdef __GLIBCXX__
thread_local PageSet::page_vector MemStore::PageSetObject::tls_pages;
#define DEFINE_PAGE_VECTOR(name)
#else
#define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
#endif
1606
1607int MemStore::PageSetObject::read(uint64_t offset, uint64_t len, bufferlist& bl)
1608{
1609 const auto start = offset;
1610 const auto end = offset + len;
1611 auto remaining = len;
1612
1613 DEFINE_PAGE_VECTOR(tls_pages);
1614 data.get_range(offset, len, tls_pages);
1615
1616 // allocate a buffer for the data
1617 buffer::ptr buf(len);
1618
1619 auto p = tls_pages.begin();
1620 while (remaining) {
1621 // no more pages in range
1622 if (p == tls_pages.end() || (*p)->offset >= end) {
1623 buf.zero(offset - start, remaining);
1624 break;
1625 }
1626 auto page = *p;
1627
1628 // fill any holes between pages with zeroes
1629 if (page->offset > offset) {
1630 const auto count = std::min(remaining, page->offset - offset);
1631 buf.zero(offset - start, count);
1632 remaining -= count;
1633 offset = page->offset;
1634 if (!remaining)
1635 break;
1636 }
1637
1638 // read from page
1639 const auto page_offset = offset - page->offset;
1640 const auto count = min(remaining, data.get_page_size() - page_offset);
1641
1642 buf.copy_in(offset - start, count, page->data + page_offset);
1643
1644 remaining -= count;
1645 offset += count;
1646
1647 ++p;
1648 }
1649
1650 tls_pages.clear(); // drop page refs
1651
1652 bl.append(std::move(buf));
1653 return len;
1654}
1655
1656int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)
1657{
1658 unsigned len = src.length();
1659
1660 DEFINE_PAGE_VECTOR(tls_pages);
1661 // make sure the page range is allocated
1662 data.alloc_range(offset, src.length(), tls_pages);
1663
1664 auto page = tls_pages.begin();
1665
1666 auto p = src.begin();
1667 while (len > 0) {
1668 unsigned page_offset = offset - (*page)->offset;
1669 unsigned pageoff = data.get_page_size() - page_offset;
1670 unsigned count = min(len, pageoff);
1671 p.copy(count, (*page)->data + page_offset);
1672 offset += count;
1673 len -= count;
1674 if (count == pageoff)
1675 ++page;
1676 }
1677 if (data_len < offset)
1678 data_len = offset;
1679 tls_pages.clear(); // drop page refs
1680 return 0;
1681}
1682
1683int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
1684 uint64_t len, uint64_t dstoff)
1685{
1686 const int64_t delta = dstoff - srcoff;
1687
1688 auto &src_data = static_cast<PageSetObject*>(src)->data;
1689 const uint64_t src_page_size = src_data.get_page_size();
1690
1691 auto &dst_data = data;
1692 const auto dst_page_size = dst_data.get_page_size();
1693
1694 DEFINE_PAGE_VECTOR(tls_pages);
1695 PageSet::page_vector dst_pages;
1696
1697 while (len) {
1698 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1699 auto count = std::min(len, (uint64_t)src_page_size * 16);
1700 src_data.get_range(srcoff, count, tls_pages);
1701
1702 // allocate the destination range
1703 // TODO: avoid allocating pages for holes in the source range
1704 dst_data.alloc_range(srcoff + delta, count, dst_pages);
1705 auto dst_iter = dst_pages.begin();
1706
1707 for (auto &src_page : tls_pages) {
1708 auto sbegin = std::max(srcoff, src_page->offset);
1709 auto send = std::min(srcoff + count, src_page->offset + src_page_size);
1710
1711 // zero-fill holes before src_page
1712 if (srcoff < sbegin) {
1713 while (dst_iter != dst_pages.end()) {
1714 auto &dst_page = *dst_iter;
1715 auto dbegin = std::max(srcoff + delta, dst_page->offset);
1716 auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
1717 std::fill(dst_page->data + dbegin - dst_page->offset,
1718 dst_page->data + dend - dst_page->offset, 0);
1719 if (dend < dst_page->offset + dst_page_size)
1720 break;
1721 ++dst_iter;
1722 }
1723 const auto c = sbegin - srcoff;
1724 count -= c;
1725 len -= c;
1726 }
1727
1728 // copy data from src page to dst pages
1729 while (dst_iter != dst_pages.end()) {
1730 auto &dst_page = *dst_iter;
1731 auto dbegin = std::max(sbegin + delta, dst_page->offset);
1732 auto dend = std::min(send + delta, dst_page->offset + dst_page_size);
1733
1734 std::copy(src_page->data + (dbegin - delta) - src_page->offset,
1735 src_page->data + (dend - delta) - src_page->offset,
1736 dst_page->data + dbegin - dst_page->offset);
1737 if (dend < dst_page->offset + dst_page_size)
1738 break;
1739 ++dst_iter;
1740 }
1741
1742 const auto c = send - sbegin;
1743 count -= c;
1744 len -= c;
1745 srcoff = send;
1746 dstoff = send + delta;
1747 }
1748 tls_pages.clear(); // drop page refs
1749
1750 // zero-fill holes after the last src_page
1751 if (count > 0) {
1752 while (dst_iter != dst_pages.end()) {
1753 auto &dst_page = *dst_iter;
1754 auto dbegin = std::max(dstoff, dst_page->offset);
1755 auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
1756 std::fill(dst_page->data + dbegin - dst_page->offset,
1757 dst_page->data + dend - dst_page->offset, 0);
1758 ++dst_iter;
1759 }
1760 srcoff += count;
1761 dstoff += count;
1762 len -= count;
1763 }
1764 dst_pages.clear(); // drop page refs
1765 }
1766
1767 // update object size
1768 if (data_len < dstoff)
1769 data_len = dstoff;
1770 return 0;
1771}
1772
1773int MemStore::PageSetObject::truncate(uint64_t size)
1774{
1775 data.free_pages_after(size);
1776 data_len = size;
1777
1778 const auto page_size = data.get_page_size();
1779 const auto page_offset = size & ~(page_size-1);
1780 if (page_offset == size)
1781 return 0;
1782
1783 DEFINE_PAGE_VECTOR(tls_pages);
1784 // write zeroes to the rest of the last page
1785 data.get_range(page_offset, page_size, tls_pages);
1786 if (tls_pages.empty())
1787 return 0;
1788
1789 auto page = tls_pages.begin();
1790 auto data = (*page)->data;
1791 std::fill(data + (size - page_offset), data + page_size, 0);
1792 tls_pages.clear(); // drop page ref
1793 return 0;
1794}
1795
1796
1797MemStore::ObjectRef MemStore::Collection::create_object() const {
1798 if (use_page_set)
1799 return new PageSetObject(cct->_conf->memstore_page_size);
1800 return new BufferlistObject();
1801}