]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/kstore/KStore.cc
77ccfe0055263f522d16ffe6aca998e2118acd35
[ceph.git] / ceph / src / os / kstore / KStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21 #if defined(__FreeBSD__)
22 #include <sys/param.h>
23 #include <sys/mount.h>
24 #endif
25
26 #include "KStore.h"
27 #include "osd/osd_types.h"
28 #include "os/kv.h"
29 #include "include/compat.h"
30 #include "include/stringify.h"
31 #include "common/errno.h"
32 #include "common/safe_io.h"
33 #include "common/Formatter.h"
34
35
36 #define dout_context cct
37 #define dout_subsys ceph_subsys_kstore
38
39 /*
40
41 TODO:
42
43 * superblock, features
44 * refcounted extents (for efficient clone)
45
46 */
47
// Key-space prefixes; each one names a separate section of the
// KeyValueDB, with the mapping stored under it noted alongside.
const string PREFIX_SUPER("S");  // field -> value
const string PREFIX_COLL("C");   // collection name -> (nothing)
const string PREFIX_OBJ("O");    // object name -> onode
const string PREFIX_DATA("D");   // nid + offset -> data
const string PREFIX_OMAP("M");   // u64 + keyname -> value
53
54 /*
55 * object name key structure
56 *
57 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
58 * encoded u64: poolid + 2^63 (so that it sorts properly)
59 * encoded u32: hash (bit reversed)
60 *
61 * 1 char: '.'
62 *
63 * escaped string: namespace
64 *
65 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
66 * we are followed just by the key. otherwise, we are followed by
67 * the key and then the object name.
68 * escaped string: key
69 * escaped string: object name (unless '=' above)
70 *
71 * encoded u64: snap
72 * encoded u64: generation
73 */
74
75 /*
76 * string encoding in the key
77 *
78 * The key string needs to lexicographically sort the same way that
79 * ghobject_t does. We do this by escaping anything <= to '#' with #
80 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
81 * hex digits.
82 *
83 * We use ! as a terminator for strings; this works because it is < #
84 * and will get escaped if it is present in the string.
85 *
86 */
87
/*
 * Append the escaped form of `in` to `*out`, followed by the '!'
 * terminator.
 *
 * Characters comparing <= '#' are written as '#' plus two hex digits,
 * characters comparing >= '~' as '~' plus two hex digits; everything
 * else is copied through unchanged.
 *
 * NOTE(review): the comparisons are done on plain `char`, so which
 * marker a byte >= 0x80 receives depends on whether char is signed on
 * the platform.  Left untouched here because it determines the stored
 * key format.
 */
static void append_escaped(const string &in, string *out)
{
  char esc[8];
  for (const char c : in) {
    // choose the escape marker, or 0 when the byte is stored verbatim
    char marker = 0;
    if (c <= '#') {
      marker = '#';
    } else if (c >= '~') {
      marker = '~';
    }

    if (marker) {
      snprintf(esc, sizeof(esc), "%c%02x", marker, (uint8_t)c);
      out->append(esc);
    } else {
      out->push_back(c);
    }
  }
  out->push_back('!');
}
104
/*
 * Decode a string written by append_escaped().
 *
 * Reads from `p` until the '!' terminator (or the end of the C string)
 * and appends the unescaped bytes to `*out`.
 *
 * Returns the number of input characters consumed, not counting the
 * terminator, or -EINVAL if an escape marker ('#' or '~') is not
 * followed by exactly two hex digits.
 *
 * Both digits are validated before the cursor moves, so a truncated
 * escape at the end of the input (e.g. "#2") is rejected instead of
 * stepping past the NUL terminator.
 */
static int decode_escaped(const char *p, string *out)
{
  // value of a single hex digit, or -1 if `c` is not one
  auto hex_value = [](char c) -> int {
    if (c >= '0' && c <= '9')
      return c - '0';
    if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
    if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
    return -1;
  };

  const char *orig_p = p;
  while (*p && *p != '!') {
    if (*p == '#' || *p == '~') {
      // check p[1] first: if it is the NUL terminator we must not
      // look at p[2]
      const int hi = hex_value(p[1]);
      if (hi < 0)
	return -EINVAL;
      const int lo = hex_value(p[2]);
      if (lo < 0)
	return -EINVAL;
      out->push_back((char)((hi << 4) | lo));
      p += 3;
    } else {
      out->push_back(*p++);
    }
  }
  return p - orig_p;
}
122
// Some key components are encoded in binary; render such a key in a
// readable form.  Runs of printable ASCII are emitted as 'quoted'
// strings, everything else as hex after a single "0x" marker (eight
// digits for a group of four bytes, two digits for a trailing byte).
static string pretty_binary_string(const string& in)
{
  // true for bytes outside the printable range [32, 126]
  auto unprintable = [](char c) {
    return c < 32 || (unsigned char)c > 126;
  };

  enum { NONE, HEX, STRING } mode = NONE;
  string out;
  out.reserve(in.length() * 3);
  char hex[10];
  unsigned run_start = 0;
  unsigned pos;
  for (pos = 0; pos < in.length(); ++pos) {
    const size_t remaining = in.length() - pos;

    // An unprintable byte always goes to hex.  While already in hex
    // mode we also stay there unless the next four bytes are all
    // printable.
    bool as_hex = unprintable(in[pos]);
    if (!as_hex && mode == HEX && remaining >= 4) {
      as_hex = unprintable(in[pos+1]) ||
	unprintable(in[pos+2]) ||
	unprintable(in[pos+3]);
    }

    if (!as_hex) {
      // start a quoted run if we are not already inside one
      if (mode != STRING) {
	out.push_back('\'');
	mode = STRING;
	run_start = pos;
      }
      continue;
    }

    if (mode == STRING) {
      // close the quoted run that ends just before this byte
      out.append(in, run_start, pos - run_start);
      out.push_back('\'');
    }
    if (mode != HEX) {
      out.append("0x");
      mode = HEX;
    }
    if (remaining >= 4) {
      // print a whole u32 at once
      snprintf(hex, sizeof(hex), "%08x",
	       (uint32_t)(((unsigned char)in[pos] << 24) |
			  ((unsigned char)in[pos+1] << 16) |
			  ((unsigned char)in[pos+2] << 8) |
			  ((unsigned char)in[pos+3] << 0)));
      pos += 3;
    } else {
      snprintf(hex, sizeof(hex), "%02x", (int)(unsigned char)in[pos]);
    }
    out.append(hex);
  }
  if (mode == STRING) {
    out.append(in, run_start, pos - run_start);
    out.push_back('\'');
  }
  return out;
}
173
174 static void _key_encode_shard(shard_id_t shard, string *key)
175 {
176 // make field ordering match with ghobject_t compare operations
177 if (shard == shard_id_t::NO_SHARD) {
178 // otherwise ff will sort *after* 0, not before.
179 key->append("--");
180 } else {
181 char buf[32];
182 snprintf(buf, sizeof(buf), "%02x", (int)shard);
183 key->append(buf);
184 }
185 }
186 static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
187 {
188 if (key[0] == '-') {
189 *pshard = shard_id_t::NO_SHARD;
190 } else {
191 unsigned shard;
192 int r = sscanf(key, "%x", &shard);
193 if (r < 1)
194 return NULL;
195 *pshard = shard_id_t(shard);
196 }
197 return key + 2;
198 }
199
200 static void get_coll_key_range(const coll_t& cid, int bits,
201 string *temp_start, string *temp_end,
202 string *start, string *end)
203 {
204 temp_start->clear();
205 temp_end->clear();
206 start->clear();
207 end->clear();
208
209 spg_t pgid;
210 if (cid.is_pg(&pgid)) {
211 _key_encode_shard(pgid.shard, start);
212 *end = *start;
213 *temp_start = *start;
214 *temp_end = *start;
215
216 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
217 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
218 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
219 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
220 start->append(".");
221 temp_start->append(".");
222
223 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
224 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
225
226 uint64_t end_hash =
227 hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
228 if (end_hash <= 0xffffffffull) {
229 _key_encode_u32(end_hash, end);
230 _key_encode_u32(end_hash, temp_end);
231 end->append(".");
232 temp_end->append(".");
233 } else {
234 _key_encode_u32(0xffffffff, end);
235 _key_encode_u32(0xffffffff, temp_end);
236 end->append(":");
237 temp_end->append(":");
238 }
239 } else {
240 _key_encode_shard(shard_id_t::NO_SHARD, start);
241 _key_encode_u64(-1ull + 0x8000000000000000ull, start);
242 *end = *start;
243 _key_encode_u32(0, start);
244 start->append(".");
245 _key_encode_u32(0xffffffff, end);
246 end->append(":");
247
248 // no separate temp section
249 *temp_start = *end;
250 *temp_end = *end;
251 }
252 }
253
254 static int get_key_object(const string& key, ghobject_t *oid);
255
256 static void get_object_key(CephContext* cct, const ghobject_t& oid,
257 string *key)
258 {
259 key->clear();
260
261 _key_encode_shard(oid.shard_id, key);
262 _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
263 _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
264 key->append(".");
265
266 append_escaped(oid.hobj.nspace, key);
267
268 if (oid.hobj.get_key().length()) {
269 // is a key... could be < = or >.
270 // (ASCII chars < = and > sort in that order, yay)
271 if (oid.hobj.get_key() < oid.hobj.oid.name) {
272 key->append("<");
273 append_escaped(oid.hobj.get_key(), key);
274 append_escaped(oid.hobj.oid.name, key);
275 } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
276 key->append(">");
277 append_escaped(oid.hobj.get_key(), key);
278 append_escaped(oid.hobj.oid.name, key);
279 } else {
280 // same as no key
281 key->append("=");
282 append_escaped(oid.hobj.oid.name, key);
283 }
284 } else {
285 // no key
286 key->append("=");
287 append_escaped(oid.hobj.oid.name, key);
288 }
289
290 _key_encode_u64(oid.hobj.snap, key);
291 _key_encode_u64(oid.generation, key);
292
293 // sanity check
294 if (true) {
295 ghobject_t t;
296 int r = get_key_object(*key, &t);
297 if (r || t != oid) {
298 derr << " r " << r << dendl;
299 derr << "key " << pretty_binary_string(*key) << dendl;
300 derr << "oid " << oid << dendl;
301 derr << " t " << t << dendl;
302 ceph_assert(t == oid);
303 }
304 }
305 }
306
307 static int get_key_object(const string& key, ghobject_t *oid)
308 {
309 int r;
310 const char *p = key.c_str();
311
312 p = _key_decode_shard(p, &oid->shard_id);
313
314 uint64_t pool;
315 p = _key_decode_u64(p, &pool);
316 oid->hobj.pool = pool - 0x8000000000000000ull;
317
318 unsigned hash;
319 p = _key_decode_u32(p, &hash);
320 oid->hobj.set_bitwise_key_u32(hash);
321 if (*p != '.')
322 return -5;
323 ++p;
324
325 r = decode_escaped(p, &oid->hobj.nspace);
326 if (r < 0)
327 return -6;
328 p += r + 1;
329
330 if (*p == '=') {
331 // no key
332 ++p;
333 r = decode_escaped(p, &oid->hobj.oid.name);
334 if (r < 0)
335 return -7;
336 p += r + 1;
337 } else if (*p == '<' || *p == '>') {
338 // key + name
339 ++p;
340 string okey;
341 r = decode_escaped(p, &okey);
342 if (r < 0)
343 return -8;
344 p += r + 1;
345 r = decode_escaped(p, &oid->hobj.oid.name);
346 if (r < 0)
347 return -9;
348 p += r + 1;
349 oid->hobj.set_key(okey);
350 } else {
351 // malformed
352 return -10;
353 }
354
355 p = _key_decode_u64(p, &oid->hobj.snap.val);
356 p = _key_decode_u64(p, &oid->generation);
357 if (*p) {
358 // if we get something other than a null terminator here,
359 // something goes wrong.
360 return -12;
361 }
362
363 return 0;
364 }
365
366
367 static void get_data_key(uint64_t nid, uint64_t offset, string *out)
368 {
369 _key_encode_u64(nid, out);
370 _key_encode_u64(offset, out);
371 }
372
373 // '-' < '.' < '~'
374 static void get_omap_header(uint64_t id, string *out)
375 {
376 _key_encode_u64(id, out);
377 out->push_back('-');
378 }
379
380 // hmm, I don't think there's any need to escape the user key since we
381 // have a clean prefix.
382 static void get_omap_key(uint64_t id, const string& key, string *out)
383 {
384 _key_encode_u64(id, out);
385 out->push_back('.');
386 out->append(key);
387 }
388
389 static void rewrite_omap_key(uint64_t id, string old, string *out)
390 {
391 _key_encode_u64(id, out);
392 out->append(old.substr(out->length()));
393 }
394
// Extract the user-visible part of an omap key: everything after the
// leading u64 id and the one-character separator.
static void decode_omap_key(const string& key, string *user_key)
{
  const size_t prefix_len = sizeof(uint64_t) + 1;
  string tail = key.substr(prefix_len);
  user_key->swap(tail);
}
399
400 static void get_omap_tail(uint64_t id, string *out)
401 {
402 _key_encode_u64(id, out);
403 out->push_back('~');
404 }
405
406
407
408 // Onode
409
410 #undef dout_prefix
411 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
412
413 void KStore::Onode::flush()
414 {
415 std::unique_lock<std::mutex> l(flush_lock);
416 dout(20) << __func__ << " " << flush_txns << dendl;
417 while (!flush_txns.empty())
418 flush_cond.wait(l);
419 dout(20) << __func__ << " done" << dendl;
420 }
421
422 // OnodeHashLRU
423
424 #undef dout_prefix
425 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
426
427 void KStore::OnodeHashLRU::_touch(OnodeRef o)
428 {
429 lru_list_t::iterator p = lru.iterator_to(*o);
430 lru.erase(p);
431 lru.push_front(*o);
432 }
433
434 void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
435 {
436 std::lock_guard<std::mutex> l(lock);
437 dout(30) << __func__ << " " << oid << " " << o << dendl;
438 ceph_assert(onode_map.count(oid) == 0);
439 onode_map[oid] = o;
440 lru.push_front(*o);
441 }
442
443 KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
444 {
445 std::lock_guard<std::mutex> l(lock);
446 dout(30) << __func__ << dendl;
447 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
448 if (p == onode_map.end()) {
449 dout(30) << __func__ << " " << oid << " miss" << dendl;
450 return OnodeRef();
451 }
452 dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
453 _touch(p->second);
454 return p->second;
455 }
456
457 void KStore::OnodeHashLRU::clear()
458 {
459 std::lock_guard<std::mutex> l(lock);
460 dout(10) << __func__ << dendl;
461 lru.clear();
462 onode_map.clear();
463 }
464
465 void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
466 const ghobject_t& new_oid)
467 {
468 std::lock_guard<std::mutex> l(lock);
469 dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
470 ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
471 po = onode_map.find(old_oid);
472 pn = onode_map.find(new_oid);
473
474 ceph_assert(po != onode_map.end());
475 if (pn != onode_map.end()) {
476 lru_list_t::iterator p = lru.iterator_to(*pn->second);
477 lru.erase(p);
478 onode_map.erase(pn);
479 }
480 OnodeRef o = po->second;
481
482 // install a non-existent onode it its place
483 po->second.reset(new Onode(cct, old_oid, o->key));
484 lru.push_back(*po->second);
485
486 // fix oid, key
487 onode_map.insert(make_pair(new_oid, o));
488 _touch(o);
489 o->oid = new_oid;
490 get_object_key(cct, new_oid, &o->key);
491 }
492
493 bool KStore::OnodeHashLRU::get_next(
494 const ghobject_t& after,
495 pair<ghobject_t,OnodeRef> *next)
496 {
497 std::lock_guard<std::mutex> l(lock);
498 dout(20) << __func__ << " after " << after << dendl;
499
500 if (after == ghobject_t()) {
501 if (lru.empty()) {
502 return false;
503 }
504 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
505 ceph_assert(p != onode_map.end());
506 next->first = p->first;
507 next->second = p->second;
508 return true;
509 }
510
511 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
512 ceph_assert(p != onode_map.end()); // for now
513 lru_list_t::iterator pi = lru.iterator_to(*p->second);
514 ++pi;
515 if (pi == lru.end()) {
516 return false;
517 }
518 next->first = pi->oid;
519 next->second = onode_map[pi->oid];
520 return true;
521 }
522
523 int KStore::OnodeHashLRU::trim(int max)
524 {
525 std::lock_guard<std::mutex> l(lock);
526 dout(20) << __func__ << " max " << max
527 << " size " << onode_map.size() << dendl;
528 int trimmed = 0;
529 int num = onode_map.size() - max;
530 if (onode_map.size() == 0 || num <= 0)
531 return 0; // don't even try
532
533 lru_list_t::iterator p = lru.end();
534 if (num)
535 --p;
536 while (num > 0) {
537 Onode *o = &*p;
538 int refs = o->nref.load();
539 if (refs > 1) {
540 dout(20) << __func__ << " " << o->oid << " has " << refs
541 << " refs; stopping with " << num << " left to trim" << dendl;
542 break;
543 }
544 dout(30) << __func__ << " trim " << o->oid << dendl;
545 if (p != lru.begin()) {
546 lru.erase(p--);
547 } else {
548 lru.erase(p);
549 ceph_assert(num == 1);
550 }
551 o->get(); // paranoia
552 onode_map.erase(o->oid);
553 o->put();
554 --num;
555 ++trimmed;
556 }
557 return trimmed;
558 }
559
560 // =======================================================
561
562 // Collection
563
564 #undef dout_prefix
565 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
566
567 KStore::Collection::Collection(KStore *ns, coll_t cid)
568 : CollectionImpl(ns->cct, cid),
569 store(ns),
570 osr(new OpSequencer()),
571 onode_map(store->cct)
572 {
573 }
574
575 void KStore::Collection::flush()
576 {
577 osr->flush();
578 }
579
580 bool KStore::Collection::flush_commit(Context *c)
581 {
582 return osr->flush_commit(c);
583 }
584
585
586 KStore::OnodeRef KStore::Collection::get_onode(
587 const ghobject_t& oid,
588 bool create)
589 {
590 ceph_assert(create ? ceph_mutex_is_wlocked(lock) : ceph_mutex_is_locked(lock));
591
592 spg_t pgid;
593 if (cid.is_pg(&pgid)) {
594 if (!oid.match(cnode.bits, pgid.ps())) {
595 lderr(store->cct) << __func__ << " oid " << oid << " not part of "
596 << pgid << " bits " << cnode.bits << dendl;
597 ceph_abort();
598 }
599 }
600
601 OnodeRef o = onode_map.lookup(oid);
602 if (o)
603 return o;
604
605 string key;
606 get_object_key(store->cct, oid, &key);
607
608 ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
609 << pretty_binary_string(key) << dendl;
610
611 bufferlist v;
612 int r = store->db->get(PREFIX_OBJ, key, &v);
613 ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
614 Onode *on;
615 if (v.length() == 0) {
616 ceph_assert(r == -ENOENT);
617 if (!create)
618 return OnodeRef();
619
620 // new
621 on = new Onode(store->cct, oid, key);
622 on->dirty = true;
623 } else {
624 // loaded
625 ceph_assert(r >=0);
626 on = new Onode(store->cct, oid, key);
627 on->exists = true;
628 auto p = v.cbegin();
629 decode(on->onode, p);
630 }
631 o.reset(on);
632 onode_map.add(oid, o);
633 return o;
634 }
635
636
637
638 // =======================================================
639
640 #undef dout_prefix
641 #define dout_prefix *_dout << "kstore(" << path << ") "
642
643 KStore::KStore(CephContext *cct, const string& path)
644 : ObjectStore(cct, path),
645 db(NULL),
646 basedir(path),
647 path_fd(-1),
648 fsid_fd(-1),
649 mounted(false),
650 nid_last(0),
651 nid_max(0),
652 throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
653 throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
654 finisher(cct),
655 kv_sync_thread(this),
656 kv_stop(false),
657 logger(nullptr)
658 {
659 _init_logger();
660 }
661
662 KStore::~KStore()
663 {
664 _shutdown_logger();
665 ceph_assert(!mounted);
666 ceph_assert(db == NULL);
667 ceph_assert(fsid_fd < 0);
668 }
669
670 void KStore::_init_logger()
671 {
672 // XXX
673 PerfCountersBuilder b(cct, "KStore",
674 l_kstore_first, l_kstore_last);
675 b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
676 b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
677 b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
678 b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
679 b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
680 logger = b.create_perf_counters();
681 cct->get_perfcounters_collection()->add(logger);
682 }
683
684 void KStore::_shutdown_logger()
685 {
686 // XXX
687 cct->get_perfcounters_collection()->remove(logger);
688 delete logger;
689 }
690
691 int KStore::_open_path()
692 {
693 ceph_assert(path_fd < 0);
694 path_fd = ::open(path.c_str(), O_DIRECTORY|O_CLOEXEC);
695 if (path_fd < 0) {
696 int r = -errno;
697 derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
698 << dendl;
699 return r;
700 }
701 return 0;
702 }
703
704 void KStore::_close_path()
705 {
706 VOID_TEMP_FAILURE_RETRY(::close(path_fd));
707 path_fd = -1;
708 }
709
710 int KStore::_open_fsid(bool create)
711 {
712 ceph_assert(fsid_fd < 0);
713 int flags = O_RDWR;
714 if (create)
715 flags |= O_CREAT;
716 fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
717 if (fsid_fd < 0) {
718 int err = -errno;
719 derr << __func__ << " " << cpp_strerror(err) << dendl;
720 return err;
721 }
722 return 0;
723 }
724
725 int KStore::_read_fsid(uuid_d *uuid)
726 {
727 char fsid_str[40];
728 memset(fsid_str, 0, sizeof(fsid_str));
729 int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
730 if (ret < 0) {
731 derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
732 return ret;
733 }
734 if (ret > 36)
735 fsid_str[36] = 0;
736 else
737 fsid_str[ret] = 0;
738 if (!uuid->parse(fsid_str)) {
739 derr << __func__ << " unparsable uuid " << fsid_str << dendl;
740 return -EINVAL;
741 }
742 return 0;
743 }
744
745 int KStore::_write_fsid()
746 {
747 int r = ::ftruncate(fsid_fd, 0);
748 if (r < 0) {
749 r = -errno;
750 derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
751 return r;
752 }
753 string str = stringify(fsid) + "\n";
754 r = safe_write(fsid_fd, str.c_str(), str.length());
755 if (r < 0) {
756 derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
757 return r;
758 }
759 r = ::fsync(fsid_fd);
760 if (r < 0) {
761 r = -errno;
762 derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
763 return r;
764 }
765 return 0;
766 }
767
768 void KStore::_close_fsid()
769 {
770 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
771 fsid_fd = -1;
772 }
773
774 int KStore::_lock_fsid()
775 {
776 struct flock l;
777 memset(&l, 0, sizeof(l));
778 l.l_type = F_WRLCK;
779 l.l_whence = SEEK_SET;
780 l.l_start = 0;
781 l.l_len = 0;
782 int r = ::fcntl(fsid_fd, F_SETLK, &l);
783 if (r < 0) {
784 int err = errno;
785 derr << __func__ << " failed to lock " << path << "/fsid"
786 << " (is another ceph-osd still running?)"
787 << cpp_strerror(err) << dendl;
788 return -err;
789 }
790 return 0;
791 }
792
793 bool KStore::test_mount_in_use()
794 {
795 // most error conditions mean the mount is not in use (e.g., because
796 // it doesn't exist). only if we fail to lock do we conclude it is
797 // in use.
798 bool ret = false;
799 int r = _open_path();
800 if (r < 0)
801 return false;
802 r = _open_fsid(false);
803 if (r < 0)
804 goto out_path;
805 r = _lock_fsid();
806 if (r < 0)
807 ret = true; // if we can't lock, it is in use
808 _close_fsid();
809 out_path:
810 _close_path();
811 return ret;
812 }
813
814 int KStore::_open_db(bool create)
815 {
816 int r;
817 ceph_assert(!db);
818 char fn[PATH_MAX];
819 snprintf(fn, sizeof(fn), "%s/db", path.c_str());
820
821 string kv_backend;
822 if (create) {
823 kv_backend = cct->_conf->kstore_backend;
824 } else {
825 r = read_meta("kv_backend", &kv_backend);
826 if (r < 0) {
827 derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
828 return -EIO;
829 }
830 }
831 dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
832
833 if (create) {
834 int r = ::mkdir(fn, 0755);
835 if (r < 0)
836 r = -errno;
837 if (r < 0 && r != -EEXIST) {
838 derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
839 << dendl;
840 return r;
841 }
842
843 // wal_dir, too!
844 char walfn[PATH_MAX];
845 snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
846 r = ::mkdir(walfn, 0755);
847 if (r < 0)
848 r = -errno;
849 if (r < 0 && r != -EEXIST) {
850 derr << __func__ << " failed to create " << walfn
851 << ": " << cpp_strerror(r)
852 << dendl;
853 return r;
854 }
855 }
856
857 db = KeyValueDB::create(cct, kv_backend, fn);
858 if (!db) {
859 derr << __func__ << " error creating db" << dendl;
860 return -EIO;
861 }
862 string options;
863 if (kv_backend == "rocksdb")
864 options = cct->_conf->kstore_rocksdb_options;
865 db->init(options);
866 stringstream err;
867 if (create)
868 r = db->create_and_open(err);
869 else
870 r = db->open(err);
871 if (r) {
872 derr << __func__ << " erroring opening db: " << err.str() << dendl;
873 delete db;
874 db = NULL;
875 return -EIO;
876 }
877 dout(1) << __func__ << " opened " << kv_backend
878 << " path " << fn << " options " << options << dendl;
879 return 0;
880 }
881
882 void KStore::_close_db()
883 {
884 ceph_assert(db);
885 delete db;
886 db = NULL;
887 }
888
889 int KStore::_open_collections(int *errors)
890 {
891 ceph_assert(coll_map.empty());
892 KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
893 for (it->upper_bound(string());
894 it->valid();
895 it->next()) {
896 coll_t cid;
897 if (cid.parse(it->key())) {
898 auto c = ceph::make_ref<Collection>(this, cid);
899 bufferlist bl = it->value();
900 auto p = bl.cbegin();
901 try {
902 decode(c->cnode, p);
903 } catch (buffer::error& e) {
904 derr << __func__ << " failed to decode cnode, key:"
905 << pretty_binary_string(it->key()) << dendl;
906 return -EIO;
907 }
908 dout(20) << __func__ << " opened " << cid << dendl;
909 coll_map[cid] = c;
910 } else {
911 derr << __func__ << " unrecognized collection " << it->key() << dendl;
912 if (errors)
913 (*errors)++;
914 }
915 }
916 return 0;
917 }
918
919 int KStore::mkfs()
920 {
921 dout(1) << __func__ << " path " << path << dendl;
922 int r;
923 uuid_d old_fsid;
924
925 r = _open_path();
926 if (r < 0)
927 return r;
928
929 r = _open_fsid(true);
930 if (r < 0)
931 goto out_path_fd;
932
933 r = _lock_fsid();
934 if (r < 0)
935 goto out_close_fsid;
936
937 r = _read_fsid(&old_fsid);
938 if (r < 0 || old_fsid.is_zero()) {
939 if (fsid.is_zero()) {
940 fsid.generate_random();
941 dout(1) << __func__ << " generated fsid " << fsid << dendl;
942 } else {
943 dout(1) << __func__ << " using provided fsid " << fsid << dendl;
944 }
945 // we'll write it last.
946 } else {
947 if (!fsid.is_zero() && fsid != old_fsid) {
948 derr << __func__ << " on-disk fsid " << old_fsid
949 << " != provided " << fsid << dendl;
950 r = -EINVAL;
951 goto out_close_fsid;
952 }
953 fsid = old_fsid;
954 dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
955 goto out_close_fsid;
956 }
957
958 r = _open_db(true);
959 if (r < 0)
960 goto out_close_fsid;
961
962 r = write_meta("kv_backend", cct->_conf->kstore_backend);
963 if (r < 0)
964 goto out_close_db;
965
966 r = write_meta("type", "kstore");
967 if (r < 0)
968 goto out_close_db;
969
970 // indicate mkfs completion/success by writing the fsid file
971 r = _write_fsid();
972 if (r == 0)
973 dout(10) << __func__ << " success" << dendl;
974 else
975 derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
976
977 out_close_db:
978 _close_db();
979 out_close_fsid:
980 _close_fsid();
981 out_path_fd:
982 _close_path();
983 return r;
984 }
985
986 int KStore::mount()
987 {
988 dout(1) << __func__ << " path " << path << dendl;
989
990 if (cct->_conf->kstore_fsck_on_mount) {
991 int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
992 if (rc < 0)
993 return rc;
994 }
995
996 int r = _open_path();
997 if (r < 0)
998 return r;
999 r = _open_fsid(false);
1000 if (r < 0)
1001 goto out_path;
1002
1003 r = _read_fsid(&fsid);
1004 if (r < 0)
1005 goto out_fsid;
1006
1007 r = _lock_fsid();
1008 if (r < 0)
1009 goto out_fsid;
1010
1011 r = _open_db(false);
1012 if (r < 0)
1013 goto out_fsid;
1014
1015 r = _open_super_meta();
1016 if (r < 0)
1017 goto out_db;
1018
1019 r = _open_collections();
1020 if (r < 0)
1021 goto out_db;
1022
1023 finisher.start();
1024 kv_sync_thread.create("kstore_kv_sync");
1025
1026 mounted = true;
1027 return 0;
1028
1029 out_db:
1030 _close_db();
1031 out_fsid:
1032 _close_fsid();
1033 out_path:
1034 _close_path();
1035 return r;
1036 }
1037
1038 int KStore::umount()
1039 {
1040 ceph_assert(mounted);
1041 dout(1) << __func__ << dendl;
1042
1043 _sync();
1044 _reap_collections();
1045 coll_map.clear();
1046
1047 dout(20) << __func__ << " stopping kv thread" << dendl;
1048 _kv_stop();
1049 dout(20) << __func__ << " draining finisher" << dendl;
1050 finisher.wait_for_empty();
1051 dout(20) << __func__ << " stopping finisher" << dendl;
1052 finisher.stop();
1053 dout(20) << __func__ << " closing" << dendl;
1054
1055 mounted = false;
1056 _close_db();
1057 _close_fsid();
1058 _close_path();
1059 return 0;
1060 }
1061
1062 int KStore::fsck(bool deep)
1063 {
1064 dout(1) << __func__ << dendl;
1065 int errors = 0;
1066 dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1067 return errors;
1068 }
1069
1070 void KStore::_sync()
1071 {
1072 dout(10) << __func__ << dendl;
1073
1074 std::unique_lock<std::mutex> l(kv_lock);
1075 while (!kv_committing.empty() ||
1076 !kv_queue.empty()) {
1077 dout(20) << " waiting for kv to commit" << dendl;
1078 kv_sync_cond.wait(l);
1079 }
1080
1081 dout(10) << __func__ << " done" << dendl;
1082 }
1083
1084 int KStore::statfs(struct store_statfs_t* buf0, osd_alert_list_t* alerts)
1085 {
1086 struct statfs buf;
1087 buf0->reset();
1088 if (alerts) {
1089 alerts->clear(); // returns nothing for now
1090 }
1091 if (::statfs(basedir.c_str(), &buf) < 0) {
1092 int r = -errno;
1093 ceph_assert(r != -ENOENT);
1094 return r;
1095 }
1096
1097 buf0->total = buf.f_blocks * buf.f_bsize;
1098 buf0->available = buf.f_bavail * buf.f_bsize;
1099
1100 return 0;
1101 }
1102
1103 ObjectStore::CollectionHandle KStore::open_collection(const coll_t& cid)
1104 {
1105 return _get_collection(cid);
1106 }
1107
1108 ObjectStore::CollectionHandle KStore::create_new_collection(const coll_t& cid)
1109 {
1110 auto c = ceph::make_ref<Collection>(this, cid);
1111 std::unique_lock l{coll_lock};
1112 new_coll_map[cid] = c;
1113 return c;
1114 }
1115
1116 int KStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
1117 bool *per_pool_omap)
1118 {
1119 return -ENOTSUP;
1120 }
1121
1122 // ---------------
1123 // cache
1124
1125 KStore::CollectionRef KStore::_get_collection(coll_t cid)
1126 {
1127 std::shared_lock l{coll_lock};
1128 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1129 if (cp == coll_map.end())
1130 return CollectionRef();
1131 return cp->second;
1132 }
1133
1134 void KStore::_queue_reap_collection(CollectionRef& c)
1135 {
1136 dout(10) << __func__ << " " << c->cid << dendl;
1137 std::lock_guard<std::mutex> l(reap_lock);
1138 removed_collections.push_back(c);
1139 }
1140
1141 void KStore::_reap_collections()
1142 {
1143 list<CollectionRef> removed_colls;
1144 std::lock_guard<std::mutex> l(reap_lock);
1145 removed_colls.swap(removed_collections);
1146
1147 for (list<CollectionRef>::iterator p = removed_colls.begin();
1148 p != removed_colls.end();
1149 ++p) {
1150 CollectionRef c = *p;
1151 dout(10) << __func__ << " " << c->cid << dendl;
1152 {
1153 pair<ghobject_t,OnodeRef> next;
1154 while (c->onode_map.get_next(next.first, &next)) {
1155 ceph_assert(!next.second->exists);
1156 if (!next.second->flush_txns.empty()) {
1157 dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1158 << " flush_txns " << next.second->flush_txns << dendl;
1159 return;
1160 }
1161 }
1162 }
1163 c->onode_map.clear();
1164 dout(10) << __func__ << " " << c->cid << " done" << dendl;
1165 }
1166
1167 dout(10) << __func__ << " all reaped" << dendl;
1168 }
1169
1170 // ---------------
1171 // read operations
1172
1173 bool KStore::exists(CollectionHandle& ch, const ghobject_t& oid)
1174 {
1175 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1176 Collection *c = static_cast<Collection*>(ch.get());
1177 std::shared_lock l{c->lock};
1178 OnodeRef o = c->get_onode(oid, false);
1179 if (!o || !o->exists)
1180 return false;
1181 return true;
1182 }
1183
1184 int KStore::stat(
1185 CollectionHandle& ch,
1186 const ghobject_t& oid,
1187 struct stat *st,
1188 bool allow_eio)
1189 {
1190 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1191 Collection *c = static_cast<Collection*>(ch.get());
1192 std::shared_lock l{c->lock};
1193 OnodeRef o = c->get_onode(oid, false);
1194 if (!o || !o->exists)
1195 return -ENOENT;
1196 st->st_size = o->onode.size;
1197 st->st_blksize = 4096;
1198 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1199 st->st_nlink = 1;
1200 return 0;
1201 }
1202
1203 int KStore::set_collection_opts(
1204 CollectionHandle& ch,
1205 const pool_opts_t& opts)
1206 {
1207 return -EOPNOTSUPP;
1208 }
1209
1210 int KStore::read(
1211 CollectionHandle& ch,
1212 const ghobject_t& oid,
1213 uint64_t offset,
1214 size_t length,
1215 bufferlist& bl,
1216 uint32_t op_flags)
1217 {
1218 dout(15) << __func__ << " " << ch->cid << " " << oid
1219 << " " << offset << "~" << length
1220 << dendl;
1221 bl.clear();
1222 Collection *c = static_cast<Collection*>(ch.get());
1223 std::shared_lock l{c->lock};
1224
1225 int r;
1226
1227 OnodeRef o = c->get_onode(oid, false);
1228 if (!o || !o->exists) {
1229 r = -ENOENT;
1230 goto out;
1231 }
1232
1233 if (offset == length && offset == 0)
1234 length = o->onode.size;
1235
1236 r = _do_read(o, offset, length, bl, false, op_flags);
1237
1238 out:
1239 dout(10) << __func__ << " " << ch->cid << " " << oid
1240 << " " << offset << "~" << length
1241 << " = " << r << dendl;
1242 return r;
1243 }
1244
1245 int KStore::_do_read(
1246 OnodeRef o,
1247 uint64_t offset,
1248 size_t length,
1249 bufferlist& bl,
1250 bool do_cache,
1251 uint32_t op_flags)
1252 {
1253 int r = 0;
1254 uint64_t stripe_size = o->onode.stripe_size;
1255 uint64_t stripe_off;
1256
1257 dout(20) << __func__ << " " << offset << "~" << length << " size "
1258 << o->onode.size << " nid " << o->onode.nid << dendl;
1259 bl.clear();
1260
1261 if (offset > o->onode.size) {
1262 goto out;
1263 }
1264 if (offset + length > o->onode.size) {
1265 length = o->onode.size - offset;
1266 }
1267 if (stripe_size == 0) {
1268 bl.append_zero(length);
1269 r = length;
1270 goto out;
1271 }
1272
1273 o->flush();
1274
1275 stripe_off = offset % stripe_size;
1276 while (length > 0) {
1277 bufferlist stripe;
1278 _do_read_stripe(o, offset - stripe_off, &stripe, do_cache);
1279 dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1280 << stripe.length() << dendl;
1281 unsigned swant = std::min<unsigned>(stripe_size - stripe_off, length);
1282 if (stripe.length()) {
1283 if (swant == stripe.length()) {
1284 bl.claim_append(stripe);
1285 dout(30) << __func__ << " taking full stripe" << dendl;
1286 } else {
1287 unsigned l = 0;
1288 if (stripe_off < stripe.length()) {
1289 l = std::min<uint64_t>(stripe.length() - stripe_off, swant);
1290 bufferlist t;
1291 t.substr_of(stripe, stripe_off, l);
1292 bl.claim_append(t);
1293 dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1294 }
1295 if (l < swant) {
1296 bl.append_zero(swant - l);
1297 dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1298 }
1299 }
1300 } else {
1301 dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1302 bl.append_zero(swant);
1303 }
1304 offset += swant;
1305 length -= swant;
1306 stripe_off = 0;
1307 }
1308 r = bl.length();
1309 dout(30) << " result:\n";
1310 bl.hexdump(*_dout);
1311 *_dout << dendl;
1312
1313 out:
1314 return r;
1315 }
1316
1317 int KStore::fiemap(
1318 CollectionHandle& ch,
1319 const ghobject_t& oid,
1320 uint64_t offset,
1321 size_t len,
1322 bufferlist& bl)
1323 {
1324 map<uint64_t, uint64_t> m;
1325 int r = fiemap(ch, oid, offset, len, m);
1326 if (r >= 0) {
1327 encode(m, bl);
1328 }
1329 return r;
1330 }
1331
1332 int KStore::fiemap(
1333 CollectionHandle& ch,
1334 const ghobject_t& oid,
1335 uint64_t offset,
1336 size_t len,
1337 map<uint64_t, uint64_t>& destmap)
1338 {
1339 CollectionRef c = static_cast<Collection*>(ch.get());
1340 if (!c)
1341 return -ENOENT;
1342 std::shared_lock l{c->lock};
1343
1344 OnodeRef o = c->get_onode(oid, false);
1345 if (!o || !o->exists) {
1346 return -ENOENT;
1347 }
1348
1349 if (offset > o->onode.size)
1350 goto out;
1351
1352 if (offset + len > o->onode.size) {
1353 len = o->onode.size - offset;
1354 }
1355
1356 dout(20) << __func__ << " " << offset << "~" << len << " size "
1357 << o->onode.size << dendl;
1358
1359 // FIXME: do something smarter here
1360 destmap[0] = o->onode.size;
1361
1362 out:
1363 dout(20) << __func__ << " " << offset << "~" << len
1364 << " size = 0 (" << destmap << ")" << dendl;
1365 return 0;
1366 }
1367
1368 int KStore::getattr(
1369 CollectionHandle& ch,
1370 const ghobject_t& oid,
1371 const char *name,
1372 bufferptr& value)
1373 {
1374 dout(15) << __func__ << " " << ch->cid << " " << oid << " " << name << dendl;
1375 Collection *c = static_cast<Collection*>(ch.get());
1376 std::shared_lock l{c->lock};
1377 int r;
1378 string k(name);
1379
1380 OnodeRef o = c->get_onode(oid, false);
1381 if (!o || !o->exists) {
1382 r = -ENOENT;
1383 goto out;
1384 }
1385
1386 if (!o->onode.attrs.count(k)) {
1387 r = -ENODATA;
1388 goto out;
1389 }
1390 value = o->onode.attrs[k];
1391 r = 0;
1392 out:
1393 dout(10) << __func__ << " " << ch->cid << " " << oid << " " << name
1394 << " = " << r << dendl;
1395 return r;
1396 }
1397
1398 int KStore::getattrs(
1399 CollectionHandle& ch,
1400 const ghobject_t& oid,
1401 map<string,bufferptr>& aset)
1402 {
1403 dout(15) << __func__ << " " << ch->cid << " " << oid << dendl;
1404 Collection *c = static_cast<Collection*>(ch.get());
1405 std::shared_lock l{c->lock};
1406 int r;
1407
1408 OnodeRef o = c->get_onode(oid, false);
1409 if (!o || !o->exists) {
1410 r = -ENOENT;
1411 goto out;
1412 }
1413 aset = o->onode.attrs;
1414 r = 0;
1415 out:
1416 dout(10) << __func__ << " " << ch->cid << " " << oid
1417 << " = " << r << dendl;
1418 return r;
1419 }
1420
1421 int KStore::list_collections(vector<coll_t>& ls)
1422 {
1423 std::shared_lock l{coll_lock};
1424 for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1425 p != coll_map.end();
1426 ++p)
1427 ls.push_back(p->first);
1428 return 0;
1429 }
1430
1431 bool KStore::collection_exists(const coll_t& c)
1432 {
1433 std::shared_lock l{coll_lock};
1434 return coll_map.count(c);
1435 }
1436
1437 int KStore::collection_empty(CollectionHandle& ch, bool *empty)
1438 {
1439 dout(15) << __func__ << " " << ch->cid << dendl;
1440 vector<ghobject_t> ls;
1441 ghobject_t next;
1442 int r = collection_list(ch, ghobject_t(), ghobject_t::get_max(), 1,
1443 &ls, &next);
1444 if (r < 0) {
1445 derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1446 << dendl;
1447 return r;
1448 }
1449 *empty = ls.empty();
1450 dout(10) << __func__ << " " << ch->cid << " = " << (int)(*empty) << dendl;
1451 return 0;
1452 }
1453
1454 int KStore::collection_bits(CollectionHandle& ch)
1455 {
1456 dout(15) << __func__ << " " << ch->cid << dendl;
1457 Collection *c = static_cast<Collection*>(ch.get());
1458 std::shared_lock l{c->lock};
1459 dout(10) << __func__ << " " << ch->cid << " = " << c->cnode.bits << dendl;
1460 return c->cnode.bits;
1461 }
1462
1463 int KStore::collection_list(
1464 CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1465 vector<ghobject_t> *ls, ghobject_t *pnext)
1466
1467 {
1468 Collection *c = static_cast<Collection*>(c_.get());
1469 c->flush();
1470 dout(15) << __func__ << " " << c->cid
1471 << " start " << start << " end " << end << " max " << max << dendl;
1472 int r;
1473 {
1474 std::shared_lock l{c->lock};
1475 r = _collection_list(c, start, end, max, ls, pnext);
1476 }
1477
1478 dout(10) << __func__ << " " << c->cid
1479 << " start " << start << " end " << end << " max " << max
1480 << " = " << r << ", ls.size() = " << ls->size()
1481 << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
1482 return r;
1483 }
1484
1485 int KStore::_collection_list(
1486 Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
1487 vector<ghobject_t> *ls, ghobject_t *pnext)
1488 {
1489 int r = 0;
1490 KeyValueDB::Iterator it;
1491 string temp_start_key, temp_end_key;
1492 string start_key, end_key;
1493 bool set_next = false;
1494 string pend;
1495 bool temp;
1496
1497 ghobject_t static_next;
1498 if (!pnext)
1499 pnext = &static_next;
1500
1501 if (start == ghobject_t::get_max() ||
1502 start.hobj.is_max()) {
1503 goto out;
1504 }
1505 get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
1506 &start_key, &end_key);
1507 dout(20) << __func__
1508 << " range " << pretty_binary_string(temp_start_key)
1509 << " to " << pretty_binary_string(temp_end_key)
1510 << " and " << pretty_binary_string(start_key)
1511 << " to " << pretty_binary_string(end_key)
1512 << " start " << start << dendl;
1513 it = db->get_iterator(PREFIX_OBJ);
1514 if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
1515 it->upper_bound(temp_start_key);
1516 temp = true;
1517 } else {
1518 string k;
1519 get_object_key(cct, start, &k);
1520 if (start.hobj.is_temp()) {
1521 temp = true;
1522 ceph_assert(k >= temp_start_key && k < temp_end_key);
1523 } else {
1524 temp = false;
1525 ceph_assert(k >= start_key && k < end_key);
1526 }
1527 dout(20) << " start from " << pretty_binary_string(k)
1528 << " temp=" << (int)temp << dendl;
1529 it->lower_bound(k);
1530 }
1531 if (end.hobj.is_max()) {
1532 pend = temp ? temp_end_key : end_key;
1533 } else {
1534 get_object_key(cct, end, &end_key);
1535 if (end.hobj.is_temp()) {
1536 if (temp)
1537 pend = end_key;
1538 else
1539 goto out;
1540 } else {
1541 pend = temp ? temp_end_key : end_key;
1542 }
1543 }
1544 dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1545 while (true) {
1546 if (!it->valid() || it->key() >= pend) {
1547 if (!it->valid())
1548 dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
1549 else
1550 dout(20) << __func__ << " key " << pretty_binary_string(it->key())
1551 << " > " << end << dendl;
1552 if (temp) {
1553 if (end.hobj.is_temp()) {
1554 break;
1555 }
1556 dout(30) << __func__ << " switch to non-temp namespace" << dendl;
1557 temp = false;
1558 it->upper_bound(start_key);
1559 pend = end_key;
1560 dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1561 continue;
1562 }
1563 break;
1564 }
1565 dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
1566 ghobject_t oid;
1567 int r = get_key_object(it->key(), &oid);
1568 ceph_assert(r == 0);
1569 if (ls->size() >= (unsigned)max) {
1570 dout(20) << __func__ << " reached max " << max << dendl;
1571 *pnext = oid;
1572 set_next = true;
1573 break;
1574 }
1575 ls->push_back(oid);
1576 it->next();
1577 }
1578 out:
1579 if (!set_next) {
1580 *pnext = ghobject_t::get_max();
1581 }
1582 return r;
1583 }
1584
1585 // omap reads
1586
1587 KStore::OmapIteratorImpl::OmapIteratorImpl(
1588 CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1589 : c(c), o(o), it(it)
1590 {
1591 std::shared_lock l{c->lock};
1592 if (o->onode.omap_head) {
1593 get_omap_key(o->onode.omap_head, string(), &head);
1594 get_omap_tail(o->onode.omap_head, &tail);
1595 it->lower_bound(head);
1596 }
1597 }
1598
1599 int KStore::OmapIteratorImpl::seek_to_first()
1600 {
1601 std::shared_lock l{c->lock};
1602 if (o->onode.omap_head) {
1603 it->lower_bound(head);
1604 } else {
1605 it = KeyValueDB::Iterator();
1606 }
1607 return 0;
1608 }
1609
1610 int KStore::OmapIteratorImpl::upper_bound(const string& after)
1611 {
1612 std::shared_lock l{c->lock};
1613 if (o->onode.omap_head) {
1614 string key;
1615 get_omap_key(o->onode.omap_head, after, &key);
1616 it->upper_bound(key);
1617 } else {
1618 it = KeyValueDB::Iterator();
1619 }
1620 return 0;
1621 }
1622
1623 int KStore::OmapIteratorImpl::lower_bound(const string& to)
1624 {
1625 std::shared_lock l{c->lock};
1626 if (o->onode.omap_head) {
1627 string key;
1628 get_omap_key(o->onode.omap_head, to, &key);
1629 it->lower_bound(key);
1630 } else {
1631 it = KeyValueDB::Iterator();
1632 }
1633 return 0;
1634 }
1635
1636 bool KStore::OmapIteratorImpl::valid()
1637 {
1638 std::shared_lock l{c->lock};
1639 if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1640 return true;
1641 } else {
1642 return false;
1643 }
1644 }
1645
1646 int KStore::OmapIteratorImpl::next()
1647 {
1648 std::shared_lock l{c->lock};
1649 if (o->onode.omap_head) {
1650 it->next();
1651 return 0;
1652 } else {
1653 return -1;
1654 }
1655 }
1656
1657 string KStore::OmapIteratorImpl::key()
1658 {
1659 std::shared_lock l{c->lock};
1660 ceph_assert(it->valid());
1661 string db_key = it->raw_key().second;
1662 string user_key;
1663 decode_omap_key(db_key, &user_key);
1664 return user_key;
1665 }
1666
1667 bufferlist KStore::OmapIteratorImpl::value()
1668 {
1669 std::shared_lock l{c->lock};
1670 ceph_assert(it->valid());
1671 return it->value();
1672 }
1673
1674 int KStore::omap_get(
1675 CollectionHandle& ch, ///< [in] Collection containing oid
1676 const ghobject_t &oid, ///< [in] Object containing omap
1677 bufferlist *header, ///< [out] omap header
1678 map<string, bufferlist> *out /// < [out] Key to value map
1679 )
1680 {
1681 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1682 Collection *c = static_cast<Collection*>(ch.get());
1683 std::shared_lock l{c->lock};
1684 int r = 0;
1685 OnodeRef o = c->get_onode(oid, false);
1686 if (!o || !o->exists) {
1687 r = -ENOENT;
1688 goto out;
1689 }
1690 if (!o->onode.omap_head)
1691 goto out;
1692 o->flush();
1693 {
1694 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1695 string head, tail;
1696 get_omap_header(o->onode.omap_head, &head);
1697 get_omap_tail(o->onode.omap_head, &tail);
1698 it->lower_bound(head);
1699 while (it->valid()) {
1700 if (it->key() == head) {
1701 dout(30) << __func__ << " got header" << dendl;
1702 *header = it->value();
1703 } else if (it->key() >= tail) {
1704 dout(30) << __func__ << " reached tail" << dendl;
1705 break;
1706 } else {
1707 string user_key;
1708 decode_omap_key(it->key(), &user_key);
1709 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1710 << " -> " << user_key << dendl;
1711 ceph_assert(it->key() < tail);
1712 (*out)[user_key] = it->value();
1713 }
1714 it->next();
1715 }
1716 }
1717 out:
1718 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
1719 return r;
1720 }
1721
1722 int KStore::omap_get_header(
1723 CollectionHandle& ch, ///< [in] Collection containing oid
1724 const ghobject_t &oid, ///< [in] Object containing omap
1725 bufferlist *header, ///< [out] omap header
1726 bool allow_eio ///< [in] don't assert on eio
1727 )
1728 {
1729 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1730 Collection *c = static_cast<Collection*>(ch.get());
1731 std::shared_lock l{c->lock};
1732 int r = 0;
1733 OnodeRef o = c->get_onode(oid, false);
1734 if (!o || !o->exists) {
1735 r = -ENOENT;
1736 goto out;
1737 }
1738 if (!o->onode.omap_head)
1739 goto out;
1740 o->flush();
1741 {
1742 string head;
1743 get_omap_header(o->onode.omap_head, &head);
1744 if (db->get(PREFIX_OMAP, head, header) >= 0) {
1745 dout(30) << __func__ << " got header" << dendl;
1746 } else {
1747 dout(30) << __func__ << " no header" << dendl;
1748 }
1749 }
1750 out:
1751 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
1752 return r;
1753 }
1754
1755 int KStore::omap_get_keys(
1756 CollectionHandle& ch, ///< [in] Collection containing oid
1757 const ghobject_t &oid, ///< [in] Object containing omap
1758 set<string> *keys ///< [out] Keys defined on oid
1759 )
1760 {
1761 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1762 Collection *c = static_cast<Collection*>(ch.get());
1763 std::shared_lock l{c->lock};
1764 int r = 0;
1765 OnodeRef o = c->get_onode(oid, false);
1766 if (!o || !o->exists) {
1767 r = -ENOENT;
1768 goto out;
1769 }
1770 if (!o->onode.omap_head)
1771 goto out;
1772 o->flush();
1773 {
1774 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1775 string head, tail;
1776 get_omap_key(o->onode.omap_head, string(), &head);
1777 get_omap_tail(o->onode.omap_head, &tail);
1778 it->lower_bound(head);
1779 while (it->valid()) {
1780 if (it->key() >= tail) {
1781 dout(30) << __func__ << " reached tail" << dendl;
1782 break;
1783 }
1784 string user_key;
1785 decode_omap_key(it->key(), &user_key);
1786 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1787 << " -> " << user_key << dendl;
1788 ceph_assert(it->key() < tail);
1789 keys->insert(user_key);
1790 it->next();
1791 }
1792 }
1793 out:
1794 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
1795 return r;
1796 }
1797
1798 int KStore::omap_get_values(
1799 CollectionHandle& ch, ///< [in] Collection containing oid
1800 const ghobject_t &oid, ///< [in] Object containing omap
1801 const set<string> &keys, ///< [in] Keys to get
1802 map<string, bufferlist> *out ///< [out] Returned keys and values
1803 )
1804 {
1805 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1806 Collection *c = static_cast<Collection*>(ch.get());
1807 std::shared_lock l{c->lock};
1808 int r = 0;
1809 OnodeRef o = c->get_onode(oid, false);
1810 if (!o || !o->exists) {
1811 r = -ENOENT;
1812 goto out;
1813 }
1814 if (!o->onode.omap_head)
1815 goto out;
1816 o->flush();
1817 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1818 string key;
1819 get_omap_key(o->onode.omap_head, *p, &key);
1820 bufferlist val;
1821 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1822 dout(30) << __func__ << " got " << pretty_binary_string(key)
1823 << " -> " << *p << dendl;
1824 out->insert(make_pair(*p, val));
1825 }
1826 }
1827 out:
1828 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
1829 return r;
1830 }
1831
1832 int KStore::omap_check_keys(
1833 CollectionHandle& ch, ///< [in] Collection containing oid
1834 const ghobject_t &oid, ///< [in] Object containing omap
1835 const set<string> &keys, ///< [in] Keys to check
1836 set<string> *out ///< [out] Subset of keys defined on oid
1837 )
1838 {
1839 dout(15) << __func__ << " " << ch->cid << " oid " << oid << dendl;
1840 Collection *c = static_cast<Collection*>(ch.get());
1841 std::shared_lock l{c->lock};
1842 int r = 0;
1843 OnodeRef o = c->get_onode(oid, false);
1844 if (!o || !o->exists) {
1845 r = -ENOENT;
1846 goto out;
1847 }
1848 if (!o->onode.omap_head)
1849 goto out;
1850 o->flush();
1851 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1852 string key;
1853 get_omap_key(o->onode.omap_head, *p, &key);
1854 bufferlist val;
1855 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1856 dout(30) << __func__ << " have " << pretty_binary_string(key)
1857 << " -> " << *p << dendl;
1858 out->insert(*p);
1859 } else {
1860 dout(30) << __func__ << " miss " << pretty_binary_string(key)
1861 << " -> " << *p << dendl;
1862 }
1863 }
1864 out:
1865 dout(10) << __func__ << " " << ch->cid << " oid " << oid << " = " << r << dendl;
1866 return r;
1867 }
1868
1869 ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
1870 CollectionHandle& ch, ///< [in] collection
1871 const ghobject_t &oid ///< [in] object
1872 )
1873 {
1874
1875 dout(10) << __func__ << " " << ch->cid << " " << oid << dendl;
1876 Collection *c = static_cast<Collection*>(ch.get());
1877 std::shared_lock l{c->lock};
1878 OnodeRef o = c->get_onode(oid, false);
1879 if (!o || !o->exists) {
1880 dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1881 return ObjectMap::ObjectMapIterator();
1882 }
1883 o->flush();
1884 dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1885 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1886 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1887 }
1888
1889
1890 // -----------------
1891 // write helpers
1892
1893 int KStore::_open_super_meta()
1894 {
1895 // nid
1896 {
1897 nid_max = 0;
1898 bufferlist bl;
1899 db->get(PREFIX_SUPER, "nid_max", &bl);
1900 auto p = bl.cbegin();
1901 try {
1902 decode(nid_max, p);
1903 } catch (buffer::error& e) {
1904 }
1905 dout(10) << __func__ << " old nid_max " << nid_max << dendl;
1906 nid_last = nid_max;
1907 }
1908 return 0;
1909 }
1910
1911 void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1912 {
1913 if (o->onode.nid)
1914 return;
1915 std::lock_guard<std::mutex> l(nid_lock);
1916 o->onode.nid = ++nid_last;
1917 dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1918 if (nid_last > nid_max) {
1919 nid_max += cct->_conf->kstore_nid_prealloc;
1920 bufferlist bl;
1921 encode(nid_max, bl);
1922 txc->t->set(PREFIX_SUPER, "nid_max", bl);
1923 dout(10) << __func__ << " nid_max now " << nid_max << dendl;
1924 }
1925 }
1926
1927 KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1928 {
1929 TransContext *txc = new TransContext(osr);
1930 txc->t = db->get_transaction();
1931 osr->queue_new(txc);
1932 dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
1933 return txc;
1934 }
1935
1936 void KStore::_txc_state_proc(TransContext *txc)
1937 {
1938 while (true) {
1939 dout(10) << __func__ << " txc " << txc
1940 << " " << txc->get_state_name() << dendl;
1941 switch (txc->state) {
1942 case TransContext::STATE_PREPARE:
1943 txc->log_state_latency(logger, l_kstore_state_prepare_lat);
1944 txc->state = TransContext::STATE_KV_QUEUED;
1945 if (!cct->_conf->kstore_sync_transaction) {
1946 std::lock_guard<std::mutex> l(kv_lock);
1947 if (cct->_conf->kstore_sync_submit_transaction) {
1948 int r = db->submit_transaction(txc->t);
1949 ceph_assert(r == 0);
1950 }
1951 kv_queue.push_back(txc);
1952 kv_cond.notify_one();
1953 return;
1954 }
1955 {
1956 int r = db->submit_transaction_sync(txc->t);
1957 ceph_assert(r == 0);
1958 }
1959 break;
1960
1961 case TransContext::STATE_KV_QUEUED:
1962 txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
1963 txc->state = TransContext::STATE_KV_DONE;
1964 _txc_finish_kv(txc);
1965 // ** fall-thru **
1966
1967 case TransContext::STATE_KV_DONE:
1968 txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
1969 txc->state = TransContext::STATE_FINISHING;
1970 // ** fall-thru **
1971
1972 case TransContext::TransContext::STATE_FINISHING:
1973 txc->log_state_latency(logger, l_kstore_state_finishing_lat);
1974 _txc_finish(txc);
1975 return;
1976
1977 default:
1978 derr << __func__ << " unexpected txc " << txc
1979 << " state " << txc->get_state_name() << dendl;
1980 ceph_abort_msg("unexpected txc state");
1981 return;
1982 }
1983 }
1984 }
1985
1986 void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1987 {
1988 dout(20) << __func__ << " osr " << osr << " txc " << txc
1989 << " onodes " << txc->onodes << dendl;
1990
1991 // finalize onodes
1992 for (set<OnodeRef>::iterator p = txc->onodes.begin();
1993 p != txc->onodes.end();
1994 ++p) {
1995 bufferlist bl;
1996 encode((*p)->onode, bl);
1997 dout(20) << " onode size is " << bl.length() << dendl;
1998 txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1999
2000 std::lock_guard<std::mutex> l((*p)->flush_lock);
2001 (*p)->flush_txns.insert(txc);
2002 }
2003 }
2004
2005 void KStore::_txc_finish_kv(TransContext *txc)
2006 {
2007 dout(20) << __func__ << " txc " << txc << dendl;
2008
2009 // warning: we're calling onreadable_sync inside the sequencer lock
2010 if (txc->onreadable_sync) {
2011 txc->onreadable_sync->complete(0);
2012 txc->onreadable_sync = NULL;
2013 }
2014 if (txc->onreadable) {
2015 finisher.queue(txc->onreadable);
2016 txc->onreadable = NULL;
2017 }
2018 if (txc->oncommit) {
2019 finisher.queue(txc->oncommit);
2020 txc->oncommit = NULL;
2021 }
2022 if (!txc->oncommits.empty()) {
2023 finisher.queue(txc->oncommits);
2024 }
2025
2026 throttle_ops.put(txc->ops);
2027 throttle_bytes.put(txc->bytes);
2028 }
2029
2030 void KStore::_txc_finish(TransContext *txc)
2031 {
2032 dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
2033 ceph_assert(txc->state == TransContext::STATE_FINISHING);
2034
2035 for (set<OnodeRef>::iterator p = txc->onodes.begin();
2036 p != txc->onodes.end();
2037 ++p) {
2038 std::lock_guard<std::mutex> l((*p)->flush_lock);
2039 dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2040 << dendl;
2041 ceph_assert((*p)->flush_txns.count(txc));
2042 (*p)->flush_txns.erase(txc);
2043 if ((*p)->flush_txns.empty()) {
2044 (*p)->flush_cond.notify_all();
2045 (*p)->clear_pending_stripes();
2046 }
2047 }
2048
2049 // clear out refs
2050 txc->onodes.clear();
2051
2052 while (!txc->removed_collections.empty()) {
2053 _queue_reap_collection(txc->removed_collections.front());
2054 txc->removed_collections.pop_front();
2055 }
2056
2057 OpSequencerRef osr = txc->osr;
2058 {
2059 std::lock_guard<std::mutex> l(osr->qlock);
2060 txc->state = TransContext::STATE_DONE;
2061 }
2062
2063 _osr_reap_done(osr.get());
2064 }
2065
2066 void KStore::_osr_reap_done(OpSequencer *osr)
2067 {
2068 std::lock_guard<std::mutex> l(osr->qlock);
2069 dout(20) << __func__ << " osr " << osr << dendl;
2070 while (!osr->q.empty()) {
2071 TransContext *txc = &osr->q.front();
2072 dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
2073 << dendl;
2074 if (txc->state != TransContext::STATE_DONE) {
2075 break;
2076 }
2077
2078 if (txc->first_collection) {
2079 txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2080 }
2081
2082 osr->q.pop_front();
2083 txc->log_state_latency(logger, l_kstore_state_done_lat);
2084 delete txc;
2085 osr->qcond.notify_all();
2086 if (osr->q.empty())
2087 dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
2088 }
2089 }
2090
2091 void KStore::_kv_sync_thread()
2092 {
2093 dout(10) << __func__ << " start" << dendl;
2094 std::unique_lock<std::mutex> l(kv_lock);
2095 while (true) {
2096 ceph_assert(kv_committing.empty());
2097 if (kv_queue.empty()) {
2098 if (kv_stop)
2099 break;
2100 dout(20) << __func__ << " sleep" << dendl;
2101 kv_sync_cond.notify_all();
2102 kv_cond.wait(l);
2103 dout(20) << __func__ << " wake" << dendl;
2104 } else {
2105 dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
2106 kv_committing.swap(kv_queue);
2107 utime_t start = ceph_clock_now();
2108 l.unlock();
2109
2110 dout(30) << __func__ << " committing txc " << kv_committing << dendl;
2111
2112 // one transaction to force a sync
2113 KeyValueDB::Transaction t = db->get_transaction();
2114 if (!cct->_conf->kstore_sync_submit_transaction) {
2115 for (std::deque<TransContext *>::iterator it = kv_committing.begin();
2116 it != kv_committing.end();
2117 ++it) {
2118 int r = db->submit_transaction((*it)->t);
2119 ceph_assert(r == 0);
2120 }
2121 }
2122 int r = db->submit_transaction_sync(t);
2123 ceph_assert(r == 0);
2124 utime_t finish = ceph_clock_now();
2125 utime_t dur = finish - start;
2126 dout(20) << __func__ << " committed " << kv_committing.size()
2127 << " in " << dur << dendl;
2128 while (!kv_committing.empty()) {
2129 TransContext *txc = kv_committing.front();
2130 _txc_state_proc(txc);
2131 kv_committing.pop_front();
2132 }
2133
2134 // this is as good a place as any ...
2135 _reap_collections();
2136
2137 l.lock();
2138 }
2139 }
2140 dout(10) << __func__ << " finish" << dendl;
2141 }
2142
2143
2144 // ---------------------------
2145 // transactions
2146
2147 int KStore::queue_transactions(
2148 CollectionHandle& ch,
2149 vector<Transaction>& tls,
2150 TrackedOpRef op,
2151 ThreadPool::TPHandle *handle)
2152 {
2153 Context *onreadable;
2154 Context *ondisk;
2155 Context *onreadable_sync;
2156 ObjectStore::Transaction::collect_contexts(
2157 tls, &onreadable, &ondisk, &onreadable_sync);
2158
2159 // set up the sequencer
2160 Collection *c = static_cast<Collection*>(ch.get());
2161 OpSequencer *osr = c->osr.get();
2162 dout(10) << __func__ << " ch " << ch.get() << " " << c->cid << dendl;
2163
2164 // prepare
2165 TransContext *txc = _txc_create(osr);
2166 txc->onreadable = onreadable;
2167 txc->onreadable_sync = onreadable_sync;
2168 txc->oncommit = ondisk;
2169
2170 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
2171 txc->ops += (*p).get_num_ops();
2172 txc->bytes += (*p).get_num_bytes();
2173 _txc_add_transaction(txc, &(*p));
2174 }
2175
2176 _txc_finalize(osr, txc);
2177
2178 throttle_ops.get(txc->ops);
2179 throttle_bytes.get(txc->bytes);
2180
2181 // execute (start)
2182 _txc_state_proc(txc);
2183 return 0;
2184 }
2185
2186 void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2187 {
2188 Transaction::iterator i = t->begin();
2189
2190 dout(30) << __func__ << " transaction dump:\n";
2191 JSONFormatter f(true);
2192 f.open_object_section("transaction");
2193 t->dump(&f);
2194 f.close_section();
2195 f.flush(*_dout);
2196 *_dout << dendl;
2197
2198 vector<CollectionRef> cvec(i.colls.size());
2199 unsigned j = 0;
2200 for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2201 ++p, ++j) {
2202 cvec[j] = _get_collection(*p);
2203
2204 // note first collection we reference
2205 if (!j && !txc->first_collection)
2206 txc->first_collection = cvec[j];
2207 }
2208 vector<OnodeRef> ovec(i.objects.size());
2209
2210 for (int pos = 0; i.have_op(); ++pos) {
2211 Transaction::Op *op = i.decode_op();
2212 int r = 0;
2213
2214 // no coll or obj
2215 if (op->op == Transaction::OP_NOP)
2216 continue;
2217
2218 // collection operations
2219 CollectionRef &c = cvec[op->cid];
2220 switch (op->op) {
2221 case Transaction::OP_RMCOLL:
2222 {
2223 coll_t cid = i.get_cid(op->cid);
2224 r = _remove_collection(txc, cid, &c);
2225 if (!r)
2226 continue;
2227 }
2228 break;
2229
2230 case Transaction::OP_MKCOLL:
2231 {
2232 ceph_assert(!c);
2233 coll_t cid = i.get_cid(op->cid);
2234 r = _create_collection(txc, cid, op->split_bits, &c);
2235 if (!r)
2236 continue;
2237 }
2238 break;
2239
2240 case Transaction::OP_SPLIT_COLLECTION:
2241 ceph_abort_msg("deprecated");
2242 break;
2243
2244 case Transaction::OP_SPLIT_COLLECTION2:
2245 {
2246 uint32_t bits = op->split_bits;
2247 uint32_t rem = op->split_rem;
2248 r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2249 if (!r)
2250 continue;
2251 }
2252 break;
2253
2254 case Transaction::OP_MERGE_COLLECTION:
2255 {
2256 uint32_t bits = op->split_bits;
2257 r = _merge_collection(txc, &c, cvec[op->dest_cid], bits);
2258 if (!r)
2259 continue;
2260 }
2261 break;
2262
2263 case Transaction::OP_COLL_HINT:
2264 {
2265 uint32_t type = op->hint_type;
2266 bufferlist hint;
2267 i.decode_bl(hint);
2268 auto hiter = hint.cbegin();
2269 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2270 uint32_t pg_num;
2271 uint64_t num_objs;
2272 decode(pg_num, hiter);
2273 decode(num_objs, hiter);
2274 dout(10) << __func__ << " collection hint objects is a no-op, "
2275 << " pg_num " << pg_num << " num_objects " << num_objs
2276 << dendl;
2277 } else {
2278 // Ignore the hint
2279 dout(10) << __func__ << " unknown collection hint " << type << dendl;
2280 }
2281 continue;
2282 }
2283 break;
2284
2285 case Transaction::OP_COLL_SETATTR:
2286 r = -EOPNOTSUPP;
2287 break;
2288
2289 case Transaction::OP_COLL_RMATTR:
2290 r = -EOPNOTSUPP;
2291 break;
2292
2293 case Transaction::OP_COLL_RENAME:
2294 ceph_abort_msg("not implemented");
2295 break;
2296 }
2297 if (r < 0) {
2298 derr << " error " << cpp_strerror(r)
2299 << " not handled on operation " << op->op
2300 << " (op " << pos << ", counting from 0)" << dendl;
2301 dout(0) << " transaction dump:\n";
2302 JSONFormatter f(true);
2303 f.open_object_section("transaction");
2304 t->dump(&f);
2305 f.close_section();
2306 f.flush(*_dout);
2307 *_dout << dendl;
2308 ceph_abort_msg("unexpected error");
2309 }
2310
2311 // object operations
2312 std::unique_lock l{c->lock};
2313 OnodeRef &o = ovec[op->oid];
2314 if (!o) {
2315 // these operations implicity create the object
2316 bool create = false;
2317 if (op->op == Transaction::OP_TOUCH ||
2318 op->op == Transaction::OP_CREATE ||
2319 op->op == Transaction::OP_WRITE ||
2320 op->op == Transaction::OP_ZERO) {
2321 create = true;
2322 }
2323 ghobject_t oid = i.get_oid(op->oid);
2324 o = c->get_onode(oid, create);
2325 if (!create) {
2326 if (!o || !o->exists) {
2327 dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2328 << oid << dendl;
2329 r = -ENOENT;
2330 goto endop;
2331 }
2332 }
2333 }
2334
2335 switch (op->op) {
2336 case Transaction::OP_TOUCH:
2337 case Transaction::OP_CREATE:
2338 r = _touch(txc, c, o);
2339 break;
2340
2341 case Transaction::OP_WRITE:
2342 {
2343 uint64_t off = op->off;
2344 uint64_t len = op->len;
2345 uint32_t fadvise_flags = i.get_fadvise_flags();
2346 bufferlist bl;
2347 i.decode_bl(bl);
2348 r = _write(txc, c, o, off, len, bl, fadvise_flags);
2349 }
2350 break;
2351
2352 case Transaction::OP_ZERO:
2353 {
2354 uint64_t off = op->off;
2355 uint64_t len = op->len;
2356 r = _zero(txc, c, o, off, len);
2357 }
2358 break;
2359
2360 case Transaction::OP_TRIMCACHE:
2361 {
2362 // deprecated, no-op
2363 }
2364 break;
2365
2366 case Transaction::OP_TRUNCATE:
2367 {
2368 uint64_t off = op->off;
2369 r = _truncate(txc, c, o, off);
2370 }
2371 break;
2372
2373 case Transaction::OP_REMOVE:
2374 r = _remove(txc, c, o);
2375 break;
2376
2377 case Transaction::OP_SETATTR:
2378 {
2379 string name = i.decode_string();
2380 bufferlist bl;
2381 i.decode_bl(bl);
2382 map<string, bufferptr> to_set;
2383 to_set[name] = bufferptr(bl.c_str(), bl.length());
2384 r = _setattrs(txc, c, o, to_set);
2385 }
2386 break;
2387
2388 case Transaction::OP_SETATTRS:
2389 {
2390 map<string, bufferptr> aset;
2391 i.decode_attrset(aset);
2392 r = _setattrs(txc, c, o, aset);
2393 }
2394 break;
2395
2396 case Transaction::OP_RMATTR:
2397 {
2398 string name = i.decode_string();
2399 r = _rmattr(txc, c, o, name);
2400 }
2401 break;
2402
2403 case Transaction::OP_RMATTRS:
2404 {
2405 r = _rmattrs(txc, c, o);
2406 }
2407 break;
2408
2409 case Transaction::OP_CLONE:
2410 {
2411 const ghobject_t& noid = i.get_oid(op->dest_oid);
2412 OnodeRef no = c->get_onode(noid, true);
2413 r = _clone(txc, c, o, no);
2414 }
2415 break;
2416
2417 case Transaction::OP_CLONERANGE:
2418 ceph_abort_msg("deprecated");
2419 break;
2420
2421 case Transaction::OP_CLONERANGE2:
2422 {
2423 const ghobject_t& noid = i.get_oid(op->dest_oid);
2424 OnodeRef no = c->get_onode(noid, true);
2425 uint64_t srcoff = op->off;
2426 uint64_t len = op->len;
2427 uint64_t dstoff = op->dest_off;
2428 r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2429 }
2430 break;
2431
2432 case Transaction::OP_COLL_ADD:
2433 ceph_abort_msg("not implemented");
2434 break;
2435
2436 case Transaction::OP_COLL_REMOVE:
2437 ceph_abort_msg("not implemented");
2438 break;
2439
2440 case Transaction::OP_COLL_MOVE:
2441 ceph_abort_msg("deprecated");
2442 break;
2443
2444 case Transaction::OP_COLL_MOVE_RENAME:
2445 {
2446 ceph_assert(op->cid == op->dest_cid);
2447 const ghobject_t& noid = i.get_oid(op->dest_oid);
2448 OnodeRef no = c->get_onode(noid, true);
2449 r = _rename(txc, c, o, no, noid);
2450 o.reset();
2451 }
2452 break;
2453
2454 case Transaction::OP_TRY_RENAME:
2455 {
2456 const ghobject_t& noid = i.get_oid(op->dest_oid);
2457 OnodeRef no = c->get_onode(noid, true);
2458 r = _rename(txc, c, o, no, noid);
2459 if (r == -ENOENT)
2460 r = 0;
2461 o.reset();
2462 }
2463 break;
2464
2465 case Transaction::OP_OMAP_CLEAR:
2466 {
2467 r = _omap_clear(txc, c, o);
2468 }
2469 break;
2470 case Transaction::OP_OMAP_SETKEYS:
2471 {
2472 bufferlist aset_bl;
2473 i.decode_attrset_bl(&aset_bl);
2474 r = _omap_setkeys(txc, c, o, aset_bl);
2475 }
2476 break;
2477 case Transaction::OP_OMAP_RMKEYS:
2478 {
2479 bufferlist keys_bl;
2480 i.decode_keyset_bl(&keys_bl);
2481 r = _omap_rmkeys(txc, c, o, keys_bl);
2482 }
2483 break;
2484 case Transaction::OP_OMAP_RMKEYRANGE:
2485 {
2486 string first, last;
2487 first = i.decode_string();
2488 last = i.decode_string();
2489 r = _omap_rmkey_range(txc, c, o, first, last);
2490 }
2491 break;
2492 case Transaction::OP_OMAP_SETHEADER:
2493 {
2494 bufferlist bl;
2495 i.decode_bl(bl);
2496 r = _omap_setheader(txc, c, o, bl);
2497 }
2498 break;
2499
2500 case Transaction::OP_SETALLOCHINT:
2501 {
2502 uint64_t expected_object_size = op->expected_object_size;
2503 uint64_t expected_write_size = op->expected_write_size;
2504 uint32_t flags = op->alloc_hint_flags;
2505 r = _setallochint(txc, c, o,
2506 expected_object_size,
2507 expected_write_size,
2508 flags);
2509 }
2510 break;
2511
2512 default:
2513 derr << "bad op " << op->op << dendl;
2514 ceph_abort();
2515 }
2516
2517 endop:
2518 if (r < 0) {
2519 bool ok = false;
2520
2521 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2522 op->op == Transaction::OP_CLONE ||
2523 op->op == Transaction::OP_CLONERANGE2 ||
2524 op->op == Transaction::OP_COLL_ADD))
2525 // -ENOENT is usually okay
2526 ok = true;
2527 if (r == -ENODATA)
2528 ok = true;
2529
2530 if (!ok) {
2531 const char *msg = "unexpected error code";
2532
2533 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2534 op->op == Transaction::OP_CLONE ||
2535 op->op == Transaction::OP_CLONERANGE2))
2536 msg = "ENOENT on clone suggests osd bug";
2537
2538 if (r == -ENOSPC)
2539 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2540 // by partially applying transactions.
2541 msg = "ENOSPC from key value store, misconfigured cluster";
2542
2543 if (r == -ENOTEMPTY) {
2544 msg = "ENOTEMPTY suggests garbage data in osd data dir";
2545 }
2546
2547 dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2548 << " (op " << pos << ", counting from 0)" << dendl;
2549 dout(0) << msg << dendl;
2550 dout(0) << " transaction dump:\n";
2551 JSONFormatter f(true);
2552 f.open_object_section("transaction");
2553 t->dump(&f);
2554 f.close_section();
2555 f.flush(*_dout);
2556 *_dout << dendl;
2557 ceph_abort_msg("unexpected error");
2558 }
2559 }
2560 }
2561 }
2562
2563
2564
2565 // -----------------
2566 // write operations
2567
2568 int KStore::_touch(TransContext *txc,
2569 CollectionRef& c,
2570 OnodeRef &o)
2571 {
2572 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2573 int r = 0;
2574 o->exists = true;
2575 _assign_nid(txc, o);
2576 txc->write_onode(o);
2577 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2578 return r;
2579 }
2580
2581 void KStore::_dump_onode(OnodeRef o)
2582 {
2583 dout(30) << __func__ << " " << o
2584 << " nid " << o->onode.nid
2585 << " size " << o->onode.size
2586 << " expected_object_size " << o->onode.expected_object_size
2587 << " expected_write_size " << o->onode.expected_write_size
2588 << dendl;
2589 for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2590 p != o->onode.attrs.end();
2591 ++p) {
2592 dout(30) << __func__ << " attr " << p->first
2593 << " len " << p->second.length() << dendl;
2594 }
2595 }
2596
2597 void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl, bool do_cache)
2598 {
2599 if (!do_cache) {
2600 string key;
2601 get_data_key(o->onode.nid, offset, &key);
2602 db->get(PREFIX_DATA, key, pbl);
2603 return;
2604 }
2605
2606 map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2607 if (p == o->pending_stripes.end()) {
2608 string key;
2609 get_data_key(o->onode.nid, offset, &key);
2610 db->get(PREFIX_DATA, key, pbl);
2611 o->pending_stripes[offset] = *pbl;
2612 } else {
2613 *pbl = p->second;
2614 }
2615 }
2616
2617 void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2618 uint64_t offset, bufferlist& bl)
2619 {
2620 o->pending_stripes[offset] = bl;
2621 string key;
2622 get_data_key(o->onode.nid, offset, &key);
2623 txc->t->set(PREFIX_DATA, key, bl);
2624 }
2625
2626 void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2627 {
2628 o->pending_stripes.erase(offset);
2629 string key;
2630 get_data_key(o->onode.nid, offset, &key);
2631 txc->t->rmkey(PREFIX_DATA, key);
2632 }
2633
2634 int KStore::_do_write(TransContext *txc,
2635 OnodeRef o,
2636 uint64_t offset, uint64_t length,
2637 bufferlist& orig_bl,
2638 uint32_t fadvise_flags)
2639 {
2640 int r = 0;
2641
2642 dout(20) << __func__
2643 << " " << o->oid << " " << offset << "~" << length
2644 << " - have " << o->onode.size
2645 << " bytes, nid " << o->onode.nid << dendl;
2646 _dump_onode(o);
2647 o->exists = true;
2648
2649 if (length == 0) {
2650 return 0;
2651 }
2652
2653 uint64_t stripe_size = o->onode.stripe_size;
2654 if (!stripe_size) {
2655 o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2656 stripe_size = o->onode.stripe_size;
2657 }
2658
2659 unsigned bl_off = 0;
2660 while (length > 0) {
2661 uint64_t offset_rem = offset % stripe_size;
2662 uint64_t end_rem = (offset + length) % stripe_size;
2663 if (offset_rem == 0 && end_rem == 0) {
2664 bufferlist bl;
2665 bl.substr_of(orig_bl, bl_off, stripe_size);
2666 dout(30) << __func__ << " full stripe " << offset << dendl;
2667 _do_write_stripe(txc, o, offset, bl);
2668 offset += stripe_size;
2669 length -= stripe_size;
2670 bl_off += stripe_size;
2671 continue;
2672 }
2673 uint64_t stripe_off = offset - offset_rem;
2674 bufferlist prev;
2675 _do_read_stripe(o, stripe_off, &prev, true);
2676 dout(20) << __func__ << " read previous stripe " << stripe_off
2677 << ", got " << prev.length() << dendl;
2678 bufferlist bl;
2679 if (offset_rem) {
2680 unsigned p = std::min<uint64_t>(prev.length(), offset_rem);
2681 if (p) {
2682 dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2683 bl.substr_of(prev, 0, p);
2684 }
2685 if (p < offset_rem) {
2686 dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2687 bl.append_zero(offset_rem - p);
2688 }
2689 }
2690 unsigned use = stripe_size - offset_rem;
2691 if (use > length)
2692 use -= stripe_size - end_rem;
2693 dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2694 bufferlist t;
2695 t.substr_of(orig_bl, bl_off, use);
2696 bl.claim_append(t);
2697 bl_off += use;
2698 if (end_rem) {
2699 if (end_rem < prev.length()) {
2700 unsigned l = prev.length() - end_rem;
2701 dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2702 bufferlist t;
2703 t.substr_of(prev, end_rem, l);
2704 bl.claim_append(t);
2705 }
2706 }
2707 dout(30) << " writing:\n";
2708 bl.hexdump(*_dout);
2709 *_dout << dendl;
2710 _do_write_stripe(txc, o, stripe_off, bl);
2711 offset += use;
2712 length -= use;
2713 }
2714
2715 if (offset > o->onode.size) {
2716 dout(20) << __func__ << " extending size to " << offset + length
2717 << dendl;
2718 o->onode.size = offset;
2719 }
2720
2721 return r;
2722 }
2723
2724 int KStore::_write(TransContext *txc,
2725 CollectionRef& c,
2726 OnodeRef& o,
2727 uint64_t offset, size_t length,
2728 bufferlist& bl,
2729 uint32_t fadvise_flags)
2730 {
2731 dout(15) << __func__ << " " << c->cid << " " << o->oid
2732 << " " << offset << "~" << length
2733 << dendl;
2734 _assign_nid(txc, o);
2735 int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2736 txc->write_onode(o);
2737
2738 dout(10) << __func__ << " " << c->cid << " " << o->oid
2739 << " " << offset << "~" << length
2740 << " = " << r << dendl;
2741 return r;
2742 }
2743
2744 int KStore::_zero(TransContext *txc,
2745 CollectionRef& c,
2746 OnodeRef& o,
2747 uint64_t offset, size_t length)
2748 {
2749 dout(15) << __func__ << " " << c->cid << " " << o->oid
2750 << " " << offset << "~" << length
2751 << dendl;
2752 int r = 0;
2753 o->exists = true;
2754
2755 _dump_onode(o);
2756 _assign_nid(txc, o);
2757
2758 uint64_t stripe_size = o->onode.stripe_size;
2759 if (stripe_size) {
2760 uint64_t end = offset + length;
2761 uint64_t pos = offset;
2762 uint64_t stripe_off = pos % stripe_size;
2763 while (pos < offset + length) {
2764 if (stripe_off || end - pos < stripe_size) {
2765 bufferlist stripe;
2766 _do_read_stripe(o, pos - stripe_off, &stripe, true);
2767 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2768 << stripe.length() << dendl;
2769 bufferlist bl;
2770 bl.substr_of(stripe, 0, std::min<uint64_t>(stripe.length(), stripe_off));
2771 if (end >= pos - stripe_off + stripe_size ||
2772 end >= o->onode.size) {
2773 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2774 << " to " << bl.length() << dendl;
2775 } else {
2776 auto len = end - (pos - stripe_off + bl.length());
2777 bl.append_zero(len);
2778 dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2779 if (stripe.length() > bl.length()) {
2780 unsigned l = stripe.length() - bl.length();
2781 bufferlist t;
2782 t.substr_of(stripe, stripe.length() - l, l);
2783 dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2784 bl.claim_append(t);
2785 }
2786 }
2787 _do_write_stripe(txc, o, pos - stripe_off, bl);
2788 pos += stripe_size - stripe_off;
2789 stripe_off = 0;
2790 } else {
2791 dout(20) << __func__ << " rm stripe " << pos << dendl;
2792 _do_remove_stripe(txc, o, pos - stripe_off);
2793 pos += stripe_size;
2794 }
2795 }
2796 }
2797 if (offset + length > o->onode.size) {
2798 o->onode.size = offset + length;
2799 dout(20) << __func__ << " extending size to " << offset + length
2800 << dendl;
2801 }
2802 txc->write_onode(o);
2803
2804 dout(10) << __func__ << " " << c->cid << " " << o->oid
2805 << " " << offset << "~" << length
2806 << " = " << r << dendl;
2807 return r;
2808 }
2809
2810 int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2811 {
2812 uint64_t stripe_size = o->onode.stripe_size;
2813
2814 o->flush();
2815
2816 // trim down stripes
2817 if (stripe_size) {
2818 uint64_t pos = offset;
2819 uint64_t stripe_off = pos % stripe_size;
2820 while (pos < o->onode.size) {
2821 if (stripe_off) {
2822 bufferlist stripe;
2823 _do_read_stripe(o, pos - stripe_off, &stripe, true);
2824 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2825 << stripe.length() << dendl;
2826 bufferlist t;
2827 t.substr_of(stripe, 0, std::min<uint64_t>(stripe_off, stripe.length()));
2828 _do_write_stripe(txc, o, pos - stripe_off, t);
2829 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2830 << " to " << t.length() << dendl;
2831 pos += stripe_size - stripe_off;
2832 stripe_off = 0;
2833 } else {
2834 dout(20) << __func__ << " rm stripe " << pos << dendl;
2835 _do_remove_stripe(txc, o, pos - stripe_off);
2836 pos += stripe_size;
2837 }
2838 }
2839
2840 // trim down cached tail
2841 if (o->tail_bl.length()) {
2842 if (offset / stripe_size != o->onode.size / stripe_size) {
2843 dout(20) << __func__ << " clear cached tail" << dendl;
2844 o->clear_tail();
2845 }
2846 }
2847 }
2848
2849 o->onode.size = offset;
2850 dout(10) << __func__ << " truncate size to " << offset << dendl;
2851
2852 txc->write_onode(o);
2853 return 0;
2854 }
2855
2856 int KStore::_truncate(TransContext *txc,
2857 CollectionRef& c,
2858 OnodeRef& o,
2859 uint64_t offset)
2860 {
2861 dout(15) << __func__ << " " << c->cid << " " << o->oid
2862 << " " << offset
2863 << dendl;
2864 int r = _do_truncate(txc, o, offset);
2865 dout(10) << __func__ << " " << c->cid << " " << o->oid
2866 << " " << offset
2867 << " = " << r << dendl;
2868 return r;
2869 }
2870
2871 int KStore::_do_remove(TransContext *txc,
2872 OnodeRef o)
2873 {
2874 string key;
2875
2876 _do_truncate(txc, o, 0);
2877
2878 o->onode.size = 0;
2879 if (o->onode.omap_head) {
2880 _do_omap_clear(txc, o->onode.omap_head);
2881 }
2882 o->exists = false;
2883 o->onode = kstore_onode_t();
2884 txc->onodes.erase(o);
2885 get_object_key(cct, o->oid, &key);
2886 txc->t->rmkey(PREFIX_OBJ, key);
2887 return 0;
2888 }
2889
2890 int KStore::_remove(TransContext *txc,
2891 CollectionRef& c,
2892 OnodeRef &o)
2893 {
2894 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2895 int r = _do_remove(txc, o);
2896 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2897 return r;
2898 }
2899
2900 int KStore::_setattr(TransContext *txc,
2901 CollectionRef& c,
2902 OnodeRef& o,
2903 const string& name,
2904 bufferptr& val)
2905 {
2906 dout(15) << __func__ << " " << c->cid << " " << o->oid
2907 << " " << name << " (" << val.length() << " bytes)"
2908 << dendl;
2909 int r = 0;
2910 o->onode.attrs[name] = val;
2911 txc->write_onode(o);
2912 dout(10) << __func__ << " " << c->cid << " " << o->oid
2913 << " " << name << " (" << val.length() << " bytes)"
2914 << " = " << r << dendl;
2915 return r;
2916 }
2917
2918 int KStore::_setattrs(TransContext *txc,
2919 CollectionRef& c,
2920 OnodeRef& o,
2921 const map<string,bufferptr>& aset)
2922 {
2923 dout(15) << __func__ << " " << c->cid << " " << o->oid
2924 << " " << aset.size() << " keys"
2925 << dendl;
2926 int r = 0;
2927 for (map<string,bufferptr>::const_iterator p = aset.begin();
2928 p != aset.end(); ++p) {
2929 if (p->second.is_partial())
2930 o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2931 else
2932 o->onode.attrs[p->first] = p->second;
2933 }
2934 txc->write_onode(o);
2935 dout(10) << __func__ << " " << c->cid << " " << o->oid
2936 << " " << aset.size() << " keys"
2937 << " = " << r << dendl;
2938 return r;
2939 }
2940
2941
2942 int KStore::_rmattr(TransContext *txc,
2943 CollectionRef& c,
2944 OnodeRef& o,
2945 const string& name)
2946 {
2947 dout(15) << __func__ << " " << c->cid << " " << o->oid
2948 << " " << name << dendl;
2949 int r = 0;
2950 o->onode.attrs.erase(name);
2951 txc->write_onode(o);
2952 dout(10) << __func__ << " " << c->cid << " " << o->oid
2953 << " " << name << " = " << r << dendl;
2954 return r;
2955 }
2956
2957 int KStore::_rmattrs(TransContext *txc,
2958 CollectionRef& c,
2959 OnodeRef& o)
2960 {
2961 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2962 int r = 0;
2963 o->onode.attrs.clear();
2964 txc->write_onode(o);
2965 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2966 return r;
2967 }
2968
2969 void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2970 {
2971 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2972 string prefix, tail;
2973 get_omap_header(id, &prefix);
2974 get_omap_tail(id, &tail);
2975 it->lower_bound(prefix);
2976 while (it->valid()) {
2977 if (it->key() >= tail) {
2978 dout(30) << __func__ << " stop at " << tail << dendl;
2979 break;
2980 }
2981 txc->t->rmkey(PREFIX_OMAP, it->key());
2982 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
2983 it->next();
2984 }
2985 }
2986
2987 int KStore::_omap_clear(TransContext *txc,
2988 CollectionRef& c,
2989 OnodeRef& o)
2990 {
2991 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2992 int r = 0;
2993 if (o->onode.omap_head != 0) {
2994 _do_omap_clear(txc, o->onode.omap_head);
2995 }
2996 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2997 return r;
2998 }
2999
3000 int KStore::_omap_setkeys(TransContext *txc,
3001 CollectionRef& c,
3002 OnodeRef& o,
3003 bufferlist &bl)
3004 {
3005 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3006 int r;
3007 auto p = bl.cbegin();
3008 __u32 num;
3009 if (!o->onode.omap_head) {
3010 o->onode.omap_head = o->onode.nid;
3011 txc->write_onode(o);
3012 }
3013 decode(num, p);
3014 while (num--) {
3015 string key;
3016 bufferlist value;
3017 decode(key, p);
3018 decode(value, p);
3019 string final_key;
3020 get_omap_key(o->onode.omap_head, key, &final_key);
3021 dout(30) << __func__ << " " << pretty_binary_string(final_key)
3022 << " <- " << key << dendl;
3023 txc->t->set(PREFIX_OMAP, final_key, value);
3024 }
3025 r = 0;
3026 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3027 return r;
3028 }
3029
3030 int KStore::_omap_setheader(TransContext *txc,
3031 CollectionRef& c,
3032 OnodeRef &o,
3033 bufferlist& bl)
3034 {
3035 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3036 int r;
3037 string key;
3038 if (!o->onode.omap_head) {
3039 o->onode.omap_head = o->onode.nid;
3040 txc->write_onode(o);
3041 }
3042 get_omap_header(o->onode.omap_head, &key);
3043 txc->t->set(PREFIX_OMAP, key, bl);
3044 r = 0;
3045 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3046 return r;
3047 }
3048
3049 int KStore::_omap_rmkeys(TransContext *txc,
3050 CollectionRef& c,
3051 OnodeRef& o,
3052 const bufferlist& bl)
3053 {
3054 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3055 int r = 0;
3056 auto p = bl.cbegin();
3057 __u32 num;
3058
3059 if (!o->onode.omap_head) {
3060 r = 0;
3061 goto out;
3062 }
3063 decode(num, p);
3064 while (num--) {
3065 string key;
3066 decode(key, p);
3067 string final_key;
3068 get_omap_key(o->onode.omap_head, key, &final_key);
3069 dout(30) << __func__ << " rm " << pretty_binary_string(final_key)
3070 << " <- " << key << dendl;
3071 txc->t->rmkey(PREFIX_OMAP, final_key);
3072 }
3073 r = 0;
3074
3075 out:
3076 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3077 return r;
3078 }
3079
3080 int KStore::_omap_rmkey_range(TransContext *txc,
3081 CollectionRef& c,
3082 OnodeRef& o,
3083 const string& first, const string& last)
3084 {
3085 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3086 KeyValueDB::Iterator it;
3087 string key_first, key_last;
3088 int r = 0;
3089
3090 if (!o->onode.omap_head) {
3091 goto out;
3092 }
3093 it = db->get_iterator(PREFIX_OMAP);
3094 get_omap_key(o->onode.omap_head, first, &key_first);
3095 get_omap_key(o->onode.omap_head, last, &key_last);
3096 it->lower_bound(key_first);
3097 while (it->valid()) {
3098 if (it->key() >= key_last) {
3099 dout(30) << __func__ << " stop at " << pretty_binary_string(key_last)
3100 << dendl;
3101 break;
3102 }
3103 txc->t->rmkey(PREFIX_OMAP, it->key());
3104 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
3105 it->next();
3106 }
3107 r = 0;
3108
3109 out:
3110 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3111 return r;
3112 }
3113
3114 int KStore::_setallochint(TransContext *txc,
3115 CollectionRef& c,
3116 OnodeRef& o,
3117 uint64_t expected_object_size,
3118 uint64_t expected_write_size,
3119 uint32_t flags)
3120 {
3121 dout(15) << __func__ << " " << c->cid << " " << o->oid
3122 << " object_size " << expected_object_size
3123 << " write_size " << expected_write_size
3124 << " flags " << flags
3125 << dendl;
3126 int r = 0;
3127 o->onode.expected_object_size = expected_object_size;
3128 o->onode.expected_write_size = expected_write_size;
3129 o->onode.alloc_hint_flags = flags;
3130
3131 txc->write_onode(o);
3132 dout(10) << __func__ << " " << c->cid << " " << o->oid
3133 << " object_size " << expected_object_size
3134 << " write_size " << expected_write_size
3135 << " = " << r << dendl;
3136 return r;
3137 }
3138
3139 int KStore::_clone(TransContext *txc,
3140 CollectionRef& c,
3141 OnodeRef& oldo,
3142 OnodeRef& newo)
3143 {
3144 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3145 << newo->oid << dendl;
3146 int r = 0;
3147 if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3148 derr << __func__ << " mismatched hash on " << oldo->oid
3149 << " and " << newo->oid << dendl;
3150 return -EINVAL;
3151 }
3152
3153 bufferlist bl;
3154 newo->exists = true;
3155 _assign_nid(txc, newo);
3156
3157 // data
3158 oldo->flush();
3159
3160 r = _do_read(oldo, 0, oldo->onode.size, bl, true, 0);
3161 if (r < 0)
3162 goto out;
3163
3164 // truncate any old data
3165 r = _do_truncate(txc, newo, 0);
3166 if (r < 0)
3167 goto out;
3168
3169 r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3170 if (r < 0)
3171 goto out;
3172
3173 newo->onode.attrs = oldo->onode.attrs;
3174
3175 // clone omap
3176 if (newo->onode.omap_head) {
3177 dout(20) << __func__ << " clearing old omap data" << dendl;
3178 _do_omap_clear(txc, newo->onode.omap_head);
3179 }
3180 if (oldo->onode.omap_head) {
3181 dout(20) << __func__ << " copying omap data" << dendl;
3182 if (!newo->onode.omap_head) {
3183 newo->onode.omap_head = newo->onode.nid;
3184 }
3185 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3186 string head, tail;
3187 get_omap_header(oldo->onode.omap_head, &head);
3188 get_omap_tail(oldo->onode.omap_head, &tail);
3189 it->lower_bound(head);
3190 while (it->valid()) {
3191 string key;
3192 if (it->key() >= tail) {
3193 dout(30) << __func__ << " reached tail" << dendl;
3194 break;
3195 } else {
3196 dout(30) << __func__ << " got header/data "
3197 << pretty_binary_string(it->key()) << dendl;
3198 ceph_assert(it->key() < tail);
3199 rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3200 txc->t->set(PREFIX_OMAP, key, it->value());
3201 }
3202 it->next();
3203 }
3204 }
3205
3206 txc->write_onode(newo);
3207 r = 0;
3208
3209 out:
3210 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3211 << newo->oid << " = " << r << dendl;
3212 return r;
3213 }
3214
3215 int KStore::_clone_range(TransContext *txc,
3216 CollectionRef& c,
3217 OnodeRef& oldo,
3218 OnodeRef& newo,
3219 uint64_t srcoff, uint64_t length, uint64_t dstoff)
3220 {
3221 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3222 << newo->oid << " from " << srcoff << "~" << length
3223 << " to offset " << dstoff << dendl;
3224 int r = 0;
3225
3226 bufferlist bl;
3227 newo->exists = true;
3228 _assign_nid(txc, newo);
3229
3230 r = _do_read(oldo, srcoff, length, bl, true, 0);
3231 if (r < 0)
3232 goto out;
3233
3234 r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3235 if (r < 0)
3236 goto out;
3237
3238 txc->write_onode(newo);
3239
3240 r = 0;
3241
3242 out:
3243 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3244 << newo->oid << " from " << srcoff << "~" << length
3245 << " to offset " << dstoff
3246 << " = " << r << dendl;
3247 return r;
3248 }
3249
3250 int KStore::_rename(TransContext *txc,
3251 CollectionRef& c,
3252 OnodeRef& oldo,
3253 OnodeRef& newo,
3254 const ghobject_t& new_oid)
3255 {
3256 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3257 << new_oid << dendl;
3258 int r;
3259 ghobject_t old_oid = oldo->oid;
3260 bufferlist bl;
3261 string old_key, new_key;
3262
3263 if (newo && newo->exists) {
3264 // destination object already exists, remove it first
3265 r = _do_remove(txc, newo);
3266 if (r < 0)
3267 goto out;
3268 }
3269
3270 txc->t->rmkey(PREFIX_OBJ, oldo->key);
3271 txc->write_onode(oldo);
3272 c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key}
3273 r = 0;
3274
3275 out:
3276 dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3277 << new_oid << " = " << r << dendl;
3278 return r;
3279 }
3280
3281 // collections
3282
3283 int KStore::_create_collection(
3284 TransContext *txc,
3285 coll_t cid,
3286 unsigned bits,
3287 CollectionRef *c)
3288 {
3289 dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3290 int r;
3291 bufferlist bl;
3292
3293 {
3294 std::unique_lock l{coll_lock};
3295 if (*c) {
3296 r = -EEXIST;
3297 goto out;
3298 }
3299 auto p = new_coll_map.find(cid);
3300 ceph_assert(p != new_coll_map.end());
3301 *c = p->second;
3302 ceph_assert((*c)->cid == cid);
3303 (*c)->cnode.bits = bits;
3304 coll_map[cid] = *c;
3305 new_coll_map.erase(p);
3306 }
3307 encode((*c)->cnode, bl);
3308 txc->t->set(PREFIX_COLL, stringify(cid), bl);
3309 r = 0;
3310
3311 out:
3312 dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3313 return r;
3314 }
3315
3316 int KStore::_remove_collection(TransContext *txc, coll_t cid,
3317 CollectionRef *c)
3318 {
3319 dout(15) << __func__ << " " << cid << dendl;
3320 int r;
3321
3322 {
3323 std::unique_lock l{coll_lock};
3324 if (!*c) {
3325 r = -ENOENT;
3326 goto out;
3327 }
3328 size_t nonexistent_count = 0;
3329 pair<ghobject_t,OnodeRef> next_onode;
3330 while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3331 if (next_onode.second->exists) {
3332 r = -ENOTEMPTY;
3333 goto out;
3334 }
3335 ++nonexistent_count;
3336 }
3337 vector<ghobject_t> ls;
3338 ghobject_t next;
3339 // Enumerate onodes in db, up to nonexistent_count + 1
3340 // then check if all of them are marked as non-existent.
3341 // Bypass the check if returned number is greater than nonexistent_count
3342 r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3343 nonexistent_count + 1, &ls, &next);
3344 if (r >= 0) {
3345 bool exists = false; //ls.size() > nonexistent_count;
3346 for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3347 dout(10) << __func__ << " oid " << *it << dendl;
3348 auto onode = (*c)->onode_map.lookup(*it);
3349 exists = !onode || onode->exists;
3350 if (exists) {
3351 dout(10) << __func__ << " " << *it
3352 << " exists in db" << dendl;
3353 }
3354 }
3355 if (!exists) {
3356 coll_map.erase(cid);
3357 txc->removed_collections.push_back(*c);
3358 c->reset();
3359 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3360 r = 0;
3361 } else {
3362 dout(10) << __func__ << " " << cid
3363 << " is non-empty" << dendl;
3364 r = -ENOTEMPTY;
3365 }
3366 }
3367 }
3368
3369 out:
3370 dout(10) << __func__ << " " << cid << " = " << r << dendl;
3371 return r;
3372 }
3373
3374 int KStore::_split_collection(TransContext *txc,
3375 CollectionRef& c,
3376 CollectionRef& d,
3377 unsigned bits, int rem)
3378 {
3379 dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3380 << " bits " << bits << dendl;
3381 int r;
3382 std::unique_lock l{c->lock};
3383 std::unique_lock l2{d->lock};
3384 c->onode_map.clear();
3385 d->onode_map.clear();
3386 c->cnode.bits = bits;
3387 ceph_assert(d->cnode.bits == bits);
3388 r = 0;
3389
3390 bufferlist bl;
3391 encode(c->cnode, bl);
3392 txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3393
3394 dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3395 << " bits " << bits << " = " << r << dendl;
3396 return r;
3397 }
3398
3399 int KStore::_merge_collection(TransContext *txc,
3400 CollectionRef *c,
3401 CollectionRef& d,
3402 unsigned bits)
3403 {
3404 dout(15) << __func__ << " " << (*c)->cid << " to " << d->cid << " "
3405 << " bits " << bits << dendl;
3406 int r;
3407 std::scoped_lock l{(*c)->lock, d->lock};
3408 (*c)->onode_map.clear();
3409 d->onode_map.clear();
3410 d->cnode.bits = bits;
3411 r = 0;
3412
3413 coll_t cid = (*c)->cid;
3414
3415 bufferlist bl;
3416 encode(d->cnode, bl);
3417 txc->t->set(PREFIX_COLL, stringify(d->cid), bl);
3418
3419 coll_map.erase((*c)->cid);
3420 txc->removed_collections.push_back(*c);
3421 c->reset();
3422 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3423
3424 dout(10) << __func__ << " " << cid << " to " << d->cid << " "
3425 << " bits " << bits << " = " << r << dendl;
3426 return r;
3427 }
3428
3429 // ===========================================