]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/kstore/KStore.h
bfa1197339f71893646353724347347ff114e784
[ceph.git] / ceph / src / os / kstore / KStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include "include/assert.h"
27 #include "include/unordered_map.h"
28 #include "include/memory.h"
29 #include "common/Finisher.h"
30 #include "common/RWLock.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
34 #include "os/fs/FS.h"
35 #include "kv/KeyValueDB.h"
36
37 #include "kstore_types.h"
38
39 #include "boost/intrusive/list.hpp"
40
/// Integer ids in the l_kstore_* range.
///
/// Only l_kstore_first has an explicit value; each following enumerator is
/// implicitly "previous + 1", so the declaration order is significant.
/// NOTE(review): the *_lat names suggest these are PerfCounters indices for
/// per-state latencies -- confirm against _init_logger() in KStore.cc.
enum {
  l_kstore_first = 832430,        ///< lower bound of the range (explicit base)
  l_kstore_state_prepare_lat,
  l_kstore_state_kv_queued_lat,
  l_kstore_state_kv_done_lat,
  l_kstore_state_finishing_lat,
  l_kstore_state_done_lat,
  l_kstore_last                   ///< upper bound of the range
};
50
51 class KStore : public ObjectStore {
52 // -----------------------------------------------------
53 // types
54 public:
55
56 class TransContext;
57
58 /// an in-memory object
59 struct Onode {
60 CephContext* cct;
61 std::atomic_int nref; ///< reference count
62
63 ghobject_t oid;
64 string key; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook<> lru_item;
66
67 kstore_onode_t onode; ///< metadata stored as value in kv store
68 bool dirty; // ???
69 bool exists;
70
71 std::mutex flush_lock; ///< protect flush_txns
72 std::condition_variable flush_cond; ///< wait here for unapplied txns
73 set<TransContext*> flush_txns; ///< committing txns
74
75 uint64_t tail_offset;
76 bufferlist tail_bl;
77
78 map<uint64_t,bufferlist> pending_stripes; ///< unwritten stripes
79
80 Onode(CephContext* cct, const ghobject_t& o, const string& k)
81 : cct(cct),
82 nref(0),
83 oid(o),
84 key(k),
85 dirty(false),
86 exists(false),
87 tail_offset(0) {
88 }
89
90 void flush();
91 void get() {
92 ++nref;
93 }
94 void put() {
95 if (--nref == 0)
96 delete this;
97 }
98
99 void clear_tail() {
100 tail_offset = 0;
101 tail_bl.clear();
102 }
103 void clear_pending_stripes() {
104 pending_stripes.clear();
105 }
106 };
107 typedef boost::intrusive_ptr<Onode> OnodeRef;
108
109 struct OnodeHashLRU {
110 CephContext* cct;
111 typedef boost::intrusive::list<
112 Onode,
113 boost::intrusive::member_hook<
114 Onode,
115 boost::intrusive::list_member_hook<>,
116 &Onode::lru_item> > lru_list_t;
117
118 std::mutex lock;
119 ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
120 lru_list_t lru; ///< lru
121
122 OnodeHashLRU(CephContext* cct) : cct(cct) {}
123
124 void add(const ghobject_t& oid, OnodeRef o);
125 void _touch(OnodeRef o);
126 OnodeRef lookup(const ghobject_t& o);
127 void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
128 void clear();
129 bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
130 int trim(int max=-1);
131 };
132
133 struct Collection : public CollectionImpl {
134 KStore *store;
135 coll_t cid;
136 kstore_cnode_t cnode;
137 RWLock lock;
138
139 // cache onodes on a per-collection basis to avoid lock
140 // contention.
141 OnodeHashLRU onode_map;
142
143 OnodeRef get_onode(const ghobject_t& oid, bool create);
144
145 const coll_t &get_cid() override {
146 return cid;
147 }
148
149 bool contains(const ghobject_t& oid) {
150 if (cid.is_meta())
151 return oid.hobj.pool == -1;
152 spg_t spgid;
153 if (cid.is_pg(&spgid))
154 return
155 spgid.pgid.contains(cnode.bits, oid) &&
156 oid.shard_id == spgid.shard;
157 return false;
158 }
159
160 Collection(KStore *ns, coll_t c);
161 };
162 typedef boost::intrusive_ptr<Collection> CollectionRef;
163
164 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
165 CollectionRef c;
166 OnodeRef o;
167 KeyValueDB::Iterator it;
168 string head, tail;
169 public:
170 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
171 int seek_to_first() override;
172 int upper_bound(const string &after) override;
173 int lower_bound(const string &to) override;
174 bool valid() override;
175 int next(bool validate=true) override;
176 string key() override;
177 bufferlist value() override;
178 int status() override {
179 return 0;
180 }
181 };
182
183 class OpSequencer;
184 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
185
186 struct TransContext {
187 typedef enum {
188 STATE_PREPARE,
189 STATE_AIO_WAIT,
190 STATE_IO_DONE,
191 STATE_KV_QUEUED,
192 STATE_KV_COMMITTING,
193 STATE_KV_DONE,
194 STATE_FINISHING,
195 STATE_DONE,
196 } state_t;
197
198 state_t state;
199
200 const char *get_state_name() {
201 switch (state) {
202 case STATE_PREPARE: return "prepare";
203 case STATE_AIO_WAIT: return "aio_wait";
204 case STATE_IO_DONE: return "io_done";
205 case STATE_KV_QUEUED: return "kv_queued";
206 case STATE_KV_COMMITTING: return "kv_committing";
207 case STATE_KV_DONE: return "kv_done";
208 case STATE_FINISHING: return "finishing";
209 case STATE_DONE: return "done";
210 }
211 return "???";
212 }
213
214 void log_state_latency(PerfCounters *logger, int state) {
215 utime_t lat, now = ceph_clock_now();
216 lat = now - start;
217 logger->tinc(state, lat);
218 start = now;
219 }
220
221 OpSequencerRef osr;
222 boost::intrusive::list_member_hook<> sequencer_item;
223
224 uint64_t ops, bytes;
225
226 set<OnodeRef> onodes; ///< these onodes need to be updated/written
227 KeyValueDB::Transaction t; ///< then we will commit this
228 Context *oncommit; ///< signal on commit
229 Context *onreadable; ///< signal on readable
230 Context *onreadable_sync; ///< signal on readable
231 list<Context*> oncommits; ///< more commit completions
232 list<CollectionRef> removed_collections; ///< colls we removed
233
234 CollectionRef first_collection; ///< first referenced collection
235 utime_t start;
236 explicit TransContext(OpSequencer *o)
237 : state(STATE_PREPARE),
238 osr(o),
239 ops(0),
240 bytes(0),
241 oncommit(NULL),
242 onreadable(NULL),
243 onreadable_sync(NULL),
244 start(ceph_clock_now()){
245 //cout << "txc new " << this << std::endl;
246 }
247 ~TransContext() {
248 //cout << "txc del " << this << std::endl;
249 }
250
251 void write_onode(OnodeRef &o) {
252 onodes.insert(o);
253 }
254 };
255
256 class OpSequencer : public Sequencer_impl {
257 public:
258 std::mutex qlock;
259 std::condition_variable qcond;
260 typedef boost::intrusive::list<
261 TransContext,
262 boost::intrusive::member_hook<
263 TransContext,
264 boost::intrusive::list_member_hook<>,
265 &TransContext::sequencer_item> > q_list_t;
266 q_list_t q; ///< transactions
267
268 Sequencer *parent;
269
270 OpSequencer(CephContext* cct)
271 //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
272 : Sequencer_impl(cct),
273 parent(NULL) {
274 }
275 ~OpSequencer() override {
276 assert(q.empty());
277 }
278
279 void queue_new(TransContext *txc) {
280 std::lock_guard<std::mutex> l(qlock);
281 q.push_back(*txc);
282 }
283
284 void flush() override {
285 std::unique_lock<std::mutex> l(qlock);
286 while (!q.empty())
287 qcond.wait(l);
288 }
289
290 bool flush_commit(Context *c) override {
291 std::lock_guard<std::mutex> l(qlock);
292 if (q.empty()) {
293 return true;
294 }
295 TransContext *txc = &q.back();
296 if (txc->state >= TransContext::STATE_KV_DONE) {
297 return true;
298 }
299 assert(txc->state < TransContext::STATE_KV_DONE);
300 txc->oncommits.push_back(c);
301 return false;
302 }
303 };
304
305 struct KVSyncThread : public Thread {
306 KStore *store;
307 explicit KVSyncThread(KStore *s) : store(s) {}
308 void *entry() override {
309 store->_kv_sync_thread();
310 return NULL;
311 }
312 };
313
314 // --------------------------------------------------------
315 // members
316 private:
317 KeyValueDB *db;
318 uuid_d fsid;
319 int path_fd; ///< open handle to $path
320 int fsid_fd; ///< open handle (locked) to $path/fsid
321 bool mounted;
322
323 RWLock coll_lock; ///< rwlock to protect coll_map
324 ceph::unordered_map<coll_t, CollectionRef> coll_map;
325
326 std::mutex nid_lock;
327 uint64_t nid_last;
328 uint64_t nid_max;
329
330 Throttle throttle_ops, throttle_bytes; ///< submit to commit
331
332 Finisher finisher;
333
334 KVSyncThread kv_sync_thread;
335 std::mutex kv_lock;
336 std::condition_variable kv_cond, kv_sync_cond;
337 bool kv_stop;
338 deque<TransContext*> kv_queue, kv_committing;
339
340 //Logger *logger;
341 PerfCounters *logger;
342 std::mutex reap_lock;
343 list<CollectionRef> removed_collections;
344
345
346 // --------------------------------------------------------
347 // private methods
348
349 void _init_logger();
350 void _shutdown_logger();
351
352 int _open_path();
353 void _close_path();
354 int _open_fsid(bool create);
355 int _lock_fsid();
356 int _read_fsid(uuid_d *f);
357 int _write_fsid();
358 void _close_fsid();
359 int _open_db(bool create);
360 void _close_db();
361 int _open_collections(int *errors=0);
362 void _close_collections();
363
364 int _open_super_meta();
365
366 CollectionRef _get_collection(coll_t cid);
367 void _queue_reap_collection(CollectionRef& c);
368 void _reap_collections();
369
370 void _assign_nid(TransContext *txc, OnodeRef o);
371
372 void _dump_onode(OnodeRef o);
373
374 TransContext *_txc_create(OpSequencer *osr);
375 void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
376 void _txc_add_transaction(TransContext *txc, Transaction *t);
377 void _txc_finalize(OpSequencer *osr, TransContext *txc);
378 void _txc_state_proc(TransContext *txc);
379 void _txc_finish_kv(TransContext *txc);
380 void _txc_finish(TransContext *txc);
381
382 void _osr_reap_done(OpSequencer *osr);
383
384 void _kv_sync_thread();
385 void _kv_stop() {
386 {
387 std::lock_guard<std::mutex> l(kv_lock);
388 kv_stop = true;
389 kv_cond.notify_all();
390 }
391 kv_sync_thread.join();
392 kv_stop = false;
393 }
394
395 void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
396 void _do_write_stripe(TransContext *txc, OnodeRef o,
397 uint64_t offset, bufferlist& bl);
398 void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
399
400 int _collection_list(
401 Collection *c, const ghobject_t& start, const ghobject_t& end,
402 int max, vector<ghobject_t> *ls, ghobject_t *next);
403
404 public:
405 KStore(CephContext *cct, const string& path);
406 ~KStore() override;
407
408 string get_type() override {
409 return "kstore";
410 }
411
412 bool needs_journal() override { return false; };
413 bool wants_journal() override { return false; };
414 bool allows_journal() override { return false; };
415
416 static int get_block_device_fsid(const string& path, uuid_d *fsid);
417
418 bool test_mount_in_use() override;
419
420 int mount() override;
421 int umount() override;
422 void _sync();
423
424 int fsck(bool deep) override;
425
426
427 int validate_hobject_key(const hobject_t &obj) const override {
428 return 0;
429 }
430 unsigned get_max_attr_name_length() override {
431 return 256; // arbitrary; there is no real limit internally
432 }
433
434 int mkfs() override;
435 int mkjournal() override {
436 return 0;
437 }
438 void dump_perf_counters(Formatter *f) override {
439 f->open_object_section("perf_counters");
440 logger->dump_formatted(f, false);
441 f->close_section();
442 }
443
444 int statfs(struct store_statfs_t *buf) override;
445
446 using ObjectStore::exists;
447 bool exists(const coll_t& cid, const ghobject_t& oid) override;
448 using ObjectStore::stat;
449 int stat(
450 const coll_t& cid,
451 const ghobject_t& oid,
452 struct stat *st,
453 bool allow_eio = false) override; // struct stat?
454 int set_collection_opts(
455 const coll_t& cid,
456 const pool_opts_t& opts) override;
457 using ObjectStore::read;
458 int read(
459 const coll_t& cid,
460 const ghobject_t& oid,
461 uint64_t offset,
462 size_t len,
463 bufferlist& bl,
464 uint32_t op_flags = 0,
465 bool allow_eio = false) override;
466 int _do_read(
467 OnodeRef o,
468 uint64_t offset,
469 size_t len,
470 bufferlist& bl,
471 uint32_t op_flags = 0);
472
473 using ObjectStore::fiemap;
474 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
475 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
476 using ObjectStore::getattr;
477 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override;
478 using ObjectStore::getattrs;
479 int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
480
481 int list_collections(vector<coll_t>& ls) override;
482 bool collection_exists(const coll_t& c) override;
483 int collection_empty(const coll_t& c, bool *empty) override;
484 int collection_bits(const coll_t& c) override;
485 int collection_list(
486 const coll_t& cid, const ghobject_t& start, const ghobject_t& end,
487 int max,
488 vector<ghobject_t> *ls, ghobject_t *next) override;
489 int collection_list(
490 CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
491 int max,
492 vector<ghobject_t> *ls, ghobject_t *next) override;
493
494 using ObjectStore::omap_get;
495 int omap_get(
496 const coll_t& cid, ///< [in] Collection containing oid
497 const ghobject_t &oid, ///< [in] Object containing omap
498 bufferlist *header, ///< [out] omap header
499 map<string, bufferlist> *out /// < [out] Key to value map
500 ) override;
501
502 using ObjectStore::omap_get_header;
503 /// Get omap header
504 int omap_get_header(
505 const coll_t& cid, ///< [in] Collection containing oid
506 const ghobject_t &oid, ///< [in] Object containing omap
507 bufferlist *header, ///< [out] omap header
508 bool allow_eio = false ///< [in] don't assert on eio
509 ) override;
510
511 using ObjectStore::omap_get_keys;
512 /// Get keys defined on oid
513 int omap_get_keys(
514 const coll_t& cid, ///< [in] Collection containing oid
515 const ghobject_t &oid, ///< [in] Object containing omap
516 set<string> *keys ///< [out] Keys defined on oid
517 ) override;
518
519 using ObjectStore::omap_get_values;
520 /// Get key values
521 int omap_get_values(
522 const coll_t& cid, ///< [in] Collection containing oid
523 const ghobject_t &oid, ///< [in] Object containing omap
524 const set<string> &keys, ///< [in] Keys to get
525 map<string, bufferlist> *out ///< [out] Returned keys and values
526 ) override;
527
528 using ObjectStore::omap_check_keys;
529 /// Filters keys into out which are defined on oid
530 int omap_check_keys(
531 const coll_t& cid, ///< [in] Collection containing oid
532 const ghobject_t &oid, ///< [in] Object containing omap
533 const set<string> &keys, ///< [in] Keys to check
534 set<string> *out ///< [out] Subset of keys defined on oid
535 ) override;
536
537 using ObjectStore::get_omap_iterator;
538 ObjectMap::ObjectMapIterator get_omap_iterator(
539 const coll_t& cid, ///< [in] collection
540 const ghobject_t &oid ///< [in] object
541 ) override;
542
543 void set_fsid(uuid_d u) override {
544 fsid = u;
545 }
546 uuid_d get_fsid() override {
547 return fsid;
548 }
549
550 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
551 return num_objects * 300; //assuming per-object overhead is 300 bytes
552 }
553
554 objectstore_perf_stat_t get_cur_stats() override {
555 return objectstore_perf_stat_t();
556 }
557 const PerfCounters* get_perf_counters() const override {
558 return logger;
559 }
560
561
562 int queue_transactions(
563 Sequencer *osr,
564 vector<Transaction>& tls,
565 TrackedOpRef op = TrackedOpRef(),
566 ThreadPool::TPHandle *handle = NULL) override;
567
568 private:
569 // --------------------------------------------------------
570 // write ops
571
572 int _do_transaction(Transaction *t,
573 TransContext *txc,
574 ThreadPool::TPHandle *handle);
575
576 int _write(TransContext *txc,
577 CollectionRef& c,
578 OnodeRef& o,
579 uint64_t offset, size_t len,
580 bufferlist& bl,
581 uint32_t fadvise_flags);
582 int _do_write(TransContext *txc,
583 OnodeRef o,
584 uint64_t offset, uint64_t length,
585 bufferlist& bl,
586 uint32_t fadvise_flags);
587 int _touch(TransContext *txc,
588 CollectionRef& c,
589 OnodeRef& o);
590 int _zero(TransContext *txc,
591 CollectionRef& c,
592 OnodeRef& o,
593 uint64_t offset, size_t len);
594 int _do_truncate(TransContext *txc,
595 OnodeRef o,
596 uint64_t offset);
597 int _truncate(TransContext *txc,
598 CollectionRef& c,
599 OnodeRef& o,
600 uint64_t offset);
601 int _remove(TransContext *txc,
602 CollectionRef& c,
603 OnodeRef& o);
604 int _do_remove(TransContext *txc,
605 OnodeRef o);
606 int _setattr(TransContext *txc,
607 CollectionRef& c,
608 OnodeRef& o,
609 const string& name,
610 bufferptr& val);
611 int _setattrs(TransContext *txc,
612 CollectionRef& c,
613 OnodeRef& o,
614 const map<string,bufferptr>& aset);
615 int _rmattr(TransContext *txc,
616 CollectionRef& c,
617 OnodeRef& o,
618 const string& name);
619 int _rmattrs(TransContext *txc,
620 CollectionRef& c,
621 OnodeRef& o);
622 void _do_omap_clear(TransContext *txc, uint64_t id);
623 int _omap_clear(TransContext *txc,
624 CollectionRef& c,
625 OnodeRef& o);
626 int _omap_setkeys(TransContext *txc,
627 CollectionRef& c,
628 OnodeRef& o,
629 bufferlist& bl);
630 int _omap_setheader(TransContext *txc,
631 CollectionRef& c,
632 OnodeRef& o,
633 bufferlist& header);
634 int _omap_rmkeys(TransContext *txc,
635 CollectionRef& c,
636 OnodeRef& o,
637 bufferlist& bl);
638 int _omap_rmkey_range(TransContext *txc,
639 CollectionRef& c,
640 OnodeRef& o,
641 const string& first, const string& last);
642 int _setallochint(TransContext *txc,
643 CollectionRef& c,
644 OnodeRef& o,
645 uint64_t expected_object_size,
646 uint64_t expected_write_size,
647 uint32_t flags);
648 int _clone(TransContext *txc,
649 CollectionRef& c,
650 OnodeRef& oldo,
651 OnodeRef& newo);
652 int _clone_range(TransContext *txc,
653 CollectionRef& c,
654 OnodeRef& oldo,
655 OnodeRef& newo,
656 uint64_t srcoff, uint64_t length, uint64_t dstoff);
657 int _rename(TransContext *txc,
658 CollectionRef& c,
659 OnodeRef& oldo,
660 OnodeRef& newo,
661 const ghobject_t& new_oid);
662 int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
663 CollectionRef *c);
664 int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
665 int _split_collection(TransContext *txc,
666 CollectionRef& c,
667 CollectionRef& d,
668 unsigned bits, int rem);
669
670 };
671
672 inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) {
673 return out << *s.parent;
674 }
675
676 static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
677 o->get();
678 }
679 static inline void intrusive_ptr_release(KStore::Onode *o) {
680 o->put();
681 }
682
683 static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
684 o->get();
685 }
686 static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
687 o->put();
688 }
689
690 #endif