]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/kstore/KStore.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / os / kstore / KStore.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OSD_KSTORE_H
16#define CEPH_OSD_KSTORE_H
17
18#include "acconfig.h"
19
20#include <unistd.h>
21
22#include <atomic>
23#include <mutex>
24#include <condition_variable>
25
11fdf7f2 26#include "include/ceph_assert.h"
7c673cae 27#include "include/unordered_map.h"
7c673cae
FG
28#include "common/Finisher.h"
29#include "common/RWLock.h"
11fdf7f2 30#include "common/Throttle.h"
7c673cae
FG
31#include "common/WorkQueue.h"
32#include "os/ObjectStore.h"
33#include "common/perf_counters.h"
34#include "os/fs/FS.h"
35#include "kv/KeyValueDB.h"
36
37#include "kstore_types.h"
38
39#include "boost/intrusive/list.hpp"
40
/// Perf-counter indices registered by KStore. The ids are consecutive,
/// starting just after l_kstore_first; l_kstore_last is an end marker.
/// The *_lat entries are latency counters (TransContext::log_state_latency
/// feeds an index into PerfCounters::tinc) — presumably one per state
/// transition; the exact call sites live outside this header.
enum {
  l_kstore_first = 832430,       // base id; must not collide with other stores
  l_kstore_state_prepare_lat,    // time spent in "prepare"
  l_kstore_state_kv_queued_lat,  // time spent until kv queue
  l_kstore_state_kv_done_lat,    // time spent until kv commit done
  l_kstore_state_finishing_lat,  // time spent until "finishing"
  l_kstore_state_done_lat,       // time spent until "done"
  l_kstore_last                  // end marker (not a real counter)
};
50
51class KStore : public ObjectStore {
52 // -----------------------------------------------------
53 // types
54public:
55
56 class TransContext;
57
58 /// an in-memory object
59 struct Onode {
60 CephContext* cct;
61 std::atomic_int nref; ///< reference count
62
63 ghobject_t oid;
64 string key; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook<> lru_item;
66
67 kstore_onode_t onode; ///< metadata stored as value in kv store
68 bool dirty; // ???
69 bool exists;
70
71 std::mutex flush_lock; ///< protect flush_txns
72 std::condition_variable flush_cond; ///< wait here for unapplied txns
73 set<TransContext*> flush_txns; ///< committing txns
74
75 uint64_t tail_offset;
76 bufferlist tail_bl;
77
78 map<uint64_t,bufferlist> pending_stripes; ///< unwritten stripes
79
80 Onode(CephContext* cct, const ghobject_t& o, const string& k)
81 : cct(cct),
82 nref(0),
83 oid(o),
84 key(k),
85 dirty(false),
86 exists(false),
87 tail_offset(0) {
88 }
89
90 void flush();
91 void get() {
92 ++nref;
93 }
94 void put() {
95 if (--nref == 0)
96 delete this;
97 }
98
99 void clear_tail() {
100 tail_offset = 0;
101 tail_bl.clear();
102 }
103 void clear_pending_stripes() {
104 pending_stripes.clear();
105 }
106 };
107 typedef boost::intrusive_ptr<Onode> OnodeRef;
108
109 struct OnodeHashLRU {
110 CephContext* cct;
111 typedef boost::intrusive::list<
112 Onode,
113 boost::intrusive::member_hook<
114 Onode,
115 boost::intrusive::list_member_hook<>,
116 &Onode::lru_item> > lru_list_t;
117
118 std::mutex lock;
119 ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
120 lru_list_t lru; ///< lru
121
122 OnodeHashLRU(CephContext* cct) : cct(cct) {}
123
124 void add(const ghobject_t& oid, OnodeRef o);
125 void _touch(OnodeRef o);
126 OnodeRef lookup(const ghobject_t& o);
127 void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
128 void clear();
129 bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
130 int trim(int max=-1);
131 };
132
11fdf7f2
TL
133 class OpSequencer;
134 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
135
7c673cae
FG
136 struct Collection : public CollectionImpl {
137 KStore *store;
7c673cae
FG
138 kstore_cnode_t cnode;
139 RWLock lock;
140
11fdf7f2
TL
141 OpSequencerRef osr;
142
7c673cae
FG
143 // cache onodes on a per-collection basis to avoid lock
144 // contention.
145 OnodeHashLRU onode_map;
146
147 OnodeRef get_onode(const ghobject_t& oid, bool create);
148
7c673cae
FG
149 bool contains(const ghobject_t& oid) {
150 if (cid.is_meta())
151 return oid.hobj.pool == -1;
152 spg_t spgid;
153 if (cid.is_pg(&spgid))
154 return
155 spgid.pgid.contains(cnode.bits, oid) &&
156 oid.shard_id == spgid.shard;
157 return false;
158 }
159
11fdf7f2
TL
160 void flush() override;
161 bool flush_commit(Context *c) override;
162
7c673cae
FG
163 Collection(KStore *ns, coll_t c);
164 };
165 typedef boost::intrusive_ptr<Collection> CollectionRef;
166
167 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
168 CollectionRef c;
169 OnodeRef o;
170 KeyValueDB::Iterator it;
171 string head, tail;
172 public:
173 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
174 int seek_to_first() override;
175 int upper_bound(const string &after) override;
176 int lower_bound(const string &to) override;
177 bool valid() override;
11fdf7f2 178 int next() override;
7c673cae
FG
179 string key() override;
180 bufferlist value() override;
181 int status() override {
182 return 0;
183 }
184 };
185
7c673cae
FG
186 struct TransContext {
187 typedef enum {
188 STATE_PREPARE,
189 STATE_AIO_WAIT,
190 STATE_IO_DONE,
191 STATE_KV_QUEUED,
192 STATE_KV_COMMITTING,
193 STATE_KV_DONE,
194 STATE_FINISHING,
195 STATE_DONE,
196 } state_t;
197
198 state_t state;
199
200 const char *get_state_name() {
201 switch (state) {
202 case STATE_PREPARE: return "prepare";
203 case STATE_AIO_WAIT: return "aio_wait";
204 case STATE_IO_DONE: return "io_done";
205 case STATE_KV_QUEUED: return "kv_queued";
206 case STATE_KV_COMMITTING: return "kv_committing";
207 case STATE_KV_DONE: return "kv_done";
208 case STATE_FINISHING: return "finishing";
209 case STATE_DONE: return "done";
210 }
211 return "???";
212 }
213
214 void log_state_latency(PerfCounters *logger, int state) {
215 utime_t lat, now = ceph_clock_now();
216 lat = now - start;
217 logger->tinc(state, lat);
218 start = now;
219 }
220
11fdf7f2 221 CollectionRef ch;
7c673cae
FG
222 OpSequencerRef osr;
223 boost::intrusive::list_member_hook<> sequencer_item;
224
225 uint64_t ops, bytes;
226
227 set<OnodeRef> onodes; ///< these onodes need to be updated/written
228 KeyValueDB::Transaction t; ///< then we will commit this
229 Context *oncommit; ///< signal on commit
230 Context *onreadable; ///< signal on readable
231 Context *onreadable_sync; ///< signal on readable
232 list<Context*> oncommits; ///< more commit completions
233 list<CollectionRef> removed_collections; ///< colls we removed
234
235 CollectionRef first_collection; ///< first referenced collection
236 utime_t start;
237 explicit TransContext(OpSequencer *o)
238 : state(STATE_PREPARE),
239 osr(o),
240 ops(0),
241 bytes(0),
242 oncommit(NULL),
243 onreadable(NULL),
244 onreadable_sync(NULL),
245 start(ceph_clock_now()){
246 //cout << "txc new " << this << std::endl;
247 }
248 ~TransContext() {
249 //cout << "txc del " << this << std::endl;
250 }
251
252 void write_onode(OnodeRef &o) {
253 onodes.insert(o);
254 }
255 };
256
11fdf7f2 257 class OpSequencer : public RefCountedObject {
7c673cae
FG
258 public:
259 std::mutex qlock;
260 std::condition_variable qcond;
261 typedef boost::intrusive::list<
262 TransContext,
263 boost::intrusive::member_hook<
264 TransContext,
265 boost::intrusive::list_member_hook<>,
266 &TransContext::sequencer_item> > q_list_t;
267 q_list_t q; ///< transactions
268
11fdf7f2
TL
269 ~OpSequencer() {
270 ceph_assert(q.empty());
7c673cae
FG
271 }
272
273 void queue_new(TransContext *txc) {
274 std::lock_guard<std::mutex> l(qlock);
275 q.push_back(*txc);
276 }
277
11fdf7f2 278 void flush() {
7c673cae
FG
279 std::unique_lock<std::mutex> l(qlock);
280 while (!q.empty())
281 qcond.wait(l);
282 }
283
11fdf7f2 284 bool flush_commit(Context *c) {
7c673cae
FG
285 std::lock_guard<std::mutex> l(qlock);
286 if (q.empty()) {
287 return true;
288 }
289 TransContext *txc = &q.back();
290 if (txc->state >= TransContext::STATE_KV_DONE) {
291 return true;
292 }
11fdf7f2 293 ceph_assert(txc->state < TransContext::STATE_KV_DONE);
7c673cae
FG
294 txc->oncommits.push_back(c);
295 return false;
296 }
297 };
298
299 struct KVSyncThread : public Thread {
300 KStore *store;
301 explicit KVSyncThread(KStore *s) : store(s) {}
302 void *entry() override {
303 store->_kv_sync_thread();
304 return NULL;
305 }
306 };
307
308 // --------------------------------------------------------
309 // members
310private:
311 KeyValueDB *db;
312 uuid_d fsid;
11fdf7f2 313 string basedir;
7c673cae
FG
314 int path_fd; ///< open handle to $path
315 int fsid_fd; ///< open handle (locked) to $path/fsid
316 bool mounted;
317
318 RWLock coll_lock; ///< rwlock to protect coll_map
319 ceph::unordered_map<coll_t, CollectionRef> coll_map;
11fdf7f2 320 map<coll_t,CollectionRef> new_coll_map;
7c673cae
FG
321
322 std::mutex nid_lock;
323 uint64_t nid_last;
324 uint64_t nid_max;
325
326 Throttle throttle_ops, throttle_bytes; ///< submit to commit
327
328 Finisher finisher;
329
330 KVSyncThread kv_sync_thread;
331 std::mutex kv_lock;
332 std::condition_variable kv_cond, kv_sync_cond;
333 bool kv_stop;
334 deque<TransContext*> kv_queue, kv_committing;
335
336 //Logger *logger;
337 PerfCounters *logger;
338 std::mutex reap_lock;
339 list<CollectionRef> removed_collections;
340
341
342 // --------------------------------------------------------
343 // private methods
344
345 void _init_logger();
346 void _shutdown_logger();
347
348 int _open_path();
349 void _close_path();
350 int _open_fsid(bool create);
351 int _lock_fsid();
352 int _read_fsid(uuid_d *f);
353 int _write_fsid();
354 void _close_fsid();
355 int _open_db(bool create);
356 void _close_db();
357 int _open_collections(int *errors=0);
358 void _close_collections();
359
360 int _open_super_meta();
361
362 CollectionRef _get_collection(coll_t cid);
363 void _queue_reap_collection(CollectionRef& c);
364 void _reap_collections();
365
366 void _assign_nid(TransContext *txc, OnodeRef o);
367
368 void _dump_onode(OnodeRef o);
369
370 TransContext *_txc_create(OpSequencer *osr);
371 void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
372 void _txc_add_transaction(TransContext *txc, Transaction *t);
373 void _txc_finalize(OpSequencer *osr, TransContext *txc);
374 void _txc_state_proc(TransContext *txc);
375 void _txc_finish_kv(TransContext *txc);
376 void _txc_finish(TransContext *txc);
377
378 void _osr_reap_done(OpSequencer *osr);
379
380 void _kv_sync_thread();
381 void _kv_stop() {
382 {
383 std::lock_guard<std::mutex> l(kv_lock);
384 kv_stop = true;
385 kv_cond.notify_all();
386 }
387 kv_sync_thread.join();
388 kv_stop = false;
389 }
390
391 void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
392 void _do_write_stripe(TransContext *txc, OnodeRef o,
393 uint64_t offset, bufferlist& bl);
394 void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
395
396 int _collection_list(
397 Collection *c, const ghobject_t& start, const ghobject_t& end,
398 int max, vector<ghobject_t> *ls, ghobject_t *next);
399
400public:
401 KStore(CephContext *cct, const string& path);
402 ~KStore() override;
403
404 string get_type() override {
405 return "kstore";
406 }
407
408 bool needs_journal() override { return false; };
409 bool wants_journal() override { return false; };
410 bool allows_journal() override { return false; };
411
412 static int get_block_device_fsid(const string& path, uuid_d *fsid);
413
414 bool test_mount_in_use() override;
415
416 int mount() override;
417 int umount() override;
418 void _sync();
419
420 int fsck(bool deep) override;
421
422
423 int validate_hobject_key(const hobject_t &obj) const override {
424 return 0;
425 }
426 unsigned get_max_attr_name_length() override {
427 return 256; // arbitrary; there is no real limit internally
428 }
429
430 int mkfs() override;
431 int mkjournal() override {
432 return 0;
433 }
434 void dump_perf_counters(Formatter *f) override {
435 f->open_object_section("perf_counters");
436 logger->dump_formatted(f, false);
437 f->close_section();
438 }
11fdf7f2
TL
439 void get_db_statistics(Formatter *f) override {
440 db->get_statistics(f);
441 }
442 int statfs(struct store_statfs_t *buf,
443 osd_alert_list_t* alerts = nullptr) override;
444 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) override;
445
446 CollectionHandle open_collection(const coll_t& c) override;
447 CollectionHandle create_new_collection(const coll_t& c) override;
448 void set_collection_commit_queue(const coll_t& cid,
449 ContextQueue *commit_queue) override {
450 }
7c673cae
FG
451
452 using ObjectStore::exists;
11fdf7f2 453 bool exists(CollectionHandle& c, const ghobject_t& oid) override;
7c673cae
FG
454 using ObjectStore::stat;
455 int stat(
11fdf7f2 456 CollectionHandle& c,
7c673cae
FG
457 const ghobject_t& oid,
458 struct stat *st,
459 bool allow_eio = false) override; // struct stat?
460 int set_collection_opts(
11fdf7f2 461 CollectionHandle& c,
7c673cae
FG
462 const pool_opts_t& opts) override;
463 using ObjectStore::read;
464 int read(
11fdf7f2 465 CollectionHandle& c,
7c673cae
FG
466 const ghobject_t& oid,
467 uint64_t offset,
468 size_t len,
469 bufferlist& bl,
224ce89b 470 uint32_t op_flags = 0) override;
7c673cae
FG
471 int _do_read(
472 OnodeRef o,
473 uint64_t offset,
474 size_t len,
475 bufferlist& bl,
476 uint32_t op_flags = 0);
477
478 using ObjectStore::fiemap;
11fdf7f2
TL
479 int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
480 int fiemap(CollectionHandle& c, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& outbl) override;
7c673cae 481 using ObjectStore::getattr;
11fdf7f2 482 int getattr(CollectionHandle& c, const ghobject_t& oid, const char *name, bufferptr& value) override;
7c673cae 483 using ObjectStore::getattrs;
11fdf7f2 484 int getattrs(CollectionHandle& c, const ghobject_t& oid, map<string,bufferptr>& aset) override;
7c673cae
FG
485
486 int list_collections(vector<coll_t>& ls) override;
487 bool collection_exists(const coll_t& c) override;
11fdf7f2
TL
488 int collection_empty(CollectionHandle& c, bool *empty) override;
489 int collection_bits(CollectionHandle& c) override;
7c673cae
FG
490 int collection_list(
491 CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
492 int max,
493 vector<ghobject_t> *ls, ghobject_t *next) override;
494
495 using ObjectStore::omap_get;
496 int omap_get(
11fdf7f2 497 CollectionHandle& c, ///< [in] Collection containing oid
7c673cae
FG
498 const ghobject_t &oid, ///< [in] Object containing omap
499 bufferlist *header, ///< [out] omap header
500 map<string, bufferlist> *out /// < [out] Key to value map
501 ) override;
502
503 using ObjectStore::omap_get_header;
504 /// Get omap header
505 int omap_get_header(
11fdf7f2 506 CollectionHandle& c, ///< [in] Collection containing oid
7c673cae
FG
507 const ghobject_t &oid, ///< [in] Object containing omap
508 bufferlist *header, ///< [out] omap header
509 bool allow_eio = false ///< [in] don't assert on eio
510 ) override;
511
512 using ObjectStore::omap_get_keys;
513 /// Get keys defined on oid
514 int omap_get_keys(
11fdf7f2 515 CollectionHandle& c, ///< [in] Collection containing oid
7c673cae
FG
516 const ghobject_t &oid, ///< [in] Object containing omap
517 set<string> *keys ///< [out] Keys defined on oid
518 ) override;
519
520 using ObjectStore::omap_get_values;
521 /// Get key values
522 int omap_get_values(
11fdf7f2 523 CollectionHandle& c, ///< [in] Collection containing oid
7c673cae
FG
524 const ghobject_t &oid, ///< [in] Object containing omap
525 const set<string> &keys, ///< [in] Keys to get
526 map<string, bufferlist> *out ///< [out] Returned keys and values
527 ) override;
528
529 using ObjectStore::omap_check_keys;
530 /// Filters keys into out which are defined on oid
531 int omap_check_keys(
11fdf7f2 532 CollectionHandle& c, ///< [in] Collection containing oid
7c673cae
FG
533 const ghobject_t &oid, ///< [in] Object containing omap
534 const set<string> &keys, ///< [in] Keys to check
535 set<string> *out ///< [out] Subset of keys defined on oid
536 ) override;
537
538 using ObjectStore::get_omap_iterator;
539 ObjectMap::ObjectMapIterator get_omap_iterator(
11fdf7f2 540 CollectionHandle& c, ///< [in] collection
7c673cae
FG
541 const ghobject_t &oid ///< [in] object
542 ) override;
543
544 void set_fsid(uuid_d u) override {
545 fsid = u;
546 }
547 uuid_d get_fsid() override {
548 return fsid;
549 }
550
551 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
552 return num_objects * 300; //assuming per-object overhead is 300 bytes
553 }
554
555 objectstore_perf_stat_t get_cur_stats() override {
556 return objectstore_perf_stat_t();
557 }
558 const PerfCounters* get_perf_counters() const override {
559 return logger;
560 }
561
562
563 int queue_transactions(
11fdf7f2 564 CollectionHandle& ch,
7c673cae
FG
565 vector<Transaction>& tls,
566 TrackedOpRef op = TrackedOpRef(),
567 ThreadPool::TPHandle *handle = NULL) override;
568
224ce89b 569 void compact () override {
11fdf7f2 570 ceph_assert(db);
224ce89b
WB
571 db->compact();
572 }
573
7c673cae
FG
574private:
575 // --------------------------------------------------------
576 // write ops
577
7c673cae
FG
578 int _write(TransContext *txc,
579 CollectionRef& c,
580 OnodeRef& o,
581 uint64_t offset, size_t len,
582 bufferlist& bl,
583 uint32_t fadvise_flags);
584 int _do_write(TransContext *txc,
585 OnodeRef o,
586 uint64_t offset, uint64_t length,
587 bufferlist& bl,
588 uint32_t fadvise_flags);
589 int _touch(TransContext *txc,
590 CollectionRef& c,
591 OnodeRef& o);
592 int _zero(TransContext *txc,
593 CollectionRef& c,
594 OnodeRef& o,
595 uint64_t offset, size_t len);
596 int _do_truncate(TransContext *txc,
597 OnodeRef o,
598 uint64_t offset);
599 int _truncate(TransContext *txc,
600 CollectionRef& c,
601 OnodeRef& o,
602 uint64_t offset);
603 int _remove(TransContext *txc,
604 CollectionRef& c,
605 OnodeRef& o);
606 int _do_remove(TransContext *txc,
607 OnodeRef o);
608 int _setattr(TransContext *txc,
609 CollectionRef& c,
610 OnodeRef& o,
611 const string& name,
612 bufferptr& val);
613 int _setattrs(TransContext *txc,
614 CollectionRef& c,
615 OnodeRef& o,
616 const map<string,bufferptr>& aset);
617 int _rmattr(TransContext *txc,
618 CollectionRef& c,
619 OnodeRef& o,
620 const string& name);
621 int _rmattrs(TransContext *txc,
622 CollectionRef& c,
623 OnodeRef& o);
624 void _do_omap_clear(TransContext *txc, uint64_t id);
625 int _omap_clear(TransContext *txc,
626 CollectionRef& c,
627 OnodeRef& o);
628 int _omap_setkeys(TransContext *txc,
629 CollectionRef& c,
630 OnodeRef& o,
631 bufferlist& bl);
632 int _omap_setheader(TransContext *txc,
633 CollectionRef& c,
634 OnodeRef& o,
635 bufferlist& header);
636 int _omap_rmkeys(TransContext *txc,
637 CollectionRef& c,
638 OnodeRef& o,
11fdf7f2 639 const bufferlist& bl);
7c673cae
FG
640 int _omap_rmkey_range(TransContext *txc,
641 CollectionRef& c,
642 OnodeRef& o,
643 const string& first, const string& last);
644 int _setallochint(TransContext *txc,
645 CollectionRef& c,
646 OnodeRef& o,
647 uint64_t expected_object_size,
648 uint64_t expected_write_size,
649 uint32_t flags);
650 int _clone(TransContext *txc,
651 CollectionRef& c,
652 OnodeRef& oldo,
653 OnodeRef& newo);
654 int _clone_range(TransContext *txc,
655 CollectionRef& c,
656 OnodeRef& oldo,
657 OnodeRef& newo,
658 uint64_t srcoff, uint64_t length, uint64_t dstoff);
659 int _rename(TransContext *txc,
660 CollectionRef& c,
661 OnodeRef& oldo,
662 OnodeRef& newo,
663 const ghobject_t& new_oid);
664 int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
665 CollectionRef *c);
666 int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
667 int _split_collection(TransContext *txc,
668 CollectionRef& c,
669 CollectionRef& d,
670 unsigned bits, int rem);
11fdf7f2
TL
671 int _merge_collection(TransContext *txc,
672 CollectionRef *c,
673 CollectionRef& d,
674 unsigned bits);
7c673cae
FG
675
676};
677
7c673cae
FG
678static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
679 o->get();
680}
681static inline void intrusive_ptr_release(KStore::Onode *o) {
682 o->put();
683}
684
685static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
686 o->get();
687}
688static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
689 o->put();
690}
691
692#endif