]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/kstore/KStore.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / os / kstore / KStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <unistd.h>
16#include <stdlib.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
11fdf7f2
TL
21#if defined(__FreeBSD__)
22#include <sys/param.h>
23#include <sys/mount.h>
24#endif
7c673cae
FG
25
26#include "KStore.h"
27#include "osd/osd_types.h"
28#include "os/kv.h"
29#include "include/compat.h"
30#include "include/stringify.h"
31#include "common/errno.h"
32#include "common/safe_io.h"
33#include "common/Formatter.h"
34
35
36#define dout_context cct
37#define dout_subsys ceph_subsys_kstore
38
39/*
40
41 TODO:
42
43 * superblock, features
44 * refcounted extents (for efficient clone)
45
46 */
47
// Key prefixes: each one names a separate section of the key/value store.
const std::string PREFIX_SUPER("S");  // field -> value
const std::string PREFIX_COLL("C");   // collection name -> (nothing)
const std::string PREFIX_OBJ("O");    // object name -> onode
const std::string PREFIX_DATA("D");   // nid + offset -> data
const std::string PREFIX_OMAP("M");   // u64 + keyname -> value
53
54/*
55 * object name key structure
56 *
57 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
58 * encoded u64: poolid + 2^63 (so that it sorts properly)
59 * encoded u32: hash (bit reversed)
60 *
61 * 1 char: '.'
62 *
63 * escaped string: namespace
64 *
65 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
66 * we are followed just by the key. otherwise, we are followed by
67 * the key and then the object name.
68 * escaped string: key
69 * escaped string: object name (unless '=' above)
70 *
71 * encoded u64: snap
72 * encoded u64: generation
73 */
74
75/*
76 * string encoding in the key
77 *
78 * The key string needs to lexicographically sort the same way that
79 * ghobject_t does. We do this by escaping anything <= to '#' with #
80 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
81 * hex digits.
82 *
83 * We use ! as a terminator for strings; this works because it is < #
84 * and will get escaped if it is present in the string.
85 *
86 */
87
// Append `in` to `*out` in escaped form, followed by the '!' terminator.
//
// Bytes that compare <= '#' are written as '#' plus two hex digits, bytes
// that compare >= '~' as '~' plus two hex digits; everything else is
// copied verbatim.  The comparison is done on plain `char`, so on
// platforms where char is signed, bytes >= 0x80 take the '#' branch.
static void append_escaped(const std::string &in, std::string *out)
{
  char esc[8];
  for (const char c : in) {
    if (c > '#' && c < '~') {
      // ordinary character, no escaping needed
      out->push_back(c);
      continue;
    }
    if (c <= '#')
      snprintf(esc, sizeof(esc), "#%02x", (uint8_t)c);
    else
      snprintf(esc, sizeof(esc), "~%02x", (uint8_t)c);
    out->append(esc);
  }
  out->push_back('!');
}
104
// Decode an escaped string (as produced by append_escaped) starting at `p`,
// appending the decoded bytes to `*out`.
//
// Decoding stops at the '!' terminator or at the end of the C string,
// whichever comes first; the terminator itself is not consumed.
//
// Returns the number of input characters consumed, or -EINVAL if an escape
// introducer ('#' or '~') is not followed by exactly two hex digits.  On
// error `*out` may already contain the bytes decoded so far.
static int decode_escaped(const char *p, std::string *out)
{
  // value of a single hex digit, or -1 if `c` is not one
  auto hexval = [](char c) -> int {
    if (c >= '0' && c <= '9')
      return c - '0';
    if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
    if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
    return -1;
  };

  const char *orig_p = p;
  while (*p && *p != '!') {
    if (*p == '#' || *p == '~') {
      // Validate both digits explicitly.  sscanf("%2x") would skip leading
      // whitespace and accept a single digit, after which advancing by two
      // could step over the terminating NUL.  p[2] is only read once p[1]
      // is known to be a (non-NUL) hex digit.
      const int hi = hexval(p[1]);
      if (hi < 0)
        return -EINVAL;
      const int lo = hexval(p[2]);
      if (lo < 0)
        return -EINVAL;
      out->push_back((char)((hi << 4) | lo));
      p += 3;
    } else {
      out->push_back(*p++);
    }
  }
  return p - orig_p;
}
122
123// some things we encode in binary (as le32 or le64); print the
124// resulting key strings nicely
// Render a key that mixes text and binary fields in a readable form.
//
// Runs of printable ASCII (32..126) are emitted inside single quotes.
// Everything else is emitted as hex after a single "0x" introducer per
// hex run: four bytes at a time (as one big-endian %08x word) while at
// least four input bytes remain, one byte at a time (%02x) otherwise.
// While already in a hex run, a printable byte is still folded into the
// next four-byte word if any of the three bytes after it is unprintable.
static std::string pretty_binary_string(const std::string& in)
{
  // true if the byte cannot be shown verbatim
  auto unprintable = [](char c) {
    return c < 32 || (unsigned char)c > 126;
  };

  enum { NONE, HEX, STRING } mode = NONE;
  const size_t len = in.length();
  std::string out;
  out.reserve(len * 3);

  char word[10];
  unsigned run_start = 0;
  unsigned pos = 0;
  for (; pos < len; ++pos) {
    const bool room_for_u32 = len - pos >= 4;

    bool as_hex = unprintable(in[pos]);
    if (!as_hex && mode == HEX && room_for_u32) {
      as_hex = unprintable(in[pos + 1]) ||
               unprintable(in[pos + 2]) ||
               unprintable(in[pos + 3]);
    }

    if (!as_hex) {
      // printable: open a quoted run if we are not already inside one
      if (mode != STRING) {
        out.push_back('\'');
        mode = STRING;
        run_start = pos;
      }
      continue;
    }

    if (mode == STRING) {
      // close the quoted run that ends just before this byte
      out.append(in, run_start, pos - run_start);
      out.push_back('\'');
    }
    if (mode != HEX) {
      out.append("0x");
      mode = HEX;
    }
    if (room_for_u32) {
      // print a whole u32 at once
      const uint32_t v =
        ((uint32_t)(unsigned char)in[pos] << 24) |
        ((uint32_t)(unsigned char)in[pos + 1] << 16) |
        ((uint32_t)(unsigned char)in[pos + 2] << 8) |
        ((uint32_t)(unsigned char)in[pos + 3] << 0);
      snprintf(word, sizeof(word), "%08x", v);
      pos += 3;
    } else {
      snprintf(word, sizeof(word), "%02x", (int)(unsigned char)in[pos]);
    }
    out.append(word);
  }

  if (mode == STRING) {
    out.append(in, run_start, pos - run_start);
    out.push_back('\'');
  }
  return out;
}
173
174static void _key_encode_shard(shard_id_t shard, string *key)
175{
176 // make field ordering match with ghobject_t compare operations
177 if (shard == shard_id_t::NO_SHARD) {
178 // otherwise ff will sort *after* 0, not before.
179 key->append("--");
180 } else {
181 char buf[32];
182 snprintf(buf, sizeof(buf), "%02x", (int)shard);
183 key->append(buf);
184 }
185}
186static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
187{
188 if (key[0] == '-') {
189 *pshard = shard_id_t::NO_SHARD;
190 } else {
191 unsigned shard;
192 int r = sscanf(key, "%x", &shard);
193 if (r < 1)
194 return NULL;
195 *pshard = shard_id_t(shard);
196 }
197 return key + 2;
198}
199
200static void get_coll_key_range(const coll_t& cid, int bits,
201 string *temp_start, string *temp_end,
202 string *start, string *end)
203{
204 temp_start->clear();
205 temp_end->clear();
206 start->clear();
207 end->clear();
208
209 spg_t pgid;
210 if (cid.is_pg(&pgid)) {
211 _key_encode_shard(pgid.shard, start);
212 *end = *start;
213 *temp_start = *start;
214 *temp_end = *start;
215
216 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
217 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
218 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
219 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
220 start->append(".");
221 temp_start->append(".");
222
223 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
224 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
225
226 uint64_t end_hash =
227 hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
228 if (end_hash <= 0xffffffffull) {
229 _key_encode_u32(end_hash, end);
230 _key_encode_u32(end_hash, temp_end);
231 end->append(".");
232 temp_end->append(".");
233 } else {
234 _key_encode_u32(0xffffffff, end);
235 _key_encode_u32(0xffffffff, temp_end);
236 end->append(":");
237 temp_end->append(":");
238 }
239 } else {
240 _key_encode_shard(shard_id_t::NO_SHARD, start);
241 _key_encode_u64(-1ull + 0x8000000000000000ull, start);
242 *end = *start;
243 _key_encode_u32(0, start);
244 start->append(".");
245 _key_encode_u32(0xffffffff, end);
246 end->append(":");
247
248 // no separate temp section
249 *temp_start = *end;
250 *temp_end = *end;
251 }
252}
253
254static int get_key_object(const string& key, ghobject_t *oid);
255
256static void get_object_key(CephContext* cct, const ghobject_t& oid,
257 string *key)
258{
259 key->clear();
260
261 _key_encode_shard(oid.shard_id, key);
262 _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
263 _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
264 key->append(".");
265
266 append_escaped(oid.hobj.nspace, key);
267
268 if (oid.hobj.get_key().length()) {
269 // is a key... could be < = or >.
270 // (ASCII chars < = and > sort in that order, yay)
271 if (oid.hobj.get_key() < oid.hobj.oid.name) {
272 key->append("<");
273 append_escaped(oid.hobj.get_key(), key);
274 append_escaped(oid.hobj.oid.name, key);
275 } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
276 key->append(">");
277 append_escaped(oid.hobj.get_key(), key);
278 append_escaped(oid.hobj.oid.name, key);
279 } else {
280 // same as no key
281 key->append("=");
282 append_escaped(oid.hobj.oid.name, key);
283 }
284 } else {
285 // no key
286 key->append("=");
287 append_escaped(oid.hobj.oid.name, key);
288 }
289
290 _key_encode_u64(oid.hobj.snap, key);
291 _key_encode_u64(oid.generation, key);
292
293 // sanity check
294 if (true) {
295 ghobject_t t;
296 int r = get_key_object(*key, &t);
297 if (r || t != oid) {
298 derr << " r " << r << dendl;
299 derr << "key " << pretty_binary_string(*key) << dendl;
300 derr << "oid " << oid << dendl;
301 derr << " t " << t << dendl;
11fdf7f2 302 ceph_assert(t == oid);
7c673cae
FG
303 }
304 }
305}
306
307static int get_key_object(const string& key, ghobject_t *oid)
308{
309 int r;
310 const char *p = key.c_str();
311
312 p = _key_decode_shard(p, &oid->shard_id);
313
314 uint64_t pool;
315 p = _key_decode_u64(p, &pool);
316 oid->hobj.pool = pool - 0x8000000000000000ull;
317
318 unsigned hash;
319 p = _key_decode_u32(p, &hash);
320 oid->hobj.set_bitwise_key_u32(hash);
321 if (*p != '.')
322 return -5;
323 ++p;
324
325 r = decode_escaped(p, &oid->hobj.nspace);
326 if (r < 0)
327 return -6;
328 p += r + 1;
329
330 if (*p == '=') {
331 // no key
332 ++p;
333 r = decode_escaped(p, &oid->hobj.oid.name);
334 if (r < 0)
335 return -7;
336 p += r + 1;
337 } else if (*p == '<' || *p == '>') {
338 // key + name
339 ++p;
340 string okey;
341 r = decode_escaped(p, &okey);
342 if (r < 0)
343 return -8;
344 p += r + 1;
345 r = decode_escaped(p, &oid->hobj.oid.name);
346 if (r < 0)
347 return -9;
348 p += r + 1;
349 oid->hobj.set_key(okey);
350 } else {
351 // malformed
352 return -10;
353 }
354
355 p = _key_decode_u64(p, &oid->hobj.snap.val);
356 p = _key_decode_u64(p, &oid->generation);
357 if (*p) {
358 // if we get something other than a null terminator here,
359 // something goes wrong.
360 return -12;
361 }
362
363 return 0;
364}
365
366
367static void get_data_key(uint64_t nid, uint64_t offset, string *out)
368{
369 _key_encode_u64(nid, out);
370 _key_encode_u64(offset, out);
371}
372
373// '-' < '.' < '~'
374static void get_omap_header(uint64_t id, string *out)
375{
376 _key_encode_u64(id, out);
377 out->push_back('-');
378}
379
380// hmm, I don't think there's any need to escape the user key since we
381// have a clean prefix.
382static void get_omap_key(uint64_t id, const string& key, string *out)
383{
384 _key_encode_u64(id, out);
385 out->push_back('.');
386 out->append(key);
387}
388
389static void rewrite_omap_key(uint64_t id, string old, string *out)
390{
391 _key_encode_u64(id, out);
392 out->append(old.substr(out->length()));
393}
394
// Extract the user-visible part of an omap key into `*user_key` by
// dropping the leading id (sizeof(uint64_t) bytes) and the one-character
// separator that follows it.  Throws std::out_of_range if `key` is
// shorter than that prefix.
static void decode_omap_key(const std::string& key, std::string *user_key)
{
  const size_t prefix_len = sizeof(uint64_t) + 1;
  std::string tail(key, prefix_len);
  user_key->swap(tail);
}
399
400static void get_omap_tail(uint64_t id, string *out)
401{
402 _key_encode_u64(id, out);
403 out->push_back('~');
404}
405
406
407
408// Onode
409
410#undef dout_prefix
411#define dout_prefix *_dout << "kstore.onode(" << this << ") "
412
413void KStore::Onode::flush()
414{
415 std::unique_lock<std::mutex> l(flush_lock);
416 dout(20) << __func__ << " " << flush_txns << dendl;
417 while (!flush_txns.empty())
418 flush_cond.wait(l);
419 dout(20) << __func__ << " done" << dendl;
420}
421
422// OnodeHashLRU
423
424#undef dout_prefix
425#define dout_prefix *_dout << "kstore.lru(" << this << ") "
426
427void KStore::OnodeHashLRU::_touch(OnodeRef o)
428{
429 lru_list_t::iterator p = lru.iterator_to(*o);
430 lru.erase(p);
431 lru.push_front(*o);
432}
433
434void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
435{
436 std::lock_guard<std::mutex> l(lock);
437 dout(30) << __func__ << " " << oid << " " << o << dendl;
11fdf7f2 438 ceph_assert(onode_map.count(oid) == 0);
7c673cae
FG
439 onode_map[oid] = o;
440 lru.push_front(*o);
441}
442
443KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
444{
445 std::lock_guard<std::mutex> l(lock);
446 dout(30) << __func__ << dendl;
447 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
448 if (p == onode_map.end()) {
449 dout(30) << __func__ << " " << oid << " miss" << dendl;
450 return OnodeRef();
451 }
452 dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
453 _touch(p->second);
454 return p->second;
455}
456
457void KStore::OnodeHashLRU::clear()
458{
459 std::lock_guard<std::mutex> l(lock);
460 dout(10) << __func__ << dendl;
461 lru.clear();
462 onode_map.clear();
463}
464
465void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
466 const ghobject_t& new_oid)
467{
468 std::lock_guard<std::mutex> l(lock);
469 dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
470 ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
471 po = onode_map.find(old_oid);
472 pn = onode_map.find(new_oid);
473
11fdf7f2 474 ceph_assert(po != onode_map.end());
7c673cae
FG
475 if (pn != onode_map.end()) {
476 lru_list_t::iterator p = lru.iterator_to(*pn->second);
477 lru.erase(p);
478 onode_map.erase(pn);
479 }
480 OnodeRef o = po->second;
481
482 // install a non-existent onode it its place
483 po->second.reset(new Onode(cct, old_oid, o->key));
484 lru.push_back(*po->second);
485
486 // fix oid, key
487 onode_map.insert(make_pair(new_oid, o));
488 _touch(o);
489 o->oid = new_oid;
490 get_object_key(cct, new_oid, &o->key);
491}
492
493bool KStore::OnodeHashLRU::get_next(
494 const ghobject_t& after,
495 pair<ghobject_t,OnodeRef> *next)
496{
497 std::lock_guard<std::mutex> l(lock);
498 dout(20) << __func__ << " after " << after << dendl;
499
500 if (after == ghobject_t()) {
501 if (lru.empty()) {
502 return false;
503 }
504 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
11fdf7f2 505 ceph_assert(p != onode_map.end());
7c673cae
FG
506 next->first = p->first;
507 next->second = p->second;
508 return true;
509 }
510
511 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
11fdf7f2 512 ceph_assert(p != onode_map.end()); // for now
7c673cae
FG
513 lru_list_t::iterator pi = lru.iterator_to(*p->second);
514 ++pi;
515 if (pi == lru.end()) {
516 return false;
517 }
518 next->first = pi->oid;
519 next->second = onode_map[pi->oid];
520 return true;
521}
522
523int KStore::OnodeHashLRU::trim(int max)
524{
525 std::lock_guard<std::mutex> l(lock);
526 dout(20) << __func__ << " max " << max
527 << " size " << onode_map.size() << dendl;
528 int trimmed = 0;
529 int num = onode_map.size() - max;
530 if (onode_map.size() == 0 || num <= 0)
531 return 0; // don't even try
532
533 lru_list_t::iterator p = lru.end();
534 if (num)
535 --p;
536 while (num > 0) {
537 Onode *o = &*p;
538 int refs = o->nref.load();
539 if (refs > 1) {
540 dout(20) << __func__ << " " << o->oid << " has " << refs
541 << " refs; stopping with " << num << " left to trim" << dendl;
542 break;
543 }
544 dout(30) << __func__ << " trim " << o->oid << dendl;
545 if (p != lru.begin()) {
546 lru.erase(p--);
547 } else {
548 lru.erase(p);
11fdf7f2 549 ceph_assert(num == 1);
7c673cae
FG
550 }
551 o->get(); // paranoia
552 onode_map.erase(o->oid);
553 o->put();
554 --num;
555 ++trimmed;
556 }
557 return trimmed;
558}
559
560// =======================================================
561
562// Collection
563
564#undef dout_prefix
565#define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
566
11fdf7f2
TL
567KStore::Collection::Collection(KStore *ns, coll_t cid)
568 : CollectionImpl(cid),
569 store(ns),
7c673cae 570 lock("KStore::Collection::lock", true, false),
11fdf7f2 571 osr(new OpSequencer()),
7c673cae
FG
572 onode_map(store->cct)
573{
574}
575
11fdf7f2
TL
576void KStore::Collection::flush()
577{
578 osr->flush();
579}
580
581bool KStore::Collection::flush_commit(Context *c)
582{
583 return osr->flush_commit(c);
584}
585
586
7c673cae
FG
587KStore::OnodeRef KStore::Collection::get_onode(
588 const ghobject_t& oid,
589 bool create)
590{
11fdf7f2 591 ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
7c673cae
FG
592
593 spg_t pgid;
594 if (cid.is_pg(&pgid)) {
595 if (!oid.match(cnode.bits, pgid.ps())) {
596 lderr(store->cct) << __func__ << " oid " << oid << " not part of "
597 << pgid << " bits " << cnode.bits << dendl;
598 ceph_abort();
599 }
600 }
601
602 OnodeRef o = onode_map.lookup(oid);
603 if (o)
604 return o;
605
606 string key;
607 get_object_key(store->cct, oid, &key);
608
609 ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
610 << pretty_binary_string(key) << dendl;
611
612 bufferlist v;
613 int r = store->db->get(PREFIX_OBJ, key, &v);
614 ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
615 Onode *on;
616 if (v.length() == 0) {
11fdf7f2 617 ceph_assert(r == -ENOENT);
7c673cae
FG
618 if (!create)
619 return OnodeRef();
620
621 // new
622 on = new Onode(store->cct, oid, key);
623 on->dirty = true;
624 } else {
625 // loaded
11fdf7f2 626 ceph_assert(r >=0);
7c673cae
FG
627 on = new Onode(store->cct, oid, key);
628 on->exists = true;
11fdf7f2
TL
629 auto p = v.cbegin();
630 decode(on->onode, p);
7c673cae
FG
631 }
632 o.reset(on);
633 onode_map.add(oid, o);
634 return o;
635}
636
637
638
639// =======================================================
640
641#undef dout_prefix
642#define dout_prefix *_dout << "kstore(" << path << ") "
643
644KStore::KStore(CephContext *cct, const string& path)
645 : ObjectStore(cct, path),
646 db(NULL),
11fdf7f2 647 basedir(path),
7c673cae
FG
648 path_fd(-1),
649 fsid_fd(-1),
650 mounted(false),
651 coll_lock("KStore::coll_lock"),
652 nid_last(0),
653 nid_max(0),
654 throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
655 throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
656 finisher(cct),
657 kv_sync_thread(this),
658 kv_stop(false),
659 logger(nullptr)
660{
661 _init_logger();
662}
663
664KStore::~KStore()
665{
666 _shutdown_logger();
11fdf7f2
TL
667 ceph_assert(!mounted);
668 ceph_assert(db == NULL);
669 ceph_assert(fsid_fd < 0);
7c673cae
FG
670}
671
672void KStore::_init_logger()
673{
674 // XXX
675 PerfCountersBuilder b(cct, "KStore",
676 l_kstore_first, l_kstore_last);
677 b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
678 b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
679 b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
680 b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
681 b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
682 logger = b.create_perf_counters();
683 cct->get_perfcounters_collection()->add(logger);
684}
685
686void KStore::_shutdown_logger()
687{
688 // XXX
689 cct->get_perfcounters_collection()->remove(logger);
690 delete logger;
691}
692
693int KStore::_open_path()
694{
11fdf7f2 695 ceph_assert(path_fd < 0);
91327a77 696 path_fd = ::open(path.c_str(), O_DIRECTORY|O_CLOEXEC);
7c673cae
FG
697 if (path_fd < 0) {
698 int r = -errno;
699 derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
700 << dendl;
701 return r;
702 }
703 return 0;
704}
705
706void KStore::_close_path()
707{
708 VOID_TEMP_FAILURE_RETRY(::close(path_fd));
709 path_fd = -1;
710}
711
712int KStore::_open_fsid(bool create)
713{
11fdf7f2 714 ceph_assert(fsid_fd < 0);
7c673cae
FG
715 int flags = O_RDWR;
716 if (create)
717 flags |= O_CREAT;
718 fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
719 if (fsid_fd < 0) {
720 int err = -errno;
721 derr << __func__ << " " << cpp_strerror(err) << dendl;
722 return err;
723 }
724 return 0;
725}
726
727int KStore::_read_fsid(uuid_d *uuid)
728{
729 char fsid_str[40];
730 memset(fsid_str, 0, sizeof(fsid_str));
731 int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
732 if (ret < 0) {
733 derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
734 return ret;
735 }
736 if (ret > 36)
737 fsid_str[36] = 0;
738 else
739 fsid_str[ret] = 0;
740 if (!uuid->parse(fsid_str)) {
741 derr << __func__ << " unparsable uuid " << fsid_str << dendl;
742 return -EINVAL;
743 }
744 return 0;
745}
746
747int KStore::_write_fsid()
748{
749 int r = ::ftruncate(fsid_fd, 0);
750 if (r < 0) {
751 r = -errno;
752 derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
753 return r;
754 }
755 string str = stringify(fsid) + "\n";
756 r = safe_write(fsid_fd, str.c_str(), str.length());
757 if (r < 0) {
758 derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
759 return r;
760 }
761 r = ::fsync(fsid_fd);
762 if (r < 0) {
763 r = -errno;
764 derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
765 return r;
766 }
767 return 0;
768}
769
770void KStore::_close_fsid()
771{
772 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
773 fsid_fd = -1;
774}
775
776int KStore::_lock_fsid()
777{
778 struct flock l;
779 memset(&l, 0, sizeof(l));
780 l.l_type = F_WRLCK;
781 l.l_whence = SEEK_SET;
782 l.l_start = 0;
783 l.l_len = 0;
784 int r = ::fcntl(fsid_fd, F_SETLK, &l);
785 if (r < 0) {
786 int err = errno;
787 derr << __func__ << " failed to lock " << path << "/fsid"
788 << " (is another ceph-osd still running?)"
789 << cpp_strerror(err) << dendl;
790 return -err;
791 }
792 return 0;
793}
794
795bool KStore::test_mount_in_use()
796{
797 // most error conditions mean the mount is not in use (e.g., because
798 // it doesn't exist). only if we fail to lock do we conclude it is
799 // in use.
800 bool ret = false;
801 int r = _open_path();
802 if (r < 0)
803 return false;
804 r = _open_fsid(false);
805 if (r < 0)
806 goto out_path;
807 r = _lock_fsid();
808 if (r < 0)
809 ret = true; // if we can't lock, it is in use
810 _close_fsid();
811 out_path:
812 _close_path();
813 return ret;
814}
815
816int KStore::_open_db(bool create)
817{
818 int r;
11fdf7f2 819 ceph_assert(!db);
7c673cae
FG
820 char fn[PATH_MAX];
821 snprintf(fn, sizeof(fn), "%s/db", path.c_str());
822
823 string kv_backend;
824 if (create) {
825 kv_backend = cct->_conf->kstore_backend;
826 } else {
827 r = read_meta("kv_backend", &kv_backend);
828 if (r < 0) {
829 derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
830 return -EIO;
831 }
832 }
833 dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
834
835 if (create) {
836 int r = ::mkdir(fn, 0755);
837 if (r < 0)
838 r = -errno;
839 if (r < 0 && r != -EEXIST) {
840 derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
841 << dendl;
842 return r;
843 }
844
845 // wal_dir, too!
846 char walfn[PATH_MAX];
847 snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
848 r = ::mkdir(walfn, 0755);
849 if (r < 0)
850 r = -errno;
851 if (r < 0 && r != -EEXIST) {
852 derr << __func__ << " failed to create " << walfn
853 << ": " << cpp_strerror(r)
854 << dendl;
855 return r;
856 }
857 }
858
859 db = KeyValueDB::create(cct, kv_backend, fn);
860 if (!db) {
861 derr << __func__ << " error creating db" << dendl;
862 return -EIO;
863 }
864 string options;
865 if (kv_backend == "rocksdb")
866 options = cct->_conf->kstore_rocksdb_options;
867 db->init(options);
868 stringstream err;
869 if (create)
870 r = db->create_and_open(err);
871 else
872 r = db->open(err);
873 if (r) {
874 derr << __func__ << " erroring opening db: " << err.str() << dendl;
875 delete db;
876 db = NULL;
877 return -EIO;
878 }
879 dout(1) << __func__ << " opened " << kv_backend
880 << " path " << fn << " options " << options << dendl;
881 return 0;
882}
883
884void KStore::_close_db()
885{
11fdf7f2 886 ceph_assert(db);
7c673cae
FG
887 delete db;
888 db = NULL;
889}
890
891int KStore::_open_collections(int *errors)
892{
11fdf7f2 893 ceph_assert(coll_map.empty());
7c673cae
FG
894 KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
895 for (it->upper_bound(string());
896 it->valid();
897 it->next()) {
898 coll_t cid;
899 if (cid.parse(it->key())) {
900 CollectionRef c(new Collection(this, cid));
901 bufferlist bl = it->value();
11fdf7f2 902 auto p = bl.cbegin();
7c673cae 903 try {
11fdf7f2 904 decode(c->cnode, p);
7c673cae
FG
905 } catch (buffer::error& e) {
906 derr << __func__ << " failed to decode cnode, key:"
907 << pretty_binary_string(it->key()) << dendl;
908 return -EIO;
909 }
910 dout(20) << __func__ << " opened " << cid << dendl;
911 coll_map[cid] = c;
912 } else {
913 derr << __func__ << " unrecognized collection " << it->key() << dendl;
914 if (errors)
915 (*errors)++;
916 }
917 }
918 return 0;
919}
920
921int KStore::mkfs()
922{
923 dout(1) << __func__ << " path " << path << dendl;
924 int r;
925 uuid_d old_fsid;
926
927 r = _open_path();
928 if (r < 0)
929 return r;
930
931 r = _open_fsid(true);
932 if (r < 0)
933 goto out_path_fd;
934
935 r = _lock_fsid();
936 if (r < 0)
937 goto out_close_fsid;
938
939 r = _read_fsid(&old_fsid);
940 if (r < 0 || old_fsid.is_zero()) {
941 if (fsid.is_zero()) {
942 fsid.generate_random();
943 dout(1) << __func__ << " generated fsid " << fsid << dendl;
944 } else {
945 dout(1) << __func__ << " using provided fsid " << fsid << dendl;
946 }
947 // we'll write it last.
948 } else {
949 if (!fsid.is_zero() && fsid != old_fsid) {
950 derr << __func__ << " on-disk fsid " << old_fsid
951 << " != provided " << fsid << dendl;
952 r = -EINVAL;
953 goto out_close_fsid;
954 }
955 fsid = old_fsid;
956 dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
957 goto out_close_fsid;
958 }
959
960 r = _open_db(true);
961 if (r < 0)
962 goto out_close_fsid;
963
964 r = write_meta("kv_backend", cct->_conf->kstore_backend);
965 if (r < 0)
966 goto out_close_db;
967
968 r = write_meta("type", "kstore");
969 if (r < 0)
970 goto out_close_db;
971
972 // indicate mkfs completion/success by writing the fsid file
973 r = _write_fsid();
974 if (r == 0)
975 dout(10) << __func__ << " success" << dendl;
976 else
977 derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
978
979 out_close_db:
980 _close_db();
981 out_close_fsid:
982 _close_fsid();
983 out_path_fd:
984 _close_path();
985 return r;
986}
987
988int KStore::mount()
989{
990 dout(1) << __func__ << " path " << path << dendl;
991
992 if (cct->_conf->kstore_fsck_on_mount) {
993 int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
994 if (rc < 0)
995 return rc;
996 }
997
998 int r = _open_path();
999 if (r < 0)
1000 return r;
1001 r = _open_fsid(false);
1002 if (r < 0)
1003 goto out_path;
1004
1005 r = _read_fsid(&fsid);
1006 if (r < 0)
1007 goto out_fsid;
1008
1009 r = _lock_fsid();
1010 if (r < 0)
1011 goto out_fsid;
1012
1013 r = _open_db(false);
1014 if (r < 0)
1015 goto out_fsid;
1016
1017 r = _open_super_meta();
1018 if (r < 0)
1019 goto out_db;
1020
1021 r = _open_collections();
1022 if (r < 0)
1023 goto out_db;
1024
1025 finisher.start();
1026 kv_sync_thread.create("kstore_kv_sync");
1027
1028 mounted = true;
1029 return 0;
1030
1031 out_db:
1032 _close_db();
1033 out_fsid:
1034 _close_fsid();
1035 out_path:
1036 _close_path();
1037 return r;
1038}
1039
1040int KStore::umount()
1041{
11fdf7f2 1042 ceph_assert(mounted);
7c673cae
FG
1043 dout(1) << __func__ << dendl;
1044
1045 _sync();
1046 _reap_collections();
1047 coll_map.clear();
1048
1049 dout(20) << __func__ << " stopping kv thread" << dendl;
1050 _kv_stop();
1051 dout(20) << __func__ << " draining finisher" << dendl;
1052 finisher.wait_for_empty();
1053 dout(20) << __func__ << " stopping finisher" << dendl;
1054 finisher.stop();
1055 dout(20) << __func__ << " closing" << dendl;
1056
1057 mounted = false;
1058 _close_db();
1059 _close_fsid();
1060 _close_path();
1061 return 0;
1062}
1063
1064int KStore::fsck(bool deep)
1065{
1066 dout(1) << __func__ << dendl;
1067 int errors = 0;
1068 dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1069 return errors;
1070}
1071
1072void KStore::_sync()
1073{
1074 dout(10) << __func__ << dendl;
1075
1076 std::unique_lock<std::mutex> l(kv_lock);
1077 while (!kv_committing.empty() ||
1078 !kv_queue.empty()) {
1079 dout(20) << " waiting for kv to commit" << dendl;
1080 kv_sync_cond.wait(l);
1081 }
1082
1083 dout(10) << __func__ << " done" << dendl;
1084}
1085
11fdf7f2
TL
1086int KStore::statfs(struct store_statfs_t* buf0, osd_alert_list_t* alerts)
1087{
1088 struct statfs buf;
1089 buf0->reset();
1090 if (alerts) {
1091 alerts->clear(); // returns nothing for now
1092 }
1093 if (::statfs(basedir.c_str(), &buf) < 0) {
1094 int r = -errno;
1095 ceph_assert(r != -ENOENT);
1096 return r;
1097 }
1098
1099 buf0->total = buf.f_blocks * buf.f_bsize;
1100 buf0->available = buf.f_bavail * buf.f_bsize;
1101
1102 return 0;
1103}
1104
1105ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid)
1106{
1107 return _get_collection(cid);
1108}
1109
1110ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
1111{
1112 auto *c = new Collection(this, cid);
1113 RWLock::WLocker l(coll_lock);
1114 new_coll_map[cid] = c;
1115 return c;
1116}
1117
1118int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf)
7c673cae 1119{
11fdf7f2 1120 return -ENOTSUP;
7c673cae
FG
1121}
1122
1123// ---------------
1124// cache
1125
1126KStore::CollectionRef KStore::_get_collection(coll_t cid)
1127{
1128 RWLock::RLocker l(coll_lock);
1129 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1130 if (cp == coll_map.end())
1131 return CollectionRef();
1132 return cp->second;
1133}
1134
1135void KStore::_queue_reap_collection(CollectionRef& c)
1136{
1137 dout(10) << __func__ << " " << c->cid << dendl;
1138 std::lock_guard<std::mutex> l(reap_lock);
1139 removed_collections.push_back(c);
1140}
1141
1142void KStore::_reap_collections()
1143{
1144 list<CollectionRef> removed_colls;
1145 std::lock_guard<std::mutex> l(reap_lock);
1146 removed_colls.swap(removed_collections);
1147
1148 for (list<CollectionRef>::iterator p = removed_colls.begin();
1149 p != removed_colls.end();
1150 ++p) {
1151 CollectionRef c = *p;
1152 dout(10) << __func__ << " " << c->cid << dendl;
1153 {
1154 pair<ghobject_t,OnodeRef> next;
1155 while (c->onode_map.get_next(next.first, &next)) {
11fdf7f2 1156 ceph_assert(!next.second->exists);
7c673cae
FG
1157 if (!next.second->flush_txns.empty()) {
1158 dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1159 << " flush_txns " << next.second->flush_txns << dendl;
1160 return;
1161 }
1162 }
1163 }
1164 c->onode_map.clear();
1165 dout(10) << __func__ << " " << c->cid << " done" << dendl;
1166 }
1167
1168 dout(10) << __func__ << " all reaped" << dendl;
1169}
1170
1171// ---------------
1172// read operations
1173
11fdf7f2 1174bool KStore::exists(CollectionHandle& ch, const ghobject_t& oid)
7c673cae 1175{
11fdf7f2
TL
1176 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1177 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1178 RWLock::RLocker l(c->lock);
1179 OnodeRef o = c->get_onode(oid, false);
1180 if (!o || !o->exists)
1181 return false;
1182 return true;
1183}
1184
1185int KStore::stat(
11fdf7f2
TL
1186 CollectionHandle& ch,
1187 const ghobject_t& oid,
1188 struct stat *st,
1189 bool allow_eio)
7c673cae 1190{
11fdf7f2
TL
1191 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1192 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1193 RWLock::RLocker l(c->lock);
1194 OnodeRef o = c->get_onode(oid, false);
1195 if (!o || !o->exists)
1196 return -ENOENT;
1197 st->st_size = o->onode.size;
1198 st->st_blksize = 4096;
1199 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1200 st->st_nlink = 1;
1201 return 0;
1202}
1203
1204int KStore::set_collection_opts(
11fdf7f2 1205 CollectionHandle& ch,
7c673cae
FG
1206 const pool_opts_t& opts)
1207{
1208 return -EOPNOTSUPP;
1209}
1210
1211int KStore::read(
11fdf7f2 1212 CollectionHandle& ch,
7c673cae
FG
1213 const ghobject_t& oid,
1214 uint64_t offset,
1215 size_t length,
1216 bufferlist& bl,
224ce89b 1217 uint32_t op_flags)
7c673cae 1218{
11fdf7f2 1219 dout(15) << __func__ << " " << ch->cid << " " << oid
7c673cae
FG
1220 << " " << offset << "~" << length
1221 << dendl;
1222 bl.clear();
11fdf7f2 1223 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1224 RWLock::RLocker l(c->lock);
1225
1226 int r;
1227
1228 OnodeRef o = c->get_onode(oid, false);
1229 if (!o || !o->exists) {
1230 r = -ENOENT;
1231 goto out;
1232 }
1233
1234 if (offset == length && offset == 0)
1235 length = o->onode.size;
1236
1237 r = _do_read(o, offset, length, bl, op_flags);
1238
1239 out:
11fdf7f2 1240 dout(10) << __func__ << " " << ch->cid << " " << oid
7c673cae
FG
1241 << " " << offset << "~" << length
1242 << " = " << r << dendl;
1243 return r;
1244}
1245
1246int KStore::_do_read(
1247 OnodeRef o,
1248 uint64_t offset,
1249 size_t length,
1250 bufferlist& bl,
1251 uint32_t op_flags)
1252{
1253 int r = 0;
1254 uint64_t stripe_size = o->onode.stripe_size;
1255 uint64_t stripe_off;
1256
1257 dout(20) << __func__ << " " << offset << "~" << length << " size "
1258 << o->onode.size << " nid " << o->onode.nid << dendl;
1259 bl.clear();
1260
1261 if (offset > o->onode.size) {
1262 goto out;
1263 }
1264 if (offset + length > o->onode.size) {
1265 length = o->onode.size - offset;
1266 }
1267 if (stripe_size == 0) {
1268 bl.append_zero(length);
1269 r = length;
1270 goto out;
1271 }
1272
1273 o->flush();
1274
1275 stripe_off = offset % stripe_size;
1276 while (length > 0) {
1277 bufferlist stripe;
1278 _do_read_stripe(o, offset - stripe_off, &stripe);
1279 dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1280 << stripe.length() << dendl;
11fdf7f2 1281 unsigned swant = std::min<unsigned>(stripe_size - stripe_off, length);
7c673cae
FG
1282 if (stripe.length()) {
1283 if (swant == stripe.length()) {
1284 bl.claim_append(stripe);
1285 dout(30) << __func__ << " taking full stripe" << dendl;
1286 } else {
1287 unsigned l = 0;
1288 if (stripe_off < stripe.length()) {
11fdf7f2 1289 l = std::min<uint64_t>(stripe.length() - stripe_off, swant);
7c673cae
FG
1290 bufferlist t;
1291 t.substr_of(stripe, stripe_off, l);
1292 bl.claim_append(t);
1293 dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1294 }
1295 if (l < swant) {
1296 bl.append_zero(swant - l);
1297 dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1298 }
1299 }
1300 } else {
1301 dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1302 bl.append_zero(swant);
1303 }
1304 offset += swant;
1305 length -= swant;
1306 stripe_off = 0;
1307 }
1308 r = bl.length();
1309 dout(30) << " result:\n";
1310 bl.hexdump(*_dout);
1311 *_dout << dendl;
1312
1313 out:
1314 return r;
1315}
1316
1317int KStore::fiemap(
11fdf7f2 1318 CollectionHandle& ch,
7c673cae
FG
1319 const ghobject_t& oid,
1320 uint64_t offset,
1321 size_t len,
1322 bufferlist& bl)
1323{
1324 map<uint64_t, uint64_t> m;
11fdf7f2 1325 int r = fiemap(ch, oid, offset, len, m);
7c673cae 1326 if (r >= 0) {
11fdf7f2 1327 encode(m, bl);
7c673cae 1328 }
7c673cae
FG
1329 return r;
1330}
1331
1332int KStore::fiemap(
11fdf7f2 1333 CollectionHandle& ch,
7c673cae
FG
1334 const ghobject_t& oid,
1335 uint64_t offset,
1336 size_t len,
1337 map<uint64_t, uint64_t>& destmap)
1338{
11fdf7f2 1339 CollectionRef c = static_cast<Collection*>(ch.get());
7c673cae
FG
1340 if (!c)
1341 return -ENOENT;
1342 RWLock::RLocker l(c->lock);
1343
1344 OnodeRef o = c->get_onode(oid, false);
1345 if (!o || !o->exists) {
1346 return -ENOENT;
1347 }
1348
1349 if (offset > o->onode.size)
1350 goto out;
1351
1352 if (offset + len > o->onode.size) {
1353 len = o->onode.size - offset;
1354 }
1355
1356 dout(20) << __func__ << " " << offset << "~" << len << " size "
1357 << o->onode.size << dendl;
1358
1359 // FIXME: do something smarter here
1360 destmap[0] = o->onode.size;
1361
1362 out:
1363 dout(20) << __func__ << " " << offset << "~" << len
1364 << " size = 0 (" << destmap << ")" << dendl;
1365 return 0;
1366}
1367
1368int KStore::getattr(
11fdf7f2 1369 CollectionHandle& ch,
7c673cae
FG
1370 const ghobject_t& oid,
1371 const char *name,
1372 bufferptr& value)
1373{
11fdf7f2
TL
1374 dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl;
1375 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1376 RWLock::RLocker l(c->lock);
1377 int r;
1378 string k(name);
1379
1380 OnodeRef o = c->get_onode(oid, false);
1381 if (!o || !o->exists) {
1382 r = -ENOENT;
1383 goto out;
1384 }
1385
1386 if (!o->onode.attrs.count(k)) {
1387 r = -ENODATA;
1388 goto out;
1389 }
1390 value = o->onode.attrs[k];
1391 r = 0;
1392 out:
11fdf7f2 1393 dout(10) << __func__ << " " << ch->cid << " " << oid << " " << name
7c673cae
FG
1394 << " = " << r << dendl;
1395 return r;
1396}
1397
1398int KStore::getattrs(
11fdf7f2 1399 CollectionHandle& ch,
7c673cae
FG
1400 const ghobject_t& oid,
1401 map<string,bufferptr>& aset)
1402{
11fdf7f2
TL
1403 dout(15) << __func__ << " " << ch->cid << " " << oid << dendl;
1404 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1405 RWLock::RLocker l(c->lock);
1406 int r;
1407
1408 OnodeRef o = c->get_onode(oid, false);
1409 if (!o || !o->exists) {
1410 r = -ENOENT;
1411 goto out;
1412 }
1413 aset = o->onode.attrs;
1414 r = 0;
1415 out:
11fdf7f2 1416 dout(10) << __func__ << " " << ch->cid << " " << oid
7c673cae
FG
1417 << " = " << r << dendl;
1418 return r;
1419}
1420
1421int KStore::list_collections(vector<coll_t>& ls)
1422{
1423 RWLock::RLocker l(coll_lock);
1424 for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1425 p != coll_map.end();
1426 ++p)
1427 ls.push_back(p->first);
1428 return 0;
1429}
1430
1431bool KStore::collection_exists(const coll_t& c)
1432{
1433 RWLock::RLocker l(coll_lock);
1434 return coll_map.count(c);
1435}
1436
11fdf7f2 1437int KStore::collection_empty(CollectionHandle& ch, bool *empty)
7c673cae 1438{
11fdf7f2 1439 dout(15) << __func__ << " " << ch->cid << dendl;
7c673cae
FG
1440 vector<ghobject_t> ls;
1441 ghobject_t next;
11fdf7f2 1442 int r = collection_list(ch, ghobject_t(), ghobject_t::get_max(), 1,
7c673cae
FG
1443 &ls, &next);
1444 if (r < 0) {
1445 derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1446 << dendl;
1447 return r;
1448 }
1449 *empty = ls.empty();
11fdf7f2 1450 dout(10) << __func__ << " " << ch->cid << " = " << (int)(*empty) << dendl;
7c673cae
FG
1451 return 0;
1452}
1453
11fdf7f2 1454int KStore::collection_bits(CollectionHandle& ch)
7c673cae 1455{
11fdf7f2 1456 dout(15) << __func__ << " " << ch->cid << dendl;
7c673cae
FG
1457 Collection *c = static_cast<Collection*>(ch.get());
1458 RWLock::RLocker l(c->lock);
11fdf7f2 1459 dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
7c673cae
FG
1460 return c->cnode.bits;
1461}
1462
7c673cae
FG
1463int KStore::collection_list(
1464 CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1465 vector<ghobject_t> *ls, ghobject_t *pnext)
1466
1467{
1468 Collection *c = static_cast<Collection*>(c_.get());
11fdf7f2 1469 c->flush();
7c673cae
FG
1470 dout(15) << __func__ << " " << c->cid
1471 << " start " << start << " end " << end << " max " << max << dendl;
1472 int r;
1473 {
1474 RWLock::RLocker l(c->lock);
1475 r = _collection_list(c, start, end, max, ls, pnext);
1476 }
1477
1478 dout(10) << __func__ << " " << c->cid
1479 << " start " << start << " end " << end << " max " << max
1480 << " = " << r << ", ls.size() = " << ls->size()
1481 << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
1482 return r;
1483}
1484
1485int KStore::_collection_list(
1486 Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
1487 vector<ghobject_t> *ls, ghobject_t *pnext)
1488{
1489 int r = 0;
1490 KeyValueDB::Iterator it;
1491 string temp_start_key, temp_end_key;
1492 string start_key, end_key;
1493 bool set_next = false;
1494 string pend;
1495 bool temp;
1496
1497 ghobject_t static_next;
1498 if (!pnext)
1499 pnext = &static_next;
1500
1501 if (start == ghobject_t::get_max() ||
1502 start.hobj.is_max()) {
1503 goto out;
1504 }
1505 get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
1506 &start_key, &end_key);
1507 dout(20) << __func__
1508 << " range " << pretty_binary_string(temp_start_key)
1509 << " to " << pretty_binary_string(temp_end_key)
1510 << " and " << pretty_binary_string(start_key)
1511 << " to " << pretty_binary_string(end_key)
1512 << " start " << start << dendl;
1513 it = db->get_iterator(PREFIX_OBJ);
1514 if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
1515 it->upper_bound(temp_start_key);
1516 temp = true;
1517 } else {
1518 string k;
1519 get_object_key(cct, start, &k);
1520 if (start.hobj.is_temp()) {
1521 temp = true;
11fdf7f2 1522 ceph_assert(k >= temp_start_key && k < temp_end_key);
7c673cae
FG
1523 } else {
1524 temp = false;
11fdf7f2 1525 ceph_assert(k >= start_key && k < end_key);
7c673cae
FG
1526 }
1527 dout(20) << " start from " << pretty_binary_string(k)
1528 << " temp=" << (int)temp << dendl;
1529 it->lower_bound(k);
1530 }
1531 if (end.hobj.is_max()) {
1532 pend = temp ? temp_end_key : end_key;
1533 } else {
1534 get_object_key(cct, end, &end_key);
1535 if (end.hobj.is_temp()) {
1536 if (temp)
1537 pend = end_key;
1538 else
1539 goto out;
1540 } else {
1541 pend = temp ? temp_end_key : end_key;
1542 }
1543 }
1544 dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1545 while (true) {
1546 if (!it->valid() || it->key() >= pend) {
1547 if (!it->valid())
1548 dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
1549 else
1550 dout(20) << __func__ << " key " << pretty_binary_string(it->key())
1551 << " > " << end << dendl;
1552 if (temp) {
1553 if (end.hobj.is_temp()) {
1554 break;
1555 }
1556 dout(30) << __func__ << " switch to non-temp namespace" << dendl;
1557 temp = false;
1558 it->upper_bound(start_key);
1559 pend = end_key;
1560 dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1561 continue;
1562 }
1563 break;
1564 }
1565 dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
1566 ghobject_t oid;
1567 int r = get_key_object(it->key(), &oid);
11fdf7f2 1568 ceph_assert(r == 0);
7c673cae
FG
1569 if (ls->size() >= (unsigned)max) {
1570 dout(20) << __func__ << " reached max " << max << dendl;
1571 *pnext = oid;
1572 set_next = true;
1573 break;
1574 }
1575 ls->push_back(oid);
1576 it->next();
1577 }
1578out:
1579 if (!set_next) {
1580 *pnext = ghobject_t::get_max();
1581 }
1582 return r;
1583}
1584
1585// omap reads
1586
1587KStore::OmapIteratorImpl::OmapIteratorImpl(
1588 CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1589 : c(c), o(o), it(it)
1590{
1591 RWLock::RLocker l(c->lock);
1592 if (o->onode.omap_head) {
1593 get_omap_key(o->onode.omap_head, string(), &head);
1594 get_omap_tail(o->onode.omap_head, &tail);
1595 it->lower_bound(head);
1596 }
1597}
1598
1599int KStore::OmapIteratorImpl::seek_to_first()
1600{
1601 RWLock::RLocker l(c->lock);
1602 if (o->onode.omap_head) {
1603 it->lower_bound(head);
1604 } else {
1605 it = KeyValueDB::Iterator();
1606 }
1607 return 0;
1608}
1609
1610int KStore::OmapIteratorImpl::upper_bound(const string& after)
1611{
1612 RWLock::RLocker l(c->lock);
1613 if (o->onode.omap_head) {
1614 string key;
1615 get_omap_key(o->onode.omap_head, after, &key);
1616 it->upper_bound(key);
1617 } else {
1618 it = KeyValueDB::Iterator();
1619 }
1620 return 0;
1621}
1622
1623int KStore::OmapIteratorImpl::lower_bound(const string& to)
1624{
1625 RWLock::RLocker l(c->lock);
1626 if (o->onode.omap_head) {
1627 string key;
1628 get_omap_key(o->onode.omap_head, to, &key);
1629 it->lower_bound(key);
1630 } else {
1631 it = KeyValueDB::Iterator();
1632 }
1633 return 0;
1634}
1635
1636bool KStore::OmapIteratorImpl::valid()
1637{
1638 RWLock::RLocker l(c->lock);
1639 if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1640 return true;
1641 } else {
1642 return false;
1643 }
1644}
1645
11fdf7f2 1646int KStore::OmapIteratorImpl::next()
7c673cae
FG
1647{
1648 RWLock::RLocker l(c->lock);
1649 if (o->onode.omap_head) {
1650 it->next();
1651 return 0;
1652 } else {
1653 return -1;
1654 }
1655}
1656
1657string KStore::OmapIteratorImpl::key()
1658{
1659 RWLock::RLocker l(c->lock);
11fdf7f2 1660 ceph_assert(it->valid());
7c673cae
FG
1661 string db_key = it->raw_key().second;
1662 string user_key;
1663 decode_omap_key(db_key, &user_key);
1664 return user_key;
1665}
1666
1667bufferlist KStore::OmapIteratorImpl::value()
1668{
1669 RWLock::RLocker l(c->lock);
11fdf7f2 1670 ceph_assert(it->valid());
7c673cae
FG
1671 return it->value();
1672}
1673
1674int KStore::omap_get(
11fdf7f2 1675 CollectionHandle& ch, ///< [in] Collection containing oid
7c673cae
FG
1676 const ghobject_t &oid, ///< [in] Object containing omap
1677 bufferlist *header, ///< [out] omap header
1678 map<string, bufferlist> *out /// < [out] Key to value map
1679 )
1680{
11fdf7f2
TL
1681 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1682 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1683 RWLock::RLocker l(c->lock);
1684 int r = 0;
1685 OnodeRef o = c->get_onode(oid, false);
1686 if (!o || !o->exists) {
1687 r = -ENOENT;
1688 goto out;
1689 }
1690 if (!o->onode.omap_head)
1691 goto out;
1692 o->flush();
1693 {
1694 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1695 string head, tail;
1696 get_omap_header(o->onode.omap_head, &head);
1697 get_omap_tail(o->onode.omap_head, &tail);
1698 it->lower_bound(head);
1699 while (it->valid()) {
1700 if (it->key() == head) {
1701 dout(30) << __func__ << " got header" << dendl;
1702 *header = it->value();
1703 } else if (it->key() >= tail) {
1704 dout(30) << __func__ << " reached tail" << dendl;
1705 break;
1706 } else {
1707 string user_key;
1708 decode_omap_key(it->key(), &user_key);
1709 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1710 << " -> " << user_key << dendl;
11fdf7f2 1711 ceph_assert(it->key() < tail);
7c673cae
FG
1712 (*out)[user_key] = it->value();
1713 }
1714 it->next();
1715 }
1716 }
1717 out:
11fdf7f2 1718 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
7c673cae
FG
1719 return r;
1720}
1721
1722int KStore::omap_get_header(
11fdf7f2 1723 CollectionHandle& ch, ///< [in] Collection containing oid
7c673cae
FG
1724 const ghobject_t &oid, ///< [in] Object containing omap
1725 bufferlist *header, ///< [out] omap header
1726 bool allow_eio ///< [in] don't assert on eio
1727 )
1728{
11fdf7f2
TL
1729 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1730 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1731 RWLock::RLocker l(c->lock);
1732 int r = 0;
1733 OnodeRef o = c->get_onode(oid, false);
1734 if (!o || !o->exists) {
1735 r = -ENOENT;
1736 goto out;
1737 }
1738 if (!o->onode.omap_head)
1739 goto out;
1740 o->flush();
1741 {
1742 string head;
1743 get_omap_header(o->onode.omap_head, &head);
1744 if (db->get(PREFIX_OMAP, head, header) >= 0) {
1745 dout(30) << __func__ << " got header" << dendl;
1746 } else {
1747 dout(30) << __func__ << " no header" << dendl;
1748 }
1749 }
1750 out:
11fdf7f2 1751 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
7c673cae
FG
1752 return r;
1753}
1754
1755int KStore::omap_get_keys(
11fdf7f2 1756 CollectionHandle& ch, ///< [in] Collection containing oid
7c673cae
FG
1757 const ghobject_t &oid, ///< [in] Object containing omap
1758 set<string> *keys ///< [out] Keys defined on oid
1759 )
1760{
11fdf7f2
TL
1761 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1762 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1763 RWLock::RLocker l(c->lock);
1764 int r = 0;
1765 OnodeRef o = c->get_onode(oid, false);
1766 if (!o || !o->exists) {
1767 r = -ENOENT;
1768 goto out;
1769 }
1770 if (!o->onode.omap_head)
1771 goto out;
1772 o->flush();
1773 {
1774 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1775 string head, tail;
1776 get_omap_key(o->onode.omap_head, string(), &head);
1777 get_omap_tail(o->onode.omap_head, &tail);
1778 it->lower_bound(head);
1779 while (it->valid()) {
1780 if (it->key() >= tail) {
1781 dout(30) << __func__ << " reached tail" << dendl;
1782 break;
1783 }
1784 string user_key;
1785 decode_omap_key(it->key(), &user_key);
1786 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1787 << " -> " << user_key << dendl;
11fdf7f2 1788 ceph_assert(it->key() < tail);
7c673cae
FG
1789 keys->insert(user_key);
1790 it->next();
1791 }
1792 }
1793 out:
11fdf7f2 1794 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
7c673cae
FG
1795 return r;
1796}
1797
1798int KStore::omap_get_values(
11fdf7f2 1799 CollectionHandle& ch, ///< [in] Collection containing oid
7c673cae
FG
1800 const ghobject_t &oid, ///< [in] Object containing omap
1801 const set<string> &keys, ///< [in] Keys to get
1802 map<string, bufferlist> *out ///< [out] Returned keys and values
1803 )
1804{
11fdf7f2
TL
1805 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1806 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1807 RWLock::RLocker l(c->lock);
1808 int r = 0;
1809 OnodeRef o = c->get_onode(oid, false);
1810 if (!o || !o->exists) {
1811 r = -ENOENT;
1812 goto out;
1813 }
1814 if (!o->onode.omap_head)
1815 goto out;
1816 o->flush();
1817 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1818 string key;
1819 get_omap_key(o->onode.omap_head, *p, &key);
1820 bufferlist val;
1821 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1822 dout(30) << __func__ << " got " << pretty_binary_string(key)
1823 << " -> " << *p << dendl;
1824 out->insert(make_pair(*p, val));
1825 }
1826 }
1827 out:
11fdf7f2 1828 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
7c673cae
FG
1829 return r;
1830}
1831
1832int KStore::omap_check_keys(
11fdf7f2 1833 CollectionHandle& ch, ///< [in] Collection containing oid
7c673cae
FG
1834 const ghobject_t &oid, ///< [in] Object containing omap
1835 const set<string> &keys, ///< [in] Keys to check
1836 set<string> *out ///< [out] Subset of keys defined on oid
1837 )
1838{
11fdf7f2
TL
1839 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1840 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1841 RWLock::RLocker l(c->lock);
1842 int r = 0;
1843 OnodeRef o = c->get_onode(oid, false);
1844 if (!o || !o->exists) {
1845 r = -ENOENT;
1846 goto out;
1847 }
1848 if (!o->onode.omap_head)
1849 goto out;
1850 o->flush();
1851 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1852 string key;
1853 get_omap_key(o->onode.omap_head, *p, &key);
1854 bufferlist val;
1855 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1856 dout(30) << __func__ << " have " << pretty_binary_string(key)
1857 << " -> " << *p << dendl;
1858 out->insert(*p);
1859 } else {
1860 dout(30) << __func__ << " miss " << pretty_binary_string(key)
1861 << " -> " << *p << dendl;
1862 }
1863 }
1864 out:
11fdf7f2 1865 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
7c673cae
FG
1866 return r;
1867}
1868
1869ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
11fdf7f2 1870 CollectionHandle& ch, ///< [in] collection
7c673cae
FG
1871 const ghobject_t &oid ///< [in] object
1872 )
1873{
1874
11fdf7f2
TL
1875 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1876 Collection *c = static_cast<Collection*>(ch.get());
7c673cae
FG
1877 RWLock::RLocker l(c->lock);
1878 OnodeRef o = c->get_onode(oid, false);
1879 if (!o || !o->exists) {
1880 dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1881 return ObjectMap::ObjectMapIterator();
1882 }
1883 o->flush();
1884 dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1885 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1886 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1887}
1888
1889
1890// -----------------
1891// write helpers
1892
1893int KStore::_open_super_meta()
1894{
1895 // nid
1896 {
1897 nid_max = 0;
1898 bufferlist bl;
1899 db->get(PREFIX_SUPER, "nid_max", &bl);
11fdf7f2 1900 auto p = bl.cbegin();
7c673cae 1901 try {
11fdf7f2 1902 decode(nid_max, p);
7c673cae
FG
1903 } catch (buffer::error& e) {
1904 }
1905 dout(10) << __func__ << " old nid_max " << nid_max << dendl;
1906 nid_last = nid_max;
1907 }
1908 return 0;
1909}
1910
1911void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1912{
1913 if (o->onode.nid)
1914 return;
1915 std::lock_guard<std::mutex> l(nid_lock);
1916 o->onode.nid = ++nid_last;
1917 dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1918 if (nid_last > nid_max) {
1919 nid_max += cct->_conf->kstore_nid_prealloc;
1920 bufferlist bl;
11fdf7f2 1921 encode(nid_max, bl);
7c673cae
FG
1922 txc->t->set(PREFIX_SUPER, "nid_max", bl);
1923 dout(10) << __func__ << " nid_max now " << nid_max << dendl;
1924 }
1925}
1926
1927KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1928{
1929 TransContext *txc = new TransContext(osr);
1930 txc->t = db->get_transaction();
1931 osr->queue_new(txc);
1932 dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
1933 return txc;
1934}
1935
1936void KStore::_txc_state_proc(TransContext *txc)
1937{
1938 while (true) {
1939 dout(10) << __func__ << " txc " << txc
1940 << " " << txc->get_state_name() << dendl;
1941 switch (txc->state) {
1942 case TransContext::STATE_PREPARE:
1943 txc->log_state_latency(logger, l_kstore_state_prepare_lat);
1944 txc->state = TransContext::STATE_KV_QUEUED;
1945 if (!cct->_conf->kstore_sync_transaction) {
1946 std::lock_guard<std::mutex> l(kv_lock);
1947 if (cct->_conf->kstore_sync_submit_transaction) {
1948 int r = db->submit_transaction(txc->t);
11fdf7f2 1949 ceph_assert(r == 0);
7c673cae
FG
1950 }
1951 kv_queue.push_back(txc);
1952 kv_cond.notify_one();
1953 return;
1954 }
1955 {
1956 int r = db->submit_transaction_sync(txc->t);
11fdf7f2 1957 ceph_assert(r == 0);
7c673cae
FG
1958 }
1959 break;
1960
1961 case TransContext::STATE_KV_QUEUED:
1962 txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
1963 txc->state = TransContext::STATE_KV_DONE;
1964 _txc_finish_kv(txc);
1965 // ** fall-thru **
1966
1967 case TransContext::STATE_KV_DONE:
1968 txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
1969 txc->state = TransContext::STATE_FINISHING;
1970 // ** fall-thru **
1971
1972 case TransContext::TransContext::STATE_FINISHING:
1973 txc->log_state_latency(logger, l_kstore_state_finishing_lat);
1974 _txc_finish(txc);
1975 return;
1976
1977 default:
1978 derr << __func__ << " unexpected txc " << txc
1979 << " state " << txc->get_state_name() << dendl;
11fdf7f2 1980 ceph_abort_msg("unexpected txc state");
7c673cae
FG
1981 return;
1982 }
1983 }
1984}
1985
1986void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1987{
1988 dout(20) << __func__ << " osr " << osr << " txc " << txc
1989 << " onodes " << txc->onodes << dendl;
1990
1991 // finalize onodes
1992 for (set<OnodeRef>::iterator p = txc->onodes.begin();
1993 p != txc->onodes.end();
1994 ++p) {
1995 bufferlist bl;
11fdf7f2 1996 encode((*p)->onode, bl);
7c673cae
FG
1997 dout(20) << " onode size is " << bl.length() << dendl;
1998 txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1999
2000 std::lock_guard<std::mutex> l((*p)->flush_lock);
2001 (*p)->flush_txns.insert(txc);
2002 }
2003}
2004
2005void KStore::_txc_finish_kv(TransContext *txc)
2006{
2007 dout(20) << __func__ << " txc " << txc << dendl;
2008
2009 // warning: we're calling onreadable_sync inside the sequencer lock
2010 if (txc->onreadable_sync) {
2011 txc->onreadable_sync->complete(0);
2012 txc->onreadable_sync = NULL;
2013 }
2014 if (txc->onreadable) {
2015 finisher.queue(txc->onreadable);
2016 txc->onreadable = NULL;
2017 }
2018 if (txc->oncommit) {
2019 finisher.queue(txc->oncommit);
2020 txc->oncommit = NULL;
2021 }
2022 if (!txc->oncommits.empty()) {
2023 finisher.queue(txc->oncommits);
2024 }
2025
2026 throttle_ops.put(txc->ops);
2027 throttle_bytes.put(txc->bytes);
2028}
2029
2030void KStore::_txc_finish(TransContext *txc)
2031{
2032 dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
11fdf7f2 2033 ceph_assert(txc->state == TransContext::STATE_FINISHING);
7c673cae
FG
2034
2035 for (set<OnodeRef>::iterator p = txc->onodes.begin();
2036 p != txc->onodes.end();
2037 ++p) {
2038 std::lock_guard<std::mutex> l((*p)->flush_lock);
2039 dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2040 << dendl;
11fdf7f2 2041 ceph_assert((*p)->flush_txns.count(txc));
7c673cae
FG
2042 (*p)->flush_txns.erase(txc);
2043 if ((*p)->flush_txns.empty()) {
2044 (*p)->flush_cond.notify_all();
2045 (*p)->clear_pending_stripes();
2046 }
2047 }
2048
2049 // clear out refs
2050 txc->onodes.clear();
2051
2052 while (!txc->removed_collections.empty()) {
2053 _queue_reap_collection(txc->removed_collections.front());
2054 txc->removed_collections.pop_front();
2055 }
2056
2057 OpSequencerRef osr = txc->osr;
2058 {
2059 std::lock_guard<std::mutex> l(osr->qlock);
2060 txc->state = TransContext::STATE_DONE;
2061 }
2062
2063 _osr_reap_done(osr.get());
2064}
2065
2066void KStore::_osr_reap_done(OpSequencer *osr)
2067{
2068 std::lock_guard<std::mutex> l(osr->qlock);
2069 dout(20) << __func__ << " osr " << osr << dendl;
2070 while (!osr->q.empty()) {
2071 TransContext *txc = &osr->q.front();
2072 dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
2073 << dendl;
2074 if (txc->state != TransContext::STATE_DONE) {
2075 break;
2076 }
2077
2078 if (txc->first_collection) {
2079 txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2080 }
2081
2082 osr->q.pop_front();
2083 txc->log_state_latency(logger, l_kstore_state_done_lat);
2084 delete txc;
2085 osr->qcond.notify_all();
2086 if (osr->q.empty())
2087 dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
2088 }
2089}
2090
2091void KStore::_kv_sync_thread()
2092{
2093 dout(10) << __func__ << " start" << dendl;
2094 std::unique_lock<std::mutex> l(kv_lock);
2095 while (true) {
11fdf7f2 2096 ceph_assert(kv_committing.empty());
7c673cae
FG
2097 if (kv_queue.empty()) {
2098 if (kv_stop)
2099 break;
2100 dout(20) << __func__ << " sleep" << dendl;
2101 kv_sync_cond.notify_all();
2102 kv_cond.wait(l);
2103 dout(20) << __func__ << " wake" << dendl;
2104 } else {
2105 dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
2106 kv_committing.swap(kv_queue);
2107 utime_t start = ceph_clock_now();
2108 l.unlock();
2109
2110 dout(30) << __func__ << " committing txc " << kv_committing << dendl;
2111
2112 // one transaction to force a sync
2113 KeyValueDB::Transaction t = db->get_transaction();
2114 if (!cct->_conf->kstore_sync_submit_transaction) {
2115 for (std::deque<TransContext *>::iterator it = kv_committing.begin();
2116 it != kv_committing.end();
2117 ++it) {
2118 int r = db->submit_transaction((*it)->t);
11fdf7f2 2119 ceph_assert(r == 0);
7c673cae
FG
2120 }
2121 }
2122 int r = db->submit_transaction_sync(t);
11fdf7f2 2123 ceph_assert(r == 0);
7c673cae
FG
2124 utime_t finish = ceph_clock_now();
2125 utime_t dur = finish - start;
2126 dout(20) << __func__ << " committed " << kv_committing.size()
2127 << " in " << dur << dendl;
2128 while (!kv_committing.empty()) {
2129 TransContext *txc = kv_committing.front();
2130 _txc_state_proc(txc);
2131 kv_committing.pop_front();
2132 }
2133
2134 // this is as good a place as any ...
2135 _reap_collections();
2136
2137 l.lock();
2138 }
2139 }
2140 dout(10) << __func__ << " finish" << dendl;
2141}
2142
2143
2144// ---------------------------
2145// transactions
2146
2147int KStore::queue_transactions(
11fdf7f2
TL
2148 CollectionHandle& ch,
2149 vector<Transaction>& tls,
2150 TrackedOpRef op,
2151 ThreadPool::TPHandle *handle)
7c673cae
FG
2152{
2153 Context *onreadable;
2154 Context *ondisk;
2155 Context *onreadable_sync;
2156 ObjectStore::Transaction::collect_contexts(
2157 tls, &onreadable, &ondisk, &onreadable_sync);
2158
2159 // set up the sequencer
11fdf7f2
TL
2160 Collection *c = static_cast<Collection*>(ch.get());
2161 OpSequencer *osr = c->osr.get();
2162 dout(10) << __func__ << " ch " << ch.get() << " " << c->cid << dendl;
7c673cae
FG
2163
2164 // prepare
2165 TransContext *txc = _txc_create(osr);
2166 txc->onreadable = onreadable;
2167 txc->onreadable_sync = onreadable_sync;
2168 txc->oncommit = ondisk;
2169
2170 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
7c673cae
FG
2171 txc->ops += (*p).get_num_ops();
2172 txc->bytes += (*p).get_num_bytes();
2173 _txc_add_transaction(txc, &(*p));
2174 }
2175
2176 _txc_finalize(osr, txc);
2177
2178 throttle_ops.get(txc->ops);
2179 throttle_bytes.get(txc->bytes);
2180
2181 // execute (start)
2182 _txc_state_proc(txc);
2183 return 0;
2184}
2185
2186void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2187{
2188 Transaction::iterator i = t->begin();
2189
2190 dout(30) << __func__ << " transaction dump:\n";
2191 JSONFormatter f(true);
2192 f.open_object_section("transaction");
2193 t->dump(&f);
2194 f.close_section();
2195 f.flush(*_dout);
2196 *_dout << dendl;
2197
2198 vector<CollectionRef> cvec(i.colls.size());
2199 unsigned j = 0;
2200 for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2201 ++p, ++j) {
2202 cvec[j] = _get_collection(*p);
2203
2204 // note first collection we reference
2205 if (!j && !txc->first_collection)
2206 txc->first_collection = cvec[j];
2207 }
2208 vector<OnodeRef> ovec(i.objects.size());
2209
2210 for (int pos = 0; i.have_op(); ++pos) {
2211 Transaction::Op *op = i.decode_op();
2212 int r = 0;
2213
2214 // no coll or obj
2215 if (op->op == Transaction::OP_NOP)
2216 continue;
2217
2218 // collection operations
2219 CollectionRef &c = cvec[op->cid];
2220 switch (op->op) {
2221 case Transaction::OP_RMCOLL:
2222 {
2223 coll_t cid = i.get_cid(op->cid);
2224 r = _remove_collection(txc, cid, &c);
2225 if (!r)
2226 continue;
2227 }
2228 break;
2229
2230 case Transaction::OP_MKCOLL:
2231 {
11fdf7f2 2232 ceph_assert(!c);
7c673cae
FG
2233 coll_t cid = i.get_cid(op->cid);
2234 r = _create_collection(txc, cid, op->split_bits, &c);
2235 if (!r)
2236 continue;
2237 }
2238 break;
2239
2240 case Transaction::OP_SPLIT_COLLECTION:
11fdf7f2 2241 ceph_abort_msg("deprecated");
7c673cae
FG
2242 break;
2243
2244 case Transaction::OP_SPLIT_COLLECTION2:
2245 {
2246 uint32_t bits = op->split_bits;
2247 uint32_t rem = op->split_rem;
2248 r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2249 if (!r)
2250 continue;
2251 }
2252 break;
2253
11fdf7f2
TL
2254 case Transaction::OP_MERGE_COLLECTION:
2255 {
2256 uint32_t bits = op->split_bits;
2257 r = _merge_collection(txc, &c, cvec[op->dest_cid], bits);
2258 if (!r)
2259 continue;
2260 }
2261 break;
2262
7c673cae
FG
2263 case Transaction::OP_COLL_HINT:
2264 {
2265 uint32_t type = op->hint_type;
2266 bufferlist hint;
2267 i.decode_bl(hint);
11fdf7f2 2268 auto hiter = hint.cbegin();
7c673cae
FG
2269 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2270 uint32_t pg_num;
2271 uint64_t num_objs;
11fdf7f2
TL
2272 decode(pg_num, hiter);
2273 decode(num_objs, hiter);
7c673cae
FG
2274 dout(10) << __func__ << " collection hint objects is a no-op, "
2275 << " pg_num " << pg_num << " num_objects " << num_objs
2276 << dendl;
2277 } else {
2278 // Ignore the hint
2279 dout(10) << __func__ << " unknown collection hint " << type << dendl;
2280 }
2281 continue;
2282 }
2283 break;
2284
2285 case Transaction::OP_COLL_SETATTR:
2286 r = -EOPNOTSUPP;
2287 break;
2288
2289 case Transaction::OP_COLL_RMATTR:
2290 r = -EOPNOTSUPP;
2291 break;
2292
2293 case Transaction::OP_COLL_RENAME:
11fdf7f2 2294 ceph_abort_msg("not implemented");
7c673cae
FG
2295 break;
2296 }
2297 if (r < 0) {
2298 derr << " error " << cpp_strerror(r)
2299 << " not handled on operation " << op->op
2300 << " (op " << pos << ", counting from 0)" << dendl;
2301 dout(0) << " transaction dump:\n";
2302 JSONFormatter f(true);
2303 f.open_object_section("transaction");
2304 t->dump(&f);
2305 f.close_section();
2306 f.flush(*_dout);
2307 *_dout << dendl;
11fdf7f2 2308 ceph_abort_msg("unexpected error");
7c673cae
FG
2309 }
2310
2311 // object operations
2312 RWLock::WLocker l(c->lock);
2313 OnodeRef &o = ovec[op->oid];
2314 if (!o) {
2315 // these operations implicity create the object
2316 bool create = false;
2317 if (op->op == Transaction::OP_TOUCH ||
2318 op->op == Transaction::OP_WRITE ||
2319 op->op == Transaction::OP_ZERO) {
2320 create = true;
2321 }
2322 ghobject_t oid = i.get_oid(op->oid);
2323 o = c->get_onode(oid, create);
2324 if (!create) {
2325 if (!o || !o->exists) {
2326 dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2327 << oid << dendl;
2328 r = -ENOENT;
2329 goto endop;
2330 }
2331 }
2332 }
2333
2334 switch (op->op) {
2335 case Transaction::OP_TOUCH:
2336 r = _touch(txc, c, o);
2337 break;
2338
2339 case Transaction::OP_WRITE:
2340 {
2341 uint64_t off = op->off;
2342 uint64_t len = op->len;
2343 uint32_t fadvise_flags = i.get_fadvise_flags();
2344 bufferlist bl;
2345 i.decode_bl(bl);
2346 r = _write(txc, c, o, off, len, bl, fadvise_flags);
2347 }
2348 break;
2349
2350 case Transaction::OP_ZERO:
2351 {
2352 uint64_t off = op->off;
2353 uint64_t len = op->len;
2354 r = _zero(txc, c, o, off, len);
2355 }
2356 break;
2357
2358 case Transaction::OP_TRIMCACHE:
2359 {
2360 // deprecated, no-op
2361 }
2362 break;
2363
2364 case Transaction::OP_TRUNCATE:
2365 {
2366 uint64_t off = op->off;
2367 r = _truncate(txc, c, o, off);
2368 }
2369 break;
2370
2371 case Transaction::OP_REMOVE:
2372 r = _remove(txc, c, o);
2373 break;
2374
2375 case Transaction::OP_SETATTR:
2376 {
2377 string name = i.decode_string();
2378 bufferlist bl;
2379 i.decode_bl(bl);
2380 map<string, bufferptr> to_set;
2381 to_set[name] = bufferptr(bl.c_str(), bl.length());
2382 r = _setattrs(txc, c, o, to_set);
2383 }
2384 break;
2385
2386 case Transaction::OP_SETATTRS:
2387 {
2388 map<string, bufferptr> aset;
2389 i.decode_attrset(aset);
2390 r = _setattrs(txc, c, o, aset);
2391 }
2392 break;
2393
2394 case Transaction::OP_RMATTR:
2395 {
2396 string name = i.decode_string();
2397 r = _rmattr(txc, c, o, name);
2398 }
2399 break;
2400
2401 case Transaction::OP_RMATTRS:
2402 {
2403 r = _rmattrs(txc, c, o);
2404 }
2405 break;
2406
2407 case Transaction::OP_CLONE:
2408 {
2409 const ghobject_t& noid = i.get_oid(op->dest_oid);
2410 OnodeRef no = c->get_onode(noid, true);
2411 r = _clone(txc, c, o, no);
2412 }
2413 break;
2414
2415 case Transaction::OP_CLONERANGE:
11fdf7f2 2416 ceph_abort_msg("deprecated");
7c673cae
FG
2417 break;
2418
2419 case Transaction::OP_CLONERANGE2:
2420 {
2421 const ghobject_t& noid = i.get_oid(op->dest_oid);
2422 OnodeRef no = c->get_onode(noid, true);
2423 uint64_t srcoff = op->off;
2424 uint64_t len = op->len;
2425 uint64_t dstoff = op->dest_off;
2426 r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2427 }
2428 break;
2429
2430 case Transaction::OP_COLL_ADD:
11fdf7f2 2431 ceph_abort_msg("not implemented");
7c673cae
FG
2432 break;
2433
2434 case Transaction::OP_COLL_REMOVE:
11fdf7f2 2435 ceph_abort_msg("not implemented");
7c673cae
FG
2436 break;
2437
2438 case Transaction::OP_COLL_MOVE:
11fdf7f2 2439 ceph_abort_msg("deprecated");
7c673cae
FG
2440 break;
2441
2442 case Transaction::OP_COLL_MOVE_RENAME:
2443 {
11fdf7f2 2444 ceph_assert(op->cid == op->dest_cid);
7c673cae
FG
2445 const ghobject_t& noid = i.get_oid(op->dest_oid);
2446 OnodeRef no = c->get_onode(noid, true);
2447 r = _rename(txc, c, o, no, noid);
2448 o.reset();
2449 }
2450 break;
2451
2452 case Transaction::OP_TRY_RENAME:
2453 {
2454 const ghobject_t& noid = i.get_oid(op->dest_oid);
2455 OnodeRef no = c->get_onode(noid, true);
2456 r = _rename(txc, c, o, no, noid);
2457 if (r == -ENOENT)
2458 r = 0;
2459 o.reset();
2460 }
2461 break;
2462
2463 case Transaction::OP_OMAP_CLEAR:
2464 {
2465 r = _omap_clear(txc, c, o);
2466 }
2467 break;
2468 case Transaction::OP_OMAP_SETKEYS:
2469 {
2470 bufferlist aset_bl;
2471 i.decode_attrset_bl(&aset_bl);
2472 r = _omap_setkeys(txc, c, o, aset_bl);
2473 }
2474 break;
2475 case Transaction::OP_OMAP_RMKEYS:
2476 {
2477 bufferlist keys_bl;
2478 i.decode_keyset_bl(&keys_bl);
2479 r = _omap_rmkeys(txc, c, o, keys_bl);
2480 }
2481 break;
2482 case Transaction::OP_OMAP_RMKEYRANGE:
2483 {
2484 string first, last;
2485 first = i.decode_string();
2486 last = i.decode_string();
2487 r = _omap_rmkey_range(txc, c, o, first, last);
2488 }
2489 break;
2490 case Transaction::OP_OMAP_SETHEADER:
2491 {
2492 bufferlist bl;
2493 i.decode_bl(bl);
2494 r = _omap_setheader(txc, c, o, bl);
2495 }
2496 break;
2497
2498 case Transaction::OP_SETALLOCHINT:
2499 {
2500 uint64_t expected_object_size = op->expected_object_size;
2501 uint64_t expected_write_size = op->expected_write_size;
2502 uint32_t flags = op->alloc_hint_flags;
2503 r = _setallochint(txc, c, o,
2504 expected_object_size,
2505 expected_write_size,
2506 flags);
2507 }
2508 break;
2509
2510 default:
2511 derr << "bad op " << op->op << dendl;
2512 ceph_abort();
2513 }
2514
2515 endop:
2516 if (r < 0) {
2517 bool ok = false;
2518
2519 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2520 op->op == Transaction::OP_CLONE ||
2521 op->op == Transaction::OP_CLONERANGE2 ||
2522 op->op == Transaction::OP_COLL_ADD))
2523 // -ENOENT is usually okay
2524 ok = true;
2525 if (r == -ENODATA)
2526 ok = true;
2527
2528 if (!ok) {
2529 const char *msg = "unexpected error code";
2530
2531 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2532 op->op == Transaction::OP_CLONE ||
2533 op->op == Transaction::OP_CLONERANGE2))
2534 msg = "ENOENT on clone suggests osd bug";
2535
2536 if (r == -ENOSPC)
2537 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2538 // by partially applying transactions.
2539 msg = "ENOSPC from key value store, misconfigured cluster";
2540
2541 if (r == -ENOTEMPTY) {
2542 msg = "ENOTEMPTY suggests garbage data in osd data dir";
2543 }
2544
2545 dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2546 << " (op " << pos << ", counting from 0)" << dendl;
2547 dout(0) << msg << dendl;
2548 dout(0) << " transaction dump:\n";
2549 JSONFormatter f(true);
2550 f.open_object_section("transaction");
2551 t->dump(&f);
2552 f.close_section();
2553 f.flush(*_dout);
2554 *_dout << dendl;
11fdf7f2 2555 ceph_abort_msg("unexpected error");
7c673cae
FG
2556 }
2557 }
2558 }
2559}
2560
2561
2562
2563// -----------------
2564// write operations
2565
2566int KStore::_touch(TransContext *txc,
2567 CollectionRef& c,
2568 OnodeRef &o)
2569{
2570 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2571 int r = 0;
2572 o->exists = true;
2573 _assign_nid(txc, o);
2574 txc->write_onode(o);
2575 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2576 return r;
2577}
2578
2579void KStore::_dump_onode(OnodeRef o)
2580{
2581 dout(30) << __func__ << " " << o
2582 << " nid " << o->onode.nid
2583 << " size " << o->onode.size
2584 << " expected_object_size " << o->onode.expected_object_size
2585 << " expected_write_size " << o->onode.expected_write_size
2586 << dendl;
2587 for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2588 p != o->onode.attrs.end();
2589 ++p) {
2590 dout(30) << __func__ << " attr " << p->first
2591 << " len " << p->second.length() << dendl;
2592 }
2593}
2594
2595void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
2596{
2597 map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2598 if (p == o->pending_stripes.end()) {
2599 string key;
2600 get_data_key(o->onode.nid, offset, &key);
2601 db->get(PREFIX_DATA, key, pbl);
2602 o->pending_stripes[offset] = *pbl;
2603 } else {
2604 *pbl = p->second;
2605 }
2606}
2607
2608void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2609 uint64_t offset, bufferlist& bl)
2610{
2611 o->pending_stripes[offset] = bl;
2612 string key;
2613 get_data_key(o->onode.nid, offset, &key);
2614 txc->t->set(PREFIX_DATA, key, bl);
2615}
2616
2617void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2618{
2619 o->pending_stripes.erase(offset);
2620 string key;
2621 get_data_key(o->onode.nid, offset, &key);
2622 txc->t->rmkey(PREFIX_DATA, key);
2623}
2624
2625int KStore::_do_write(TransContext *txc,
2626 OnodeRef o,
2627 uint64_t offset, uint64_t length,
2628 bufferlist& orig_bl,
2629 uint32_t fadvise_flags)
2630{
2631 int r = 0;
2632
2633 dout(20) << __func__
2634 << " " << o->oid << " " << offset << "~" << length
2635 << " - have " << o->onode.size
2636 << " bytes, nid " << o->onode.nid << dendl;
2637 _dump_onode(o);
2638 o->exists = true;
2639
2640 if (length == 0) {
2641 return 0;
2642 }
2643
2644 uint64_t stripe_size = o->onode.stripe_size;
2645 if (!stripe_size) {
2646 o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2647 stripe_size = o->onode.stripe_size;
2648 }
2649
2650 unsigned bl_off = 0;
2651 while (length > 0) {
2652 uint64_t offset_rem = offset % stripe_size;
2653 uint64_t end_rem = (offset + length) % stripe_size;
2654 if (offset_rem == 0 && end_rem == 0) {
2655 bufferlist bl;
2656 bl.substr_of(orig_bl, bl_off, stripe_size);
2657 dout(30) << __func__ << " full stripe " << offset << dendl;
2658 _do_write_stripe(txc, o, offset, bl);
2659 offset += stripe_size;
2660 length -= stripe_size;
2661 bl_off += stripe_size;
2662 continue;
2663 }
2664 uint64_t stripe_off = offset - offset_rem;
2665 bufferlist prev;
2666 _do_read_stripe(o, stripe_off, &prev);
2667 dout(20) << __func__ << " read previous stripe " << stripe_off
2668 << ", got " << prev.length() << dendl;
2669 bufferlist bl;
2670 if (offset_rem) {
11fdf7f2 2671 unsigned p = std::min<uint64_t>(prev.length(), offset_rem);
7c673cae
FG
2672 if (p) {
2673 dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2674 bl.substr_of(prev, 0, p);
2675 }
2676 if (p < offset_rem) {
2677 dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2678 bl.append_zero(offset_rem - p);
2679 }
2680 }
2681 unsigned use = stripe_size - offset_rem;
2682 if (use > length)
2683 use -= stripe_size - end_rem;
2684 dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2685 bufferlist t;
2686 t.substr_of(orig_bl, bl_off, use);
2687 bl.claim_append(t);
2688 bl_off += use;
2689 if (end_rem) {
2690 if (end_rem < prev.length()) {
2691 unsigned l = prev.length() - end_rem;
2692 dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2693 bufferlist t;
2694 t.substr_of(prev, end_rem, l);
2695 bl.claim_append(t);
2696 }
2697 }
2698 dout(30) << " writing:\n";
2699 bl.hexdump(*_dout);
2700 *_dout << dendl;
2701 _do_write_stripe(txc, o, stripe_off, bl);
2702 offset += use;
2703 length -= use;
2704 }
2705
2706 if (offset > o->onode.size) {
2707 dout(20) << __func__ << " extending size to " << offset + length
2708 << dendl;
2709 o->onode.size = offset;
2710 }
2711
2712 return r;
2713}
2714
2715int KStore::_write(TransContext *txc,
2716 CollectionRef& c,
2717 OnodeRef& o,
2718 uint64_t offset, size_t length,
2719 bufferlist& bl,
2720 uint32_t fadvise_flags)
2721{
2722 dout(15) << __func__ << " " << c->cid << " " << o->oid
2723 << " " << offset << "~" << length
2724 << dendl;
2725 _assign_nid(txc, o);
2726 int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2727 txc->write_onode(o);
2728
2729 dout(10) << __func__ << " " << c->cid << " " << o->oid
2730 << " " << offset << "~" << length
2731 << " = " << r << dendl;
2732 return r;
2733}
2734
2735int KStore::_zero(TransContext *txc,
2736 CollectionRef& c,
2737 OnodeRef& o,
2738 uint64_t offset, size_t length)
2739{
2740 dout(15) << __func__ << " " << c->cid << " " << o->oid
2741 << " " << offset << "~" << length
2742 << dendl;
2743 int r = 0;
2744 o->exists = true;
2745
2746 _dump_onode(o);
2747 _assign_nid(txc, o);
2748
2749 uint64_t stripe_size = o->onode.stripe_size;
2750 if (stripe_size) {
2751 uint64_t end = offset + length;
2752 uint64_t pos = offset;
2753 uint64_t stripe_off = pos % stripe_size;
2754 while (pos < offset + length) {
2755 if (stripe_off || end - pos < stripe_size) {
2756 bufferlist stripe;
2757 _do_read_stripe(o, pos - stripe_off, &stripe);
2758 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2759 << stripe.length() << dendl;
2760 bufferlist bl;
11fdf7f2 2761 bl.substr_of(stripe, 0, std::min<uint64_t>(stripe.length(), stripe_off));
7c673cae
FG
2762 if (end >= pos - stripe_off + stripe_size ||
2763 end >= o->onode.size) {
2764 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2765 << " to " << bl.length() << dendl;
2766 } else {
2767 auto len = end - (pos - stripe_off + bl.length());
2768 bl.append_zero(len);
2769 dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2770 if (stripe.length() > bl.length()) {
2771 unsigned l = stripe.length() - bl.length();
2772 bufferlist t;
2773 t.substr_of(stripe, stripe.length() - l, l);
2774 dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2775 bl.claim_append(t);
2776 }
2777 }
2778 _do_write_stripe(txc, o, pos - stripe_off, bl);
2779 pos += stripe_size - stripe_off;
2780 stripe_off = 0;
2781 } else {
2782 dout(20) << __func__ << " rm stripe " << pos << dendl;
2783 _do_remove_stripe(txc, o, pos - stripe_off);
2784 pos += stripe_size;
2785 }
2786 }
2787 }
2788 if (offset + length > o->onode.size) {
2789 o->onode.size = offset + length;
2790 dout(20) << __func__ << " extending size to " << offset + length
2791 << dendl;
2792 }
2793 txc->write_onode(o);
2794
2795 dout(10) << __func__ << " " << c->cid << " " << o->oid
2796 << " " << offset << "~" << length
2797 << " = " << r << dendl;
2798 return r;
2799}
2800
2801int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2802{
2803 uint64_t stripe_size = o->onode.stripe_size;
2804
2805 o->flush();
2806
2807 // trim down stripes
2808 if (stripe_size) {
2809 uint64_t pos = offset;
2810 uint64_t stripe_off = pos % stripe_size;
2811 while (pos < o->onode.size) {
2812 if (stripe_off) {
2813 bufferlist stripe;
2814 _do_read_stripe(o, pos - stripe_off, &stripe);
2815 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2816 << stripe.length() << dendl;
2817 bufferlist t;
11fdf7f2 2818 t.substr_of(stripe, 0, std::min<uint64_t>(stripe_off, stripe.length()));
7c673cae
FG
2819 _do_write_stripe(txc, o, pos - stripe_off, t);
2820 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2821 << " to " << t.length() << dendl;
2822 pos += stripe_size - stripe_off;
2823 stripe_off = 0;
2824 } else {
2825 dout(20) << __func__ << " rm stripe " << pos << dendl;
2826 _do_remove_stripe(txc, o, pos - stripe_off);
2827 pos += stripe_size;
2828 }
2829 }
2830
2831 // trim down cached tail
2832 if (o->tail_bl.length()) {
2833 if (offset / stripe_size != o->onode.size / stripe_size) {
2834 dout(20) << __func__ << " clear cached tail" << dendl;
2835 o->clear_tail();
2836 }
2837 }
2838 }
2839
2840 o->onode.size = offset;
2841 dout(10) << __func__ << " truncate size to " << offset << dendl;
2842
2843 txc->write_onode(o);
2844 return 0;
2845}
2846
2847int KStore::_truncate(TransContext *txc,
2848 CollectionRef& c,
2849 OnodeRef& o,
2850 uint64_t offset)
2851{
2852 dout(15) << __func__ << " " << c->cid << " " << o->oid
2853 << " " << offset
2854 << dendl;
2855 int r = _do_truncate(txc, o, offset);
2856 dout(10) << __func__ << " " << c->cid << " " << o->oid
2857 << " " << offset
2858 << " = " << r << dendl;
2859 return r;
2860}
2861
2862int KStore::_do_remove(TransContext *txc,
2863 OnodeRef o)
2864{
2865 string key;
2866
2867 _do_truncate(txc, o, 0);
2868
2869 o->onode.size = 0;
2870 if (o->onode.omap_head) {
2871 _do_omap_clear(txc, o->onode.omap_head);
2872 }
2873 o->exists = false;
2874 o->onode = kstore_onode_t();
2875 txc->onodes.erase(o);
2876 get_object_key(cct, o->oid, &key);
2877 txc->t->rmkey(PREFIX_OBJ, key);
2878 return 0;
2879}
2880
2881int KStore::_remove(TransContext *txc,
2882 CollectionRef& c,
2883 OnodeRef &o)
2884{
2885 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2886 int r = _do_remove(txc, o);
2887 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2888 return r;
2889}
2890
2891int KStore::_setattr(TransContext *txc,
2892 CollectionRef& c,
2893 OnodeRef& o,
2894 const string& name,
2895 bufferptr& val)
2896{
2897 dout(15) << __func__ << " " << c->cid << " " << o->oid
2898 << " " << name << " (" << val.length() << " bytes)"
2899 << dendl;
2900 int r = 0;
2901 o->onode.attrs[name] = val;
2902 txc->write_onode(o);
2903 dout(10) << __func__ << " " << c->cid << " " << o->oid
2904 << " " << name << " (" << val.length() << " bytes)"
2905 << " = " << r << dendl;
2906 return r;
2907}
2908
2909int KStore::_setattrs(TransContext *txc,
2910 CollectionRef& c,
2911 OnodeRef& o,
2912 const map<string,bufferptr>& aset)
2913{
2914 dout(15) << __func__ << " " << c->cid << " " << o->oid
2915 << " " << aset.size() << " keys"
2916 << dendl;
2917 int r = 0;
2918 for (map<string,bufferptr>::const_iterator p = aset.begin();
2919 p != aset.end(); ++p) {
2920 if (p->second.is_partial())
2921 o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2922 else
2923 o->onode.attrs[p->first] = p->second;
2924 }
2925 txc->write_onode(o);
2926 dout(10) << __func__ << " " << c->cid << " " << o->oid
2927 << " " << aset.size() << " keys"
2928 << " = " << r << dendl;
2929 return r;
2930}
2931
2932
2933int KStore::_rmattr(TransContext *txc,
2934 CollectionRef& c,
2935 OnodeRef& o,
2936 const string& name)
2937{
2938 dout(15) << __func__ << " " << c->cid << " " << o->oid
2939 << " " << name << dendl;
2940 int r = 0;
2941 o->onode.attrs.erase(name);
2942 txc->write_onode(o);
2943 dout(10) << __func__ << " " << c->cid << " " << o->oid
2944 << " " << name << " = " << r << dendl;
2945 return r;
2946}
2947
2948int KStore::_rmattrs(TransContext *txc,
2949 CollectionRef& c,
2950 OnodeRef& o)
2951{
2952 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2953 int r = 0;
2954 o->onode.attrs.clear();
2955 txc->write_onode(o);
2956 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2957 return r;
2958}
2959
2960void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2961{
2962 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2963 string prefix, tail;
2964 get_omap_header(id, &prefix);
2965 get_omap_tail(id, &tail);
2966 it->lower_bound(prefix);
2967 while (it->valid()) {
2968 if (it->key() >= tail) {
2969 dout(30) << __func__ << " stop at " << tail << dendl;
2970 break;
2971 }
2972 txc->t->rmkey(PREFIX_OMAP, it->key());
2973 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
2974 it->next();
2975 }
2976}
2977
2978int KStore::_omap_clear(TransContext *txc,
2979 CollectionRef& c,
2980 OnodeRef& o)
2981{
2982 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2983 int r = 0;
2984 if (o->onode.omap_head != 0) {
2985 _do_omap_clear(txc, o->onode.omap_head);
2986 }
2987 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2988 return r;
2989}
2990
2991int KStore::_omap_setkeys(TransContext *txc,
2992 CollectionRef& c,
2993 OnodeRef& o,
2994 bufferlist &bl)
2995{
2996 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2997 int r;
11fdf7f2 2998 auto p = bl.cbegin();
7c673cae
FG
2999 __u32 num;
3000 if (!o->onode.omap_head) {
3001 o->onode.omap_head = o->onode.nid;
3002 txc->write_onode(o);
3003 }
11fdf7f2 3004 decode(num, p);
7c673cae
FG
3005 while (num--) {
3006 string key;
3007 bufferlist value;
11fdf7f2
TL
3008 decode(key, p);
3009 decode(value, p);
7c673cae
FG
3010 string final_key;
3011 get_omap_key(o->onode.omap_head, key, &final_key);
3012 dout(30) << __func__ << " " << pretty_binary_string(final_key)
3013 << " <- " << key << dendl;
3014 txc->t->set(PREFIX_OMAP, final_key, value);
3015 }
3016 r = 0;
3017 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3018 return r;
3019}
3020
3021int KStore::_omap_setheader(TransContext *txc,
3022 CollectionRef& c,
3023 OnodeRef &o,
3024 bufferlist& bl)
3025{
3026 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3027 int r;
3028 string key;
3029 if (!o->onode.omap_head) {
3030 o->onode.omap_head = o->onode.nid;
3031 txc->write_onode(o);
3032 }
3033 get_omap_header(o->onode.omap_head, &key);
3034 txc->t->set(PREFIX_OMAP, key, bl);
3035 r = 0;
3036 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3037 return r;
3038}
3039
3040int KStore::_omap_rmkeys(TransContext *txc,
3041 CollectionRef& c,
3042 OnodeRef& o,
11fdf7f2 3043 const bufferlist& bl)
7c673cae
FG
3044{
3045 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3046 int r = 0;
11fdf7f2 3047 auto p = bl.cbegin();
7c673cae
FG
3048 __u32 num;
3049
3050 if (!o->onode.omap_head) {
3051 r = 0;
3052 goto out;
3053 }
11fdf7f2 3054 decode(num, p);
7c673cae
FG
3055 while (num--) {
3056 string key;
11fdf7f2 3057 decode(key, p);
7c673cae
FG
3058 string final_key;
3059 get_omap_key(o->onode.omap_head, key, &final_key);
3060 dout(30) << __func__ << " rm " << pretty_binary_string(final_key)
3061 << " <- " << key << dendl;
3062 txc->t->rmkey(PREFIX_OMAP, final_key);
3063 }
3064 r = 0;
3065
3066 out:
3067 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3068 return r;
3069}
3070
3071int KStore::_omap_rmkey_range(TransContext *txc,
3072 CollectionRef& c,
3073 OnodeRef& o,
3074 const string& first, const string& last)
3075{
3076 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3077 KeyValueDB::Iterator it;
3078 string key_first, key_last;
3079 int r = 0;
3080
3081 if (!o->onode.omap_head) {
3082 goto out;
3083 }
3084 it = db->get_iterator(PREFIX_OMAP);
3085 get_omap_key(o->onode.omap_head, first, &key_first);
3086 get_omap_key(o->onode.omap_head, last, &key_last);
3087 it->lower_bound(key_first);
3088 while (it->valid()) {
3089 if (it->key() >= key_last) {
3090 dout(30) << __func__ << " stop at " << pretty_binary_string(key_last)
3091 << dendl;
3092 break;
3093 }
3094 txc->t->rmkey(PREFIX_OMAP, it->key());
3095 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
3096 it->next();
3097 }
3098 r = 0;
3099
3100 out:
3101 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3102 return r;
3103}
3104
3105int KStore::_setallochint(TransContext *txc,
3106 CollectionRef& c,
3107 OnodeRef& o,
3108 uint64_t expected_object_size,
3109 uint64_t expected_write_size,
3110 uint32_t flags)
3111{
3112 dout(15) << __func__ << " " << c->cid << " " << o->oid
3113 << " object_size " << expected_object_size
3114 << " write_size " << expected_write_size
3115 << " flags " << flags
3116 << dendl;
3117 int r = 0;
3118 o->onode.expected_object_size = expected_object_size;
3119 o->onode.expected_write_size = expected_write_size;
3120 o->onode.alloc_hint_flags = flags;
3121
3122 txc->write_onode(o);
3123 dout(10) << __func__ << " " << c->cid << " " << o->oid
3124 << " object_size " << expected_object_size
3125 << " write_size " << expected_write_size
3126 << " = " << r << dendl;
3127 return r;
3128}
3129
3130int KStore::_clone(TransContext *txc,
3131 CollectionRef& c,
3132 OnodeRef& oldo,
3133 OnodeRef& newo)
3134{
3135 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3136 << newo->oid << dendl;
3137 int r = 0;
3138 if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3139 derr << __func__ << " mismatched hash on " << oldo->oid
3140 << " and " << newo->oid << dendl;
3141 return -EINVAL;
3142 }
3143
3144 bufferlist bl;
3145 newo->exists = true;
3146 _assign_nid(txc, newo);
3147
3148 // data
3149 oldo->flush();
3150
3151 r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
3152 if (r < 0)
3153 goto out;
3154
3155 // truncate any old data
3156 r = _do_truncate(txc, newo, 0);
3157 if (r < 0)
3158 goto out;
3159
3160 r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3161 if (r < 0)
3162 goto out;
3163
3164 newo->onode.attrs = oldo->onode.attrs;
3165
3166 // clone omap
3167 if (newo->onode.omap_head) {
3168 dout(20) << __func__ << " clearing old omap data" << dendl;
3169 _do_omap_clear(txc, newo->onode.omap_head);
3170 }
3171 if (oldo->onode.omap_head) {
3172 dout(20) << __func__ << " copying omap data" << dendl;
3173 if (!newo->onode.omap_head) {
3174 newo->onode.omap_head = newo->onode.nid;
3175 }
3176 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3177 string head, tail;
3178 get_omap_header(oldo->onode.omap_head, &head);
3179 get_omap_tail(oldo->onode.omap_head, &tail);
3180 it->lower_bound(head);
3181 while (it->valid()) {
3182 string key;
3183 if (it->key() >= tail) {
3184 dout(30) << __func__ << " reached tail" << dendl;
3185 break;
3186 } else {
3187 dout(30) << __func__ << " got header/data "
3188 << pretty_binary_string(it->key()) << dendl;
11fdf7f2 3189 ceph_assert(it->key() < tail);
7c673cae
FG
3190 rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3191 txc->t->set(PREFIX_OMAP, key, it->value());
3192 }
3193 it->next();
3194 }
3195 }
3196
3197 txc->write_onode(newo);
3198 r = 0;
3199
3200 out:
3201 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3202 << newo->oid << " = " << r << dendl;
3203 return r;
3204}
3205
3206int KStore::_clone_range(TransContext *txc,
3207 CollectionRef& c,
3208 OnodeRef& oldo,
3209 OnodeRef& newo,
3210 uint64_t srcoff, uint64_t length, uint64_t dstoff)
3211{
3212 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3213 << newo->oid << " from " << srcoff << "~" << length
3214 << " to offset " << dstoff << dendl;
3215 int r = 0;
3216
3217 bufferlist bl;
3218 newo->exists = true;
3219 _assign_nid(txc, newo);
3220
3221 r = _do_read(oldo, srcoff, length, bl, 0);
3222 if (r < 0)
3223 goto out;
3224
3225 r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3226 if (r < 0)
3227 goto out;
3228
3229 txc->write_onode(newo);
3230
3231 r = 0;
3232
3233 out:
3234 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3235 << newo->oid << " from " << srcoff << "~" << length
3236 << " to offset " << dstoff
3237 << " = " << r << dendl;
3238 return r;
3239}
3240
3241int KStore::_rename(TransContext *txc,
3242 CollectionRef& c,
3243 OnodeRef& oldo,
3244 OnodeRef& newo,
3245 const ghobject_t& new_oid)
3246{
3247 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3248 << new_oid << dendl;
3249 int r;
3250 ghobject_t old_oid = oldo->oid;
3251 bufferlist bl;
3252 string old_key, new_key;
3253
3254 if (newo && newo->exists) {
3255 // destination object already exists, remove it first
3256 r = _do_remove(txc, newo);
3257 if (r < 0)
3258 goto out;
3259 }
3260
3261 txc->t->rmkey(PREFIX_OBJ, oldo->key);
3262 txc->write_onode(oldo);
3263 c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key}
3264 r = 0;
3265
3266 out:
3267 dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3268 << new_oid << " = " << r << dendl;
3269 return r;
3270}
3271
3272// collections
3273
3274int KStore::_create_collection(
3275 TransContext *txc,
3276 coll_t cid,
3277 unsigned bits,
3278 CollectionRef *c)
3279{
3280 dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3281 int r;
3282 bufferlist bl;
3283
3284 {
3285 RWLock::WLocker l(coll_lock);
3286 if (*c) {
3287 r = -EEXIST;
3288 goto out;
3289 }
11fdf7f2
TL
3290 auto p = new_coll_map.find(cid);
3291 ceph_assert(p != new_coll_map.end());
3292 *c = p->second;
3293 ceph_assert((*c)->cid == cid);
7c673cae
FG
3294 (*c)->cnode.bits = bits;
3295 coll_map[cid] = *c;
11fdf7f2 3296 new_coll_map.erase(p);
7c673cae 3297 }
11fdf7f2 3298 encode((*c)->cnode, bl);
7c673cae
FG
3299 txc->t->set(PREFIX_COLL, stringify(cid), bl);
3300 r = 0;
3301
3302 out:
3303 dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3304 return r;
3305}
3306
3307int KStore::_remove_collection(TransContext *txc, coll_t cid,
3308 CollectionRef *c)
3309{
3310 dout(15) << __func__ << " " << cid << dendl;
3311 int r;
3312
3313 {
3314 RWLock::WLocker l(coll_lock);
3315 if (!*c) {
3316 r = -ENOENT;
3317 goto out;
3318 }
3319 size_t nonexistent_count = 0;
3320 pair<ghobject_t,OnodeRef> next_onode;
3321 while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3322 if (next_onode.second->exists) {
3323 r = -ENOTEMPTY;
3324 goto out;
3325 }
3326 ++nonexistent_count;
3327 }
3328 vector<ghobject_t> ls;
3329 ghobject_t next;
3330 // Enumerate onodes in db, up to nonexistent_count + 1
3331 // then check if all of them are marked as non-existent.
3332 // Bypass the check if returned number is greater than nonexistent_count
3333 r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3334 nonexistent_count + 1, &ls, &next);
3335 if (r >= 0) {
3336 bool exists = false; //ls.size() > nonexistent_count;
3337 for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3338 dout(10) << __func__ << " oid " << *it << dendl;
3339 auto onode = (*c)->onode_map.lookup(*it);
3340 exists = !onode || onode->exists;
3341 if (exists) {
3342 dout(10) << __func__ << " " << *it
3343 << " exists in db" << dendl;
3344 }
3345 }
3346 if (!exists) {
3347 coll_map.erase(cid);
3348 txc->removed_collections.push_back(*c);
3349 c->reset();
3350 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3351 r = 0;
3352 } else {
3353 dout(10) << __func__ << " " << cid
3354 << " is non-empty" << dendl;
3355 r = -ENOTEMPTY;
3356 }
3357 }
3358 }
3359
3360 out:
3361 dout(10) << __func__ << " " << cid << " = " << r << dendl;
3362 return r;
3363}
3364
3365int KStore::_split_collection(TransContext *txc,
3366 CollectionRef& c,
3367 CollectionRef& d,
3368 unsigned bits, int rem)
3369{
3370 dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3371 << " bits " << bits << dendl;
3372 int r;
3373 RWLock::WLocker l(c->lock);
3374 RWLock::WLocker l2(d->lock);
3375 c->onode_map.clear();
3376 d->onode_map.clear();
3377 c->cnode.bits = bits;
11fdf7f2 3378 ceph_assert(d->cnode.bits == bits);
7c673cae
FG
3379 r = 0;
3380
3381 bufferlist bl;
11fdf7f2 3382 encode(c->cnode, bl);
7c673cae
FG
3383 txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3384
3385 dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3386 << " bits " << bits << " = " << r << dendl;
3387 return r;
3388}
3389
11fdf7f2
TL
3390int KStore::_merge_collection(TransContext *txc,
3391 CollectionRef *c,
3392 CollectionRef& d,
3393 unsigned bits)
3394{
3395 dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " "
3396 << " bits " << bits << dendl;
3397 int r;
3398 RWLock::WLocker l((*c)->lock);
3399 RWLock::WLocker l2(d->lock);
3400 (*c)->onode_map.clear();
3401 d->onode_map.clear();
3402 d->cnode.bits = bits;
3403 r = 0;
3404
3405 coll_t cid = (*c)->cid;
3406
3407 bufferlist bl;
3408 encode(d->cnode, bl);
3409 txc->t->set(PREFIX_COLL, stringify(d->cid), bl);
3410
3411 coll_map.erase((*c)->cid);
3412 txc->removed_collections.push_back(*c);
3413 c->reset();
3414 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3415
3416 dout(10) << __func__ << " " << cid << " to " << d->cid << " "
3417 << " bits " << bits << " = " << r << dendl;
3418 return r;
3419}
3420
7c673cae 3421// ===========================================