1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#ifndef CEPH_OSD_KSTORE_H
#define CEPH_OSD_KSTORE_H

#include <atomic>
#include <condition_variable>
#include <mutex>

#include "include/assert.h"
#include "include/unordered_map.h"
#include "include/memory.h"

#include "common/Finisher.h"
#include "common/RWLock.h"
#include "common/WorkQueue.h"
#include "common/perf_counters.h"

#include "os/ObjectStore.h"
#include "kv/KeyValueDB.h"

#include "kstore_types.h"

#include "boost/intrusive/list.hpp"
42 l_kstore_first
= 832430,
43 l_kstore_state_prepare_lat
,
44 l_kstore_state_kv_queued_lat
,
45 l_kstore_state_kv_done_lat
,
46 l_kstore_state_finishing_lat
,
47 l_kstore_state_done_lat
,
51 class KStore
: public ObjectStore
{
52 // -----------------------------------------------------
58 /// an in-memory object
61 std::atomic_int nref
; ///< reference count
64 string key
; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook
<> lru_item
;
67 kstore_onode_t onode
; ///< metadata stored as value in kv store
71 std::mutex flush_lock
; ///< protect flush_txns
72 std::condition_variable flush_cond
; ///< wait here for unapplied txns
73 set
<TransContext
*> flush_txns
; ///< committing txns
78 map
<uint64_t,bufferlist
> pending_stripes
; ///< unwritten stripes
80 Onode(CephContext
* cct
, const ghobject_t
& o
, const string
& k
)
103 void clear_pending_stripes() {
104 pending_stripes
.clear();
107 typedef boost::intrusive_ptr
<Onode
> OnodeRef
;
109 struct OnodeHashLRU
{
111 typedef boost::intrusive::list
<
113 boost::intrusive::member_hook
<
115 boost::intrusive::list_member_hook
<>,
116 &Onode::lru_item
> > lru_list_t
;
119 ceph::unordered_map
<ghobject_t
,OnodeRef
> onode_map
; ///< forward lookups
120 lru_list_t lru
; ///< lru
122 OnodeHashLRU(CephContext
* cct
) : cct(cct
) {}
124 void add(const ghobject_t
& oid
, OnodeRef o
);
125 void _touch(OnodeRef o
);
126 OnodeRef
lookup(const ghobject_t
& o
);
127 void rename(const ghobject_t
& old_oid
, const ghobject_t
& new_oid
);
129 bool get_next(const ghobject_t
& after
, pair
<ghobject_t
,OnodeRef
> *next
);
130 int trim(int max
=-1);
133 struct Collection
: public CollectionImpl
{
136 kstore_cnode_t cnode
;
139 // cache onodes on a per-collection basis to avoid lock
141 OnodeHashLRU onode_map
;
143 OnodeRef
get_onode(const ghobject_t
& oid
, bool create
);
145 const coll_t
&get_cid() override
{
149 bool contains(const ghobject_t
& oid
) {
151 return oid
.hobj
.pool
== -1;
153 if (cid
.is_pg(&spgid
))
155 spgid
.pgid
.contains(cnode
.bits
, oid
) &&
156 oid
.shard_id
== spgid
.shard
;
160 Collection(KStore
*ns
, coll_t c
);
162 typedef boost::intrusive_ptr
<Collection
> CollectionRef
;
164 class OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
167 KeyValueDB::Iterator it
;
170 OmapIteratorImpl(CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
);
171 int seek_to_first() override
;
172 int upper_bound(const string
&after
) override
;
173 int lower_bound(const string
&to
) override
;
174 bool valid() override
;
175 int next(bool validate
=true) override
;
176 string
key() override
;
177 bufferlist
value() override
;
178 int status() override
{
184 typedef boost::intrusive_ptr
<OpSequencer
> OpSequencerRef
;
186 struct TransContext
{
200 const char *get_state_name() {
202 case STATE_PREPARE
: return "prepare";
203 case STATE_AIO_WAIT
: return "aio_wait";
204 case STATE_IO_DONE
: return "io_done";
205 case STATE_KV_QUEUED
: return "kv_queued";
206 case STATE_KV_COMMITTING
: return "kv_committing";
207 case STATE_KV_DONE
: return "kv_done";
208 case STATE_FINISHING
: return "finishing";
209 case STATE_DONE
: return "done";
214 void log_state_latency(PerfCounters
*logger
, int state
) {
215 utime_t lat
, now
= ceph_clock_now();
217 logger
->tinc(state
, lat
);
222 boost::intrusive::list_member_hook
<> sequencer_item
;
226 set
<OnodeRef
> onodes
; ///< these onodes need to be updated/written
227 KeyValueDB::Transaction t
; ///< then we will commit this
228 Context
*oncommit
; ///< signal on commit
229 Context
*onreadable
; ///< signal on readable
230 Context
*onreadable_sync
; ///< signal on readable
231 list
<Context
*> oncommits
; ///< more commit completions
232 list
<CollectionRef
> removed_collections
; ///< colls we removed
234 CollectionRef first_collection
; ///< first referenced collection
236 explicit TransContext(OpSequencer
*o
)
237 : state(STATE_PREPARE
),
243 onreadable_sync(NULL
),
244 start(ceph_clock_now()){
245 //cout << "txc new " << this << std::endl;
248 //cout << "txc del " << this << std::endl;
251 void write_onode(OnodeRef
&o
) {
256 class OpSequencer
: public Sequencer_impl
{
259 std::condition_variable qcond
;
260 typedef boost::intrusive::list
<
262 boost::intrusive::member_hook
<
264 boost::intrusive::list_member_hook
<>,
265 &TransContext::sequencer_item
> > q_list_t
;
266 q_list_t q
; ///< transactions
270 OpSequencer(CephContext
* cct
)
271 //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
272 : Sequencer_impl(cct
),
275 ~OpSequencer() override
{
279 void queue_new(TransContext
*txc
) {
280 std::lock_guard
<std::mutex
> l(qlock
);
284 void flush() override
{
285 std::unique_lock
<std::mutex
> l(qlock
);
290 bool flush_commit(Context
*c
) override
{
291 std::lock_guard
<std::mutex
> l(qlock
);
295 TransContext
*txc
= &q
.back();
296 if (txc
->state
>= TransContext::STATE_KV_DONE
) {
299 assert(txc
->state
< TransContext::STATE_KV_DONE
);
300 txc
->oncommits
.push_back(c
);
305 struct KVSyncThread
: public Thread
{
307 explicit KVSyncThread(KStore
*s
) : store(s
) {}
308 void *entry() override
{
309 store
->_kv_sync_thread();
314 // --------------------------------------------------------
319 int path_fd
; ///< open handle to $path
320 int fsid_fd
; ///< open handle (locked) to $path/fsid
323 RWLock coll_lock
; ///< rwlock to protect coll_map
324 ceph::unordered_map
<coll_t
, CollectionRef
> coll_map
;
330 Throttle throttle_ops
, throttle_bytes
; ///< submit to commit
334 KVSyncThread kv_sync_thread
;
336 std::condition_variable kv_cond
, kv_sync_cond
;
338 deque
<TransContext
*> kv_queue
, kv_committing
;
341 PerfCounters
*logger
;
342 std::mutex reap_lock
;
343 list
<CollectionRef
> removed_collections
;
346 // --------------------------------------------------------
350 void _shutdown_logger();
354 int _open_fsid(bool create
);
356 int _read_fsid(uuid_d
*f
);
359 int _open_db(bool create
);
361 int _open_collections(int *errors
=0);
362 void _close_collections();
364 int _open_super_meta();
366 CollectionRef
_get_collection(coll_t cid
);
367 void _queue_reap_collection(CollectionRef
& c
);
368 void _reap_collections();
370 void _assign_nid(TransContext
*txc
, OnodeRef o
);
372 void _dump_onode(OnodeRef o
);
374 TransContext
*_txc_create(OpSequencer
*osr
);
375 void _txc_release(TransContext
*txc
, uint64_t offset
, uint64_t length
);
376 void _txc_add_transaction(TransContext
*txc
, Transaction
*t
);
377 void _txc_finalize(OpSequencer
*osr
, TransContext
*txc
);
378 void _txc_state_proc(TransContext
*txc
);
379 void _txc_finish_kv(TransContext
*txc
);
380 void _txc_finish(TransContext
*txc
);
382 void _osr_reap_done(OpSequencer
*osr
);
384 void _kv_sync_thread();
387 std::lock_guard
<std::mutex
> l(kv_lock
);
389 kv_cond
.notify_all();
391 kv_sync_thread
.join();
395 void _do_read_stripe(OnodeRef o
, uint64_t offset
, bufferlist
*pbl
);
396 void _do_write_stripe(TransContext
*txc
, OnodeRef o
,
397 uint64_t offset
, bufferlist
& bl
);
398 void _do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
);
400 int _collection_list(
401 Collection
*c
, const ghobject_t
& start
, const ghobject_t
& end
,
402 int max
, vector
<ghobject_t
> *ls
, ghobject_t
*next
);
405 KStore(CephContext
*cct
, const string
& path
);
408 string
get_type() override
{
412 bool needs_journal() override
{ return false; };
413 bool wants_journal() override
{ return false; };
414 bool allows_journal() override
{ return false; };
416 static int get_block_device_fsid(const string
& path
, uuid_d
*fsid
);
418 bool test_mount_in_use() override
;
420 int mount() override
;
421 int umount() override
;
424 int fsck(bool deep
) override
;
427 int validate_hobject_key(const hobject_t
&obj
) const override
{
430 unsigned get_max_attr_name_length() override
{
431 return 256; // arbitrary; there is no real limit internally
435 int mkjournal() override
{
438 void dump_perf_counters(Formatter
*f
) override
{
439 f
->open_object_section("perf_counters");
440 logger
->dump_formatted(f
, false);
444 int statfs(struct store_statfs_t
*buf
) override
;
446 using ObjectStore::exists
;
447 bool exists(const coll_t
& cid
, const ghobject_t
& oid
) override
;
448 using ObjectStore::stat
;
451 const ghobject_t
& oid
,
453 bool allow_eio
= false) override
; // struct stat?
454 int set_collection_opts(
456 const pool_opts_t
& opts
) override
;
457 using ObjectStore::read
;
460 const ghobject_t
& oid
,
464 uint32_t op_flags
= 0) override
;
470 uint32_t op_flags
= 0);
472 using ObjectStore::fiemap
;
473 int fiemap(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, bufferlist
& bl
) override
;
474 int fiemap(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
) override
;
475 using ObjectStore::getattr
;
476 int getattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
, bufferptr
& value
) override
;
477 using ObjectStore::getattrs
;
478 int getattrs(const coll_t
& cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
) override
;
480 int list_collections(vector
<coll_t
>& ls
) override
;
481 bool collection_exists(const coll_t
& c
) override
;
482 int collection_empty(const coll_t
& c
, bool *empty
) override
;
483 int collection_bits(const coll_t
& c
) override
;
485 const coll_t
& cid
, const ghobject_t
& start
, const ghobject_t
& end
,
487 vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
489 CollectionHandle
&c
, const ghobject_t
& start
, const ghobject_t
& end
,
491 vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
493 using ObjectStore::omap_get
;
495 const coll_t
& cid
, ///< [in] Collection containing oid
496 const ghobject_t
&oid
, ///< [in] Object containing omap
497 bufferlist
*header
, ///< [out] omap header
498 map
<string
, bufferlist
> *out
/// < [out] Key to value map
501 using ObjectStore::omap_get_header
;
504 const coll_t
& cid
, ///< [in] Collection containing oid
505 const ghobject_t
&oid
, ///< [in] Object containing omap
506 bufferlist
*header
, ///< [out] omap header
507 bool allow_eio
= false ///< [in] don't assert on eio
510 using ObjectStore::omap_get_keys
;
511 /// Get keys defined on oid
513 const coll_t
& cid
, ///< [in] Collection containing oid
514 const ghobject_t
&oid
, ///< [in] Object containing omap
515 set
<string
> *keys
///< [out] Keys defined on oid
518 using ObjectStore::omap_get_values
;
521 const coll_t
& cid
, ///< [in] Collection containing oid
522 const ghobject_t
&oid
, ///< [in] Object containing omap
523 const set
<string
> &keys
, ///< [in] Keys to get
524 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
527 using ObjectStore::omap_check_keys
;
528 /// Filters keys into out which are defined on oid
530 const coll_t
& cid
, ///< [in] Collection containing oid
531 const ghobject_t
&oid
, ///< [in] Object containing omap
532 const set
<string
> &keys
, ///< [in] Keys to check
533 set
<string
> *out
///< [out] Subset of keys defined on oid
536 using ObjectStore::get_omap_iterator
;
537 ObjectMap::ObjectMapIterator
get_omap_iterator(
538 const coll_t
& cid
, ///< [in] collection
539 const ghobject_t
&oid
///< [in] object
542 void set_fsid(uuid_d u
) override
{
545 uuid_d
get_fsid() override
{
549 uint64_t estimate_objects_overhead(uint64_t num_objects
) override
{
550 return num_objects
* 300; //assuming per-object overhead is 300 bytes
553 objectstore_perf_stat_t
get_cur_stats() override
{
554 return objectstore_perf_stat_t();
556 const PerfCounters
* get_perf_counters() const override
{
561 int queue_transactions(
563 vector
<Transaction
>& tls
,
564 TrackedOpRef op
= TrackedOpRef(),
565 ThreadPool::TPHandle
*handle
= NULL
) override
;
567 void compact () override
{
573 // --------------------------------------------------------
576 int _do_transaction(Transaction
*t
,
578 ThreadPool::TPHandle
*handle
);
580 int _write(TransContext
*txc
,
583 uint64_t offset
, size_t len
,
585 uint32_t fadvise_flags
);
586 int _do_write(TransContext
*txc
,
588 uint64_t offset
, uint64_t length
,
590 uint32_t fadvise_flags
);
591 int _touch(TransContext
*txc
,
594 int _zero(TransContext
*txc
,
597 uint64_t offset
, size_t len
);
598 int _do_truncate(TransContext
*txc
,
601 int _truncate(TransContext
*txc
,
605 int _remove(TransContext
*txc
,
608 int _do_remove(TransContext
*txc
,
610 int _setattr(TransContext
*txc
,
615 int _setattrs(TransContext
*txc
,
618 const map
<string
,bufferptr
>& aset
);
619 int _rmattr(TransContext
*txc
,
623 int _rmattrs(TransContext
*txc
,
626 void _do_omap_clear(TransContext
*txc
, uint64_t id
);
627 int _omap_clear(TransContext
*txc
,
630 int _omap_setkeys(TransContext
*txc
,
634 int _omap_setheader(TransContext
*txc
,
638 int _omap_rmkeys(TransContext
*txc
,
642 int _omap_rmkey_range(TransContext
*txc
,
645 const string
& first
, const string
& last
);
646 int _setallochint(TransContext
*txc
,
649 uint64_t expected_object_size
,
650 uint64_t expected_write_size
,
652 int _clone(TransContext
*txc
,
656 int _clone_range(TransContext
*txc
,
660 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
);
661 int _rename(TransContext
*txc
,
665 const ghobject_t
& new_oid
);
666 int _create_collection(TransContext
*txc
, coll_t cid
, unsigned bits
,
668 int _remove_collection(TransContext
*txc
, coll_t cid
, CollectionRef
*c
);
669 int _split_collection(TransContext
*txc
,
672 unsigned bits
, int rem
);
676 inline ostream
& operator<<(ostream
& out
, const KStore::OpSequencer
& s
) {
677 return out
<< *s
.parent
;
680 static inline void intrusive_ptr_add_ref(KStore::Onode
*o
) {
683 static inline void intrusive_ptr_release(KStore::Onode
*o
) {
687 static inline void intrusive_ptr_add_ref(KStore::OpSequencer
*o
) {
690 static inline void intrusive_ptr_release(KStore::OpSequencer
*o
) {