1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
24 #include <condition_variable>
26 #include "include/ceph_assert.h"
27 #include "include/unordered_map.h"
28 #include "common/Finisher.h"
29 #include "common/RWLock.h"
30 #include "common/Throttle.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
35 #include "kv/KeyValueDB.h"
37 #include "kstore_types.h"
39 #include "boost/intrusive/list.hpp"
// KStore-specific counter identifiers. The first one is pinned to 832430;
// the rest carry no explicit value, so -- assuming these lines sit inside an
// enum whose opening line is not visible in this excerpt -- they take
// consecutive values after it. The *_lat names are presumably per-state
// latency counters (TODO confirm against the logger setup code).
42 l_kstore_first
= 832430,
43 l_kstore_state_prepare_lat
,
44 l_kstore_state_kv_queued_lat
,
45 l_kstore_state_kv_done_lat
,
46 l_kstore_state_finishing_lat
,
47 l_kstore_state_done_lat
,
51 class KStore
: public ObjectStore
{
52 // -----------------------------------------------------
58 /// an in-memory object
61 std::atomic_int nref
; ///< reference count
64 std::string key
; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook
<> lru_item
;
67 kstore_onode_t onode
; ///< metadata stored as value in kv store
71 std::mutex flush_lock
; ///< protect flush_txns
72 std::condition_variable flush_cond
; ///< wait here for unapplied txns
73 std::set
<TransContext
*> flush_txns
; ///< committing txns
76 ceph::buffer::list tail_bl
;
78 std::map
<uint64_t,ceph::buffer::list
> pending_stripes
; ///< unwritten stripes
80 Onode(CephContext
* cct
, const ghobject_t
& o
, const std::string
& k
)
103 void clear_pending_stripes() {
104 pending_stripes
.clear();
107 typedef boost::intrusive_ptr
<Onode
> OnodeRef
;
/// Onode cache that pairs a hash map (ghobject_t -> OnodeRef) with an
/// intrusive list threaded through Onode::lru_item.
/// Only declarations are visible here; the member functions are defined
/// elsewhere, so the per-method notes below are hedged.
109 struct OnodeHashLRU
{
// Intrusive list type whose hook is the Onode::lru_item member.
111 typedef boost::intrusive::list
<
113 boost::intrusive::member_hook
<
115 boost::intrusive::list_member_hook
<>,
116 &Onode::lru_item
> > lru_list_t
;
119 ceph::unordered_map
<ghobject_t
,OnodeRef
> onode_map
; ///< forward lookups: object id -> cached onode
120 lru_list_t lru
; ///< intrusive list of onodes, linked via Onode::lru_item
// Stores the CephContext pointer in the `cct` member.
122 OnodeHashLRU(CephContext
* cct
) : cct(cct
) {}
// Presumably inserts `o` under `oid` -- TODO confirm in the .cc file.
124 void add(const ghobject_t
& oid
, OnodeRef o
);
// Presumably refreshes `o`'s position in `lru` -- TODO confirm.
125 void _touch(OnodeRef o
);
// Returns the OnodeRef associated with `o`; the result for a miss is
// not visible from this header.
126 OnodeRef
lookup(const ghobject_t
& o
);
// Presumably re-keys the cached entry from old_oid to new_oid -- TODO confirm.
127 void rename(const ghobject_t
& old_oid
, const ghobject_t
& new_oid
);
// Writes the following entry into *next; iteration order and the meaning
// of the bool result are not visible here.
129 bool get_next(const ghobject_t
& after
, std::pair
<ghobject_t
,OnodeRef
> *next
);
// `max` defaults to -1; what -1 means and what the int result counts are
// defined in the implementation (NOTE(review): verify before relying on it).
130 int trim(int max
=-1);
134 typedef boost::intrusive_ptr
<OpSequencer
> OpSequencerRef
;
136 struct Collection
: public CollectionImpl
{
138 kstore_cnode_t cnode
;
139 ceph::shared_mutex lock
=
140 ceph::make_shared_mutex("KStore::Collection::lock", true, false);
144 // cache onodes on a per-collection basis to avoid lock
146 OnodeHashLRU onode_map
;
148 OnodeRef
get_onode(const ghobject_t
& oid
, bool create
);
/// Reports whether `oid` belongs to this collection.
/// Two checks are visible in this excerpt:
///  - a comparison of oid.hobj.pool against -1 (its guarding condition is
///    on a line not visible here -- TODO confirm which collections use it);
///  - for PG collections, membership in the PG plus a matching shard.
150 bool contains(const ghobject_t
& oid
) {
152 return oid
.hobj
.pool
== -1;
// is_pg() fills in spgid when this collection id names a PG.
154 if (cid
.is_pg(&spgid
))
// The object must fall inside the PG given cnode.bits ...
156 spgid
.pgid
.contains(cnode
.bits
, oid
) &&
// ... and must live on the same shard as the PG.
157 oid
.shard_id
== spgid
.shard
;
161 void flush() override
;
162 bool flush_commit(Context
*c
) override
;
165 FRIEND_MAKE_REF(Collection
);
166 Collection(KStore
*ns
, coll_t c
);
168 using CollectionRef
= ceph::ref_t
<Collection
>;
170 class OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
173 KeyValueDB::Iterator it
;
174 std::string head
, tail
;
176 OmapIteratorImpl(CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
);
177 int seek_to_first() override
;
178 int upper_bound(const std::string
&after
) override
;
179 int lower_bound(const std::string
&to
) override
;
180 bool valid() override
;
182 std::string
key() override
;
183 ceph::buffer::list
value() override
;
184 int status() override
{
189 struct TransContext
{
/// Maps a STATE_* enumerator to a lowercase string literal
/// (e.g. STATE_KV_QUEUED -> "kv_queued").
/// The switch expression and any fallback return are on lines not visible
/// in this excerpt; presumably it switches on this transaction's `state`.
203 const char *get_state_name() {
205 case STATE_PREPARE
: return "prepare";
206 case STATE_AIO_WAIT
: return "aio_wait";
207 case STATE_IO_DONE
: return "io_done";
208 case STATE_KV_QUEUED
: return "kv_queued";
209 case STATE_KV_COMMITTING
: return "kv_committing";
210 case STATE_KV_DONE
: return "kv_done";
211 case STATE_FINISHING
: return "finishing";
212 case STATE_DONE
: return "done";
/// Reports a latency sample to `logger` under counter index `state`.
/// @param logger  perf counter sink; tinc() is called on it
/// @param state   counter index passed straight through to tinc()
/// The line that computes `lat` is not visible in this excerpt; presumably
/// it is the time elapsed since the previous state change -- TODO confirm.
217 void log_state_latency(PerfCounters
*logger
, int state
) {
// `now` is sampled once from ceph_clock_now(); `lat` starts default-constructed.
218 utime_t lat
, now
= ceph_clock_now();
220 logger
->tinc(state
, lat
);
226 boost::intrusive::list_member_hook
<> sequencer_item
;
230 std::set
<OnodeRef
> onodes
; ///< these onodes need to be updated/written
231 KeyValueDB::Transaction t
; ///< then we will commit this
232 Context
*oncommit
; ///< signal on commit
233 Context
*onreadable
; ///< signal on readable
234 Context
*onreadable_sync
; ///< signal on readable
235 std::list
<Context
*> oncommits
; ///< more commit completions
236 std::list
<CollectionRef
> removed_collections
; ///< colls we removed
238 CollectionRef first_collection
; ///< first referenced collection
240 explicit TransContext(OpSequencer
*o
)
241 : state(STATE_PREPARE
),
247 onreadable_sync(NULL
),
248 start(ceph_clock_now()){
249 //cout << "txc new " << this << std::endl;
252 //cout << "txc del " << this << std::endl;
255 void write_onode(OnodeRef
&o
) {
260 class OpSequencer
: public RefCountedObject
{
263 std::condition_variable qcond
;
264 typedef boost::intrusive::list
<
266 boost::intrusive::member_hook
<
268 boost::intrusive::list_member_hook
<>,
269 &TransContext::sequencer_item
> > q_list_t
;
270 q_list_t q
; ///< transactions
273 ceph_assert(q
.empty());
276 void queue_new(TransContext
*txc
) {
277 std::lock_guard
<std::mutex
> l(qlock
);
282 std::unique_lock
<std::mutex
> l(qlock
);
/// Arranges for `c` to be attached to the newest queued transaction when
/// that transaction has not yet reached STATE_KV_DONE.
/// The return statements are on lines not visible in this excerpt, so the
/// exact meaning of the bool result must be confirmed there.
287 bool flush_commit(Context
*c
) {
// Hold qlock for the whole inspection of the queue.
288 std::lock_guard
<std::mutex
> l(qlock
);
// Look at the most recently queued transaction.
292 TransContext
*txc
= &q
.back();
// Already at or past kv_done: handled by the branch body (not visible here).
293 if (txc
->state
>= TransContext::STATE_KV_DONE
) {
// From here on the transaction is known to be before kv_done.
296 ceph_assert(txc
->state
< TransContext::STATE_KV_DONE
);
// Queue the context on the transaction's extra commit-completion list.
297 txc
->oncommits
.push_back(c
);
/// Thread subclass whose entry point runs the owning store's
/// _kv_sync_thread() routine.
302 struct KVSyncThread
: public Thread
{
// Remembers the store whose _kv_sync_thread() this thread will run.
304 explicit KVSyncThread(KStore
*s
) : store(s
) {}
// Thread body: delegates to the store. The return value is on a line not
// visible in this excerpt.
305 void *entry() override
{
306 store
->_kv_sync_thread();
311 // --------------------------------------------------------
317 int path_fd
; ///< open handle to $path
318 int fsid_fd
; ///< open handle (locked) to $path/fsid
321 /// rwlock to protect coll_map
322 ceph::shared_mutex coll_lock
= ceph::make_shared_mutex("KStore::coll_lock");
323 ceph::unordered_map
<coll_t
, CollectionRef
> coll_map
;
324 std::map
<coll_t
,CollectionRef
> new_coll_map
;
330 Throttle throttle_ops
, throttle_bytes
; ///< submit to commit
334 KVSyncThread kv_sync_thread
;
336 std::condition_variable kv_cond
, kv_sync_cond
;
338 std::deque
<TransContext
*> kv_queue
, kv_committing
;
341 PerfCounters
*logger
;
342 std::mutex reap_lock
;
343 std::list
<CollectionRef
> removed_collections
;
346 // --------------------------------------------------------
350 void _shutdown_logger();
354 int _open_fsid(bool create
);
356 int _read_fsid(uuid_d
*f
);
359 int _open_db(bool create
);
361 int _open_collections(int *errors
=0);
362 void _close_collections();
364 int _open_super_meta();
366 CollectionRef
_get_collection(coll_t cid
);
367 void _queue_reap_collection(CollectionRef
& c
);
368 void _reap_collections();
370 void _assign_nid(TransContext
*txc
, OnodeRef o
);
372 void _dump_onode(OnodeRef o
);
374 TransContext
*_txc_create(OpSequencer
*osr
);
375 void _txc_release(TransContext
*txc
, uint64_t offset
, uint64_t length
);
376 void _txc_add_transaction(TransContext
*txc
, Transaction
*t
);
377 void _txc_finalize(OpSequencer
*osr
, TransContext
*txc
);
378 void _txc_state_proc(TransContext
*txc
);
379 void _txc_finish_kv(TransContext
*txc
);
380 void _txc_finish(TransContext
*txc
);
382 void _osr_reap_done(OpSequencer
*osr
);
384 void _kv_sync_thread();
387 std::lock_guard
<std::mutex
> l(kv_lock
);
389 kv_cond
.notify_all();
391 kv_sync_thread
.join();
395 void _do_read_stripe(OnodeRef o
, uint64_t offset
, ceph::buffer::list
*pbl
, bool do_cache
);
396 void _do_write_stripe(TransContext
*txc
, OnodeRef o
,
397 uint64_t offset
, ceph::buffer::list
& bl
);
398 void _do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
);
400 int _collection_list(
401 Collection
*c
, const ghobject_t
& start
, const ghobject_t
& end
,
402 int max
, std::vector
<ghobject_t
> *ls
, ghobject_t
*next
);
405 KStore(CephContext
*cct
, const std::string
& path
);
408 std::string
get_type() override
{
412 bool needs_journal() override
{ return false; };
413 bool wants_journal() override
{ return false; };
414 bool allows_journal() override
{ return false; };
416 static int get_block_device_fsid(const std::string
& path
, uuid_d
*fsid
);
418 bool test_mount_in_use() override
;
420 int mount() override
;
421 int umount() override
;
424 int fsck(bool deep
) override
;
427 int validate_hobject_key(const hobject_t
&obj
) const override
{
/// Returns the maximum attribute-name length this store advertises.
/// The value is a fixed 256.
430 unsigned get_max_attr_name_length() override
{
431 return 256; // arbitrary; there is no real limit internally
435 int mkjournal() override
{
/// Writes this store's perf counters to `f` inside an object section named
/// "perf_counters".
/// @param f  formatter that receives the output
/// The matching close of the section is on a line not visible in this
/// excerpt.
438 void dump_perf_counters(ceph::Formatter
*f
) override
{
439 f
->open_object_section("perf_counters");
// Second argument is passed as false; see PerfCounters::dump_formatted for
// what that flag selects (NOTE(review): not derivable from this header).
440 logger
->dump_formatted(f
, false);
/// Forwards to the key/value backend: calls db->get_statistics(f).
/// @param f  formatter handed unchanged to the database
443 void get_db_statistics(ceph::Formatter
*f
) override
{
444 db
->get_statistics(f
);
446 int statfs(struct store_statfs_t
*buf
,
447 osd_alert_list_t
* alerts
= nullptr) override
;
448 int pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
,
449 bool *per_pool_omap
) override
;
451 CollectionHandle
open_collection(const coll_t
& c
) override
;
452 CollectionHandle
create_new_collection(const coll_t
& c
) override
;
453 void set_collection_commit_queue(const coll_t
& cid
,
454 ContextQueue
*commit_queue
) override
{
457 using ObjectStore::exists
;
458 bool exists(CollectionHandle
& c
, const ghobject_t
& oid
) override
;
459 using ObjectStore::stat
;
462 const ghobject_t
& oid
,
464 bool allow_eio
= false) override
; // struct stat?
465 int set_collection_opts(
467 const pool_opts_t
& opts
) override
;
468 using ObjectStore::read
;
471 const ghobject_t
& oid
,
474 ceph::buffer::list
& bl
,
475 uint32_t op_flags
= 0) override
;
480 ceph::buffer::list
& bl
,
482 uint32_t op_flags
= 0);
484 using ObjectStore::fiemap
;
485 int fiemap(CollectionHandle
& c
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, std::map
<uint64_t, uint64_t>& destmap
) override
;
486 int fiemap(CollectionHandle
& c
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, ceph::buffer::list
& outbl
) override
;
487 using ObjectStore::getattr
;
488 int getattr(CollectionHandle
& c
, const ghobject_t
& oid
, const char *name
, ceph::buffer::ptr
& value
) override
;
489 using ObjectStore::getattrs
;
490 int getattrs(CollectionHandle
& c
, const ghobject_t
& oid
, std::map
<std::string
,ceph::buffer::ptr
>& aset
) override
;
492 int list_collections(std::vector
<coll_t
>& ls
) override
;
493 bool collection_exists(const coll_t
& c
) override
;
494 int collection_empty(CollectionHandle
& c
, bool *empty
) override
;
495 int collection_bits(CollectionHandle
& c
) override
;
497 CollectionHandle
&c
, const ghobject_t
& start
, const ghobject_t
& end
,
499 std::vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
501 using ObjectStore::omap_get
;
503 CollectionHandle
& c
, ///< [in] Collection containing oid
504 const ghobject_t
&oid
, ///< [in] Object containing omap
505 ceph::buffer::list
*header
, ///< [out] omap header
506 std::map
<std::string
, ceph::buffer::list
> *out
///< [out] Key to value map
509 using ObjectStore::omap_get_header
;
512 CollectionHandle
& c
, ///< [in] Collection containing oid
513 const ghobject_t
&oid
, ///< [in] Object containing omap
514 ceph::buffer::list
*header
, ///< [out] omap header
515 bool allow_eio
= false ///< [in] don't assert on eio
518 using ObjectStore::omap_get_keys
;
519 /// Get keys defined on oid
521 CollectionHandle
& c
, ///< [in] Collection containing oid
522 const ghobject_t
&oid
, ///< [in] Object containing omap
523 std::set
<std::string
> *keys
///< [out] Keys defined on oid
526 using ObjectStore::omap_get_values
;
529 CollectionHandle
& c
, ///< [in] Collection containing oid
530 const ghobject_t
&oid
, ///< [in] Object containing omap
531 const std::set
<std::string
> &keys
, ///< [in] Keys to get
532 std::map
<std::string
, ceph::buffer::list
> *out
///< [out] Returned keys and values
535 using ObjectStore::omap_check_keys
;
536 /// Filters keys into out which are defined on oid
538 CollectionHandle
& c
, ///< [in] Collection containing oid
539 const ghobject_t
&oid
, ///< [in] Object containing omap
540 const std::set
<std::string
> &keys
, ///< [in] Keys to check
541 std::set
<std::string
> *out
///< [out] Subset of keys defined on oid
544 using ObjectStore::get_omap_iterator
;
545 ObjectMap::ObjectMapIterator
get_omap_iterator(
546 CollectionHandle
& c
, ///< [in] collection
547 const ghobject_t
&oid
///< [in] object
550 void set_fsid(uuid_d u
) override
{
553 uuid_d
get_fsid() override
{
/// Estimates total per-object overhead as a flat 300 bytes per object.
/// @param num_objects  number of objects to estimate for
/// @return num_objects * 300 (plain uint64_t multiplication, no overflow check)
557 uint64_t estimate_objects_overhead(uint64_t num_objects
) override
{
558 return num_objects
* 300; // assuming per-object overhead is 300 bytes
561 objectstore_perf_stat_t
get_cur_stats() override
{
562 return objectstore_perf_stat_t();
564 const PerfCounters
* get_perf_counters() const override
{
569 int queue_transactions(
570 CollectionHandle
& ch
,
571 std::vector
<Transaction
>& tls
,
572 TrackedOpRef op
= TrackedOpRef(),
573 ThreadPool::TPHandle
*handle
= NULL
) override
;
575 void compact () override
{
581 // --------------------------------------------------------
584 int _write(TransContext
*txc
,
587 uint64_t offset
, size_t len
,
588 ceph::buffer::list
& bl
,
589 uint32_t fadvise_flags
);
590 int _do_write(TransContext
*txc
,
592 uint64_t offset
, uint64_t length
,
593 ceph::buffer::list
& bl
,
594 uint32_t fadvise_flags
);
595 int _touch(TransContext
*txc
,
598 int _zero(TransContext
*txc
,
601 uint64_t offset
, size_t len
);
602 int _do_truncate(TransContext
*txc
,
605 int _truncate(TransContext
*txc
,
609 int _remove(TransContext
*txc
,
612 int _do_remove(TransContext
*txc
,
614 int _setattr(TransContext
*txc
,
617 const std::string
& name
,
618 ceph::buffer::ptr
& val
);
619 int _setattrs(TransContext
*txc
,
622 const std::map
<std::string
,ceph::buffer::ptr
>& aset
);
623 int _rmattr(TransContext
*txc
,
626 const std::string
& name
);
627 int _rmattrs(TransContext
*txc
,
630 void _do_omap_clear(TransContext
*txc
, uint64_t id
);
631 int _omap_clear(TransContext
*txc
,
634 int _omap_setkeys(TransContext
*txc
,
637 ceph::buffer::list
& bl
);
638 int _omap_setheader(TransContext
*txc
,
641 ceph::buffer::list
& header
);
642 int _omap_rmkeys(TransContext
*txc
,
645 const ceph::buffer::list
& bl
);
646 int _omap_rmkey_range(TransContext
*txc
,
649 const std::string
& first
, const std::string
& last
);
650 int _setallochint(TransContext
*txc
,
653 uint64_t expected_object_size
,
654 uint64_t expected_write_size
,
656 int _clone(TransContext
*txc
,
660 int _clone_range(TransContext
*txc
,
664 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
);
665 int _rename(TransContext
*txc
,
669 const ghobject_t
& new_oid
);
670 int _create_collection(TransContext
*txc
, coll_t cid
, unsigned bits
,
672 int _remove_collection(TransContext
*txc
, coll_t cid
, CollectionRef
*c
);
673 int _split_collection(TransContext
*txc
,
676 unsigned bits
, int rem
);
677 int _merge_collection(TransContext
*txc
,
684 static inline void intrusive_ptr_add_ref(KStore::Onode
*o
) {
687 static inline void intrusive_ptr_release(KStore::Onode
*o
) {
691 static inline void intrusive_ptr_add_ref(KStore::OpSequencer
*o
) {
694 static inline void intrusive_ptr_release(KStore::OpSequencer
*o
) {