]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/kstore/KStore.cc
update sources to v12.1.1
[ceph.git] / ceph / src / os / kstore / KStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <unistd.h>
16#include <stdlib.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <fcntl.h>
20#include <unistd.h>
21
22#include "KStore.h"
23#include "osd/osd_types.h"
24#include "os/kv.h"
25#include "include/compat.h"
26#include "include/stringify.h"
27#include "common/errno.h"
28#include "common/safe_io.h"
29#include "common/Formatter.h"
30
31
32#define dout_context cct
33#define dout_subsys ceph_subsys_kstore
34
35/*
36
37 TODO:
38
39 * superblock, features
40 * refcounted extents (for efficient clone)
41
42 */
43
// Key-space prefixes: each kind of record lives under its own
// single-character prefix in the key/value store.
const string PREFIX_SUPER("S");  // field -> value
const string PREFIX_COLL("C");   // collection name -> (nothing)
const string PREFIX_OBJ("O");    // object name -> onode
const string PREFIX_DATA("D");   // nid + offset -> data
const string PREFIX_OMAP("M");   // u64 + keyname -> value
49
50/*
51 * object name key structure
52 *
53 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54 * encoded u64: poolid + 2^63 (so that it sorts properly)
55 * encoded u32: hash (bit reversed)
56 *
57 * 1 char: '.'
58 *
59 * escaped string: namespace
60 *
61 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
62 * we are followed just by the key. otherwise, we are followed by
63 * the key and then the object name.
64 * escaped string: key
65 * escaped string: object name (unless '=' above)
66 *
67 * encoded u64: snap
68 * encoded u64: generation
69 */
70
71/*
72 * string encoding in the key
73 *
74 * The key string needs to lexicographically sort the same way that
75 * ghobject_t does. We do this by escaping anything <= to '#' with #
76 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
77 * hex digits.
78 *
79 * We use ! as a terminator for strings; this works because it is < #
80 * and will get escaped if it is present in the string.
81 *
82 */
83
/**
 * Append the escaped form of @p in to @p out, followed by the '!'
 * terminator.
 *
 * Characters comparing <= '#' are written as '#' plus two hex digits,
 * characters >= '~' as '~' plus two hex digits; everything else is
 * copied verbatim.  The comparison is done on plain char, exactly as
 * the stored keys were produced.
 */
static void append_escaped(const string &in, string *out)
{
  char esc[8];
  for (const char c : in) {
    const char *fmt = nullptr;
    if (c <= '#')
      fmt = "#%02x";
    else if (c >= '~')
      fmt = "~%02x";

    if (!fmt) {
      out->push_back(c);
      continue;
    }
    snprintf(esc, sizeof(esc), fmt, (uint8_t)c);
    out->append(esc);
  }
  out->push_back('!');
}
100
/**
 * Decode an escaped string (as written by append_escaped) starting at
 * @p p and append the decoded bytes to @p out.
 *
 * Decoding stops at the '!' terminator or at a NUL byte; neither is
 * consumed.
 *
 * @return number of input bytes consumed (terminator excluded), or
 *         -EINVAL if a '#'/'~' escape is not followed by exactly two
 *         hex digits (this includes an escape truncated by the end of
 *         the input, which previously caused a read past the buffer).
 */
static int decode_escaped(const char *p, string *out)
{
  const char *orig_p = p;
  while (*p && *p != '!') {
    if (*p != '#' && *p != '~') {
      out->push_back(*p++);
      continue;
    }
    // p[0] is the escape marker; p[1] and p[2] must both be hex digits.
    // p[1] is checked first, so a NUL there stops us before touching p[2].
    unsigned hex = 0;
    for (int k = 1; k <= 2; ++k) {
      const char c = p[k];
      unsigned nibble;
      if (c >= '0' && c <= '9')
	nibble = c - '0';
      else if (c >= 'a' && c <= 'f')
	nibble = c - 'a' + 10;
      else if (c >= 'A' && c <= 'F')
	nibble = c - 'A' + 10;
      else
	return -EINVAL;
      hex = (hex << 4) | nibble;
    }
    out->push_back((char)hex);
    p += 3;
  }
  return p - orig_p;
}
118
119// some things we encode in binary (as le32 or le64); print the
120// resulting key strings nicely
/**
 * Render a key containing binary (le32/le64-style) fields in a
 * human-readable form.
 *
 * Runs of printable ASCII are emitted inside single quotes; anything
 * else is emitted as hex, introduced by "0x".  While in hex mode, and
 * while at least four bytes remain, a group of four bytes is printed
 * as one 8-digit word if any of the four is unprintable.
 */
static string pretty_binary_string(const string& in)
{
  // a byte is "unprintable" when it lies outside 32..126
  auto unprintable = [&in](size_t idx) {
    return in[idx] < 32 || (unsigned char)in[idx] > 126;
  };

  char word[10];
  string out;
  out.reserve(in.length() * 3);
  enum { NONE, HEX, STRING } mode = NONE;
  const size_t len = in.length();
  size_t run_start = 0;
  size_t pos = 0;

  while (pos < len) {
    const size_t remaining = len - pos;

    bool as_hex = unprintable(pos);
    if (!as_hex && mode == HEX && remaining >= 4) {
      // stay in hex if the upcoming 4-byte group is not fully printable
      as_hex = unprintable(pos + 1) ||
	       unprintable(pos + 2) ||
	       unprintable(pos + 3);
    }

    if (!as_hex) {
      if (mode != STRING) {
	out.push_back('\'');
	mode = STRING;
	run_start = pos;
      }
      ++pos;
      continue;
    }

    if (mode == STRING) {
      // close the quoted run that ends just before this byte
      out.append(in, run_start, pos - run_start);
      out.push_back('\'');
    }
    if (mode != HEX) {
      out.append("0x");
      mode = HEX;
    }
    if (remaining >= 4) {
      // print a whole u32 at once
      snprintf(word, sizeof(word), "%08x",
	       (uint32_t)(((unsigned char)in[pos] << 24) |
			  ((unsigned char)in[pos+1] << 16) |
			  ((unsigned char)in[pos+2] << 8) |
			  ((unsigned char)in[pos+3] << 0)));
      pos += 4;
    } else {
      snprintf(word, sizeof(word), "%02x", (int)(unsigned char)in[pos]);
      ++pos;
    }
    out.append(word);
  }

  if (mode == STRING) {
    out.append(in, run_start, pos - run_start);
    out.push_back('\'');
  }
  return out;
}
169
170static void _key_encode_shard(shard_id_t shard, string *key)
171{
172 // make field ordering match with ghobject_t compare operations
173 if (shard == shard_id_t::NO_SHARD) {
174 // otherwise ff will sort *after* 0, not before.
175 key->append("--");
176 } else {
177 char buf[32];
178 snprintf(buf, sizeof(buf), "%02x", (int)shard);
179 key->append(buf);
180 }
181}
182static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
183{
184 if (key[0] == '-') {
185 *pshard = shard_id_t::NO_SHARD;
186 } else {
187 unsigned shard;
188 int r = sscanf(key, "%x", &shard);
189 if (r < 1)
190 return NULL;
191 *pshard = shard_id_t(shard);
192 }
193 return key + 2;
194}
195
196static void get_coll_key_range(const coll_t& cid, int bits,
197 string *temp_start, string *temp_end,
198 string *start, string *end)
199{
200 temp_start->clear();
201 temp_end->clear();
202 start->clear();
203 end->clear();
204
205 spg_t pgid;
206 if (cid.is_pg(&pgid)) {
207 _key_encode_shard(pgid.shard, start);
208 *end = *start;
209 *temp_start = *start;
210 *temp_end = *start;
211
212 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
213 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
214 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
215 _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
216 start->append(".");
217 temp_start->append(".");
218
219 _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
220 _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
221
222 uint64_t end_hash =
223 hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
224 if (end_hash <= 0xffffffffull) {
225 _key_encode_u32(end_hash, end);
226 _key_encode_u32(end_hash, temp_end);
227 end->append(".");
228 temp_end->append(".");
229 } else {
230 _key_encode_u32(0xffffffff, end);
231 _key_encode_u32(0xffffffff, temp_end);
232 end->append(":");
233 temp_end->append(":");
234 }
235 } else {
236 _key_encode_shard(shard_id_t::NO_SHARD, start);
237 _key_encode_u64(-1ull + 0x8000000000000000ull, start);
238 *end = *start;
239 _key_encode_u32(0, start);
240 start->append(".");
241 _key_encode_u32(0xffffffff, end);
242 end->append(":");
243
244 // no separate temp section
245 *temp_start = *end;
246 *temp_end = *end;
247 }
248}
249
250static int get_key_object(const string& key, ghobject_t *oid);
251
252static void get_object_key(CephContext* cct, const ghobject_t& oid,
253 string *key)
254{
255 key->clear();
256
257 _key_encode_shard(oid.shard_id, key);
258 _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
259 _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
260 key->append(".");
261
262 append_escaped(oid.hobj.nspace, key);
263
264 if (oid.hobj.get_key().length()) {
265 // is a key... could be < = or >.
266 // (ASCII chars < = and > sort in that order, yay)
267 if (oid.hobj.get_key() < oid.hobj.oid.name) {
268 key->append("<");
269 append_escaped(oid.hobj.get_key(), key);
270 append_escaped(oid.hobj.oid.name, key);
271 } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
272 key->append(">");
273 append_escaped(oid.hobj.get_key(), key);
274 append_escaped(oid.hobj.oid.name, key);
275 } else {
276 // same as no key
277 key->append("=");
278 append_escaped(oid.hobj.oid.name, key);
279 }
280 } else {
281 // no key
282 key->append("=");
283 append_escaped(oid.hobj.oid.name, key);
284 }
285
286 _key_encode_u64(oid.hobj.snap, key);
287 _key_encode_u64(oid.generation, key);
288
289 // sanity check
290 if (true) {
291 ghobject_t t;
292 int r = get_key_object(*key, &t);
293 if (r || t != oid) {
294 derr << " r " << r << dendl;
295 derr << "key " << pretty_binary_string(*key) << dendl;
296 derr << "oid " << oid << dendl;
297 derr << " t " << t << dendl;
298 assert(t == oid);
299 }
300 }
301}
302
303static int get_key_object(const string& key, ghobject_t *oid)
304{
305 int r;
306 const char *p = key.c_str();
307
308 p = _key_decode_shard(p, &oid->shard_id);
309
310 uint64_t pool;
311 p = _key_decode_u64(p, &pool);
312 oid->hobj.pool = pool - 0x8000000000000000ull;
313
314 unsigned hash;
315 p = _key_decode_u32(p, &hash);
316 oid->hobj.set_bitwise_key_u32(hash);
317 if (*p != '.')
318 return -5;
319 ++p;
320
321 r = decode_escaped(p, &oid->hobj.nspace);
322 if (r < 0)
323 return -6;
324 p += r + 1;
325
326 if (*p == '=') {
327 // no key
328 ++p;
329 r = decode_escaped(p, &oid->hobj.oid.name);
330 if (r < 0)
331 return -7;
332 p += r + 1;
333 } else if (*p == '<' || *p == '>') {
334 // key + name
335 ++p;
336 string okey;
337 r = decode_escaped(p, &okey);
338 if (r < 0)
339 return -8;
340 p += r + 1;
341 r = decode_escaped(p, &oid->hobj.oid.name);
342 if (r < 0)
343 return -9;
344 p += r + 1;
345 oid->hobj.set_key(okey);
346 } else {
347 // malformed
348 return -10;
349 }
350
351 p = _key_decode_u64(p, &oid->hobj.snap.val);
352 p = _key_decode_u64(p, &oid->generation);
353 if (*p) {
354 // if we get something other than a null terminator here,
355 // something goes wrong.
356 return -12;
357 }
358
359 return 0;
360}
361
362
363static void get_data_key(uint64_t nid, uint64_t offset, string *out)
364{
365 _key_encode_u64(nid, out);
366 _key_encode_u64(offset, out);
367}
368
369// '-' < '.' < '~'
370static void get_omap_header(uint64_t id, string *out)
371{
372 _key_encode_u64(id, out);
373 out->push_back('-');
374}
375
376// hmm, I don't think there's any need to escape the user key since we
377// have a clean prefix.
378static void get_omap_key(uint64_t id, const string& key, string *out)
379{
380 _key_encode_u64(id, out);
381 out->push_back('.');
382 out->append(key);
383}
384
385static void rewrite_omap_key(uint64_t id, string old, string *out)
386{
387 _key_encode_u64(id, out);
388 out->append(old.substr(out->length()));
389}
390
/// Extract the user-visible part of an omap key into @p user_key by
/// dropping the u64 id prefix and the one-character separator.
/// Throws std::out_of_range if @p key is shorter than that prefix.
static void decode_omap_key(const string& key, string *user_key)
{
  const size_t skip = sizeof(uint64_t) + 1;
  string tail(key, skip);
  user_key->swap(tail);
}
395
396static void get_omap_tail(uint64_t id, string *out)
397{
398 _key_encode_u64(id, out);
399 out->push_back('~');
400}
401
402
403
404// Onode
405
406#undef dout_prefix
407#define dout_prefix *_dout << "kstore.onode(" << this << ") "
408
409void KStore::Onode::flush()
410{
411 std::unique_lock<std::mutex> l(flush_lock);
412 dout(20) << __func__ << " " << flush_txns << dendl;
413 while (!flush_txns.empty())
414 flush_cond.wait(l);
415 dout(20) << __func__ << " done" << dendl;
416}
417
418// OnodeHashLRU
419
420#undef dout_prefix
421#define dout_prefix *_dout << "kstore.lru(" << this << ") "
422
423void KStore::OnodeHashLRU::_touch(OnodeRef o)
424{
425 lru_list_t::iterator p = lru.iterator_to(*o);
426 lru.erase(p);
427 lru.push_front(*o);
428}
429
430void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
431{
432 std::lock_guard<std::mutex> l(lock);
433 dout(30) << __func__ << " " << oid << " " << o << dendl;
434 assert(onode_map.count(oid) == 0);
435 onode_map[oid] = o;
436 lru.push_front(*o);
437}
438
439KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
440{
441 std::lock_guard<std::mutex> l(lock);
442 dout(30) << __func__ << dendl;
443 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
444 if (p == onode_map.end()) {
445 dout(30) << __func__ << " " << oid << " miss" << dendl;
446 return OnodeRef();
447 }
448 dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
449 _touch(p->second);
450 return p->second;
451}
452
453void KStore::OnodeHashLRU::clear()
454{
455 std::lock_guard<std::mutex> l(lock);
456 dout(10) << __func__ << dendl;
457 lru.clear();
458 onode_map.clear();
459}
460
461void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
462 const ghobject_t& new_oid)
463{
464 std::lock_guard<std::mutex> l(lock);
465 dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
466 ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
467 po = onode_map.find(old_oid);
468 pn = onode_map.find(new_oid);
469
470 assert(po != onode_map.end());
471 if (pn != onode_map.end()) {
472 lru_list_t::iterator p = lru.iterator_to(*pn->second);
473 lru.erase(p);
474 onode_map.erase(pn);
475 }
476 OnodeRef o = po->second;
477
478 // install a non-existent onode it its place
479 po->second.reset(new Onode(cct, old_oid, o->key));
480 lru.push_back(*po->second);
481
482 // fix oid, key
483 onode_map.insert(make_pair(new_oid, o));
484 _touch(o);
485 o->oid = new_oid;
486 get_object_key(cct, new_oid, &o->key);
487}
488
489bool KStore::OnodeHashLRU::get_next(
490 const ghobject_t& after,
491 pair<ghobject_t,OnodeRef> *next)
492{
493 std::lock_guard<std::mutex> l(lock);
494 dout(20) << __func__ << " after " << after << dendl;
495
496 if (after == ghobject_t()) {
497 if (lru.empty()) {
498 return false;
499 }
500 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
501 assert(p != onode_map.end());
502 next->first = p->first;
503 next->second = p->second;
504 return true;
505 }
506
507 ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
508 assert(p != onode_map.end()); // for now
509 lru_list_t::iterator pi = lru.iterator_to(*p->second);
510 ++pi;
511 if (pi == lru.end()) {
512 return false;
513 }
514 next->first = pi->oid;
515 next->second = onode_map[pi->oid];
516 return true;
517}
518
519int KStore::OnodeHashLRU::trim(int max)
520{
521 std::lock_guard<std::mutex> l(lock);
522 dout(20) << __func__ << " max " << max
523 << " size " << onode_map.size() << dendl;
524 int trimmed = 0;
525 int num = onode_map.size() - max;
526 if (onode_map.size() == 0 || num <= 0)
527 return 0; // don't even try
528
529 lru_list_t::iterator p = lru.end();
530 if (num)
531 --p;
532 while (num > 0) {
533 Onode *o = &*p;
534 int refs = o->nref.load();
535 if (refs > 1) {
536 dout(20) << __func__ << " " << o->oid << " has " << refs
537 << " refs; stopping with " << num << " left to trim" << dendl;
538 break;
539 }
540 dout(30) << __func__ << " trim " << o->oid << dendl;
541 if (p != lru.begin()) {
542 lru.erase(p--);
543 } else {
544 lru.erase(p);
545 assert(num == 1);
546 }
547 o->get(); // paranoia
548 onode_map.erase(o->oid);
549 o->put();
550 --num;
551 ++trimmed;
552 }
553 return trimmed;
554}
555
556// =======================================================
557
558// Collection
559
560#undef dout_prefix
561#define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
562
563KStore::Collection::Collection(KStore *ns, coll_t c)
564 : store(ns),
565 cid(c),
566 lock("KStore::Collection::lock", true, false),
567 onode_map(store->cct)
568{
569}
570
571KStore::OnodeRef KStore::Collection::get_onode(
572 const ghobject_t& oid,
573 bool create)
574{
575 assert(create ? lock.is_wlocked() : lock.is_locked());
576
577 spg_t pgid;
578 if (cid.is_pg(&pgid)) {
579 if (!oid.match(cnode.bits, pgid.ps())) {
580 lderr(store->cct) << __func__ << " oid " << oid << " not part of "
581 << pgid << " bits " << cnode.bits << dendl;
582 ceph_abort();
583 }
584 }
585
586 OnodeRef o = onode_map.lookup(oid);
587 if (o)
588 return o;
589
590 string key;
591 get_object_key(store->cct, oid, &key);
592
593 ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
594 << pretty_binary_string(key) << dendl;
595
596 bufferlist v;
597 int r = store->db->get(PREFIX_OBJ, key, &v);
598 ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
599 Onode *on;
600 if (v.length() == 0) {
601 assert(r == -ENOENT);
602 if (!create)
603 return OnodeRef();
604
605 // new
606 on = new Onode(store->cct, oid, key);
607 on->dirty = true;
608 } else {
609 // loaded
610 assert(r >=0);
611 on = new Onode(store->cct, oid, key);
612 on->exists = true;
613 bufferlist::iterator p = v.begin();
614 ::decode(on->onode, p);
615 }
616 o.reset(on);
617 onode_map.add(oid, o);
618 return o;
619}
620
621
622
623// =======================================================
624
625#undef dout_prefix
626#define dout_prefix *_dout << "kstore(" << path << ") "
627
628KStore::KStore(CephContext *cct, const string& path)
629 : ObjectStore(cct, path),
630 db(NULL),
631 path_fd(-1),
632 fsid_fd(-1),
633 mounted(false),
634 coll_lock("KStore::coll_lock"),
635 nid_last(0),
636 nid_max(0),
637 throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
638 throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
639 finisher(cct),
640 kv_sync_thread(this),
641 kv_stop(false),
642 logger(nullptr)
643{
644 _init_logger();
645}
646
647KStore::~KStore()
648{
649 _shutdown_logger();
650 assert(!mounted);
651 assert(db == NULL);
652 assert(fsid_fd < 0);
653}
654
655void KStore::_init_logger()
656{
657 // XXX
658 PerfCountersBuilder b(cct, "KStore",
659 l_kstore_first, l_kstore_last);
660 b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
661 b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
662 b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
663 b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
664 b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
665 logger = b.create_perf_counters();
666 cct->get_perfcounters_collection()->add(logger);
667}
668
669void KStore::_shutdown_logger()
670{
671 // XXX
672 cct->get_perfcounters_collection()->remove(logger);
673 delete logger;
674}
675
676int KStore::_open_path()
677{
678 assert(path_fd < 0);
679 path_fd = ::open(path.c_str(), O_DIRECTORY);
680 if (path_fd < 0) {
681 int r = -errno;
682 derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
683 << dendl;
684 return r;
685 }
686 return 0;
687}
688
689void KStore::_close_path()
690{
691 VOID_TEMP_FAILURE_RETRY(::close(path_fd));
692 path_fd = -1;
693}
694
695int KStore::_open_fsid(bool create)
696{
697 assert(fsid_fd < 0);
698 int flags = O_RDWR;
699 if (create)
700 flags |= O_CREAT;
701 fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
702 if (fsid_fd < 0) {
703 int err = -errno;
704 derr << __func__ << " " << cpp_strerror(err) << dendl;
705 return err;
706 }
707 return 0;
708}
709
710int KStore::_read_fsid(uuid_d *uuid)
711{
712 char fsid_str[40];
713 memset(fsid_str, 0, sizeof(fsid_str));
714 int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
715 if (ret < 0) {
716 derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
717 return ret;
718 }
719 if (ret > 36)
720 fsid_str[36] = 0;
721 else
722 fsid_str[ret] = 0;
723 if (!uuid->parse(fsid_str)) {
724 derr << __func__ << " unparsable uuid " << fsid_str << dendl;
725 return -EINVAL;
726 }
727 return 0;
728}
729
730int KStore::_write_fsid()
731{
732 int r = ::ftruncate(fsid_fd, 0);
733 if (r < 0) {
734 r = -errno;
735 derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
736 return r;
737 }
738 string str = stringify(fsid) + "\n";
739 r = safe_write(fsid_fd, str.c_str(), str.length());
740 if (r < 0) {
741 derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
742 return r;
743 }
744 r = ::fsync(fsid_fd);
745 if (r < 0) {
746 r = -errno;
747 derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
748 return r;
749 }
750 return 0;
751}
752
753void KStore::_close_fsid()
754{
755 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
756 fsid_fd = -1;
757}
758
759int KStore::_lock_fsid()
760{
761 struct flock l;
762 memset(&l, 0, sizeof(l));
763 l.l_type = F_WRLCK;
764 l.l_whence = SEEK_SET;
765 l.l_start = 0;
766 l.l_len = 0;
767 int r = ::fcntl(fsid_fd, F_SETLK, &l);
768 if (r < 0) {
769 int err = errno;
770 derr << __func__ << " failed to lock " << path << "/fsid"
771 << " (is another ceph-osd still running?)"
772 << cpp_strerror(err) << dendl;
773 return -err;
774 }
775 return 0;
776}
777
778bool KStore::test_mount_in_use()
779{
780 // most error conditions mean the mount is not in use (e.g., because
781 // it doesn't exist). only if we fail to lock do we conclude it is
782 // in use.
783 bool ret = false;
784 int r = _open_path();
785 if (r < 0)
786 return false;
787 r = _open_fsid(false);
788 if (r < 0)
789 goto out_path;
790 r = _lock_fsid();
791 if (r < 0)
792 ret = true; // if we can't lock, it is in use
793 _close_fsid();
794 out_path:
795 _close_path();
796 return ret;
797}
798
799int KStore::_open_db(bool create)
800{
801 int r;
802 assert(!db);
803 char fn[PATH_MAX];
804 snprintf(fn, sizeof(fn), "%s/db", path.c_str());
805
806 string kv_backend;
807 if (create) {
808 kv_backend = cct->_conf->kstore_backend;
809 } else {
810 r = read_meta("kv_backend", &kv_backend);
811 if (r < 0) {
812 derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
813 return -EIO;
814 }
815 }
816 dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
817
818 if (create) {
819 int r = ::mkdir(fn, 0755);
820 if (r < 0)
821 r = -errno;
822 if (r < 0 && r != -EEXIST) {
823 derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
824 << dendl;
825 return r;
826 }
827
828 // wal_dir, too!
829 char walfn[PATH_MAX];
830 snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
831 r = ::mkdir(walfn, 0755);
832 if (r < 0)
833 r = -errno;
834 if (r < 0 && r != -EEXIST) {
835 derr << __func__ << " failed to create " << walfn
836 << ": " << cpp_strerror(r)
837 << dendl;
838 return r;
839 }
840 }
841
842 db = KeyValueDB::create(cct, kv_backend, fn);
843 if (!db) {
844 derr << __func__ << " error creating db" << dendl;
845 return -EIO;
846 }
847 string options;
848 if (kv_backend == "rocksdb")
849 options = cct->_conf->kstore_rocksdb_options;
850 db->init(options);
851 stringstream err;
852 if (create)
853 r = db->create_and_open(err);
854 else
855 r = db->open(err);
856 if (r) {
857 derr << __func__ << " erroring opening db: " << err.str() << dendl;
858 delete db;
859 db = NULL;
860 return -EIO;
861 }
862 dout(1) << __func__ << " opened " << kv_backend
863 << " path " << fn << " options " << options << dendl;
864 return 0;
865}
866
867void KStore::_close_db()
868{
869 assert(db);
870 delete db;
871 db = NULL;
872}
873
874int KStore::_open_collections(int *errors)
875{
876 assert(coll_map.empty());
877 KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
878 for (it->upper_bound(string());
879 it->valid();
880 it->next()) {
881 coll_t cid;
882 if (cid.parse(it->key())) {
883 CollectionRef c(new Collection(this, cid));
884 bufferlist bl = it->value();
885 bufferlist::iterator p = bl.begin();
886 try {
887 ::decode(c->cnode, p);
888 } catch (buffer::error& e) {
889 derr << __func__ << " failed to decode cnode, key:"
890 << pretty_binary_string(it->key()) << dendl;
891 return -EIO;
892 }
893 dout(20) << __func__ << " opened " << cid << dendl;
894 coll_map[cid] = c;
895 } else {
896 derr << __func__ << " unrecognized collection " << it->key() << dendl;
897 if (errors)
898 (*errors)++;
899 }
900 }
901 return 0;
902}
903
904int KStore::mkfs()
905{
906 dout(1) << __func__ << " path " << path << dendl;
907 int r;
908 uuid_d old_fsid;
909
910 r = _open_path();
911 if (r < 0)
912 return r;
913
914 r = _open_fsid(true);
915 if (r < 0)
916 goto out_path_fd;
917
918 r = _lock_fsid();
919 if (r < 0)
920 goto out_close_fsid;
921
922 r = _read_fsid(&old_fsid);
923 if (r < 0 || old_fsid.is_zero()) {
924 if (fsid.is_zero()) {
925 fsid.generate_random();
926 dout(1) << __func__ << " generated fsid " << fsid << dendl;
927 } else {
928 dout(1) << __func__ << " using provided fsid " << fsid << dendl;
929 }
930 // we'll write it last.
931 } else {
932 if (!fsid.is_zero() && fsid != old_fsid) {
933 derr << __func__ << " on-disk fsid " << old_fsid
934 << " != provided " << fsid << dendl;
935 r = -EINVAL;
936 goto out_close_fsid;
937 }
938 fsid = old_fsid;
939 dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
940 goto out_close_fsid;
941 }
942
943 r = _open_db(true);
944 if (r < 0)
945 goto out_close_fsid;
946
947 r = write_meta("kv_backend", cct->_conf->kstore_backend);
948 if (r < 0)
949 goto out_close_db;
950
951 r = write_meta("type", "kstore");
952 if (r < 0)
953 goto out_close_db;
954
955 // indicate mkfs completion/success by writing the fsid file
956 r = _write_fsid();
957 if (r == 0)
958 dout(10) << __func__ << " success" << dendl;
959 else
960 derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
961
962 out_close_db:
963 _close_db();
964 out_close_fsid:
965 _close_fsid();
966 out_path_fd:
967 _close_path();
968 return r;
969}
970
971int KStore::mount()
972{
973 dout(1) << __func__ << " path " << path << dendl;
974
975 if (cct->_conf->kstore_fsck_on_mount) {
976 int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
977 if (rc < 0)
978 return rc;
979 }
980
981 int r = _open_path();
982 if (r < 0)
983 return r;
984 r = _open_fsid(false);
985 if (r < 0)
986 goto out_path;
987
988 r = _read_fsid(&fsid);
989 if (r < 0)
990 goto out_fsid;
991
992 r = _lock_fsid();
993 if (r < 0)
994 goto out_fsid;
995
996 r = _open_db(false);
997 if (r < 0)
998 goto out_fsid;
999
1000 r = _open_super_meta();
1001 if (r < 0)
1002 goto out_db;
1003
1004 r = _open_collections();
1005 if (r < 0)
1006 goto out_db;
1007
1008 finisher.start();
1009 kv_sync_thread.create("kstore_kv_sync");
1010
1011 mounted = true;
1012 return 0;
1013
1014 out_db:
1015 _close_db();
1016 out_fsid:
1017 _close_fsid();
1018 out_path:
1019 _close_path();
1020 return r;
1021}
1022
1023int KStore::umount()
1024{
1025 assert(mounted);
1026 dout(1) << __func__ << dendl;
1027
1028 _sync();
1029 _reap_collections();
1030 coll_map.clear();
1031
1032 dout(20) << __func__ << " stopping kv thread" << dendl;
1033 _kv_stop();
1034 dout(20) << __func__ << " draining finisher" << dendl;
1035 finisher.wait_for_empty();
1036 dout(20) << __func__ << " stopping finisher" << dendl;
1037 finisher.stop();
1038 dout(20) << __func__ << " closing" << dendl;
1039
1040 mounted = false;
1041 _close_db();
1042 _close_fsid();
1043 _close_path();
1044 return 0;
1045}
1046
1047int KStore::fsck(bool deep)
1048{
1049 dout(1) << __func__ << dendl;
1050 int errors = 0;
1051 dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1052 return errors;
1053}
1054
1055void KStore::_sync()
1056{
1057 dout(10) << __func__ << dendl;
1058
1059 std::unique_lock<std::mutex> l(kv_lock);
1060 while (!kv_committing.empty() ||
1061 !kv_queue.empty()) {
1062 dout(20) << " waiting for kv to commit" << dendl;
1063 kv_sync_cond.wait(l);
1064 }
1065
1066 dout(10) << __func__ << " done" << dendl;
1067}
1068
1069int KStore::statfs(struct store_statfs_t* buf)
1070{
1071 return db->get_statfs(buf);
1072}
1073
1074// ---------------
1075// cache
1076
1077KStore::CollectionRef KStore::_get_collection(coll_t cid)
1078{
1079 RWLock::RLocker l(coll_lock);
1080 ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1081 if (cp == coll_map.end())
1082 return CollectionRef();
1083 return cp->second;
1084}
1085
1086void KStore::_queue_reap_collection(CollectionRef& c)
1087{
1088 dout(10) << __func__ << " " << c->cid << dendl;
1089 std::lock_guard<std::mutex> l(reap_lock);
1090 removed_collections.push_back(c);
1091}
1092
1093void KStore::_reap_collections()
1094{
1095 list<CollectionRef> removed_colls;
1096 std::lock_guard<std::mutex> l(reap_lock);
1097 removed_colls.swap(removed_collections);
1098
1099 for (list<CollectionRef>::iterator p = removed_colls.begin();
1100 p != removed_colls.end();
1101 ++p) {
1102 CollectionRef c = *p;
1103 dout(10) << __func__ << " " << c->cid << dendl;
1104 {
1105 pair<ghobject_t,OnodeRef> next;
1106 while (c->onode_map.get_next(next.first, &next)) {
1107 assert(!next.second->exists);
1108 if (!next.second->flush_txns.empty()) {
1109 dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1110 << " flush_txns " << next.second->flush_txns << dendl;
1111 return;
1112 }
1113 }
1114 }
1115 c->onode_map.clear();
1116 dout(10) << __func__ << " " << c->cid << " done" << dendl;
1117 }
1118
1119 dout(10) << __func__ << " all reaped" << dendl;
1120}
1121
1122// ---------------
1123// read operations
1124
1125bool KStore::exists(const coll_t& cid, const ghobject_t& oid)
1126{
1127 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1128 CollectionRef c = _get_collection(cid);
1129 if (!c)
1130 return false;
1131 RWLock::RLocker l(c->lock);
1132 OnodeRef o = c->get_onode(oid, false);
1133 if (!o || !o->exists)
1134 return false;
1135 return true;
1136}
1137
1138int KStore::stat(
1139 const coll_t& cid,
1140 const ghobject_t& oid,
1141 struct stat *st,
1142 bool allow_eio)
1143{
1144 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1145 CollectionRef c = _get_collection(cid);
1146 if (!c)
1147 return -ENOENT;
1148 RWLock::RLocker l(c->lock);
1149 OnodeRef o = c->get_onode(oid, false);
1150 if (!o || !o->exists)
1151 return -ENOENT;
1152 st->st_size = o->onode.size;
1153 st->st_blksize = 4096;
1154 st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1155 st->st_nlink = 1;
1156 return 0;
1157}
1158
1159int KStore::set_collection_opts(
1160 const coll_t& cid,
1161 const pool_opts_t& opts)
1162{
1163 return -EOPNOTSUPP;
1164}
1165
1166int KStore::read(
1167 const coll_t& cid,
1168 const ghobject_t& oid,
1169 uint64_t offset,
1170 size_t length,
1171 bufferlist& bl,
224ce89b 1172 uint32_t op_flags)
7c673cae
FG
1173{
1174 dout(15) << __func__ << " " << cid << " " << oid
1175 << " " << offset << "~" << length
1176 << dendl;
1177 bl.clear();
1178 CollectionRef c = _get_collection(cid);
1179 if (!c)
1180 return -ENOENT;
1181 RWLock::RLocker l(c->lock);
1182
1183 int r;
1184
1185 OnodeRef o = c->get_onode(oid, false);
1186 if (!o || !o->exists) {
1187 r = -ENOENT;
1188 goto out;
1189 }
1190
1191 if (offset == length && offset == 0)
1192 length = o->onode.size;
1193
1194 r = _do_read(o, offset, length, bl, op_flags);
1195
1196 out:
1197 dout(10) << __func__ << " " << cid << " " << oid
1198 << " " << offset << "~" << length
1199 << " = " << r << dendl;
1200 return r;
1201}
1202
1203int KStore::_do_read(
1204 OnodeRef o,
1205 uint64_t offset,
1206 size_t length,
1207 bufferlist& bl,
1208 uint32_t op_flags)
1209{
1210 int r = 0;
1211 uint64_t stripe_size = o->onode.stripe_size;
1212 uint64_t stripe_off;
1213
1214 dout(20) << __func__ << " " << offset << "~" << length << " size "
1215 << o->onode.size << " nid " << o->onode.nid << dendl;
1216 bl.clear();
1217
1218 if (offset > o->onode.size) {
1219 goto out;
1220 }
1221 if (offset + length > o->onode.size) {
1222 length = o->onode.size - offset;
1223 }
1224 if (stripe_size == 0) {
1225 bl.append_zero(length);
1226 r = length;
1227 goto out;
1228 }
1229
1230 o->flush();
1231
1232 stripe_off = offset % stripe_size;
1233 while (length > 0) {
1234 bufferlist stripe;
1235 _do_read_stripe(o, offset - stripe_off, &stripe);
1236 dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1237 << stripe.length() << dendl;
1238 unsigned swant = MIN(stripe_size - stripe_off, length);
1239 if (stripe.length()) {
1240 if (swant == stripe.length()) {
1241 bl.claim_append(stripe);
1242 dout(30) << __func__ << " taking full stripe" << dendl;
1243 } else {
1244 unsigned l = 0;
1245 if (stripe_off < stripe.length()) {
1246 l = MIN(stripe.length() - stripe_off, swant);
1247 bufferlist t;
1248 t.substr_of(stripe, stripe_off, l);
1249 bl.claim_append(t);
1250 dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1251 }
1252 if (l < swant) {
1253 bl.append_zero(swant - l);
1254 dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1255 }
1256 }
1257 } else {
1258 dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1259 bl.append_zero(swant);
1260 }
1261 offset += swant;
1262 length -= swant;
1263 stripe_off = 0;
1264 }
1265 r = bl.length();
1266 dout(30) << " result:\n";
1267 bl.hexdump(*_dout);
1268 *_dout << dendl;
1269
1270 out:
1271 return r;
1272}
1273
1274int KStore::fiemap(
1275 const coll_t& cid,
1276 const ghobject_t& oid,
1277 uint64_t offset,
1278 size_t len,
1279 bufferlist& bl)
1280{
1281 map<uint64_t, uint64_t> m;
1282 int r = fiemap(cid, oid, offset, len, m);
1283 if (r >= 0) {
1284 ::encode(m, bl);
1285 }
1286
1287 return r;
1288}
1289
1290int KStore::fiemap(
1291 const coll_t& cid,
1292 const ghobject_t& oid,
1293 uint64_t offset,
1294 size_t len,
1295 map<uint64_t, uint64_t>& destmap)
1296{
1297 CollectionRef c = _get_collection(cid);
1298 if (!c)
1299 return -ENOENT;
1300 RWLock::RLocker l(c->lock);
1301
1302 OnodeRef o = c->get_onode(oid, false);
1303 if (!o || !o->exists) {
1304 return -ENOENT;
1305 }
1306
1307 if (offset > o->onode.size)
1308 goto out;
1309
1310 if (offset + len > o->onode.size) {
1311 len = o->onode.size - offset;
1312 }
1313
1314 dout(20) << __func__ << " " << offset << "~" << len << " size "
1315 << o->onode.size << dendl;
1316
1317 // FIXME: do something smarter here
1318 destmap[0] = o->onode.size;
1319
1320 out:
1321 dout(20) << __func__ << " " << offset << "~" << len
1322 << " size = 0 (" << destmap << ")" << dendl;
1323 return 0;
1324}
1325
1326int KStore::getattr(
1327 const coll_t& cid,
1328 const ghobject_t& oid,
1329 const char *name,
1330 bufferptr& value)
1331{
1332 dout(15) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1333 CollectionRef c = _get_collection(cid);
1334 if (!c)
1335 return -ENOENT;
1336 RWLock::RLocker l(c->lock);
1337 int r;
1338 string k(name);
1339
1340 OnodeRef o = c->get_onode(oid, false);
1341 if (!o || !o->exists) {
1342 r = -ENOENT;
1343 goto out;
1344 }
1345
1346 if (!o->onode.attrs.count(k)) {
1347 r = -ENODATA;
1348 goto out;
1349 }
1350 value = o->onode.attrs[k];
1351 r = 0;
1352 out:
1353 dout(10) << __func__ << " " << cid << " " << oid << " " << name
1354 << " = " << r << dendl;
1355 return r;
1356}
1357
1358int KStore::getattrs(
1359 const coll_t& cid,
1360 const ghobject_t& oid,
1361 map<string,bufferptr>& aset)
1362{
1363 dout(15) << __func__ << " " << cid << " " << oid << dendl;
1364 CollectionRef c = _get_collection(cid);
1365 if (!c)
1366 return -ENOENT;
1367 RWLock::RLocker l(c->lock);
1368 int r;
1369
1370 OnodeRef o = c->get_onode(oid, false);
1371 if (!o || !o->exists) {
1372 r = -ENOENT;
1373 goto out;
1374 }
1375 aset = o->onode.attrs;
1376 r = 0;
1377 out:
1378 dout(10) << __func__ << " " << cid << " " << oid
1379 << " = " << r << dendl;
1380 return r;
1381}
1382
1383int KStore::list_collections(vector<coll_t>& ls)
1384{
1385 RWLock::RLocker l(coll_lock);
1386 for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1387 p != coll_map.end();
1388 ++p)
1389 ls.push_back(p->first);
1390 return 0;
1391}
1392
1393bool KStore::collection_exists(const coll_t& c)
1394{
1395 RWLock::RLocker l(coll_lock);
1396 return coll_map.count(c);
1397}
1398
1399int KStore::collection_empty(const coll_t& cid, bool *empty)
1400{
1401 dout(15) << __func__ << " " << cid << dendl;
1402 vector<ghobject_t> ls;
1403 ghobject_t next;
1404 int r = collection_list(cid, ghobject_t(), ghobject_t::get_max(), 1,
1405 &ls, &next);
1406 if (r < 0) {
1407 derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1408 << dendl;
1409 return r;
1410 }
1411 *empty = ls.empty();
1412 dout(10) << __func__ << " " << cid << " = " << (int)(*empty) << dendl;
1413 return 0;
1414}
1415
1416int KStore::collection_bits(const coll_t& cid)
1417{
1418 dout(15) << __func__ << " " << cid << dendl;
1419 CollectionHandle ch = _get_collection(cid);
1420 if (!ch)
1421 return -ENOENT;
1422 Collection *c = static_cast<Collection*>(ch.get());
1423 RWLock::RLocker l(c->lock);
1424 dout(10) << __func__ << " " << cid << " = " << c->cnode.bits << dendl;
1425 return c->cnode.bits;
1426}
1427
1428int KStore::collection_list(
1429 const coll_t& cid, const ghobject_t& start, const ghobject_t& end, int max,
1430 vector<ghobject_t> *ls, ghobject_t *pnext)
1431{
1432 CollectionHandle c = _get_collection(cid);
1433 if (!c)
1434 return -ENOENT;
1435 return collection_list(c, start, end, max, ls, pnext);
1436}
1437
1438int KStore::collection_list(
1439 CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1440 vector<ghobject_t> *ls, ghobject_t *pnext)
1441
1442{
1443 Collection *c = static_cast<Collection*>(c_.get());
1444 dout(15) << __func__ << " " << c->cid
1445 << " start " << start << " end " << end << " max " << max << dendl;
1446 int r;
1447 {
1448 RWLock::RLocker l(c->lock);
1449 r = _collection_list(c, start, end, max, ls, pnext);
1450 }
1451
1452 dout(10) << __func__ << " " << c->cid
1453 << " start " << start << " end " << end << " max " << max
1454 << " = " << r << ", ls.size() = " << ls->size()
1455 << ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
1456 return r;
1457}
1458
1459int KStore::_collection_list(
1460 Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
1461 vector<ghobject_t> *ls, ghobject_t *pnext)
1462{
1463 int r = 0;
1464 KeyValueDB::Iterator it;
1465 string temp_start_key, temp_end_key;
1466 string start_key, end_key;
1467 bool set_next = false;
1468 string pend;
1469 bool temp;
1470
1471 ghobject_t static_next;
1472 if (!pnext)
1473 pnext = &static_next;
1474
1475 if (start == ghobject_t::get_max() ||
1476 start.hobj.is_max()) {
1477 goto out;
1478 }
1479 get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
1480 &start_key, &end_key);
1481 dout(20) << __func__
1482 << " range " << pretty_binary_string(temp_start_key)
1483 << " to " << pretty_binary_string(temp_end_key)
1484 << " and " << pretty_binary_string(start_key)
1485 << " to " << pretty_binary_string(end_key)
1486 << " start " << start << dendl;
1487 it = db->get_iterator(PREFIX_OBJ);
1488 if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
1489 it->upper_bound(temp_start_key);
1490 temp = true;
1491 } else {
1492 string k;
1493 get_object_key(cct, start, &k);
1494 if (start.hobj.is_temp()) {
1495 temp = true;
1496 assert(k >= temp_start_key && k < temp_end_key);
1497 } else {
1498 temp = false;
1499 assert(k >= start_key && k < end_key);
1500 }
1501 dout(20) << " start from " << pretty_binary_string(k)
1502 << " temp=" << (int)temp << dendl;
1503 it->lower_bound(k);
1504 }
1505 if (end.hobj.is_max()) {
1506 pend = temp ? temp_end_key : end_key;
1507 } else {
1508 get_object_key(cct, end, &end_key);
1509 if (end.hobj.is_temp()) {
1510 if (temp)
1511 pend = end_key;
1512 else
1513 goto out;
1514 } else {
1515 pend = temp ? temp_end_key : end_key;
1516 }
1517 }
1518 dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1519 while (true) {
1520 if (!it->valid() || it->key() >= pend) {
1521 if (!it->valid())
1522 dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
1523 else
1524 dout(20) << __func__ << " key " << pretty_binary_string(it->key())
1525 << " > " << end << dendl;
1526 if (temp) {
1527 if (end.hobj.is_temp()) {
1528 break;
1529 }
1530 dout(30) << __func__ << " switch to non-temp namespace" << dendl;
1531 temp = false;
1532 it->upper_bound(start_key);
1533 pend = end_key;
1534 dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1535 continue;
1536 }
1537 break;
1538 }
1539 dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
1540 ghobject_t oid;
1541 int r = get_key_object(it->key(), &oid);
1542 assert(r == 0);
1543 if (ls->size() >= (unsigned)max) {
1544 dout(20) << __func__ << " reached max " << max << dendl;
1545 *pnext = oid;
1546 set_next = true;
1547 break;
1548 }
1549 ls->push_back(oid);
1550 it->next();
1551 }
1552out:
1553 if (!set_next) {
1554 *pnext = ghobject_t::get_max();
1555 }
1556 return r;
1557}
1558
1559// omap reads
1560
1561KStore::OmapIteratorImpl::OmapIteratorImpl(
1562 CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1563 : c(c), o(o), it(it)
1564{
1565 RWLock::RLocker l(c->lock);
1566 if (o->onode.omap_head) {
1567 get_omap_key(o->onode.omap_head, string(), &head);
1568 get_omap_tail(o->onode.omap_head, &tail);
1569 it->lower_bound(head);
1570 }
1571}
1572
1573int KStore::OmapIteratorImpl::seek_to_first()
1574{
1575 RWLock::RLocker l(c->lock);
1576 if (o->onode.omap_head) {
1577 it->lower_bound(head);
1578 } else {
1579 it = KeyValueDB::Iterator();
1580 }
1581 return 0;
1582}
1583
1584int KStore::OmapIteratorImpl::upper_bound(const string& after)
1585{
1586 RWLock::RLocker l(c->lock);
1587 if (o->onode.omap_head) {
1588 string key;
1589 get_omap_key(o->onode.omap_head, after, &key);
1590 it->upper_bound(key);
1591 } else {
1592 it = KeyValueDB::Iterator();
1593 }
1594 return 0;
1595}
1596
1597int KStore::OmapIteratorImpl::lower_bound(const string& to)
1598{
1599 RWLock::RLocker l(c->lock);
1600 if (o->onode.omap_head) {
1601 string key;
1602 get_omap_key(o->onode.omap_head, to, &key);
1603 it->lower_bound(key);
1604 } else {
1605 it = KeyValueDB::Iterator();
1606 }
1607 return 0;
1608}
1609
1610bool KStore::OmapIteratorImpl::valid()
1611{
1612 RWLock::RLocker l(c->lock);
1613 if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1614 return true;
1615 } else {
1616 return false;
1617 }
1618}
1619
1620int KStore::OmapIteratorImpl::next(bool validate)
1621{
1622 RWLock::RLocker l(c->lock);
1623 if (o->onode.omap_head) {
1624 it->next();
1625 return 0;
1626 } else {
1627 return -1;
1628 }
1629}
1630
1631string KStore::OmapIteratorImpl::key()
1632{
1633 RWLock::RLocker l(c->lock);
1634 assert(it->valid());
1635 string db_key = it->raw_key().second;
1636 string user_key;
1637 decode_omap_key(db_key, &user_key);
1638 return user_key;
1639}
1640
1641bufferlist KStore::OmapIteratorImpl::value()
1642{
1643 RWLock::RLocker l(c->lock);
1644 assert(it->valid());
1645 return it->value();
1646}
1647
1648int KStore::omap_get(
1649 const coll_t& cid, ///< [in] Collection containing oid
1650 const ghobject_t &oid, ///< [in] Object containing omap
1651 bufferlist *header, ///< [out] omap header
1652 map<string, bufferlist> *out /// < [out] Key to value map
1653 )
1654{
1655 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1656 CollectionRef c = _get_collection(cid);
1657 if (!c)
1658 return -ENOENT;
1659 RWLock::RLocker l(c->lock);
1660 int r = 0;
1661 OnodeRef o = c->get_onode(oid, false);
1662 if (!o || !o->exists) {
1663 r = -ENOENT;
1664 goto out;
1665 }
1666 if (!o->onode.omap_head)
1667 goto out;
1668 o->flush();
1669 {
1670 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1671 string head, tail;
1672 get_omap_header(o->onode.omap_head, &head);
1673 get_omap_tail(o->onode.omap_head, &tail);
1674 it->lower_bound(head);
1675 while (it->valid()) {
1676 if (it->key() == head) {
1677 dout(30) << __func__ << " got header" << dendl;
1678 *header = it->value();
1679 } else if (it->key() >= tail) {
1680 dout(30) << __func__ << " reached tail" << dendl;
1681 break;
1682 } else {
1683 string user_key;
1684 decode_omap_key(it->key(), &user_key);
1685 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1686 << " -> " << user_key << dendl;
1687 assert(it->key() < tail);
1688 (*out)[user_key] = it->value();
1689 }
1690 it->next();
1691 }
1692 }
1693 out:
1694 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1695 return r;
1696}
1697
1698int KStore::omap_get_header(
1699 const coll_t& cid, ///< [in] Collection containing oid
1700 const ghobject_t &oid, ///< [in] Object containing omap
1701 bufferlist *header, ///< [out] omap header
1702 bool allow_eio ///< [in] don't assert on eio
1703 )
1704{
1705 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1706 CollectionRef c = _get_collection(cid);
1707 if (!c)
1708 return -ENOENT;
1709 RWLock::RLocker l(c->lock);
1710 int r = 0;
1711 OnodeRef o = c->get_onode(oid, false);
1712 if (!o || !o->exists) {
1713 r = -ENOENT;
1714 goto out;
1715 }
1716 if (!o->onode.omap_head)
1717 goto out;
1718 o->flush();
1719 {
1720 string head;
1721 get_omap_header(o->onode.omap_head, &head);
1722 if (db->get(PREFIX_OMAP, head, header) >= 0) {
1723 dout(30) << __func__ << " got header" << dendl;
1724 } else {
1725 dout(30) << __func__ << " no header" << dendl;
1726 }
1727 }
1728 out:
1729 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1730 return r;
1731}
1732
1733int KStore::omap_get_keys(
1734 const coll_t& cid, ///< [in] Collection containing oid
1735 const ghobject_t &oid, ///< [in] Object containing omap
1736 set<string> *keys ///< [out] Keys defined on oid
1737 )
1738{
1739 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1740 CollectionRef c = _get_collection(cid);
1741 if (!c)
1742 return -ENOENT;
1743 RWLock::RLocker l(c->lock);
1744 int r = 0;
1745 OnodeRef o = c->get_onode(oid, false);
1746 if (!o || !o->exists) {
1747 r = -ENOENT;
1748 goto out;
1749 }
1750 if (!o->onode.omap_head)
1751 goto out;
1752 o->flush();
1753 {
1754 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1755 string head, tail;
1756 get_omap_key(o->onode.omap_head, string(), &head);
1757 get_omap_tail(o->onode.omap_head, &tail);
1758 it->lower_bound(head);
1759 while (it->valid()) {
1760 if (it->key() >= tail) {
1761 dout(30) << __func__ << " reached tail" << dendl;
1762 break;
1763 }
1764 string user_key;
1765 decode_omap_key(it->key(), &user_key);
1766 dout(30) << __func__ << " got " << pretty_binary_string(it->key())
1767 << " -> " << user_key << dendl;
1768 assert(it->key() < tail);
1769 keys->insert(user_key);
1770 it->next();
1771 }
1772 }
1773 out:
1774 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1775 return r;
1776}
1777
1778int KStore::omap_get_values(
1779 const coll_t& cid, ///< [in] Collection containing oid
1780 const ghobject_t &oid, ///< [in] Object containing omap
1781 const set<string> &keys, ///< [in] Keys to get
1782 map<string, bufferlist> *out ///< [out] Returned keys and values
1783 )
1784{
1785 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1786 CollectionRef c = _get_collection(cid);
1787 if (!c)
1788 return -ENOENT;
1789 RWLock::RLocker l(c->lock);
1790 int r = 0;
1791 OnodeRef o = c->get_onode(oid, false);
1792 if (!o || !o->exists) {
1793 r = -ENOENT;
1794 goto out;
1795 }
1796 if (!o->onode.omap_head)
1797 goto out;
1798 o->flush();
1799 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1800 string key;
1801 get_omap_key(o->onode.omap_head, *p, &key);
1802 bufferlist val;
1803 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1804 dout(30) << __func__ << " got " << pretty_binary_string(key)
1805 << " -> " << *p << dendl;
1806 out->insert(make_pair(*p, val));
1807 }
1808 }
1809 out:
1810 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1811 return r;
1812}
1813
1814int KStore::omap_check_keys(
1815 const coll_t& cid, ///< [in] Collection containing oid
1816 const ghobject_t &oid, ///< [in] Object containing omap
1817 const set<string> &keys, ///< [in] Keys to check
1818 set<string> *out ///< [out] Subset of keys defined on oid
1819 )
1820{
1821 dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1822 CollectionRef c = _get_collection(cid);
1823 if (!c)
1824 return -ENOENT;
1825 RWLock::RLocker l(c->lock);
1826 int r = 0;
1827 OnodeRef o = c->get_onode(oid, false);
1828 if (!o || !o->exists) {
1829 r = -ENOENT;
1830 goto out;
1831 }
1832 if (!o->onode.omap_head)
1833 goto out;
1834 o->flush();
1835 for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1836 string key;
1837 get_omap_key(o->onode.omap_head, *p, &key);
1838 bufferlist val;
1839 if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1840 dout(30) << __func__ << " have " << pretty_binary_string(key)
1841 << " -> " << *p << dendl;
1842 out->insert(*p);
1843 } else {
1844 dout(30) << __func__ << " miss " << pretty_binary_string(key)
1845 << " -> " << *p << dendl;
1846 }
1847 }
1848 out:
1849 dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1850 return r;
1851}
1852
1853ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
1854 const coll_t& cid, ///< [in] collection
1855 const ghobject_t &oid ///< [in] object
1856 )
1857{
1858
1859 dout(10) << __func__ << " " << cid << " " << oid << dendl;
1860 CollectionRef c = _get_collection(cid);
1861 if (!c) {
1862 dout(10) << __func__ << " " << cid << "doesn't exist" <<dendl;
1863 return ObjectMap::ObjectMapIterator();
1864 }
1865 RWLock::RLocker l(c->lock);
1866 OnodeRef o = c->get_onode(oid, false);
1867 if (!o || !o->exists) {
1868 dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1869 return ObjectMap::ObjectMapIterator();
1870 }
1871 o->flush();
1872 dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1873 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1874 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1875}
1876
1877
1878// -----------------
1879// write helpers
1880
1881int KStore::_open_super_meta()
1882{
1883 // nid
1884 {
1885 nid_max = 0;
1886 bufferlist bl;
1887 db->get(PREFIX_SUPER, "nid_max", &bl);
1888 bufferlist::iterator p = bl.begin();
1889 try {
1890 ::decode(nid_max, p);
1891 } catch (buffer::error& e) {
1892 }
1893 dout(10) << __func__ << " old nid_max " << nid_max << dendl;
1894 nid_last = nid_max;
1895 }
1896 return 0;
1897}
1898
1899void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1900{
1901 if (o->onode.nid)
1902 return;
1903 std::lock_guard<std::mutex> l(nid_lock);
1904 o->onode.nid = ++nid_last;
1905 dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1906 if (nid_last > nid_max) {
1907 nid_max += cct->_conf->kstore_nid_prealloc;
1908 bufferlist bl;
1909 ::encode(nid_max, bl);
1910 txc->t->set(PREFIX_SUPER, "nid_max", bl);
1911 dout(10) << __func__ << " nid_max now " << nid_max << dendl;
1912 }
1913}
1914
1915KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1916{
1917 TransContext *txc = new TransContext(osr);
1918 txc->t = db->get_transaction();
1919 osr->queue_new(txc);
1920 dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
1921 return txc;
1922}
1923
1924void KStore::_txc_state_proc(TransContext *txc)
1925{
1926 while (true) {
1927 dout(10) << __func__ << " txc " << txc
1928 << " " << txc->get_state_name() << dendl;
1929 switch (txc->state) {
1930 case TransContext::STATE_PREPARE:
1931 txc->log_state_latency(logger, l_kstore_state_prepare_lat);
1932 txc->state = TransContext::STATE_KV_QUEUED;
1933 if (!cct->_conf->kstore_sync_transaction) {
1934 std::lock_guard<std::mutex> l(kv_lock);
1935 if (cct->_conf->kstore_sync_submit_transaction) {
1936 int r = db->submit_transaction(txc->t);
1937 assert(r == 0);
1938 }
1939 kv_queue.push_back(txc);
1940 kv_cond.notify_one();
1941 return;
1942 }
1943 {
1944 int r = db->submit_transaction_sync(txc->t);
1945 assert(r == 0);
1946 }
1947 break;
1948
1949 case TransContext::STATE_KV_QUEUED:
1950 txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
1951 txc->state = TransContext::STATE_KV_DONE;
1952 _txc_finish_kv(txc);
1953 // ** fall-thru **
1954
1955 case TransContext::STATE_KV_DONE:
1956 txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
1957 txc->state = TransContext::STATE_FINISHING;
1958 // ** fall-thru **
1959
1960 case TransContext::TransContext::STATE_FINISHING:
1961 txc->log_state_latency(logger, l_kstore_state_finishing_lat);
1962 _txc_finish(txc);
1963 return;
1964
1965 default:
1966 derr << __func__ << " unexpected txc " << txc
1967 << " state " << txc->get_state_name() << dendl;
1968 assert(0 == "unexpected txc state");
1969 return;
1970 }
1971 }
1972}
1973
1974void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1975{
1976 dout(20) << __func__ << " osr " << osr << " txc " << txc
1977 << " onodes " << txc->onodes << dendl;
1978
1979 // finalize onodes
1980 for (set<OnodeRef>::iterator p = txc->onodes.begin();
1981 p != txc->onodes.end();
1982 ++p) {
1983 bufferlist bl;
1984 ::encode((*p)->onode, bl);
1985 dout(20) << " onode size is " << bl.length() << dendl;
1986 txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1987
1988 std::lock_guard<std::mutex> l((*p)->flush_lock);
1989 (*p)->flush_txns.insert(txc);
1990 }
1991}
1992
1993void KStore::_txc_finish_kv(TransContext *txc)
1994{
1995 dout(20) << __func__ << " txc " << txc << dendl;
1996
1997 // warning: we're calling onreadable_sync inside the sequencer lock
1998 if (txc->onreadable_sync) {
1999 txc->onreadable_sync->complete(0);
2000 txc->onreadable_sync = NULL;
2001 }
2002 if (txc->onreadable) {
2003 finisher.queue(txc->onreadable);
2004 txc->onreadable = NULL;
2005 }
2006 if (txc->oncommit) {
2007 finisher.queue(txc->oncommit);
2008 txc->oncommit = NULL;
2009 }
2010 if (!txc->oncommits.empty()) {
2011 finisher.queue(txc->oncommits);
2012 }
2013
2014 throttle_ops.put(txc->ops);
2015 throttle_bytes.put(txc->bytes);
2016}
2017
2018void KStore::_txc_finish(TransContext *txc)
2019{
2020 dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
2021 assert(txc->state == TransContext::STATE_FINISHING);
2022
2023 for (set<OnodeRef>::iterator p = txc->onodes.begin();
2024 p != txc->onodes.end();
2025 ++p) {
2026 std::lock_guard<std::mutex> l((*p)->flush_lock);
2027 dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2028 << dendl;
2029 assert((*p)->flush_txns.count(txc));
2030 (*p)->flush_txns.erase(txc);
2031 if ((*p)->flush_txns.empty()) {
2032 (*p)->flush_cond.notify_all();
2033 (*p)->clear_pending_stripes();
2034 }
2035 }
2036
2037 // clear out refs
2038 txc->onodes.clear();
2039
2040 while (!txc->removed_collections.empty()) {
2041 _queue_reap_collection(txc->removed_collections.front());
2042 txc->removed_collections.pop_front();
2043 }
2044
2045 OpSequencerRef osr = txc->osr;
2046 {
2047 std::lock_guard<std::mutex> l(osr->qlock);
2048 txc->state = TransContext::STATE_DONE;
2049 }
2050
2051 _osr_reap_done(osr.get());
2052}
2053
2054void KStore::_osr_reap_done(OpSequencer *osr)
2055{
2056 std::lock_guard<std::mutex> l(osr->qlock);
2057 dout(20) << __func__ << " osr " << osr << dendl;
2058 while (!osr->q.empty()) {
2059 TransContext *txc = &osr->q.front();
2060 dout(20) << __func__ << " txc " << txc << " " << txc->get_state_name()
2061 << dendl;
2062 if (txc->state != TransContext::STATE_DONE) {
2063 break;
2064 }
2065
2066 if (txc->first_collection) {
2067 txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2068 }
2069
2070 osr->q.pop_front();
2071 txc->log_state_latency(logger, l_kstore_state_done_lat);
2072 delete txc;
2073 osr->qcond.notify_all();
2074 if (osr->q.empty())
2075 dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
2076 }
2077}
2078
2079void KStore::_kv_sync_thread()
2080{
2081 dout(10) << __func__ << " start" << dendl;
2082 std::unique_lock<std::mutex> l(kv_lock);
2083 while (true) {
2084 assert(kv_committing.empty());
2085 if (kv_queue.empty()) {
2086 if (kv_stop)
2087 break;
2088 dout(20) << __func__ << " sleep" << dendl;
2089 kv_sync_cond.notify_all();
2090 kv_cond.wait(l);
2091 dout(20) << __func__ << " wake" << dendl;
2092 } else {
2093 dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
2094 kv_committing.swap(kv_queue);
2095 utime_t start = ceph_clock_now();
2096 l.unlock();
2097
2098 dout(30) << __func__ << " committing txc " << kv_committing << dendl;
2099
2100 // one transaction to force a sync
2101 KeyValueDB::Transaction t = db->get_transaction();
2102 if (!cct->_conf->kstore_sync_submit_transaction) {
2103 for (std::deque<TransContext *>::iterator it = kv_committing.begin();
2104 it != kv_committing.end();
2105 ++it) {
2106 int r = db->submit_transaction((*it)->t);
2107 assert(r == 0);
2108 }
2109 }
2110 int r = db->submit_transaction_sync(t);
2111 assert(r == 0);
2112 utime_t finish = ceph_clock_now();
2113 utime_t dur = finish - start;
2114 dout(20) << __func__ << " committed " << kv_committing.size()
2115 << " in " << dur << dendl;
2116 while (!kv_committing.empty()) {
2117 TransContext *txc = kv_committing.front();
2118 _txc_state_proc(txc);
2119 kv_committing.pop_front();
2120 }
2121
2122 // this is as good a place as any ...
2123 _reap_collections();
2124
2125 l.lock();
2126 }
2127 }
2128 dout(10) << __func__ << " finish" << dendl;
2129}
2130
2131
2132// ---------------------------
2133// transactions
2134
2135int KStore::queue_transactions(
2136 Sequencer *posr,
2137 vector<Transaction>& tls,
2138 TrackedOpRef op,
2139 ThreadPool::TPHandle *handle)
2140{
2141 Context *onreadable;
2142 Context *ondisk;
2143 Context *onreadable_sync;
2144 ObjectStore::Transaction::collect_contexts(
2145 tls, &onreadable, &ondisk, &onreadable_sync);
2146
2147 // set up the sequencer
2148 OpSequencer *osr;
2149 assert(posr);
2150 if (posr->p) {
2151 osr = static_cast<OpSequencer *>(posr->p.get());
2152 dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
2153 } else {
2154 osr = new OpSequencer(cct);
2155 osr->parent = posr;
2156 posr->p = osr;
2157 dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
2158 }
2159
2160 // prepare
2161 TransContext *txc = _txc_create(osr);
2162 txc->onreadable = onreadable;
2163 txc->onreadable_sync = onreadable_sync;
2164 txc->oncommit = ondisk;
2165
2166 for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
2167 (*p).set_osr(osr);
2168 txc->ops += (*p).get_num_ops();
2169 txc->bytes += (*p).get_num_bytes();
2170 _txc_add_transaction(txc, &(*p));
2171 }
2172
2173 _txc_finalize(osr, txc);
2174
2175 throttle_ops.get(txc->ops);
2176 throttle_bytes.get(txc->bytes);
2177
2178 // execute (start)
2179 _txc_state_proc(txc);
2180 return 0;
2181}
2182
2183void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2184{
2185 Transaction::iterator i = t->begin();
2186
2187 dout(30) << __func__ << " transaction dump:\n";
2188 JSONFormatter f(true);
2189 f.open_object_section("transaction");
2190 t->dump(&f);
2191 f.close_section();
2192 f.flush(*_dout);
2193 *_dout << dendl;
2194
2195 vector<CollectionRef> cvec(i.colls.size());
2196 unsigned j = 0;
2197 for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2198 ++p, ++j) {
2199 cvec[j] = _get_collection(*p);
2200
2201 // note first collection we reference
2202 if (!j && !txc->first_collection)
2203 txc->first_collection = cvec[j];
2204 }
2205 vector<OnodeRef> ovec(i.objects.size());
2206
2207 for (int pos = 0; i.have_op(); ++pos) {
2208 Transaction::Op *op = i.decode_op();
2209 int r = 0;
2210
2211 // no coll or obj
2212 if (op->op == Transaction::OP_NOP)
2213 continue;
2214
2215 // collection operations
2216 CollectionRef &c = cvec[op->cid];
2217 switch (op->op) {
2218 case Transaction::OP_RMCOLL:
2219 {
2220 coll_t cid = i.get_cid(op->cid);
2221 r = _remove_collection(txc, cid, &c);
2222 if (!r)
2223 continue;
2224 }
2225 break;
2226
2227 case Transaction::OP_MKCOLL:
2228 {
2229 assert(!c);
2230 coll_t cid = i.get_cid(op->cid);
2231 r = _create_collection(txc, cid, op->split_bits, &c);
2232 if (!r)
2233 continue;
2234 }
2235 break;
2236
2237 case Transaction::OP_SPLIT_COLLECTION:
2238 assert(0 == "deprecated");
2239 break;
2240
2241 case Transaction::OP_SPLIT_COLLECTION2:
2242 {
2243 uint32_t bits = op->split_bits;
2244 uint32_t rem = op->split_rem;
2245 r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2246 if (!r)
2247 continue;
2248 }
2249 break;
2250
2251 case Transaction::OP_COLL_HINT:
2252 {
2253 uint32_t type = op->hint_type;
2254 bufferlist hint;
2255 i.decode_bl(hint);
2256 bufferlist::iterator hiter = hint.begin();
2257 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2258 uint32_t pg_num;
2259 uint64_t num_objs;
2260 ::decode(pg_num, hiter);
2261 ::decode(num_objs, hiter);
2262 dout(10) << __func__ << " collection hint objects is a no-op, "
2263 << " pg_num " << pg_num << " num_objects " << num_objs
2264 << dendl;
2265 } else {
2266 // Ignore the hint
2267 dout(10) << __func__ << " unknown collection hint " << type << dendl;
2268 }
2269 continue;
2270 }
2271 break;
2272
2273 case Transaction::OP_COLL_SETATTR:
2274 r = -EOPNOTSUPP;
2275 break;
2276
2277 case Transaction::OP_COLL_RMATTR:
2278 r = -EOPNOTSUPP;
2279 break;
2280
2281 case Transaction::OP_COLL_RENAME:
2282 assert(0 == "not implemented");
2283 break;
2284 }
2285 if (r < 0) {
2286 derr << " error " << cpp_strerror(r)
2287 << " not handled on operation " << op->op
2288 << " (op " << pos << ", counting from 0)" << dendl;
2289 dout(0) << " transaction dump:\n";
2290 JSONFormatter f(true);
2291 f.open_object_section("transaction");
2292 t->dump(&f);
2293 f.close_section();
2294 f.flush(*_dout);
2295 *_dout << dendl;
2296 assert(0 == "unexpected error");
2297 }
2298
2299 // object operations
2300 RWLock::WLocker l(c->lock);
2301 OnodeRef &o = ovec[op->oid];
2302 if (!o) {
2303 // these operations implicity create the object
2304 bool create = false;
2305 if (op->op == Transaction::OP_TOUCH ||
2306 op->op == Transaction::OP_WRITE ||
2307 op->op == Transaction::OP_ZERO) {
2308 create = true;
2309 }
2310 ghobject_t oid = i.get_oid(op->oid);
2311 o = c->get_onode(oid, create);
2312 if (!create) {
2313 if (!o || !o->exists) {
2314 dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2315 << oid << dendl;
2316 r = -ENOENT;
2317 goto endop;
2318 }
2319 }
2320 }
2321
2322 switch (op->op) {
2323 case Transaction::OP_TOUCH:
2324 r = _touch(txc, c, o);
2325 break;
2326
2327 case Transaction::OP_WRITE:
2328 {
2329 uint64_t off = op->off;
2330 uint64_t len = op->len;
2331 uint32_t fadvise_flags = i.get_fadvise_flags();
2332 bufferlist bl;
2333 i.decode_bl(bl);
2334 r = _write(txc, c, o, off, len, bl, fadvise_flags);
2335 }
2336 break;
2337
2338 case Transaction::OP_ZERO:
2339 {
2340 uint64_t off = op->off;
2341 uint64_t len = op->len;
2342 r = _zero(txc, c, o, off, len);
2343 }
2344 break;
2345
2346 case Transaction::OP_TRIMCACHE:
2347 {
2348 // deprecated, no-op
2349 }
2350 break;
2351
2352 case Transaction::OP_TRUNCATE:
2353 {
2354 uint64_t off = op->off;
2355 r = _truncate(txc, c, o, off);
2356 }
2357 break;
2358
2359 case Transaction::OP_REMOVE:
2360 r = _remove(txc, c, o);
2361 break;
2362
2363 case Transaction::OP_SETATTR:
2364 {
2365 string name = i.decode_string();
2366 bufferlist bl;
2367 i.decode_bl(bl);
2368 map<string, bufferptr> to_set;
2369 to_set[name] = bufferptr(bl.c_str(), bl.length());
2370 r = _setattrs(txc, c, o, to_set);
2371 }
2372 break;
2373
2374 case Transaction::OP_SETATTRS:
2375 {
2376 map<string, bufferptr> aset;
2377 i.decode_attrset(aset);
2378 r = _setattrs(txc, c, o, aset);
2379 }
2380 break;
2381
2382 case Transaction::OP_RMATTR:
2383 {
2384 string name = i.decode_string();
2385 r = _rmattr(txc, c, o, name);
2386 }
2387 break;
2388
2389 case Transaction::OP_RMATTRS:
2390 {
2391 r = _rmattrs(txc, c, o);
2392 }
2393 break;
2394
2395 case Transaction::OP_CLONE:
2396 {
2397 const ghobject_t& noid = i.get_oid(op->dest_oid);
2398 OnodeRef no = c->get_onode(noid, true);
2399 r = _clone(txc, c, o, no);
2400 }
2401 break;
2402
2403 case Transaction::OP_CLONERANGE:
2404 assert(0 == "deprecated");
2405 break;
2406
2407 case Transaction::OP_CLONERANGE2:
2408 {
2409 const ghobject_t& noid = i.get_oid(op->dest_oid);
2410 OnodeRef no = c->get_onode(noid, true);
2411 uint64_t srcoff = op->off;
2412 uint64_t len = op->len;
2413 uint64_t dstoff = op->dest_off;
2414 r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2415 }
2416 break;
2417
2418 case Transaction::OP_COLL_ADD:
2419 assert(0 == "not implemented");
2420 break;
2421
2422 case Transaction::OP_COLL_REMOVE:
2423 assert(0 == "not implemented");
2424 break;
2425
2426 case Transaction::OP_COLL_MOVE:
2427 assert(0 == "deprecated");
2428 break;
2429
2430 case Transaction::OP_COLL_MOVE_RENAME:
2431 {
2432 assert(op->cid == op->dest_cid);
2433 const ghobject_t& noid = i.get_oid(op->dest_oid);
2434 OnodeRef no = c->get_onode(noid, true);
2435 r = _rename(txc, c, o, no, noid);
2436 o.reset();
2437 }
2438 break;
2439
2440 case Transaction::OP_TRY_RENAME:
2441 {
2442 const ghobject_t& noid = i.get_oid(op->dest_oid);
2443 OnodeRef no = c->get_onode(noid, true);
2444 r = _rename(txc, c, o, no, noid);
2445 if (r == -ENOENT)
2446 r = 0;
2447 o.reset();
2448 }
2449 break;
2450
2451 case Transaction::OP_OMAP_CLEAR:
2452 {
2453 r = _omap_clear(txc, c, o);
2454 }
2455 break;
2456 case Transaction::OP_OMAP_SETKEYS:
2457 {
2458 bufferlist aset_bl;
2459 i.decode_attrset_bl(&aset_bl);
2460 r = _omap_setkeys(txc, c, o, aset_bl);
2461 }
2462 break;
2463 case Transaction::OP_OMAP_RMKEYS:
2464 {
2465 bufferlist keys_bl;
2466 i.decode_keyset_bl(&keys_bl);
2467 r = _omap_rmkeys(txc, c, o, keys_bl);
2468 }
2469 break;
2470 case Transaction::OP_OMAP_RMKEYRANGE:
2471 {
2472 string first, last;
2473 first = i.decode_string();
2474 last = i.decode_string();
2475 r = _omap_rmkey_range(txc, c, o, first, last);
2476 }
2477 break;
2478 case Transaction::OP_OMAP_SETHEADER:
2479 {
2480 bufferlist bl;
2481 i.decode_bl(bl);
2482 r = _omap_setheader(txc, c, o, bl);
2483 }
2484 break;
2485
2486 case Transaction::OP_SETALLOCHINT:
2487 {
2488 uint64_t expected_object_size = op->expected_object_size;
2489 uint64_t expected_write_size = op->expected_write_size;
2490 uint32_t flags = op->alloc_hint_flags;
2491 r = _setallochint(txc, c, o,
2492 expected_object_size,
2493 expected_write_size,
2494 flags);
2495 }
2496 break;
2497
2498 default:
2499 derr << "bad op " << op->op << dendl;
2500 ceph_abort();
2501 }
2502
2503 endop:
2504 if (r < 0) {
2505 bool ok = false;
2506
2507 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2508 op->op == Transaction::OP_CLONE ||
2509 op->op == Transaction::OP_CLONERANGE2 ||
2510 op->op == Transaction::OP_COLL_ADD))
2511 // -ENOENT is usually okay
2512 ok = true;
2513 if (r == -ENODATA)
2514 ok = true;
2515
2516 if (!ok) {
2517 const char *msg = "unexpected error code";
2518
2519 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2520 op->op == Transaction::OP_CLONE ||
2521 op->op == Transaction::OP_CLONERANGE2))
2522 msg = "ENOENT on clone suggests osd bug";
2523
2524 if (r == -ENOSPC)
2525 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2526 // by partially applying transactions.
2527 msg = "ENOSPC from key value store, misconfigured cluster";
2528
2529 if (r == -ENOTEMPTY) {
2530 msg = "ENOTEMPTY suggests garbage data in osd data dir";
2531 }
2532
2533 dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2534 << " (op " << pos << ", counting from 0)" << dendl;
2535 dout(0) << msg << dendl;
2536 dout(0) << " transaction dump:\n";
2537 JSONFormatter f(true);
2538 f.open_object_section("transaction");
2539 t->dump(&f);
2540 f.close_section();
2541 f.flush(*_dout);
2542 *_dout << dendl;
2543 assert(0 == "unexpected error");
2544 }
2545 }
2546 }
2547}
2548
2549
2550
2551// -----------------
2552// write operations
2553
2554int KStore::_touch(TransContext *txc,
2555 CollectionRef& c,
2556 OnodeRef &o)
2557{
2558 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2559 int r = 0;
2560 o->exists = true;
2561 _assign_nid(txc, o);
2562 txc->write_onode(o);
2563 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2564 return r;
2565}
2566
2567void KStore::_dump_onode(OnodeRef o)
2568{
2569 dout(30) << __func__ << " " << o
2570 << " nid " << o->onode.nid
2571 << " size " << o->onode.size
2572 << " expected_object_size " << o->onode.expected_object_size
2573 << " expected_write_size " << o->onode.expected_write_size
2574 << dendl;
2575 for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2576 p != o->onode.attrs.end();
2577 ++p) {
2578 dout(30) << __func__ << " attr " << p->first
2579 << " len " << p->second.length() << dendl;
2580 }
2581}
2582
2583void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
2584{
2585 map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2586 if (p == o->pending_stripes.end()) {
2587 string key;
2588 get_data_key(o->onode.nid, offset, &key);
2589 db->get(PREFIX_DATA, key, pbl);
2590 o->pending_stripes[offset] = *pbl;
2591 } else {
2592 *pbl = p->second;
2593 }
2594}
2595
2596void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2597 uint64_t offset, bufferlist& bl)
2598{
2599 o->pending_stripes[offset] = bl;
2600 string key;
2601 get_data_key(o->onode.nid, offset, &key);
2602 txc->t->set(PREFIX_DATA, key, bl);
2603}
2604
2605void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2606{
2607 o->pending_stripes.erase(offset);
2608 string key;
2609 get_data_key(o->onode.nid, offset, &key);
2610 txc->t->rmkey(PREFIX_DATA, key);
2611}
2612
2613int KStore::_do_write(TransContext *txc,
2614 OnodeRef o,
2615 uint64_t offset, uint64_t length,
2616 bufferlist& orig_bl,
2617 uint32_t fadvise_flags)
2618{
2619 int r = 0;
2620
2621 dout(20) << __func__
2622 << " " << o->oid << " " << offset << "~" << length
2623 << " - have " << o->onode.size
2624 << " bytes, nid " << o->onode.nid << dendl;
2625 _dump_onode(o);
2626 o->exists = true;
2627
2628 if (length == 0) {
2629 return 0;
2630 }
2631
2632 uint64_t stripe_size = o->onode.stripe_size;
2633 if (!stripe_size) {
2634 o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2635 stripe_size = o->onode.stripe_size;
2636 }
2637
2638 unsigned bl_off = 0;
2639 while (length > 0) {
2640 uint64_t offset_rem = offset % stripe_size;
2641 uint64_t end_rem = (offset + length) % stripe_size;
2642 if (offset_rem == 0 && end_rem == 0) {
2643 bufferlist bl;
2644 bl.substr_of(orig_bl, bl_off, stripe_size);
2645 dout(30) << __func__ << " full stripe " << offset << dendl;
2646 _do_write_stripe(txc, o, offset, bl);
2647 offset += stripe_size;
2648 length -= stripe_size;
2649 bl_off += stripe_size;
2650 continue;
2651 }
2652 uint64_t stripe_off = offset - offset_rem;
2653 bufferlist prev;
2654 _do_read_stripe(o, stripe_off, &prev);
2655 dout(20) << __func__ << " read previous stripe " << stripe_off
2656 << ", got " << prev.length() << dendl;
2657 bufferlist bl;
2658 if (offset_rem) {
2659 unsigned p = MIN(prev.length(), offset_rem);
2660 if (p) {
2661 dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2662 bl.substr_of(prev, 0, p);
2663 }
2664 if (p < offset_rem) {
2665 dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2666 bl.append_zero(offset_rem - p);
2667 }
2668 }
2669 unsigned use = stripe_size - offset_rem;
2670 if (use > length)
2671 use -= stripe_size - end_rem;
2672 dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2673 bufferlist t;
2674 t.substr_of(orig_bl, bl_off, use);
2675 bl.claim_append(t);
2676 bl_off += use;
2677 if (end_rem) {
2678 if (end_rem < prev.length()) {
2679 unsigned l = prev.length() - end_rem;
2680 dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2681 bufferlist t;
2682 t.substr_of(prev, end_rem, l);
2683 bl.claim_append(t);
2684 }
2685 }
2686 dout(30) << " writing:\n";
2687 bl.hexdump(*_dout);
2688 *_dout << dendl;
2689 _do_write_stripe(txc, o, stripe_off, bl);
2690 offset += use;
2691 length -= use;
2692 }
2693
2694 if (offset > o->onode.size) {
2695 dout(20) << __func__ << " extending size to " << offset + length
2696 << dendl;
2697 o->onode.size = offset;
2698 }
2699
2700 return r;
2701}
2702
2703int KStore::_write(TransContext *txc,
2704 CollectionRef& c,
2705 OnodeRef& o,
2706 uint64_t offset, size_t length,
2707 bufferlist& bl,
2708 uint32_t fadvise_flags)
2709{
2710 dout(15) << __func__ << " " << c->cid << " " << o->oid
2711 << " " << offset << "~" << length
2712 << dendl;
2713 _assign_nid(txc, o);
2714 int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2715 txc->write_onode(o);
2716
2717 dout(10) << __func__ << " " << c->cid << " " << o->oid
2718 << " " << offset << "~" << length
2719 << " = " << r << dendl;
2720 return r;
2721}
2722
2723int KStore::_zero(TransContext *txc,
2724 CollectionRef& c,
2725 OnodeRef& o,
2726 uint64_t offset, size_t length)
2727{
2728 dout(15) << __func__ << " " << c->cid << " " << o->oid
2729 << " " << offset << "~" << length
2730 << dendl;
2731 int r = 0;
2732 o->exists = true;
2733
2734 _dump_onode(o);
2735 _assign_nid(txc, o);
2736
2737 uint64_t stripe_size = o->onode.stripe_size;
2738 if (stripe_size) {
2739 uint64_t end = offset + length;
2740 uint64_t pos = offset;
2741 uint64_t stripe_off = pos % stripe_size;
2742 while (pos < offset + length) {
2743 if (stripe_off || end - pos < stripe_size) {
2744 bufferlist stripe;
2745 _do_read_stripe(o, pos - stripe_off, &stripe);
2746 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2747 << stripe.length() << dendl;
2748 bufferlist bl;
2749 bl.substr_of(stripe, 0, MIN(stripe.length(), stripe_off));
2750 if (end >= pos - stripe_off + stripe_size ||
2751 end >= o->onode.size) {
2752 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2753 << " to " << bl.length() << dendl;
2754 } else {
2755 auto len = end - (pos - stripe_off + bl.length());
2756 bl.append_zero(len);
2757 dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2758 if (stripe.length() > bl.length()) {
2759 unsigned l = stripe.length() - bl.length();
2760 bufferlist t;
2761 t.substr_of(stripe, stripe.length() - l, l);
2762 dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2763 bl.claim_append(t);
2764 }
2765 }
2766 _do_write_stripe(txc, o, pos - stripe_off, bl);
2767 pos += stripe_size - stripe_off;
2768 stripe_off = 0;
2769 } else {
2770 dout(20) << __func__ << " rm stripe " << pos << dendl;
2771 _do_remove_stripe(txc, o, pos - stripe_off);
2772 pos += stripe_size;
2773 }
2774 }
2775 }
2776 if (offset + length > o->onode.size) {
2777 o->onode.size = offset + length;
2778 dout(20) << __func__ << " extending size to " << offset + length
2779 << dendl;
2780 }
2781 txc->write_onode(o);
2782
2783 dout(10) << __func__ << " " << c->cid << " " << o->oid
2784 << " " << offset << "~" << length
2785 << " = " << r << dendl;
2786 return r;
2787}
2788
2789int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2790{
2791 uint64_t stripe_size = o->onode.stripe_size;
2792
2793 o->flush();
2794
2795 // trim down stripes
2796 if (stripe_size) {
2797 uint64_t pos = offset;
2798 uint64_t stripe_off = pos % stripe_size;
2799 while (pos < o->onode.size) {
2800 if (stripe_off) {
2801 bufferlist stripe;
2802 _do_read_stripe(o, pos - stripe_off, &stripe);
2803 dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2804 << stripe.length() << dendl;
2805 bufferlist t;
2806 t.substr_of(stripe, 0, MIN(stripe_off, stripe.length()));
2807 _do_write_stripe(txc, o, pos - stripe_off, t);
2808 dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2809 << " to " << t.length() << dendl;
2810 pos += stripe_size - stripe_off;
2811 stripe_off = 0;
2812 } else {
2813 dout(20) << __func__ << " rm stripe " << pos << dendl;
2814 _do_remove_stripe(txc, o, pos - stripe_off);
2815 pos += stripe_size;
2816 }
2817 }
2818
2819 // trim down cached tail
2820 if (o->tail_bl.length()) {
2821 if (offset / stripe_size != o->onode.size / stripe_size) {
2822 dout(20) << __func__ << " clear cached tail" << dendl;
2823 o->clear_tail();
2824 }
2825 }
2826 }
2827
2828 o->onode.size = offset;
2829 dout(10) << __func__ << " truncate size to " << offset << dendl;
2830
2831 txc->write_onode(o);
2832 return 0;
2833}
2834
2835int KStore::_truncate(TransContext *txc,
2836 CollectionRef& c,
2837 OnodeRef& o,
2838 uint64_t offset)
2839{
2840 dout(15) << __func__ << " " << c->cid << " " << o->oid
2841 << " " << offset
2842 << dendl;
2843 int r = _do_truncate(txc, o, offset);
2844 dout(10) << __func__ << " " << c->cid << " " << o->oid
2845 << " " << offset
2846 << " = " << r << dendl;
2847 return r;
2848}
2849
2850int KStore::_do_remove(TransContext *txc,
2851 OnodeRef o)
2852{
2853 string key;
2854
2855 _do_truncate(txc, o, 0);
2856
2857 o->onode.size = 0;
2858 if (o->onode.omap_head) {
2859 _do_omap_clear(txc, o->onode.omap_head);
2860 }
2861 o->exists = false;
2862 o->onode = kstore_onode_t();
2863 txc->onodes.erase(o);
2864 get_object_key(cct, o->oid, &key);
2865 txc->t->rmkey(PREFIX_OBJ, key);
2866 return 0;
2867}
2868
2869int KStore::_remove(TransContext *txc,
2870 CollectionRef& c,
2871 OnodeRef &o)
2872{
2873 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2874 int r = _do_remove(txc, o);
2875 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2876 return r;
2877}
2878
2879int KStore::_setattr(TransContext *txc,
2880 CollectionRef& c,
2881 OnodeRef& o,
2882 const string& name,
2883 bufferptr& val)
2884{
2885 dout(15) << __func__ << " " << c->cid << " " << o->oid
2886 << " " << name << " (" << val.length() << " bytes)"
2887 << dendl;
2888 int r = 0;
2889 o->onode.attrs[name] = val;
2890 txc->write_onode(o);
2891 dout(10) << __func__ << " " << c->cid << " " << o->oid
2892 << " " << name << " (" << val.length() << " bytes)"
2893 << " = " << r << dendl;
2894 return r;
2895}
2896
2897int KStore::_setattrs(TransContext *txc,
2898 CollectionRef& c,
2899 OnodeRef& o,
2900 const map<string,bufferptr>& aset)
2901{
2902 dout(15) << __func__ << " " << c->cid << " " << o->oid
2903 << " " << aset.size() << " keys"
2904 << dendl;
2905 int r = 0;
2906 for (map<string,bufferptr>::const_iterator p = aset.begin();
2907 p != aset.end(); ++p) {
2908 if (p->second.is_partial())
2909 o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2910 else
2911 o->onode.attrs[p->first] = p->second;
2912 }
2913 txc->write_onode(o);
2914 dout(10) << __func__ << " " << c->cid << " " << o->oid
2915 << " " << aset.size() << " keys"
2916 << " = " << r << dendl;
2917 return r;
2918}
2919
2920
2921int KStore::_rmattr(TransContext *txc,
2922 CollectionRef& c,
2923 OnodeRef& o,
2924 const string& name)
2925{
2926 dout(15) << __func__ << " " << c->cid << " " << o->oid
2927 << " " << name << dendl;
2928 int r = 0;
2929 o->onode.attrs.erase(name);
2930 txc->write_onode(o);
2931 dout(10) << __func__ << " " << c->cid << " " << o->oid
2932 << " " << name << " = " << r << dendl;
2933 return r;
2934}
2935
2936int KStore::_rmattrs(TransContext *txc,
2937 CollectionRef& c,
2938 OnodeRef& o)
2939{
2940 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2941 int r = 0;
2942 o->onode.attrs.clear();
2943 txc->write_onode(o);
2944 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2945 return r;
2946}
2947
2948void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2949{
2950 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2951 string prefix, tail;
2952 get_omap_header(id, &prefix);
2953 get_omap_tail(id, &tail);
2954 it->lower_bound(prefix);
2955 while (it->valid()) {
2956 if (it->key() >= tail) {
2957 dout(30) << __func__ << " stop at " << tail << dendl;
2958 break;
2959 }
2960 txc->t->rmkey(PREFIX_OMAP, it->key());
2961 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
2962 it->next();
2963 }
2964}
2965
2966int KStore::_omap_clear(TransContext *txc,
2967 CollectionRef& c,
2968 OnodeRef& o)
2969{
2970 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2971 int r = 0;
2972 if (o->onode.omap_head != 0) {
2973 _do_omap_clear(txc, o->onode.omap_head);
2974 }
2975 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2976 return r;
2977}
2978
2979int KStore::_omap_setkeys(TransContext *txc,
2980 CollectionRef& c,
2981 OnodeRef& o,
2982 bufferlist &bl)
2983{
2984 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2985 int r;
2986 bufferlist::iterator p = bl.begin();
2987 __u32 num;
2988 if (!o->onode.omap_head) {
2989 o->onode.omap_head = o->onode.nid;
2990 txc->write_onode(o);
2991 }
2992 ::decode(num, p);
2993 while (num--) {
2994 string key;
2995 bufferlist value;
2996 ::decode(key, p);
2997 ::decode(value, p);
2998 string final_key;
2999 get_omap_key(o->onode.omap_head, key, &final_key);
3000 dout(30) << __func__ << " " << pretty_binary_string(final_key)
3001 << " <- " << key << dendl;
3002 txc->t->set(PREFIX_OMAP, final_key, value);
3003 }
3004 r = 0;
3005 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3006 return r;
3007}
3008
3009int KStore::_omap_setheader(TransContext *txc,
3010 CollectionRef& c,
3011 OnodeRef &o,
3012 bufferlist& bl)
3013{
3014 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3015 int r;
3016 string key;
3017 if (!o->onode.omap_head) {
3018 o->onode.omap_head = o->onode.nid;
3019 txc->write_onode(o);
3020 }
3021 get_omap_header(o->onode.omap_head, &key);
3022 txc->t->set(PREFIX_OMAP, key, bl);
3023 r = 0;
3024 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3025 return r;
3026}
3027
3028int KStore::_omap_rmkeys(TransContext *txc,
3029 CollectionRef& c,
3030 OnodeRef& o,
3031 bufferlist& bl)
3032{
3033 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3034 int r = 0;
3035 bufferlist::iterator p = bl.begin();
3036 __u32 num;
3037
3038 if (!o->onode.omap_head) {
3039 r = 0;
3040 goto out;
3041 }
3042 ::decode(num, p);
3043 while (num--) {
3044 string key;
3045 ::decode(key, p);
3046 string final_key;
3047 get_omap_key(o->onode.omap_head, key, &final_key);
3048 dout(30) << __func__ << " rm " << pretty_binary_string(final_key)
3049 << " <- " << key << dendl;
3050 txc->t->rmkey(PREFIX_OMAP, final_key);
3051 }
3052 r = 0;
3053
3054 out:
3055 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3056 return r;
3057}
3058
3059int KStore::_omap_rmkey_range(TransContext *txc,
3060 CollectionRef& c,
3061 OnodeRef& o,
3062 const string& first, const string& last)
3063{
3064 dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3065 KeyValueDB::Iterator it;
3066 string key_first, key_last;
3067 int r = 0;
3068
3069 if (!o->onode.omap_head) {
3070 goto out;
3071 }
3072 it = db->get_iterator(PREFIX_OMAP);
3073 get_omap_key(o->onode.omap_head, first, &key_first);
3074 get_omap_key(o->onode.omap_head, last, &key_last);
3075 it->lower_bound(key_first);
3076 while (it->valid()) {
3077 if (it->key() >= key_last) {
3078 dout(30) << __func__ << " stop at " << pretty_binary_string(key_last)
3079 << dendl;
3080 break;
3081 }
3082 txc->t->rmkey(PREFIX_OMAP, it->key());
3083 dout(30) << __func__ << " rm " << pretty_binary_string(it->key()) << dendl;
3084 it->next();
3085 }
3086 r = 0;
3087
3088 out:
3089 dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3090 return r;
3091}
3092
3093int KStore::_setallochint(TransContext *txc,
3094 CollectionRef& c,
3095 OnodeRef& o,
3096 uint64_t expected_object_size,
3097 uint64_t expected_write_size,
3098 uint32_t flags)
3099{
3100 dout(15) << __func__ << " " << c->cid << " " << o->oid
3101 << " object_size " << expected_object_size
3102 << " write_size " << expected_write_size
3103 << " flags " << flags
3104 << dendl;
3105 int r = 0;
3106 o->onode.expected_object_size = expected_object_size;
3107 o->onode.expected_write_size = expected_write_size;
3108 o->onode.alloc_hint_flags = flags;
3109
3110 txc->write_onode(o);
3111 dout(10) << __func__ << " " << c->cid << " " << o->oid
3112 << " object_size " << expected_object_size
3113 << " write_size " << expected_write_size
3114 << " = " << r << dendl;
3115 return r;
3116}
3117
3118int KStore::_clone(TransContext *txc,
3119 CollectionRef& c,
3120 OnodeRef& oldo,
3121 OnodeRef& newo)
3122{
3123 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3124 << newo->oid << dendl;
3125 int r = 0;
3126 if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3127 derr << __func__ << " mismatched hash on " << oldo->oid
3128 << " and " << newo->oid << dendl;
3129 return -EINVAL;
3130 }
3131
3132 bufferlist bl;
3133 newo->exists = true;
3134 _assign_nid(txc, newo);
3135
3136 // data
3137 oldo->flush();
3138
3139 r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
3140 if (r < 0)
3141 goto out;
3142
3143 // truncate any old data
3144 r = _do_truncate(txc, newo, 0);
3145 if (r < 0)
3146 goto out;
3147
3148 r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3149 if (r < 0)
3150 goto out;
3151
3152 newo->onode.attrs = oldo->onode.attrs;
3153
3154 // clone omap
3155 if (newo->onode.omap_head) {
3156 dout(20) << __func__ << " clearing old omap data" << dendl;
3157 _do_omap_clear(txc, newo->onode.omap_head);
3158 }
3159 if (oldo->onode.omap_head) {
3160 dout(20) << __func__ << " copying omap data" << dendl;
3161 if (!newo->onode.omap_head) {
3162 newo->onode.omap_head = newo->onode.nid;
3163 }
3164 KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3165 string head, tail;
3166 get_omap_header(oldo->onode.omap_head, &head);
3167 get_omap_tail(oldo->onode.omap_head, &tail);
3168 it->lower_bound(head);
3169 while (it->valid()) {
3170 string key;
3171 if (it->key() >= tail) {
3172 dout(30) << __func__ << " reached tail" << dendl;
3173 break;
3174 } else {
3175 dout(30) << __func__ << " got header/data "
3176 << pretty_binary_string(it->key()) << dendl;
3177 assert(it->key() < tail);
3178 rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3179 txc->t->set(PREFIX_OMAP, key, it->value());
3180 }
3181 it->next();
3182 }
3183 }
3184
3185 txc->write_onode(newo);
3186 r = 0;
3187
3188 out:
3189 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3190 << newo->oid << " = " << r << dendl;
3191 return r;
3192}
3193
3194int KStore::_clone_range(TransContext *txc,
3195 CollectionRef& c,
3196 OnodeRef& oldo,
3197 OnodeRef& newo,
3198 uint64_t srcoff, uint64_t length, uint64_t dstoff)
3199{
3200 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3201 << newo->oid << " from " << srcoff << "~" << length
3202 << " to offset " << dstoff << dendl;
3203 int r = 0;
3204
3205 bufferlist bl;
3206 newo->exists = true;
3207 _assign_nid(txc, newo);
3208
3209 r = _do_read(oldo, srcoff, length, bl, 0);
3210 if (r < 0)
3211 goto out;
3212
3213 r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3214 if (r < 0)
3215 goto out;
3216
3217 txc->write_onode(newo);
3218
3219 r = 0;
3220
3221 out:
3222 dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3223 << newo->oid << " from " << srcoff << "~" << length
3224 << " to offset " << dstoff
3225 << " = " << r << dendl;
3226 return r;
3227}
3228
3229int KStore::_rename(TransContext *txc,
3230 CollectionRef& c,
3231 OnodeRef& oldo,
3232 OnodeRef& newo,
3233 const ghobject_t& new_oid)
3234{
3235 dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3236 << new_oid << dendl;
3237 int r;
3238 ghobject_t old_oid = oldo->oid;
3239 bufferlist bl;
3240 string old_key, new_key;
3241
3242 if (newo && newo->exists) {
3243 // destination object already exists, remove it first
3244 r = _do_remove(txc, newo);
3245 if (r < 0)
3246 goto out;
3247 }
3248
3249 txc->t->rmkey(PREFIX_OBJ, oldo->key);
3250 txc->write_onode(oldo);
3251 c->onode_map.rename(old_oid, new_oid); // this adjusts oldo->{oid,key}
3252 r = 0;
3253
3254 out:
3255 dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3256 << new_oid << " = " << r << dendl;
3257 return r;
3258}
3259
3260// collections
3261
3262int KStore::_create_collection(
3263 TransContext *txc,
3264 coll_t cid,
3265 unsigned bits,
3266 CollectionRef *c)
3267{
3268 dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3269 int r;
3270 bufferlist bl;
3271
3272 {
3273 RWLock::WLocker l(coll_lock);
3274 if (*c) {
3275 r = -EEXIST;
3276 goto out;
3277 }
3278 c->reset(new Collection(this, cid));
3279 (*c)->cnode.bits = bits;
3280 coll_map[cid] = *c;
3281 }
3282 ::encode((*c)->cnode, bl);
3283 txc->t->set(PREFIX_COLL, stringify(cid), bl);
3284 r = 0;
3285
3286 out:
3287 dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3288 return r;
3289}
3290
3291int KStore::_remove_collection(TransContext *txc, coll_t cid,
3292 CollectionRef *c)
3293{
3294 dout(15) << __func__ << " " << cid << dendl;
3295 int r;
3296
3297 {
3298 RWLock::WLocker l(coll_lock);
3299 if (!*c) {
3300 r = -ENOENT;
3301 goto out;
3302 }
3303 size_t nonexistent_count = 0;
3304 pair<ghobject_t,OnodeRef> next_onode;
3305 while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3306 if (next_onode.second->exists) {
3307 r = -ENOTEMPTY;
3308 goto out;
3309 }
3310 ++nonexistent_count;
3311 }
3312 vector<ghobject_t> ls;
3313 ghobject_t next;
3314 // Enumerate onodes in db, up to nonexistent_count + 1
3315 // then check if all of them are marked as non-existent.
3316 // Bypass the check if returned number is greater than nonexistent_count
3317 r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3318 nonexistent_count + 1, &ls, &next);
3319 if (r >= 0) {
3320 bool exists = false; //ls.size() > nonexistent_count;
3321 for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3322 dout(10) << __func__ << " oid " << *it << dendl;
3323 auto onode = (*c)->onode_map.lookup(*it);
3324 exists = !onode || onode->exists;
3325 if (exists) {
3326 dout(10) << __func__ << " " << *it
3327 << " exists in db" << dendl;
3328 }
3329 }
3330 if (!exists) {
3331 coll_map.erase(cid);
3332 txc->removed_collections.push_back(*c);
3333 c->reset();
3334 txc->t->rmkey(PREFIX_COLL, stringify(cid));
3335 r = 0;
3336 } else {
3337 dout(10) << __func__ << " " << cid
3338 << " is non-empty" << dendl;
3339 r = -ENOTEMPTY;
3340 }
3341 }
3342 }
3343
3344 out:
3345 dout(10) << __func__ << " " << cid << " = " << r << dendl;
3346 return r;
3347}
3348
3349int KStore::_split_collection(TransContext *txc,
3350 CollectionRef& c,
3351 CollectionRef& d,
3352 unsigned bits, int rem)
3353{
3354 dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3355 << " bits " << bits << dendl;
3356 int r;
3357 RWLock::WLocker l(c->lock);
3358 RWLock::WLocker l2(d->lock);
3359 c->onode_map.clear();
3360 d->onode_map.clear();
3361 c->cnode.bits = bits;
3362 assert(d->cnode.bits == bits);
3363 r = 0;
3364
3365 bufferlist bl;
3366 ::encode(c->cnode, bl);
3367 txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3368
3369 dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3370 << " bits " << bits << " = " << r << dendl;
3371 return r;
3372}
3373
3374// ===========================================