1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "include/memory.h"
28 #include "common/errno.h"
30 #include "include/compat.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_filestore
35 #define dout_prefix *_dout << "memstore(" << path << ") "
37 // for comparing collections for lock ordering
38 bool operator>(const MemStore::CollectionRef
& l
,
39 const MemStore::CollectionRef
& r
)
41 return (unsigned long)l
.get() > (unsigned long)r
.get();
54 int MemStore::umount()
56 finisher
.wait_for_empty();
63 dout(10) << __func__
<< dendl
;
65 set
<coll_t
> collections
;
66 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
69 dout(20) << __func__
<< " coll " << p
->first
<< " " << p
->second
<< dendl
;
70 collections
.insert(p
->first
);
73 p
->second
->encode(bl
);
74 string fn
= path
+ "/" + stringify(p
->first
);
75 int r
= bl
.write_file(fn
.c_str());
80 string fn
= path
+ "/collections";
82 ::encode(collections
, bl
);
83 int r
= bl
.write_file(fn
.c_str());
90 void MemStore::dump_all()
92 Formatter
*f
= Formatter::create("json-pretty");
93 f
->open_object_section("store");
102 void MemStore::dump(Formatter
*f
)
104 f
->open_array_section("collections");
105 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
108 f
->open_object_section("collection");
109 f
->dump_string("name", stringify(p
->first
));
111 f
->open_array_section("xattrs");
112 for (map
<string
,bufferptr
>::iterator q
= p
->second
->xattr
.begin();
113 q
!= p
->second
->xattr
.end();
115 f
->open_object_section("xattr");
116 f
->dump_string("name", q
->first
);
117 f
->dump_int("length", q
->second
.length());
122 f
->open_array_section("objects");
123 for (map
<ghobject_t
,ObjectRef
>::iterator q
= p
->second
->object_map
.begin();
124 q
!= p
->second
->object_map
.end();
126 f
->open_object_section("object");
127 f
->dump_string("name", stringify(q
->first
));
139 int MemStore::_load()
141 dout(10) << __func__
<< dendl
;
143 string fn
= path
+ "/collections";
145 int r
= bl
.read_file(fn
.c_str(), &err
);
149 set
<coll_t
> collections
;
150 bufferlist::iterator p
= bl
.begin();
151 ::decode(collections
, p
);
153 for (set
<coll_t
>::iterator q
= collections
.begin();
154 q
!= collections
.end();
156 string fn
= path
+ "/" + stringify(*q
);
158 int r
= cbl
.read_file(fn
.c_str(), &err
);
161 CollectionRef
c(new Collection(cct
, *q
));
162 bufferlist::iterator p
= cbl
.begin();
165 used_bytes
+= c
->used_bytes();
173 void MemStore::set_fsid(uuid_d u
)
175 int r
= write_meta("fs_fsid", stringify(u
));
179 uuid_d
MemStore::get_fsid()
182 int r
= read_meta("fs_fsid", &fsid_str
);
185 bool b
= uuid
.parse(fsid_str
.c_str());
193 int r
= read_meta("fs_fsid", &fsid_str
);
196 fsid
.generate_random();
197 fsid_str
= stringify(fsid
);
198 r
= write_meta("fs_fsid", fsid_str
);
201 dout(1) << __func__
<< " new fsid " << fsid_str
<< dendl
;
205 dout(1) << __func__
<< " had fsid " << fsid_str
<< dendl
;
208 string fn
= path
+ "/collections";
209 derr
<< path
<< dendl
;
211 set
<coll_t
> collections
;
212 ::encode(collections
, bl
);
213 r
= bl
.write_file(fn
.c_str());
217 r
= write_meta("type", "memstore");
224 int MemStore::statfs(struct store_statfs_t
*st
)
226 dout(10) << __func__
<< dendl
;
228 st
->total
= cct
->_conf
->memstore_device_bytes
;
229 st
->available
= MAX(int64_t(st
->total
) - int64_t(used_bytes
), 0ll);
230 dout(10) << __func__
<< ": used_bytes: " << used_bytes
231 << "/" << cct
->_conf
->memstore_device_bytes
<< dendl
;
235 objectstore_perf_stat_t
MemStore::get_cur_stats()
238 return objectstore_perf_stat_t();
241 MemStore::CollectionRef
MemStore::get_collection(const coll_t
& cid
)
243 RWLock::RLocker
l(coll_lock
);
244 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
245 if (cp
== coll_map
.end())
246 return CollectionRef();
254 bool MemStore::exists(const coll_t
& cid
, const ghobject_t
& oid
)
256 CollectionHandle c
= get_collection(cid
);
259 return exists(c
, oid
);
262 bool MemStore::exists(CollectionHandle
&c_
, const ghobject_t
& oid
)
264 Collection
*c
= static_cast<Collection
*>(c_
.get());
265 dout(10) << __func__
<< " " << c
->get_cid() << " " << oid
<< dendl
;
269 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
270 // shared_ptr needs to be compared to nullptr.
271 return (bool)c
->get_object(oid
);
276 const ghobject_t
& oid
,
280 CollectionHandle c
= get_collection(cid
);
283 return stat(c
, oid
, st
, allow_eio
);
287 CollectionHandle
&c_
,
288 const ghobject_t
& oid
,
292 Collection
*c
= static_cast<Collection
*>(c_
.get());
293 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
296 ObjectRef o
= c
->get_object(oid
);
299 st
->st_size
= o
->get_size();
300 st
->st_blksize
= 4096;
301 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
306 int MemStore::set_collection_opts(
308 const pool_opts_t
& opts
)
315 const ghobject_t
& oid
,
321 CollectionHandle c
= get_collection(cid
);
324 return read(c
, oid
, offset
, len
, bl
, op_flags
);
328 CollectionHandle
&c_
,
329 const ghobject_t
& oid
,
335 Collection
*c
= static_cast<Collection
*>(c_
.get());
336 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " "
337 << offset
<< "~" << len
<< dendl
;
340 ObjectRef o
= c
->get_object(oid
);
343 if (offset
>= o
->get_size())
346 if (l
== 0 && offset
== 0) // note: len == 0 means read the entire object
348 else if (offset
+ l
> o
->get_size())
349 l
= o
->get_size() - offset
;
351 return o
->read(offset
, l
, bl
);
354 int MemStore::fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
355 uint64_t offset
, size_t len
, bufferlist
& bl
)
357 map
<uint64_t, uint64_t> destmap
;
358 int r
= fiemap(cid
, oid
, offset
, len
, destmap
);
360 ::encode(destmap
, bl
);
364 int MemStore::fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
365 uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
)
367 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
369 CollectionRef c
= get_collection(cid
);
373 ObjectRef o
= c
->get_object(oid
);
377 if (offset
+ l
> o
->get_size())
378 l
= o
->get_size() - offset
;
379 if (offset
>= o
->get_size())
386 int MemStore::getattr(const coll_t
& cid
, const ghobject_t
& oid
,
387 const char *name
, bufferptr
& value
)
389 CollectionHandle c
= get_collection(cid
);
392 return getattr(c
, oid
, name
, value
);
395 int MemStore::getattr(CollectionHandle
&c_
, const ghobject_t
& oid
,
396 const char *name
, bufferptr
& value
)
398 Collection
*c
= static_cast<Collection
*>(c_
.get());
399 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " " << name
<< dendl
;
402 ObjectRef o
= c
->get_object(oid
);
406 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
407 if (!o
->xattr
.count(k
)) {
414 int MemStore::getattrs(const coll_t
& cid
, const ghobject_t
& oid
,
415 map
<string
,bufferptr
>& aset
)
417 CollectionHandle c
= get_collection(cid
);
420 return getattrs(c
, oid
, aset
);
423 int MemStore::getattrs(CollectionHandle
&c_
, const ghobject_t
& oid
,
424 map
<string
,bufferptr
>& aset
)
426 Collection
*c
= static_cast<Collection
*>(c_
.get());
427 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
431 ObjectRef o
= c
->get_object(oid
);
434 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
439 int MemStore::list_collections(vector
<coll_t
>& ls
)
441 dout(10) << __func__
<< dendl
;
442 RWLock::RLocker
l(coll_lock
);
443 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
446 ls
.push_back(p
->first
);
451 bool MemStore::collection_exists(const coll_t
& cid
)
453 dout(10) << __func__
<< " " << cid
<< dendl
;
454 RWLock::RLocker
l(coll_lock
);
455 return coll_map
.count(cid
);
458 int MemStore::collection_empty(const coll_t
& cid
, bool *empty
)
460 dout(10) << __func__
<< " " << cid
<< dendl
;
461 CollectionRef c
= get_collection(cid
);
464 RWLock::RLocker
l(c
->lock
);
465 *empty
= c
->object_map
.empty();
469 int MemStore::collection_bits(const coll_t
& cid
)
471 dout(10) << __func__
<< " " << cid
<< dendl
;
472 CollectionRef c
= get_collection(cid
);
475 RWLock::RLocker
l(c
->lock
);
479 int MemStore::collection_list(const coll_t
& cid
,
480 const ghobject_t
& start
,
481 const ghobject_t
& end
,
483 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
485 CollectionRef c
= get_collection(cid
);
488 RWLock::RLocker
l(c
->lock
);
490 dout(10) << __func__
<< " cid " << cid
<< " start " << start
491 << " end " << end
<< dendl
;
492 map
<ghobject_t
,ObjectRef
>::iterator p
= c
->object_map
.lower_bound(start
);
493 while (p
!= c
->object_map
.end() &&
494 ls
->size() < (unsigned)max
&&
496 ls
->push_back(p
->first
);
500 if (p
== c
->object_map
.end())
501 *next
= ghobject_t::get_max();
505 dout(10) << __func__
<< " cid " << cid
<< " got " << ls
->size() << dendl
;
509 int MemStore::omap_get(
510 const coll_t
& cid
, ///< [in] Collection containing oid
511 const ghobject_t
&oid
, ///< [in] Object containing omap
512 bufferlist
*header
, ///< [out] omap header
513 map
<string
, bufferlist
> *out
/// < [out] Key to value map
516 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
517 CollectionRef c
= get_collection(cid
);
521 ObjectRef o
= c
->get_object(oid
);
524 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
525 *header
= o
->omap_header
;
530 int MemStore::omap_get_header(
531 const coll_t
& cid
, ///< [in] Collection containing oid
532 const ghobject_t
&oid
, ///< [in] Object containing omap
533 bufferlist
*header
, ///< [out] omap header
534 bool allow_eio
///< [in] don't assert on eio
537 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
538 CollectionRef c
= get_collection(cid
);
542 ObjectRef o
= c
->get_object(oid
);
545 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
546 *header
= o
->omap_header
;
550 int MemStore::omap_get_keys(
551 const coll_t
& cid
, ///< [in] Collection containing oid
552 const ghobject_t
&oid
, ///< [in] Object containing omap
553 set
<string
> *keys
///< [out] Keys defined on oid
556 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
557 CollectionRef c
= get_collection(cid
);
561 ObjectRef o
= c
->get_object(oid
);
564 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
565 for (map
<string
,bufferlist
>::iterator p
= o
->omap
.begin();
568 keys
->insert(p
->first
);
572 int MemStore::omap_get_values(
573 const coll_t
& cid
, ///< [in] Collection containing oid
574 const ghobject_t
&oid
, ///< [in] Object containing omap
575 const set
<string
> &keys
, ///< [in] Keys to get
576 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
579 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
580 CollectionRef c
= get_collection(cid
);
584 ObjectRef o
= c
->get_object(oid
);
587 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
588 for (set
<string
>::const_iterator p
= keys
.begin();
591 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
592 if (q
!= o
->omap
.end())
598 int MemStore::omap_check_keys(
599 const coll_t
& cid
, ///< [in] Collection containing oid
600 const ghobject_t
&oid
, ///< [in] Object containing omap
601 const set
<string
> &keys
, ///< [in] Keys to check
602 set
<string
> *out
///< [out] Subset of keys defined on oid
605 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
606 CollectionRef c
= get_collection(cid
);
610 ObjectRef o
= c
->get_object(oid
);
613 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
614 for (set
<string
>::const_iterator p
= keys
.begin();
617 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
618 if (q
!= o
->omap
.end())
624 class MemStore::OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
627 map
<string
,bufferlist
>::iterator it
;
629 OmapIteratorImpl(CollectionRef c
, ObjectRef o
)
630 : c(c
), o(o
), it(o
->omap
.begin()) {}
632 int seek_to_first() override
{
633 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
634 it
= o
->omap
.begin();
637 int upper_bound(const string
&after
) override
{
638 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
639 it
= o
->omap
.upper_bound(after
);
642 int lower_bound(const string
&to
) override
{
643 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
644 it
= o
->omap
.lower_bound(to
);
647 bool valid() override
{
648 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
649 return it
!= o
->omap
.end();
651 int next(bool validate
=true) override
{
652 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
656 string
key() override
{
657 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
660 bufferlist
value() override
{
661 std::lock_guard
<std::mutex
>(o
->omap_mutex
);
664 int status() override
{
669 ObjectMap::ObjectMapIterator
MemStore::get_omap_iterator(const coll_t
& cid
,
670 const ghobject_t
& oid
)
672 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
673 CollectionRef c
= get_collection(cid
);
675 return ObjectMap::ObjectMapIterator();
677 ObjectRef o
= c
->get_object(oid
);
679 return ObjectMap::ObjectMapIterator();
680 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
));
687 int MemStore::queue_transactions(Sequencer
*osr
,
688 vector
<Transaction
>& tls
,
690 ThreadPool::TPHandle
*handle
)
692 // because memstore operations are synchronous, we can implement the
693 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
694 // while allowing operations on different sequencers to happen in parallel
695 struct OpSequencer
: public Sequencer_impl
{
696 OpSequencer(CephContext
* cct
) :
697 Sequencer_impl(cct
) {}
699 void flush() override
{}
700 bool flush_commit(Context
*) override
{ return true; }
703 std::unique_lock
<std::mutex
> lock
;
706 osr
->p
= new OpSequencer(cct
);
708 auto seq
= static_cast<OpSequencer
*>(osr
->p
.get());
709 lock
= std::unique_lock
<std::mutex
>(seq
->mutex
);
712 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
713 // poke the TPHandle heartbeat just to exercise that code path
715 handle
->reset_tp_timeout();
720 Context
*on_apply
= NULL
, *on_apply_sync
= NULL
, *on_commit
= NULL
;
721 ObjectStore::Transaction::collect_contexts(tls
, &on_apply
, &on_commit
,
724 on_apply_sync
->complete(0);
726 finisher
.queue(on_apply
);
728 finisher
.queue(on_commit
);
732 void MemStore::_do_transaction(Transaction
& t
)
734 Transaction::iterator i
= t
.begin();
737 while (i
.have_op()) {
738 Transaction::Op
*op
= i
.decode_op();
742 case Transaction::OP_NOP
:
744 case Transaction::OP_TOUCH
:
746 coll_t cid
= i
.get_cid(op
->cid
);
747 ghobject_t oid
= i
.get_oid(op
->oid
);
748 r
= _touch(cid
, oid
);
752 case Transaction::OP_WRITE
:
754 coll_t cid
= i
.get_cid(op
->cid
);
755 ghobject_t oid
= i
.get_oid(op
->oid
);
756 uint64_t off
= op
->off
;
757 uint64_t len
= op
->len
;
758 uint32_t fadvise_flags
= i
.get_fadvise_flags();
761 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
765 case Transaction::OP_ZERO
:
767 coll_t cid
= i
.get_cid(op
->cid
);
768 ghobject_t oid
= i
.get_oid(op
->oid
);
769 uint64_t off
= op
->off
;
770 uint64_t len
= op
->len
;
771 r
= _zero(cid
, oid
, off
, len
);
775 case Transaction::OP_TRIMCACHE
:
781 case Transaction::OP_TRUNCATE
:
783 coll_t cid
= i
.get_cid(op
->cid
);
784 ghobject_t oid
= i
.get_oid(op
->oid
);
785 uint64_t off
= op
->off
;
786 r
= _truncate(cid
, oid
, off
);
790 case Transaction::OP_REMOVE
:
792 coll_t cid
= i
.get_cid(op
->cid
);
793 ghobject_t oid
= i
.get_oid(op
->oid
);
794 r
= _remove(cid
, oid
);
798 case Transaction::OP_SETATTR
:
800 coll_t cid
= i
.get_cid(op
->cid
);
801 ghobject_t oid
= i
.get_oid(op
->oid
);
802 string name
= i
.decode_string();
805 map
<string
, bufferptr
> to_set
;
806 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
807 r
= _setattrs(cid
, oid
, to_set
);
811 case Transaction::OP_SETATTRS
:
813 coll_t cid
= i
.get_cid(op
->cid
);
814 ghobject_t oid
= i
.get_oid(op
->oid
);
815 map
<string
, bufferptr
> aset
;
816 i
.decode_attrset(aset
);
817 r
= _setattrs(cid
, oid
, aset
);
821 case Transaction::OP_RMATTR
:
823 coll_t cid
= i
.get_cid(op
->cid
);
824 ghobject_t oid
= i
.get_oid(op
->oid
);
825 string name
= i
.decode_string();
826 r
= _rmattr(cid
, oid
, name
.c_str());
830 case Transaction::OP_RMATTRS
:
832 coll_t cid
= i
.get_cid(op
->cid
);
833 ghobject_t oid
= i
.get_oid(op
->oid
);
834 r
= _rmattrs(cid
, oid
);
838 case Transaction::OP_CLONE
:
840 coll_t cid
= i
.get_cid(op
->cid
);
841 ghobject_t oid
= i
.get_oid(op
->oid
);
842 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
843 r
= _clone(cid
, oid
, noid
);
847 case Transaction::OP_CLONERANGE
:
849 coll_t cid
= i
.get_cid(op
->cid
);
850 ghobject_t oid
= i
.get_oid(op
->oid
);
851 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
852 uint64_t off
= op
->off
;
853 uint64_t len
= op
->len
;
854 r
= _clone_range(cid
, oid
, noid
, off
, len
, off
);
858 case Transaction::OP_CLONERANGE2
:
860 coll_t cid
= i
.get_cid(op
->cid
);
861 ghobject_t oid
= i
.get_oid(op
->oid
);
862 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
863 uint64_t srcoff
= op
->off
;
864 uint64_t len
= op
->len
;
865 uint64_t dstoff
= op
->dest_off
;
866 r
= _clone_range(cid
, oid
, noid
, srcoff
, len
, dstoff
);
870 case Transaction::OP_MKCOLL
:
872 coll_t cid
= i
.get_cid(op
->cid
);
873 r
= _create_collection(cid
, op
->split_bits
);
877 case Transaction::OP_COLL_HINT
:
879 coll_t cid
= i
.get_cid(op
->cid
);
880 uint32_t type
= op
->hint_type
;
883 bufferlist::iterator hiter
= hint
.begin();
884 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
887 ::decode(pg_num
, hiter
);
888 ::decode(num_objs
, hiter
);
889 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
);
892 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
897 case Transaction::OP_RMCOLL
:
899 coll_t cid
= i
.get_cid(op
->cid
);
900 r
= _destroy_collection(cid
);
904 case Transaction::OP_COLL_ADD
:
906 coll_t ocid
= i
.get_cid(op
->cid
);
907 coll_t ncid
= i
.get_cid(op
->dest_cid
);
908 ghobject_t oid
= i
.get_oid(op
->oid
);
909 r
= _collection_add(ncid
, ocid
, oid
);
913 case Transaction::OP_COLL_REMOVE
:
915 coll_t cid
= i
.get_cid(op
->cid
);
916 ghobject_t oid
= i
.get_oid(op
->oid
);
917 r
= _remove(cid
, oid
);
921 case Transaction::OP_COLL_MOVE
:
922 assert(0 == "deprecated");
925 case Transaction::OP_COLL_MOVE_RENAME
:
927 coll_t oldcid
= i
.get_cid(op
->cid
);
928 ghobject_t oldoid
= i
.get_oid(op
->oid
);
929 coll_t newcid
= i
.get_cid(op
->dest_cid
);
930 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
931 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
);
937 case Transaction::OP_TRY_RENAME
:
939 coll_t cid
= i
.get_cid(op
->cid
);
940 ghobject_t oldoid
= i
.get_oid(op
->oid
);
941 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
942 r
= _collection_move_rename(cid
, oldoid
, cid
, newoid
);
948 case Transaction::OP_COLL_SETATTR
:
950 assert(0 == "not implemented");
954 case Transaction::OP_COLL_RMATTR
:
956 assert(0 == "not implemented");
960 case Transaction::OP_COLL_RENAME
:
962 assert(0 == "not implemented");
966 case Transaction::OP_OMAP_CLEAR
:
968 coll_t cid
= i
.get_cid(op
->cid
);
969 ghobject_t oid
= i
.get_oid(op
->oid
);
970 r
= _omap_clear(cid
, oid
);
973 case Transaction::OP_OMAP_SETKEYS
:
975 coll_t cid
= i
.get_cid(op
->cid
);
976 ghobject_t oid
= i
.get_oid(op
->oid
);
978 i
.decode_attrset_bl(&aset_bl
);
979 r
= _omap_setkeys(cid
, oid
, aset_bl
);
982 case Transaction::OP_OMAP_RMKEYS
:
984 coll_t cid
= i
.get_cid(op
->cid
);
985 ghobject_t oid
= i
.get_oid(op
->oid
);
987 i
.decode_keyset_bl(&keys_bl
);
988 r
= _omap_rmkeys(cid
, oid
, keys_bl
);
991 case Transaction::OP_OMAP_RMKEYRANGE
:
993 coll_t cid
= i
.get_cid(op
->cid
);
994 ghobject_t oid
= i
.get_oid(op
->oid
);
996 first
= i
.decode_string();
997 last
= i
.decode_string();
998 r
= _omap_rmkeyrange(cid
, oid
, first
, last
);
1001 case Transaction::OP_OMAP_SETHEADER
:
1003 coll_t cid
= i
.get_cid(op
->cid
);
1004 ghobject_t oid
= i
.get_oid(op
->oid
);
1007 r
= _omap_setheader(cid
, oid
, bl
);
1010 case Transaction::OP_SPLIT_COLLECTION
:
1011 assert(0 == "deprecated");
1013 case Transaction::OP_SPLIT_COLLECTION2
:
1015 coll_t cid
= i
.get_cid(op
->cid
);
1016 uint32_t bits
= op
->split_bits
;
1017 uint32_t rem
= op
->split_rem
;
1018 coll_t dest
= i
.get_cid(op
->dest_cid
);
1019 r
= _split_collection(cid
, bits
, rem
, dest
);
1023 case Transaction::OP_SETALLOCHINT
:
1030 derr
<< "bad op " << op
->op
<< dendl
;
1037 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
1038 op
->op
== Transaction::OP_CLONE
||
1039 op
->op
== Transaction::OP_CLONERANGE2
||
1040 op
->op
== Transaction::OP_COLL_ADD
))
1041 // -ENOENT is usually okay
1047 const char *msg
= "unexpected error code";
1049 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
1050 op
->op
== Transaction::OP_CLONE
||
1051 op
->op
== Transaction::OP_CLONERANGE2
))
1052 msg
= "ENOENT on clone suggests osd bug";
1055 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1056 // by partially applying transactions.
1057 msg
= "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1059 if (r
== -ENOTEMPTY
) {
1060 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
1064 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
1065 << " (op " << pos
<< ", counting from 0)" << dendl
;
1066 dout(0) << msg
<< dendl
;
1067 dout(0) << " transaction dump:\n";
1068 JSONFormatter
f(true);
1069 f
.open_object_section("transaction");
1074 assert(0 == "unexpected error");
1082 int MemStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
1084 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1085 CollectionRef c
= get_collection(cid
);
1089 c
->get_or_create_object(oid
);
1093 int MemStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
1094 uint64_t offset
, size_t len
, const bufferlist
& bl
,
1095 uint32_t fadvise_flags
)
1097 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " "
1098 << offset
<< "~" << len
<< dendl
;
1099 assert(len
== bl
.length());
1101 CollectionRef c
= get_collection(cid
);
1105 ObjectRef o
= c
->get_or_create_object(oid
);
1107 const ssize_t old_size
= o
->get_size();
1108 o
->write(offset
, bl
);
1109 used_bytes
+= (o
->get_size() - old_size
);
1115 int MemStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
,
1116 uint64_t offset
, size_t len
)
1118 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
1121 bl
.append_zero(len
);
1122 return _write(cid
, oid
, offset
, len
, bl
);
1125 int MemStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
1127 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << size
<< dendl
;
1128 CollectionRef c
= get_collection(cid
);
1132 ObjectRef o
= c
->get_object(oid
);
1135 const ssize_t old_size
= o
->get_size();
1136 int r
= o
->truncate(size
);
1137 used_bytes
+= (o
->get_size() - old_size
);
1141 int MemStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
)
1143 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1144 CollectionRef c
= get_collection(cid
);
1147 RWLock::WLocker
l(c
->lock
);
1149 auto i
= c
->object_hash
.find(oid
);
1150 if (i
== c
->object_hash
.end())
1152 used_bytes
-= i
->second
->get_size();
1153 c
->object_hash
.erase(i
);
1154 c
->object_map
.erase(oid
);
1159 int MemStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
,
1160 map
<string
,bufferptr
>& aset
)
1162 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1163 CollectionRef c
= get_collection(cid
);
1167 ObjectRef o
= c
->get_object(oid
);
1170 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1171 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin(); p
!= aset
.end(); ++p
)
1172 o
->xattr
[p
->first
] = p
->second
;
1176 int MemStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
)
1178 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1179 CollectionRef c
= get_collection(cid
);
1183 ObjectRef o
= c
->get_object(oid
);
1186 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1187 auto i
= o
->xattr
.find(name
);
1188 if (i
== o
->xattr
.end())
1194 int MemStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
)
1196 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1197 CollectionRef c
= get_collection(cid
);
1201 ObjectRef o
= c
->get_object(oid
);
1204 std::lock_guard
<std::mutex
> lock(o
->xattr_mutex
);
1209 int MemStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
,
1210 const ghobject_t
& newoid
)
1212 dout(10) << __func__
<< " " << cid
<< " " << oldoid
1213 << " -> " << newoid
<< dendl
;
1214 CollectionRef c
= get_collection(cid
);
1218 ObjectRef oo
= c
->get_object(oldoid
);
1221 ObjectRef no
= c
->get_or_create_object(newoid
);
1222 used_bytes
+= oo
->get_size() - no
->get_size();
1223 no
->clone(oo
.get(), 0, oo
->get_size(), 0);
1225 // take xattr and omap locks with std::lock()
1226 std::unique_lock
<std::mutex
>
1227 ox_lock(oo
->xattr_mutex
, std::defer_lock
),
1228 nx_lock(no
->xattr_mutex
, std::defer_lock
),
1229 oo_lock(oo
->omap_mutex
, std::defer_lock
),
1230 no_lock(no
->omap_mutex
, std::defer_lock
);
1231 std::lock(ox_lock
, nx_lock
, oo_lock
, no_lock
);
1233 no
->omap_header
= oo
->omap_header
;
1234 no
->omap
= oo
->omap
;
1235 no
->xattr
= oo
->xattr
;
1239 int MemStore::_clone_range(const coll_t
& cid
, const ghobject_t
& oldoid
,
1240 const ghobject_t
& newoid
,
1241 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
1243 dout(10) << __func__
<< " " << cid
<< " "
1244 << oldoid
<< " " << srcoff
<< "~" << len
<< " -> "
1245 << newoid
<< " " << dstoff
<< "~" << len
1247 CollectionRef c
= get_collection(cid
);
1251 ObjectRef oo
= c
->get_object(oldoid
);
1254 ObjectRef no
= c
->get_or_create_object(newoid
);
1255 if (srcoff
>= oo
->get_size())
1257 if (srcoff
+ len
>= oo
->get_size())
1258 len
= oo
->get_size() - srcoff
;
1260 const ssize_t old_size
= no
->get_size();
1261 no
->clone(oo
.get(), srcoff
, len
, dstoff
);
1262 used_bytes
+= (no
->get_size() - old_size
);
1267 int MemStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&oid
)
1269 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1270 CollectionRef c
= get_collection(cid
);
1274 ObjectRef o
= c
->get_object(oid
);
1277 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1279 o
->omap_header
.clear();
1283 int MemStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1284 bufferlist
& aset_bl
)
1286 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1287 CollectionRef c
= get_collection(cid
);
1291 ObjectRef o
= c
->get_object(oid
);
1294 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1295 bufferlist::iterator p
= aset_bl
.begin();
1301 ::decode(o
->omap
[key
], p
);
1306 int MemStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1307 bufferlist
& keys_bl
)
1309 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1310 CollectionRef c
= get_collection(cid
);
1314 ObjectRef o
= c
->get_object(oid
);
1317 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1318 bufferlist::iterator p
= keys_bl
.begin();
1329 int MemStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&oid
,
1330 const string
& first
, const string
& last
)
1332 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << first
1333 << " " << last
<< dendl
;
1334 CollectionRef c
= get_collection(cid
);
1338 ObjectRef o
= c
->get_object(oid
);
1341 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1342 map
<string
,bufferlist
>::iterator p
= o
->omap
.lower_bound(first
);
1343 map
<string
,bufferlist
>::iterator e
= o
->omap
.lower_bound(last
);
1344 o
->omap
.erase(p
, e
);
1348 int MemStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&oid
,
1349 const bufferlist
&bl
)
1351 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1352 CollectionRef c
= get_collection(cid
);
1356 ObjectRef o
= c
->get_object(oid
);
1359 std::lock_guard
<std::mutex
> lock(o
->omap_mutex
);
1360 o
->omap_header
= bl
;
1364 int MemStore::_create_collection(const coll_t
& cid
, int bits
)
1366 dout(10) << __func__
<< " " << cid
<< dendl
;
1367 RWLock::WLocker
l(coll_lock
);
1368 auto result
= coll_map
.insert(std::make_pair(cid
, CollectionRef()));
1371 result
.first
->second
.reset(new Collection(cct
, cid
));
1372 result
.first
->second
->bits
= bits
;
1376 int MemStore::_destroy_collection(const coll_t
& cid
)
1378 dout(10) << __func__
<< " " << cid
<< dendl
;
1379 RWLock::WLocker
l(coll_lock
);
1380 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1381 if (cp
== coll_map
.end())
1384 RWLock::RLocker
l2(cp
->second
->lock
);
1385 if (!cp
->second
->object_map
.empty())
1387 cp
->second
->exists
= false;
1389 used_bytes
-= cp
->second
->used_bytes();
1394 int MemStore::_collection_add(const coll_t
& cid
, const coll_t
& ocid
, const ghobject_t
& oid
)
1396 dout(10) << __func__
<< " " << cid
<< " " << ocid
<< " " << oid
<< dendl
;
1397 CollectionRef c
= get_collection(cid
);
1400 CollectionRef oc
= get_collection(ocid
);
1403 RWLock::WLocker
l1(MIN(&(*c
), &(*oc
))->lock
);
1404 RWLock::WLocker
l2(MAX(&(*c
), &(*oc
))->lock
);
1406 if (c
->object_hash
.count(oid
))
1408 if (oc
->object_hash
.count(oid
) == 0)
1410 ObjectRef o
= oc
->object_hash
[oid
];
1411 c
->object_map
[oid
] = o
;
1412 c
->object_hash
[oid
] = o
;
1416 int MemStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
1417 coll_t cid
, const ghobject_t
& oid
)
1419 dout(10) << __func__
<< " " << oldcid
<< " " << oldoid
<< " -> "
1420 << cid
<< " " << oid
<< dendl
;
1421 CollectionRef c
= get_collection(cid
);
1424 CollectionRef oc
= get_collection(oldcid
);
1428 // note: c and oc may be the same
1429 assert(&(*c
) == &(*oc
));
1430 c
->lock
.get_write();
1433 if (c
->object_hash
.count(oid
))
1436 if (oc
->object_hash
.count(oldoid
) == 0)
1439 ObjectRef o
= oc
->object_hash
[oldoid
];
1440 c
->object_map
[oid
] = o
;
1441 c
->object_hash
[oid
] = o
;
1442 oc
->object_map
.erase(oldoid
);
1443 oc
->object_hash
.erase(oldoid
);
1447 c
->lock
.put_write();
1451 int MemStore::_split_collection(const coll_t
& cid
, uint32_t bits
, uint32_t match
,
1454 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " " << match
<< " "
1456 CollectionRef sc
= get_collection(cid
);
1459 CollectionRef dc
= get_collection(dest
);
1462 RWLock::WLocker
l1(MIN(&(*sc
), &(*dc
))->lock
);
1463 RWLock::WLocker
l2(MAX(&(*sc
), &(*dc
))->lock
);
1465 map
<ghobject_t
,ObjectRef
>::iterator p
= sc
->object_map
.begin();
1466 while (p
!= sc
->object_map
.end()) {
1467 if (p
->first
.match(bits
, match
)) {
1468 dout(20) << " moving " << p
->first
<< dendl
;
1469 dc
->object_map
.insert(make_pair(p
->first
, p
->second
));
1470 dc
->object_hash
.insert(make_pair(p
->first
, p
->second
));
1471 sc
->object_hash
.erase(p
->first
);
1472 sc
->object_map
.erase(p
++);
1479 assert(dc
->bits
== (int)bits
);
// MemStore::Object implementation whose size is the length of a member
// named data (get_size() returns data.length()).
// NOTE(review): the member declarations and most of the encode()/decode()
// bodies are not visible in this excerpt.
1484 struct BufferlistObject
: public MemStore::Object
{
// current object size in bytes, taken directly from the buffer length
1488 size_t get_size() const override
{ return data
.length(); }
// data-path operations; defined out of line after the struct
1490 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1491 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1492 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1493 uint64_t dstoff
) override
;
1494 int truncate(uint64_t offset
) override
;
// serialization; the encoding is opened with ENCODE_START(1, 1, bl)
1496 void encode(bufferlist
& bl
) const override
{
1497 ENCODE_START(1, 1, bl
);
// deserialization counterpart of encode()
1502 void decode(bufferlist::iterator
& p
) override
{
// Reads len bytes starting at offset: with the object's mutex held, bl is
// set to the sub-range [offset, offset + len) of data via substr_of().
// NOTE(review): the last parameter line and the return statement are not
// visible in this excerpt.
1511 int BufferlistObject::read(uint64_t offset
, uint64_t len
,
1514 std::lock_guard
<Spinlock
> lock(mutex
);
1515 bl
.substr_of(data
, offset
, len
);
// Writes src at offset by building a replacement buffer (newdata) and then
// claiming it into data, all with the object's mutex held:
//   1. the part of data before offset (or all of data followed by
//      offset - get_size() zero bytes),
//   2. the bytes of src,
//   3. whatever part of the old data lies beyond offset + len.
// NOTE(review): the declarations of newdata and tail, the else keyword
// between steps 1a/1b, closing braces and the return are not visible here.
1519 int BufferlistObject::write(uint64_t offset
, const bufferlist
&src
)
1521 unsigned len
= src
.length();
1523 std::lock_guard
<Spinlock
> lock(mutex
);
// keep the existing bytes that precede the write position
1527 if (get_size() >= offset
) {
1528 newdata
.substr_of(data
, 0, offset
);
// keep all existing data, then zero-fill the gap up to offset
1531 newdata
.substr_of(data
, 0, get_size());
1533 newdata
.append_zero(offset
- get_size());
1536 newdata
.append(src
);
// preserve old bytes located after the written range
1539 if (get_size() > offset
+ len
) {
1541 tail
.substr_of(data
, offset
+ len
, get_size() - (offset
+ len
));
1542 newdata
.append(tail
);
// replace the object's contents with the newly assembled buffer
1545 data
.claim(newdata
);
// Copies len bytes at srcoff of src into this object at dstoff.
// src must be a BufferlistObject (checked with dynamic_cast). The source
// range is extracted with the source's mutex held and the result is handed
// to write(dstoff, bl).
// NOTE(review): the statement taken when the cast fails, the body of the
// "whole object at the same offset" branch, the declaration of bl and the
// block braces are not visible in this excerpt.
1549 int BufferlistObject::clone(Object
*src
, uint64_t srcoff
,
1550 uint64_t len
, uint64_t dstoff
)
// reject (or otherwise handle) sources of a different Object subtype
1552 auto srcbl
= dynamic_cast<BufferlistObject
*>(src
);
1553 if (srcbl
== nullptr)
// guard the source's data while reading from it
1558 std::lock_guard
<Spinlock
> lock(srcbl
->mutex
);
// special case: the requested range is the whole source at an identical offset
1559 if (srcoff
== dstoff
&& len
== src
->get_size()) {
// general case: take the source sub-range ...
1563 bl
.substr_of(srcbl
->data
, srcoff
, len
);
// ... and write it at the destination offset
1565 return write(dstoff
, bl
);
// Sets the object's length to size with the mutex held. Three cases are
// distinguished by comparing get_size() with size: larger (the first size
// bytes are extracted into bl), equal, and smaller (data is extended with
// size - get_size() zero bytes).
// NOTE(review): the declaration of bl, how bl replaces data, the body of the
// "equal" branch, the final else keyword and the return are not visible here.
1568 int BufferlistObject::truncate(uint64_t size
)
1570 std::lock_guard
<Spinlock
> lock(mutex
);
// shrinking: keep only the leading size bytes
1571 if (get_size() > size
) {
1573 bl
.substr_of(data
, 0, size
);
// already the requested size
1575 } else if (get_size() == size
) {
// growing: pad with zeroes up to size
1578 data
.append_zero(size
- get_size());
// Object implementation that stores its contents in a member named data,
// constructed from a page size, and tracks the logical size separately in
// data_len.
// NOTE(review): the member declarations, the matching #endif and most of the
// encode()/decode() bodies are not visible in this excerpt.
1585 struct MemStore::PageSetObject
: public Object
{
1588 #if defined(__GLIBCXX__)
1589 // use a thread-local vector for the pages returned by PageSet, so we
1590 // can avoid allocations in read/write()
1591 static thread_local
PageSet::page_vector tls_pages
;
// starts empty: data is built with the given page size and data_len is 0
1594 explicit PageSetObject(size_t page_size
) : data(page_size
), data_len(0) {}
// logical size in bytes, as tracked by data_len
1596 size_t get_size() const override
{ return data_len
; }
// data-path operations; defined out of line after the struct
1598 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1599 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1600 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1601 uint64_t dstoff
) override
;
1602 int truncate(uint64_t offset
) override
;
// serialization; data_len is encoded right after ENCODE_START(1, 1, bl)
1604 void encode(bufferlist
& bl
) const override
{
1605 ENCODE_START(1, 1, bl
);
1606 ::encode(data_len
, bl
);
// deserialization counterpart; reads data_len back from the iterator
1611 void decode(bufferlist::iterator
& p
) override
{
1613 ::decode(data_len
, p
);
// Out-of-class definition of PageSetObject::tls_pages plus the
// DEFINE_PAGE_VECTOR helper macro. Under __GLIBCXX__ the macro expands to
// nothing, so a name passed to it refers to an already-declared variable
// (the thread-local member when the name is tls_pages). The other definition
// declares a local PageSet::page_vector with the given name.
// NOTE(review): the #else / #endif lines separating the two macro definitions
// are not visible in this excerpt.
1620 #if defined(__GLIBCXX__)
1621 // use a thread-local vector for the pages returned by PageSet, so we
1622 // can avoid allocations in read/write()
1623 thread_local
PageSet::page_vector
MemStore::PageSetObject::tls_pages
;
1624 #define DEFINE_PAGE_VECTOR(name)
1626 #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
// Reads len bytes starting at offset into a freshly allocated buffer of len
// bytes and appends that buffer to bl. Pages covering the range are fetched
// with data.get_range(); bytes not backed by a page are zero-filled and the
// rest is copied from the pages. Positions inside the buffer are expressed as
// offset - start, where start is the original offset.
// NOTE(review): the enclosing loop, the declaration of page, the updates of
// remaining/offset after each copy, the iterator advance and the return
// statement are not visible in this excerpt.
1629 int MemStore::PageSetObject::read(uint64_t offset
, uint64_t len
, bufferlist
& bl
)
1631 const auto start
= offset
;
1632 const auto end
= offset
+ len
;
1633 auto remaining
= len
;
1635 DEFINE_PAGE_VECTOR(tls_pages
);
1636 data
.get_range(offset
, len
, tls_pages
);
1638 // allocate a buffer for the data
1639 buffer::ptr
buf(len
);
1641 auto p
= tls_pages
.begin();
1643 // no more pages in range
1644 if (p
== tls_pages
.end() || (*p
)->offset
>= end
) {
// everything still outstanding is a hole: zero the rest of the buffer
1645 buf
.zero(offset
- start
, remaining
);
1650 // fill any holes between pages with zeroes
1651 if (page
->offset
> offset
) {
1652 const auto count
= std::min(remaining
, page
->offset
- offset
);
1653 buf
.zero(offset
- start
, count
);
// continue reading from the start of this page
1655 offset
= page
->offset
;
// copy from the current page, limited to what is left of the page and
// of the request
1661 const auto page_offset
= offset
- page
->offset
;
1662 const auto count
= min(remaining
, data
.get_page_size() - page_offset
);
1664 buf
.copy_in(offset
- start
, count
, page
->data
+ page_offset
);
1672 tls_pages
.clear(); // drop page refs
// hand the filled buffer over to the caller's bufferlist
1674 bl
.append(std::move(buf
));
// Writes the contents of src at offset. The target page range is allocated
// with data.alloc_range(), then bytes are copied from an iterator over src
// into the pages, at most one page remainder (pageoff) per copy.
// NOTE(review): the enclosing loop, the updates of offset/len after each
// copy, the statements guarded by the two trailing ifs (page advance and
// data_len update, presumably) and the return are not visible here.
1678 int MemStore::PageSetObject::write(uint64_t offset
, const bufferlist
&src
)
1680 unsigned len
= src
.length();
1682 DEFINE_PAGE_VECTOR(tls_pages
);
1683 // make sure the page range is allocated
1684 data
.alloc_range(offset
, src
.length(), tls_pages
);
1686 auto page
= tls_pages
.begin();
1688 auto p
= src
.begin();
// page_offset: position of offset inside the current page
// pageoff: number of bytes from that position to the end of the page
1690 unsigned page_offset
= offset
- (*page
)->offset
;
1691 unsigned pageoff
= data
.get_page_size() - page_offset
;
1692 unsigned count
= min(len
, pageoff
);
1693 p
.copy(count
, (*page
)->data
+ page_offset
);
// the copy reached the end of the current page
1696 if (count
== pageoff
)
// the write extended past the previously recorded object size
1699 if (data_len
< offset
)
1701 tls_pages
.clear(); // drop page refs
// Copies len bytes at srcoff of src into this object at dstoff, working in
// chunks of at most 16 source pages. For each chunk the source pages are
// fetched with get_range() and the destination range is allocated with
// alloc_range(); destination bytes that correspond to holes in the source
// are zero-filled and the rest is copied page by page. delta (dstoff - srcoff)
// converts a source offset into the matching destination offset.
// NOTE(review): src is converted with static_cast and no type check, unlike
// BufferlistObject::clone which uses dynamic_cast and tests for nullptr -
// confirm callers only pass PageSetObject sources.
// NOTE(review): the outer chunk loop, the iterator advances / breaks after
// the "dend < page end" tests, the bookkeeping that follows each "c"
// computation, the data_len assignment and the return are not visible in
// this excerpt.
1705 int MemStore::PageSetObject::clone(Object
*src
, uint64_t srcoff
,
1706 uint64_t len
, uint64_t dstoff
)
1708 const int64_t delta
= dstoff
- srcoff
;
1710 auto &src_data
= static_cast<PageSetObject
*>(src
)->data
;
1711 const uint64_t src_page_size
= src_data
.get_page_size();
1713 auto &dst_data
= data
;
1714 const auto dst_page_size
= dst_data
.get_page_size();
// source pages go into tls_pages, destination pages into a local vector
1716 DEFINE_PAGE_VECTOR(tls_pages
);
1717 PageSet::page_vector dst_pages
;
1720 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1721 auto count
= std::min(len
, (uint64_t)src_page_size
* 16);
1722 src_data
.get_range(srcoff
, count
, tls_pages
);
1724 // allocate the destination range
1725 // TODO: avoid allocating pages for holes in the source range
1726 dst_data
.alloc_range(srcoff
+ delta
, count
, dst_pages
);
1727 auto dst_iter
= dst_pages
.begin();
// [sbegin, send) is the part of this source page that lies inside the chunk
1729 for (auto &src_page
: tls_pages
) {
1730 auto sbegin
= std::max(srcoff
, src_page
->offset
);
1731 auto send
= std::min(srcoff
+ count
, src_page
->offset
+ src_page_size
);
1733 // zero-fill holes before src_page
1734 if (srcoff
< sbegin
) {
1735 while (dst_iter
!= dst_pages
.end()) {
1736 auto &dst_page
= *dst_iter
;
// [dbegin, dend) is the hole mapped into this destination page
1737 auto dbegin
= std::max(srcoff
+ delta
, dst_page
->offset
);
1738 auto dend
= std::min(sbegin
+ delta
, dst_page
->offset
+ dst_page_size
);
1739 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1740 dst_page
->data
+ dend
- dst_page
->offset
, 0);
// the hole ended before the end of this destination page
1741 if (dend
< dst_page
->offset
+ dst_page_size
)
// size of the hole that was just zero-filled
1745 const auto c
= sbegin
- srcoff
;
1750 // copy data from src page to dst pages
1751 while (dst_iter
!= dst_pages
.end()) {
1752 auto &dst_page
= *dst_iter
;
// [dbegin, dend) is the part of the source range mapped into this page
1753 auto dbegin
= std::max(sbegin
+ delta
, dst_page
->offset
);
1754 auto dend
= std::min(send
+ delta
, dst_page
->offset
+ dst_page_size
);
// subtracting delta maps the destination offsets back into the source page
1756 std::copy(src_page
->data
+ (dbegin
- delta
) - src_page
->offset
,
1757 src_page
->data
+ (dend
- delta
) - src_page
->offset
,
1758 dst_page
->data
+ dbegin
- dst_page
->offset
);
// the source range ended before the end of this destination page
1759 if (dend
< dst_page
->offset
+ dst_page_size
)
// number of bytes copied from this source page
1764 const auto c
= send
- sbegin
;
// destination position just past the data copied so far
1768 dstoff
= send
+ delta
;
1770 tls_pages
.clear(); // drop page refs
1772 // zero-fill holes after the last src_page
1774 while (dst_iter
!= dst_pages
.end()) {
1775 auto &dst_page
= *dst_iter
;
1776 auto dbegin
= std::max(dstoff
, dst_page
->offset
);
1777 auto dend
= std::min(dstoff
+ count
, dst_page
->offset
+ dst_page_size
);
1778 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1779 dst_page
->data
+ dend
- dst_page
->offset
, 0);
1786 dst_pages
.clear(); // drop page refs
1789 // update object size
1790 if (data_len
< dstoff
)
// Truncates the object to size: pages after size are released with
// data.free_pages_after(), then, unless size falls exactly on a page
// boundary, the bytes from size to the end of its page are overwritten with
// zeroes (if that page exists).
// NOTE(review): the data_len update, the statements guarded by the two ifs
// and the return statement are not visible in this excerpt.
1795 int MemStore::PageSetObject::truncate(uint64_t size
)
1797 data
.free_pages_after(size
);
// round size down to the start of its page
// (the mask assumes page_size is a power of two - TODO confirm)
1800 const auto page_size
= data
.get_page_size();
1801 const auto page_offset
= size
& ~(page_size
-1);
// size is page-aligned: there is no partial page to clean up
1802 if (page_offset
== size
)
1805 DEFINE_PAGE_VECTOR(tls_pages
);
1806 // write zeroes to the rest of the last page
1807 data
.get_range(page_offset
, page_size
, tls_pages
);
// no page is allocated at that position
1808 if (tls_pages
.empty())
1811 auto page
= tls_pages
.begin();
// note: this local named data shadows the member data used above
1812 auto data
= (*page
)->data
;
1813 std::fill(data
+ (size
- page_offset
), data
+ page_size
, 0);
1814 tls_pages
.clear(); // drop page ref
// Factory for new objects in this collection: returns either a PageSetObject
// sized from the memstore_page_size configuration value or a
// BufferlistObject.
// NOTE(review): the condition that selects between the two return statements
// is not visible in this excerpt.
1819 MemStore::ObjectRef
MemStore::Collection::create_object() const {
1821 return new PageSetObject(cct
->_conf
->memstore_page_size
);
1822 return new BufferlistObject();