1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
24 #include <condition_variable>
26 #include "include/ceph_assert.h"
27 #include "include/unordered_map.h"
28 #include "common/Finisher.h"
29 #include "common/RWLock.h"
30 #include "common/Throttle.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
35 #include "kv/KeyValueDB.h"
37 #include "kstore_types.h"
39 #include "boost/intrusive/list.hpp"
42 l_kstore_first
= 832430,
43 l_kstore_state_prepare_lat
,
44 l_kstore_state_kv_queued_lat
,
45 l_kstore_state_kv_done_lat
,
46 l_kstore_state_finishing_lat
,
47 l_kstore_state_done_lat
,
51 class KStore
: public ObjectStore
{
52 // -----------------------------------------------------
58 /// an in-memory object
61 std::atomic_int nref
; ///< reference count
64 string key
; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook
<> lru_item
;
67 kstore_onode_t onode
; ///< metadata stored as value in kv store
71 std::mutex flush_lock
; ///< protect flush_txns
72 std::condition_variable flush_cond
; ///< wait here for unapplied txns
73 set
<TransContext
*> flush_txns
; ///< committing txns
78 map
<uint64_t,bufferlist
> pending_stripes
; ///< unwritten stripes
80 Onode(CephContext
* cct
, const ghobject_t
& o
, const string
& k
)
103 void clear_pending_stripes() {
104 pending_stripes
.clear();
107 typedef boost::intrusive_ptr
<Onode
> OnodeRef
;
109 struct OnodeHashLRU
{
111 typedef boost::intrusive::list
<
113 boost::intrusive::member_hook
<
115 boost::intrusive::list_member_hook
<>,
116 &Onode::lru_item
> > lru_list_t
;
119 ceph::unordered_map
<ghobject_t
,OnodeRef
> onode_map
; ///< forward lookups
120 lru_list_t lru
; ///< lru
122 OnodeHashLRU(CephContext
* cct
) : cct(cct
) {}
124 void add(const ghobject_t
& oid
, OnodeRef o
);
125 void _touch(OnodeRef o
);
126 OnodeRef
lookup(const ghobject_t
& o
);
127 void rename(const ghobject_t
& old_oid
, const ghobject_t
& new_oid
);
129 bool get_next(const ghobject_t
& after
, pair
<ghobject_t
,OnodeRef
> *next
);
130 int trim(int max
=-1);
134 typedef boost::intrusive_ptr
<OpSequencer
> OpSequencerRef
;
136 struct Collection
: public CollectionImpl
{
138 kstore_cnode_t cnode
;
// cache onodes on a per-collection basis to avoid lock contention
145 OnodeHashLRU onode_map
;
147 OnodeRef
get_onode(const ghobject_t
& oid
, bool create
);
149 bool contains(const ghobject_t
& oid
) {
151 return oid
.hobj
.pool
== -1;
153 if (cid
.is_pg(&spgid
))
155 spgid
.pgid
.contains(cnode
.bits
, oid
) &&
156 oid
.shard_id
== spgid
.shard
;
160 void flush() override
;
161 bool flush_commit(Context
*c
) override
;
163 Collection(KStore
*ns
, coll_t c
);
165 typedef boost::intrusive_ptr
<Collection
> CollectionRef
;
167 class OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
170 KeyValueDB::Iterator it
;
173 OmapIteratorImpl(CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
);
174 int seek_to_first() override
;
175 int upper_bound(const string
&after
) override
;
176 int lower_bound(const string
&to
) override
;
177 bool valid() override
;
179 string
key() override
;
180 bufferlist
value() override
;
181 int status() override
{
186 struct TransContext
{
200 const char *get_state_name() {
202 case STATE_PREPARE
: return "prepare";
203 case STATE_AIO_WAIT
: return "aio_wait";
204 case STATE_IO_DONE
: return "io_done";
205 case STATE_KV_QUEUED
: return "kv_queued";
206 case STATE_KV_COMMITTING
: return "kv_committing";
207 case STATE_KV_DONE
: return "kv_done";
208 case STATE_FINISHING
: return "finishing";
209 case STATE_DONE
: return "done";
214 void log_state_latency(PerfCounters
*logger
, int state
) {
215 utime_t lat
, now
= ceph_clock_now();
217 logger
->tinc(state
, lat
);
223 boost::intrusive::list_member_hook
<> sequencer_item
;
227 set
<OnodeRef
> onodes
; ///< these onodes need to be updated/written
228 KeyValueDB::Transaction t
; ///< then we will commit this
229 Context
*oncommit
; ///< signal on commit
230 Context
*onreadable
; ///< signal on readable
231 Context
*onreadable_sync
; ///< signal on readable
232 list
<Context
*> oncommits
; ///< more commit completions
233 list
<CollectionRef
> removed_collections
; ///< colls we removed
235 CollectionRef first_collection
; ///< first referenced collection
237 explicit TransContext(OpSequencer
*o
)
238 : state(STATE_PREPARE
),
244 onreadable_sync(NULL
),
245 start(ceph_clock_now()){
246 //cout << "txc new " << this << std::endl;
249 //cout << "txc del " << this << std::endl;
252 void write_onode(OnodeRef
&o
) {
257 class OpSequencer
: public RefCountedObject
{
260 std::condition_variable qcond
;
261 typedef boost::intrusive::list
<
263 boost::intrusive::member_hook
<
265 boost::intrusive::list_member_hook
<>,
266 &TransContext::sequencer_item
> > q_list_t
;
267 q_list_t q
; ///< transactions
270 ceph_assert(q
.empty());
273 void queue_new(TransContext
*txc
) {
274 std::lock_guard
<std::mutex
> l(qlock
);
279 std::unique_lock
<std::mutex
> l(qlock
);
284 bool flush_commit(Context
*c
) {
285 std::lock_guard
<std::mutex
> l(qlock
);
289 TransContext
*txc
= &q
.back();
290 if (txc
->state
>= TransContext::STATE_KV_DONE
) {
293 ceph_assert(txc
->state
< TransContext::STATE_KV_DONE
);
294 txc
->oncommits
.push_back(c
);
299 struct KVSyncThread
: public Thread
{
301 explicit KVSyncThread(KStore
*s
) : store(s
) {}
302 void *entry() override
{
303 store
->_kv_sync_thread();
308 // --------------------------------------------------------
314 int path_fd
; ///< open handle to $path
315 int fsid_fd
; ///< open handle (locked) to $path/fsid
318 RWLock coll_lock
; ///< rwlock to protect coll_map
319 ceph::unordered_map
<coll_t
, CollectionRef
> coll_map
;
320 map
<coll_t
,CollectionRef
> new_coll_map
;
326 Throttle throttle_ops
, throttle_bytes
; ///< submit to commit
330 KVSyncThread kv_sync_thread
;
332 std::condition_variable kv_cond
, kv_sync_cond
;
334 deque
<TransContext
*> kv_queue
, kv_committing
;
337 PerfCounters
*logger
;
338 std::mutex reap_lock
;
339 list
<CollectionRef
> removed_collections
;
342 // --------------------------------------------------------
346 void _shutdown_logger();
350 int _open_fsid(bool create
);
352 int _read_fsid(uuid_d
*f
);
355 int _open_db(bool create
);
357 int _open_collections(int *errors
=0);
358 void _close_collections();
360 int _open_super_meta();
362 CollectionRef
_get_collection(coll_t cid
);
363 void _queue_reap_collection(CollectionRef
& c
);
364 void _reap_collections();
366 void _assign_nid(TransContext
*txc
, OnodeRef o
);
368 void _dump_onode(OnodeRef o
);
370 TransContext
*_txc_create(OpSequencer
*osr
);
371 void _txc_release(TransContext
*txc
, uint64_t offset
, uint64_t length
);
372 void _txc_add_transaction(TransContext
*txc
, Transaction
*t
);
373 void _txc_finalize(OpSequencer
*osr
, TransContext
*txc
);
374 void _txc_state_proc(TransContext
*txc
);
375 void _txc_finish_kv(TransContext
*txc
);
376 void _txc_finish(TransContext
*txc
);
378 void _osr_reap_done(OpSequencer
*osr
);
380 void _kv_sync_thread();
383 std::lock_guard
<std::mutex
> l(kv_lock
);
385 kv_cond
.notify_all();
387 kv_sync_thread
.join();
391 void _do_read_stripe(OnodeRef o
, uint64_t offset
, bufferlist
*pbl
);
392 void _do_write_stripe(TransContext
*txc
, OnodeRef o
,
393 uint64_t offset
, bufferlist
& bl
);
394 void _do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
);
396 int _collection_list(
397 Collection
*c
, const ghobject_t
& start
, const ghobject_t
& end
,
398 int max
, vector
<ghobject_t
> *ls
, ghobject_t
*next
);
401 KStore(CephContext
*cct
, const string
& path
);
404 string
get_type() override
{
408 bool needs_journal() override
{ return false; };
409 bool wants_journal() override
{ return false; };
410 bool allows_journal() override
{ return false; };
412 static int get_block_device_fsid(const string
& path
, uuid_d
*fsid
);
414 bool test_mount_in_use() override
;
416 int mount() override
;
417 int umount() override
;
420 int fsck(bool deep
) override
;
423 int validate_hobject_key(const hobject_t
&obj
) const override
{
426 unsigned get_max_attr_name_length() override
{
427 return 256; // arbitrary; there is no real limit internally
431 int mkjournal() override
{
434 void dump_perf_counters(Formatter
*f
) override
{
435 f
->open_object_section("perf_counters");
436 logger
->dump_formatted(f
, false);
439 void get_db_statistics(Formatter
*f
) override
{
440 db
->get_statistics(f
);
442 int statfs(struct store_statfs_t
*buf
,
443 osd_alert_list_t
* alerts
= nullptr) override
;
444 int pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
) override
;
446 CollectionHandle
open_collection(const coll_t
& c
) override
;
447 CollectionHandle
create_new_collection(const coll_t
& c
) override
;
448 void set_collection_commit_queue(const coll_t
& cid
,
449 ContextQueue
*commit_queue
) override
{
452 using ObjectStore::exists
;
453 bool exists(CollectionHandle
& c
, const ghobject_t
& oid
) override
;
454 using ObjectStore::stat
;
457 const ghobject_t
& oid
,
459 bool allow_eio
= false) override
; // struct stat?
460 int set_collection_opts(
462 const pool_opts_t
& opts
) override
;
463 using ObjectStore::read
;
466 const ghobject_t
& oid
,
470 uint32_t op_flags
= 0) override
;
476 uint32_t op_flags
= 0);
478 using ObjectStore::fiemap
;
479 int fiemap(CollectionHandle
& c
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
) override
;
480 int fiemap(CollectionHandle
& c
, const ghobject_t
& oid
, uint64_t offset
, size_t len
, bufferlist
& outbl
) override
;
481 using ObjectStore::getattr
;
482 int getattr(CollectionHandle
& c
, const ghobject_t
& oid
, const char *name
, bufferptr
& value
) override
;
483 using ObjectStore::getattrs
;
484 int getattrs(CollectionHandle
& c
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
) override
;
486 int list_collections(vector
<coll_t
>& ls
) override
;
487 bool collection_exists(const coll_t
& c
) override
;
488 int collection_empty(CollectionHandle
& c
, bool *empty
) override
;
489 int collection_bits(CollectionHandle
& c
) override
;
491 CollectionHandle
&c
, const ghobject_t
& start
, const ghobject_t
& end
,
493 vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
495 using ObjectStore::omap_get
;
497 CollectionHandle
& c
, ///< [in] Collection containing oid
498 const ghobject_t
&oid
, ///< [in] Object containing omap
499 bufferlist
*header
, ///< [out] omap header
500 map
<string
, bufferlist
> *out
///< [out] Key to value map
503 using ObjectStore::omap_get_header
;
506 CollectionHandle
& c
, ///< [in] Collection containing oid
507 const ghobject_t
&oid
, ///< [in] Object containing omap
508 bufferlist
*header
, ///< [out] omap header
509 bool allow_eio
= false ///< [in] don't assert on eio
512 using ObjectStore::omap_get_keys
;
513 /// Get keys defined on oid
515 CollectionHandle
& c
, ///< [in] Collection containing oid
516 const ghobject_t
&oid
, ///< [in] Object containing omap
517 set
<string
> *keys
///< [out] Keys defined on oid
520 using ObjectStore::omap_get_values
;
523 CollectionHandle
& c
, ///< [in] Collection containing oid
524 const ghobject_t
&oid
, ///< [in] Object containing omap
525 const set
<string
> &keys
, ///< [in] Keys to get
526 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
529 using ObjectStore::omap_check_keys
;
530 /// Filters keys into out which are defined on oid
532 CollectionHandle
& c
, ///< [in] Collection containing oid
533 const ghobject_t
&oid
, ///< [in] Object containing omap
534 const set
<string
> &keys
, ///< [in] Keys to check
535 set
<string
> *out
///< [out] Subset of keys defined on oid
538 using ObjectStore::get_omap_iterator
;
539 ObjectMap::ObjectMapIterator
get_omap_iterator(
540 CollectionHandle
& c
, ///< [in] collection
541 const ghobject_t
&oid
///< [in] object
544 void set_fsid(uuid_d u
) override
{
547 uuid_d
get_fsid() override
{
551 uint64_t estimate_objects_overhead(uint64_t num_objects
) override
{
552 return num_objects
* 300; //assuming per-object overhead is 300 bytes
555 objectstore_perf_stat_t
get_cur_stats() override
{
556 return objectstore_perf_stat_t();
558 const PerfCounters
* get_perf_counters() const override
{
563 int queue_transactions(
564 CollectionHandle
& ch
,
565 vector
<Transaction
>& tls
,
566 TrackedOpRef op
= TrackedOpRef(),
567 ThreadPool::TPHandle
*handle
= NULL
) override
;
569 void compact () override
{
575 // --------------------------------------------------------
578 int _write(TransContext
*txc
,
581 uint64_t offset
, size_t len
,
583 uint32_t fadvise_flags
);
584 int _do_write(TransContext
*txc
,
586 uint64_t offset
, uint64_t length
,
588 uint32_t fadvise_flags
);
589 int _touch(TransContext
*txc
,
592 int _zero(TransContext
*txc
,
595 uint64_t offset
, size_t len
);
596 int _do_truncate(TransContext
*txc
,
599 int _truncate(TransContext
*txc
,
603 int _remove(TransContext
*txc
,
606 int _do_remove(TransContext
*txc
,
608 int _setattr(TransContext
*txc
,
613 int _setattrs(TransContext
*txc
,
616 const map
<string
,bufferptr
>& aset
);
617 int _rmattr(TransContext
*txc
,
621 int _rmattrs(TransContext
*txc
,
624 void _do_omap_clear(TransContext
*txc
, uint64_t id
);
625 int _omap_clear(TransContext
*txc
,
628 int _omap_setkeys(TransContext
*txc
,
632 int _omap_setheader(TransContext
*txc
,
636 int _omap_rmkeys(TransContext
*txc
,
639 const bufferlist
& bl
);
640 int _omap_rmkey_range(TransContext
*txc
,
643 const string
& first
, const string
& last
);
644 int _setallochint(TransContext
*txc
,
647 uint64_t expected_object_size
,
648 uint64_t expected_write_size
,
650 int _clone(TransContext
*txc
,
654 int _clone_range(TransContext
*txc
,
658 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
);
659 int _rename(TransContext
*txc
,
663 const ghobject_t
& new_oid
);
664 int _create_collection(TransContext
*txc
, coll_t cid
, unsigned bits
,
666 int _remove_collection(TransContext
*txc
, coll_t cid
, CollectionRef
*c
);
667 int _split_collection(TransContext
*txc
,
670 unsigned bits
, int rem
);
671 int _merge_collection(TransContext
*txc
,
678 static inline void intrusive_ptr_add_ref(KStore::Onode
*o
) {
681 static inline void intrusive_ptr_release(KStore::Onode
*o
) {
685 static inline void intrusive_ptr_add_ref(KStore::OpSequencer
*o
) {
688 static inline void intrusive_ptr_release(KStore::OpSequencer
*o
) {