1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "common/errno.h"
29 #include "include/compat.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_filestore
34 #define dout_prefix *_dout << "memstore(" << path << ") "
36 // for comparing collections for lock ordering
37 bool operator>(const MemStore::CollectionRef
& l
,
38 const MemStore::CollectionRef
& r
)
40 return (unsigned long)l
.get() > (unsigned long)r
.get();
53 int MemStore::umount()
55 finisher
.wait_for_empty();
62 dout(10) << __func__
<< dendl
;
64 set
<coll_t
> collections
;
65 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
68 dout(20) << __func__
<< " coll " << p
->first
<< " " << p
->second
<< dendl
;
69 collections
.insert(p
->first
);
71 ceph_assert(p
->second
);
72 p
->second
->encode(bl
);
73 string fn
= path
+ "/" + stringify(p
->first
);
74 int r
= bl
.write_file(fn
.c_str());
79 string fn
= path
+ "/collections";
81 encode(collections
, bl
);
82 int r
= bl
.write_file(fn
.c_str());
89 void MemStore::dump_all()
91 Formatter
*f
= Formatter::create("json-pretty");
92 f
->open_object_section("store");
101 void MemStore::dump(Formatter
*f
)
103 f
->open_array_section("collections");
104 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
107 f
->open_object_section("collection");
108 f
->dump_string("name", stringify(p
->first
));
110 f
->open_array_section("xattrs");
111 for (map
<string
,bufferptr
>::iterator q
= p
->second
->xattr
.begin();
112 q
!= p
->second
->xattr
.end();
114 f
->open_object_section("xattr");
115 f
->dump_string("name", q
->first
);
116 f
->dump_int("length", q
->second
.length());
121 f
->open_array_section("objects");
122 for (map
<ghobject_t
,ObjectRef
>::iterator q
= p
->second
->object_map
.begin();
123 q
!= p
->second
->object_map
.end();
125 f
->open_object_section("object");
126 f
->dump_string("name", stringify(q
->first
));
138 int MemStore::_load()
140 dout(10) << __func__
<< dendl
;
142 string fn
= path
+ "/collections";
144 int r
= bl
.read_file(fn
.c_str(), &err
);
148 set
<coll_t
> collections
;
149 auto p
= bl
.cbegin();
150 decode(collections
, p
);
152 for (set
<coll_t
>::iterator q
= collections
.begin();
153 q
!= collections
.end();
155 string fn
= path
+ "/" + stringify(*q
);
157 int r
= cbl
.read_file(fn
.c_str(), &err
);
160 CollectionRef
c(new Collection(cct
, *q
));
161 auto p
= cbl
.cbegin();
164 used_bytes
+= c
->used_bytes();
172 void MemStore::set_fsid(uuid_d u
)
174 int r
= write_meta("fsid", stringify(u
));
178 uuid_d
MemStore::get_fsid()
181 int r
= read_meta("fsid", &fsid_str
);
184 bool b
= uuid
.parse(fsid_str
.c_str());
192 int r
= read_meta("fsid", &fsid_str
);
195 fsid
.generate_random();
196 fsid_str
= stringify(fsid
);
197 r
= write_meta("fsid", fsid_str
);
200 dout(1) << __func__
<< " new fsid " << fsid_str
<< dendl
;
204 dout(1) << __func__
<< " had fsid " << fsid_str
<< dendl
;
207 string fn
= path
+ "/collections";
208 derr
<< path
<< dendl
;
210 set
<coll_t
> collections
;
211 encode(collections
, bl
);
212 r
= bl
.write_file(fn
.c_str());
216 r
= write_meta("type", "memstore");
223 int MemStore::statfs(struct store_statfs_t
*st
, osd_alert_list_t
* alerts
)
225 dout(10) << __func__
<< dendl
;
227 alerts
->clear(); // returns nothing for now
230 st
->total
= cct
->_conf
->memstore_device_bytes
;
231 st
->available
= std::max
<int64_t>(st
->total
- used_bytes
, 0);
232 dout(10) << __func__
<< ": used_bytes: " << used_bytes
233 << "/" << cct
->_conf
->memstore_device_bytes
<< dendl
;
237 int MemStore::pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
)
242 objectstore_perf_stat_t
MemStore::get_cur_stats()
245 return objectstore_perf_stat_t();
248 MemStore::CollectionRef
MemStore::get_collection(const coll_t
& cid
)
250 std::shared_lock l
{coll_lock
};
251 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
252 if (cp
== coll_map
.end())
253 return CollectionRef();
257 ObjectStore::CollectionHandle
MemStore::create_new_collection(const coll_t
& cid
)
259 std::lock_guard l
{coll_lock
};
260 Collection
*c
= new Collection(cct
, cid
);
261 new_coll_map
[cid
] = c
;
269 bool MemStore::exists(CollectionHandle
&c_
, const ghobject_t
& oid
)
271 Collection
*c
= static_cast<Collection
*>(c_
.get());
272 dout(10) << __func__
<< " " << c
->get_cid() << " " << oid
<< dendl
;
276 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
277 // shared_ptr needs to be compared to nullptr.
278 return (bool)c
->get_object(oid
);
282 CollectionHandle
&c_
,
283 const ghobject_t
& oid
,
287 Collection
*c
= static_cast<Collection
*>(c_
.get());
288 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
291 ObjectRef o
= c
->get_object(oid
);
294 st
->st_size
= o
->get_size();
295 st
->st_blksize
= 4096;
296 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
301 int MemStore::set_collection_opts(
302 CollectionHandle
& ch
,
303 const pool_opts_t
& opts
)
309 CollectionHandle
&c_
,
310 const ghobject_t
& oid
,
316 Collection
*c
= static_cast<Collection
*>(c_
.get());
317 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " "
318 << offset
<< "~" << len
<< dendl
;
321 ObjectRef o
= c
->get_object(oid
);
324 if (offset
>= o
->get_size())
327 if (l
== 0 && offset
== 0) // note: len == 0 means read the entire object
329 else if (offset
+ l
> o
->get_size())
330 l
= o
->get_size() - offset
;
332 return o
->read(offset
, l
, bl
);
335 int MemStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
336 uint64_t offset
, size_t len
, bufferlist
& bl
)
338 map
<uint64_t, uint64_t> destmap
;
339 int r
= fiemap(ch
, oid
, offset
, len
, destmap
);
345 int MemStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
346 uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
)
348 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< " " << offset
<< "~"
350 Collection
*c
= static_cast<Collection
*>(ch
.get());
354 ObjectRef o
= c
->get_object(oid
);
358 if (offset
+ l
> o
->get_size())
359 l
= o
->get_size() - offset
;
360 if (offset
>= o
->get_size())
367 int MemStore::getattr(CollectionHandle
&c_
, const ghobject_t
& oid
,
368 const char *name
, bufferptr
& value
)
370 Collection
*c
= static_cast<Collection
*>(c_
.get());
371 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " " << name
<< dendl
;
374 ObjectRef o
= c
->get_object(oid
);
378 std::lock_guard lock
{o
->xattr_mutex
};
379 if (!o
->xattr
.count(k
)) {
386 int MemStore::getattrs(CollectionHandle
&c_
, const ghobject_t
& oid
,
387 map
<string
,bufferptr
>& aset
)
389 Collection
*c
= static_cast<Collection
*>(c_
.get());
390 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
394 ObjectRef o
= c
->get_object(oid
);
397 std::lock_guard lock
{o
->xattr_mutex
};
402 int MemStore::list_collections(vector
<coll_t
>& ls
)
404 dout(10) << __func__
<< dendl
;
405 std::shared_lock l
{coll_lock
};
406 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
409 ls
.push_back(p
->first
);
414 bool MemStore::collection_exists(const coll_t
& cid
)
416 dout(10) << __func__
<< " " << cid
<< dendl
;
417 std::shared_lock l
{coll_lock
};
418 return coll_map
.count(cid
);
421 int MemStore::collection_empty(CollectionHandle
& ch
, bool *empty
)
423 dout(10) << __func__
<< " " << ch
->cid
<< dendl
;
424 CollectionRef c
= static_cast<Collection
*>(ch
.get());
425 std::shared_lock l
{c
->lock
};
426 *empty
= c
->object_map
.empty();
430 int MemStore::collection_bits(CollectionHandle
& ch
)
432 dout(10) << __func__
<< " " << ch
->cid
<< dendl
;
433 Collection
*c
= static_cast<Collection
*>(ch
.get());
434 std::shared_lock l
{c
->lock
};
438 int MemStore::collection_list(CollectionHandle
& ch
,
439 const ghobject_t
& start
,
440 const ghobject_t
& end
,
442 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
444 Collection
*c
= static_cast<Collection
*>(ch
.get());
445 std::shared_lock l
{c
->lock
};
447 dout(10) << __func__
<< " cid " << ch
->cid
<< " start " << start
448 << " end " << end
<< dendl
;
449 map
<ghobject_t
,ObjectRef
>::iterator p
= c
->object_map
.lower_bound(start
);
450 while (p
!= c
->object_map
.end() &&
451 ls
->size() < (unsigned)max
&&
453 ls
->push_back(p
->first
);
457 if (p
== c
->object_map
.end())
458 *next
= ghobject_t::get_max();
462 dout(10) << __func__
<< " cid " << ch
->cid
<< " got " << ls
->size() << dendl
;
466 int MemStore::omap_get(
467 CollectionHandle
& ch
, ///< [in] Collection containing oid
468 const ghobject_t
&oid
, ///< [in] Object containing omap
469 bufferlist
*header
, ///< [out] omap header
470 map
<string
, bufferlist
> *out
/// < [out] Key to value map
473 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
474 Collection
*c
= static_cast<Collection
*>(ch
.get());
476 ObjectRef o
= c
->get_object(oid
);
479 std::lock_guard lock
{o
->omap_mutex
};
480 *header
= o
->omap_header
;
485 int MemStore::omap_get_header(
486 CollectionHandle
& ch
, ///< [in] Collection containing oid
487 const ghobject_t
&oid
, ///< [in] Object containing omap
488 bufferlist
*header
, ///< [out] omap header
489 bool allow_eio
///< [in] don't assert on eio
492 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
493 Collection
*c
= static_cast<Collection
*>(ch
.get());
494 ObjectRef o
= c
->get_object(oid
);
497 std::lock_guard lock
{o
->omap_mutex
};
498 *header
= o
->omap_header
;
502 int MemStore::omap_get_keys(
503 CollectionHandle
& ch
, ///< [in] Collection containing oid
504 const ghobject_t
&oid
, ///< [in] Object containing omap
505 set
<string
> *keys
///< [out] Keys defined on oid
508 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
509 Collection
*c
= static_cast<Collection
*>(ch
.get());
510 ObjectRef o
= c
->get_object(oid
);
513 std::lock_guard lock
{o
->omap_mutex
};
514 for (map
<string
,bufferlist
>::iterator p
= o
->omap
.begin();
517 keys
->insert(p
->first
);
521 int MemStore::omap_get_values(
522 CollectionHandle
& ch
, ///< [in] Collection containing oid
523 const ghobject_t
&oid
, ///< [in] Object containing omap
524 const set
<string
> &keys
, ///< [in] Keys to get
525 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
528 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
529 Collection
*c
= static_cast<Collection
*>(ch
.get());
530 ObjectRef o
= c
->get_object(oid
);
533 std::lock_guard lock
{o
->omap_mutex
};
534 for (set
<string
>::const_iterator p
= keys
.begin();
537 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
538 if (q
!= o
->omap
.end())
544 int MemStore::omap_check_keys(
545 CollectionHandle
& ch
, ///< [in] Collection containing oid
546 const ghobject_t
&oid
, ///< [in] Object containing omap
547 const set
<string
> &keys
, ///< [in] Keys to check
548 set
<string
> *out
///< [out] Subset of keys defined on oid
551 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
552 Collection
*c
= static_cast<Collection
*>(ch
.get());
553 ObjectRef o
= c
->get_object(oid
);
556 std::lock_guard lock
{o
->omap_mutex
};
557 for (set
<string
>::const_iterator p
= keys
.begin();
560 map
<string
,bufferlist
>::iterator q
= o
->omap
.find(*p
);
561 if (q
!= o
->omap
.end())
567 class MemStore::OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
570 map
<string
,bufferlist
>::iterator it
;
572 OmapIteratorImpl(CollectionRef c
, ObjectRef o
)
573 : c(c
), o(o
), it(o
->omap
.begin()) {}
575 int seek_to_first() override
{
576 std::lock_guard lock
{o
->omap_mutex
};
577 it
= o
->omap
.begin();
580 int upper_bound(const string
&after
) override
{
581 std::lock_guard lock
{o
->omap_mutex
};
582 it
= o
->omap
.upper_bound(after
);
585 int lower_bound(const string
&to
) override
{
586 std::lock_guard lock
{o
->omap_mutex
};
587 it
= o
->omap
.lower_bound(to
);
590 bool valid() override
{
591 std::lock_guard lock
{o
->omap_mutex
};
592 return it
!= o
->omap
.end();
594 int next() override
{
595 std::lock_guard lock
{o
->omap_mutex
};
599 string
key() override
{
600 std::lock_guard lock
{o
->omap_mutex
};
603 bufferlist
value() override
{
604 std::lock_guard lock
{o
->omap_mutex
};
607 int status() override
{
612 ObjectMap::ObjectMapIterator
MemStore::get_omap_iterator(
613 CollectionHandle
& ch
,
614 const ghobject_t
& oid
)
616 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
617 Collection
*c
= static_cast<Collection
*>(ch
.get());
618 ObjectRef o
= c
->get_object(oid
);
620 return ObjectMap::ObjectMapIterator();
621 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
));
628 int MemStore::queue_transactions(
629 CollectionHandle
& ch
,
630 vector
<Transaction
>& tls
,
632 ThreadPool::TPHandle
*handle
)
634 // because memstore operations are synchronous, we can implement the
635 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
636 // while allowing operations on different sequencers to happen in parallel
637 Collection
*c
= static_cast<Collection
*>(ch
.get());
638 std::unique_lock lock
{c
->sequencer_mutex
};
640 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
641 // poke the TPHandle heartbeat just to exercise that code path
643 handle
->reset_tp_timeout();
648 Context
*on_apply
= NULL
, *on_apply_sync
= NULL
, *on_commit
= NULL
;
649 ObjectStore::Transaction::collect_contexts(tls
, &on_apply
, &on_commit
,
652 on_apply_sync
->complete(0);
654 finisher
.queue(on_apply
);
656 finisher
.queue(on_commit
);
660 void MemStore::_do_transaction(Transaction
& t
)
662 Transaction::iterator i
= t
.begin();
665 while (i
.have_op()) {
666 Transaction::Op
*op
= i
.decode_op();
670 case Transaction::OP_NOP
:
672 case Transaction::OP_TOUCH
:
674 coll_t cid
= i
.get_cid(op
->cid
);
675 ghobject_t oid
= i
.get_oid(op
->oid
);
676 r
= _touch(cid
, oid
);
680 case Transaction::OP_WRITE
:
682 coll_t cid
= i
.get_cid(op
->cid
);
683 ghobject_t oid
= i
.get_oid(op
->oid
);
684 uint64_t off
= op
->off
;
685 uint64_t len
= op
->len
;
686 uint32_t fadvise_flags
= i
.get_fadvise_flags();
689 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
693 case Transaction::OP_ZERO
:
695 coll_t cid
= i
.get_cid(op
->cid
);
696 ghobject_t oid
= i
.get_oid(op
->oid
);
697 uint64_t off
= op
->off
;
698 uint64_t len
= op
->len
;
699 r
= _zero(cid
, oid
, off
, len
);
703 case Transaction::OP_TRIMCACHE
:
709 case Transaction::OP_TRUNCATE
:
711 coll_t cid
= i
.get_cid(op
->cid
);
712 ghobject_t oid
= i
.get_oid(op
->oid
);
713 uint64_t off
= op
->off
;
714 r
= _truncate(cid
, oid
, off
);
718 case Transaction::OP_REMOVE
:
720 coll_t cid
= i
.get_cid(op
->cid
);
721 ghobject_t oid
= i
.get_oid(op
->oid
);
722 r
= _remove(cid
, oid
);
726 case Transaction::OP_SETATTR
:
728 coll_t cid
= i
.get_cid(op
->cid
);
729 ghobject_t oid
= i
.get_oid(op
->oid
);
730 string name
= i
.decode_string();
733 map
<string
, bufferptr
> to_set
;
734 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
735 r
= _setattrs(cid
, oid
, to_set
);
739 case Transaction::OP_SETATTRS
:
741 coll_t cid
= i
.get_cid(op
->cid
);
742 ghobject_t oid
= i
.get_oid(op
->oid
);
743 map
<string
, bufferptr
> aset
;
744 i
.decode_attrset(aset
);
745 r
= _setattrs(cid
, oid
, aset
);
749 case Transaction::OP_RMATTR
:
751 coll_t cid
= i
.get_cid(op
->cid
);
752 ghobject_t oid
= i
.get_oid(op
->oid
);
753 string name
= i
.decode_string();
754 r
= _rmattr(cid
, oid
, name
.c_str());
758 case Transaction::OP_RMATTRS
:
760 coll_t cid
= i
.get_cid(op
->cid
);
761 ghobject_t oid
= i
.get_oid(op
->oid
);
762 r
= _rmattrs(cid
, oid
);
766 case Transaction::OP_CLONE
:
768 coll_t cid
= i
.get_cid(op
->cid
);
769 ghobject_t oid
= i
.get_oid(op
->oid
);
770 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
771 r
= _clone(cid
, oid
, noid
);
775 case Transaction::OP_CLONERANGE
:
777 coll_t cid
= i
.get_cid(op
->cid
);
778 ghobject_t oid
= i
.get_oid(op
->oid
);
779 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
780 uint64_t off
= op
->off
;
781 uint64_t len
= op
->len
;
782 r
= _clone_range(cid
, oid
, noid
, off
, len
, off
);
786 case Transaction::OP_CLONERANGE2
:
788 coll_t cid
= i
.get_cid(op
->cid
);
789 ghobject_t oid
= i
.get_oid(op
->oid
);
790 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
791 uint64_t srcoff
= op
->off
;
792 uint64_t len
= op
->len
;
793 uint64_t dstoff
= op
->dest_off
;
794 r
= _clone_range(cid
, oid
, noid
, srcoff
, len
, dstoff
);
798 case Transaction::OP_MKCOLL
:
800 coll_t cid
= i
.get_cid(op
->cid
);
801 r
= _create_collection(cid
, op
->split_bits
);
805 case Transaction::OP_COLL_HINT
:
807 coll_t cid
= i
.get_cid(op
->cid
);
808 uint32_t type
= op
->hint_type
;
811 auto hiter
= hint
.cbegin();
812 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
815 decode(pg_num
, hiter
);
816 decode(num_objs
, hiter
);
817 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
);
820 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
825 case Transaction::OP_RMCOLL
:
827 coll_t cid
= i
.get_cid(op
->cid
);
828 r
= _destroy_collection(cid
);
832 case Transaction::OP_COLL_ADD
:
834 coll_t ocid
= i
.get_cid(op
->cid
);
835 coll_t ncid
= i
.get_cid(op
->dest_cid
);
836 ghobject_t oid
= i
.get_oid(op
->oid
);
837 r
= _collection_add(ncid
, ocid
, oid
);
841 case Transaction::OP_COLL_REMOVE
:
843 coll_t cid
= i
.get_cid(op
->cid
);
844 ghobject_t oid
= i
.get_oid(op
->oid
);
845 r
= _remove(cid
, oid
);
849 case Transaction::OP_COLL_MOVE
:
850 ceph_abort_msg("deprecated");
853 case Transaction::OP_COLL_MOVE_RENAME
:
855 coll_t oldcid
= i
.get_cid(op
->cid
);
856 ghobject_t oldoid
= i
.get_oid(op
->oid
);
857 coll_t newcid
= i
.get_cid(op
->dest_cid
);
858 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
859 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
);
865 case Transaction::OP_TRY_RENAME
:
867 coll_t cid
= i
.get_cid(op
->cid
);
868 ghobject_t oldoid
= i
.get_oid(op
->oid
);
869 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
870 r
= _collection_move_rename(cid
, oldoid
, cid
, newoid
);
876 case Transaction::OP_COLL_SETATTR
:
878 ceph_abort_msg("not implemented");
882 case Transaction::OP_COLL_RMATTR
:
884 ceph_abort_msg("not implemented");
888 case Transaction::OP_COLL_RENAME
:
890 ceph_abort_msg("not implemented");
894 case Transaction::OP_OMAP_CLEAR
:
896 coll_t cid
= i
.get_cid(op
->cid
);
897 ghobject_t oid
= i
.get_oid(op
->oid
);
898 r
= _omap_clear(cid
, oid
);
901 case Transaction::OP_OMAP_SETKEYS
:
903 coll_t cid
= i
.get_cid(op
->cid
);
904 ghobject_t oid
= i
.get_oid(op
->oid
);
906 i
.decode_attrset_bl(&aset_bl
);
907 r
= _omap_setkeys(cid
, oid
, aset_bl
);
910 case Transaction::OP_OMAP_RMKEYS
:
912 coll_t cid
= i
.get_cid(op
->cid
);
913 ghobject_t oid
= i
.get_oid(op
->oid
);
915 i
.decode_keyset_bl(&keys_bl
);
916 r
= _omap_rmkeys(cid
, oid
, keys_bl
);
919 case Transaction::OP_OMAP_RMKEYRANGE
:
921 coll_t cid
= i
.get_cid(op
->cid
);
922 ghobject_t oid
= i
.get_oid(op
->oid
);
924 first
= i
.decode_string();
925 last
= i
.decode_string();
926 r
= _omap_rmkeyrange(cid
, oid
, first
, last
);
929 case Transaction::OP_OMAP_SETHEADER
:
931 coll_t cid
= i
.get_cid(op
->cid
);
932 ghobject_t oid
= i
.get_oid(op
->oid
);
935 r
= _omap_setheader(cid
, oid
, bl
);
938 case Transaction::OP_SPLIT_COLLECTION
:
939 ceph_abort_msg("deprecated");
941 case Transaction::OP_SPLIT_COLLECTION2
:
943 coll_t cid
= i
.get_cid(op
->cid
);
944 uint32_t bits
= op
->split_bits
;
945 uint32_t rem
= op
->split_rem
;
946 coll_t dest
= i
.get_cid(op
->dest_cid
);
947 r
= _split_collection(cid
, bits
, rem
, dest
);
950 case Transaction::OP_MERGE_COLLECTION
:
952 coll_t cid
= i
.get_cid(op
->cid
);
953 uint32_t bits
= op
->split_bits
;
954 coll_t dest
= i
.get_cid(op
->dest_cid
);
955 r
= _merge_collection(cid
, bits
, dest
);
959 case Transaction::OP_SETALLOCHINT
:
965 case Transaction::OP_COLL_SET_BITS
:
972 derr
<< "bad op " << op
->op
<< dendl
;
979 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
980 op
->op
== Transaction::OP_CLONE
||
981 op
->op
== Transaction::OP_CLONERANGE2
||
982 op
->op
== Transaction::OP_COLL_ADD
))
983 // -ENOENT is usually okay
989 const char *msg
= "unexpected error code";
991 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
992 op
->op
== Transaction::OP_CLONE
||
993 op
->op
== Transaction::OP_CLONERANGE2
))
994 msg
= "ENOENT on clone suggests osd bug";
997 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
998 // by partially applying transactions.
999 msg
= "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1001 if (r
== -ENOTEMPTY
) {
1002 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
1006 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
1007 << " (op " << pos
<< ", counting from 0)" << dendl
;
1008 dout(0) << msg
<< dendl
;
1009 dout(0) << " transaction dump:\n";
1010 JSONFormatter
f(true);
1011 f
.open_object_section("transaction");
1016 ceph_abort_msg("unexpected error");
1024 int MemStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
1026 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1027 CollectionRef c
= get_collection(cid
);
1031 c
->get_or_create_object(oid
);
1035 int MemStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
1036 uint64_t offset
, size_t len
, const bufferlist
& bl
,
1037 uint32_t fadvise_flags
)
1039 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " "
1040 << offset
<< "~" << len
<< dendl
;
1041 ceph_assert(len
== bl
.length());
1043 CollectionRef c
= get_collection(cid
);
1047 ObjectRef o
= c
->get_or_create_object(oid
);
1049 const ssize_t old_size
= o
->get_size();
1050 o
->write(offset
, bl
);
1051 used_bytes
+= (o
->get_size() - old_size
);
1057 int MemStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
,
1058 uint64_t offset
, size_t len
)
1060 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
1063 bl
.append_zero(len
);
1064 return _write(cid
, oid
, offset
, len
, bl
);
1067 int MemStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
1069 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << size
<< dendl
;
1070 CollectionRef c
= get_collection(cid
);
1074 ObjectRef o
= c
->get_object(oid
);
1077 const ssize_t old_size
= o
->get_size();
1078 int r
= o
->truncate(size
);
1079 used_bytes
+= (o
->get_size() - old_size
);
1083 int MemStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
)
1085 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1086 CollectionRef c
= get_collection(cid
);
1089 std::lock_guard l
{c
->lock
};
1091 auto i
= c
->object_hash
.find(oid
);
1092 if (i
== c
->object_hash
.end())
1094 used_bytes
-= i
->second
->get_size();
1095 c
->object_hash
.erase(i
);
1096 c
->object_map
.erase(oid
);
1101 int MemStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
,
1102 map
<string
,bufferptr
>& aset
)
1104 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1105 CollectionRef c
= get_collection(cid
);
1109 ObjectRef o
= c
->get_object(oid
);
1112 std::lock_guard lock
{o
->xattr_mutex
};
1113 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin(); p
!= aset
.end(); ++p
)
1114 o
->xattr
[p
->first
] = p
->second
;
1118 int MemStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
)
1120 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1121 CollectionRef c
= get_collection(cid
);
1125 ObjectRef o
= c
->get_object(oid
);
1128 std::lock_guard lock
{o
->xattr_mutex
};
1129 auto i
= o
->xattr
.find(name
);
1130 if (i
== o
->xattr
.end())
1136 int MemStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
)
1138 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1139 CollectionRef c
= get_collection(cid
);
1143 ObjectRef o
= c
->get_object(oid
);
1146 std::lock_guard lock
{o
->xattr_mutex
};
1151 int MemStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
,
1152 const ghobject_t
& newoid
)
1154 dout(10) << __func__
<< " " << cid
<< " " << oldoid
1155 << " -> " << newoid
<< dendl
;
1156 CollectionRef c
= get_collection(cid
);
1160 ObjectRef oo
= c
->get_object(oldoid
);
1163 ObjectRef no
= c
->get_or_create_object(newoid
);
1164 used_bytes
+= oo
->get_size() - no
->get_size();
1165 no
->clone(oo
.get(), 0, oo
->get_size(), 0);
1167 // take xattr and omap locks with std::lock()
1168 std::scoped_lock l
{oo
->xattr_mutex
,
1173 no
->omap_header
= oo
->omap_header
;
1174 no
->omap
= oo
->omap
;
1175 no
->xattr
= oo
->xattr
;
1179 int MemStore::_clone_range(const coll_t
& cid
, const ghobject_t
& oldoid
,
1180 const ghobject_t
& newoid
,
1181 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
1183 dout(10) << __func__
<< " " << cid
<< " "
1184 << oldoid
<< " " << srcoff
<< "~" << len
<< " -> "
1185 << newoid
<< " " << dstoff
<< "~" << len
1187 CollectionRef c
= get_collection(cid
);
1191 ObjectRef oo
= c
->get_object(oldoid
);
1194 ObjectRef no
= c
->get_or_create_object(newoid
);
1195 if (srcoff
>= oo
->get_size())
1197 if (srcoff
+ len
>= oo
->get_size())
1198 len
= oo
->get_size() - srcoff
;
1200 const ssize_t old_size
= no
->get_size();
1201 no
->clone(oo
.get(), srcoff
, len
, dstoff
);
1202 used_bytes
+= (no
->get_size() - old_size
);
1207 int MemStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&oid
)
1209 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1210 CollectionRef c
= get_collection(cid
);
1214 ObjectRef o
= c
->get_object(oid
);
1217 std::lock_guard lock
{o
->omap_mutex
};
1219 o
->omap_header
.clear();
1223 int MemStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1224 bufferlist
& aset_bl
)
1226 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1227 CollectionRef c
= get_collection(cid
);
1231 ObjectRef o
= c
->get_object(oid
);
1234 std::lock_guard lock
{o
->omap_mutex
};
1235 auto p
= aset_bl
.cbegin();
1241 decode(o
->omap
[key
], p
);
1246 int MemStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1247 bufferlist
& keys_bl
)
1249 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1250 CollectionRef c
= get_collection(cid
);
1254 ObjectRef o
= c
->get_object(oid
);
1257 std::lock_guard lock
{o
->omap_mutex
};
1258 auto p
= keys_bl
.cbegin();
1269 int MemStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&oid
,
1270 const string
& first
, const string
& last
)
1272 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << first
1273 << " " << last
<< dendl
;
1274 CollectionRef c
= get_collection(cid
);
1278 ObjectRef o
= c
->get_object(oid
);
1281 std::lock_guard lock
{o
->omap_mutex
};
1282 map
<string
,bufferlist
>::iterator p
= o
->omap
.lower_bound(first
);
1283 map
<string
,bufferlist
>::iterator e
= o
->omap
.lower_bound(last
);
1284 o
->omap
.erase(p
, e
);
1288 int MemStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&oid
,
1289 const bufferlist
&bl
)
1291 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1292 CollectionRef c
= get_collection(cid
);
1296 ObjectRef o
= c
->get_object(oid
);
1299 std::lock_guard lock
{o
->omap_mutex
};
1300 o
->omap_header
= bl
;
1304 int MemStore::_create_collection(const coll_t
& cid
, int bits
)
1306 dout(10) << __func__
<< " " << cid
<< dendl
;
1307 std::lock_guard l
{coll_lock
};
1308 auto result
= coll_map
.insert(std::make_pair(cid
, CollectionRef()));
1311 auto p
= new_coll_map
.find(cid
);
1312 ceph_assert(p
!= new_coll_map
.end());
1313 result
.first
->second
= p
->second
;
1314 result
.first
->second
->bits
= bits
;
1315 new_coll_map
.erase(p
);
1319 int MemStore::_destroy_collection(const coll_t
& cid
)
1321 dout(10) << __func__
<< " " << cid
<< dendl
;
1322 std::lock_guard l
{coll_lock
};
1323 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1324 if (cp
== coll_map
.end())
1327 std::shared_lock l2
{cp
->second
->lock
};
1328 if (!cp
->second
->object_map
.empty())
1330 cp
->second
->exists
= false;
1332 used_bytes
-= cp
->second
->used_bytes();
1337 int MemStore::_collection_add(const coll_t
& cid
, const coll_t
& ocid
, const ghobject_t
& oid
)
1339 dout(10) << __func__
<< " " << cid
<< " " << ocid
<< " " << oid
<< dendl
;
1340 CollectionRef c
= get_collection(cid
);
1343 CollectionRef oc
= get_collection(ocid
);
1347 std::scoped_lock l
{std::min(&(*c
), &(*oc
))->lock
,
1348 std::max(&(*c
), &(*oc
))->lock
};
1350 if (c
->object_hash
.count(oid
))
1352 if (oc
->object_hash
.count(oid
) == 0)
1354 ObjectRef o
= oc
->object_hash
[oid
];
1355 c
->object_map
[oid
] = o
;
1356 c
->object_hash
[oid
] = o
;
1360 int MemStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
1361 coll_t cid
, const ghobject_t
& oid
)
1363 dout(10) << __func__
<< " " << oldcid
<< " " << oldoid
<< " -> "
1364 << cid
<< " " << oid
<< dendl
;
1365 CollectionRef c
= get_collection(cid
);
1368 CollectionRef oc
= get_collection(oldcid
);
1372 // note: c and oc may be the same
1373 ceph_assert(&(*c
) == &(*oc
));
1375 std::lock_guard l
{c
->lock
};
1376 if (c
->object_hash
.count(oid
))
1378 if (oc
->object_hash
.count(oldoid
) == 0)
1381 ObjectRef o
= oc
->object_hash
[oldoid
];
1382 c
->object_map
[oid
] = o
;
1383 c
->object_hash
[oid
] = o
;
1384 oc
->object_map
.erase(oldoid
);
1385 oc
->object_hash
.erase(oldoid
);
1390 int MemStore::_split_collection(const coll_t
& cid
, uint32_t bits
, uint32_t match
,
1393 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " " << match
<< " "
1395 CollectionRef sc
= get_collection(cid
);
1398 CollectionRef dc
= get_collection(dest
);
1402 std::scoped_lock l
{std::min(&(*sc
), &(*dc
))->lock
,
1403 std::max(&(*sc
), &(*dc
))->lock
};
1405 map
<ghobject_t
,ObjectRef
>::iterator p
= sc
->object_map
.begin();
1406 while (p
!= sc
->object_map
.end()) {
1407 if (p
->first
.match(bits
, match
)) {
1408 dout(20) << " moving " << p
->first
<< dendl
;
1409 dc
->object_map
.insert(make_pair(p
->first
, p
->second
));
1410 dc
->object_hash
.insert(make_pair(p
->first
, p
->second
));
1411 sc
->object_hash
.erase(p
->first
);
1412 sc
->object_map
.erase(p
++);
1419 ceph_assert(dc
->bits
== (int)bits
);
1424 int MemStore::_merge_collection(const coll_t
& cid
, uint32_t bits
, coll_t dest
)
1426 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " "
1428 CollectionRef sc
= get_collection(cid
);
1431 CollectionRef dc
= get_collection(dest
);
1435 std::scoped_lock l
{std::min(&(*sc
), &(*dc
))->lock
,
1436 std::max(&(*sc
), &(*dc
))->lock
};
1438 map
<ghobject_t
,ObjectRef
>::iterator p
= sc
->object_map
.begin();
1439 while (p
!= sc
->object_map
.end()) {
1440 dout(20) << " moving " << p
->first
<< dendl
;
1441 dc
->object_map
.insert(make_pair(p
->first
, p
->second
));
1442 dc
->object_hash
.insert(make_pair(p
->first
, p
->second
));
1443 sc
->object_hash
.erase(p
->first
);
1444 sc
->object_map
.erase(p
++);
1451 std::lock_guard l
{coll_lock
};
1452 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1453 ceph_assert(cp
!= coll_map
.end());
1454 used_bytes
-= cp
->second
->used_bytes();
1462 struct BufferlistObject
: public MemStore::Object
{
1463 ceph::spinlock mutex
;
1466 size_t get_size() const override
{ return data
.length(); }
1468 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1469 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1470 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1471 uint64_t dstoff
) override
;
1472 int truncate(uint64_t offset
) override
;
1474 void encode(bufferlist
& bl
) const override
{
1475 ENCODE_START(1, 1, bl
);
1480 void decode(bufferlist::const_iterator
& p
) override
{
1489 int BufferlistObject::read(uint64_t offset
, uint64_t len
,
1492 std::lock_guard
<decltype(mutex
)> lock(mutex
);
1493 bl
.substr_of(data
, offset
, len
);
1497 int BufferlistObject::write(uint64_t offset
, const bufferlist
&src
)
1499 unsigned len
= src
.length();
1501 std::lock_guard
<decltype(mutex
)> lock(mutex
);
1505 if (get_size() >= offset
) {
1506 newdata
.substr_of(data
, 0, offset
);
1509 newdata
.substr_of(data
, 0, get_size());
1511 newdata
.append_zero(offset
- get_size());
1514 newdata
.append(src
);
1517 if (get_size() > offset
+ len
) {
1519 tail
.substr_of(data
, offset
+ len
, get_size() - (offset
+ len
));
1520 newdata
.append(tail
);
1523 data
.claim(newdata
);
1527 int BufferlistObject::clone(Object
*src
, uint64_t srcoff
,
1528 uint64_t len
, uint64_t dstoff
)
1530 auto srcbl
= dynamic_cast<BufferlistObject
*>(src
);
1531 if (srcbl
== nullptr)
1536 std::lock_guard
<decltype(srcbl
->mutex
)> lock(srcbl
->mutex
);
1537 if (srcoff
== dstoff
&& len
== src
->get_size()) {
1541 bl
.substr_of(srcbl
->data
, srcoff
, len
);
1543 return write(dstoff
, bl
);
1546 int BufferlistObject::truncate(uint64_t size
)
1548 std::lock_guard
<decltype(mutex
)> lock(mutex
);
1549 if (get_size() > size
) {
1551 bl
.substr_of(data
, 0, size
);
1553 } else if (get_size() == size
) {
1556 data
.append_zero(size
- get_size());
1563 struct MemStore::PageSetObject
: public Object
{
1566 #if defined(__GLIBCXX__)
1567 // use a thread-local vector for the pages returned by PageSet, so we
1568 // can avoid allocations in read/write()
1569 static thread_local
PageSet::page_vector tls_pages
;
1572 explicit PageSetObject(size_t page_size
) : data(page_size
), data_len(0) {}
1574 size_t get_size() const override
{ return data_len
; }
1576 int read(uint64_t offset
, uint64_t len
, bufferlist
&bl
) override
;
1577 int write(uint64_t offset
, const bufferlist
&bl
) override
;
1578 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1579 uint64_t dstoff
) override
;
1580 int truncate(uint64_t offset
) override
;
1582 void encode(bufferlist
& bl
) const override
{
1583 ENCODE_START(1, 1, bl
);
1584 encode(data_len
, bl
);
1589 void decode(bufferlist::const_iterator
& p
) override
{
1591 decode(data_len
, p
);
1598 #if defined(__GLIBCXX__)
1599 // use a thread-local vector for the pages returned by PageSet, so we
1600 // can avoid allocations in read/write()
1601 thread_local
PageSet::page_vector
MemStore::PageSetObject::tls_pages
;
1602 #define DEFINE_PAGE_VECTOR(name)
1604 #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
1607 int MemStore::PageSetObject::read(uint64_t offset
, uint64_t len
, bufferlist
& bl
)
1609 const auto start
= offset
;
1610 const auto end
= offset
+ len
;
1611 auto remaining
= len
;
1613 DEFINE_PAGE_VECTOR(tls_pages
);
1614 data
.get_range(offset
, len
, tls_pages
);
1616 // allocate a buffer for the data
1617 buffer::ptr
buf(len
);
1619 auto p
= tls_pages
.begin();
1621 // no more pages in range
1622 if (p
== tls_pages
.end() || (*p
)->offset
>= end
) {
1623 buf
.zero(offset
- start
, remaining
);
1628 // fill any holes between pages with zeroes
1629 if (page
->offset
> offset
) {
1630 const auto count
= std::min(remaining
, page
->offset
- offset
);
1631 buf
.zero(offset
- start
, count
);
1633 offset
= page
->offset
;
1639 const auto page_offset
= offset
- page
->offset
;
1640 const auto count
= min(remaining
, data
.get_page_size() - page_offset
);
1642 buf
.copy_in(offset
- start
, count
, page
->data
+ page_offset
);
1650 tls_pages
.clear(); // drop page refs
1652 bl
.append(std::move(buf
));
1656 int MemStore::PageSetObject::write(uint64_t offset
, const bufferlist
&src
)
1658 unsigned len
= src
.length();
1660 DEFINE_PAGE_VECTOR(tls_pages
);
1661 // make sure the page range is allocated
1662 data
.alloc_range(offset
, src
.length(), tls_pages
);
1664 auto page
= tls_pages
.begin();
1666 auto p
= src
.begin();
1668 unsigned page_offset
= offset
- (*page
)->offset
;
1669 unsigned pageoff
= data
.get_page_size() - page_offset
;
1670 unsigned count
= min(len
, pageoff
);
1671 p
.copy(count
, (*page
)->data
+ page_offset
);
1674 if (count
== pageoff
)
1677 if (data_len
< offset
)
1679 tls_pages
.clear(); // drop page refs
1683 int MemStore::PageSetObject::clone(Object
*src
, uint64_t srcoff
,
1684 uint64_t len
, uint64_t dstoff
)
1686 const int64_t delta
= dstoff
- srcoff
;
1688 auto &src_data
= static_cast<PageSetObject
*>(src
)->data
;
1689 const uint64_t src_page_size
= src_data
.get_page_size();
1691 auto &dst_data
= data
;
1692 const auto dst_page_size
= dst_data
.get_page_size();
1694 DEFINE_PAGE_VECTOR(tls_pages
);
1695 PageSet::page_vector dst_pages
;
1698 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1699 auto count
= std::min(len
, (uint64_t)src_page_size
* 16);
1700 src_data
.get_range(srcoff
, count
, tls_pages
);
1702 // allocate the destination range
1703 // TODO: avoid allocating pages for holes in the source range
1704 dst_data
.alloc_range(srcoff
+ delta
, count
, dst_pages
);
1705 auto dst_iter
= dst_pages
.begin();
1707 for (auto &src_page
: tls_pages
) {
1708 auto sbegin
= std::max(srcoff
, src_page
->offset
);
1709 auto send
= std::min(srcoff
+ count
, src_page
->offset
+ src_page_size
);
1711 // zero-fill holes before src_page
1712 if (srcoff
< sbegin
) {
1713 while (dst_iter
!= dst_pages
.end()) {
1714 auto &dst_page
= *dst_iter
;
1715 auto dbegin
= std::max(srcoff
+ delta
, dst_page
->offset
);
1716 auto dend
= std::min(sbegin
+ delta
, dst_page
->offset
+ dst_page_size
);
1717 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1718 dst_page
->data
+ dend
- dst_page
->offset
, 0);
1719 if (dend
< dst_page
->offset
+ dst_page_size
)
1723 const auto c
= sbegin
- srcoff
;
1728 // copy data from src page to dst pages
1729 while (dst_iter
!= dst_pages
.end()) {
1730 auto &dst_page
= *dst_iter
;
1731 auto dbegin
= std::max(sbegin
+ delta
, dst_page
->offset
);
1732 auto dend
= std::min(send
+ delta
, dst_page
->offset
+ dst_page_size
);
1734 std::copy(src_page
->data
+ (dbegin
- delta
) - src_page
->offset
,
1735 src_page
->data
+ (dend
- delta
) - src_page
->offset
,
1736 dst_page
->data
+ dbegin
- dst_page
->offset
);
1737 if (dend
< dst_page
->offset
+ dst_page_size
)
1742 const auto c
= send
- sbegin
;
1746 dstoff
= send
+ delta
;
1748 tls_pages
.clear(); // drop page refs
1750 // zero-fill holes after the last src_page
1752 while (dst_iter
!= dst_pages
.end()) {
1753 auto &dst_page
= *dst_iter
;
1754 auto dbegin
= std::max(dstoff
, dst_page
->offset
);
1755 auto dend
= std::min(dstoff
+ count
, dst_page
->offset
+ dst_page_size
);
1756 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1757 dst_page
->data
+ dend
- dst_page
->offset
, 0);
1764 dst_pages
.clear(); // drop page refs
1767 // update object size
1768 if (data_len
< dstoff
)
1773 int MemStore::PageSetObject::truncate(uint64_t size
)
1775 data
.free_pages_after(size
);
1778 const auto page_size
= data
.get_page_size();
1779 const auto page_offset
= size
& ~(page_size
-1);
1780 if (page_offset
== size
)
1783 DEFINE_PAGE_VECTOR(tls_pages
);
1784 // write zeroes to the rest of the last page
1785 data
.get_range(page_offset
, page_size
, tls_pages
);
1786 if (tls_pages
.empty())
1789 auto page
= tls_pages
.begin();
1790 auto data
= (*page
)->data
;
1791 std::fill(data
+ (size
- page_offset
), data
+ page_size
, 0);
1792 tls_pages
.clear(); // drop page ref
1797 MemStore::ObjectRef
MemStore::Collection::create_object() const {
1799 return new PageSetObject(cct
->_conf
->memstore_page_size
);
1800 return new BufferlistObject();