]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/kstore/KStore.cc
b06b06865d484f7ddb4ea8c5507bd825dd76388d
[ceph.git] / ceph / src / os / kstore / KStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include "KStore.h"
23 #include "osd/osd_types.h"
24 #include "os/kv.h"
25 #include "include/compat.h"
26 #include "include/stringify.h"
27 #include "common/errno.h"
28 #include "common/safe_io.h"
29 #include "common/Formatter.h"
30
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_kstore
34
35 /*
36
37 TODO:
38
39 * superblock, features
40 * refcounted extents (for efficient clone)
41
42 */
43
// Key-space prefixes used to partition the KeyValueDB.
const string PREFIX_SUPER{"S"};  // field -> value
const string PREFIX_COLL{"C"};   // collection name -> (nothing)
const string PREFIX_OBJ{"O"};    // object name -> onode
const string PREFIX_DATA{"D"};   // nid + offset -> data
const string PREFIX_OMAP{"M"};   // u64 + keyname -> value
49
50 /*
51 * object name key structure
52 *
53 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54 * encoded u64: poolid + 2^63 (so that it sorts properly)
55 * encoded u32: hash (bit reversed)
56 *
57 * 1 char: '.'
58 *
59 * escaped string: namespace
60 *
61 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
62 * we are followed just by the key. otherwise, we are followed by
63 * the key and then the object name.
64 * escaped string: key
65 * escaped string: object name (unless '=' above)
66 *
67 * encoded u64: snap
68 * encoded u64: generation
69 */
70
71 /*
72 * string encoding in the key
73 *
74 * The key string needs to lexicographically sort the same way that
 * ghobject_t does. We do this by escaping any char that compares <= '#'
 * as '#' followed by its 2-digit hex value, and any char >= '~' as '~'
 * followed by its 2-digit hex value.
78 *
79 * We use ! as a terminator for strings; this works because it is < #
80 * and will get escaped if it is present in the string.
81 *
82 */
83
// Append `in` to *out in the escaped, '!'-terminated form used inside
// object keys.  Chars strictly between '#' and '~' are copied verbatim;
// every other char becomes its prefix char ('#' if it compares <= '#',
// otherwise '~') followed by two lowercase hex digits of its byte value.
static void append_escaped(const string &in, string *out)
{
  static const char hexdigits[] = "0123456789abcdef";
  for (const char c : in) {
    if (c > '#' && c < '~') {
      out->push_back(c);
      continue;
    }
    const unsigned char byte = static_cast<unsigned char>(c);
    out->push_back(c <= '#' ? '#' : '~');
    out->push_back(hexdigits[byte >> 4]);
    out->push_back(hexdigits[byte & 0xf]);
  }
  out->push_back('!');
}
100
// Decode one escaped string (as produced by append_escaped) starting at p,
// appending the unescaped bytes to *out.  Decoding stops at the '!'
// terminator or at a NUL.
//
// Returns the number of input chars consumed (not counting the
// terminator), or -EINVAL if a '#'/'~' escape is not followed by exactly
// two hex digits.  The digits are validated before advancing, so a
// truncated escape can never step past the end of the input.
static int decode_escaped(const char *p, string *out)
{
  // value of a single hex digit, or -1 if c is not one
  auto hexval = [](char c) -> int {
    if (c >= '0' && c <= '9')
      return c - '0';
    if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
    if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
    return -1;
  };

  const char *orig_p = p;
  while (*p && *p != '!') {
    if (*p == '#' || *p == '~') {
      // only look at p[2] once p[1] is known to be a (non-NUL) hex digit
      int hi = hexval(p[1]);
      int lo = hi < 0 ? -1 : hexval(p[2]);
      if (hi < 0 || lo < 0)
        return -EINVAL;
      out->push_back((char)((hi << 4) | lo));
      p += 3;
    } else {
      out->push_back(*p++);
    }
  }
  return p - orig_p;
}
118
// Render a key that mixes binary fields (le32/le64 etc.) with text in a
// readable form: runs of printable chars are shown 'quoted', everything
// else as 0x-prefixed hex.  While in hex mode, a printable char stays in
// hex if any of the next three bytes is unprintable, and hex is emitted a
// whole u32 (8 digits) at a time when at least four bytes remain.
static string pretty_binary_string(const string& in)
{
  auto unprintable = [](char c) {
    return c < 32 || (unsigned char)c > 126;
  };

  string out;
  out.reserve(in.length() * 3);
  enum { NONE, HEX, STRING } mode = NONE;
  size_t run_start = 0;
  size_t i = 0;
  for (; i < in.length(); ++i) {
    const size_t left = in.length() - i;
    bool as_hex = unprintable(in[i]);
    if (!as_hex && mode == HEX && left >= 4) {
      as_hex = unprintable(in[i + 1]) ||
               unprintable(in[i + 2]) ||
               unprintable(in[i + 3]);
    }

    if (!as_hex) {
      if (mode != STRING) {
        out.push_back('\'');
        mode = STRING;
        run_start = i;
      }
      continue;
    }

    // close any open quoted run, then make sure we are in hex mode
    if (mode == STRING) {
      out.append(in, run_start, i - run_start);
      out.push_back('\'');
    }
    if (mode != HEX) {
      out.append("0x");
      mode = HEX;
    }

    char buf[10];
    if (left >= 4) {
      const uint32_t word =
        ((uint32_t)(unsigned char)in[i] << 24) |
        ((uint32_t)(unsigned char)in[i + 1] << 16) |
        ((uint32_t)(unsigned char)in[i + 2] << 8) |
        ((uint32_t)(unsigned char)in[i + 3]);
      snprintf(buf, sizeof(buf), "%08x", word);
      i += 3;
    } else {
      snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
    }
    out.append(buf);
  }

  if (mode == STRING) {
    out.append(in, run_start, i - run_start);
    out.push_back('\'');
  }
  return out;
}
169
170 static void _key_encode_shard(shard_id_t shard, string *key)
171 {
172 // make field ordering match with ghobject_t compare operations
173 if (shard == shard_id_t::NO_SHARD) {
174 // otherwise ff will sort *after* 0, not before.
175 key->append("--");
176 } else {
177 char buf[32];
178 snprintf(buf, sizeof(buf), "%02x", (int)shard);
179 key->append(buf);
180 }
181 }
182 static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
183 {
184 if (key[0] == '-') {
185 *pshard = shard_id_t::NO_SHARD;
186 } else {
187 unsigned shard;
188 int r = sscanf(key, "%x", &shard);
189 if (r < 1)
190 return NULL;
191 *pshard = shard_id_t(shard);
192 }
193 return key + 2;
194 }
195
196 static void get_coll_key_range(const coll_t& cid, int bits,
197 string *temp_start, string *temp_end,
198 string *start, string *end)
199 {
200 temp_start->clear();
201 temp_end->clear();
202 start->clear();
203 end->clear();
204
205 spg_t pgid;
206 if (cid.is_pg(&pgid)) {
207 _key_encode_shard(pgid.shard, start);
208 *end = *start;
209 *temp_start = *start;
210 *temp_end = *start;
211
212 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
213 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
214 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
215 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
216 start->append(".");
217 temp_start->append(".");
218
219 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
220 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
221
222 uint64_t end_hash =
223 hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
224 if (end_hash <= 0xffffffffull) {
225 _key_encode_u32(end_hash, end);
226 _key_encode_u32(end_hash, temp_end);
227 end->append(".");
228 temp_end->append(".");
229 } else {
230 _key_encode_u32(0xffffffff, end);
231 _key_encode_u32(0xffffffff, temp_end);
232 end->append(":");
233 temp_end->append(":");
234 }
235 } else {
236 _key_encode_shard(shard_id_t::NO_SHARD, start);
237 _key_encode_u64(-1ull + 0x8000000000000000ull, start);
238 *end = *start;
239 _key_encode_u32(0, start);
240 start->append(".");
241 _key_encode_u32(0xffffffff, end);
242 end->append(":");
243
244 // no separate temp section
245 *temp_start = *end;
246 *temp_end = *end;
247 }
248 }
249
250 static int get_key_object(const string& key, ghobject_t *oid);
251
252 static void get_object_key(CephContext* cct, const ghobject_t& oid,
253 string *key)
254 {
255 key->clear();
256
257 _key_encode_shard(oid.shard_id, key);
258 _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
259 _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
260 key->append(".");
261
262 append_escaped(oid.hobj.nspace, key);
263
264 if (oid.hobj.get_key().length()) {
265 // is a key... could be < = or >.
266 // (ASCII chars < = and > sort in that order, yay)
267 if (oid.hobj.get_key() < oid.hobj.oid.name) {
268 key->append("<");
269 append_escaped(oid.hobj.get_key(), key);
270 append_escaped(oid.hobj.oid.name, key);
271 } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
272 key->append(">");
273 append_escaped(oid.hobj.get_key(), key);
274 append_escaped(oid.hobj.oid.name, key);
275 } else {
276 // same as no key
277 key->append("=");
278 append_escaped(oid.hobj.oid.name, key);
279 }
280 } else {
281 // no key
282 key->append("=");
283 append_escaped(oid.hobj.oid.name, key);
284 }
285
286 _key_encode_u64(oid.hobj.snap, key);
287 _key_encode_u64(oid.generation, key);
288
289 // sanity check
290 if (true) {
291 ghobject_t t;
292 int r = get_key_object(*key, &t);
293 if (r || t != oid) {
294 derr << " r " << r << dendl;
295 derr << "key " << pretty_binary_string(*key) << dendl;
296 derr << "oid " << oid << dendl;
297 derr << " t " << t << dendl;
298 assert(t == oid);
299 }
300 }
301 }
302
303 static int get_key_object(const string& key, ghobject_t *oid)
304 {
305 int r;
306 const char *p = key.c_str();
307
308 p = _key_decode_shard(p, &oid->shard_id);
309
310 uint64_t pool;
311 p = _key_decode_u64(p, &pool);
312 oid->hobj.pool = pool - 0x8000000000000000ull;
313
314 unsigned hash;
315 p = _key_decode_u32(p, &hash);
316 oid->hobj.set_bitwise_key_u32(hash);
317 if (*p != '.')
318 return -5;
319 ++p;
320
321 r = decode_escaped(p, &oid->hobj.nspace);
322 if (r < 0)
323 return -6;
324 p += r + 1;
325
326 if (*p == '=') {
327 // no key
328 ++p;
329 r = decode_escaped(p, &oid->hobj.oid.name);
330 if (r < 0)
331 return -7;
332 p += r + 1;
333 } else if (*p == '<' || *p == '>') {
334 // key + name
335 ++p;
336 string okey;
337 r = decode_escaped(p, &okey);
338 if (r < 0)
339 return -8;
340 p += r + 1;
341 r = decode_escaped(p, &oid->hobj.oid.name);
342 if (r < 0)
343 return -9;
344 p += r + 1;
345 oid->hobj.set_key(okey);
346 } else {
347 // malformed
348 return -10;
349 }
350
351 p = _key_decode_u64(p, &oid->hobj.snap.val);
352 p = _key_decode_u64(p, &oid->generation);
353 if (*p) {
354 // if we get something other than a null terminator here,
355 // something goes wrong.
356 return -12;
357 }
358
359 return 0;
360 }
361
362
363 static void get_data_key(uint64_t nid, uint64_t offset, string *out)
364 {
365 _key_encode_u64(nid, out);
366 _key_encode_u64(offset, out);
367 }
368
369 // '-' < '.' < '~'
370 static void get_omap_header(uint64_t id, string *out)
371 {
372 _key_encode_u64(id, out);
373 out->push_back('-');
374 }
375
376 // hmm, I don't think there's any need to escape the user key since we
377 // have a clean prefix.
378 static void get_omap_key(uint64_t id, const string& key, string *out)
379 {
380 _key_encode_u64(id, out);
381 out->push_back('.');
382 out->append(key);
383 }
384
385 static void rewrite_omap_key(uint64_t id, string old, string *out)
386 {
387 _key_encode_u64(id, out);
388 out->append(old.substr(out->length()));
389 }
390
// Extract the user portion of an omap entry key by dropping the encoded
// u64 id and the '.' separator.
static void decode_omap_key(const string& key, string *user_key)
{
  const size_t prefix_len = sizeof(uint64_t) + 1;
  user_key->assign(key, prefix_len, string::npos);
}
395
396 static void get_omap_tail(uint64_t id, string *out)
397 {
398 _key_encode_u64(id, out);
399 out->push_back('~');
400 }
401
402
403
404 // Onode
405
406 #undef dout_prefix
407 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
408
409 void KStore::Onode::flush()
410 {
411 std::unique_lock<std::mutex> l(flush_lock);
412 dout(20) << __func__ << " " << flush_txns << dendl;
413 while (!flush_txns.empty())
414 flush_cond.wait(l);
415 dout(20) << __func__ << " done" << dendl;
416 }
417
418 // OnodeHashLRU
419
420 #undef dout_prefix
421 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
422
423 void KStore::OnodeHashLRU::_touch(OnodeRef o)
424 {
425 lru_list_t::iterator p = lru.iterator_to(*o);
426 lru.erase(p);
427 lru.push_front(*o);
428 }
429
430 void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
431 {
432 std::lock_guard<std::mutex> l(lock);
433 dout(30) << __func__ << " " << oid << " " << o << dendl;
434 assert(onode_map.count(oid) == 0);
435 onode_map[oid] = o;
436 lru.push_front(*o);
437 }
438
439 KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
440 {
441 std::lock_guard<std::mutex> l(lock);
442 dout(30) << __func__ << dendl;
443 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
444 if (p == onode_map.end()) {
445 dout(30) << __func__ << " " << oid << " miss" << dendl;
446 return OnodeRef();
447 }
448 dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
449 _touch(p->second);
450 return p->second;
451 }
452
453 void KStore::OnodeHashLRU::clear()
454 {
455 std::lock_guard<std::mutex> l(lock);
456 dout(10) << __func__ << dendl;
457 lru.clear();
458 onode_map.clear();
459 }
460
461 void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
462 const ghobject_t& new_oid)
463 {
464 std::lock_guard<std::mutex> l(lock);
465 dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
466 ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
467 po = onode_map.find(old_oid);
468 pn = onode_map.find(new_oid);
469
470 assert(po != onode_map.end());
471 if (pn != onode_map.end()) {
472 lru_list_t::iterator p = lru.iterator_to(*pn->second);
473 lru.erase(p);
474 onode_map.erase(pn);
475 }
476 OnodeRef o = po->second;
477
478 // install a non-existent onode it its place
479 po->second.reset(new Onode(cct, old_oid, o->key));
480 lru.push_back(*po->second);
481
482 // fix oid, key
483 onode_map.insert(make_pair(new_oid, o));
484 _touch(o);
485 o->oid = new_oid;
486 get_object_key(cct, new_oid, &o->key);
487 }
488
489 bool KStore::OnodeHashLRU::get_next(
490 const ghobject_t& after,
491 pair<ghobject_t,OnodeRef> *next)
492 {
493 std::lock_guard<std::mutex> l(lock);
494 dout(20) << __func__ << " after " << after << dendl;
495
496 if (after == ghobject_t()) {
497 if (lru.empty()) {
498 return false;
499 }
500 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
501 assert(p != onode_map.end());
502 next->first = p->first;
503 next->second = p->second;
504 return true;
505 }
506
507 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
508 assert(p != onode_map.end()); // for now
509 lru_list_t::iterator pi = lru.iterator_to(*p->second);
510 ++pi;
511 if (pi == lru.end()) {
512 return false;
513 }
514 next->first = pi->oid;
515 next->second = onode_map[pi->oid];
516 return true;
517 }
518
519 int KStore::OnodeHashLRU::trim(int max)
520 {
521 std::lock_guard<std::mutex> l(lock);
522 dout(20) << __func__ << " max " << max
523 << " size " << onode_map.size() << dendl;
524 int trimmed = 0;
525 int num = onode_map.size() - max;
526 if (onode_map.size() == 0 || num <= 0)
527 return 0; // don't even try
528
529 lru_list_t::iterator p = lru.end();
530 if (num)
531 --p;
532 while (num > 0) {
533 Onode *o = &*p;
534 int refs = o->nref.load();
535 if (refs > 1) {
536 dout(20) << __func__ << " " << o->oid << " has " << refs
537 << " refs; stopping with " << num << " left to trim" << dendl;
538 break;
539 }
540 dout(30) << __func__ << " trim " << o->oid << dendl;
541 if (p != lru.begin()) {
542 lru.erase(p--);
543 } else {
544 lru.erase(p);
545 assert(num == 1);
546 }
547 o->get(); // paranoia
548 onode_map.erase(o->oid);
549 o->put();
550 --num;
551 ++trimmed;
552 }
553 return trimmed;
554 }
555
556 // =======================================================
557
558 // Collection
559
560 #undef dout_prefix
561 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
562
563 KStore::Collection::Collection(KStore *ns, coll_t c)
564 : store(ns),
565 cid(c),
566 lock("KStore::Collection::lock", true, false),
567 onode_map(store->cct)
568 {
569 }
570
571 KStore::OnodeRef KStore::Collection::get_onode(
572 const ghobject_t& oid,
573 bool create)
574 {
575 assert(create ? lock.is_wlocked() : lock.is_locked());
576
577 spg_t pgid;
578 if (cid.is_pg(&pgid)) {
579 if (!oid.match(cnode.bits, pgid.ps())) {
580 lderr(store->cct) << __func__ << " oid " << oid << " not part of "
581 << pgid << " bits " << cnode.bits << dendl;
582 ceph_abort();
583 }
584 }
585
586 OnodeRef o = onode_map.lookup(oid);
587 if (o)
588 return o;
589
590 string key;
591 get_object_key(store->cct, oid, &key);
592
593 ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
594 << pretty_binary_string(key) << dendl;
595
596 bufferlist v;
597 int r = store->db->get(PREFIX_OBJ, key, &v);
598 ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
599 Onode *on;
600 if (v.length() == 0) {
601 assert(r == -ENOENT);
602 if (!create)
603 return OnodeRef();
604
605 // new
606 on = new Onode(store->cct, oid, key);
607 on->dirty = true;
608 } else {
609 // loaded
610 assert(r >=0);
611 on = new Onode(store->cct, oid, key);
612 on->exists = true;
613 bufferlist::iterator p = v.begin();
614 ::decode(on->onode, p);
615 }
616 o.reset(on);
617 onode_map.add(oid, o);
618 return o;
619 }
620
621
622
623 // =======================================================
624
625 #undef dout_prefix
626 #define dout_prefix *_dout << "kstore(" << path << ") "
627
628 KStore::KStore(CephContext *cct, const string& path)
629 : ObjectStore(cct, path),
630 db(NULL),
631 path_fd(-1),
632 fsid_fd(-1),
633 mounted(false),
634 coll_lock("KStore::coll_lock"),
635 nid_last(0),
636 nid_max(0),
637 throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
638 throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
639 finisher(cct),
640 kv_sync_thread(this),
641 kv_stop(false),
642 logger(nullptr)
643 {
644 _init_logger();
645 }
646
647 KStore::~KStore()
648 {
649 _shutdown_logger();
650 assert(!mounted);
651 assert(db == NULL);
652 assert(fsid_fd < 0);
653 }
654
655 void KStore::_init_logger()
656 {
657 // XXX
658 PerfCountersBuilder b(cct, "KStore",
659 l_kstore_first, l_kstore_last);
660 b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
661 b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
662 b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
663 b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
664 b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
665 logger = b.create_perf_counters();
666 cct->get_perfcounters_collection()->add(logger);
667 }
668
669 void KStore::_shutdown_logger()
670 {
671 // XXX
672 cct->get_perfcounters_collection()->remove(logger);
673 delete logger;
674 }
675
676 int KStore::_open_path()
677 {
678 assert(path_fd < 0);
679 path_fd = ::open(path.c_str(), O_DIRECTORY);
680 if (path_fd < 0) {
681 int r = -errno;
682 derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
683 << dendl;
684 return r;
685 }
686 return 0;
687 }
688
689 void KStore::_close_path()
690 {
691 VOID_TEMP_FAILURE_RETRY(::close(path_fd));
692 path_fd = -1;
693 }
694
695 int KStore::_open_fsid(bool create)
696 {
697 assert(fsid_fd < 0);
698 int flags = O_RDWR;
699 if (create)
700 flags |= O_CREAT;
701 fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
702 if (fsid_fd < 0) {
703 int err = -errno;
704 derr << __func__ << " " << cpp_strerror(err) << dendl;
705 return err;
706 }
707 return 0;
708 }
709
710 int KStore::_read_fsid(uuid_d *uuid)
711 {
712 char fsid_str[40];
713 memset(fsid_str, 0, sizeof(fsid_str));
714 int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
715 if (ret < 0) {
716 derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
717 return ret;
718 }
719 if (ret > 36)
720 fsid_str[36] = 0;
721 else
722 fsid_str[ret] = 0;
723 if (!uuid->parse(fsid_str)) {
724 derr << __func__ << " unparsable uuid " << fsid_str << dendl;
725 return -EINVAL;
726 }
727 return 0;
728 }
729
730 int KStore::_write_fsid()
731 {
732 int r = ::ftruncate(fsid_fd, 0);
733 if (r < 0) {
734 r = -errno;
735 derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
736 return r;
737 }
738 string str = stringify(fsid) + "\n";
739 r = safe_write(fsid_fd, str.c_str(), str.length());
740 if (r < 0) {
741 derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
742 return r;
743 }
744 r = ::fsync(fsid_fd);
745 if (r < 0) {
746 r = -errno;
747 derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
748 return r;
749 }
750 return 0;
751 }
752
753 void KStore::_close_fsid()
754 {
755 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
756 fsid_fd = -1;
757 }
758
759 int KStore::_lock_fsid()
760 {
761 struct flock l;
762 memset(&l, 0, sizeof(l));
763 l.l_type = F_WRLCK;
764 l.l_whence = SEEK_SET;
765 l.l_start = 0;
766 l.l_len = 0;
767 int r = ::fcntl(fsid_fd, F_SETLK, &l);
768 if (r < 0) {
769 int err = errno;
770 derr << __func__ << " failed to lock " << path << "/fsid"
771 << " (is another ceph-osd still running?)"
772 << cpp_strerror(err) << dendl;
773 return -err;
774 }
775 return 0;
776 }
777
778 bool KStore::test_mount_in_use()
779 {
780 // most error conditions mean the mount is not in use (e.g., because
781 // it doesn't exist). only if we fail to lock do we conclude it is
782 // in use.
783 bool ret = false;
784 int r = _open_path();
785 if (r < 0)
786 return false;
787 r = _open_fsid(false);
788 if (r < 0)
789 goto out_path;
790 r = _lock_fsid();
791 if (r < 0)
792 ret = true; // if we can't lock, it is in use
793 _close_fsid();
794 out_path:
795 _close_path();
796 return ret;
797 }
798
799 int KStore::_open_db(bool create)
800 {
801 int r;
802 assert(!db);
803 char fn[PATH_MAX];
804 snprintf(fn, sizeof(fn), "%s/db", path.c_str());
805
806 string kv_backend;
807 if (create) {
808 kv_backend = cct->_conf->kstore_backend;
809 } else {
810 r = read_meta("kv_backend", &kv_backend);
811 if (r < 0) {
812 derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
813 return -EIO;
814 }
815 }
816 dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
817
818 if (create) {
819 int r = ::mkdir(fn, 0755);
820 if (r < 0)
821 r = -errno;
822 if (r < 0 && r != -EEXIST) {
823 derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
824 << dendl;
825 return r;
826 }
827
828 // wal_dir, too!
829 char walfn[PATH_MAX];
830 snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
831 r = ::mkdir(walfn, 0755);
832 if (r < 0)
833 r = -errno;
834 if (r < 0 && r != -EEXIST) {
835 derr << __func__ << " failed to create " << walfn
836 << ": " << cpp_strerror(r)
837 << dendl;
838 return r;
839 }
840 }
841
842 db = KeyValueDB::create(cct, kv_backend, fn);
843 if (!db) {
844 derr << __func__ << " error creating db" << dendl;
845 return -EIO;
846 }
847 string options;
848 if (kv_backend == "rocksdb")
849 options = cct->_conf->kstore_rocksdb_options;
850 db->init(options);
851 stringstream err;
852 if (create)
853 r = db->create_and_open(err);
854 else
855 r = db->open(err);
856 if (r) {
857 derr << __func__ << " erroring opening db: " << err.str() << dendl;
858 delete db;
859 db = NULL;
860 return -EIO;
861 }
862 dout(1) << __func__ << " opened " << kv_backend
863 << " path " << fn << " options " << options << dendl;
864 return 0;
865 }
866
867 void KStore::_close_db()
868 {
869 assert(db);
870 delete db;
871 db = NULL;
872 }
873
874 int KStore::_open_collections(int *errors)
875 {
876 assert(coll_map.empty());
877 KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
878 for (it->upper_bound(string());
879 it->valid();
880 it->next()) {
881 coll_t cid;
882 if (cid.parse(it->key())) {
883 CollectionRef c(new Collection(this, cid));
884 bufferlist bl = it->value();
885 bufferlist::iterator p = bl.begin();
886 try {
887 ::decode(c->cnode, p);
888 } catch (buffer::error& e) {
889 derr << __func__ << " failed to decode cnode, key:"
890 << pretty_binary_string(it->key()) << dendl;
891 return -EIO;
892 }
893 dout(20) << __func__ << " opened " << cid << dendl;
894 coll_map[cid] = c;
895 } else {
896 derr << __func__ << " unrecognized collection " << it->key() << dendl;
897 if (errors)
898 (*errors)++;
899 }
900 }
901 return 0;
902 }
903
904 int KStore::mkfs()
905 {
906 dout(1) << __func__ << " path " << path << dendl;
907 int r;
908 uuid_d old_fsid;
909
910 r = _open_path();
911 if (r < 0)
912 return r;
913
914 r = _open_fsid(true);
915 if (r < 0)
916 goto out_path_fd;
917
918 r = _lock_fsid();
919 if (r < 0)
920 goto out_close_fsid;
921
922 r = _read_fsid(&old_fsid);
923 if (r < 0 || old_fsid.is_zero()) {
924 if (fsid.is_zero()) {
925 fsid.generate_random();
926 dout(1) << __func__ << " generated fsid " << fsid << dendl;
927 } else {
928 dout(1) << __func__ << " using provided fsid " << fsid << dendl;
929 }
930 // we'll write it last.
931 } else {
932 if (!fsid.is_zero() && fsid != old_fsid) {
933 derr << __func__ << " on-disk fsid " << old_fsid
934 << " != provided " << fsid << dendl;
935 r = -EINVAL;
936 goto out_close_fsid;
937 }
938 fsid = old_fsid;
939 dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
940 goto out_close_fsid;
941 }
942
943 r = _open_db(true);
944 if (r < 0)
945 goto out_close_fsid;
946
947 r = write_meta("kv_backend", cct->_conf->kstore_backend);
948 if (r < 0)
949 goto out_close_db;
950
951 r = write_meta("type", "kstore");
952 if (r < 0)
953 goto out_close_db;
954
955 // indicate mkfs completion/success by writing the fsid file
956 r = _write_fsid();
957 if (r == 0)
958 dout(10) << __func__ << " success" << dendl;
959 else
960 derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
961
962 out_close_db:
963 _close_db();
964 out_close_fsid:
965 _close_fsid();
966 out_path_fd:
967 _close_path();
968 return r;
969 }
970
971 int KStore::mount()
972 {
973 dout(1) << __func__ << " path " << path << dendl;
974
975 if (cct->_conf->kstore_fsck_on_mount) {
976 int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
977 if (rc < 0)
978 return rc;
979 }
980
981 int r = _open_path();
982 if (r < 0)
983 return r;
984 r = _open_fsid(false);
985 if (r < 0)
986 goto out_path;
987
988 r = _read_fsid(&fsid);
989 if (r < 0)
990 goto out_fsid;
991
992 r = _lock_fsid();
993 if (r < 0)
994 goto out_fsid;
995
996 r = _open_db(false);
997 if (r < 0)
998 goto out_fsid;
999
1000 r = _open_super_meta();
1001 if (r < 0)
1002 goto out_db;
1003
1004 r = _open_collections();
1005 if (r < 0)
1006 goto out_db;
1007
1008 finisher.start();
1009 kv_sync_thread.create("kstore_kv_sync");
1010
1011 mounted = true;
1012 return 0;
1013
1014 out_db:
1015 _close_db();
1016 out_fsid:
1017 _close_fsid();
1018 out_path:
1019 _close_path();
1020 return r;
1021 }
1022
1023 int KStore::umount()
1024 {
1025 assert(mounted);
1026 dout(1) << __func__ << dendl;
1027
1028 _sync();
1029 _reap_collections();
1030 coll_map.clear();
1031
1032 dout(20) << __func__ << " stopping kv thread" << dendl;
1033 _kv_stop();
1034 dout(20) << __func__ << " draining finisher" << dendl;
1035 finisher.wait_for_empty();
1036 dout(20) << __func__ << " stopping finisher" << dendl;
1037 finisher.stop();
1038 dout(20) << __func__ << " closing" << dendl;
1039
1040 mounted = false;
1041 _close_db();
1042 _close_fsid();
1043 _close_path();
1044 return 0;
1045 }
1046
1047 int KStore::fsck(bool deep)
1048 {
1049 dout(1) << __func__ << dendl;
1050 int errors = 0;
1051 dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1052 return errors;
1053 }
1054
1055 void KStore::_sync()
1056 {
1057 dout(10) << __func__ << dendl;
1058
1059 std::unique_lock<std::mutex> l(kv_lock);
1060 while (!kv_committing.empty() ||
1061 !kv_queue.empty()) {
1062 dout(20) << " waiting for kv to commit" << dendl;
1063 kv_sync_cond.wait(l);
1064 }
1065
1066 dout(10) << __func__ << " done" << dendl;
1067 }
1068
1069 int KStore::statfs(struct store_statfs_t* buf)
1070 {
1071 return db->get_statfs(buf);
1072 }
1073
1074 // ---------------
1075 // cache
1076
1077 KStore::CollectionRef KStore::_get_collection(coll_t cid)
1078 {
1079 RWLock::RLocker l(coll_lock);
1080 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1081 if (cp == coll_map.end())
1082 return CollectionRef();
1083 return cp->second;
1084 }
1085
1086 void KStore::_queue_reap_collection(CollectionRef& c)
1087 {
1088 dout(10) << __func__ << " " << c->cid << dendl;
1089 std::lock_guard<std::mutex> l(reap_lock);
1090 removed_collections.push_back(c);
1091 }
1092
1093 void KStore::_reap_collections()
1094 {
1095 list<CollectionRef> removed_colls;
1096 std::lock_guard<std::mutex> l(reap_lock);
1097 removed_colls.swap(removed_collections);
1098
1099 for (list<CollectionRef>::iterator p = removed_colls.begin();
1100 p != removed_colls.end();
1101 ++p) {
1102 CollectionRef c = *p;
1103 dout(10) << __func__ << " " << c->cid << dendl;
1104 {
1105 pair<ghobject_t,OnodeRef> next;
1106 while (c->onode_map.get_next(next.first, &next)) {
1107 assert(!next.second->exists);
1108 if (!next.second->flush_txns.empty()) {
1109 dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1110 << " flush_txns " << next.second->flush_txns << dendl;
1111 return;
1112 }
1113 }
1114 }
1115 c->onode_map.clear();
1116 dout(10) << __func__ << " " << c->cid << " done" << dendl;
1117 }
1118
1119 dout(10) << __func__ << " all reaped" << dendl;
1120 }
1121
1122 // ---------------
1123 // read operations
1124
1125 bool KStore::exists(const coll_t& cid, const ghobject_t& oid)
1126 {
1127 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1128 CollectionRef c = _get_collection(cid);
1129 if (!c)
1130 return false;
1131 RWLock::RLocker l(c->lock);
1132 OnodeRef o = c->get_onode(oid, false);
1133 if (!o || !o->exists)
1134 return false;
1135 return true;
1136 }
1137
1138 int KStore::stat(
1139 const coll_t& cid,
1140 const ghobject_t& oid,
1141 struct stat *st,
1142 bool allow_eio)
1143 {
1144 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1145 CollectionRef c = _get_collection(cid);
1146 if (!c)
1147 return -ENOENT;
1148 RWLock::RLocker l(c->lock);
1149 OnodeRef o = c->get_onode(oid, false);
1150 if (!o || !o->exists)
1151 return -ENOENT;
1152 st->st_size = o->onode.size;
1153 st->st_blksize = 4096;
1154 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1155 st->st_nlink = 1;
1156 return 0;
1157 }
1158
1159 int KStore::set_collection_opts(
1160 const coll_t& cid,
1161 const pool_opts_t& opts)
1162 {
1163 return -EOPNOTSUPP;
1164 }
1165
1166 int KStore::read(
1167 const coll_t& cid,
1168 const ghobject_t& oid,
1169 uint64_t offset,
1170 size_t length,
1171 bufferlist& bl,
1172 uint32_t op_flags,
1173 bool allow_eio)
1174 {
1175 dout(15) << __func__ << " " << cid << " " << oid
1176 << " " << offset << "~" << length
1177 << dendl;
1178 bl.clear();
1179 CollectionRef c = _get_collection(cid);
1180 if (!c)
1181 return -ENOENT;
1182 RWLock::RLocker l(c->lock);
1183
1184 int r;
1185
1186 OnodeRef o = c->get_onode(oid, false);
1187 if (!o || !o->exists) {
1188 r = -ENOENT;
1189 goto out;
1190 }
1191
1192 if (offset == length && offset == 0)
1193 length = o->onode.size;
1194
1195 r = _do_read(o, offset, length, bl, op_flags);
1196
1197 out:
1198 dout(10) << __func__ << " " << cid << " " << oid
1199 << " " << offset << "~" << length
1200 << " = " << r << dendl;
1201 return r;
1202 }
1203
1204 int KStore::_do_read(
1205 OnodeRef o,
1206 uint64_t offset,
1207 size_t length,
1208 bufferlist& bl,
1209 uint32_t op_flags)
1210 {
1211 int r = 0;
1212 uint64_t stripe_size = o->onode.stripe_size;
1213 uint64_t stripe_off;
1214
1215 dout(20) << __func__ << " " << offset << "~" << length << " size "
1216 << o->onode.size << " nid " << o->onode.nid << dendl;
1217 bl.clear();
1218
1219 if (offset > o->onode.size) {
1220 goto out;
1221 }
1222 if (offset + length > o->onode.size) {
1223 length = o->onode.size - offset;
1224 }
1225 if (stripe_size == 0) {
1226 bl.append_zero(length);
1227 r = length;
1228 goto out;
1229 }
1230
1231 o->flush();
1232
1233 stripe_off = offset % stripe_size;
1234 while (length > 0) {
1235 bufferlist stripe;
1236 _do_read_stripe(o, offset - stripe_off, &stripe);
1237 dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1238 << stripe.length() << dendl;
1239 unsigned swant = MIN(stripe_size - stripe_off, length);
1240 if (stripe.length()) {
1241 if (swant == stripe.length()) {
1242 bl.claim_append(stripe);
1243 dout(30) << __func__ << " taking full stripe" << dendl;
1244 } else {
1245 unsigned l = 0;
1246 if (stripe_off < stripe.length()) {
1247 l = MIN(stripe.length() - stripe_off, swant);
1248 bufferlist t;
1249 t.substr_of(stripe, stripe_off, l);
1250 bl.claim_append(t);
1251 dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1252 }
1253 if (l < swant) {
1254 bl.append_zero(swant - l);
1255 dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1256 }
1257 }
1258 } else {
1259 dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1260 bl.append_zero(swant);
1261 }
1262 offset += swant;
1263 length -= swant;
1264 stripe_off = 0;
1265 }
1266 r = bl.length();
1267 dout(30) << " result:\n";
1268 bl.hexdump(*_dout);
1269 *_dout << dendl;
1270
1271 out:
1272 return r;
1273 }
1274
1275 int KStore::fiemap(
1276 const coll_t& cid,
1277 const ghobject_t& oid,
1278 uint64_t offset,
1279 size_t len,
1280 bufferlist& bl)
1281 {
1282 map<uint64_t, uint64_t> m;
1283 int r = fiemap(cid, oid, offset, len, m);
1284 if (r >= 0) {
1285 ::encode(m, bl);
1286 }
1287
1288 return r;
1289 }
1290
1291 int KStore::fiemap(
1292 const coll_t& cid,
1293 const ghobject_t& oid,
1294 uint64_t offset,
1295 size_t len,
1296 map<uint64_t, uint64_t>& destmap)
1297 {
1298 CollectionRef c = _get_collection(cid);
1299 if (!c)
1300 return -ENOENT;
1301 RWLock::RLocker l(c->lock);
1302
1303 OnodeRef o = c->get_onode(oid, false);
1304 if (!o || !o->exists) {
1305 return -ENOENT;
1306 }
1307
1308 if (offset > o->onode.size)
1309 goto out;
1310
1311 if (offset + len > o->onode.size) {
1312 len = o->onode.size - offset;
1313 }
1314
1315 dout(20) << __func__ << " " << offset << "~" << len << " size "
1316 << o->onode.size << dendl;
1317
1318 // FIXME: do something smarter here
1319 destmap[0] = o->onode.size;
1320
1321 out:
1322 dout(20) << __func__ << " " << offset << "~" << len
1323 << " size = 0 (" << destmap << ")" << dendl;
1324 return 0;
1325 }
1326
1327 int KStore::getattr(
1328 const coll_t& cid,
1329 const ghobject_t& oid,
1330 const char *name,
1331 bufferptr& value)
1332 {
1333 dout(15) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1334 CollectionRef c = _get_collection(cid);
1335 if (!c)
1336 return -ENOENT;
1337 RWLock::RLocker l(c->lock);
1338 int r;
1339 string k(name);
1340
1341 OnodeRef o = c->get_onode(oid, false);
1342 if (!o || !o->exists) {
1343 r = -ENOENT;
1344 goto out;
1345 }
1346
1347 if (!o->onode.attrs.count(k)) {
1348 r = -ENODATA;
1349 goto out;
1350 }
1351 value = o->onode.attrs[k];
1352 r = 0;
1353 out:
1354 dout(10) << __func__ << " " << cid << " " << oid << " " << name
1355 << " = " << r << dendl;
1356 return r;
1357 }
1358
1359 int KStore::getattrs(
1360 const coll_t& cid,
1361 const ghobject_t& oid,
1362 map<string,bufferptr>& aset)
1363 {
1364 dout(15) << __func__ << " " << cid << " " << oid << dendl;
1365 CollectionRef c = _get_collection(cid);
1366 if (!c)
1367 return -ENOENT;
1368 RWLock::RLocker l(c->lock);
1369 int r;
1370
1371 OnodeRef o = c->get_onode(oid, false);
1372 if (!o || !o->exists) {
1373 r = -ENOENT;
1374 goto out;
1375 }
1376 aset = o->onode.attrs;
1377 r = 0;
1378 out:
1379 dout(10) << __func__ << " " << cid << " " << oid
1380 << " = " << r << dendl;
1381 return r;
1382 }
1383
1384 int KStore::list_collections(vector<coll_t>& ls)
1385 {
1386 RWLock::RLocker l(coll_lock);
1387 for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1388 p != coll_map.end();
1389 ++p)
1390 ls.push_back(p->first);
1391 return 0;
1392 }
1393
1394 bool KStore::collection_exists(const coll_t& c)
1395 {
1396 RWLock::RLocker l(coll_lock);
1397 return coll_map.count(c);
1398 }
1399
1400 int KStore::collection_empty(const coll_t& cid, bool *empty)
1401 {
1402 dout(15) << __func__ << " " << cid << dendl;
1403 vector<ghobject_t> ls;
1404 ghobject_t next;
1405 int r = collection_list(cid, ghobject_t(), ghobject_t::get_max(), 1,
1406 &ls, &next);
1407 if (r < 0) {
1408 derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1409 << dendl;
1410 return r;
1411 }
1412 *empty = ls.empty();
1413 dout(10) << __func__ << " " << cid << " = " << (int)(*empty) << dendl;
1414 return 0;
1415 }
1416
1417 int KStore::collection_bits(const coll_t& cid)
1418 {
1419 dout(15) << __func__ << " " << cid << dendl;
1420 CollectionHandle ch = _get_collection(cid);
1421 if (!ch)
1422 return -ENOENT;
1423 Collection *c = static_cast<Collection*>(ch.get());
1424 RWLock::RLocker l(c->lock);
1425 dout(10) << __func__ << " " << cid << " = " << c->cnode.bits << dendl;
1426 return c->cnode.bits;
1427 }
1428
1429 int KStore::collection_list(
1430 const coll_t& cid, const ghobject_t& start, const ghobject_t& end, int max,
1431 vector<ghobject_t> *ls, ghobject_t *pnext)
1432 {
1433 CollectionHandle c = _get_collection(cid);
1434 if (!c)
1435 return -ENOENT;
1436 return collection_list(c, start, end, max, ls, pnext);
1437 }
1438
1439 int KStore::collection_list(
1440 CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1441 vector<ghobject_t> *ls, ghobject_t *pnext)
1442
1443 {
1444 Collection *c = static_cast<Collection*>(c_.get());
1445 dout(15) << __func__ << " " << c->cid
1446 << " start " << start << " end " << end << " max " << max << dendl;
1447 int r;
1448 {
1449 RWLock::RLocker l(c->lock);
1450 r = _collection_list(c, start, end, max, ls, pnext);
1451 }
1452
1453 dout(10) << __func__ << " " << c->cid
1454 << " start " << start << " end " << end << " max " << max
1455 << " = " << r << ", ls.size() = " << ls->size()
1456 << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
1457 return r;
1458 }
1459
/*
 * List objects of collection c in [start, end), in db key order, appending
 * them to *ls until ls holds `max` entries.
 *
 * get_coll_key_range() yields two key ranges for the collection: the temp
 * object range [temp_start_key, temp_end_key) and the regular range
 * [start_key, end_key).  When start is the collection minimum (or a
 * default ghobject_t) or a temp object, the walk begins in the temp range
 * and, unless `end` is itself a temp object, then switches to the regular
 * range.
 *
 * If the walk stops because max was reached, *pnext is set to the next
 * object that would have been listed; otherwise *pnext is set to
 * ghobject_t::get_max().  pnext may be NULL.
 *
 * Callers in this file hold c->lock (read) around this call.
 * Returns 0: the function-scope r is never assigned after initialization.
 */
int KStore::_collection_list(
  Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
  vector<ghobject_t> *ls, ghobject_t *pnext)
{
  int r = 0;
  KeyValueDB::Iterator it;
  string temp_start_key, temp_end_key;
  string start_key, end_key;
  bool set_next = false;
  string pend;
  bool temp;

  // allow callers to pass pnext == NULL
  ghobject_t static_next;
  if (!pnext)
    pnext = &static_next;

  if (start == ghobject_t::get_max() ||
      start.hobj.is_max()) {
    goto out;
  }
  get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
                     &start_key, &end_key);
  dout(20) << __func__
           << " range " << pretty_binary_string(temp_start_key)
           << " to " << pretty_binary_string(temp_end_key)
           << " and " << pretty_binary_string(start_key)
           << " to " << pretty_binary_string(end_key)
           << " start " << start << dendl;
  it = db->get_iterator(PREFIX_OBJ);
  if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
    // listing from the beginning: start with the temp range
    it->upper_bound(temp_start_key);
    temp = true;
  } else {
    // resume from a specific object, in whichever range it belongs to
    string k;
    get_object_key(cct, start, &k);
    if (start.hobj.is_temp()) {
      temp = true;
      assert(k >= temp_start_key && k < temp_end_key);
    } else {
      temp = false;
      assert(k >= start_key && k < end_key);
    }
    dout(20) << " start from " << pretty_binary_string(k)
             << " temp=" << (int)temp << dendl;
    it->lower_bound(k);
  }
  // Compute the exclusive end key (pend) for the range we start in.
  // NOTE: when end is not max, end_key is overwritten with end's own key,
  // and that value is also what the temp -> regular switch below uses.
  if (end.hobj.is_max()) {
    pend = temp ? temp_end_key : end_key;
  } else {
    get_object_key(cct, end, &end_key);
    if (end.hobj.is_temp()) {
      if (temp)
        pend = end_key;
      else
        goto out;  // regular-range start with a temp end: nothing to list
    } else {
      pend = temp ? temp_end_key : end_key;
    }
  }
  dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
  while (true) {
    if (!it->valid() || it->key() >= pend) {
      if (!it->valid())
        dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
      else
        dout(20) << __func__ << " key " << pretty_binary_string(it->key())
                 << " > " << end << dendl;
      if (temp) {
        if (end.hobj.is_temp()) {
          break;
        }
        // temp range exhausted: continue with the regular object range
        dout(30) << __func__ << " switch to non-temp namespace" << dendl;
        temp = false;
        it->upper_bound(start_key);
        pend = end_key;
        dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
        continue;
      }
      break;
    }
    dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
    ghobject_t oid;
    // this r shadows the function-scope r; it is only checked by the assert
    int r = get_key_object(it->key(), &oid);
    assert(r == 0);
    if (ls->size() >= (unsigned)max) {
      // full: remember where the next call should resume
      dout(20) << __func__ << " reached max " << max << dendl;
      *pnext = oid;
      set_next = true;
      break;
    }
    ls->push_back(oid);
    it->next();
  }
 out:
  if (!set_next) {
    *pnext = ghobject_t::get_max();
  }
  return r;
}
1559
1560 // omap reads
1561
1562 KStore::OmapIteratorImpl::OmapIteratorImpl(
1563 CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1564 : c(c), o(o), it(it)
1565 {
1566 RWLock::RLocker l(c->lock);
1567 if (o->onode.omap_head) {
1568 get_omap_key(o->onode.omap_head, string(), &head);
1569 get_omap_tail(o->onode.omap_head, &tail);
1570 it->lower_bound(head);
1571 }
1572 }
1573
1574 int KStore::OmapIteratorImpl::seek_to_first()
1575 {
1576 RWLock::RLocker l(c->lock);
1577 if (o->onode.omap_head) {
1578 it->lower_bound(head);
1579 } else {
1580 it = KeyValueDB::Iterator();
1581 }
1582 return 0;
1583 }
1584
1585 int KStore::OmapIteratorImpl::upper_bound(const string& after)
1586 {
1587 RWLock::RLocker l(c->lock);
1588 if (o->onode.omap_head) {
1589 string key;
1590 get_omap_key(o->onode.omap_head, after, &key);
1591 it->upper_bound(key);
1592 } else {
1593 it = KeyValueDB::Iterator();
1594 }
1595 return 0;
1596 }
1597
1598 int KStore::OmapIteratorImpl::lower_bound(const string& to)
1599 {
1600 RWLock::RLocker l(c->lock);
1601 if (o->onode.omap_head) {
1602 string key;
1603 get_omap_key(o->onode.omap_head, to, &key);
1604 it->lower_bound(key);
1605 } else {
1606 it = KeyValueDB::Iterator();
1607 }
1608 return 0;
1609 }
1610
1611 bool KStore::OmapIteratorImpl::valid()
1612 {
1613 RWLock::RLocker l(c->lock);
1614 if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1615 return true;
1616 } else {
1617 return false;
1618 }
1619 }
1620
1621 int KStore::OmapIteratorImpl::next(bool validate)
1622 {
1623 RWLock::RLocker l(c->lock);
1624 if (o->onode.omap_head) {
1625 it->next();
1626 return 0;
1627 } else {
1628 return -1;
1629 }
1630 }
1631
1632 string KStore::OmapIteratorImpl::key()
1633 {
1634 RWLock::RLocker l(c->lock);
1635 assert(it->valid());
1636 string db_key = it->raw_key().second;
1637 string user_key;
1638 decode_omap_key(db_key, &user_key);
1639 return user_key;
1640 }
1641
1642 bufferlist KStore::OmapIteratorImpl::value()
1643 {
1644 RWLock::RLocker l(c->lock);
1645 assert(it->valid());
1646 return it->value();
1647 }
1648
1649 int KStore::omap_get(
1650 const coll_t& cid, ///< [in] Collection containing oid
1651 const ghobject_t &oid, ///< [in] Object containing omap
1652 bufferlist *header, ///< [out] omap header
1653 map<string, bufferlist> *out /// < [out] Key to value map
1654 )
1655 {
1656 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1657 CollectionRef c = _get_collection(cid);
1658 if (!c)
1659 return -ENOENT;
1660 RWLock::RLocker l(c->lock);
1661 int r = 0;
1662 OnodeRef o = c->get_onode(oid, false);
1663 if (!o || !o->exists) {
1664 r = -ENOENT;
1665 goto out;
1666 }
1667 if (!o->onode.omap_head)
1668 goto out;
1669 o->flush();
1670 {
1671 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1672 string head, tail;
1673 get_omap_header(o->onode.omap_head, &head);
1674 get_omap_tail(o->onode.omap_head, &tail);
1675 it->lower_bound(head);
1676 while (it->valid()) {
1677 if (it->key() == head) {
1678 dout(30) << __func__ << " got header" << dendl;
1679 *header = it->value();
1680 } else if (it->key() >= tail) {
1681 dout(30) << __func__ << " reached tail" << dendl;
1682 break;
1683 } else {
1684 string user_key;
1685 decode_omap_key(it->key(), &user_key);
1686 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1687 << " -> " << user_key << dendl;
1688 assert(it->key() < tail);
1689 (*out)[user_key] = it->value();
1690 }
1691 it->next();
1692 }
1693 }
1694 out:
1695 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1696 return r;
1697 }
1698
1699 int KStore::omap_get_header(
1700 const coll_t& cid, ///< [in] Collection containing oid
1701 const ghobject_t &oid, ///< [in] Object containing omap
1702 bufferlist *header, ///< [out] omap header
1703 bool allow_eio ///< [in] don't assert on eio
1704 )
1705 {
1706 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1707 CollectionRef c = _get_collection(cid);
1708 if (!c)
1709 return -ENOENT;
1710 RWLock::RLocker l(c->lock);
1711 int r = 0;
1712 OnodeRef o = c->get_onode(oid, false);
1713 if (!o || !o->exists) {
1714 r = -ENOENT;
1715 goto out;
1716 }
1717 if (!o->onode.omap_head)
1718 goto out;
1719 o->flush();
1720 {
1721 string head;
1722 get_omap_header(o->onode.omap_head, &head);
1723 if (db->get(PREFIX_OMAP, head, header) >= 0) {
1724 dout(30) << __func__ << " got header" << dendl;
1725 } else {
1726 dout(30) << __func__ << " no header" << dendl;
1727 }
1728 }
1729 out:
1730 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1731 return r;
1732 }
1733
1734 int KStore::omap_get_keys(
1735 const coll_t& cid, ///< [in] Collection containing oid
1736 const ghobject_t &oid, ///< [in] Object containing omap
1737 set<string> *keys ///< [out] Keys defined on oid
1738 )
1739 {
1740 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1741 CollectionRef c = _get_collection(cid);
1742 if (!c)
1743 return -ENOENT;
1744 RWLock::RLocker l(c->lock);
1745 int r = 0;
1746 OnodeRef o = c->get_onode(oid, false);
1747 if (!o || !o->exists) {
1748 r = -ENOENT;
1749 goto out;
1750 }
1751 if (!o->onode.omap_head)
1752 goto out;
1753 o->flush();
1754 {
1755 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1756 string head, tail;
1757 get_omap_key(o->onode.omap_head, string(), &head);
1758 get_omap_tail(o->onode.omap_head, &tail);
1759 it->lower_bound(head);
1760 while (it->valid()) {
1761 if (it->key() >= tail) {
1762 dout(30) << __func__ << " reached tail" << dendl;
1763 break;
1764 }
1765 string user_key;
1766 decode_omap_key(it->key(), &user_key);
1767 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1768 << " -> " << user_key << dendl;
1769 assert(it->key() < tail);
1770 keys->insert(user_key);
1771 it->next();
1772 }
1773 }
1774 out:
1775 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1776 return r;
1777 }
1778
1779 int KStore::omap_get_values(
1780 const coll_t& cid, ///< [in] Collection containing oid
1781 const ghobject_t &oid, ///< [in] Object containing omap
1782 const set<string> &keys, ///< [in] Keys to get
1783 map<string, bufferlist> *out ///< [out] Returned keys and values
1784 )
1785 {
1786 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1787 CollectionRef c = _get_collection(cid);
1788 if (!c)
1789 return -ENOENT;
1790 RWLock::RLocker l(c->lock);
1791 int r = 0;
1792 OnodeRef o = c->get_onode(oid, false);
1793 if (!o || !o->exists) {
1794 r = -ENOENT;
1795 goto out;
1796 }
1797 if (!o->onode.omap_head)
1798 goto out;
1799 o->flush();
1800 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1801 string key;
1802 get_omap_key(o->onode.omap_head, *p, &key);
1803 bufferlist val;
1804 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1805 dout(30) << __func__ << " got " << pretty_binary_string(key)
1806 << " -> " << *p << dendl;
1807 out->insert(make_pair(*p, val));
1808 }
1809 }
1810 out:
1811 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1812 return r;
1813 }
1814
1815 int KStore::omap_check_keys(
1816 const coll_t& cid, ///< [in] Collection containing oid
1817 const ghobject_t &oid, ///< [in] Object containing omap
1818 const set<string> &keys, ///< [in] Keys to check
1819 set<string> *out ///< [out] Subset of keys defined on oid
1820 )
1821 {
1822 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1823 CollectionRef c = _get_collection(cid);
1824 if (!c)
1825 return -ENOENT;
1826 RWLock::RLocker l(c->lock);
1827 int r = 0;
1828 OnodeRef o = c->get_onode(oid, false);
1829 if (!o || !o->exists) {
1830 r = -ENOENT;
1831 goto out;
1832 }
1833 if (!o->onode.omap_head)
1834 goto out;
1835 o->flush();
1836 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1837 string key;
1838 get_omap_key(o->onode.omap_head, *p, &key);
1839 bufferlist val;
1840 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1841 dout(30) << __func__ << " have " << pretty_binary_string(key)
1842 << " -> " << *p << dendl;
1843 out->insert(*p);
1844 } else {
1845 dout(30) << __func__ << " miss " << pretty_binary_string(key)
1846 << " -> " << *p << dendl;
1847 }
1848 }
1849 out:
1850 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1851 return r;
1852 }
1853
1854 ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
1855 const coll_t& cid, ///< [in] collection
1856 const ghobject_t &oid ///< [in] object
1857 )
1858 {
1859
1860 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1861 CollectionRef c = _get_collection(cid);
1862 if (!c) {
1863 dout(10) << __func__ << " " << cid << "doesn't exist" <<dendl;
1864 return ObjectMap::ObjectMapIterator();
1865 }
1866 RWLock::RLocker l(c->lock);
1867 OnodeRef o = c->get_onode(oid, false);
1868 if (!o || !o->exists) {
1869 dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1870 return ObjectMap::ObjectMapIterator();
1871 }
1872 o->flush();
1873 dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1874 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1875 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1876 }
1877
1878
1879 // -----------------
1880 // write helpers
1881
1882 int KStore::_open_super_meta()
1883 {
1884 // nid
1885 {
1886 nid_max = 0;
1887 bufferlist bl;
1888 db->get(PREFIX_SUPER, "nid_max", &bl);
1889 bufferlist::iterator p = bl.begin();
1890 try {
1891 ::decode(nid_max, p);
1892 } catch (buffer::error& e) {
1893 }
1894 dout(10) << __func__ << " old nid_max " << nid_max << dendl;
1895 nid_last = nid_max;
1896 }
1897 return 0;
1898 }
1899
1900 void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1901 {
1902 if (o->onode.nid)
1903 return;
1904 std::lock_guard<std::mutex> l(nid_lock);
1905 o->onode.nid = ++nid_last;
1906 dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1907 if (nid_last > nid_max) {
1908 nid_max += cct->_conf->kstore_nid_prealloc;
1909 bufferlist bl;
1910 ::encode(nid_max, bl);
1911 txc->t->set(PREFIX_SUPER, "nid_max", bl);
1912 dout(10) << __func__ << " nid_max now " << nid_max << dendl;
1913 }
1914 }
1915
1916 KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1917 {
1918 TransContext *txc = new TransContext(osr);
1919 txc->t = db->get_transaction();
1920 osr->queue_new(txc);
1921 dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
1922 return txc;
1923 }
1924
/*
 * Advance txc through its state machine for as long as it can progress
 * without waiting.
 *
 *   STATE_PREPARE   -> STATE_KV_QUEUED
 *       If kstore_sync_transaction is unset, the txc is pushed onto
 *       kv_queue for _kv_sync_thread (optionally after a non-sync submit
 *       when kstore_sync_submit_transaction is set) and this function
 *       returns; the sync thread calls back in once it has committed.
 *       Otherwise the transaction is submitted synchronously here and the
 *       loop continues.
 *   STATE_KV_QUEUED -> STATE_KV_DONE -> STATE_FINISHING
 *       Run the commit callbacks (_txc_finish_kv), then _txc_finish().
 *       txc must not be touched after _txc_finish(): it may be deleted by
 *       _osr_reap_done().
 *
 * Any other state asserts.
 */
void KStore::_txc_state_proc(TransContext *txc)
{
  while (true) {
    dout(10) << __func__ << " txc " << txc
             << " " << txc->get_state_name() << dendl;
    switch (txc->state) {
    case TransContext::STATE_PREPARE:
      txc->log_state_latency(logger, l_kstore_state_prepare_lat);
      txc->state = TransContext::STATE_KV_QUEUED;
      if (!cct->_conf->kstore_sync_transaction) {
        // asynchronous commit: hand off to the kv sync thread
        std::lock_guard<std::mutex> l(kv_lock);
        if (cct->_conf->kstore_sync_submit_transaction) {
          // submit (without sync) now; the sync thread will then skip
          // submitting this transaction itself
          int r = db->submit_transaction(txc->t);
          assert(r == 0);
        }
        kv_queue.push_back(txc);
        kv_cond.notify_one();
        return;
      }
      {
        // synchronous commit in the caller's thread
        int r = db->submit_transaction_sync(txc->t);
        assert(r == 0);
      }
      break;

    case TransContext::STATE_KV_QUEUED:
      txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
      txc->state = TransContext::STATE_KV_DONE;
      _txc_finish_kv(txc);
      // ** fall-thru **

    case TransContext::STATE_KV_DONE:
      txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
      txc->state = TransContext::STATE_FINISHING;
      // ** fall-thru **

    case TransContext::TransContext::STATE_FINISHING:
      txc->log_state_latency(logger, l_kstore_state_finishing_lat);
      _txc_finish(txc);
      return;

    default:
      derr << __func__ << " unexpected txc " << txc
           << " state " << txc->get_state_name() << dendl;
      assert(0 == "unexpected txc state");
      return;
    }
  }
}
1974
1975 void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1976 {
1977 dout(20) << __func__ << " osr " << osr << " txc " << txc
1978 << " onodes " << txc->onodes << dendl;
1979
1980 // finalize onodes
1981 for (set<OnodeRef>::iterator p = txc->onodes.begin();
1982 p != txc->onodes.end();
1983 ++p) {
1984 bufferlist bl;
1985 ::encode((*p)->onode, bl);
1986 dout(20) << " onode size is " << bl.length() << dendl;
1987 txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1988
1989 std::lock_guard<std::mutex> l((*p)->flush_lock);
1990 (*p)->flush_txns.insert(txc);
1991 }
1992 }
1993
1994 void KStore::_txc_finish_kv(TransContext *txc)
1995 {
1996 dout(20) << __func__ << " txc " << txc << dendl;
1997
1998 // warning: we're calling onreadable_sync inside the sequencer lock
1999 if (txc->onreadable_sync) {
2000 txc->onreadable_sync->complete(0);
2001 txc->onreadable_sync = NULL;
2002 }
2003 if (txc->onreadable) {
2004 finisher.queue(txc->onreadable);
2005 txc->onreadable = NULL;
2006 }
2007 if (txc->oncommit) {
2008 finisher.queue(txc->oncommit);
2009 txc->oncommit = NULL;
2010 }
2011 if (!txc->oncommits.empty()) {
2012 finisher.queue(txc->oncommits);
2013 }
2014
2015 throttle_ops.put(txc->ops);
2016 throttle_bytes.put(txc->bytes);
2017 }
2018
2019 void KStore::_txc_finish(TransContext *txc)
2020 {
2021 dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
2022 assert(txc->state == TransContext::STATE_FINISHING);
2023
2024 for (set<OnodeRef>::iterator p = txc->onodes.begin();
2025 p != txc->onodes.end();
2026 ++p) {
2027 std::lock_guard<std::mutex> l((*p)->flush_lock);
2028 dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2029 << dendl;
2030 assert((*p)->flush_txns.count(txc));
2031 (*p)->flush_txns.erase(txc);
2032 if ((*p)->flush_txns.empty()) {
2033 (*p)->flush_cond.notify_all();
2034 (*p)->clear_pending_stripes();
2035 }
2036 }
2037
2038 // clear out refs
2039 txc->onodes.clear();
2040
2041 while (!txc->removed_collections.empty()) {
2042 _queue_reap_collection(txc->removed_collections.front());
2043 txc->removed_collections.pop_front();
2044 }
2045
2046 OpSequencerRef osr = txc->osr;
2047 {
2048 std::lock_guard<std::mutex> l(osr->qlock);
2049 txc->state = TransContext::STATE_DONE;
2050 }
2051
2052 _osr_reap_done(osr.get());
2053 }
2054
2055 void KStore::_osr_reap_done(OpSequencer *osr)
2056 {
2057 std::lock_guard<std::mutex> l(osr->qlock);
2058 dout(20) << __func__ << " osr " << osr << dendl;
2059 while (!osr->q.empty()) {
2060 TransContext *txc = &osr->q.front();
2061 dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
2062 << dendl;
2063 if (txc->state != TransContext::STATE_DONE) {
2064 break;
2065 }
2066
2067 if (txc->first_collection) {
2068 txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2069 }
2070
2071 osr->q.pop_front();
2072 txc->log_state_latency(logger, l_kstore_state_done_lat);
2073 delete txc;
2074 osr->qcond.notify_all();
2075 if (osr->q.empty())
2076 dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
2077 }
2078 }
2079
/*
 * Body of the kv sync thread.
 *
 * Each round moves all queued txcs from kv_queue to kv_committing, drops
 * kv_lock, and then does the following:
 *   - submits each txc's transaction, unless kstore_sync_submit_transaction
 *     means _txc_state_proc() already did;
 *   - submits one empty transaction with submit_transaction_sync to force
 *     a sync;
 *   - advances every committed txc with _txc_state_proc();
 *   - reaps removed collections.
 *
 * When kv_queue is empty the thread notifies kv_sync_cond waiters and
 * sleeps on kv_cond.  It exits once kv_stop is set and the queue is empty.
 */
void KStore::_kv_sync_thread()
{
  dout(10) << __func__ << " start" << dendl;
  std::unique_lock<std::mutex> l(kv_lock);
  while (true) {
    assert(kv_committing.empty());
    if (kv_queue.empty()) {
      if (kv_stop)
        break;
      dout(20) << __func__ << " sleep" << dendl;
      kv_sync_cond.notify_all();
      kv_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
      // take the whole batch, then work on it without holding kv_lock
      kv_committing.swap(kv_queue);
      utime_t start = ceph_clock_now();
      l.unlock();

      dout(30) << __func__ << " committing txc " << kv_committing << dendl;

      // one transaction to force a sync
      KeyValueDB::Transaction t = db->get_transaction();
      if (!cct->_conf->kstore_sync_submit_transaction) {
        for (std::deque<TransContext *>::iterator it = kv_committing.begin();
             it != kv_committing.end();
             ++it) {
          int r = db->submit_transaction((*it)->t);
          assert(r == 0);
        }
      }
      int r = db->submit_transaction_sync(t);
      assert(r == 0);
      utime_t finish = ceph_clock_now();
      utime_t dur = finish - start;
      dout(20) << __func__ << " committed " << kv_committing.size()
               << " in " << dur << dendl;
      // move each committed txc on to its completion states
      while (!kv_committing.empty()) {
        TransContext *txc = kv_committing.front();
        _txc_state_proc(txc);
        kv_committing.pop_front();
      }

      // this is as good a place as any ...
      _reap_collections();

      l.lock();
    }
  }
  dout(10) << __func__ << " finish" << dendl;
}
2131
2132
2133 // ---------------------------
2134 // transactions
2135
2136 int KStore::queue_transactions(
2137 Sequencer *posr,
2138 vector<Transaction>& tls,
2139 TrackedOpRef op,
2140 ThreadPool::TPHandle *handle)
2141 {
2142 Context *onreadable;
2143 Context *ondisk;
2144 Context *onreadable_sync;
2145 ObjectStore::Transaction::collect_contexts(
2146 tls, &onreadable, &ondisk, &onreadable_sync);
2147
2148 // set up the sequencer
2149 OpSequencer *osr;
2150 assert(posr);
2151 if (posr->p) {
2152 osr = static_cast<OpSequencer *>(posr->p.get());
2153 dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
2154 } else {
2155 osr = new OpSequencer(cct);
2156 osr->parent = posr;
2157 posr->p = osr;
2158 dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
2159 }
2160
2161 // prepare
2162 TransContext *txc = _txc_create(osr);
2163 txc->onreadable = onreadable;
2164 txc->onreadable_sync = onreadable_sync;
2165 txc->oncommit = ondisk;
2166
2167 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
2168 (*p).set_osr(osr);
2169 txc->ops += (*p).get_num_ops();
2170 txc->bytes += (*p).get_num_bytes();
2171 _txc_add_transaction(txc, &(*p));
2172 }
2173
2174 _txc_finalize(osr, txc);
2175
2176 throttle_ops.get(txc->ops);
2177 throttle_bytes.get(txc->bytes);
2178
2179 // execute (start)
2180 _txc_state_proc(txc);
2181 return 0;
2182 }
2183
2184 void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2185 {
2186 Transaction::iterator i = t->begin();
2187
2188 dout(30) << __func__ << " transaction dump:\n";
2189 JSONFormatter f(true);
2190 f.open_object_section("transaction");
2191 t->dump(&f);
2192 f.close_section();
2193 f.flush(*_dout);
2194 *_dout << dendl;
2195
2196 vector<CollectionRef> cvec(i.colls.size());
2197 unsigned j = 0;
2198 for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2199 ++p, ++j) {
2200 cvec[j] = _get_collection(*p);
2201
2202 // note first collection we reference
2203 if (!j && !txc->first_collection)
2204 txc->first_collection = cvec[j];
2205 }
2206 vector<OnodeRef> ovec(i.objects.size());
2207
2208 for (int pos = 0; i.have_op(); ++pos) {
2209 Transaction::Op *op = i.decode_op();
2210 int r = 0;
2211
2212 // no coll or obj
2213 if (op->op == Transaction::OP_NOP)
2214 continue;
2215
2216 // collection operations
2217 CollectionRef &c = cvec[op->cid];
2218 switch (op->op) {
2219 case Transaction::OP_RMCOLL:
2220 {
2221 coll_t cid = i.get_cid(op->cid);
2222 r = _remove_collection(txc, cid, &c);
2223 if (!r)
2224 continue;
2225 }
2226 break;
2227
2228 case Transaction::OP_MKCOLL:
2229 {
2230 assert(!c);
2231 coll_t cid = i.get_cid(op->cid);
2232 r = _create_collection(txc, cid, op->split_bits, &c);
2233 if (!r)
2234 continue;
2235 }
2236 break;
2237
2238 case Transaction::OP_SPLIT_COLLECTION:
2239 assert(0 == "deprecated");
2240 break;
2241
2242 case Transaction::OP_SPLIT_COLLECTION2:
2243 {
2244 uint32_t bits = op->split_bits;
2245 uint32_t rem = op->split_rem;
2246 r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2247 if (!r)
2248 continue;
2249 }
2250 break;
2251
2252 case Transaction::OP_COLL_HINT:
2253 {
2254 uint32_t type = op->hint_type;
2255 bufferlist hint;
2256 i.decode_bl(hint);
2257 bufferlist::iterator hiter = hint.begin();
2258 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2259 uint32_t pg_num;
2260 uint64_t num_objs;
2261 ::decode(pg_num, hiter);
2262 ::decode(num_objs, hiter);
2263 dout(10) << __func__ << " collection hint objects is a no-op, "
2264 << " pg_num " << pg_num << " num_objects " << num_objs
2265 << dendl;
2266 } else {
2267 // Ignore the hint
2268 dout(10) << __func__ << " unknown collection hint " << type << dendl;
2269 }
2270 continue;
2271 }
2272 break;
2273
2274 case Transaction::OP_COLL_SETATTR:
2275 r = -EOPNOTSUPP;
2276 break;
2277
2278 case Transaction::OP_COLL_RMATTR:
2279 r = -EOPNOTSUPP;
2280 break;
2281
2282 case Transaction::OP_COLL_RENAME:
2283 assert(0 == "not implemented");
2284 break;
2285 }
2286 if (r < 0) {
2287 derr << " error " << cpp_strerror(r)
2288 << " not handled on operation " << op->op
2289 << " (op " << pos << ", counting from 0)" << dendl;
2290 dout(0) << " transaction dump:\n";
2291 JSONFormatter f(true);
2292 f.open_object_section("transaction");
2293 t->dump(&f);
2294 f.close_section();
2295 f.flush(*_dout);
2296 *_dout << dendl;
2297 assert(0 == "unexpected error");
2298 }
2299
2300 // object operations
2301 RWLock::WLocker l(c->lock);
2302 OnodeRef &o = ovec[op->oid];
2303 if (!o) {
2304 // these operations implicity create the object
2305 bool create = false;
2306 if (op->op == Transaction::OP_TOUCH ||
2307 op->op == Transaction::OP_WRITE ||
2308 op->op == Transaction::OP_ZERO) {
2309 create = true;
2310 }
2311 ghobject_t oid = i.get_oid(op->oid);
2312 o = c->get_onode(oid, create);
2313 if (!create) {
2314 if (!o || !o->exists) {
2315 dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2316 << oid << dendl;
2317 r = -ENOENT;
2318 goto endop;
2319 }
2320 }
2321 }
2322
2323 switch (op->op) {
2324 case Transaction::OP_TOUCH:
2325 r = _touch(txc, c, o);
2326 break;
2327
2328 case Transaction::OP_WRITE:
2329 {
2330 uint64_t off = op->off;
2331 uint64_t len = op->len;
2332 uint32_t fadvise_flags = i.get_fadvise_flags();
2333 bufferlist bl;
2334 i.decode_bl(bl);
2335 r = _write(txc, c, o, off, len, bl, fadvise_flags);
2336 }
2337 break;
2338
2339 case Transaction::OP_ZERO:
2340 {
2341 uint64_t off = op->off;
2342 uint64_t len = op->len;
2343 r = _zero(txc, c, o, off, len);
2344 }
2345 break;
2346
2347 case Transaction::OP_TRIMCACHE:
2348 {
2349 // deprecated, no-op
2350 }
2351 break;
2352
2353 case Transaction::OP_TRUNCATE:
2354 {
2355 uint64_t off = op->off;
2356 r = _truncate(txc, c, o, off);
2357 }
2358 break;
2359
2360 case Transaction::OP_REMOVE:
2361 r = _remove(txc, c, o);
2362 break;
2363
2364 case Transaction::OP_SETATTR:
2365 {
2366 string name = i.decode_string();
2367 bufferlist bl;
2368 i.decode_bl(bl);
2369 map<string, bufferptr> to_set;
2370 to_set[name] = bufferptr(bl.c_str(), bl.length());
2371 r = _setattrs(txc, c, o, to_set);
2372 }
2373 break;
2374
2375 case Transaction::OP_SETATTRS:
2376 {
2377 map<string, bufferptr> aset;
2378 i.decode_attrset(aset);
2379 r = _setattrs(txc, c, o, aset);
2380 }
2381 break;
2382
2383 case Transaction::OP_RMATTR:
2384 {
2385 string name = i.decode_string();
2386 r = _rmattr(txc, c, o, name);
2387 }
2388 break;
2389
2390 case Transaction::OP_RMATTRS:
2391 {
2392 r = _rmattrs(txc, c, o);
2393 }
2394 break;
2395
2396 case Transaction::OP_CLONE:
2397 {
2398 const ghobject_t& noid = i.get_oid(op->dest_oid);
2399 OnodeRef no = c->get_onode(noid, true);
2400 r = _clone(txc, c, o, no);
2401 }
2402 break;
2403
2404 case Transaction::OP_CLONERANGE:
2405 assert(0 == "deprecated");
2406 break;
2407
2408 case Transaction::OP_CLONERANGE2:
2409 {
2410 const ghobject_t& noid = i.get_oid(op->dest_oid);
2411 OnodeRef no = c->get_onode(noid, true);
2412 uint64_t srcoff = op->off;
2413 uint64_t len = op->len;
2414 uint64_t dstoff = op->dest_off;
2415 r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2416 }
2417 break;
2418
2419 case Transaction::OP_COLL_ADD:
2420 assert(0 == "not implemented");
2421 break;
2422
2423 case Transaction::OP_COLL_REMOVE:
2424 assert(0 == "not implemented");
2425 break;
2426
2427 case Transaction::OP_COLL_MOVE:
2428 assert(0 == "deprecated");
2429 break;
2430
2431 case Transaction::OP_COLL_MOVE_RENAME:
2432 {
2433 assert(op->cid == op->dest_cid);
2434 const ghobject_t& noid = i.get_oid(op->dest_oid);
2435 OnodeRef no = c->get_onode(noid, true);
2436 r = _rename(txc, c, o, no, noid);
2437 o.reset();
2438 }
2439 break;
2440
2441 case Transaction::OP_TRY_RENAME:
2442 {
2443 const ghobject_t& noid = i.get_oid(op->dest_oid);
2444 OnodeRef no = c->get_onode(noid, true);
2445 r = _rename(txc, c, o, no, noid);
2446 if (r == -ENOENT)
2447 r = 0;
2448 o.reset();
2449 }
2450 break;
2451
2452 case Transaction::OP_OMAP_CLEAR:
2453 {
2454 r = _omap_clear(txc, c, o);
2455 }
2456 break;
2457 case Transaction::OP_OMAP_SETKEYS:
2458 {
2459 bufferlist aset_bl;
2460 i.decode_attrset_bl(&aset_bl);
2461 r = _omap_setkeys(txc, c, o, aset_bl);
2462 }
2463 break;
2464 case Transaction::OP_OMAP_RMKEYS:
2465 {
2466 bufferlist keys_bl;
2467 i.decode_keyset_bl(&keys_bl);
2468 r = _omap_rmkeys(txc, c, o, keys_bl);
2469 }
2470 break;
2471 case Transaction::OP_OMAP_RMKEYRANGE:
2472 {
2473 string first, last;
2474 first = i.decode_string();
2475 last = i.decode_string();
2476 r = _omap_rmkey_range(txc, c, o, first, last);
2477 }
2478 break;
2479 case Transaction::OP_OMAP_SETHEADER:
2480 {
2481 bufferlist bl;
2482 i.decode_bl(bl);
2483 r = _omap_setheader(txc, c, o, bl);
2484 }
2485 break;
2486
2487 case Transaction::OP_SETALLOCHINT:
2488 {
2489 uint64_t expected_object_size = op->expected_object_size;
2490 uint64_t expected_write_size = op->expected_write_size;
2491 uint32_t flags = op->alloc_hint_flags;
2492 r = _setallochint(txc, c, o,
2493 expected_object_size,
2494 expected_write_size,
2495 flags);
2496 }
2497 break;
2498
2499 default:
2500 derr << "bad op " << op->op << dendl;
2501 ceph_abort();
2502 }
2503
2504 endop:
2505 if (r < 0) {
2506 bool ok = false;
2507
2508 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2509 op->op == Transaction::OP_CLONE ||
2510 op->op == Transaction::OP_CLONERANGE2 ||
2511 op->op == Transaction::OP_COLL_ADD))
2512 // -ENOENT is usually okay
2513 ok = true;
2514 if (r == -ENODATA)
2515 ok = true;
2516
2517 if (!ok) {
2518 const char *msg = "unexpected error code";
2519
2520 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2521 op->op == Transaction::OP_CLONE ||
2522 op->op == Transaction::OP_CLONERANGE2))
2523 msg = "ENOENT on clone suggests osd bug";
2524
2525 if (r == -ENOSPC)
2526 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2527 // by partially applying transactions.
2528 msg = "ENOSPC from key value store, misconfigured cluster";
2529
2530 if (r == -ENOTEMPTY) {
2531 msg = "ENOTEMPTY suggests garbage data in osd data dir";
2532 }
2533
2534 dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2535 << " (op " << pos << ", counting from 0)" << dendl;
2536 dout(0) << msg << dendl;
2537 dout(0) << " transaction dump:\n";
2538 JSONFormatter f(true);
2539 f.open_object_section("transaction");
2540 t->dump(&f);
2541 f.close_section();
2542 f.flush(*_dout);
2543 *_dout << dendl;
2544 assert(0 == "unexpected error");
2545 }
2546 }
2547 }
2548 }
2549
2550
2551
2552 // -----------------
2553 // write operations
2554
2555 int KStore::_touch(TransContext *txc,
2556 CollectionRef& c,
2557 OnodeRef &o)
2558 {
2559 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2560 int r = 0;
2561 o->exists = true;
2562 _assign_nid(txc, o);
2563 txc->write_onode(o);
2564 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2565 return r;
2566 }
2567
2568 void KStore::_dump_onode(OnodeRef o)
2569 {
2570 dout(30) << __func__ << " " << o
2571 << " nid " << o->onode.nid
2572 << " size " << o->onode.size
2573 << " expected_object_size " << o->onode.expected_object_size
2574 << " expected_write_size " << o->onode.expected_write_size
2575 << dendl;
2576 for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2577 p != o->onode.attrs.end();
2578 ++p) {
2579 dout(30) << __func__ << " attr " << p->first
2580 << " len " << p->second.length() << dendl;
2581 }
2582 }
2583
2584 void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
2585 {
2586 map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2587 if (p == o->pending_stripes.end()) {
2588 string key;
2589 get_data_key(o->onode.nid, offset, &key);
2590 db->get(PREFIX_DATA, key, pbl);
2591 o->pending_stripes[offset] = *pbl;
2592 } else {
2593 *pbl = p->second;
2594 }
2595 }
2596
2597 void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2598 uint64_t offset, bufferlist& bl)
2599 {
2600 o->pending_stripes[offset] = bl;
2601 string key;
2602 get_data_key(o->onode.nid, offset, &key);
2603 txc->t->set(PREFIX_DATA, key, bl);
2604 }
2605
2606 void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2607 {
2608 o->pending_stripes.erase(offset);
2609 string key;
2610 get_data_key(o->onode.nid, offset, &key);
2611 txc->t->rmkey(PREFIX_DATA, key);
2612 }
2613
2614 int KStore::_do_write(TransContext *txc,
2615 OnodeRef o,
2616 uint64_t offset, uint64_t length,
2617 bufferlist& orig_bl,
2618 uint32_t fadvise_flags)
2619 {
2620 int r = 0;
2621
2622 dout(20) << __func__
2623 << " " << o->oid << " " << offset << "~" << length
2624 << " - have " << o->onode.size
2625 << " bytes, nid " << o->onode.nid << dendl;
2626 _dump_onode(o);
2627 o->exists = true;
2628
2629 if (length == 0) {
2630 return 0;
2631 }
2632
2633 uint64_t stripe_size = o->onode.stripe_size;
2634 if (!stripe_size) {
2635 o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2636 stripe_size = o->onode.stripe_size;
2637 }
2638
2639 unsigned bl_off = 0;
2640 while (length > 0) {
2641 uint64_t offset_rem = offset % stripe_size;
2642 uint64_t end_rem = (offset + length) % stripe_size;
2643 if (offset_rem == 0 && end_rem == 0) {
2644 bufferlist bl;
2645 bl.substr_of(orig_bl, bl_off, stripe_size);
2646 dout(30) << __func__ << " full stripe " << offset << dendl;
2647 _do_write_stripe(txc, o, offset, bl);
2648 offset += stripe_size;
2649 length -= stripe_size;
2650 bl_off += stripe_size;
2651 continue;
2652 }
2653 uint64_t stripe_off = offset - offset_rem;
2654 bufferlist prev;
2655 _do_read_stripe(o, stripe_off, &prev);
2656 dout(20) << __func__ << " read previous stripe " << stripe_off
2657 << ", got " << prev.length() << dendl;
2658 bufferlist bl;
2659 if (offset_rem) {
2660 unsigned p = MIN(prev.length(), offset_rem);
2661 if (p) {
2662 dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2663 bl.substr_of(prev, 0, p);
2664 }
2665 if (p < offset_rem) {
2666 dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2667 bl.append_zero(offset_rem - p);
2668 }
2669 }
2670 unsigned use = stripe_size - offset_rem;
2671 if (use > length)
2672 use -= stripe_size - end_rem;
2673 dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2674 bufferlist t;
2675 t.substr_of(orig_bl, bl_off, use);
2676 bl.claim_append(t);
2677 bl_off += use;
2678 if (end_rem) {
2679 if (end_rem < prev.length()) {
2680 unsigned l = prev.length() - end_rem;
2681 dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2682 bufferlist t;
2683 t.substr_of(prev, end_rem, l);
2684 bl.claim_append(t);
2685 }
2686 }
2687 dout(30) << " writing:\n";
2688 bl.hexdump(*_dout);
2689 *_dout << dendl;
2690 _do_write_stripe(txc, o, stripe_off, bl);
2691 offset += use;
2692 length -= use;
2693 }
2694
2695 if (offset > o->onode.size) {
2696 dout(20) << __func__ << " extending size to " << offset + length
2697 << dendl;
2698 o->onode.size = offset;
2699 }
2700
2701 return r;
2702 }
2703
2704 int KStore::_write(TransContext *txc,
2705 CollectionRef& c,
2706 OnodeRef& o,
2707 uint64_t offset, size_t length,
2708 bufferlist& bl,
2709 uint32_t fadvise_flags)
2710 {
2711 dout(15) << __func__ << " " << c->cid << " " << o->oid
2712 << " " << offset << "~" << length
2713 << dendl;
2714 _assign_nid(txc, o);
2715 int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2716 txc->write_onode(o);
2717
2718 dout(10) << __func__ << " " << c->cid << " " << o->oid
2719 << " " << offset << "~" << length
2720 << " = " << r << dendl;
2721 return r;
2722 }
2723
2724 int KStore::_zero(TransContext *txc,
2725 CollectionRef& c,
2726 OnodeRef& o,
2727 uint64_t offset, size_t length)
2728 {
2729 dout(15) << __func__ << " " << c->cid << " " << o->oid
2730 << " " << offset << "~" << length
2731 << dendl;
2732 int r = 0;
2733 o->exists = true;
2734
2735 _dump_onode(o);
2736 _assign_nid(txc, o);
2737
2738 uint64_t stripe_size = o->onode.stripe_size;
2739 if (stripe_size) {
2740 uint64_t end = offset + length;
2741 uint64_t pos = offset;
2742 uint64_t stripe_off = pos % stripe_size;
2743 while (pos < offset + length) {
2744 if (stripe_off || end - pos < stripe_size) {
2745 bufferlist stripe;
2746 _do_read_stripe(o, pos - stripe_off, &stripe);
2747 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2748 << stripe.length() << dendl;
2749 bufferlist bl;
2750 bl.substr_of(stripe, 0, MIN(stripe.length(), stripe_off));
2751 if (end >= pos - stripe_off + stripe_size ||
2752 end >= o->onode.size) {
2753 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2754 << " to " << bl.length() << dendl;
2755 } else {
2756 auto len = end - (pos - stripe_off + bl.length());
2757 bl.append_zero(len);
2758 dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2759 if (stripe.length() > bl.length()) {
2760 unsigned l = stripe.length() - bl.length();
2761 bufferlist t;
2762 t.substr_of(stripe, stripe.length() - l, l);
2763 dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2764 bl.claim_append(t);
2765 }
2766 }
2767 _do_write_stripe(txc, o, pos - stripe_off, bl);
2768 pos += stripe_size - stripe_off;
2769 stripe_off = 0;
2770 } else {
2771 dout(20) << __func__ << " rm stripe " << pos << dendl;
2772 _do_remove_stripe(txc, o, pos - stripe_off);
2773 pos += stripe_size;
2774 }
2775 }
2776 }
2777 if (offset + length > o->onode.size) {
2778 o->onode.size = offset + length;
2779 dout(20) << __func__ << " extending size to " << offset + length
2780 << dendl;
2781 }
2782 txc->write_onode(o);
2783
2784 dout(10) << __func__ << " " << c->cid << " " << o->oid
2785 << " " << offset << "~" << length
2786 << " = " << r << dendl;
2787 return r;
2788 }
2789
2790 int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2791 {
2792 uint64_t stripe_size = o->onode.stripe_size;
2793
2794 o->flush();
2795
2796 // trim down stripes
2797 if (stripe_size) {
2798 uint64_t pos = offset;
2799 uint64_t stripe_off = pos % stripe_size;
2800 while (pos < o->onode.size) {
2801 if (stripe_off) {
2802 bufferlist stripe;
2803 _do_read_stripe(o, pos - stripe_off, &stripe);
2804 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2805 << stripe.length() << dendl;
2806 bufferlist t;
2807 t.substr_of(stripe, 0, MIN(stripe_off, stripe.length()));
2808 _do_write_stripe(txc, o, pos - stripe_off, t);
2809 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2810 << " to " << t.length() << dendl;
2811 pos += stripe_size - stripe_off;
2812 stripe_off = 0;
2813 } else {
2814 dout(20) << __func__ << " rm stripe " << pos << dendl;
2815 _do_remove_stripe(txc, o, pos - stripe_off);
2816 pos += stripe_size;
2817 }
2818 }
2819
2820 // trim down cached tail
2821 if (o->tail_bl.length()) {
2822 if (offset / stripe_size != o->onode.size / stripe_size) {
2823 dout(20) << __func__ << " clear cached tail" << dendl;
2824 o->clear_tail();
2825 }
2826 }
2827 }
2828
2829 o->onode.size = offset;
2830 dout(10) << __func__ << " truncate size to " << offset << dendl;
2831
2832 txc->write_onode(o);
2833 return 0;
2834 }
2835
2836 int KStore::_truncate(TransContext *txc,
2837 CollectionRef& c,
2838 OnodeRef& o,
2839 uint64_t offset)
2840 {
2841 dout(15) << __func__ << " " << c->cid << " " << o->oid
2842 << " " << offset
2843 << dendl;
2844 int r = _do_truncate(txc, o, offset);
2845 dout(10) << __func__ << " " << c->cid << " " << o->oid
2846 << " " << offset
2847 << " = " << r << dendl;
2848 return r;
2849 }
2850
2851 int KStore::_do_remove(TransContext *txc,
2852 OnodeRef o)
2853 {
2854 string key;
2855
2856 _do_truncate(txc, o, 0);
2857
2858 o->onode.size = 0;
2859 if (o->onode.omap_head) {
2860 _do_omap_clear(txc, o->onode.omap_head);
2861 }
2862 o->exists = false;
2863 o->onode = kstore_onode_t();
2864 txc->onodes.erase(o);
2865 get_object_key(cct, o->oid, &key);
2866 txc->t->rmkey(PREFIX_OBJ, key);
2867 return 0;
2868 }
2869
2870 int KStore::_remove(TransContext *txc,
2871 CollectionRef& c,
2872 OnodeRef &o)
2873 {
2874 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2875 int r = _do_remove(txc, o);
2876 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2877 return r;
2878 }
2879
2880 int KStore::_setattr(TransContext *txc,
2881 CollectionRef& c,
2882 OnodeRef& o,
2883 const string& name,
2884 bufferptr& val)
2885 {
2886 dout(15) << __func__ << " " << c->cid << " " << o->oid
2887 << " " << name << " (" << val.length() << " bytes)"
2888 << dendl;
2889 int r = 0;
2890 o->onode.attrs[name] = val;
2891 txc->write_onode(o);
2892 dout(10) << __func__ << " " << c->cid << " " << o->oid
2893 << " " << name << " (" << val.length() << " bytes)"
2894 << " = " << r << dendl;
2895 return r;
2896 }
2897
2898 int KStore::_setattrs(TransContext *txc,
2899 CollectionRef& c,
2900 OnodeRef& o,
2901 const map<string,bufferptr>& aset)
2902 {
2903 dout(15) << __func__ << " " << c->cid << " " << o->oid
2904 << " " << aset.size() << " keys"
2905 << dendl;
2906 int r = 0;
2907 for (map<string,bufferptr>::const_iterator p = aset.begin();
2908 p != aset.end(); ++p) {
2909 if (p->second.is_partial())
2910 o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2911 else
2912 o->onode.attrs[p->first] = p->second;
2913 }
2914 txc->write_onode(o);
2915 dout(10) << __func__ << " " << c->cid << " " << o->oid
2916 << " " << aset.size() << " keys"
2917 << " = " << r << dendl;
2918 return r;
2919 }
2920
2921
2922 int KStore::_rmattr(TransContext *txc,
2923 CollectionRef& c,
2924 OnodeRef& o,
2925 const string& name)
2926 {
2927 dout(15) << __func__ << " " << c->cid << " " << o->oid
2928 << " " << name << dendl;
2929 int r = 0;
2930 o->onode.attrs.erase(name);
2931 txc->write_onode(o);
2932 dout(10) << __func__ << " " << c->cid << " " << o->oid
2933 << " " << name << " = " << r << dendl;
2934 return r;
2935 }
2936
2937 int KStore::_rmattrs(TransContext *txc,
2938 CollectionRef& c,
2939 OnodeRef& o)
2940 {
2941 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2942 int r = 0;
2943 o->onode.attrs.clear();
2944 txc->write_onode(o);
2945 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2946 return r;
2947 }
2948
2949 void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2950 {
2951 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2952 string prefix, tail;
2953 get_omap_header(id, &prefix);
2954 get_omap_tail(id, &tail);
2955 it->lower_bound(prefix);
2956 while (it->valid()) {
2957 if (it->key() >= tail) {
2958 dout(30) << __func__ << " stop at " << tail << dendl;
2959 break;
2960 }
2961 txc->t->rmkey(PREFIX_OMAP, it->key());
2962 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
2963 it->next();
2964 }
2965 }
2966
2967 int KStore::_omap_clear(TransContext *txc,
2968 CollectionRef& c,
2969 OnodeRef& o)
2970 {
2971 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2972 int r = 0;
2973 if (o->onode.omap_head != 0) {
2974 _do_omap_clear(txc, o->onode.omap_head);
2975 }
2976 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2977 return r;
2978 }
2979
2980 int KStore::_omap_setkeys(TransContext *txc,
2981 CollectionRef& c,
2982 OnodeRef& o,
2983 bufferlist &bl)
2984 {
2985 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2986 int r;
2987 bufferlist::iterator p = bl.begin();
2988 __u32 num;
2989 if (!o->onode.omap_head) {
2990 o->onode.omap_head = o->onode.nid;
2991 txc->write_onode(o);
2992 }
2993 ::decode(num, p);
2994 while (num--) {
2995 string key;
2996 bufferlist value;
2997 ::decode(key, p);
2998 ::decode(value, p);
2999 string final_key;
3000 get_omap_key(o->onode.omap_head, key, &final_key);
3001 dout(30) << __func__ << " " << pretty_binary_string(final_key)
3002 << " <- " << key << dendl;
3003 txc->t->set(PREFIX_OMAP, final_key, value);
3004 }
3005 r = 0;
3006 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3007 return r;
3008 }
3009
3010 int KStore::_omap_setheader(TransContext *txc,
3011 CollectionRef& c,
3012 OnodeRef &o,
3013 bufferlist& bl)
3014 {
3015 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3016 int r;
3017 string key;
3018 if (!o->onode.omap_head) {
3019 o->onode.omap_head = o->onode.nid;
3020 txc->write_onode(o);
3021 }
3022 get_omap_header(o->onode.omap_head, &key);
3023 txc->t->set(PREFIX_OMAP, key, bl);
3024 r = 0;
3025 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3026 return r;
3027 }
3028
3029 int KStore::_omap_rmkeys(TransContext *txc,
3030 CollectionRef& c,
3031 OnodeRef& o,
3032 bufferlist& bl)
3033 {
3034 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3035 int r = 0;
3036 bufferlist::iterator p = bl.begin();
3037 __u32 num;
3038
3039 if (!o->onode.omap_head) {
3040 r = 0;
3041 goto out;
3042 }
3043 ::decode(num, p);
3044 while (num--) {
3045 string key;
3046 ::decode(key, p);
3047 string final_key;
3048 get_omap_key(o->onode.omap_head, key, &final_key);
3049 dout(30) << __func__ << " rm " << pretty_binary_string(final_key)
3050 << " <- " << key << dendl;
3051 txc->t->rmkey(PREFIX_OMAP, final_key);
3052 }
3053 r = 0;
3054
3055 out:
3056 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3057 return r;
3058 }
3059
3060 int KStore::_omap_rmkey_range(TransContext *txc,
3061 CollectionRef& c,
3062 OnodeRef& o,
3063 const string& first, const string& last)
3064 {
3065 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3066 KeyValueDB::Iterator it;
3067 string key_first, key_last;
3068 int r = 0;
3069
3070 if (!o->onode.omap_head) {
3071 goto out;
3072 }
3073 it = db->get_iterator(PREFIX_OMAP);
3074 get_omap_key(o->onode.omap_head, first, &key_first);
3075 get_omap_key(o->onode.omap_head, last, &key_last);
3076 it->lower_bound(key_first);
3077 while (it->valid()) {
3078 if (it->key() >= key_last) {
3079 dout(30) << __func__ << " stop at " << pretty_binary_string(key_last)
3080 << dendl;
3081 break;
3082 }
3083 txc->t->rmkey(PREFIX_OMAP, it->key());
3084 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
3085 it->next();
3086 }
3087 r = 0;
3088
3089 out:
3090 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3091 return r;
3092 }
3093
3094 int KStore::_setallochint(TransContext *txc,
3095 CollectionRef& c,
3096 OnodeRef& o,
3097 uint64_t expected_object_size,
3098 uint64_t expected_write_size,
3099 uint32_t flags)
3100 {
3101 dout(15) << __func__ << " " << c->cid << " " << o->oid
3102 << " object_size " << expected_object_size
3103 << " write_size " << expected_write_size
3104 << " flags " << flags
3105 << dendl;
3106 int r = 0;
3107 o->onode.expected_object_size = expected_object_size;
3108 o->onode.expected_write_size = expected_write_size;
3109 o->onode.alloc_hint_flags = flags;
3110
3111 txc->write_onode(o);
3112 dout(10) << __func__ << " " << c->cid << " " << o->oid
3113 << " object_size " << expected_object_size
3114 << " write_size " << expected_write_size
3115 << " = " << r << dendl;
3116 return r;
3117 }
3118
3119 int KStore::_clone(TransContext *txc,
3120 CollectionRef& c,
3121 OnodeRef& oldo,
3122 OnodeRef& newo)
3123 {
3124 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3125 << newo->oid << dendl;
3126 int r = 0;
3127 if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3128 derr << __func__ << " mismatched hash on " << oldo->oid
3129 << " and " << newo->oid << dendl;
3130 return -EINVAL;
3131 }
3132
3133 bufferlist bl;
3134 newo->exists = true;
3135 _assign_nid(txc, newo);
3136
3137 // data
3138 oldo->flush();
3139
3140 r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
3141 if (r < 0)
3142 goto out;
3143
3144 // truncate any old data
3145 r = _do_truncate(txc, newo, 0);
3146 if (r < 0)
3147 goto out;
3148
3149 r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3150 if (r < 0)
3151 goto out;
3152
3153 newo->onode.attrs = oldo->onode.attrs;
3154
3155 // clone omap
3156 if (newo->onode.omap_head) {
3157 dout(20) << __func__ << " clearing old omap data" << dendl;
3158 _do_omap_clear(txc, newo->onode.omap_head);
3159 }
3160 if (oldo->onode.omap_head) {
3161 dout(20) << __func__ << " copying omap data" << dendl;
3162 if (!newo->onode.omap_head) {
3163 newo->onode.omap_head = newo->onode.nid;
3164 }
3165 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3166 string head, tail;
3167 get_omap_header(oldo->onode.omap_head, &head);
3168 get_omap_tail(oldo->onode.omap_head, &tail);
3169 it->lower_bound(head);
3170 while (it->valid()) {
3171 string key;
3172 if (it->key() >= tail) {
3173 dout(30) << __func__ << " reached tail" << dendl;
3174 break;
3175 } else {
3176 dout(30) << __func__ << " got header/data "
3177 << pretty_binary_string(it->key()) << dendl;
3178 assert(it->key() < tail);
3179 rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3180 txc->t->set(PREFIX_OMAP, key, it->value());
3181 }
3182 it->next();
3183 }
3184 }
3185
3186 txc->write_onode(newo);
3187 r = 0;
3188
3189 out:
3190 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3191 << newo->oid << " = " << r << dendl;
3192 return r;
3193 }
3194
3195 int KStore::_clone_range(TransContext *txc,
3196 CollectionRef& c,
3197 OnodeRef& oldo,
3198 OnodeRef& newo,
3199 uint64_t srcoff, uint64_t length, uint64_t dstoff)
3200 {
3201 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3202 << newo->oid << " from " << srcoff << "~" << length
3203 << " to offset " << dstoff << dendl;
3204 int r = 0;
3205
3206 bufferlist bl;
3207 newo->exists = true;
3208 _assign_nid(txc, newo);
3209
3210 r = _do_read(oldo, srcoff, length, bl, 0);
3211 if (r < 0)
3212 goto out;
3213
3214 r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3215 if (r < 0)
3216 goto out;
3217
3218 txc->write_onode(newo);
3219
3220 r = 0;
3221
3222 out:
3223 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3224 << newo->oid << " from " << srcoff << "~" << length
3225 << " to offset " << dstoff
3226 << " = " << r << dendl;
3227 return r;
3228 }
3229
3230 int KStore::_rename(TransContext *txc,
3231 CollectionRef& c,
3232 OnodeRef& oldo,
3233 OnodeRef& newo,
3234 const ghobject_t& new_oid)
3235 {
3236 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3237 << new_oid << dendl;
3238 int r;
3239 ghobject_t old_oid = oldo->oid;
3240 bufferlist bl;
3241 string old_key, new_key;
3242
3243 if (newo && newo->exists) {
3244 // destination object already exists, remove it first
3245 r = _do_remove(txc, newo);
3246 if (r < 0)
3247 goto out;
3248 }
3249
3250 txc->t->rmkey(PREFIX_OBJ, oldo->key);
3251 txc->write_onode(oldo);
3252 c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key}
3253 r = 0;
3254
3255 out:
3256 dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3257 << new_oid << " = " << r << dendl;
3258 return r;
3259 }
3260
3261 // collections
3262
3263 int KStore::_create_collection(
3264 TransContext *txc,
3265 coll_t cid,
3266 unsigned bits,
3267 CollectionRef *c)
3268 {
3269 dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3270 int r;
3271 bufferlist bl;
3272
3273 {
3274 RWLock::WLocker l(coll_lock);
3275 if (*c) {
3276 r = -EEXIST;
3277 goto out;
3278 }
3279 c->reset(new Collection(this, cid));
3280 (*c)->cnode.bits = bits;
3281 coll_map[cid] = *c;
3282 }
3283 ::encode((*c)->cnode, bl);
3284 txc->t->set(PREFIX_COLL, stringify(cid), bl);
3285 r = 0;
3286
3287 out:
3288 dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3289 return r;
3290 }
3291
3292 int KStore::_remove_collection(TransContext *txc, coll_t cid,
3293 CollectionRef *c)
3294 {
3295 dout(15) << __func__ << " " << cid << dendl;
3296 int r;
3297
3298 {
3299 RWLock::WLocker l(coll_lock);
3300 if (!*c) {
3301 r = -ENOENT;
3302 goto out;
3303 }
3304 size_t nonexistent_count = 0;
3305 pair<ghobject_t,OnodeRef> next_onode;
3306 while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3307 if (next_onode.second->exists) {
3308 r = -ENOTEMPTY;
3309 goto out;
3310 }
3311 ++nonexistent_count;
3312 }
3313 vector<ghobject_t> ls;
3314 ghobject_t next;
3315 // Enumerate onodes in db, up to nonexistent_count + 1
3316 // then check if all of them are marked as non-existent.
3317 // Bypass the check if returned number is greater than nonexistent_count
3318 r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3319 nonexistent_count + 1, &ls, &next);
3320 if (r >= 0) {
3321 bool exists = false; //ls.size() > nonexistent_count;
3322 for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3323 dout(10) << __func__ << " oid " << *it << dendl;
3324 auto onode = (*c)->onode_map.lookup(*it);
3325 exists = !onode || onode->exists;
3326 if (exists) {
3327 dout(10) << __func__ << " " << *it
3328 << " exists in db" << dendl;
3329 }
3330 }
3331 if (!exists) {
3332 coll_map.erase(cid);
3333 txc->removed_collections.push_back(*c);
3334 c->reset();
3335 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3336 r = 0;
3337 } else {
3338 dout(10) << __func__ << " " << cid
3339 << " is non-empty" << dendl;
3340 r = -ENOTEMPTY;
3341 }
3342 }
3343 }
3344
3345 out:
3346 dout(10) << __func__ << " " << cid << " = " << r << dendl;
3347 return r;
3348 }
3349
3350 int KStore::_split_collection(TransContext *txc,
3351 CollectionRef& c,
3352 CollectionRef& d,
3353 unsigned bits, int rem)
3354 {
3355 dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3356 << " bits " << bits << dendl;
3357 int r;
3358 RWLock::WLocker l(c->lock);
3359 RWLock::WLocker l2(d->lock);
3360 c->onode_map.clear();
3361 d->onode_map.clear();
3362 c->cnode.bits = bits;
3363 assert(d->cnode.bits == bits);
3364 r = 0;
3365
3366 bufferlist bl;
3367 ::encode(c->cnode, bl);
3368 txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3369
3370 dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3371 << " bits " << bits << " = " << r << dendl;
3372 return r;
3373 }
3374
3375 // ===========================================