1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
24 #include "include/types.h"
25 #include "include/stringify.h"
26 #include "include/unordered_map.h"
27 #include "common/errno.h"
29 #include "include/compat.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_filestore
34 #define dout_prefix *_dout << "memstore(" << path << ") "
39 // for comparing collections for lock ordering
40 bool operator>(const MemStore::CollectionRef
& l
,
41 const MemStore::CollectionRef
& r
)
43 return (unsigned long)l
.get() > (unsigned long)r
.get();
56 int MemStore::umount()
58 finisher
.wait_for_empty();
65 dout(10) << __func__
<< dendl
;
67 std::set
<coll_t
> collections
;
68 for (auto p
= coll_map
.begin(); p
!= coll_map
.end(); ++p
) {
69 dout(20) << __func__
<< " coll " << p
->first
<< " " << p
->second
<< dendl
;
70 collections
.insert(p
->first
);
71 ceph::buffer::list bl
;
72 ceph_assert(p
->second
);
73 p
->second
->encode(bl
);
74 std::string fn
= path
+ "/" + stringify(p
->first
);
75 int r
= bl
.write_file(fn
.c_str());
80 std::string fn
= path
+ "/collections";
81 ceph::buffer::list bl
;
82 encode(collections
, bl
);
83 int r
= bl
.write_file(fn
.c_str());
90 void MemStore::dump_all()
92 auto f
= ceph::Formatter::create("json-pretty");
93 f
->open_object_section("store");
102 void MemStore::dump(ceph::Formatter
*f
)
104 f
->open_array_section("collections");
105 for (auto p
= coll_map
.begin(); p
!= coll_map
.end(); ++p
) {
106 f
->open_object_section("collection");
107 f
->dump_string("name", stringify(p
->first
));
109 f
->open_array_section("xattrs");
110 for (auto q
= p
->second
->xattr
.begin();
111 q
!= p
->second
->xattr
.end();
113 f
->open_object_section("xattr");
114 f
->dump_string("name", q
->first
);
115 f
->dump_int("length", q
->second
.length());
120 f
->open_array_section("objects");
121 for (auto q
= p
->second
->object_map
.begin();
122 q
!= p
->second
->object_map
.end();
124 f
->open_object_section("object");
125 f
->dump_string("name", stringify(q
->first
));
137 int MemStore::_load()
139 dout(10) << __func__
<< dendl
;
140 ceph::buffer::list bl
;
141 std::string fn
= path
+ "/collections";
143 int r
= bl
.read_file(fn
.c_str(), &err
);
147 std::set
<coll_t
> collections
;
148 auto p
= bl
.cbegin();
149 decode(collections
, p
);
151 for (auto q
= collections
.begin();
152 q
!= collections
.end();
154 std::string fn
= path
+ "/" + stringify(*q
);
155 ceph::buffer::list cbl
;
156 int r
= cbl
.read_file(fn
.c_str(), &err
);
159 auto c
= ceph::make_ref
<Collection
>(cct
, *q
);
160 auto p
= cbl
.cbegin();
163 used_bytes
+= c
->used_bytes();
171 void MemStore::set_fsid(uuid_d u
)
173 int r
= write_meta("fsid", stringify(u
));
177 uuid_d
MemStore::get_fsid()
179 std::string fsid_str
;
180 int r
= read_meta("fsid", &fsid_str
);
183 bool b
= uuid
.parse(fsid_str
.c_str());
190 std::string fsid_str
;
191 int r
= read_meta("fsid", &fsid_str
);
194 fsid
.generate_random();
195 fsid_str
= stringify(fsid
);
196 r
= write_meta("fsid", fsid_str
);
199 dout(1) << __func__
<< " new fsid " << fsid_str
<< dendl
;
203 dout(1) << __func__
<< " had fsid " << fsid_str
<< dendl
;
206 std::string fn
= path
+ "/collections";
207 derr
<< path
<< dendl
;
208 ceph::buffer::list bl
;
209 std::set
<coll_t
> collections
;
210 encode(collections
, bl
);
211 r
= bl
.write_file(fn
.c_str());
215 r
= write_meta("type", "memstore");
222 int MemStore::statfs(struct store_statfs_t
*st
, osd_alert_list_t
* alerts
)
224 dout(10) << __func__
<< dendl
;
226 alerts
->clear(); // returns nothing for now
229 st
->total
= cct
->_conf
->memstore_device_bytes
;
230 st
->available
= std::max
<int64_t>(st
->total
- used_bytes
, 0);
231 dout(10) << __func__
<< ": used_bytes: " << used_bytes
232 << "/" << cct
->_conf
->memstore_device_bytes
<< dendl
;
236 int MemStore::pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
,
242 objectstore_perf_stat_t
MemStore::get_cur_stats()
245 return objectstore_perf_stat_t();
248 MemStore::CollectionRef
MemStore::get_collection(const coll_t
& cid
)
250 std::shared_lock l
{coll_lock
};
251 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
252 if (cp
== coll_map
.end())
253 return CollectionRef();
257 ObjectStore::CollectionHandle
MemStore::create_new_collection(const coll_t
& cid
)
259 std::lock_guard l
{coll_lock
};
260 auto c
= ceph::make_ref
<Collection
>(cct
, cid
);
261 new_coll_map
[cid
] = c
;
269 bool MemStore::exists(CollectionHandle
&c_
, const ghobject_t
& oid
)
271 Collection
*c
= static_cast<Collection
*>(c_
.get());
272 dout(10) << __func__
<< " " << c
->get_cid() << " " << oid
<< dendl
;
276 // Perform equivalent of c->get_object_(oid) != NULL. In C++11 the
277 // shared_ptr needs to be compared to nullptr.
278 return (bool)c
->get_object(oid
);
282 CollectionHandle
&c_
,
283 const ghobject_t
& oid
,
287 Collection
*c
= static_cast<Collection
*>(c_
.get());
288 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
291 ObjectRef o
= c
->get_object(oid
);
294 st
->st_size
= o
->get_size();
295 st
->st_blksize
= 4096;
296 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
301 int MemStore::set_collection_opts(
302 CollectionHandle
& ch
,
303 const pool_opts_t
& opts
)
309 CollectionHandle
&c_
,
310 const ghobject_t
& oid
,
313 ceph::buffer::list
& bl
,
316 Collection
*c
= static_cast<Collection
*>(c_
.get());
317 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " "
318 << offset
<< "~" << len
<< dendl
;
321 ObjectRef o
= c
->get_object(oid
);
324 if (offset
>= o
->get_size())
327 if (l
== 0 && offset
== 0) // note: len == 0 means read the entire object
329 else if (offset
+ l
> o
->get_size())
330 l
= o
->get_size() - offset
;
332 return o
->read(offset
, l
, bl
);
335 int MemStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
336 uint64_t offset
, size_t len
, ceph::buffer::list
& bl
)
338 std::map
<uint64_t, uint64_t> destmap
;
339 int r
= fiemap(ch
, oid
, offset
, len
, destmap
);
345 int MemStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
346 uint64_t offset
, size_t len
, std::map
<uint64_t, uint64_t>& destmap
)
348 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< " " << offset
<< "~"
350 Collection
*c
= static_cast<Collection
*>(ch
.get());
354 ObjectRef o
= c
->get_object(oid
);
358 if (offset
+ l
> o
->get_size())
359 l
= o
->get_size() - offset
;
360 if (offset
>= o
->get_size())
367 int MemStore::getattr(CollectionHandle
&c_
, const ghobject_t
& oid
,
368 const char *name
, ceph::buffer::ptr
& value
)
370 Collection
*c
= static_cast<Collection
*>(c_
.get());
371 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< " " << name
<< dendl
;
374 ObjectRef o
= c
->get_object(oid
);
378 std::lock_guard lock
{o
->xattr_mutex
};
379 if (!o
->xattr
.count(k
)) {
386 int MemStore::getattrs(CollectionHandle
&c_
, const ghobject_t
& oid
,
387 std::map
<std::string
,ceph::buffer::ptr
,std::less
<>>& aset
)
389 Collection
*c
= static_cast<Collection
*>(c_
.get());
390 dout(10) << __func__
<< " " << c
->cid
<< " " << oid
<< dendl
;
394 ObjectRef o
= c
->get_object(oid
);
397 std::lock_guard lock
{o
->xattr_mutex
};
402 int MemStore::list_collections(std::vector
<coll_t
>& ls
)
404 dout(10) << __func__
<< dendl
;
405 std::shared_lock l
{coll_lock
};
406 for (ceph::unordered_map
<coll_t
,CollectionRef
>::iterator p
= coll_map
.begin();
409 ls
.push_back(p
->first
);
414 bool MemStore::collection_exists(const coll_t
& cid
)
416 dout(10) << __func__
<< " " << cid
<< dendl
;
417 std::shared_lock l
{coll_lock
};
418 return coll_map
.count(cid
);
421 int MemStore::collection_empty(CollectionHandle
& ch
, bool *empty
)
423 dout(10) << __func__
<< " " << ch
->cid
<< dendl
;
424 CollectionRef c
= static_cast<Collection
*>(ch
.get());
425 std::shared_lock l
{c
->lock
};
426 *empty
= c
->object_map
.empty();
430 int MemStore::collection_bits(CollectionHandle
& ch
)
432 dout(10) << __func__
<< " " << ch
->cid
<< dendl
;
433 Collection
*c
= static_cast<Collection
*>(ch
.get());
434 std::shared_lock l
{c
->lock
};
438 int MemStore::collection_list(CollectionHandle
& ch
,
439 const ghobject_t
& start
,
440 const ghobject_t
& end
,
442 std::vector
<ghobject_t
> *ls
, ghobject_t
*next
)
444 Collection
*c
= static_cast<Collection
*>(ch
.get());
445 std::shared_lock l
{c
->lock
};
447 dout(10) << __func__
<< " cid " << ch
->cid
<< " start " << start
448 << " end " << end
<< dendl
;
449 auto p
= c
->object_map
.lower_bound(start
);
450 while (p
!= c
->object_map
.end() &&
451 ls
->size() < (unsigned)max
&&
453 ls
->push_back(p
->first
);
457 if (p
== c
->object_map
.end())
458 *next
= ghobject_t::get_max();
462 dout(10) << __func__
<< " cid " << ch
->cid
<< " got " << ls
->size() << dendl
;
466 int MemStore::omap_get(
467 CollectionHandle
& ch
, ///< [in] Collection containing oid
468 const ghobject_t
&oid
, ///< [in] Object containing omap
469 ceph::buffer::list
*header
, ///< [out] omap header
470 std::map
<std::string
, ceph::buffer::list
> *out
///< [out] Key to value map
473 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
474 Collection
*c
= static_cast<Collection
*>(ch
.get());
476 ObjectRef o
= c
->get_object(oid
);
479 std::lock_guard lock
{o
->omap_mutex
};
480 *header
= o
->omap_header
;
485 int MemStore::omap_get_header(
486 CollectionHandle
& ch
, ///< [in] Collection containing oid
487 const ghobject_t
&oid
, ///< [in] Object containing omap
488 ceph::buffer::list
*header
, ///< [out] omap header
489 bool allow_eio
///< [in] don't assert on eio
492 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
493 Collection
*c
= static_cast<Collection
*>(ch
.get());
494 ObjectRef o
= c
->get_object(oid
);
497 std::lock_guard lock
{o
->omap_mutex
};
498 *header
= o
->omap_header
;
502 int MemStore::omap_get_keys(
503 CollectionHandle
& ch
, ///< [in] Collection containing oid
504 const ghobject_t
&oid
, ///< [in] Object containing omap
505 std::set
<std::string
> *keys
///< [out] Keys defined on oid
508 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
509 Collection
*c
= static_cast<Collection
*>(ch
.get());
510 ObjectRef o
= c
->get_object(oid
);
513 std::lock_guard lock
{o
->omap_mutex
};
514 for (auto p
= o
->omap
.begin(); p
!= o
->omap
.end(); ++p
)
515 keys
->insert(p
->first
);
519 int MemStore::omap_get_values(
520 CollectionHandle
& ch
, ///< [in] Collection containing oid
521 const ghobject_t
&oid
, ///< [in] Object containing omap
522 const std::set
<std::string
> &keys
, ///< [in] Keys to get
523 std::map
<std::string
, ceph::buffer::list
> *out
///< [out] Returned keys and values
526 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
527 Collection
*c
= static_cast<Collection
*>(ch
.get());
528 ObjectRef o
= c
->get_object(oid
);
531 std::lock_guard lock
{o
->omap_mutex
};
532 for (auto p
= keys
.begin(); p
!= keys
.end(); ++p
) {
533 auto q
= o
->omap
.find(*p
);
534 if (q
!= o
->omap
.end())
541 int MemStore::omap_get_values(
542 CollectionHandle
& ch
, ///< [in] Collection containing oid
543 const ghobject_t
&oid
, ///< [in] Object containing omap
544 const std::optional
<std::string
> &start_after
, ///< [in] Keys to get
545 std::map
<std::string
, ceph::buffer::list
> *out
///< [out] Returned keys and values
548 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
549 Collection
*c
= static_cast<Collection
*>(ch
.get());
550 ObjectRef o
= c
->get_object(oid
);
554 std::lock_guard lock
{o
->omap_mutex
};
555 for (auto it
= o
->omap
.upper_bound(*start_after
);
556 it
!= std::end(o
->omap
);
564 int MemStore::omap_check_keys(
565 CollectionHandle
& ch
, ///< [in] Collection containing oid
566 const ghobject_t
&oid
, ///< [in] Object containing omap
567 const std::set
<std::string
> &keys
, ///< [in] Keys to check
568 std::set
<std::string
> *out
///< [out] Subset of keys defined on oid
571 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
572 Collection
*c
= static_cast<Collection
*>(ch
.get());
573 ObjectRef o
= c
->get_object(oid
);
576 std::lock_guard lock
{o
->omap_mutex
};
577 for (auto p
= keys
.begin(); p
!= keys
.end(); ++p
) {
578 auto q
= o
->omap
.find(*p
);
579 if (q
!= o
->omap
.end())
585 class MemStore::OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
588 std::map
<std::string
,ceph::buffer::list
>::iterator it
;
590 OmapIteratorImpl(CollectionRef c
, ObjectRef o
)
591 : c(c
), o(o
), it(o
->omap
.begin()) {}
593 int seek_to_first() override
{
594 std::lock_guard lock
{o
->omap_mutex
};
595 it
= o
->omap
.begin();
598 int upper_bound(const std::string
&after
) override
{
599 std::lock_guard lock
{o
->omap_mutex
};
600 it
= o
->omap
.upper_bound(after
);
603 int lower_bound(const std::string
&to
) override
{
604 std::lock_guard lock
{o
->omap_mutex
};
605 it
= o
->omap
.lower_bound(to
);
608 bool valid() override
{
609 std::lock_guard lock
{o
->omap_mutex
};
610 return it
!= o
->omap
.end();
612 int next() override
{
613 std::lock_guard lock
{o
->omap_mutex
};
617 std::string
key() override
{
618 std::lock_guard lock
{o
->omap_mutex
};
621 ceph::buffer::list
value() override
{
622 std::lock_guard lock
{o
->omap_mutex
};
625 int status() override
{
630 ObjectMap::ObjectMapIterator
MemStore::get_omap_iterator(
631 CollectionHandle
& ch
,
632 const ghobject_t
& oid
)
634 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
635 Collection
*c
= static_cast<Collection
*>(ch
.get());
636 ObjectRef o
= c
->get_object(oid
);
638 return ObjectMap::ObjectMapIterator();
639 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
));
646 int MemStore::queue_transactions(
647 CollectionHandle
& ch
,
648 std::vector
<Transaction
>& tls
,
650 ThreadPool::TPHandle
*handle
)
652 // because memstore operations are synchronous, we can implement the
653 // Sequencer with a mutex. this guarantees ordering on a given sequencer,
654 // while allowing operations on different sequencers to happen in parallel
655 Collection
*c
= static_cast<Collection
*>(ch
.get());
656 std::unique_lock lock
{c
->sequencer_mutex
};
658 for (auto p
= tls
.begin(); p
!= tls
.end(); ++p
) {
659 // poke the TPHandle heartbeat just to exercise that code path
661 handle
->reset_tp_timeout();
666 Context
*on_apply
= NULL
, *on_apply_sync
= NULL
, *on_commit
= NULL
;
667 ObjectStore::Transaction::collect_contexts(tls
, &on_apply
, &on_commit
,
670 on_apply_sync
->complete(0);
672 finisher
.queue(on_apply
);
674 finisher
.queue(on_commit
);
678 void MemStore::_do_transaction(Transaction
& t
)
680 Transaction::iterator i
= t
.begin();
683 while (i
.have_op()) {
684 Transaction::Op
*op
= i
.decode_op();
688 case Transaction::OP_NOP
:
690 case Transaction::OP_TOUCH
:
691 case Transaction::OP_CREATE
:
693 coll_t cid
= i
.get_cid(op
->cid
);
694 ghobject_t oid
= i
.get_oid(op
->oid
);
695 r
= _touch(cid
, oid
);
699 case Transaction::OP_WRITE
:
701 coll_t cid
= i
.get_cid(op
->cid
);
702 ghobject_t oid
= i
.get_oid(op
->oid
);
703 uint64_t off
= op
->off
;
704 uint64_t len
= op
->len
;
705 uint32_t fadvise_flags
= i
.get_fadvise_flags();
706 ceph::buffer::list bl
;
708 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
712 case Transaction::OP_ZERO
:
714 coll_t cid
= i
.get_cid(op
->cid
);
715 ghobject_t oid
= i
.get_oid(op
->oid
);
716 uint64_t off
= op
->off
;
717 uint64_t len
= op
->len
;
718 r
= _zero(cid
, oid
, off
, len
);
722 case Transaction::OP_TRIMCACHE
:
728 case Transaction::OP_TRUNCATE
:
730 coll_t cid
= i
.get_cid(op
->cid
);
731 ghobject_t oid
= i
.get_oid(op
->oid
);
732 uint64_t off
= op
->off
;
733 r
= _truncate(cid
, oid
, off
);
737 case Transaction::OP_REMOVE
:
739 coll_t cid
= i
.get_cid(op
->cid
);
740 ghobject_t oid
= i
.get_oid(op
->oid
);
741 r
= _remove(cid
, oid
);
745 case Transaction::OP_SETATTR
:
747 coll_t cid
= i
.get_cid(op
->cid
);
748 ghobject_t oid
= i
.get_oid(op
->oid
);
749 std::string name
= i
.decode_string();
750 ceph::buffer::list bl
;
752 std::map
<std::string
, ceph::buffer::ptr
> to_set
;
753 to_set
[name
] = ceph::buffer::ptr(bl
.c_str(), bl
.length());
754 r
= _setattrs(cid
, oid
, to_set
);
758 case Transaction::OP_SETATTRS
:
760 coll_t cid
= i
.get_cid(op
->cid
);
761 ghobject_t oid
= i
.get_oid(op
->oid
);
762 std::map
<std::string
, ceph::buffer::ptr
> aset
;
763 i
.decode_attrset(aset
);
764 r
= _setattrs(cid
, oid
, aset
);
768 case Transaction::OP_RMATTR
:
770 coll_t cid
= i
.get_cid(op
->cid
);
771 ghobject_t oid
= i
.get_oid(op
->oid
);
772 std::string name
= i
.decode_string();
773 r
= _rmattr(cid
, oid
, name
.c_str());
777 case Transaction::OP_RMATTRS
:
779 coll_t cid
= i
.get_cid(op
->cid
);
780 ghobject_t oid
= i
.get_oid(op
->oid
);
781 r
= _rmattrs(cid
, oid
);
785 case Transaction::OP_CLONE
:
787 coll_t cid
= i
.get_cid(op
->cid
);
788 ghobject_t oid
= i
.get_oid(op
->oid
);
789 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
790 r
= _clone(cid
, oid
, noid
);
794 case Transaction::OP_CLONERANGE
:
796 coll_t cid
= i
.get_cid(op
->cid
);
797 ghobject_t oid
= i
.get_oid(op
->oid
);
798 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
799 uint64_t off
= op
->off
;
800 uint64_t len
= op
->len
;
801 r
= _clone_range(cid
, oid
, noid
, off
, len
, off
);
805 case Transaction::OP_CLONERANGE2
:
807 coll_t cid
= i
.get_cid(op
->cid
);
808 ghobject_t oid
= i
.get_oid(op
->oid
);
809 ghobject_t noid
= i
.get_oid(op
->dest_oid
);
810 uint64_t srcoff
= op
->off
;
811 uint64_t len
= op
->len
;
812 uint64_t dstoff
= op
->dest_off
;
813 r
= _clone_range(cid
, oid
, noid
, srcoff
, len
, dstoff
);
817 case Transaction::OP_MKCOLL
:
819 coll_t cid
= i
.get_cid(op
->cid
);
820 r
= _create_collection(cid
, op
->split_bits
);
824 case Transaction::OP_COLL_HINT
:
826 coll_t cid
= i
.get_cid(op
->cid
);
827 uint32_t type
= op
->hint
;
828 ceph::buffer::list hint
;
830 auto hiter
= hint
.cbegin();
831 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
834 decode(pg_num
, hiter
);
835 decode(num_objs
, hiter
);
836 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
);
839 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
844 case Transaction::OP_RMCOLL
:
846 coll_t cid
= i
.get_cid(op
->cid
);
847 r
= _destroy_collection(cid
);
851 case Transaction::OP_COLL_ADD
:
853 coll_t ocid
= i
.get_cid(op
->cid
);
854 coll_t ncid
= i
.get_cid(op
->dest_cid
);
855 ghobject_t oid
= i
.get_oid(op
->oid
);
856 r
= _collection_add(ncid
, ocid
, oid
);
860 case Transaction::OP_COLL_REMOVE
:
862 coll_t cid
= i
.get_cid(op
->cid
);
863 ghobject_t oid
= i
.get_oid(op
->oid
);
864 r
= _remove(cid
, oid
);
868 case Transaction::OP_COLL_MOVE
:
869 ceph_abort_msg("deprecated");
872 case Transaction::OP_COLL_MOVE_RENAME
:
874 coll_t oldcid
= i
.get_cid(op
->cid
);
875 ghobject_t oldoid
= i
.get_oid(op
->oid
);
876 coll_t newcid
= i
.get_cid(op
->dest_cid
);
877 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
878 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
);
884 case Transaction::OP_TRY_RENAME
:
886 coll_t cid
= i
.get_cid(op
->cid
);
887 ghobject_t oldoid
= i
.get_oid(op
->oid
);
888 ghobject_t newoid
= i
.get_oid(op
->dest_oid
);
889 r
= _collection_move_rename(cid
, oldoid
, cid
, newoid
);
895 case Transaction::OP_COLL_SETATTR
:
897 ceph_abort_msg("not implemented");
901 case Transaction::OP_COLL_RMATTR
:
903 ceph_abort_msg("not implemented");
907 case Transaction::OP_COLL_RENAME
:
909 ceph_abort_msg("not implemented");
913 case Transaction::OP_OMAP_CLEAR
:
915 coll_t cid
= i
.get_cid(op
->cid
);
916 ghobject_t oid
= i
.get_oid(op
->oid
);
917 r
= _omap_clear(cid
, oid
);
920 case Transaction::OP_OMAP_SETKEYS
:
922 coll_t cid
= i
.get_cid(op
->cid
);
923 ghobject_t oid
= i
.get_oid(op
->oid
);
924 ceph::buffer::list aset_bl
;
925 i
.decode_attrset_bl(&aset_bl
);
926 r
= _omap_setkeys(cid
, oid
, aset_bl
);
929 case Transaction::OP_OMAP_RMKEYS
:
931 coll_t cid
= i
.get_cid(op
->cid
);
932 ghobject_t oid
= i
.get_oid(op
->oid
);
933 ceph::buffer::list keys_bl
;
934 i
.decode_keyset_bl(&keys_bl
);
935 r
= _omap_rmkeys(cid
, oid
, keys_bl
);
938 case Transaction::OP_OMAP_RMKEYRANGE
:
940 coll_t cid
= i
.get_cid(op
->cid
);
941 ghobject_t oid
= i
.get_oid(op
->oid
);
942 std::string first
, last
;
943 first
= i
.decode_string();
944 last
= i
.decode_string();
945 r
= _omap_rmkeyrange(cid
, oid
, first
, last
);
948 case Transaction::OP_OMAP_SETHEADER
:
950 coll_t cid
= i
.get_cid(op
->cid
);
951 ghobject_t oid
= i
.get_oid(op
->oid
);
952 ceph::buffer::list bl
;
954 r
= _omap_setheader(cid
, oid
, bl
);
957 case Transaction::OP_SPLIT_COLLECTION
:
958 ceph_abort_msg("deprecated");
960 case Transaction::OP_SPLIT_COLLECTION2
:
962 coll_t cid
= i
.get_cid(op
->cid
);
963 uint32_t bits
= op
->split_bits
;
964 uint32_t rem
= op
->split_rem
;
965 coll_t dest
= i
.get_cid(op
->dest_cid
);
966 r
= _split_collection(cid
, bits
, rem
, dest
);
969 case Transaction::OP_MERGE_COLLECTION
:
971 coll_t cid
= i
.get_cid(op
->cid
);
972 uint32_t bits
= op
->split_bits
;
973 coll_t dest
= i
.get_cid(op
->dest_cid
);
974 r
= _merge_collection(cid
, bits
, dest
);
978 case Transaction::OP_SETALLOCHINT
:
984 case Transaction::OP_COLL_SET_BITS
:
991 derr
<< "bad op " << op
->op
<< dendl
;
998 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
999 op
->op
== Transaction::OP_CLONE
||
1000 op
->op
== Transaction::OP_CLONERANGE2
||
1001 op
->op
== Transaction::OP_COLL_ADD
))
1002 // -ENOENT is usually okay
1008 const char *msg
= "unexpected error code";
1010 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
1011 op
->op
== Transaction::OP_CLONE
||
1012 op
->op
== Transaction::OP_CLONERANGE2
))
1013 msg
= "ENOENT on clone suggests osd bug";
1016 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
1017 // by partially applying transactions.
1018 msg
= "ENOSPC from MemStore, misconfigured cluster or insufficient memory";
1020 if (r
== -ENOTEMPTY
) {
1021 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
1025 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
1026 << " (op " << pos
<< ", counting from 0)" << dendl
;
1027 dout(0) << msg
<< dendl
;
1028 dout(0) << " transaction dump:\n";
1029 ceph::JSONFormatter
f(true);
1030 f
.open_object_section("transaction");
1035 ceph_abort_msg("unexpected error");
1043 int MemStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
1045 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1046 CollectionRef c
= get_collection(cid
);
1050 c
->get_or_create_object(oid
);
1054 int MemStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
1055 uint64_t offset
, size_t len
, const ceph::buffer::list
& bl
,
1056 uint32_t fadvise_flags
)
1058 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " "
1059 << offset
<< "~" << len
<< dendl
;
1060 ceph_assert(len
== bl
.length());
1062 CollectionRef c
= get_collection(cid
);
1066 ObjectRef o
= c
->get_or_create_object(oid
);
1067 if (len
> 0 && !cct
->_conf
->memstore_debug_omit_block_device_write
) {
1068 const ssize_t old_size
= o
->get_size();
1069 o
->write(offset
, bl
);
1070 used_bytes
+= (o
->get_size() - old_size
);
1076 int MemStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
,
1077 uint64_t offset
, size_t len
)
1079 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << offset
<< "~"
1081 ceph::buffer::list bl
;
1082 bl
.append_zero(len
);
1083 return _write(cid
, oid
, offset
, len
, bl
);
1086 int MemStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
1088 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << size
<< dendl
;
1089 CollectionRef c
= get_collection(cid
);
1093 ObjectRef o
= c
->get_object(oid
);
1096 if (cct
->_conf
->memstore_debug_omit_block_device_write
)
1098 const ssize_t old_size
= o
->get_size();
1099 int r
= o
->truncate(size
);
1100 used_bytes
+= (o
->get_size() - old_size
);
1104 int MemStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
)
1106 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1107 CollectionRef c
= get_collection(cid
);
1110 std::lock_guard l
{c
->lock
};
1112 auto i
= c
->object_hash
.find(oid
);
1113 if (i
== c
->object_hash
.end())
1115 used_bytes
-= i
->second
->get_size();
1116 c
->object_hash
.erase(i
);
1117 c
->object_map
.erase(oid
);
1122 int MemStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
,
1123 std::map
<std::string
,ceph::buffer::ptr
>& aset
)
1125 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1126 CollectionRef c
= get_collection(cid
);
1130 ObjectRef o
= c
->get_object(oid
);
1133 std::lock_guard lock
{o
->xattr_mutex
};
1134 for (auto p
= aset
.begin(); p
!= aset
.end(); ++p
)
1135 o
->xattr
[p
->first
] = p
->second
;
1139 int MemStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
)
1141 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1142 CollectionRef c
= get_collection(cid
);
1146 ObjectRef o
= c
->get_object(oid
);
1149 std::lock_guard lock
{o
->xattr_mutex
};
1150 auto i
= o
->xattr
.find(name
);
1151 if (i
== o
->xattr
.end())
1157 int MemStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
)
1159 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1160 CollectionRef c
= get_collection(cid
);
1164 ObjectRef o
= c
->get_object(oid
);
1167 std::lock_guard lock
{o
->xattr_mutex
};
1172 int MemStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
,
1173 const ghobject_t
& newoid
)
1175 dout(10) << __func__
<< " " << cid
<< " " << oldoid
1176 << " -> " << newoid
<< dendl
;
1177 CollectionRef c
= get_collection(cid
);
1181 ObjectRef oo
= c
->get_object(oldoid
);
1184 ObjectRef no
= c
->get_or_create_object(newoid
);
1185 used_bytes
+= oo
->get_size() - no
->get_size();
1186 no
->clone(oo
.get(), 0, oo
->get_size(), 0);
1188 // take xattr and omap locks with std::lock()
1189 std::scoped_lock l
{oo
->xattr_mutex
,
1194 no
->omap_header
= oo
->omap_header
;
1195 no
->omap
= oo
->omap
;
1196 no
->xattr
= oo
->xattr
;
1200 int MemStore::_clone_range(const coll_t
& cid
, const ghobject_t
& oldoid
,
1201 const ghobject_t
& newoid
,
1202 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
1204 dout(10) << __func__
<< " " << cid
<< " "
1205 << oldoid
<< " " << srcoff
<< "~" << len
<< " -> "
1206 << newoid
<< " " << dstoff
<< "~" << len
1208 CollectionRef c
= get_collection(cid
);
1212 ObjectRef oo
= c
->get_object(oldoid
);
1215 ObjectRef no
= c
->get_or_create_object(newoid
);
1216 if (srcoff
>= oo
->get_size())
1218 if (srcoff
+ len
>= oo
->get_size())
1219 len
= oo
->get_size() - srcoff
;
1221 const ssize_t old_size
= no
->get_size();
1222 no
->clone(oo
.get(), srcoff
, len
, dstoff
);
1223 used_bytes
+= (no
->get_size() - old_size
);
1228 int MemStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&oid
)
1230 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1231 CollectionRef c
= get_collection(cid
);
1235 ObjectRef o
= c
->get_object(oid
);
1238 std::lock_guard lock
{o
->omap_mutex
};
1240 o
->omap_header
.clear();
1244 int MemStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1245 ceph::buffer::list
& aset_bl
)
1247 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1248 CollectionRef c
= get_collection(cid
);
1252 ObjectRef o
= c
->get_object(oid
);
1255 std::lock_guard lock
{o
->omap_mutex
};
1256 auto p
= aset_bl
.cbegin();
1262 decode(o
->omap
[key
], p
);
1267 int MemStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&oid
,
1268 ceph::buffer::list
& keys_bl
)
1270 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1271 CollectionRef c
= get_collection(cid
);
1275 ObjectRef o
= c
->get_object(oid
);
1278 std::lock_guard lock
{o
->omap_mutex
};
1279 auto p
= keys_bl
.cbegin();
1290 int MemStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&oid
,
1291 const std::string
& first
, const std::string
& last
)
1293 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << first
1294 << " " << last
<< dendl
;
1295 CollectionRef c
= get_collection(cid
);
1299 ObjectRef o
= c
->get_object(oid
);
1302 std::lock_guard lock
{o
->omap_mutex
};
1303 auto p
= o
->omap
.lower_bound(first
);
1304 auto e
= o
->omap
.lower_bound(last
);
1305 o
->omap
.erase(p
, e
);
1309 int MemStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&oid
,
1310 const ceph::buffer::list
&bl
)
1312 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1313 CollectionRef c
= get_collection(cid
);
1317 ObjectRef o
= c
->get_object(oid
);
1320 std::lock_guard lock
{o
->omap_mutex
};
1321 o
->omap_header
= bl
;
1325 int MemStore::_create_collection(const coll_t
& cid
, int bits
)
1327 dout(10) << __func__
<< " " << cid
<< dendl
;
1328 std::lock_guard l
{coll_lock
};
1329 auto result
= coll_map
.insert(std::make_pair(cid
, CollectionRef()));
1332 auto p
= new_coll_map
.find(cid
);
1333 ceph_assert(p
!= new_coll_map
.end());
1334 result
.first
->second
= p
->second
;
1335 result
.first
->second
->bits
= bits
;
1336 new_coll_map
.erase(p
);
1340 int MemStore::_destroy_collection(const coll_t
& cid
)
1342 dout(10) << __func__
<< " " << cid
<< dendl
;
1343 std::lock_guard l
{coll_lock
};
1344 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1345 if (cp
== coll_map
.end())
1348 std::shared_lock l2
{cp
->second
->lock
};
1349 if (!cp
->second
->object_map
.empty())
1351 cp
->second
->exists
= false;
1353 used_bytes
-= cp
->second
->used_bytes();
1358 int MemStore::_collection_add(const coll_t
& cid
, const coll_t
& ocid
, const ghobject_t
& oid
)
1360 dout(10) << __func__
<< " " << cid
<< " " << ocid
<< " " << oid
<< dendl
;
1361 CollectionRef c
= get_collection(cid
);
1364 CollectionRef oc
= get_collection(ocid
);
1368 std::scoped_lock l
{std::min(&(*c
), &(*oc
))->lock
,
1369 std::max(&(*c
), &(*oc
))->lock
};
1371 if (c
->object_hash
.count(oid
))
1373 if (oc
->object_hash
.count(oid
) == 0)
1375 ObjectRef o
= oc
->object_hash
[oid
];
1376 c
->object_map
[oid
] = o
;
1377 c
->object_hash
[oid
] = o
;
1381 int MemStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
1382 coll_t cid
, const ghobject_t
& oid
)
1384 dout(10) << __func__
<< " " << oldcid
<< " " << oldoid
<< " -> "
1385 << cid
<< " " << oid
<< dendl
;
1386 CollectionRef c
= get_collection(cid
);
1389 CollectionRef oc
= get_collection(oldcid
);
1393 // note: c and oc may be the same
1394 ceph_assert(&(*c
) == &(*oc
));
1396 std::lock_guard l
{c
->lock
};
1397 if (c
->object_hash
.count(oid
))
1399 if (oc
->object_hash
.count(oldoid
) == 0)
1402 ObjectRef o
= oc
->object_hash
[oldoid
];
1403 c
->object_map
[oid
] = o
;
1404 c
->object_hash
[oid
] = o
;
1405 oc
->object_map
.erase(oldoid
);
1406 oc
->object_hash
.erase(oldoid
);
// Moves every object of collection cid whose key satisfies match(bits, match)
// into the collection looked up through dest. Objects that do not match are
// left in the source collection.
// NOTE(review): dest is used below but its declaration is not visible in
// this excerpt - presumably it is the last parameter; confirm.
1411 int MemStore::_split_collection(const coll_t
& cid
, uint32_t bits
, uint32_t match
,
1414 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " " << match
<< " "
// sc is the source collection being split.
1416 CollectionRef sc
= get_collection(cid
);
// dc is the destination collection receiving the matching objects.
1419 CollectionRef dc
= get_collection(dest
);
// Take both collection locks in one scoped_lock; min/max on the addresses
// passes them in the same order regardless of which one is source or
// destination.
1423 std::scoped_lock l
{std::min(&(*sc
), &(*dc
))->lock
,
1424 std::max(&(*sc
), &(*dc
))->lock
};
// Walk the source object_map and relocate matching entries.
1426 auto p
= sc
->object_map
.begin();
1427 while (p
!= sc
->object_map
.end()) {
1428 if (p
->first
.match(bits
, match
)) {
1429 dout(20) << " moving " << p
->first
<< dendl
;
// Register the object in both destination indexes.
1430 dc
->object_map
.insert(std::make_pair(p
->first
, p
->second
));
1431 dc
->object_hash
.insert(std::make_pair(p
->first
, p
->second
));
// Remove it from the source. The hash entry is erased by key while p is
// still valid; the map erase uses p++ so the iterator has already advanced
// when its old position is erased.
1432 sc
->object_hash
.erase(p
->first
);
1433 sc
->object_map
.erase(p
++);
// The destination collection is expected to carry the requested bit count.
1440 ceph_assert(dc
->bits
== (int)bits
);
// Moves every object of collection cid into collection dest, then looks up
// cid in coll_map under coll_lock and subtracts that collection's
// used_bytes() from the used_bytes counter.
// NOTE(review): bits is only seen in the log statement in this excerpt.
1445 int MemStore::_merge_collection(const coll_t
& cid
, uint32_t bits
, coll_t dest
)
1447 dout(10) << __func__
<< " " << cid
<< " " << bits
<< " "
// sc is the source collection being merged away.
1449 CollectionRef sc
= get_collection(cid
);
// dc is the destination collection.
1452 CollectionRef dc
= get_collection(dest
);
// Take both collection locks in one scoped_lock; min/max on the addresses
// passes them in the same order regardless of argument order.
1456 std::scoped_lock l
{std::min(&(*sc
), &(*dc
))->lock
,
1457 std::max(&(*sc
), &(*dc
))->lock
};
// Drain the source: every entry is inserted into the destination indexes
// and erased from the source indexes.
1459 auto p
= sc
->object_map
.begin();
1460 while (p
!= sc
->object_map
.end()) {
1461 dout(20) << " moving " << p
->first
<< dendl
;
1462 dc
->object_map
.insert(std::make_pair(p
->first
, p
->second
));
1463 dc
->object_hash
.insert(std::make_pair(p
->first
, p
->second
));
1464 sc
->object_hash
.erase(p
->first
);
// p++ advances the iterator before its old position is erased.
1465 sc
->object_map
.erase(p
++);
// Bookkeeping on the collection table is done under coll_lock.
1472 std::lock_guard l
{coll_lock
};
1473 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
// The source collection must still be registered.
1474 ceph_assert(cp
!= coll_map
.end());
// Deduct what the source collection reports as used.
1475 used_bytes
-= cp
->second
->used_bytes();
// MemStore::Object implementation that keeps the whole object payload in a
// single ceph::buffer::list.
1483 struct BufferlistObject
: public MemStore::Object
{
// Spinlock taken by the out-of-line read, write, clone and truncate
// definitions of this type.
1484 ceph::spinlock mutex
;
// The object payload.
1485 ceph::buffer::list data
;
// Object size is the current length of the payload.
1487 size_t get_size() const override
{ return data
.length(); }
// Data accessors; defined out of line.
1489 int read(uint64_t offset
, uint64_t len
, ceph::buffer::list
&bl
) override
;
1490 int write(uint64_t offset
, const ceph::buffer::list
&bl
) override
;
1491 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1492 uint64_t dstoff
) override
;
1493 int truncate(uint64_t offset
) override
;
// Serialization. Only the start of the encoder is visible in this excerpt.
// NOTE(review): presumably ENCODE_START(1, 1, bl) means struct version 1
// and compat version 1 - confirm against the encoding macros.
1495 void encode(ceph::buffer::list
& bl
) const override
{
1496 ENCODE_START(1, 1, bl
);
// Deserialization counterpart; its body is not visible in this excerpt.
1501 void decode(ceph::buffer::list::const_iterator
& p
) override
{
// Makes bl refer to the len bytes of the payload that start at offset.
// offset - start position inside data
// len    - number of bytes requested
// bl     - output bufferlist, set via substr_of
1510 int BufferlistObject::read(uint64_t offset
, uint64_t len
,
1511 ceph::buffer::list
&bl
)
// Hold the object spinlock while the payload is inspected.
1513 std::lock_guard
<decltype(mutex
)> lock(mutex
);
// NOTE(review): no range check against get_size() is visible here -
// presumably callers clamp offset and len; confirm.
1514 bl
.substr_of(data
, offset
, len
);
// Writes src into the payload at offset by building a replacement
// bufferlist: the existing bytes before offset (zero padded when the object
// is shorter than offset), then src, then whatever existing bytes lie past
// the written range.
1518 int BufferlistObject::write(uint64_t offset
, const ceph::buffer::list
&src
)
1520 unsigned len
= src
.length();
// Hold the object spinlock while data is rebuilt and swapped.
1522 std::lock_guard
<decltype(mutex
)> lock(mutex
);
// Head part: the object already reaches offset, so keep bytes [0, offset).
1525 ceph::buffer::list newdata
;
1526 if (get_size() >= offset
) {
1527 newdata
.substr_of(data
, 0, offset
);
// Otherwise (presumably the else branch) keep all existing bytes and pad
// with zeroes up to offset.
1530 newdata
.substr_of(data
, 0, get_size());
1532 newdata
.append_zero(offset
- get_size());
// Middle part: the new bytes.
1535 newdata
.append(src
);
// Tail part: existing bytes that lie beyond the end of the written range.
1538 if (get_size() > offset
+ len
) {
1539 ceph::buffer::list tail
;
1540 tail
.substr_of(data
, offset
+ len
, get_size() - (offset
+ len
));
1541 newdata
.append(tail
);
// Replace the payload with the rebuilt list without copying it.
1544 data
= std::move(newdata
);
// Copies len bytes starting at srcoff of another object into this object at
// dstoff. The bytes are extracted into a temporary bufferlist and then
// handed to write().
1548 int BufferlistObject::clone(Object
*src
, uint64_t srcoff
,
1549 uint64_t len
, uint64_t dstoff
)
// The source must also be a BufferlistObject; dynamic_cast yields nullptr
// for any other Object type.
1551 auto srcbl
= dynamic_cast<BufferlistObject
*>(src
);
1552 if (srcbl
== nullptr)
1555 ceph::buffer::list bl
;
// The source payload is read under the spinlock of the source object.
1557 std::lock_guard
<decltype(srcbl
->mutex
)> lock(srcbl
->mutex
);
// Special case: same offsets and the length equals the full source size.
// What is done in this case is not visible in this excerpt.
1558 if (srcoff
== dstoff
&& len
== src
->get_size()) {
// General case: take the requested slice of the source payload.
1562 bl
.substr_of(srcbl
->data
, srcoff
, len
);
// write() takes the spinlock of this object itself.
1564 return write(dstoff
, bl
);
// Resizes the payload to size bytes: a longer payload is cut down to its
// first size bytes, a shorter one is extended with zeroes.
// Note: the parameter is called offset in the in-class declaration and size
// in this definition.
1567 int BufferlistObject::truncate(uint64_t size
)
// Hold the object spinlock while data is replaced or extended.
1569 std::lock_guard
<decltype(mutex
)> lock(mutex
);
// Shrink: keep bytes [0, size) and move them back into data.
1570 if (get_size() > size
) {
1571 ceph::buffer::list bl
;
1572 bl
.substr_of(data
, 0, size
);
1573 data
= std::move(bl
);
// Same size: the body of this branch is not visible in this excerpt.
1574 } else if (get_size() == size
) {
// Grow: append the missing size - get_size() bytes as zeroes.
1577 data
.append_zero(size
- get_size());
// Object implementation whose payload lives in a page set (member data,
// constructed with a page size) and whose logical length is tracked
// separately in data_len.
// NOTE(review): the declarations of data and data_len are not visible in
// this excerpt; they are inferred from the constructor initializer list.
1584 struct MemStore::PageSetObject
: public Object
{
1587 #if defined(__GLIBCXX__)
1588 // use a thread-local vector for the pages returned by PageSet, so we
1589 // can avoid allocations in read/write()
1590 static thread_local
PageSet::page_vector tls_pages
;
// Object size is the tracked logical length.
1593 size_t get_size() const override
{ return data_len
; }
// Data accessors; defined out of line.
1595 int read(uint64_t offset
, uint64_t len
, ceph::buffer::list
&bl
) override
;
1596 int write(uint64_t offset
, const ceph::buffer::list
&bl
) override
;
1597 int clone(Object
*src
, uint64_t srcoff
, uint64_t len
,
1598 uint64_t dstoff
) override
;
1599 int truncate(uint64_t offset
) override
;
// Serialization: data_len is encoded right after the header. The rest of
// the encoder is not visible in this excerpt.
1601 void encode(ceph::buffer::list
& bl
) const override
{
1602 ENCODE_START(1, 1, bl
);
1603 encode(data_len
, bl
);
// Deserialization: data_len is decoded from p.
1608 void decode(ceph::buffer::list::const_iterator
& p
) override
{
1610 decode(data_len
, p
);
// NOTE(review): presumably FRIEND_MAKE_REF lets ceph::make_ref reach the
// constructor below - confirm against the macro definition.
1617 FRIEND_MAKE_REF(PageSetObject
);
// Builds an empty object: data is constructed from page_size and the
// logical length starts at zero.
1618 explicit PageSetObject(size_t page_size
) : data(page_size
), data_len(0) {}
// Out-of-class definition of the thread-local page vector plus the
// DEFINE_PAGE_VECTOR helper macro. Two alternative macro definitions are
// visible: an empty one, so that uses of the given name refer to the static
// thread_local member, and one that declares a local PageSet::page_vector
// with the given name.
// NOTE(review): the preprocessor lines separating the two alternatives are
// not visible in this excerpt - presumably the second is the fallback for
// builds without __GLIBCXX__; confirm.
1621 #if defined(__GLIBCXX__)
1622 // use a thread-local vector for the pages returned by PageSet, so we
1623 // can avoid allocations in read/write()
1624 thread_local
PageSet::page_vector
MemStore::PageSetObject::tls_pages
;
1625 #define DEFINE_PAGE_VECTOR(name)
1627 #define DEFINE_PAGE_VECTOR(name) PageSet::page_vector name;
// Reads len bytes starting at offset into a newly allocated buffer of len
// bytes and appends that buffer to bl. Parts of the range that have no page
// are zero filled; parts backed by a page are copied from the page.
1630 int MemStore::PageSetObject::read(uint64_t offset
, uint64_t len
, ceph::buffer::list
& bl
)
// start and end capture the requested range. offset is modified further
// down, so positions inside buf are computed as offset - start.
1632 const auto start
= offset
;
1633 const auto end
= offset
+ len
;
1634 auto remaining
= len
;
// Ask the page set for the pages in the requested range.
1636 DEFINE_PAGE_VECTOR(tls_pages
);
1637 data
.get_range(offset
, len
, tls_pages
);
1639 // allocate a buffer for the data
1640 ceph::buffer::ptr
buf(len
);
1642 auto p
= tls_pages
.begin();
1644 // no more pages in range
// Zero everything that is still outstanding in the buffer.
1645 if (p
== tls_pages
.end() || (*p
)->offset
>= end
) {
1646 buf
.zero(offset
- start
, remaining
);
1651 // fill any holes between pages with zeroes
// The hole spans from the current offset to the start of the next page,
// capped by the number of bytes still wanted.
1652 if (page
->offset
> offset
) {
1653 const auto count
= std::min(remaining
, page
->offset
- offset
);
1654 buf
.zero(offset
- start
, count
);
// Continue at the start of the page.
1656 offset
= page
->offset
;
// page_offset is the position inside the current page; count is limited by
// the bytes still wanted and by the bytes left in this page.
1662 const auto page_offset
= offset
- page
->offset
;
1663 const auto count
= std::min(remaining
, data
.get_page_size() - page_offset
);
// Copy the page bytes to the matching position of the output buffer.
1665 buf
.copy_in(offset
- start
, count
, page
->data
+ page_offset
);
1673 tls_pages
.clear(); // drop page refs
// Hand the filled buffer to the caller without copying it.
1675 bl
.append(std::move(buf
));
// Copies the contents of src into the pages of this object starting at
// offset. The page range is allocated first, then the source is copied
// through a bufferlist iterator into the page data.
1679 int MemStore::PageSetObject::write(uint64_t offset
, const ceph::buffer::list
&src
)
1681 unsigned len
= src
.length();
1683 DEFINE_PAGE_VECTOR(tls_pages
);
1684 // make sure the page range is allocated
1685 data
.alloc_range(offset
, src
.length(), tls_pages
);
// page walks the allocated pages, p walks the source bytes.
1687 auto page
= tls_pages
.begin();
1689 auto p
= src
.begin();
// page_offset: position inside the current page.
// pageoff: bytes between page_offset and the end of the page.
// count: bytes copied in this step, limited by len and by pageoff.
1691 unsigned page_offset
= offset
- (*page
)->offset
;
1692 unsigned pageoff
= data
.get_page_size() - page_offset
;
1693 unsigned count
= std::min(len
, pageoff
);
// Copy count bytes from the source iterator into the page.
1694 p
.copy(count
, (*page
)->data
+ page_offset
);
// The copy filled the page up to its end; the follow-up statement is not
// visible in this excerpt.
1697 if (count
== pageoff
)
// Compare the logical length with offset; the statement guarded by this
// test is not visible in this excerpt.
1700 if (data_len
< offset
)
1702 tls_pages
.clear(); // drop page refs
// Copies len bytes starting at srcoff of another PageSetObject into this
// object at dstoff. The range is processed in chunks; for each chunk the
// destination pages are allocated, source holes are zero filled in the
// destination and source page contents are copied across.
1706 int MemStore::PageSetObject::clone(Object
*src
, uint64_t srcoff
,
1707 uint64_t len
, uint64_t dstoff
)
// Signed distance between the two ranges: adding delta to a source offset
// yields the matching destination offset.
1709 const int64_t delta
= dstoff
- srcoff
;
// NOTE(review): this is an unchecked static_cast, whereas
// BufferlistObject::clone uses dynamic_cast plus a nullptr test - assumes
// src is always a PageSetObject; confirm.
1711 auto &src_data
= static_cast<PageSetObject
*>(src
)->data
;
1712 const uint64_t src_page_size
= src_data
.get_page_size();
1714 auto &dst_data
= data
;
1715 const auto dst_page_size
= dst_data
.get_page_size();
// tls_pages receives the source pages, dst_pages the destination pages.
1717 DEFINE_PAGE_VECTOR(tls_pages
);
1718 PageSet::page_vector dst_pages
;
1721 // limit to 16 pages at a time so tls_pages doesn't balloon in size
1722 auto count
= std::min(len
, (uint64_t)src_page_size
* 16);
1723 src_data
.get_range(srcoff
, count
, tls_pages
);
1725 // allocate the destination range
1726 // TODO: avoid allocating pages for holes in the source range
1727 dst_data
.alloc_range(srcoff
+ delta
, count
, dst_pages
);
1728 auto dst_iter
= dst_pages
.begin();
// For each source page, sbegin and send bound the part of that page that
// lies inside the current chunk, in source offsets.
1730 for (auto &src_page
: tls_pages
) {
1731 auto sbegin
= std::max(srcoff
, src_page
->offset
);
1732 auto send
= std::min(srcoff
+ count
, src_page
->offset
+ src_page_size
);
1734 // zero-fill holes before src_page
1735 if (srcoff
< sbegin
) {
1736 while (dst_iter
!= dst_pages
.end()) {
1737 auto &dst_page
= *dst_iter
;
// dbegin and dend bound the hole in destination offsets, clipped to the
// current destination page.
1738 auto dbegin
= std::max(srcoff
+ delta
, dst_page
->offset
);
1739 auto dend
= std::min(sbegin
+ delta
, dst_page
->offset
+ dst_page_size
);
1740 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1741 dst_page
->data
+ dend
- dst_page
->offset
, 0);
// The hole ended inside this destination page; the statement guarded by
// this test is not visible in this excerpt.
1742 if (dend
< dst_page
->offset
+ dst_page_size
)
// Size of the hole just handled.
1746 const auto c
= sbegin
- srcoff
;
1751 // copy data from src page to dst pages
1752 while (dst_iter
!= dst_pages
.end()) {
1753 auto &dst_page
= *dst_iter
;
// dbegin and dend bound the copied bytes in destination offsets, clipped to
// the current destination page; subtracting delta maps them back to source
// offsets.
1754 auto dbegin
= std::max(sbegin
+ delta
, dst_page
->offset
);
1755 auto dend
= std::min(send
+ delta
, dst_page
->offset
+ dst_page_size
);
1757 std::copy(src_page
->data
+ (dbegin
- delta
) - src_page
->offset
,
1758 src_page
->data
+ (dend
- delta
) - src_page
->offset
,
1759 dst_page
->data
+ dbegin
- dst_page
->offset
);
// The copy ended inside this destination page; the statement guarded by
// this test is not visible in this excerpt.
1760 if (dend
< dst_page
->offset
+ dst_page_size
)
// Number of bytes covered by this source page.
1765 const auto c
= send
- sbegin
;
// dstoff now points just past the data copied from this source page.
1769 dstoff
= send
+ delta
;
1771 tls_pages
.clear(); // drop page refs
1773 // zero-fill holes after the last src_page
1775 while (dst_iter
!= dst_pages
.end()) {
1776 auto &dst_page
= *dst_iter
;
1777 auto dbegin
= std::max(dstoff
, dst_page
->offset
);
1778 auto dend
= std::min(dstoff
+ count
, dst_page
->offset
+ dst_page_size
);
1779 std::fill(dst_page
->data
+ dbegin
- dst_page
->offset
,
1780 dst_page
->data
+ dend
- dst_page
->offset
, 0);
1787 dst_pages
.clear(); // drop page refs
1790 // update object size
// The statement guarded by this test is not visible in this excerpt.
1791 if (data_len
< dstoff
)
// Truncates the page data at size: pages after size are released and, when
// size is not page aligned, the part of the last page beyond size is
// overwritten with zeroes.
1796 int MemStore::PageSetObject::truncate(uint64_t size
)
1798 data
.free_pages_after(size
);
// Round size down to the start of its page.
// NOTE(review): the mask form assumes page_size is a power of two - confirm.
1801 const auto page_size
= data
.get_page_size();
1802 const auto page_offset
= size
& ~(page_size
-1);
// size sits exactly on a page boundary; the statement guarded by this test
// is not visible in this excerpt.
1803 if (page_offset
== size
)
1806 DEFINE_PAGE_VECTOR(tls_pages
);
1807 // write zeroes to the rest of the last page
1808 data
.get_range(page_offset
, page_size
, tls_pages
);
// No page at that position; the statement guarded by this test is not
// visible in this excerpt.
1809 if (tls_pages
.empty())
1812 auto page
= tls_pages
.begin();
// Note: this local named data shadows the member data from here on; it
// points at the bytes of the page.
1813 auto data
= (*page
)->data
;
// Zero the bytes from the in-page position of size to the end of the page.
1814 std::fill(data
+ (size
- page_offset
), data
+ page_size
, 0);
1815 tls_pages
.clear(); // drop page ref
// Factory for the objects of a collection. Two results are visible: a
// PageSetObject built with the configured memstore_page_size, or a
// BufferlistObject.
// NOTE(review): the condition selecting between the two is not visible in
// this excerpt.
1820 MemStore::ObjectRef
MemStore::Collection::create_object() const {
// Page-backed variant; the page size comes from the configuration.
1822 return ceph::make_ref
<PageSetObject
>(cct
->_conf
->memstore_page_size
);
// Bufferlist-backed variant.
1823 return new BufferlistObject();