]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/kstore/KStore.h
update sources to v12.1.1
[ceph.git] / ceph / src / os / kstore / KStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include "include/assert.h"
27 #include "include/unordered_map.h"
28 #include "include/memory.h"
29 #include "common/Finisher.h"
30 #include "common/RWLock.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
34 #include "os/fs/FS.h"
35 #include "kv/KeyValueDB.h"
36
37 #include "kstore_types.h"
38
39 #include "boost/intrusive/list.hpp"
40
// Perf-counter indices used by KStore.  l_kstore_first / l_kstore_last
// bracket the range (presumably the bounds handed to the perf-counter
// builder); each entry in between is a latency counter that
// TransContext::log_state_latency() feeds via PerfCounters::tinc().
enum {
  l_kstore_first = 832430,
  l_kstore_state_prepare_lat = l_kstore_first + 1,
  l_kstore_state_kv_queued_lat = l_kstore_first + 2,
  l_kstore_state_kv_done_lat = l_kstore_first + 3,
  l_kstore_state_finishing_lat = l_kstore_first + 4,
  l_kstore_state_done_lat = l_kstore_first + 5,
  l_kstore_last = l_kstore_first + 6,
};
50
51 class KStore : public ObjectStore {
52 // -----------------------------------------------------
53 // types
54 public:
55
56 class TransContext;
57
58 /// an in-memory object
59 struct Onode {
60 CephContext* cct;
61 std::atomic_int nref; ///< reference count
62
63 ghobject_t oid;
64 string key; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook<> lru_item;
66
67 kstore_onode_t onode; ///< metadata stored as value in kv store
68 bool dirty; // ???
69 bool exists;
70
71 std::mutex flush_lock; ///< protect flush_txns
72 std::condition_variable flush_cond; ///< wait here for unapplied txns
73 set<TransContext*> flush_txns; ///< committing txns
74
75 uint64_t tail_offset;
76 bufferlist tail_bl;
77
78 map<uint64_t,bufferlist> pending_stripes; ///< unwritten stripes
79
80 Onode(CephContext* cct, const ghobject_t& o, const string& k)
81 : cct(cct),
82 nref(0),
83 oid(o),
84 key(k),
85 dirty(false),
86 exists(false),
87 tail_offset(0) {
88 }
89
90 void flush();
91 void get() {
92 ++nref;
93 }
94 void put() {
95 if (--nref == 0)
96 delete this;
97 }
98
99 void clear_tail() {
100 tail_offset = 0;
101 tail_bl.clear();
102 }
103 void clear_pending_stripes() {
104 pending_stripes.clear();
105 }
106 };
107 typedef boost::intrusive_ptr<Onode> OnodeRef;
108
109 struct OnodeHashLRU {
110 CephContext* cct;
111 typedef boost::intrusive::list<
112 Onode,
113 boost::intrusive::member_hook<
114 Onode,
115 boost::intrusive::list_member_hook<>,
116 &Onode::lru_item> > lru_list_t;
117
118 std::mutex lock;
119 ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
120 lru_list_t lru; ///< lru
121
122 OnodeHashLRU(CephContext* cct) : cct(cct) {}
123
124 void add(const ghobject_t& oid, OnodeRef o);
125 void _touch(OnodeRef o);
126 OnodeRef lookup(const ghobject_t& o);
127 void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
128 void clear();
129 bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
130 int trim(int max=-1);
131 };
132
133 struct Collection : public CollectionImpl {
134 KStore *store;
135 coll_t cid;
136 kstore_cnode_t cnode;
137 RWLock lock;
138
139 // cache onodes on a per-collection basis to avoid lock
140 // contention.
141 OnodeHashLRU onode_map;
142
143 OnodeRef get_onode(const ghobject_t& oid, bool create);
144
145 const coll_t &get_cid() override {
146 return cid;
147 }
148
149 bool contains(const ghobject_t& oid) {
150 if (cid.is_meta())
151 return oid.hobj.pool == -1;
152 spg_t spgid;
153 if (cid.is_pg(&spgid))
154 return
155 spgid.pgid.contains(cnode.bits, oid) &&
156 oid.shard_id == spgid.shard;
157 return false;
158 }
159
160 Collection(KStore *ns, coll_t c);
161 };
162 typedef boost::intrusive_ptr<Collection> CollectionRef;
163
164 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
165 CollectionRef c;
166 OnodeRef o;
167 KeyValueDB::Iterator it;
168 string head, tail;
169 public:
170 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
171 int seek_to_first() override;
172 int upper_bound(const string &after) override;
173 int lower_bound(const string &to) override;
174 bool valid() override;
175 int next(bool validate=true) override;
176 string key() override;
177 bufferlist value() override;
178 int status() override {
179 return 0;
180 }
181 };
182
183 class OpSequencer;
184 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
185
186 struct TransContext {
187 typedef enum {
188 STATE_PREPARE,
189 STATE_AIO_WAIT,
190 STATE_IO_DONE,
191 STATE_KV_QUEUED,
192 STATE_KV_COMMITTING,
193 STATE_KV_DONE,
194 STATE_FINISHING,
195 STATE_DONE,
196 } state_t;
197
198 state_t state;
199
200 const char *get_state_name() {
201 switch (state) {
202 case STATE_PREPARE: return "prepare";
203 case STATE_AIO_WAIT: return "aio_wait";
204 case STATE_IO_DONE: return "io_done";
205 case STATE_KV_QUEUED: return "kv_queued";
206 case STATE_KV_COMMITTING: return "kv_committing";
207 case STATE_KV_DONE: return "kv_done";
208 case STATE_FINISHING: return "finishing";
209 case STATE_DONE: return "done";
210 }
211 return "???";
212 }
213
214 void log_state_latency(PerfCounters *logger, int state) {
215 utime_t lat, now = ceph_clock_now();
216 lat = now - start;
217 logger->tinc(state, lat);
218 start = now;
219 }
220
221 OpSequencerRef osr;
222 boost::intrusive::list_member_hook<> sequencer_item;
223
224 uint64_t ops, bytes;
225
226 set<OnodeRef> onodes; ///< these onodes need to be updated/written
227 KeyValueDB::Transaction t; ///< then we will commit this
228 Context *oncommit; ///< signal on commit
229 Context *onreadable; ///< signal on readable
230 Context *onreadable_sync; ///< signal on readable
231 list<Context*> oncommits; ///< more commit completions
232 list<CollectionRef> removed_collections; ///< colls we removed
233
234 CollectionRef first_collection; ///< first referenced collection
235 utime_t start;
236 explicit TransContext(OpSequencer *o)
237 : state(STATE_PREPARE),
238 osr(o),
239 ops(0),
240 bytes(0),
241 oncommit(NULL),
242 onreadable(NULL),
243 onreadable_sync(NULL),
244 start(ceph_clock_now()){
245 //cout << "txc new " << this << std::endl;
246 }
247 ~TransContext() {
248 //cout << "txc del " << this << std::endl;
249 }
250
251 void write_onode(OnodeRef &o) {
252 onodes.insert(o);
253 }
254 };
255
256 class OpSequencer : public Sequencer_impl {
257 public:
258 std::mutex qlock;
259 std::condition_variable qcond;
260 typedef boost::intrusive::list<
261 TransContext,
262 boost::intrusive::member_hook<
263 TransContext,
264 boost::intrusive::list_member_hook<>,
265 &TransContext::sequencer_item> > q_list_t;
266 q_list_t q; ///< transactions
267
268 Sequencer *parent;
269
270 OpSequencer(CephContext* cct)
271 //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
272 : Sequencer_impl(cct),
273 parent(NULL) {
274 }
275 ~OpSequencer() override {
276 assert(q.empty());
277 }
278
279 void queue_new(TransContext *txc) {
280 std::lock_guard<std::mutex> l(qlock);
281 q.push_back(*txc);
282 }
283
284 void flush() override {
285 std::unique_lock<std::mutex> l(qlock);
286 while (!q.empty())
287 qcond.wait(l);
288 }
289
290 bool flush_commit(Context *c) override {
291 std::lock_guard<std::mutex> l(qlock);
292 if (q.empty()) {
293 return true;
294 }
295 TransContext *txc = &q.back();
296 if (txc->state >= TransContext::STATE_KV_DONE) {
297 return true;
298 }
299 assert(txc->state < TransContext::STATE_KV_DONE);
300 txc->oncommits.push_back(c);
301 return false;
302 }
303 };
304
305 struct KVSyncThread : public Thread {
306 KStore *store;
307 explicit KVSyncThread(KStore *s) : store(s) {}
308 void *entry() override {
309 store->_kv_sync_thread();
310 return NULL;
311 }
312 };
313
314 // --------------------------------------------------------
315 // members
316 private:
317 KeyValueDB *db;
318 uuid_d fsid;
319 int path_fd; ///< open handle to $path
320 int fsid_fd; ///< open handle (locked) to $path/fsid
321 bool mounted;
322
323 RWLock coll_lock; ///< rwlock to protect coll_map
324 ceph::unordered_map<coll_t, CollectionRef> coll_map;
325
326 std::mutex nid_lock;
327 uint64_t nid_last;
328 uint64_t nid_max;
329
330 Throttle throttle_ops, throttle_bytes; ///< submit to commit
331
332 Finisher finisher;
333
334 KVSyncThread kv_sync_thread;
335 std::mutex kv_lock;
336 std::condition_variable kv_cond, kv_sync_cond;
337 bool kv_stop;
338 deque<TransContext*> kv_queue, kv_committing;
339
340 //Logger *logger;
341 PerfCounters *logger;
342 std::mutex reap_lock;
343 list<CollectionRef> removed_collections;
344
345
346 // --------------------------------------------------------
347 // private methods
348
349 void _init_logger();
350 void _shutdown_logger();
351
352 int _open_path();
353 void _close_path();
354 int _open_fsid(bool create);
355 int _lock_fsid();
356 int _read_fsid(uuid_d *f);
357 int _write_fsid();
358 void _close_fsid();
359 int _open_db(bool create);
360 void _close_db();
361 int _open_collections(int *errors=0);
362 void _close_collections();
363
364 int _open_super_meta();
365
366 CollectionRef _get_collection(coll_t cid);
367 void _queue_reap_collection(CollectionRef& c);
368 void _reap_collections();
369
370 void _assign_nid(TransContext *txc, OnodeRef o);
371
372 void _dump_onode(OnodeRef o);
373
374 TransContext *_txc_create(OpSequencer *osr);
375 void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
376 void _txc_add_transaction(TransContext *txc, Transaction *t);
377 void _txc_finalize(OpSequencer *osr, TransContext *txc);
378 void _txc_state_proc(TransContext *txc);
379 void _txc_finish_kv(TransContext *txc);
380 void _txc_finish(TransContext *txc);
381
382 void _osr_reap_done(OpSequencer *osr);
383
384 void _kv_sync_thread();
385 void _kv_stop() {
386 {
387 std::lock_guard<std::mutex> l(kv_lock);
388 kv_stop = true;
389 kv_cond.notify_all();
390 }
391 kv_sync_thread.join();
392 kv_stop = false;
393 }
394
395 void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
396 void _do_write_stripe(TransContext *txc, OnodeRef o,
397 uint64_t offset, bufferlist& bl);
398 void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
399
400 int _collection_list(
401 Collection *c, const ghobject_t& start, const ghobject_t& end,
402 int max, vector<ghobject_t> *ls, ghobject_t *next);
403
404 public:
405 KStore(CephContext *cct, const string& path);
406 ~KStore() override;
407
408 string get_type() override {
409 return "kstore";
410 }
411
412 bool needs_journal() override { return false; };
413 bool wants_journal() override { return false; };
414 bool allows_journal() override { return false; };
415
416 static int get_block_device_fsid(const string& path, uuid_d *fsid);
417
418 bool test_mount_in_use() override;
419
420 int mount() override;
421 int umount() override;
422 void _sync();
423
424 int fsck(bool deep) override;
425
426
427 int validate_hobject_key(const hobject_t &obj) const override {
428 return 0;
429 }
430 unsigned get_max_attr_name_length() override {
431 return 256; // arbitrary; there is no real limit internally
432 }
433
434 int mkfs() override;
435 int mkjournal() override {
436 return 0;
437 }
438 void dump_perf_counters(Formatter *f) override {
439 f->open_object_section("perf_counters");
440 logger->dump_formatted(f, false);
441 f->close_section();
442 }
443
444 int statfs(struct store_statfs_t *buf) override;
445
446 using ObjectStore::exists;
447 bool exists(const coll_t& cid, const ghobject_t& oid) override;
448 using ObjectStore::stat;
449 int stat(
450 const coll_t& cid,
451 const ghobject_t& oid,
452 struct stat *st,
453 bool allow_eio = false) override; // struct stat?
454 int set_collection_opts(
455 const coll_t& cid,
456 const pool_opts_t& opts) override;
457 using ObjectStore::read;
458 int read(
459 const coll_t& cid,
460 const ghobject_t& oid,
461 uint64_t offset,
462 size_t len,
463 bufferlist& bl,
464 uint32_t op_flags = 0) override;
465 int _do_read(
466 OnodeRef o,
467 uint64_t offset,
468 size_t len,
469 bufferlist& bl,
470 uint32_t op_flags = 0);
471
472 using ObjectStore::fiemap;
473 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
474 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
475 using ObjectStore::getattr;
476 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override;
477 using ObjectStore::getattrs;
478 int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
479
480 int list_collections(vector<coll_t>& ls) override;
481 bool collection_exists(const coll_t& c) override;
482 int collection_empty(const coll_t& c, bool *empty) override;
483 int collection_bits(const coll_t& c) override;
484 int collection_list(
485 const coll_t& cid, const ghobject_t& start, const ghobject_t& end,
486 int max,
487 vector<ghobject_t> *ls, ghobject_t *next) override;
488 int collection_list(
489 CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
490 int max,
491 vector<ghobject_t> *ls, ghobject_t *next) override;
492
493 using ObjectStore::omap_get;
494 int omap_get(
495 const coll_t& cid, ///< [in] Collection containing oid
496 const ghobject_t &oid, ///< [in] Object containing omap
497 bufferlist *header, ///< [out] omap header
498 map<string, bufferlist> *out /// < [out] Key to value map
499 ) override;
500
501 using ObjectStore::omap_get_header;
502 /// Get omap header
503 int omap_get_header(
504 const coll_t& cid, ///< [in] Collection containing oid
505 const ghobject_t &oid, ///< [in] Object containing omap
506 bufferlist *header, ///< [out] omap header
507 bool allow_eio = false ///< [in] don't assert on eio
508 ) override;
509
510 using ObjectStore::omap_get_keys;
511 /// Get keys defined on oid
512 int omap_get_keys(
513 const coll_t& cid, ///< [in] Collection containing oid
514 const ghobject_t &oid, ///< [in] Object containing omap
515 set<string> *keys ///< [out] Keys defined on oid
516 ) override;
517
518 using ObjectStore::omap_get_values;
519 /// Get key values
520 int omap_get_values(
521 const coll_t& cid, ///< [in] Collection containing oid
522 const ghobject_t &oid, ///< [in] Object containing omap
523 const set<string> &keys, ///< [in] Keys to get
524 map<string, bufferlist> *out ///< [out] Returned keys and values
525 ) override;
526
527 using ObjectStore::omap_check_keys;
528 /// Filters keys into out which are defined on oid
529 int omap_check_keys(
530 const coll_t& cid, ///< [in] Collection containing oid
531 const ghobject_t &oid, ///< [in] Object containing omap
532 const set<string> &keys, ///< [in] Keys to check
533 set<string> *out ///< [out] Subset of keys defined on oid
534 ) override;
535
536 using ObjectStore::get_omap_iterator;
537 ObjectMap::ObjectMapIterator get_omap_iterator(
538 const coll_t& cid, ///< [in] collection
539 const ghobject_t &oid ///< [in] object
540 ) override;
541
542 void set_fsid(uuid_d u) override {
543 fsid = u;
544 }
545 uuid_d get_fsid() override {
546 return fsid;
547 }
548
549 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
550 return num_objects * 300; //assuming per-object overhead is 300 bytes
551 }
552
553 objectstore_perf_stat_t get_cur_stats() override {
554 return objectstore_perf_stat_t();
555 }
556 const PerfCounters* get_perf_counters() const override {
557 return logger;
558 }
559
560
561 int queue_transactions(
562 Sequencer *osr,
563 vector<Transaction>& tls,
564 TrackedOpRef op = TrackedOpRef(),
565 ThreadPool::TPHandle *handle = NULL) override;
566
567 void compact () override {
568 assert(db);
569 db->compact();
570 }
571
572 private:
573 // --------------------------------------------------------
574 // write ops
575
576 int _do_transaction(Transaction *t,
577 TransContext *txc,
578 ThreadPool::TPHandle *handle);
579
580 int _write(TransContext *txc,
581 CollectionRef& c,
582 OnodeRef& o,
583 uint64_t offset, size_t len,
584 bufferlist& bl,
585 uint32_t fadvise_flags);
586 int _do_write(TransContext *txc,
587 OnodeRef o,
588 uint64_t offset, uint64_t length,
589 bufferlist& bl,
590 uint32_t fadvise_flags);
591 int _touch(TransContext *txc,
592 CollectionRef& c,
593 OnodeRef& o);
594 int _zero(TransContext *txc,
595 CollectionRef& c,
596 OnodeRef& o,
597 uint64_t offset, size_t len);
598 int _do_truncate(TransContext *txc,
599 OnodeRef o,
600 uint64_t offset);
601 int _truncate(TransContext *txc,
602 CollectionRef& c,
603 OnodeRef& o,
604 uint64_t offset);
605 int _remove(TransContext *txc,
606 CollectionRef& c,
607 OnodeRef& o);
608 int _do_remove(TransContext *txc,
609 OnodeRef o);
610 int _setattr(TransContext *txc,
611 CollectionRef& c,
612 OnodeRef& o,
613 const string& name,
614 bufferptr& val);
615 int _setattrs(TransContext *txc,
616 CollectionRef& c,
617 OnodeRef& o,
618 const map<string,bufferptr>& aset);
619 int _rmattr(TransContext *txc,
620 CollectionRef& c,
621 OnodeRef& o,
622 const string& name);
623 int _rmattrs(TransContext *txc,
624 CollectionRef& c,
625 OnodeRef& o);
626 void _do_omap_clear(TransContext *txc, uint64_t id);
627 int _omap_clear(TransContext *txc,
628 CollectionRef& c,
629 OnodeRef& o);
630 int _omap_setkeys(TransContext *txc,
631 CollectionRef& c,
632 OnodeRef& o,
633 bufferlist& bl);
634 int _omap_setheader(TransContext *txc,
635 CollectionRef& c,
636 OnodeRef& o,
637 bufferlist& header);
638 int _omap_rmkeys(TransContext *txc,
639 CollectionRef& c,
640 OnodeRef& o,
641 bufferlist& bl);
642 int _omap_rmkey_range(TransContext *txc,
643 CollectionRef& c,
644 OnodeRef& o,
645 const string& first, const string& last);
646 int _setallochint(TransContext *txc,
647 CollectionRef& c,
648 OnodeRef& o,
649 uint64_t expected_object_size,
650 uint64_t expected_write_size,
651 uint32_t flags);
652 int _clone(TransContext *txc,
653 CollectionRef& c,
654 OnodeRef& oldo,
655 OnodeRef& newo);
656 int _clone_range(TransContext *txc,
657 CollectionRef& c,
658 OnodeRef& oldo,
659 OnodeRef& newo,
660 uint64_t srcoff, uint64_t length, uint64_t dstoff);
661 int _rename(TransContext *txc,
662 CollectionRef& c,
663 OnodeRef& oldo,
664 OnodeRef& newo,
665 const ghobject_t& new_oid);
666 int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
667 CollectionRef *c);
668 int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
669 int _split_collection(TransContext *txc,
670 CollectionRef& c,
671 CollectionRef& d,
672 unsigned bits, int rem);
673
674 };
675
676 inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) {
677 return out << *s.parent;
678 }
679
680 static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
681 o->get();
682 }
683 static inline void intrusive_ptr_release(KStore::Onode *o) {
684 o->put();
685 }
686
687 static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
688 o->get();
689 }
690 static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
691 o->put();
692 }
693
694 #endif