1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "include/memory.h"
28 #include "common/errno.h"
30 #include "include/compat.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_filestore
35 #define dout_prefix *_dout << "memstore(" << path << ") "
37 // for comparing collections for lock ordering
38 bool operator>(const MemStore::CollectionRef
& l
,
39 const MemStore::CollectionRef
& r
)
41 return (unsigned long)l
.get() > (unsigned long)r
.get();
54 int MemStore::umount()
56 finisher
.wait_for_empty();
63 dout(10) << __func__
<< dendl
;
65 set
<coll_t
> collections
;
66 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
69 dout(20) << __func__
<< " coll " << p
->first
<< " " << p
->second
<< dendl
;
70 collections
.insert(p
->first
);
73 p
->second
->encode(bl
);
74 string fn
= path
+ "/" + stringify(p
->first
);
75 int r
= bl
.write_file(fn
.c_str());
80 string fn
= path
+ "/collections";
82 ::encode(collections
, bl
);
83 int r
= bl
.write_file(fn
.c_str());
90 void MemStore::dump_all()
92 Formatter
*f
= Formatter::create("json-pretty");
93 f
->open_object_section("store");
102 void MemStore::dump(Formatter
*f
)
104 f
->open_array_section("collections");
105 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
108 f
->open_object_section("collection");
109 f
->dump_string("name", stringify(p
->first
));
111 f
->open_array_section("xattrs");
112 for (map
<string
,bufferptr
>::iterator q
= p
->second
->xattr
.begin();
113 q
!= p
->second
->xattr
.end();
115 f
->open_object_section("xattr");
116 f
->dump_string("name", q
->first
);
117 f
->dump_int("length", q
->second
.length());
122 f
->open_array_section("objects");
123 for (map
<ghobject_t
,ObjectRef
>::iterator q
= p
->second
->object_map
.begin();
124 q
!= p
->second
->object_map
.end();
126 f
->open_object_section("object");
127 f
->dump_string("name", stringify(q
->first
));
139 int MemStore::_load()
141 dout(10) << __func__
<< dendl
;
143 string fn
= path
+ "/collections";
145 int r
= bl
.read_file(fn
.c_str(), &err
);
149 set
<coll_t
> collections
;
150 bufferlist::iterator p
= bl
.begin();
151 ::decode(collections
, p
);
153 for (set
<coll_t
>::iterator q
= collections
.begin();
154 q
!= collections
.end();
156 string fn
= path
+ "/" + stringify(*q
);
158 int r
= cbl
.read_file(fn
.c_str(), &err
);
161 CollectionRef
c(new Collection(cct
, *q
));
162 bufferlist::iterator p
= cbl
.begin();
165 used_bytes
+= c
->used_bytes();
173 void MemStore::set_fsid(uuid_d u
)
175 int r
= write_meta("fs_fsid", stringify(u
));
179 uuid_d
MemStore::get_fsid()
182 int r
= read_meta("fs_fsid", &fsid_str
);
185 bool b
= uuid
.parse(fsid_str
.c_str());
193 int r
= read_meta("fs_fsid", &fsid_str
);
196 fsid
.generate_random();
197 fsid_str
= stringify(fsid
);
198 r
= write_meta("fs_fsid", fsid_str
);
201 dout(1) << __func__
<< " new fsid " << fsid_str
<< dendl
;
205 dout(1) << __func__
<< " had fsid " << fsid_str
<< dendl
;
208 string fn
= path
+ "/collections";
209 derr
<< path
<< dendl
;
211 set
<coll_t
> collections
;
212 ::encode(collections
, bl
);
213 r
= bl
.write_file(fn
.c_str());
217 r
= write_meta("type", "memstore");
224 int MemStore::statfs(struct store_statfs_t
*st
)
226 dout(10) << __func__
<< dendl
;
228 st
->total
= cct
->_conf
->memstore_device_bytes
;
229 st
->available
= MAX(int64_t(st
->total
) - int64_t(used_bytes
), 0ll);
230 dout(10) << __func__
<< ": used_bytes: " << used_bytes
231 << "/" << cct
->_conf
->memstore_device_bytes
<< dendl
;
235 objectstore_perf_stat_t
MemStore::get_cur_stats()
238 return objectstore_perf_stat_t();
241 MemStore::CollectionRef
MemStore::get_collection(const coll_t
& cid
)
243 RWLock::RLocker
l(coll_lock
);
244 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
245 if (cp
== coll_map
.end())
246 return CollectionRef();
254 bool MemStore::exists(const coll_t
& cid
, const ghobject_t
& oid
)
256 CollectionHandle c
= get_collection(cid
);
259 return exists(c
, oid
);
262 bool MemStore::exists(CollectionHandle
&c_
, const ghobject_t
& oid
)
264 Collection
*c
= static_cast<Collection
*>(c_
.get());
265 dout(10) << __func__
<< " " << c
->get_cid() << " " << oid
<< dendl
;
269 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
270 // shared_ptr needs to be compared to nullptr.
271 return (bool)c
->get_object(oid
);
276 const ghobject_t
& oid
,
280 CollectionHandle c
= get_collection(cid
);
283 return stat(c
, oid
, st
, allow_eio
);
287 CollectionHandle
&c_
,
288 const ghobject_t
& oid
,
292 Collection
*c
= static_cast<Collection
*>(c_
.get());
293 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
296 ObjectRef o
= c
->get_object(oid
);
299 st
->st_size
= o
->get_size();
300 st
->st_blksize
= 4096;
301 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
306 int MemStore::set_collection_opts(
308 const pool_opts_t
& opts
)
315 const ghobject_t
& oid
,
322 CollectionHandle c
= get_collection(cid
);
325 return read(c
, oid
, offset
, len
, bl
, op_flags
, allow_eio
);
329 CollectionHandle
&c_
,
330 const ghobject_t
& oid
,
337 Collection
*c
= static_cast<Collection
*>(c_
.get());
338 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " "
339 << offset
<< "~" << len
<< dendl
;
342 ObjectRef o
= c
->get_object(oid
);
345 if (offset
>= o
->get_size())
348 if (l
== 0 && offset
== 0) // note: len == 0 means read the entire object
350 else if (offset
+ l
> o
->get_size())
351 l
= o
->get_size() - offset
;
353 return o
->read(offset
, l
, bl
);
356 int MemStore::fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
357 uint64_t offset
, size_t len
, bufferlist
& bl
)
359 map
<uint64_t, uint64_t> destmap
;
360 int r
= fiemap(cid
, oid
, offset
, len
, destmap
);
362 ::encode(destmap
, bl
);
366 int MemStore::fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
367 uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
)
369 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
371 CollectionRef c
= get_collection(cid
);
375 ObjectRef o
= c
->get_object(oid
);
379 if (offset
+ l
> o
->get_size())
380 l
= o
->get_size() - offset
;
381 if (offset
>= o
->get_size())
388 int MemStore::getattr(const coll_t
& cid
, const ghobject_t
& oid
,
389 const char *name
, bufferptr
& value
)
391 CollectionHandle c
= get_collection(cid
);
394 return getattr(c
, oid
, name
, value
);
397 int MemStore::getattr(CollectionHandle
&c_
, const ghobject_t
& oid
,
398 const char *name
, bufferptr
& value
)
400 Collection
*c
= static_cast<Collection
*>(c_
.get());
401 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " " << name
<< dendl
;
404 ObjectRef o
= c
->get_object(oid
);
408 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
409 if (!o
->xattr
.count(k
)) {
416 int MemStore::getattrs(const coll_t
& cid
, const ghobject_t
& oid
,
417 map
<string
,bufferptr
>& aset
)
419 CollectionHandle c
= get_collection(cid
);
422 return getattrs(c
, oid
, aset
);
425 int MemStore::getattrs(CollectionHandle
&c_
, const ghobject_t
& oid
,
426 map
<string
,bufferptr
>& aset
)
428 Collection
*c
= static_cast<Collection
*>(c_
.get());
429 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
433 ObjectRef o
= c
->get_object(oid
);
436 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
441 int MemStore::list_collections(vector
<coll_t
>& ls
)
443 dout(10) << __func__
<< dendl
;
444 RWLock::RLocker
l(coll_lock
);
445 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
448 ls
.push_back(p
->first
);
453 bool MemStore::collection_exists(const coll_t
& cid
)
455 dout(10) << __func__
<< " " << cid
<< dendl
;
456 RWLock::RLocker
l(coll_lock
);
457 return coll_map
.count(cid
);
460 int MemStore::collection_empty(const coll_t
& cid
, bool *empty
)
462 dout(10) << __func__
<< " " << cid
<< dendl
;
463 CollectionRef c
= get_collection(cid
);
466 RWLock::RLocker
l(c
->lock
);
467 *empty
= c
->object_map
.empty();
471 int MemStore::collection_bits(const coll_t
& cid
)
473 dout(10) << __func__
<< " " << cid
<< dendl
;
474 CollectionRef c
= get_collection(cid
);
477 RWLock::RLocker
l(c
->lock
);
481 int MemStore::collection_list(const coll_t
& cid
,
482 const ghobject_t
& start
,
483 const ghobject_t
& end
,
485 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
487 CollectionRef c
= get_collection(cid
);
490 RWLock::RLocker
l(c
->lock
);
492 dout(10) << __func__
<< " cid " << cid
<< " start " << start
493 << " end " << end
<< dendl
;
494 map
<ghobject_t
,ObjectRef
>::iterator p
= c
->object_map
.lower_bound(start
);
495 while (p
!= c
->object_map
.end() &&
496 ls
->size() < (unsigned)max
&&
498 ls
->push_back(p
->first
);
502 if (p
== c
->object_map
.end())
503 *next
= ghobject_t::get_max();
507 dout(10) << __func__
<< " cid " << cid
<< " got " << ls
->size() << dendl
;
511 int MemStore::omap_get(
512 const coll_t
& cid
, ///< [in] Collection containing oid
513 const ghobject_t
&oid
, ///< [in] Object containing omap
514 bufferlist
*header
, ///< [out] omap header
515 map
<string
, bufferlist
> *out
/// < [out] Key to value map
518 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
519 CollectionRef c
= get_collection(cid
);
523 ObjectRef o
= c
->get_object(oid
);
526 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
527 *header
= o
->omap_header
;
532 int MemStore::omap_get_header(
533 const coll_t
& cid
, ///< [in] Collection containing oid
534 const ghobject_t
&oid
, ///< [in] Object containing omap
535 bufferlist
*header
, ///< [out] omap header
536 bool allow_eio
///< [in] don't assert on eio
539 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
540 CollectionRef c
= get_collection(cid
);
544 ObjectRef o
= c
->get_object(oid
);
547 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
548 *header
= o
->omap_header
;
552 int MemStore::omap_get_keys(
553 const coll_t
& cid
, ///< [in] Collection containing oid
554 const ghobject_t
&oid
, ///< [in] Object containing omap
555 set
<string
> *keys
///< [out] Keys defined on oid
558 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
559 CollectionRef c
= get_collection(cid
);
563 ObjectRef o
= c
->get_object(oid
);
566 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
567 for (map
<string
,bufferlist
>::iterator p
= o
->omap
.begin();
570 keys
->insert(p
->first
);
574 int MemStore::omap_get_values(
575 const coll_t
& cid
, ///< [in] Collection containing oid
576 const ghobject_t
&oid
, ///< [in] Object containing omap
577 const set
<string
> &keys
, ///< [in] Keys to get
578 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
581 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
582 CollectionRef c
= get_collection(cid
);
586 ObjectRef o
= c
->get_object(oid
);
589 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
590 for (set
<string
>::const_iterator p
= keys
.begin();
593 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
594 if (q
!= o
->omap
.end())
600 int MemStore::omap_check_keys(
601 const coll_t
& cid
, ///< [in] Collection containing oid
602 const ghobject_t
&oid
, ///< [in] Object containing omap
603 const set
<string
> &keys
, ///< [in] Keys to check
604 set
<string
> *out
///< [out] Subset of keys defined on oid
607 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
608 CollectionRef c
= get_collection(cid
);
612 ObjectRef o
= c
->get_object(oid
);
615 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
616 for (set
<string
>::const_iterator p
= keys
.begin();
619 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
620 if (q
!= o
->omap
.end())
626 class MemStore::OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
629 map
<string
,bufferlist
>::iterator it
;
631 OmapIteratorImpl(CollectionRef c
, ObjectRef o
)
632 : c(c
), o(o
), it(o
->omap
.begin()) {}
634 int seek_to_first() override
{
635 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
636 it
= o
->omap
.begin();
639 int upper_bound(const string
&after
) override
{
640 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
641 it
= o
->omap
.upper_bound(after
);
644 int lower_bound(const string
&to
) override
{
645 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
646 it
= o
->omap
.lower_bound(to
);
649 bool valid() override
{
650 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
651 return it
!= o
->omap
.end();
653 int next(bool validate
=true) override
{
654 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
658 string
key() override
{
659 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
662 bufferlist
value() override
{
663 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
666 int status() override
{
671 ObjectMap::ObjectMapIterator
MemStore::get_omap_iterator(const coll_t
& cid
,
672 const ghobject_t
& oid
)
674 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
675 CollectionRef c
= get_collection(cid
);
677 return ObjectMap::ObjectMapIterator();
679 ObjectRef o
= c
->get_object(oid
);
681 return ObjectMap::ObjectMapIterator();
682 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
));
689 int MemStore::queue_transactions(Sequencer
*osr
,
690 vector
<Transaction
>& tls
,
692 ThreadPool::TPHandle
*handle
)
694 // because memstore operations are synchronous, we can implement the
695 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
696 // while allowing operations on different sequencers to happen in parallel
697 struct OpSequencer
: public Sequencer_impl
{
698 OpSequencer(CephContext
* cct
) :
699 Sequencer_impl(cct
) {}
701 void flush() override
{}
702 bool flush_commit(Context
*) override
{ return true; }
705 std::unique_lock
<std::mutex
> lock
;
708 osr
->p
= new OpSequencer(cct
);
710 auto seq
= static_cast<OpSequencer
*>(osr
->p
.get());
711 lock
= std::unique_lock
<std::mutex
>(seq
->mutex
);
714 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
715 // poke the TPHandle heartbeat just to exercise that code path
717 handle
->reset_tp_timeout();
722 Context
*on_apply
= NULL
, *on_apply_sync
= NULL
, *on_commit
= NULL
;
723 ObjectStore::Transaction::collect_contexts(tls
, &on_apply
, &on_commit
,
726 on_apply_sync
->complete(0);
728 finisher
.queue(on_apply
);
730 finisher
.queue(on_commit
);
734 void MemStore::_do_transaction(Transaction
& t
)
736 Transaction::iterator i
= t
.begin();
739 while (i
.have_op()) {
740 Transaction::Op
*op
= i
.decode_op();
744 case Transaction::OP_NOP
:
746 case Transaction::OP_TOUCH
:
748 coll_t cid
= i
.get_cid(op
->cid
);
749 ghobject_t oid
= i
.get_oid(op
->oid
);
750 r
= _touch(cid
, oid
);
754 case Transaction::OP_WRITE
:
756 coll_t cid
= i
.get_cid(op
->cid
);
757 ghobject_t oid
= i
.get_oid(op
->oid
);
758 uint64_t off
= op
->off
;
759 uint64_t len
= op
->len
;
760 uint32_t fadvise_flags
= i
.get_fadvise_flags();
763 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
767 case Transaction::OP_ZERO
:
769 coll_t cid
= i
.get_cid(op
->cid
);
770 ghobject_t oid
= i
.get_oid(op
->oid
);
771 uint64_t off
= op
->off
;
772 uint64_t len
= op
->len
;
773 r
= _zero(cid
, oid
, off
, len
);
777 case Transaction::OP_TRIMCACHE
:
783 case Transaction::OP_TRUNCATE
:
785 coll_t cid
= i
.get_cid(op
->cid
);
786 ghobject_t oid
= i
.get_oid(op
->oid
);
787 uint64_t off
= op
->off
;
788 r
= _truncate(cid
, oid
, off
);
792 case Transaction::OP_REMOVE
:
794 coll_t cid
= i
.get_cid(op
->cid
);
795 ghobject_t oid
= i
.get_oid(op
->oid
);
796 r
= _remove(cid
, oid
);
800 case Transaction::OP_SETATTR
:
802 coll_t cid
= i
.get_cid(op
->cid
);
803 ghobject_t oid
= i
.get_oid(op
->oid
);
804 string name
= i
.decode_string();
807 map
<string
, bufferptr
> to_set
;
808 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
809 r
= _setattrs(cid
, oid
, to_set
);
813 case Transaction::OP_SETATTRS
:
815 coll_t cid
= i
.get_cid(op
->cid
);
816 ghobject_t oid
= i
.get_oid(op
->oid
);
817 map
<string
, bufferptr
> aset
;
818 i
.decode_attrset(aset
);
819 r
= _setattrs(cid
, oid
, aset
);
823 case Transaction::OP_RMATTR
:
825 coll_t cid
= i
.get_cid(op
->cid
);
826 ghobject_t oid
= i
.get_oid(op
->oid
);
827 string name
= i
.decode_string();
828 r
= _rmattr(cid
, oid
, name
.c_str());
832 case Transaction::OP_RMATTRS
:
834 coll_t cid
= i
.get_cid(op
->cid
);
835 ghobject_t oid
= i
.get_oid(op
->oid
);
836 r
= _rmattrs(cid
, oid
);
840 case Transaction::OP_CLONE
:
842 coll_t cid
= i
.get_cid(op
->cid
);
843 ghobject_t oid
= i
.get_oid(op
->oid
);
844 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
845 r
= _clone(cid
, oid
, noid
);
849 case Transaction::OP_CLONERANGE
:
851 coll_t cid
= i
.get_cid(op
->cid
);
852 ghobject_t oid
= i
.get_oid(op
->oid
);
853 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
854 uint64_t off
= op
->off
;
855 uint64_t len
= op
->len
;
856 r
= _clone_range(cid
, oid
, noid
, off
, len
, off
);
860 case Transaction::OP_CLONERANGE2
:
862 coll_t cid
= i
.get_cid(op
->cid
);
863 ghobject_t oid
= i
.get_oid(op
->oid
);
864 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
865 uint64_t srcoff
= op
->off
;
866 uint64_t len
= op
->len
;
867 uint64_t dstoff
= op
->dest_off
;
868 r
= _clone_range(cid
, oid
, noid
, srcoff
, len
, dstoff
);
872 case Transaction::OP_MKCOLL
:
874 coll_t cid
= i
.get_cid(op
->cid
);
875 r
= _create_collection(cid
, op
->split_bits
);
879 case Transaction::OP_COLL_HINT
:
881 coll_t cid
= i
.get_cid(op
->cid
);
882 uint32_t type
= op
->hint_type
;
885 bufferlist::iterator hiter
= hint
.begin();
886 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
889 ::decode(pg_num
, hiter
);
890 ::decode(num_objs
, hiter
);
891 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
);
894 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
899 case Transaction::OP_RMCOLL
:
901 coll_t cid
= i
.get_cid(op
->cid
);
902 r
= _destroy_collection(cid
);
906 case Transaction::OP_COLL_ADD
:
908 coll_t ocid
= i
.get_cid(op
->cid
);
909 coll_t ncid
= i
.get_cid(op
->dest_cid
);
910 ghobject_t oid
= i
.get_oid(op
->oid
);
911 r
= _collection_add(ncid
, ocid
, oid
);
915 case Transaction::OP_COLL_REMOVE
:
917 coll_t cid
= i
.get_cid(op
->cid
);
918 ghobject_t oid
= i
.get_oid(op
->oid
);
919 r
= _remove(cid
, oid
);
923 case Transaction::OP_COLL_MOVE
:
924 assert(0 == "deprecated");
927 case Transaction::OP_COLL_MOVE_RENAME
:
929 coll_t oldcid
= i
.get_cid(op
->cid
);
930 ghobject_t oldoid
= i
.get_oid(op
->oid
);
931 coll_t newcid
= i
.get_cid(op
->dest_cid
);
932 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
933 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
);
939 case Transaction::OP_TRY_RENAME
:
941 coll_t cid
= i
.get_cid(op
->cid
);
942 ghobject_t oldoid
= i
.get_oid(op
->oid
);
943 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
944 r
= _collection_move_rename(cid
, oldoid
, cid
, newoid
);
950 case Transaction::OP_COLL_SETATTR
:
952 assert(0 == "not implemented");
956 case Transaction::OP_COLL_RMATTR
:
958 assert(0 == "not implemented");
962 case Transaction::OP_COLL_RENAME
:
964 assert(0 == "not implemented");
968 case Transaction::OP_OMAP_CLEAR
:
970 coll_t cid
= i
.get_cid(op
->cid
);
971 ghobject_t oid
= i
.get_oid(op
->oid
);
972 r
= _omap_clear(cid
, oid
);
975 case Transaction::OP_OMAP_SETKEYS
:
977 coll_t cid
= i
.get_cid(op
->cid
);
978 ghobject_t oid
= i
.get_oid(op
->oid
);
980 i
.decode_attrset_bl(&aset_bl
);
981 r
= _omap_setkeys(cid
, oid
, aset_bl
);
984 case Transaction::OP_OMAP_RMKEYS
:
986 coll_t cid
= i
.get_cid(op
->cid
);
987 ghobject_t oid
= i
.get_oid(op
->oid
);
989 i
.decode_keyset_bl(&keys_bl
);
990 r
= _omap_rmkeys(cid
, oid
, keys_bl
);
993 case Transaction::OP_OMAP_RMKEYRANGE
:
995 coll_t cid
= i
.get_cid(op
->cid
);
996 ghobject_t oid
= i
.get_oid(op
->oid
);
998 first
= i
.decode_string();
999 last
= i
.decode_string();
1000 r
= _omap_rmkeyrange(cid
, oid
, first
, last
);
1003 case Transaction::OP_OMAP_SETHEADER
:
1005 coll_t cid
= i
.get_cid(op
->cid
);
1006 ghobject_t oid
= i
.get_oid(op
->oid
);
1009 r
= _omap_setheader(cid
, oid
, bl
);
1012 case Transaction::OP_SPLIT_COLLECTION
:
1013 assert(0 == "deprecated");
1015 case Transaction::OP_SPLIT_COLLECTION2
:
1017 coll_t cid
= i
.get_cid(op
->cid
);
1018 uint32_t bits
= op
->split_bits
;
1019 uint32_t rem
= op
->split_rem
;
1020 coll_t dest
= i
.get_cid(op
->dest_cid
);
1021 r
= _split_collection(cid
, bits
, rem
, dest
);
1025 case Transaction::OP_SETALLOCHINT
:
1032 derr
<< "bad op " << op
->op
<< dendl
;
1039 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
1040 op
->op
== Transaction::OP_CLONE
||
1041 op
->op
== Transaction::OP_CLONERANGE2
||
1042 op
->op
== Transaction::OP_COLL_ADD
))
1043 // -ENOENT is usually okay
1049 const char *msg
= "unexpected error code";
1051 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
1052 op
->op
== Transaction::OP_CLONE
||
1053 op
->op
== Transaction::OP_CLONERANGE2
))
1054 msg
= "ENOENT on clone suggests osd bug";
1057 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1058 // by partially applying transactions.
1059 msg
= "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1061 if (r
== -ENOTEMPTY
) {
1062 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
1066 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
1067 << " (op " << pos
<< ", counting from 0)" << dendl
;
1068 dout(0) << msg
<< dendl
;
1069 dout(0) << " transaction dump:\n";
1070 JSONFormatter
f(true);
1071 f
.open_object_section("transaction");
1076 assert(0 == "unexpected error");
1084 int MemStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
1086 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1087 CollectionRef c
= get_collection(cid
);
1091 c
->get_or_create_object(oid
);
1095 int MemStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
1096 uint64_t offset
, size_t len
, const bufferlist
& bl
,
1097 uint32_t fadvise_flags
)
1099 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " "
1100 << offset
<< "~" << len
<< dendl
;
1101 assert(len
== bl
.length());
1103 CollectionRef c
= get_collection(cid
);
1107 ObjectRef o
= c
->get_or_create_object(oid
);
1109 const ssize_t old_size
= o
->get_size();
1110 o
->write(offset
, bl
);
1111 used_bytes
+= (o
->get_size() - old_size
);
1117 int MemStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
,
1118 uint64_t offset
, size_t len
)
1120 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
1123 bl
.append_zero(len
);
1124 return _write(cid
, oid
, offset
, len
, bl
);
1127 int MemStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
1129 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << size
<< dendl
;
1130 CollectionRef c
= get_collection(cid
);
1134 ObjectRef o
= c
->get_object(oid
);
1137 const ssize_t old_size
= o
->get_size();
1138 int r
= o
->truncate(size
);
1139 used_bytes
+= (o
->get_size() - old_size
);
1143 int MemStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
)
1145 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1146 CollectionRef c
= get_collection(cid
);
1149 RWLock::WLocker
l(c
->lock
);
1151 auto i
= c
->object_hash
.find(oid
);
1152 if (i
== c
->object_hash
.end())
1154 used_bytes
-= i
->second
->get_size();
1155 c
->object_hash
.erase(i
);
1156 c
->object_map
.erase(oid
);
1161 int MemStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
,
1162 map
<string
,bufferptr
>& aset
)
1164 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1165 CollectionRef c
= get_collection(cid
);
1169 ObjectRef o
= c
->get_object(oid
);
1172 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1173 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin(); p
!= aset
.end(); ++p
)
1174 o
->xattr
[p
->first
] = p
->second
;
1178 int MemStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
)
1180 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1181 CollectionRef c
= get_collection(cid
);
1185 ObjectRef o
= c
->get_object(oid
);
1188 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1189 auto i
= o
->xattr
.find(name
);
1190 if (i
== o
->xattr
.end())
1196 int MemStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
)
1198 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1199 CollectionRef c
= get_collection(cid
);
1203 ObjectRef o
= c
->get_object(oid
);
1206 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1211 int MemStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
,
1212 const ghobject_t
& newoid
)
1214 dout(10) << __func__
<< " " << cid
<< " " << oldoid
1215 << " -> " << newoid
<< dendl
;
1216 CollectionRef c
= get_collection(cid
);
1220 ObjectRef oo
= c
->get_object(oldoid
);
1223 ObjectRef no
= c
->get_or_create_object(newoid
);
1224 used_bytes
+= oo
->get_size() - no
->get_size();
1225 no
->clone(oo
.get(), 0, oo
->get_size(), 0);
1227 // take xattr and omap locks with std::lock()
1228 std::unique_lock
<std::mutex
>
1229 ox_lock(oo
->xattr_mutex
, std::defer_lock
),
1230 nx_lock(no
->xattr_mutex
, std::defer_lock
),
1231 oo_lock(oo
->omap_mutex
, std::defer_lock
),
1232 no_lock(no
->omap_mutex
, std::defer_lock
);
1233 std::lock(ox_lock
, nx_lock
, oo_lock
, no_lock
);
1235 no
->omap_header
= oo
->omap_header
;
1236 no
->omap
= oo
->omap
;
1237 no
->xattr
= oo
->xattr
;
1241 int MemStore::_clone_range(const coll_t
& cid
, const ghobject_t
& oldoid
,
1242 const ghobject_t
& newoid
,
1243 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
1245 dout(10) << __func__
<< " " << cid
<< " "
1246 << oldoid
<< " " << srcoff
<< "~" << len
<< " -> "
1247 << newoid
<< " " << dstoff
<< "~" << len
1249 CollectionRef c
= get_collection(cid
);
1253 ObjectRef oo
= c
->get_object(oldoid
);
1256 ObjectRef no
= c
->get_or_create_object(newoid
);
1257 if (srcoff
>= oo
->get_size())
1259 if (srcoff
+ len
>= oo
->get_size())
1260 len
= oo
->get_size() - srcoff
;
1262 const ssize_t old_size
= no
->get_size();
1263 no
->clone(oo
.get(), srcoff
, len
, dstoff
);
1264 used_bytes
+= (no
->get_size() - old_size
);
1269 int MemStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&oid
)
1271 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1272 CollectionRef c
= get_collection(cid
);
1276 ObjectRef o
= c
->get_object(oid
);
1279 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1281 o
->omap_header
.clear();
1285 int MemStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1286 bufferlist
& aset_bl
)
1288 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1289 CollectionRef c
= get_collection(cid
);
1293 ObjectRef o
= c
->get_object(oid
);
1296 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1297 bufferlist::iterator p
= aset_bl
.begin();
1303 ::decode(o
->omap
[key
], p
);
1308 int MemStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1309 bufferlist
& keys_bl
)
1311 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1312 CollectionRef c
= get_collection(cid
);
1316 ObjectRef o
= c
->get_object(oid
);
1319 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1320 bufferlist::iterator p
= keys_bl
.begin();
1331 int MemStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&oid
,
1332 const string
& first
, const string
& last
)
1334 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << first
1335 << " " << last
<< dendl
;
1336 CollectionRef c
= get_collection(cid
);
1340 ObjectRef o
= c
->get_object(oid
);
1343 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1344 map
<string
,bufferlist
>::iterator p
= o
->omap
.lower_bound(first
);
1345 map
<string
,bufferlist
>::iterator e
= o
->omap
.lower_bound(last
);
1346 o
->omap
.erase(p
, e
);
1350 int MemStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&oid
,
1351 const bufferlist
&bl
)
1353 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1354 CollectionRef c
= get_collection(cid
);
1358 ObjectRef o
= c
->get_object(oid
);
1361 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1362 o
->omap_header
= bl
;
1366 int MemStore::_create_collection(const coll_t
& cid
, int bits
)
1368 dout(10) << __func__
<< " " << cid
<< dendl
;
1369 RWLock::WLocker
l(coll_lock
);
1370 auto result
= coll_map
.insert(std::make_pair(cid
, CollectionRef()));
1373 result
.first
->second
.reset(new Collection(cct
, cid
));
1374 result
.first
->second
->bits
= bits
;
1378 int MemStore::_destroy_collection(const coll_t
& cid
)
1380 dout(10) << __func__
<< " " << cid
<< dendl
;
1381 RWLock::WLocker
l(coll_lock
);
1382 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1383 if (cp
== coll_map
.end())
1386 RWLock::RLocker
l2(cp
->second
->lock
);
1387 if (!cp
->second
->object_map
.empty())
1389 cp
->second
->exists
= false;
1391 used_bytes
-= cp
->second
->used_bytes();
1396 int MemStore::_collection_add(const coll_t
& cid
, const coll_t
& ocid
, const ghobject_t
& oid
)
1398 dout(10) << __func__
<< " " << cid
<< " " << ocid
<< " " << oid
<< dendl
;
1399 CollectionRef c
= get_collection(cid
);
1402 CollectionRef oc
= get_collection(ocid
);
1405 RWLock::WLocker
l1(MIN(&(*c
), &(*oc
))->lock
);
1406 RWLock::WLocker
l2(MAX(&(*c
), &(*oc
))->lock
);
1408 if (c
->object_hash
.count(oid
))
1410 if (oc
->object_hash
.count(oid
) == 0)
1412 ObjectRef o
= oc
->object_hash
[oid
];
1413 c
->object_map
[oid
] = o
;
1414 c
->object_hash
[oid
] = o
;
1418 int MemStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
1419 coll_t cid
, const ghobject_t
& oid
)
1421 dout(10) << __func__
<< " " << oldcid
<< " " << oldoid
<< " -> "
1422 << cid
<< " " << oid
<< dendl
;
1423 CollectionRef c
= get_collection(cid
);
1426 CollectionRef oc
= get_collection(oldcid
);
1430 // note: c and oc may be the same
1431 assert(&(*c
) == &(*oc
));
1432 c
->lock
.get_write();
1435 if (c
->object_hash
.count(oid
))
1438 if (oc
->object_hash
.count(oldoid
) == 0)
1441 ObjectRef o
= oc
->object_hash
[oldoid
];
1442 c
->object_map
[oid
] = o
;
1443 c
->object_hash
[oid
] = o
;
1444 oc
->object_map
.erase(oldoid
);
1445 oc
->object_hash
.erase(oldoid
);
1449 c
->lock
.put_write();
1453 int MemStore::_split_collection(const coll_t
& cid
, uint32_t bits
, uint32_t match
,
1456 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " " << match
<< " "
1458 CollectionRef sc
= get_collection(cid
);
1461 CollectionRef dc
= get_collection(dest
);
1464 RWLock::WLocker
l1(MIN(&(*sc
), &(*dc
))->lock
);
1465 RWLock::WLocker
l2(MAX(&(*sc
), &(*dc
))->lock
);
1467 map
<ghobject_t
,ObjectRef
>::iterator p
= sc
->object_map
.begin();
1468 while (p
!= sc
->object_map
.end()) {
1469 if (p
->first
.match(bits
, match
)) {
1470 dout(20) << " moving " << p
->first
<< dendl
;
1471 dc
->object_map
.insert(make_pair(p
->first
, p
->second
));
1472 dc
->object_hash
.insert(make_pair(p
->first
, p
->second
));
1473 sc
->object_hash
.erase(p
->first
);
1474 sc
->object_map
.erase(p
++);
1481 assert(dc
->bits
== (int)bits
);
// MemStore::Object implementation whose size is the length of a single
// buffer member `data` (see get_size()).
// NOTE(review): the member declarations (`data`, `mutex`) and the bodies of
// encode()/decode() beyond ENCODE_START are on lines not visible in this
// excerpt; `data` is presumably a bufferlist - confirm.
1486 struct BufferlistObject
: public MemStore::Object
{
// Object size in bytes == current length of `data`.
1490 size_t get_size() const override
{ return data
.length(); }
// Data-path operations; defined out of line after the struct.
1492 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1493 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1494 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1495 uint64_t dstoff
) override
;
1496 int truncate(uint64_t offset
) override
;
// Serialization; the encoding is opened with ENCODE_START(1, 1, bl).
1498 void encode(bufferlist
& bl
) const override
{
1499 ENCODE_START(1, 1, bl
);
// Deserialization counterpart (body not visible here).
1504 void decode(bufferlist::iterator
& p
) override
{
// Read `len` bytes starting at `offset`: while holding `mutex`, `bl` is set
// to the [offset, offset + len) sub-range of `data` via substr_of().
// NOTE(review): no bounds check on offset/len and no return statement are
// visible in this excerpt - verify how substr_of() treats a range that
// extends past the end of `data`.
1513 int BufferlistObject::read(uint64_t offset
, uint64_t len
,
// Guard held for the rest of the function.
1516 std::lock_guard
<Spinlock
> lock(mutex
);
1517 bl
.substr_of(data
, offset
, len
);
// Write `src` at `offset`, growing the object if needed. Under `mutex` a new
// buffer `newdata` is assembled from three pieces and then swapped in:
//   1. the existing bytes before `offset` (zero-padded when the write starts
//      beyond the current end),
//   2. the bytes of `src`,
//   3. any existing bytes after offset + len.
// NOTE(review): the declarations of `newdata` / `tail`, the else-branch
// structure and the return statement are on lines not visible here.
1521 int BufferlistObject::write(uint64_t offset
, const bufferlist
&src
)
1523 unsigned len
= src
.length();
1525 std::lock_guard
<Spinlock
> lock(mutex
);
// Piece 1, case A: the write starts inside (or exactly at the end of) the
// current data - keep the first `offset` bytes.
1529 if (get_size() >= offset
) {
1530 newdata
.substr_of(data
, 0, offset
);
// Piece 1, case B: the write starts past the end - keep all existing bytes...
1533 newdata
.substr_of(data
, 0, get_size());
// ...and fill the gap up to `offset` with zeros.
1535 newdata
.append_zero(offset
- get_size());
// Piece 2: the new payload.
1538 newdata
.append(src
);
// Piece 3: existing bytes that lie beyond the written range are preserved.
1541 if (get_size() > offset
+ len
) {
1543 tail
.substr_of(data
, offset
+ len
, get_size() - (offset
+ len
));
1544 newdata
.append(tail
);
// Replace the old contents with the assembled buffer.
1547 data
.claim(newdata
);
// Copy `len` bytes from `src` (starting at `srcoff`) into this object at
// `dstoff`. `src` must itself be a BufferlistObject: it is downcast with
// dynamic_cast and the nullptr case is tested explicitly.
// The source range is extracted under the source object's mutex and the
// actual store is delegated to write().
// NOTE(review): the action for a failed cast, the body of the
// "same offset, whole object" fast path, the declaration of `bl` and the
// extent of the lock scope are on lines not visible in this excerpt.
1551 int BufferlistObject::clone(Object
*src
, uint64_t srcoff
,
1552 uint64_t len
, uint64_t dstoff
)
1554 auto srcbl
= dynamic_cast<BufferlistObject
*>(src
);
// Source is not a BufferlistObject.
1555 if (srcbl
== nullptr)
// Lock the source while its data is read.
1560 std::lock_guard
<Spinlock
> lock(srcbl
->mutex
);
// Special case: identical offsets and the range covers the whole source.
1561 if (srcoff
== dstoff
&& len
== src
->get_size()) {
// General case: take the requested sub-range of the source...
1565 bl
.substr_of(srcbl
->data
, srcoff
, len
);
// ...and store it through the regular write path.
1567 return write(dstoff
, bl
);
// Resize the object to `size` bytes while holding `mutex`:
//   - larger than `size`: keep only the first `size` bytes,
//   - exactly `size`:     nothing to do,
//   - otherwise:          extend with (size - get_size()) zero bytes.
// NOTE(review): the declaration of `bl`, the statement that installs it into
// `data`, the final else keyword and the return are on lines not visible here.
1570 int BufferlistObject::truncate(uint64_t size
)
1572 std::lock_guard
<Spinlock
> lock(mutex
);
// Shrink.
1573 if (get_size() > size
) {
1575 bl
.substr_of(data
, 0, size
);
// Already the requested size.
1577 } else if (get_size() == size
) {
// Grow with zero padding.
1580 data
.append_zero(size
- get_size());
// Object implementation that keeps its contents in a paged container `data`
// (constructed with the page size) and tracks the logical object length
// separately in `data_len`.
// NOTE(review): the member declarations for `data` / `data_len`, the
// matching #endif and the remainder of encode()/decode() are on lines not
// visible in this excerpt; `data` is presumably a PageSet - confirm.
1587 struct MemStore::PageSetObject
: public Object
{
1590 #if defined(__GLIBCXX__)
1591 // use a thread-local vector for the pages returned by PageSet, so we
1592 // can avoid allocations in read/write()
1593 static thread_local
PageSet::page_vector tls_pages
;
// Starts empty: the container is created with the given page size and the
// logical length is 0.
1596 explicit PageSetObject(size_t page_size
) : data(page_size
), data_len(0) {}
// Logical size is data_len, not derived from the pages held by `data`.
1598 size_t get_size() const override
{ return data_len
; }
// Data-path operations; defined out of line after the struct.
1600 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1601 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1602 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1603 uint64_t dstoff
) override
;
1604 int truncate(uint64_t offset
) override
;
// Serialization: data_len is encoded explicitly (other fields not visible).
1606 void encode(bufferlist
& bl
) const override
{
1607 ENCODE_START(1, 1, bl
);
1608 ::encode(data_len
, bl
);
// Deserialization: data_len is decoded explicitly (other fields not visible).
1613 void decode(bufferlist::iterator
& p
) override
{
1615 ::decode(data_len
, p
);
// Two alternative definitions of DEFINE_PAGE_VECTOR(name):
//   - with __GLIBCXX__ defined, the static thread_local member tls_pages is
//     defined here and the macro expands to nothing, so the member functions
//     use that per-thread vector;
//   - in the other variant the macro declares a local PageSet::page_vector
//     with the given name inside the calling function.
// NOTE(review): the #else / #endif lines separating the two variants are not
// visible in this excerpt.
1622 #if defined(__GLIBCXX__)
1623 // use a thread-local vector for the pages returned by PageSet, so we
1624 // can avoid allocations in read/write()
1625 thread_local
PageSet::page_vector
MemStore::PageSetObject::tls_pages
;
1626 #define DEFINE_PAGE_VECTOR(name)
1628 #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
// Read `len` bytes starting at `offset` into a freshly allocated buffer of
// exactly `len` bytes, which is then appended to `bl`. Pages covering the
// range are obtained with data.get_range(); parts of the range that have no
// page (before, between or after the returned pages) are zero-filled.
// NOTE(review): the loop header, the declaration of `page`, the
// remaining/offset bookkeeping after each copy and the return statement are
// on lines not visible in this excerpt. No clamp against data_len is visible
// either - confirm callers bound `len`.
1631 int MemStore::PageSetObject::read(uint64_t offset
, uint64_t len
, bufferlist
& bl
)
// `start` / `end` freeze the requested range; `offset` is advanced below and
// (offset - start) is used as the write position inside `buf`.
1633 const auto start
= offset
;
1634 const auto end
= offset
+ len
;
1635 auto remaining
= len
;
1637 DEFINE_PAGE_VECTOR(tls_pages
);
1638 data
.get_range(offset
, len
, tls_pages
);
1640 // allocate a buffer for the data
1641 buffer::ptr
buf(len
);
1643 auto p
= tls_pages
.begin();
1645 // no more pages in range
// Everything still outstanding is a hole: zero the rest of the buffer.
1646 if (p
== tls_pages
.end() || (*p
)->offset
>= end
) {
1647 buf
.zero(offset
- start
, remaining
);
1652 // fill any holes between pages with zeroes
1653 if (page
->offset
> offset
) {
// Zero only up to the start of the page, or less if the request ends first.
1654 const auto count
= std::min(remaining
, page
->offset
- offset
);
1655 buf
.zero(offset
- start
, count
);
1657 offset
= page
->offset
;
// Copy from the current page: begin at the in-page position of `offset` and
// take at most what is left of this page.
1663 const auto page_offset
= offset
- page
->offset
;
1664 const auto count
= min(remaining
, data
.get_page_size() - page_offset
);
1666 buf
.copy_in(offset
- start
, count
, page
->data
+ page_offset
);
1674 tls_pages
.clear(); // drop page refs
// Hand the filled buffer to the caller's bufferlist without copying it.
1676 bl
.append(std::move(buf
));
// Write the contents of `src` at `offset`. The covered page range is
// allocated up front with alloc_range(), then `src` is copied page by page.
// The object length is compared against the end of the write afterwards.
// NOTE(review): the loop header, the offset/len bookkeeping, the page
// advance, the data_len assignment and the return statement are on lines not
// visible in this excerpt.
1680 int MemStore::PageSetObject::write(uint64_t offset
, const bufferlist
&src
)
1682 unsigned len
= src
.length();
1684 DEFINE_PAGE_VECTOR(tls_pages
);
1685 // make sure the page range is allocated
1686 data
.alloc_range(offset
, src
.length(), tls_pages
);
// `page` walks the allocated pages, `p` walks the source bytes.
1688 auto page
= tls_pages
.begin();
1690 auto p
= src
.begin();
// page_offset: position of `offset` inside the current page.
1692 unsigned page_offset
= offset
- (*page
)->offset
;
// pageoff: number of bytes from that position to the end of the page.
1693 unsigned pageoff
= data
.get_page_size() - page_offset
;
// Copy whichever is smaller: the bytes still to write or the room left in
// this page.
1694 unsigned count
= min(len
, pageoff
);
1695 p
.copy(count
, (*page
)->data
+ page_offset
);
// The copy reached the end of the current page (follow-up not visible;
// presumably the page iterator is advanced).
1698 if (count
== pageoff
)
// Write ended beyond the current logical length (follow-up not visible;
// presumably data_len is raised to `offset`).
1701 if (data_len
< offset
)
1703 tls_pages
.clear(); // drop page refs
// Copy `len` bytes starting at `srcoff` in `src` to `dstoff` in this object.
// Source pages are fetched with get_range(), destination pages are allocated
// with alloc_range(), and parts of the range that have no source page are
// zero-filled in the destination (see the in-line comments below).
// NOTE(review): `src` is downcast with static_cast and no type check is
// visible, unlike BufferlistObject::clone which uses dynamic_cast - callers
// presumably guarantee that src is a PageSetObject; confirm.
// NOTE(review): the outer loop header, the count/len/srcoff bookkeeping, the
// loop-exit statements after the "dend < ..." tests, the data_len assignment
// and the return statement are on lines not visible in this excerpt.
1707 int MemStore::PageSetObject::clone(Object
*src
, uint64_t srcoff
,
1708 uint64_t len
, uint64_t dstoff
)
// Signed distance from a source offset to the matching destination offset;
// "x + delta" below converts source offsets into destination offsets.
1710 const int64_t delta
= dstoff
- srcoff
;
1712 auto &src_data
= static_cast<PageSetObject
*>(src
)->data
;
1713 const uint64_t src_page_size
= src_data
.get_page_size();
1715 auto &dst_data
= data
;
1716 const auto dst_page_size
= dst_data
.get_page_size();
// tls_pages receives the source pages; destination pages use a separate
// local vector.
1718 DEFINE_PAGE_VECTOR(tls_pages
);
1719 PageSet::page_vector dst_pages
;
1722 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1723 auto count
= std::min(len
, (uint64_t)src_page_size
* 16);
1724 src_data
.get_range(srcoff
, count
, tls_pages
);
1726 // allocate the destination range
1727 // TODO: avoid allocating pages for holes in the source range
1728 dst_data
.alloc_range(srcoff
+ delta
, count
, dst_pages
);
1729 auto dst_iter
= dst_pages
.begin();
1731 for (auto &src_page
: tls_pages
) {
// [sbegin, send) is the part of the current chunk [srcoff, srcoff + count)
// that lies inside src_page.
1732 auto sbegin
= std::max(srcoff
, src_page
->offset
);
1733 auto send
= std::min(srcoff
+ count
, src_page
->offset
+ src_page_size
);
1735 // zero-fill holes before src_page
1736 if (srcoff
< sbegin
) {
1737 while (dst_iter
!= dst_pages
.end()) {
1738 auto &dst_page
= *dst_iter
;
// [dbegin, dend) is the destination image of the hole [srcoff, sbegin),
// clipped to the current destination page.
1739 auto dbegin
= std::max(srcoff
+ delta
, dst_page
->offset
);
1740 auto dend
= std::min(sbegin
+ delta
, dst_page
->offset
+ dst_page_size
);
1741 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1742 dst_page
->data
+ dend
- dst_page
->offset
, 0);
// The hole ended inside this destination page (follow-up not visible;
// presumably the loop stops so the same page can receive the data next).
1743 if (dend
< dst_page
->offset
+ dst_page_size
)
// Size of the hole that was just zero-filled.
1747 const auto c
= sbegin
- srcoff
;
1752 // copy data from src page to dst pages
1753 while (dst_iter
!= dst_pages
.end()) {
1754 auto &dst_page
= *dst_iter
;
// [dbegin, dend) is the destination image of [sbegin, send), clipped to the
// current destination page; subtracting delta maps it back into src_page.
1755 auto dbegin
= std::max(sbegin
+ delta
, dst_page
->offset
);
1756 auto dend
= std::min(send
+ delta
, dst_page
->offset
+ dst_page_size
);
1758 std::copy(src_page
->data
+ (dbegin
- delta
) - src_page
->offset
,
1759 src_page
->data
+ (dend
- delta
) - src_page
->offset
,
1760 dst_page
->data
+ dbegin
- dst_page
->offset
);
// The copied range ended inside this destination page (follow-up not
// visible; presumably the loop stops and the page is reused).
1761 if (dend
< dst_page
->offset
+ dst_page_size
)
// Number of bytes copied from this source page.
1766 const auto c
= send
- sbegin
;
// Destination position just past the data copied from this source page.
1770 dstoff
= send
+ delta
;
1772 tls_pages
.clear(); // drop page refs
1774 // zero-fill holes after the last src_page
1776 while (dst_iter
!= dst_pages
.end()) {
1777 auto &dst_page
= *dst_iter
;
// Remaining [dstoff, dstoff + count) clipped to the current destination page.
1778 auto dbegin
= std::max(dstoff
, dst_page
->offset
);
1779 auto dend
= std::min(dstoff
+ count
, dst_page
->offset
+ dst_page_size
);
1780 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1781 dst_page
->data
+ dend
- dst_page
->offset
, 0);
1788 dst_pages
.clear(); // drop page refs
1791 // update object size
1792 if (data_len
< dstoff
)
// Truncate the object at `size`: data.free_pages_after(size) is called
// first, then the tail of the page that contains `size` is overwritten with
// zeros, provided `size` is not page-aligned and that page exists.
// NOTE(review): the update of data_len and the return statements of the two
// early-exit tests are on lines not visible in this excerpt.
1797 int MemStore::PageSetObject::truncate(uint64_t size
)
1799 data
.free_pages_after(size
);
1802 const auto page_size
= data
.get_page_size();
// Round `size` down to the start of its page.
// NOTE(review): the mask arithmetic is only correct when page_size is a
// power of two - confirm that this is enforced elsewhere.
1803 const auto page_offset
= size
& ~(page_size
-1);
// `size` is exactly on a page boundary: there is no partial page to clear.
1804 if (page_offset
== size
)
1807 DEFINE_PAGE_VECTOR(tls_pages
);
1808 // write zeroes to the rest of the last page
1809 data
.get_range(page_offset
, page_size
, tls_pages
);
// No page is present at that position.
1810 if (tls_pages
.empty())
1813 auto page
= tls_pages
.begin();
// Note: this local `data` (the page's byte buffer) shadows the member `data`
// for the remainder of the function.
1814 auto data
= (*page
)->data
;
// Zero from the in-page position of `size` to the end of the page.
1815 std::fill(data
+ (size
- page_offset
), data
+ page_size
, 0);
1816 tls_pages
.clear(); // drop page ref
// Factory for the collection's object storage: returns either a new
// PageSetObject, built with the configured memstore_page_size, or a new
// BufferlistObject.
// NOTE(review): the condition that selects between the two return
// statements is on a line not visible in this excerpt.
1821 MemStore::ObjectRef
MemStore::Collection::create_object() const {
// Paged variant; the page size comes from the memstore_page_size option.
1823 return new PageSetObject(cct
->_conf
->memstore_page_size
);
// Bufferlist-backed variant.
1824 return new BufferlistObject();