]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
d6aceafda17670109d3c2af4fe9a38ef98fe2b87
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #include <linux/falloc.h>
32 #endif
33
34 #include <iostream>
35 #include <map>
36
37 #include "include/linux_fiemap.h"
38
39 #include "chain_xattr.h"
40
41 #if defined(__APPLE__) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/ceph_assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// XATTR_SPILL_OUT_NAME is a per-file xattr that records whether the file's
// xattrs have spilled over into DBObjectMap. If XATTR_SPILL_OUT_NAME exists
// and its value is XATTR_NO_SPILL_OUT ("0"), no xattrs are in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137 {
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147 {
148 os_commit_latency_ns.consume_next(
149 logger.get_tavg_ns(
150 l_filestore_journal_latency));
151 os_apply_latency_ns.consume_next(
152 logger.get_tavg_ns(
153 l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159 return out << "osr(" << s.cid << ")";
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 void FileStore::handle_eio()
169 {
170 // don't try to map this back to an offset; too hard since there is
171 // a file system in between. we also don't really know whether this
172 // was a read or a write, since we have so many layers beneath us.
173 // don't even try.
174 note_io_error_event(devname.c_str(), basedir.c_str(), -EIO, 0, 0, 0);
175 ceph_abort_msg("unexpected eio error");
176 }
177
178 int FileStore::get_index(const coll_t& cid, Index *index)
179 {
180 int r = index_manager.get_index(cid, basedir, index);
181 if (r == -EIO && m_filestore_fail_eio) handle_eio();
182 return r;
183 }
184
185 int FileStore::init_index(const coll_t& cid)
186 {
187 char path[PATH_MAX];
188 get_cdir(cid, path, sizeof(path));
189 int r = index_manager.init_index(cid, path, target_version);
190 if (r == -EIO && m_filestore_fail_eio) handle_eio();
191 return r;
192 }
193
194 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
195 {
196 IndexedPath path2;
197 if (!path)
198 path = &path2;
199 int r, exist;
200 ceph_assert(index.index);
201 r = (index.index)->lookup(oid, path, &exist);
202 if (r < 0) {
203 if (r == -EIO && m_filestore_fail_eio) handle_eio();
204 return r;
205 }
206 if (!exist)
207 return -ENOENT;
208 return 0;
209 }
210
211 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
212 {
213 FDRef fd;
214 int r = lfn_open(cid, oid, false, &fd);
215 if (r < 0)
216 return r;
217 r = ::ftruncate(**fd, length);
218 if (r < 0)
219 r = -errno;
220 if (r >= 0 && m_filestore_sloppy_crc) {
221 int rc = backend->_crc_update_truncate(**fd, length);
222 ceph_assert(rc >= 0);
223 }
224 lfn_close(fd);
225 if (r == -EIO && m_filestore_fail_eio) handle_eio();
226 return r;
227 }
228
229 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
230 {
231 IndexedPath path;
232 Index index;
233 int r = get_index(cid, &index);
234 if (r < 0)
235 return r;
236
237 ceph_assert(index.index);
238 std::shared_lock l{(index.index)->access_lock};
239
240 r = lfn_find(oid, index, &path);
241 if (r < 0)
242 return r;
243 r = ::stat(path->path(), buf);
244 if (r < 0)
245 r = -errno;
246 return r;
247 }
248
249 int FileStore::lfn_open(const coll_t& cid,
250 const ghobject_t& oid,
251 bool create,
252 FDRef *outfd,
253 Index *index)
254 {
255 ceph_assert(outfd);
256 int r = 0;
257 bool need_lock = true;
258 int flags = O_RDWR;
259
260 if (create)
261 flags |= O_CREAT;
262 if (cct->_conf->filestore_odsync_write) {
263 flags |= O_DSYNC;
264 }
265
266 Index index2;
267 if (!index) {
268 index = &index2;
269 }
270 if (!((*index).index)) {
271 r = get_index(cid, index);
272 if (r < 0) {
273 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
274 return r;
275 }
276 } else {
277 need_lock = false;
278 }
279
280 int fd, exist;
281 ceph_assert((*index).index);
282 if (need_lock) {
283 ((*index).index)->access_lock.lock();
284 }
285 if (!replaying) {
286 *outfd = fdcache.lookup(oid);
287 if (*outfd) {
288 if (need_lock) {
289 ((*index).index)->access_lock.unlock();
290 }
291 return 0;
292 }
293 }
294
295
296 IndexedPath path2;
297 IndexedPath *path = &path2;
298
299 r = (*index)->lookup(oid, path, &exist);
300 if (r < 0) {
301 derr << "could not find " << oid << " in index: "
302 << cpp_strerror(-r) << dendl;
303 goto fail;
304 }
305
306 r = ::open((*path)->path(), flags|O_CLOEXEC, 0644);
307 if (r < 0) {
308 r = -errno;
309 dout(10) << "error opening file " << (*path)->path() << " with flags="
310 << flags << ": " << cpp_strerror(-r) << dendl;
311 goto fail;
312 }
313 fd = r;
314 if (create && (!exist)) {
315 r = (*index)->created(oid, (*path)->path());
316 if (r < 0) {
317 VOID_TEMP_FAILURE_RETRY(::close(fd));
318 derr << "error creating " << oid << " (" << (*path)->path()
319 << ") in index: " << cpp_strerror(-r) << dendl;
320 goto fail;
321 }
322 r = chain_fsetxattr<true, true>(
323 fd, XATTR_SPILL_OUT_NAME,
324 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
325 if (r < 0) {
326 VOID_TEMP_FAILURE_RETRY(::close(fd));
327 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
328 << "):" << cpp_strerror(-r) << dendl;
329 goto fail;
330 }
331 }
332
333 if (!replaying) {
334 bool existed;
335 *outfd = fdcache.add(oid, fd, &existed);
336 if (existed) {
337 TEMP_FAILURE_RETRY(::close(fd));
338 }
339 } else {
340 *outfd = std::make_shared<FDCache::FD>(fd);
341 }
342
343 if (need_lock) {
344 ((*index).index)->access_lock.unlock();
345 }
346
347 return 0;
348
349 fail:
350
351 if (need_lock) {
352 ((*index).index)->access_lock.unlock();
353 }
354
355 if (r == -EIO && m_filestore_fail_eio) handle_eio();
356 return r;
357 }
358
359 void FileStore::lfn_close(FDRef fd)
360 {
361 }
362
363 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
364 {
365 Index index_new, index_old;
366 IndexedPath path_new, path_old;
367 int exist;
368 int r;
369 bool index_same = false;
370 if (c < newcid) {
371 r = get_index(newcid, &index_new);
372 if (r < 0)
373 return r;
374 r = get_index(c, &index_old);
375 if (r < 0)
376 return r;
377 } else if (c == newcid) {
378 r = get_index(c, &index_old);
379 if (r < 0)
380 return r;
381 index_new = index_old;
382 index_same = true;
383 } else {
384 r = get_index(c, &index_old);
385 if (r < 0)
386 return r;
387 r = get_index(newcid, &index_new);
388 if (r < 0)
389 return r;
390 }
391
392 ceph_assert(index_old.index);
393 ceph_assert(index_new.index);
394
395 if (!index_same) {
396
397 std::shared_lock l1{(index_old.index)->access_lock};
398
399 r = index_old->lookup(o, &path_old, &exist);
400 if (r < 0) {
401 if (r == -EIO && m_filestore_fail_eio) handle_eio();
402 return r;
403 }
404 if (!exist)
405 return -ENOENT;
406
407 std::unique_lock l2{(index_new.index)->access_lock};
408
409 r = index_new->lookup(newoid, &path_new, &exist);
410 if (r < 0) {
411 if (r == -EIO && m_filestore_fail_eio) handle_eio();
412 return r;
413 }
414 if (exist)
415 return -EEXIST;
416
417 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
418 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
419 r = ::link(path_old->path(), path_new->path());
420 if (r < 0)
421 return -errno;
422
423 r = index_new->created(newoid, path_new->path());
424 if (r < 0) {
425 if (r == -EIO && m_filestore_fail_eio) handle_eio();
426 return r;
427 }
428 } else {
429 std::unique_lock l1{(index_old.index)->access_lock};
430
431 r = index_old->lookup(o, &path_old, &exist);
432 if (r < 0) {
433 if (r == -EIO && m_filestore_fail_eio) handle_eio();
434 return r;
435 }
436 if (!exist)
437 return -ENOENT;
438
439 r = index_new->lookup(newoid, &path_new, &exist);
440 if (r < 0) {
441 if (r == -EIO && m_filestore_fail_eio) handle_eio();
442 return r;
443 }
444 if (exist)
445 return -EEXIST;
446
447 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
448 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
449 r = ::link(path_old->path(), path_new->path());
450 if (r < 0)
451 return -errno;
452
453 // make sure old fd for unlinked/overwritten file is gone
454 fdcache.clear(newoid);
455
456 r = index_new->created(newoid, path_new->path());
457 if (r < 0) {
458 if (r == -EIO && m_filestore_fail_eio) handle_eio();
459 return r;
460 }
461 }
462 return 0;
463 }
464
465 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
466 const SequencerPosition &spos,
467 bool force_clear_omap)
468 {
469 Index index;
470 int r = get_index(cid, &index);
471 if (r < 0) {
472 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
473 return r;
474 }
475
476 ceph_assert(index.index);
477 std::unique_lock l{(index.index)->access_lock};
478
479 {
480 IndexedPath path;
481 int hardlink;
482 r = index->lookup(o, &path, &hardlink);
483 if (r < 0) {
484 if (r == -EIO && m_filestore_fail_eio) handle_eio();
485 return r;
486 }
487
488 if (!force_clear_omap) {
489 if (hardlink == 0 || hardlink == 1) {
490 force_clear_omap = true;
491 }
492 }
493 if (force_clear_omap) {
494 dout(20) << __FUNC__ << ": clearing omap on " << o
495 << " in cid " << cid << dendl;
496 r = object_map->clear(o, &spos);
497 if (r < 0 && r != -ENOENT) {
498 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
499 if (r == -EIO && m_filestore_fail_eio) handle_eio();
500 return r;
501 }
502 if (cct->_conf->filestore_debug_inject_read_err) {
503 debug_obj_on_delete(o);
504 }
505 if (!m_disable_wbthrottle) {
506 wbthrottle.clear_object(o); // should be only non-cache ref
507 }
508 fdcache.clear(o);
509 } else {
510 /* Ensure that replay of this op doesn't result in the object_map
511 * going away.
512 */
513 if (!backend->can_checkpoint())
514 object_map->sync(&o, &spos);
515 }
516 if (hardlink == 0) {
517 if (!m_disable_wbthrottle) {
518 wbthrottle.clear_object(o); // should be only non-cache ref
519 }
520 return 0;
521 }
522 }
523 r = index->unlink(o);
524 if (r < 0) {
525 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
526 return r;
527 }
528 return 0;
529 }
530
531 FileStore::FileStore(CephContext* cct, const std::string &base,
532 const std::string &jdev, osflagbits_t flags,
533 const char *name, bool do_update) :
534 JournalingObjectStore(cct, base),
535 internal_name(name),
536 basedir(base), journalpath(jdev),
537 generic_flags(flags),
538 blk_size(0),
539 fsid_fd(-1), op_fd(-1),
540 basedir_fd(-1), current_fd(-1),
541 backend(nullptr),
542 index_manager(cct, do_update),
543 force_sync(false),
544 timer(cct, sync_entry_timeo_lock),
545 stop(false), sync_thread(this),
546 fdcache(cct),
547 wbthrottle(cct),
548 next_osr_id(0),
549 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
550 !cct->_conf->filestore_wbthrottle_enable),
551 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
552 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
553 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
554 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
555 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
556 op_wq(this, cct->_conf->filestore_op_thread_timeout,
557 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
558 logger(nullptr),
559 trace_endpoint("0.0.0.0", 0, "FileStore"),
560 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
561 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
562 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
563 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
564 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
565 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
566 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
567 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
568 m_filestore_fadvise(cct->_conf->filestore_fadvise),
569 do_update(do_update),
570 m_journal_dio(cct->_conf->journal_dio),
571 m_journal_aio(cct->_conf->journal_aio),
572 m_journal_force_aio(cct->_conf->journal_force_aio),
573 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
574 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
575 m_filestore_do_dump(false),
576 m_filestore_dump_fmt(true),
577 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
578 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
579 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
580 m_fs_type(0),
581 m_filestore_max_inline_xattr_size(0),
582 m_filestore_max_inline_xattrs(0),
583 m_filestore_max_xattr_value_size(0)
584 {
585 m_filestore_kill_at = cct->_conf->filestore_kill_at;
586 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
587 ostringstream oss;
588 oss << "filestore-ondisk-" << i;
589 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
590 ondisk_finishers.push_back(f);
591 }
592 for (int i = 0; i < m_apply_finisher_num; ++i) {
593 ostringstream oss;
594 oss << "filestore-apply-" << i;
595 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
596 apply_finishers.push_back(f);
597 }
598
599 ostringstream oss;
600 oss << basedir << "/current";
601 current_fn = oss.str();
602
603 ostringstream sss;
604 sss << basedir << "/current/commit_op_seq";
605 current_op_seq_fn = sss.str();
606
607 ostringstream omss;
608 if (cct->_conf->filestore_omap_backend_path != "") {
609 omap_dir = cct->_conf->filestore_omap_backend_path;
610 } else {
611 omss << basedir << "/current/omap";
612 omap_dir = omss.str();
613 }
614
615 // initialize logger
616 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
617
618 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
619 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
620 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
621 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
622 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency",
623 NULL, PerfCountersBuilder::PRIO_USEFUL);
624 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
625 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
626 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
627 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
628 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
629 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
630 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
631 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
632 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
633 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
634
635 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
636 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
637 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
638 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
639 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg",
640 "Store operation queue latency", NULL, PerfCountersBuilder::PRIO_USEFUL);
641 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
642
643 logger = plb.create_perf_counters();
644
645 cct->get_perfcounters_collection()->add(logger);
646 cct->_conf.add_observer(this);
647
648 superblock.compat_features = get_fs_initial_compat_set();
649 }
650
651 FileStore::~FileStore()
652 {
653 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
654 delete *it;
655 *it = nullptr;
656 }
657 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
658 delete *it;
659 *it = nullptr;
660 }
661 cct->_conf.remove_observer(this);
662 cct->get_perfcounters_collection()->remove(logger);
663
664 if (journal)
665 journal->logger = nullptr;
666 delete logger;
667 logger = nullptr;
668
669 if (m_filestore_do_dump) {
670 dump_stop();
671 }
672 }
673
// Write "user.ceph.<name>" into buf (capacity len), truncating per
// snprintf() rules if it does not fit.
static void get_attrname(const char *name, char *buf, int len)
{
  static const char attr_format[] = "user.ceph.%s";
  snprintf(buf, len, attr_format, name);
}
678
// If *name starts with "user.ceph.", advance *name past that prefix and
// return true; otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // 10, excluding the NUL

  if (strncmp(*name, prefix, prefix_len) != 0) {
    return false;
  }
  *name += prefix_len;
  return true;
}
687
688 void FileStore::collect_metadata(map<string,string> *pm)
689 {
690 char partition_path[PATH_MAX];
691 char dev_node[PATH_MAX];
692
693 (*pm)["filestore_backend"] = backend->get_name();
694 ostringstream ss;
695 ss << "0x" << std::hex << m_fs_type << std::dec;
696 (*pm)["filestore_f_type"] = ss.str();
697
698 if (cct->_conf->filestore_collect_device_partition_information) {
699 int rc = 0;
700 BlkDev blkdev(fsid_fd);
701 if (rc = blkdev.partition(partition_path, PATH_MAX); rc) {
702 (*pm)["backend_filestore_partition_path"] = "unknown";
703 } else {
704 (*pm)["backend_filestore_partition_path"] = string(partition_path);
705 }
706 if (rc = blkdev.wholedisk(dev_node, PATH_MAX); rc) {
707 (*pm)["backend_filestore_dev_node"] = "unknown";
708 } else {
709 (*pm)["backend_filestore_dev_node"] = string(dev_node);
710 devname = dev_node;
711 }
712 if (rc == 0 && vdo_fd >= 0) {
713 (*pm)["vdo"] = "true";
714 (*pm)["vdo_physical_size"] =
715 stringify(4096 * get_vdo_stat(vdo_fd, "physical_blocks"));
716 }
717 if (journal) {
718 journal->collect_metadata(pm);
719 }
720 }
721 }
722
723 int FileStore::get_devices(set<string> *ls)
724 {
725 string dev_node;
726 BlkDev blkdev(fsid_fd);
727 if (int rc = blkdev.wholedisk(&dev_node); rc) {
728 return rc;
729 }
730 get_raw_devices(dev_node, ls);
731 if (journal) {
732 journal->get_devices(ls);
733 }
734 return 0;
735 }
736
737 int FileStore::statfs(struct store_statfs_t *buf0, osd_alert_list_t* alerts)
738 {
739 struct statfs buf;
740 buf0->reset();
741 if (alerts) {
742 alerts->clear(); // returns nothing for now
743 }
744 if (::statfs(basedir.c_str(), &buf) < 0) {
745 int r = -errno;
746 if (r == -EIO && m_filestore_fail_eio) handle_eio();
747 ceph_assert(r != -ENOENT);
748 return r;
749 }
750
751 uint64_t bfree = buf.f_bavail * buf.f_bsize;
752
753 // assume all of leveldb/rocksdb is omap.
754 {
755 map<string,uint64_t> kv_usage;
756 buf0->omap_allocated += object_map->get_db()->get_estimated_size(kv_usage);
757 }
758
759 uint64_t thin_total, thin_avail;
760 if (get_vdo_utilization(vdo_fd, &thin_total, &thin_avail)) {
761 buf0->total = thin_total;
762 bfree = std::min(bfree, thin_avail);
763 buf0->allocated = thin_total - thin_avail;
764 buf0->data_stored = bfree;
765 } else {
766 buf0->total = buf.f_blocks * buf.f_bsize;
767 buf0->allocated = bfree;
768 buf0->data_stored = bfree;
769 }
770 buf0->available = bfree;
771
772 // FIXME: we don't know how to populate buf->internal_metadata; XFS doesn't
773 // tell us what its internal overhead is.
774
775 // Adjust for writes pending in the journal
776 if (journal) {
777 uint64_t estimate = journal->get_journal_size_estimate();
778 buf0->internally_reserved = estimate;
779 if (buf0->available > estimate)
780 buf0->available -= estimate;
781 else
782 buf0->available = 0;
783 }
784
785 return 0;
786 }
787
788 int FileStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
789 bool *per_pool_omap)
790 {
791 return -ENOTSUP;
792 }
793
794 void FileStore::new_journal()
795 {
796 if (journalpath.length()) {
797 dout(10) << "open_journal at " << journalpath << dendl;
798 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
799 journalpath.c_str(),
800 m_journal_dio, m_journal_aio,
801 m_journal_force_aio);
802 if (journal)
803 journal->logger = logger;
804 }
805 return;
806 }
807
808 int FileStore::dump_journal(ostream& out)
809 {
810 int r;
811
812 if (!journalpath.length())
813 return -EINVAL;
814
815 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
816 r = journal->dump(out);
817 delete journal;
818 journal = nullptr;
819 return r;
820 }
821
822 FileStoreBackend *FileStoreBackend::create(unsigned long f_type, FileStore *fs)
823 {
824 switch (f_type) {
825 #if defined(__linux__)
826 case BTRFS_SUPER_MAGIC:
827 return new BtrfsFileStoreBackend(fs);
828 # ifdef HAVE_LIBXFS
829 case XFS_SUPER_MAGIC:
830 return new XfsFileStoreBackend(fs);
831 # endif
832 #endif
833 #ifdef HAVE_LIBZFS
834 case ZFS_SUPER_MAGIC:
835 return new ZFSFileStoreBackend(fs);
836 #endif
837 default:
838 return new GenericFileStoreBackend(fs);
839 }
840 }
841
842 void FileStore::create_backend(unsigned long f_type)
843 {
844 m_fs_type = f_type;
845
846 ceph_assert(!backend);
847 backend = FileStoreBackend::create(f_type, this);
848
849 dout(0) << "backend " << backend->get_name()
850 << " (magic 0x" << std::hex << f_type << std::dec << ")"
851 << dendl;
852
853 switch (f_type) {
854 #if defined(__linux__)
855 case BTRFS_SUPER_MAGIC:
856 if (!m_disable_wbthrottle){
857 wbthrottle.set_fs(WBThrottle::BTRFS);
858 }
859 break;
860
861 case XFS_SUPER_MAGIC:
862 // wbthrottle is constructed with fs(WBThrottle::XFS)
863 break;
864 #endif
865 }
866
867 set_xattr_limits_via_conf();
868 }
869
870 int FileStore::mkfs()
871 {
872 int ret = 0;
873 char fsid_fn[PATH_MAX];
874 char fsid_str[40];
875 uuid_d old_fsid;
876 uuid_d old_omap_fsid;
877
878 dout(1) << "mkfs in " << basedir << dendl;
879 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
880 if (basedir_fd < 0) {
881 ret = -errno;
882 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
883 return ret;
884 }
885
886 // open+lock fsid
887 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
888 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
889 if (fsid_fd < 0) {
890 ret = -errno;
891 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
892 goto close_basedir_fd;
893 }
894
895 if (lock_fsid() < 0) {
896 ret = -EBUSY;
897 goto close_fsid_fd;
898 }
899
900 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
901 if (fsid.is_zero()) {
902 fsid.generate_random();
903 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
904 } else {
905 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
906 }
907
908 fsid.print(fsid_str);
909 strcat(fsid_str, "\n");
910 ret = ::ftruncate(fsid_fd, 0);
911 if (ret < 0) {
912 ret = -errno;
913 derr << __FUNC__ << ": failed to truncate fsid: "
914 << cpp_strerror(ret) << dendl;
915 goto close_fsid_fd;
916 }
917 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
918 if (ret < 0) {
919 derr << __FUNC__ << ": failed to write fsid: "
920 << cpp_strerror(ret) << dendl;
921 goto close_fsid_fd;
922 }
923 if (::fsync(fsid_fd) < 0) {
924 ret = -errno;
925 derr << __FUNC__ << ": close failed: can't write fsid: "
926 << cpp_strerror(ret) << dendl;
927 goto close_fsid_fd;
928 }
929 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
930 } else {
931 if (!fsid.is_zero() && fsid != old_fsid) {
932 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
933 ret = -EINVAL;
934 goto close_fsid_fd;
935 }
936 fsid = old_fsid;
937 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
938 }
939
940 // version stamp
941 ret = write_version_stamp();
942 if (ret < 0) {
943 derr << __FUNC__ << ": write_version_stamp() failed: "
944 << cpp_strerror(ret) << dendl;
945 goto close_fsid_fd;
946 }
947
948 // superblock
949 superblock.omap_backend = cct->_conf->filestore_omap_backend;
950 ret = write_superblock();
951 if (ret < 0) {
952 derr << __FUNC__ << ": write_superblock() failed: "
953 << cpp_strerror(ret) << dendl;
954 goto close_fsid_fd;
955 }
956
957 struct statfs basefs;
958 ret = ::fstatfs(basedir_fd, &basefs);
959 if (ret < 0) {
960 ret = -errno;
961 derr << __FUNC__ << ": cannot fstatfs basedir "
962 << cpp_strerror(ret) << dendl;
963 goto close_fsid_fd;
964 }
965
966 #if defined(__linux__)
967 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
968 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
969 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
970 goto close_fsid_fd;
971 }
972 #endif
973
974 create_backend(basefs.f_type);
975
976 ret = backend->create_current();
977 if (ret < 0) {
978 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
979 goto close_fsid_fd;
980 }
981
982 // write initial op_seq
983 {
984 uint64_t initial_seq = 0;
985 int fd = read_op_seq(&initial_seq);
986 if (fd < 0) {
987 ret = fd;
988 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
989 << cpp_strerror(ret) << dendl;
990 goto close_fsid_fd;
991 }
992 if (initial_seq == 0) {
993 ret = write_op_seq(fd, 1);
994 if (ret < 0) {
995 VOID_TEMP_FAILURE_RETRY(::close(fd));
996 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
997 << cpp_strerror(ret) << dendl;
998 goto close_fsid_fd;
999 }
1000
1001 if (backend->can_checkpoint()) {
1002 // create snap_1 too
1003 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
1004 ceph_assert(current_fd >= 0);
1005 char s[NAME_MAX];
1006 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
1007 ret = backend->create_checkpoint(s, nullptr);
1008 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1009 if (ret < 0 && ret != -EEXIST) {
1010 VOID_TEMP_FAILURE_RETRY(::close(fd));
1011 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
1012 goto close_fsid_fd;
1013 }
1014 }
1015 }
1016 VOID_TEMP_FAILURE_RETRY(::close(fd));
1017 }
1018 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
1019 if (ret < 0) {
1020 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
1021 goto close_fsid_fd;
1022 }
1023 // create fsid under omap
1024 // open+lock fsid
1025 int omap_fsid_fd;
1026 char omap_fsid_fn[PATH_MAX];
1027 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
1028 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
1029 if (omap_fsid_fd < 0) {
1030 ret = -errno;
1031 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
1032 goto close_fsid_fd;
1033 }
1034
1035 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
1036 ceph_assert(!fsid.is_zero());
1037 fsid.print(fsid_str);
1038 strcat(fsid_str, "\n");
1039 ret = ::ftruncate(omap_fsid_fd, 0);
1040 if (ret < 0) {
1041 ret = -errno;
1042 derr << __FUNC__ << ": failed to truncate fsid: "
1043 << cpp_strerror(ret) << dendl;
1044 goto close_omap_fsid_fd;
1045 }
1046 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
1047 if (ret < 0) {
1048 derr << __FUNC__ << ": failed to write fsid: "
1049 << cpp_strerror(ret) << dendl;
1050 goto close_omap_fsid_fd;
1051 }
1052 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
1053 if (::fsync(omap_fsid_fd) < 0) {
1054 ret = -errno;
1055 derr << __FUNC__ << ": close failed: can't write fsid: "
1056 << cpp_strerror(ret) << dendl;
1057 goto close_omap_fsid_fd;
1058 }
1059 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1060 } else {
1061 if (fsid != old_omap_fsid) {
1062 derr << __FUNC__ << ": " << omap_fsid_fn
1063 << " has existed omap fsid " << old_omap_fsid
1064 << " != expected osd fsid " << fsid
1065 << dendl;
1066 ret = -EINVAL;
1067 goto close_omap_fsid_fd;
1068 }
1069 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1070 }
1071
1072 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1073
1074 // journal?
1075 ret = mkjournal();
1076 if (ret)
1077 goto close_omap_fsid_fd;
1078
1079 ret = write_meta("type", "filestore");
1080 if (ret)
1081 goto close_omap_fsid_fd;
1082
1083 dout(1) << "mkfs done in " << basedir << dendl;
1084 ret = 0;
1085
1086 close_omap_fsid_fd:
1087 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1088 close_fsid_fd:
1089 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1090 fsid_fd = -1;
1091 close_basedir_fd:
1092 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1093 delete backend;
1094 backend = nullptr;
1095 return ret;
1096 }
1097
1098 int FileStore::mkjournal()
1099 {
1100 // read fsid
1101 int ret;
1102 char fn[PATH_MAX];
1103 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1104 int fd = ::open(fn, O_RDONLY|O_CLOEXEC, 0644);
1105 if (fd < 0) {
1106 int err = errno;
1107 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1108 return -err;
1109 }
1110 ret = read_fsid(fd, &fsid);
1111 if (ret < 0) {
1112 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1113 VOID_TEMP_FAILURE_RETRY(::close(fd));
1114 return ret;
1115 }
1116 VOID_TEMP_FAILURE_RETRY(::close(fd));
1117
1118 ret = 0;
1119
1120 new_journal();
1121 if (journal) {
1122 ret = journal->check();
1123 if (ret < 0) {
1124 ret = journal->create();
1125 if (ret)
1126 derr << __FUNC__ << ": error creating journal on " << journalpath
1127 << ": " << cpp_strerror(ret) << dendl;
1128 else
1129 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1130 }
1131 delete journal;
1132 journal = nullptr;
1133 }
1134 return ret;
1135 }
1136
1137 int FileStore::read_fsid(int fd, uuid_d *uuid)
1138 {
1139 char fsid_str[40];
1140 memset(fsid_str, 0, sizeof(fsid_str));
1141 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1142 if (ret < 0)
1143 return ret;
1144 if (ret == 8) {
1145 // old 64-bit fsid... mirror it.
1146 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1147 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1148 return 0;
1149 }
1150
1151 if (ret > 36)
1152 fsid_str[36] = 0;
1153 else
1154 fsid_str[ret] = 0;
1155 if (!uuid->parse(fsid_str))
1156 return -EINVAL;
1157 return 0;
1158 }
1159
1160 int FileStore::lock_fsid()
1161 {
1162 struct flock l;
1163 memset(&l, 0, sizeof(l));
1164 l.l_type = F_WRLCK;
1165 l.l_whence = SEEK_SET;
1166 l.l_start = 0;
1167 l.l_len = 0;
1168 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1169 if (r < 0) {
1170 int err = errno;
1171 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1172 << cpp_strerror(err) << dendl;
1173 return -err;
1174 }
1175 return 0;
1176 }
1177
1178 bool FileStore::test_mount_in_use()
1179 {
1180 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1181 char fn[PATH_MAX];
1182 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1183
1184 // verify fs isn't in use
1185
1186 fsid_fd = ::open(fn, O_RDWR|O_CLOEXEC, 0644);
1187 if (fsid_fd < 0)
1188 return 0; // no fsid, ok.
1189 bool inuse = lock_fsid() < 0;
1190 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1191 fsid_fd = -1;
1192 return inuse;
1193 }
1194
1195 bool FileStore::is_rotational()
1196 {
1197 bool rotational;
1198 if (backend) {
1199 rotational = backend->is_rotational();
1200 } else {
1201 int fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
1202 if (fd < 0)
1203 return true;
1204 struct statfs st;
1205 int r = ::fstatfs(fd, &st);
1206 ::close(fd);
1207 if (r < 0) {
1208 return true;
1209 }
1210 create_backend(st.f_type);
1211 rotational = backend->is_rotational();
1212 delete backend;
1213 backend = nullptr;
1214 }
1215 dout(10) << __func__ << " " << (int)rotational << dendl;
1216 return rotational;
1217 }
1218
1219 bool FileStore::is_journal_rotational()
1220 {
1221 bool journal_rotational;
1222 if (backend) {
1223 journal_rotational = backend->is_journal_rotational();
1224 } else {
1225 int fd = ::open(journalpath.c_str(), O_RDONLY|O_CLOEXEC);
1226 if (fd < 0)
1227 return true;
1228 struct statfs st;
1229 int r = ::fstatfs(fd, &st);
1230 ::close(fd);
1231 if (r < 0) {
1232 return true;
1233 }
1234 create_backend(st.f_type);
1235 journal_rotational = backend->is_journal_rotational();
1236 delete backend;
1237 backend = nullptr;
1238 }
1239 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1240 return journal_rotational;
1241 }
1242
1243 int FileStore::_detect_fs()
1244 {
1245 struct statfs st;
1246 int r = ::fstatfs(basedir_fd, &st);
1247 if (r < 0)
1248 return -errno;
1249
1250 blk_size = st.f_bsize;
1251
1252 #if defined(__linux__)
1253 if (st.f_type == BTRFS_SUPER_MAGIC &&
1254 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1255 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1256 return -EPERM;
1257 }
1258 #endif
1259
1260 create_backend(st.f_type);
1261
1262 r = backend->detect_features();
1263 if (r < 0) {
1264 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1265 return r;
1266 }
1267
1268 // vdo
1269 {
1270 char dev_node[PATH_MAX];
1271 if (int rc = BlkDev{fsid_fd}.wholedisk(dev_node, PATH_MAX); rc == 0) {
1272 vdo_fd = get_vdo_stats_handle(dev_node, &vdo_name);
1273 if (vdo_fd >= 0) {
1274 dout(0) << __func__ << " VDO volume " << vdo_name << " for " << dev_node
1275 << dendl;
1276 }
1277 }
1278 }
1279
1280 // test xattrs
1281 char fn[PATH_MAX];
1282 int x = rand();
1283 int y = x+1;
1284 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1285 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC|O_CLOEXEC, 0700);
1286 if (tmpfd < 0) {
1287 int ret = -errno;
1288 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1289 return ret;
1290 }
1291
1292 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1293 if (ret >= 0)
1294 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1295 if ((ret < 0) || (x != y)) {
1296 derr << "Extended attributes don't appear to work. ";
1297 if (ret)
1298 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1299 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1300 << "file system with the 'user_xattr' option." << dendl;
1301 ::unlink(fn);
1302 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1303 return -ENOTSUP;
1304 }
1305
1306 char buf[1000];
1307 memset(buf, 0, sizeof(buf)); // shut up valgrind
1308 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1309 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1310 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1311 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1312 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1313 if (ret == -ENOSPC) {
1314 dout(0) << "limited size xattrs" << dendl;
1315 }
1316 chain_fremovexattr(tmpfd, "user.test");
1317 chain_fremovexattr(tmpfd, "user.test2");
1318 chain_fremovexattr(tmpfd, "user.test3");
1319 chain_fremovexattr(tmpfd, "user.test4");
1320 chain_fremovexattr(tmpfd, "user.test5");
1321
1322 ::unlink(fn);
1323 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1324
1325 return 0;
1326 }
1327
1328 int FileStore::_sanity_check_fs()
1329 {
1330 // sanity check(s)
1331
1332 if (((int)m_filestore_journal_writeahead +
1333 (int)m_filestore_journal_parallel +
1334 (int)m_filestore_journal_trailing) > 1) {
1335 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1336 cerr << TEXT_RED
1337 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1338 << " is enabled in ceph.conf. You must choose a single journal mode."
1339 << TEXT_NORMAL << std::endl;
1340 return -EINVAL;
1341 }
1342
1343 if (!backend->can_checkpoint()) {
1344 if (!journal || !m_filestore_journal_writeahead) {
1345 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1346 cerr << TEXT_RED
1347 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1348 << " For non-btrfs volumes, a writeahead journal is required to\n"
1349 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1350 << " should include something like:\n"
1351 << " osd journal = /path/to/journal_device_or_file\n"
1352 << " filestore journal writeahead = true\n"
1353 << TEXT_NORMAL;
1354 }
1355 }
1356
1357 if (!journal) {
1358 dout(0) << "mount WARNING: no journal" << dendl;
1359 cerr << TEXT_YELLOW
1360 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1361 << " If you will not be using an osd journal, write latency may be\n"
1362 << " relatively high. It can be reduced somewhat by lowering\n"
1363 << " filestore_max_sync_interval, but lower values mean lower write\n"
1364 << " throughput, especially with spinning disks.\n"
1365 << TEXT_NORMAL;
1366 }
1367
1368 return 0;
1369 }
1370
1371 int FileStore::write_superblock()
1372 {
1373 bufferlist bl;
1374 encode(superblock, bl);
1375 return safe_write_file(basedir.c_str(), "superblock",
1376 bl.c_str(), bl.length(), 0600);
1377 }
1378
1379 int FileStore::read_superblock()
1380 {
1381 bufferptr bp(PATH_MAX);
1382 int ret = safe_read_file(basedir.c_str(), "superblock",
1383 bp.c_str(), bp.length());
1384 if (ret < 0) {
1385 if (ret == -ENOENT) {
1386 // If the file doesn't exist write initial CompatSet
1387 return write_superblock();
1388 }
1389 return ret;
1390 }
1391
1392 bufferlist bl;
1393 bl.push_back(std::move(bp));
1394 auto i = bl.cbegin();
1395 decode(superblock, i);
1396 return 0;
1397 }
1398
1399 int FileStore::update_version_stamp()
1400 {
1401 return write_version_stamp();
1402 }
1403
1404 int FileStore::version_stamp_is_valid(uint32_t *version)
1405 {
1406 bufferptr bp(PATH_MAX);
1407 int ret = safe_read_file(basedir.c_str(), "store_version",
1408 bp.c_str(), bp.length());
1409 if (ret < 0) {
1410 return ret;
1411 }
1412 bufferlist bl;
1413 bl.push_back(std::move(bp));
1414 auto i = bl.cbegin();
1415 decode(*version, i);
1416 dout(10) << __FUNC__ << ": was " << *version << " vs target "
1417 << target_version << dendl;
1418 if (*version == target_version)
1419 return 1;
1420 else
1421 return 0;
1422 }
1423
1424 int FileStore::flush_cache(ostream *os)
1425 {
1426 string drop_caches_file = "/proc/sys/vm/drop_caches";
1427 int drop_caches_fd = ::open(drop_caches_file.c_str(), O_WRONLY|O_CLOEXEC), ret = 0;
1428 char buf[2] = "3";
1429 size_t len = strlen(buf);
1430
1431 if (drop_caches_fd < 0) {
1432 ret = -errno;
1433 derr << __FUNC__ << ": failed to open " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1434 if (os) {
1435 *os << "FileStore flush_cache: failed to open " << drop_caches_file << ": " << cpp_strerror(ret);
1436 }
1437 return ret;
1438 }
1439
1440 if (::write(drop_caches_fd, buf, len) < 0) {
1441 ret = -errno;
1442 derr << __FUNC__ << ": failed to write to " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1443 if (os) {
1444 *os << "FileStore flush_cache: failed to write to " << drop_caches_file << ": " << cpp_strerror(ret);
1445 }
1446 goto out;
1447 }
1448
1449 out:
1450 ::close(drop_caches_fd);
1451 return ret;
1452 }
1453
1454 int FileStore::write_version_stamp()
1455 {
1456 dout(1) << __FUNC__ << ": " << target_version << dendl;
1457 bufferlist bl;
1458 encode(target_version, bl);
1459
1460 return safe_write_file(basedir.c_str(), "store_version",
1461 bl.c_str(), bl.length(), 0600);
1462 }
1463
1464 int FileStore::upgrade()
1465 {
1466 dout(1) << __FUNC__ << dendl;
1467 uint32_t version;
1468 int r = version_stamp_is_valid(&version);
1469
1470 if (r == -ENOENT) {
1471 derr << "The store_version file doesn't exist." << dendl;
1472 return -EINVAL;
1473 }
1474 if (r < 0)
1475 return r;
1476 if (r == 1)
1477 return 0;
1478
1479 if (version < 3) {
1480 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1481 return -EINVAL;
1482 }
1483
1484 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1485 // open up DBObjectMap with the do_upgrade flag, which we already did.
1486 update_version_stamp();
1487 return 0;
1488 }
1489
1490 int FileStore::read_op_seq(uint64_t *seq)
1491 {
1492 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR|O_CLOEXEC, 0644);
1493 if (op_fd < 0) {
1494 int r = -errno;
1495 if (r == -EIO && m_filestore_fail_eio) handle_eio();
1496 return r;
1497 }
1498 char s[40];
1499 memset(s, 0, sizeof(s));
1500 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1501 if (ret < 0) {
1502 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1503 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1504 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
1505 return ret;
1506 }
1507 *seq = atoll(s);
1508 return op_fd;
1509 }
1510
1511 int FileStore::write_op_seq(int fd, uint64_t seq)
1512 {
1513 char s[30];
1514 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1515 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1516 if (ret < 0) {
1517 ret = -errno;
1518 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
1519 }
1520 return ret;
1521 }
1522
1523 int FileStore::mount()
1524 {
1525 int ret;
1526 char buf[PATH_MAX];
1527 uint64_t initial_op_seq;
1528 uuid_d omap_fsid;
1529 set<string> cluster_snaps;
1530 CompatSet supported_compat_set = get_fs_supported_compat_set();
1531
1532 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1533
1534 ret = set_throttle_params();
1535 if (ret != 0)
1536 goto done;
1537
1538 // make sure global base dir exists
1539 if (::access(basedir.c_str(), R_OK | W_OK)) {
1540 ret = -errno;
1541 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1542 << cpp_strerror(ret) << dendl;
1543 goto done;
1544 }
1545
1546 // get fsid
1547 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1548 fsid_fd = ::open(buf, O_RDWR|O_CLOEXEC, 0644);
1549 if (fsid_fd < 0) {
1550 ret = -errno;
1551 derr << __FUNC__ << ": error opening '" << buf << "': "
1552 << cpp_strerror(ret) << dendl;
1553 goto done;
1554 }
1555
1556 ret = read_fsid(fsid_fd, &fsid);
1557 if (ret < 0) {
1558 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1559 << dendl;
1560 goto close_fsid_fd;
1561 }
1562
1563 if (lock_fsid() < 0) {
1564 derr << __FUNC__ << ": lock_fsid failed" << dendl;
1565 ret = -EBUSY;
1566 goto close_fsid_fd;
1567 }
1568
1569 dout(10) << "mount fsid is " << fsid << dendl;
1570
1571
1572 uint32_t version_stamp;
1573 ret = version_stamp_is_valid(&version_stamp);
1574 if (ret < 0) {
1575 derr << __FUNC__ << ": error in version_stamp_is_valid: "
1576 << cpp_strerror(ret) << dendl;
1577 goto close_fsid_fd;
1578 } else if (ret == 0) {
1579 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1580 derr << __FUNC__ << ": stale version stamp detected: "
1581 << version_stamp
1582 << ". Proceeding, do_update "
1583 << "is set, performing disk format upgrade."
1584 << dendl;
1585 do_update = true;
1586 } else {
1587 ret = -EINVAL;
1588 derr << __FUNC__ << ": stale version stamp " << version_stamp
1589 << ". Please run the FileStore update script before starting the "
1590 << "OSD, or set filestore_update_to to " << target_version
1591 << " (currently " << cct->_conf->filestore_update_to << ")"
1592 << dendl;
1593 goto close_fsid_fd;
1594 }
1595 }
1596
1597 ret = read_superblock();
1598 if (ret < 0) {
1599 goto close_fsid_fd;
1600 }
1601
1602 // Check if this FileStore supports all the necessary features to mount
1603 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1604 derr << __FUNC__ << ": Incompatible features set "
1605 << superblock.compat_features << dendl;
1606 ret = -EINVAL;
1607 goto close_fsid_fd;
1608 }
1609
1610 // open some dir handles
1611 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
1612 if (basedir_fd < 0) {
1613 ret = -errno;
1614 derr << __FUNC__ << ": failed to open " << basedir << ": "
1615 << cpp_strerror(ret) << dendl;
1616 basedir_fd = -1;
1617 goto close_fsid_fd;
1618 }
1619
1620 // test for btrfs, xattrs, etc.
1621 ret = _detect_fs();
1622 if (ret < 0) {
1623 derr << __FUNC__ << ": error in _detect_fs: "
1624 << cpp_strerror(ret) << dendl;
1625 goto close_basedir_fd;
1626 }
1627
1628 {
1629 list<string> ls;
1630 ret = backend->list_checkpoints(ls);
1631 if (ret < 0) {
1632 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1633 goto close_basedir_fd;
1634 }
1635
1636 long long unsigned c, prev = 0;
1637 char clustersnap[NAME_MAX];
1638 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1639 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1640 ceph_assert(c > prev);
1641 prev = c;
1642 snaps.push_back(c);
1643 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1644 cluster_snaps.insert(*it);
1645 }
1646 }
1647
1648 if (m_osd_rollback_to_cluster_snap.length() &&
1649 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1650 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1651 ret = -ENOENT;
1652 goto close_basedir_fd;
1653 }
1654
1655 char nosnapfn[200];
1656 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1657
1658 if (backend->can_checkpoint()) {
1659 if (snaps.empty()) {
1660 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1661 } else {
1662 char s[NAME_MAX];
1663 uint64_t curr_seq = 0;
1664
1665 if (m_osd_rollback_to_cluster_snap.length()) {
1666 derr << TEXT_RED
1667 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1668 << TEXT_NORMAL
1669 << dendl;
1670 ceph_assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1671 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1672 } else {
1673 {
1674 int fd = read_op_seq(&curr_seq);
1675 if (fd >= 0) {
1676 VOID_TEMP_FAILURE_RETRY(::close(fd));
1677 }
1678 }
1679 if (curr_seq)
1680 dout(10) << " current/ seq was " << curr_seq << dendl;
1681 else
1682 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1683
1684 uint64_t cp = snaps.back();
1685 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1686
1687 // if current/ is marked as non-snapshotted, refuse to roll
1688 // back (without clear direction) to avoid throwing out new
1689 // data.
1690 struct stat st;
1691 if (::stat(nosnapfn, &st) == 0) {
1692 if (!m_osd_use_stale_snap) {
1693 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1694 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1695 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1696 ret = -ENOTSUP;
1697 goto close_basedir_fd;
1698 }
1699 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1700 << ", newest snap is " << cp << dendl;
1701 cerr << TEXT_YELLOW
1702 << " ** WARNING: forcing the use of stale snapshot data **"
1703 << TEXT_NORMAL << std::endl;
1704 }
1705
1706 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1707 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1708 }
1709
1710 // drop current?
1711 ret = backend->rollback_to(s);
1712 if (ret) {
1713 derr << __FUNC__ << ": error rolling back to " << s << ": "
1714 << cpp_strerror(ret) << dendl;
1715 goto close_basedir_fd;
1716 }
1717 }
1718 }
1719 initial_op_seq = 0;
1720
1721 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
1722 if (current_fd < 0) {
1723 ret = -errno;
1724 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1725 goto close_basedir_fd;
1726 }
1727
1728 ceph_assert(current_fd >= 0);
1729
1730 op_fd = read_op_seq(&initial_op_seq);
1731 if (op_fd < 0) {
1732 ret = op_fd;
1733 derr << __FUNC__ << ": read_op_seq failed" << dendl;
1734 goto close_current_fd;
1735 }
1736
1737 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1738 if (initial_op_seq == 0) {
1739 derr << "mount initial op seq is 0; something is wrong" << dendl;
1740 ret = -EINVAL;
1741 goto close_current_fd;
1742 }
1743
1744 if (!backend->can_checkpoint()) {
1745 // mark current/ as non-snapshotted so that we don't rollback away
1746 // from it.
1747 int r = ::creat(nosnapfn, 0644);
1748 if (r < 0) {
1749 ret = -errno;
1750 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1751 goto close_current_fd;
1752 }
1753 VOID_TEMP_FAILURE_RETRY(::close(r));
1754 } else {
1755 // clear nosnap marker, if present.
1756 ::unlink(nosnapfn);
1757 }
1758
1759 // check fsid with omap
1760 // get omap fsid
1761 char omap_fsid_buf[PATH_MAX];
1762 struct ::stat omap_fsid_stat;
1763 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1764 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1765 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1766 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1767 << "assume as matched."
1768 << dendl;
1769 } else {
1770 int omap_fsid_fd;
1771 // if osd_uuid exists, compares osd_uuid with fsid
1772 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY|O_CLOEXEC, 0644);
1773 if (omap_fsid_fd < 0) {
1774 ret = -errno;
1775 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1776 << cpp_strerror(ret)
1777 << dendl;
1778 goto close_current_fd;
1779 }
1780 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1781 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1782 if (ret < 0) {
1783 derr << __FUNC__ << ": error reading omap_fsid_fd"
1784 << ", omap_fsid = " << omap_fsid
1785 << cpp_strerror(ret)
1786 << dendl;
1787 goto close_current_fd;
1788 }
1789 if (fsid != omap_fsid) {
1790 derr << __FUNC__ << ": " << omap_fsid_buf
1791 << " has existed omap fsid " << omap_fsid
1792 << " != expected osd fsid " << fsid
1793 << dendl;
1794 ret = -EINVAL;
1795 goto close_current_fd;
1796 }
1797 }
1798
1799 dout(0) << "start omap initiation" << dendl;
1800 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1801 KeyValueDB * omap_store = KeyValueDB::create(cct,
1802 superblock.omap_backend,
1803 omap_dir);
1804 if (!omap_store)
1805 {
1806 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1807 ret = -1;
1808 goto close_current_fd;
1809 }
1810
1811 if (superblock.omap_backend == "rocksdb")
1812 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1813 else
1814 ret = omap_store->init();
1815
1816 if (ret < 0) {
1817 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1818 goto close_current_fd;
1819 }
1820
1821 stringstream err;
1822 if (omap_store->create_and_open(err)) {
1823 delete omap_store;
1824 omap_store = nullptr;
1825 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1826 << " : " << err.str() << dendl;
1827 ret = -1;
1828 goto close_current_fd;
1829 }
1830
1831 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1832 ret = dbomap->init(do_update);
1833 if (ret < 0) {
1834 delete dbomap;
1835 dbomap = nullptr;
1836 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1837 goto close_current_fd;
1838 }
1839 stringstream err2;
1840
1841 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1842 derr << err2.str() << dendl;
1843 delete dbomap;
1844 dbomap = nullptr;
1845 ret = -EINVAL;
1846 goto close_current_fd;
1847 }
1848 object_map.reset(dbomap);
1849 }
1850
1851 // journal
1852 new_journal();
1853
1854 // select journal mode?
1855 if (journal) {
1856 if (!m_filestore_journal_writeahead &&
1857 !m_filestore_journal_parallel &&
1858 !m_filestore_journal_trailing) {
1859 if (!backend->can_checkpoint()) {
1860 m_filestore_journal_writeahead = true;
1861 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1862 } else {
1863 m_filestore_journal_parallel = true;
1864 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1865 }
1866 } else {
1867 if (m_filestore_journal_writeahead)
1868 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1869 if (m_filestore_journal_parallel)
1870 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1871 if (m_filestore_journal_trailing)
1872 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1873 }
1874 if (m_filestore_journal_writeahead)
1875 journal->set_wait_on_full(true);
1876 } else {
1877 dout(0) << __FUNC__ << ": no journal" << dendl;
1878 }
1879
1880 ret = _sanity_check_fs();
1881 if (ret) {
1882 derr << __FUNC__ << ": _sanity_check_fs failed with error "
1883 << ret << dendl;
1884 goto close_current_fd;
1885 }
1886
1887 // Cleanup possibly invalid collections
1888 {
1889 vector<coll_t> collections;
1890 ret = list_collections(collections, true);
1891 if (ret < 0) {
1892 derr << "Error " << ret << " while listing collections" << dendl;
1893 goto close_current_fd;
1894 }
1895 for (vector<coll_t>::iterator i = collections.begin();
1896 i != collections.end();
1897 ++i) {
1898 Index index;
1899 ret = get_index(*i, &index);
1900 if (ret < 0) {
1901 derr << "Unable to mount index " << *i
1902 << " with error: " << ret << dendl;
1903 goto close_current_fd;
1904 }
1905 ceph_assert(index.index);
1906 std::unique_lock l{(index.index)->access_lock};
1907
1908 index->cleanup();
1909 }
1910 }
1911 if (!m_disable_wbthrottle) {
1912 wbthrottle.start();
1913 } else {
1914 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1915 if (cct->_conf->filestore_odsync_write) {
1916 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1917 }
1918 }
1919 sync_thread.create("filestore_sync");
1920
1921 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1922 ret = journal_replay(initial_op_seq);
1923 if (ret < 0) {
1924 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1925 if (ret == -ENOTTY) {
1926 derr << "maybe journal is not pointing to a block device and its size "
1927 << "wasn't configured?" << dendl;
1928 }
1929
1930 goto stop_sync;
1931 }
1932 }
1933
1934 {
1935 stringstream err2;
1936 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1937 derr << err2.str() << dendl;
1938 ret = -EINVAL;
1939 goto stop_sync;
1940 }
1941 }
1942
1943 init_temp_collections();
1944
1945 journal_start();
1946
1947 op_tp.start();
1948 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1949 (*it)->start();
1950 }
1951 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1952 (*it)->start();
1953 }
1954
1955 timer.init();
1956
1957 // upgrade?
1958 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1959 int err = upgrade();
1960 if (err < 0) {
1961 derr << "error converting store" << dendl;
1962 umount();
1963 return err;
1964 }
1965 }
1966
1967 // all okay.
1968 return 0;
1969
1970 stop_sync:
1971 // stop sync thread
1972 {
1973 std::lock_guard l{lock};
1974 stop = true;
1975 sync_cond.notify_all();
1976 }
1977 sync_thread.join();
1978 if (!m_disable_wbthrottle) {
1979 wbthrottle.stop();
1980 }
1981 close_current_fd:
1982 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1983 current_fd = -1;
1984 close_basedir_fd:
1985 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1986 basedir_fd = -1;
1987 close_fsid_fd:
1988 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1989 fsid_fd = -1;
1990 done:
1991 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
1992 delete backend;
1993 backend = nullptr;
1994 object_map.reset();
1995 return ret;
1996 }
1997
1998 void FileStore::init_temp_collections()
1999 {
2000 dout(10) << __FUNC__ << dendl;
2001 vector<coll_t> ls;
2002 int r = list_collections(ls, true);
2003 ceph_assert(r >= 0);
2004
2005 dout(20) << " ls " << ls << dendl;
2006
2007 SequencerPosition spos;
2008
2009 set<coll_t> temps;
2010 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2011 if (p->is_temp())
2012 temps.insert(*p);
2013 dout(20) << " temps " << temps << dendl;
2014
2015 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
2016 if (p->is_temp())
2017 continue;
2018 coll_map[*p] = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, *p);
2019 if (p->is_meta())
2020 continue;
2021 coll_t temp = p->get_temp();
2022 if (temps.count(temp)) {
2023 temps.erase(temp);
2024 } else {
2025 dout(10) << __FUNC__ << ": creating " << temp << dendl;
2026 r = _create_collection(temp, 0, spos);
2027 ceph_assert(r == 0);
2028 }
2029 }
2030
2031 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
2032 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
2033 r = _collection_remove_recursive(*p, spos);
2034 ceph_assert(r == 0);
2035 }
2036 }
2037
2038 int FileStore::umount()
2039 {
2040 dout(5) << __FUNC__ << ": " << basedir << dendl;
2041
2042 flush();
2043 sync();
2044 do_force_sync();
2045
2046 {
2047 std::lock_guard l(coll_lock);
2048 coll_map.clear();
2049 }
2050
2051 {
2052 std::lock_guard l{lock};
2053 stop = true;
2054 sync_cond.notify_all();
2055 }
2056 sync_thread.join();
2057 if (!m_disable_wbthrottle){
2058 wbthrottle.stop();
2059 }
2060 op_tp.stop();
2061
2062 journal_stop();
2063 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
2064 journal_write_close();
2065
2066 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
2067 (*it)->stop();
2068 }
2069 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
2070 (*it)->stop();
2071 }
2072
2073 if (vdo_fd >= 0) {
2074 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
2075 vdo_fd = -1;
2076 }
2077 if (fsid_fd >= 0) {
2078 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
2079 fsid_fd = -1;
2080 }
2081 if (op_fd >= 0) {
2082 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
2083 op_fd = -1;
2084 }
2085 if (current_fd >= 0) {
2086 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
2087 current_fd = -1;
2088 }
2089 if (basedir_fd >= 0) {
2090 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
2091 basedir_fd = -1;
2092 }
2093
2094 force_sync = false;
2095
2096 delete backend;
2097 backend = nullptr;
2098
2099 object_map.reset();
2100
2101 {
2102 std::lock_guard l{sync_entry_timeo_lock};
2103 timer.shutdown();
2104 }
2105
2106 // nothing
2107 return 0;
2108 }
2109
2110
2111 /// -----------------------------
2112
2113 // keep OpSequencer handles alive for all time so that a sequence
2114 // that removes a collection and creates a new one will not allow
2115 // two sequencers for the same collection to be alive at once.
2116
2117 ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
2118 {
2119 std::lock_guard l{coll_lock};
2120 auto p = coll_map.find(c);
2121 if (p == coll_map.end()) {
2122 return CollectionHandle();
2123 }
2124 return p->second;
2125 }
2126
2127 ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
2128 {
2129 std::lock_guard l{coll_lock};
2130 auto p = coll_map.find(c);
2131 if (p == coll_map.end()) {
2132 auto r = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, c);
2133 coll_map[c] = r;
2134 return r;
2135 } else {
2136 return p->second;
2137 }
2138 }
2139
2140
2141 /// -----------------------------
2142
2143 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2144 Context *onreadable,
2145 Context *onreadable_sync,
2146 TrackedOpRef osd_op)
2147 {
2148 uint64_t bytes = 0, ops = 0;
2149 for (vector<Transaction>::iterator p = tls.begin();
2150 p != tls.end();
2151 ++p) {
2152 bytes += (*p).get_num_bytes();
2153 ops += (*p).get_num_ops();
2154 }
2155
2156 Op *o = new Op;
2157 o->start = ceph_clock_now();
2158 o->tls = std::move(tls);
2159 o->onreadable = onreadable;
2160 o->onreadable_sync = onreadable_sync;
2161 o->ops = ops;
2162 o->bytes = bytes;
2163 o->osd_op = osd_op;
2164 return o;
2165 }
2166
2167
2168
2169 void FileStore::queue_op(OpSequencer *osr, Op *o)
2170 {
2171 // queue op on sequencer, then queue sequencer for the threadpool,
2172 // so that regardless of which order the threads pick up the
2173 // sequencer, the op order will be preserved.
2174
2175 osr->queue(o);
2176 o->trace.event("queued");
2177
2178 logger->inc(l_filestore_ops);
2179 logger->inc(l_filestore_bytes, o->bytes);
2180
2181 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
2182 << " " << *osr
2183 << " " << o->bytes << " bytes"
2184 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2185 << dendl;
2186 op_wq.queue(osr);
2187 }
2188
2189 void FileStore::op_queue_reserve_throttle(Op *o)
2190 {
2191 throttle_ops.get();
2192 throttle_bytes.get(o->bytes);
2193
2194 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2195 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2196 }
2197
2198 void FileStore::op_queue_release_throttle(Op *o)
2199 {
2200 throttle_ops.put();
2201 throttle_bytes.put(o->bytes);
2202 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2203 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2204 }
2205
2206 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2207 {
2208 if (!m_disable_wbthrottle) {
2209 wbthrottle.throttle();
2210 }
2211 // inject a stall?
2212 if (cct->_conf->filestore_inject_stall) {
2213 int orig = cct->_conf->filestore_inject_stall;
2214 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2215 sleep(orig);
2216 cct->_conf.set_val("filestore_inject_stall", "0");
2217 dout(5) << __FUNC__ << ": done stalling" << dendl;
2218 }
2219
2220 osr->apply_lock.lock();
2221 Op *o = osr->peek_queue();
2222 o->trace.event("op_apply_start");
2223 apply_manager.op_apply_start(o->op);
2224 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " start" << dendl;
2225 o->trace.event("_do_transactions start");
2226 int r = _do_transactions(o->tls, o->op, &handle, osr->osr_name);
2227 o->trace.event("op_apply_finish");
2228 apply_manager.op_apply_finish(o->op);
2229 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2230 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2231 }
2232
2233 void FileStore::_finish_op(OpSequencer *osr)
2234 {
2235 list<Context*> to_queue;
2236 Op *o = osr->dequeue(&to_queue);
2237
2238 o->tls.clear();
2239
2240 utime_t lat = ceph_clock_now();
2241 lat -= o->start;
2242
2243 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " lat " << lat << dendl;
2244 osr->apply_lock.unlock(); // locked in _do_op
2245 o->trace.event("_finish_op");
2246
2247 // called with tp lock held
2248 op_queue_release_throttle(o);
2249
2250 logger->tinc(l_filestore_apply_latency, lat);
2251
2252 if (o->onreadable_sync) {
2253 o->onreadable_sync->complete(0);
2254 }
2255 if (o->onreadable) {
2256 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2257 }
2258 if (!to_queue.empty()) {
2259 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2260 }
2261 delete o;
2262 o = nullptr;
2263 }
2264
2265 struct C_JournaledAhead : public Context {
2266 FileStore *fs;
2267 FileStore::OpSequencer *osr;
2268 FileStore::Op *o;
2269 Context *ondisk;
2270
2271 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2272 fs(f), osr(os), o(o), ondisk(ondisk) { }
2273 void finish(int r) override {
2274 fs->_journaled_ahead(osr, o, ondisk);
2275 }
2276 };
2277
2278 int FileStore::queue_transactions(CollectionHandle& ch, vector<Transaction>& tls,
2279 TrackedOpRef osd_op,
2280 ThreadPool::TPHandle *handle)
2281 {
2282 Context *onreadable;
2283 Context *ondisk;
2284 Context *onreadable_sync;
2285 ObjectStore::Transaction::collect_contexts(
2286 tls, &onreadable, &ondisk, &onreadable_sync);
2287
2288 if (cct->_conf->objectstore_blackhole) {
2289 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2290 << dendl;
2291 delete ondisk;
2292 ondisk = nullptr;
2293 delete onreadable;
2294 onreadable = nullptr;
2295 delete onreadable_sync;
2296 onreadable_sync = nullptr;
2297 return 0;
2298 }
2299
2300 utime_t start = ceph_clock_now();
2301
2302 OpSequencer *osr = static_cast<OpSequencer*>(ch.get());
2303 dout(5) << __FUNC__ << ": osr " << osr << " " << *osr << dendl;
2304
2305 ZTracer::Trace trace;
2306 if (osd_op && osd_op->pg_trace) {
2307 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2308 trace = osd_op->store_trace;
2309 }
2310
2311 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2312 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2313
2314 //prepare and encode transactions data out of lock
2315 bufferlist tbl;
2316 int orig_len = journal->prepare_entry(o->tls, &tbl);
2317
2318 if (handle)
2319 handle->suspend_tp_timeout();
2320
2321 op_queue_reserve_throttle(o);
2322 journal->reserve_throttle_and_backoff(tbl.length());
2323
2324 if (handle)
2325 handle->reset_tp_timeout();
2326
2327 uint64_t op_num = submit_manager.op_submit_start();
2328 o->op = op_num;
2329 trace.keyval("opnum", op_num);
2330
2331 if (m_filestore_do_dump)
2332 dump_transactions(o->tls, o->op, osr);
2333
2334 if (m_filestore_journal_parallel) {
2335 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2336
2337 trace.keyval("journal mode", "parallel");
2338 trace.event("journal started");
2339 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2340
2341 // queue inside submit_manager op submission lock
2342 queue_op(osr, o);
2343 trace.event("op queued");
2344 } else if (m_filestore_journal_writeahead) {
2345 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2346
2347 osr->queue_journal(o);
2348
2349 trace.keyval("journal mode", "writeahead");
2350 trace.event("journal started");
2351 _op_journal_transactions(tbl, orig_len, o->op,
2352 new C_JournaledAhead(this, osr, o, ondisk),
2353 osd_op);
2354 } else {
2355 ceph_abort();
2356 }
2357 submit_manager.op_submit_finish(op_num);
2358 utime_t end = ceph_clock_now();
2359 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2360 return 0;
2361 }
2362
2363 if (!journal) {
2364 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2365 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2366
2367 if (handle)
2368 handle->suspend_tp_timeout();
2369
2370 op_queue_reserve_throttle(o);
2371
2372 if (handle)
2373 handle->reset_tp_timeout();
2374
2375 uint64_t op_num = submit_manager.op_submit_start();
2376 o->op = op_num;
2377
2378 if (m_filestore_do_dump)
2379 dump_transactions(o->tls, o->op, osr);
2380
2381 queue_op(osr, o);
2382 trace.keyval("opnum", op_num);
2383 trace.keyval("journal mode", "none");
2384 trace.event("op queued");
2385
2386 if (ondisk)
2387 apply_manager.add_waiter(op_num, ondisk);
2388 submit_manager.op_submit_finish(op_num);
2389 utime_t end = ceph_clock_now();
2390 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2391 return 0;
2392 }
2393
2394 ceph_assert(journal);
2395 //prepare and encode transactions data out of lock
2396 bufferlist tbl;
2397 int orig_len = -1;
2398 if (journal->is_writeable()) {
2399 orig_len = journal->prepare_entry(tls, &tbl);
2400 }
2401 uint64_t op = submit_manager.op_submit_start();
2402 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2403
2404 if (m_filestore_do_dump)
2405 dump_transactions(tls, op, osr);
2406
2407 trace.event("op_apply_start");
2408 trace.keyval("opnum", op);
2409 trace.keyval("journal mode", "trailing");
2410 apply_manager.op_apply_start(op);
2411 trace.event("do_transactions");
2412 int r = do_transactions(tls, op);
2413
2414 if (r >= 0) {
2415 trace.event("journal started");
2416 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2417 } else {
2418 delete ondisk;
2419 ondisk = nullptr;
2420 }
2421
2422 // start on_readable finisher after we queue journal item, as on_readable callback
2423 // is allowed to delete the Transaction
2424 if (onreadable_sync) {
2425 onreadable_sync->complete(r);
2426 }
2427 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2428
2429 submit_manager.op_submit_finish(op);
2430 trace.event("op_apply_finish");
2431 apply_manager.op_apply_finish(op);
2432
2433 utime_t end = ceph_clock_now();
2434 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2435 return r;
2436 }
2437
2438 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2439 {
2440 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2441
2442 o->trace.event("writeahead journal finished");
2443
2444 // this should queue in order because the journal does it's completions in order.
2445 queue_op(osr, o);
2446
2447 list<Context*> to_queue;
2448 osr->dequeue_journal(&to_queue);
2449
2450 // do ondisk completions async, to prevent any onreadable_sync completions
2451 // getting blocked behind an ondisk completion.
2452 if (ondisk) {
2453 dout(10) << " queueing ondisk " << ondisk << dendl;
2454 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2455 }
2456 if (!to_queue.empty()) {
2457 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2458 }
2459 }
2460
2461 int FileStore::_do_transactions(
2462 vector<Transaction> &tls,
2463 uint64_t op_seq,
2464 ThreadPool::TPHandle *handle,
2465 const char *osr_name)
2466 {
2467 int trans_num = 0;
2468
2469 for (vector<Transaction>::iterator p = tls.begin();
2470 p != tls.end();
2471 ++p, trans_num++) {
2472 _do_transaction(*p, op_seq, trans_num, handle, osr_name);
2473 if (handle)
2474 handle->reset_tp_timeout();
2475 }
2476
2477 return 0;
2478 }
2479
2480 void FileStore::_set_global_replay_guard(const coll_t& cid,
2481 const SequencerPosition &spos)
2482 {
2483 if (backend->can_checkpoint())
2484 return;
2485
2486 // sync all previous operations on this sequencer
2487 int ret = object_map->sync();
2488 if (ret < 0) {
2489 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2490 ceph_abort_msg("_set_global_replay_guard failed");
2491 }
2492 ret = sync_filesystem(basedir_fd);
2493 if (ret < 0) {
2494 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2495 ceph_abort_msg("_set_global_replay_guard failed");
2496 }
2497
2498 char fn[PATH_MAX];
2499 get_cdir(cid, fn, sizeof(fn));
2500 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
2501 if (fd < 0) {
2502 int err = errno;
2503 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2504 ceph_abort_msg("_set_global_replay_guard failed");
2505 }
2506
2507 _inject_failure();
2508
2509 // then record that we did it
2510 bufferlist v;
2511 encode(spos, v);
2512 int r = chain_fsetxattr<true, true>(
2513 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2514 if (r < 0) {
2515 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2516 << " got " << cpp_strerror(r) << dendl;
2517 ceph_abort_msg("fsetxattr failed");
2518 }
2519
2520 // and make sure our xattr is durable.
2521 r = ::fsync(fd);
2522 if (r < 0) {
2523 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2524 ceph_abort();
2525 }
2526
2527 _inject_failure();
2528
2529 VOID_TEMP_FAILURE_RETRY(::close(fd));
2530 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2531 }
2532
2533 int FileStore::_check_global_replay_guard(const coll_t& cid,
2534 const SequencerPosition& spos)
2535 {
2536 char fn[PATH_MAX];
2537 get_cdir(cid, fn, sizeof(fn));
2538 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
2539 if (fd < 0) {
2540 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2541 return 1; // if collection does not exist, there is no guard, and we can replay.
2542 }
2543
2544 char buf[100];
2545 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2546 if (r < 0) {
2547 dout(20) << __FUNC__ << ": no xattr" << dendl;
2548 if (r == -EIO && m_filestore_fail_eio) handle_eio();
2549 VOID_TEMP_FAILURE_RETRY(::close(fd));
2550 return 1; // no xattr
2551 }
2552 bufferlist bl;
2553 bl.append(buf, r);
2554
2555 SequencerPosition opos;
2556 auto p = bl.cbegin();
2557 decode(opos, p);
2558
2559 VOID_TEMP_FAILURE_RETRY(::close(fd));
2560 return spos >= opos ? 1 : -1;
2561 }
2562
2563
2564 void FileStore::_set_replay_guard(const coll_t& cid,
2565 const SequencerPosition &spos,
2566 bool in_progress=false)
2567 {
2568 char fn[PATH_MAX];
2569 get_cdir(cid, fn, sizeof(fn));
2570 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
2571 if (fd < 0) {
2572 int err = errno;
2573 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2574 ceph_abort_msg("_set_replay_guard failed");
2575 }
2576 _set_replay_guard(fd, spos, 0, in_progress);
2577 VOID_TEMP_FAILURE_RETRY(::close(fd));
2578 }
2579
2580
2581 void FileStore::_set_replay_guard(int fd,
2582 const SequencerPosition& spos,
2583 const ghobject_t *hoid,
2584 bool in_progress)
2585 {
2586 if (backend->can_checkpoint())
2587 return;
2588
2589 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2590
2591 _inject_failure();
2592
2593 // first make sure the previous operation commits
2594 int r = ::fsync(fd);
2595 if (r < 0) {
2596 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2597 ceph_abort();
2598 }
2599
2600 if (!in_progress) {
2601 // sync object_map too. even if this object has a header or keys,
2602 // it have had them in the past and then removed them, so always
2603 // sync.
2604 object_map->sync(hoid, &spos);
2605 }
2606
2607 _inject_failure();
2608
2609 // then record that we did it
2610 bufferlist v(40);
2611 encode(spos, v);
2612 encode(in_progress, v);
2613 r = chain_fsetxattr<true, true>(
2614 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2615 if (r < 0) {
2616 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2617 ceph_abort_msg("fsetxattr failed");
2618 }
2619
2620 // and make sure our xattr is durable.
2621 r = ::fsync(fd);
2622 if (r < 0) {
2623 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2624 ceph_abort();
2625 }
2626
2627 _inject_failure();
2628
2629 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2630 }
2631
2632 void FileStore::_close_replay_guard(const coll_t& cid,
2633 const SequencerPosition &spos)
2634 {
2635 char fn[PATH_MAX];
2636 get_cdir(cid, fn, sizeof(fn));
2637 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
2638 if (fd < 0) {
2639 int err = errno;
2640 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2641 ceph_abort_msg("_close_replay_guard failed");
2642 }
2643 _close_replay_guard(fd, spos);
2644 VOID_TEMP_FAILURE_RETRY(::close(fd));
2645 }
2646
2647 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2648 const ghobject_t *hoid)
2649 {
2650 if (backend->can_checkpoint())
2651 return;
2652
2653 dout(10) << __FUNC__ << ": " << spos << dendl;
2654
2655 _inject_failure();
2656
2657 // sync object_map too. even if this object has a header or keys,
2658 // it have had them in the past and then removed them, so always
2659 // sync.
2660 object_map->sync(hoid, &spos);
2661
2662 // then record that we are done with this operation
2663 bufferlist v(40);
2664 encode(spos, v);
2665 bool in_progress = false;
2666 encode(in_progress, v);
2667 int r = chain_fsetxattr<true, true>(
2668 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2669 if (r < 0) {
2670 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2671 ceph_abort_msg("fsetxattr failed");
2672 }
2673
2674 // and make sure our xattr is durable.
2675 r = ::fsync(fd);
2676 if (r < 0) {
2677 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2678 ceph_abort();
2679 }
2680
2681 _inject_failure();
2682
2683 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2684 }
2685
2686 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2687 const SequencerPosition& spos)
2688 {
2689 if (!replaying || backend->can_checkpoint())
2690 return 1;
2691
2692 int r = _check_global_replay_guard(cid, spos);
2693 if (r < 0)
2694 return r;
2695
2696 FDRef fd;
2697 r = lfn_open(cid, oid, false, &fd);
2698 if (r < 0) {
2699 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2700 return 1; // if file does not exist, there is no guard, and we can replay.
2701 }
2702 int ret = _check_replay_guard(**fd, spos);
2703 lfn_close(fd);
2704 return ret;
2705 }
2706
2707 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2708 {
2709 if (!replaying || backend->can_checkpoint())
2710 return 1;
2711
2712 char fn[PATH_MAX];
2713 get_cdir(cid, fn, sizeof(fn));
2714 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
2715 if (fd < 0) {
2716 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2717 return 1; // if collection does not exist, there is no guard, and we can replay.
2718 }
2719 int ret = _check_replay_guard(fd, spos);
2720 VOID_TEMP_FAILURE_RETRY(::close(fd));
2721 return ret;
2722 }
2723
2724 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2725 {
2726 if (!replaying || backend->can_checkpoint())
2727 return 1;
2728
2729 char buf[100];
2730 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2731 if (r < 0) {
2732 dout(20) << __FUNC__ << ": no xattr" << dendl;
2733 if (r == -EIO && m_filestore_fail_eio) handle_eio();
2734 return 1; // no xattr
2735 }
2736 bufferlist bl;
2737 bl.append(buf, r);
2738
2739 SequencerPosition opos;
2740 auto p = bl.cbegin();
2741 decode(opos, p);
2742 bool in_progress = false;
2743 if (!p.end()) // older journals don't have this
2744 decode(in_progress, p);
2745 if (opos > spos) {
2746 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2747 << ", now or in future, SKIPPING REPLAY" << dendl;
2748 return -1;
2749 } else if (opos == spos) {
2750 if (in_progress) {
2751 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2752 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2753 return 0;
2754 } else {
2755 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2756 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2757 return -1;
2758 }
2759 } else {
2760 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2761 << ", in past, will replay" << dendl;
2762 return 1;
2763 }
2764 }
2765
2766 void FileStore::_do_transaction(
2767 Transaction& t, uint64_t op_seq, int trans_num,
2768 ThreadPool::TPHandle *handle,
2769 const char *osr_name)
2770 {
2771 dout(10) << __FUNC__ << ": on " << &t << dendl;
2772
2773 Transaction::iterator i = t.begin();
2774
2775 SequencerPosition spos(op_seq, trans_num, 0);
2776 while (i.have_op()) {
2777 if (handle)
2778 handle->reset_tp_timeout();
2779
2780 Transaction::Op *op = i.decode_op();
2781 int r = 0;
2782
2783 _inject_failure();
2784
2785 switch (op->op) {
2786 case Transaction::OP_NOP:
2787 break;
2788 case Transaction::OP_TOUCH:
2789 case Transaction::OP_CREATE:
2790 {
2791 const coll_t &_cid = i.get_cid(op->cid);
2792 const ghobject_t &oid = i.get_oid(op->oid);
2793 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2794 _cid : _cid.get_temp();
2795 tracepoint(objectstore, touch_enter, osr_name);
2796 if (_check_replay_guard(cid, oid, spos) > 0)
2797 r = _touch(cid, oid);
2798 tracepoint(objectstore, touch_exit, r);
2799 }
2800 break;
2801
2802 case Transaction::OP_WRITE:
2803 {
2804 const coll_t &_cid = i.get_cid(op->cid);
2805 const ghobject_t &oid = i.get_oid(op->oid);
2806 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2807 _cid : _cid.get_temp();
2808 uint64_t off = op->off;
2809 uint64_t len = op->len;
2810 uint32_t fadvise_flags = i.get_fadvise_flags();
2811 bufferlist bl;
2812 i.decode_bl(bl);
2813 tracepoint(objectstore, write_enter, osr_name, off, len);
2814 if (_check_replay_guard(cid, oid, spos) > 0)
2815 r = _write(cid, oid, off, len, bl, fadvise_flags);
2816 tracepoint(objectstore, write_exit, r);
2817 }
2818 break;
2819
2820 case Transaction::OP_ZERO:
2821 {
2822 const coll_t &_cid = i.get_cid(op->cid);
2823 const ghobject_t &oid = i.get_oid(op->oid);
2824 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2825 _cid : _cid.get_temp();
2826 uint64_t off = op->off;
2827 uint64_t len = op->len;
2828 tracepoint(objectstore, zero_enter, osr_name, off, len);
2829 if (_check_replay_guard(cid, oid, spos) > 0)
2830 r = _zero(cid, oid, off, len);
2831 tracepoint(objectstore, zero_exit, r);
2832 }
2833 break;
2834
2835 case Transaction::OP_TRIMCACHE:
2836 {
2837 // deprecated, no-op
2838 }
2839 break;
2840
2841 case Transaction::OP_TRUNCATE:
2842 {
2843 const coll_t &_cid = i.get_cid(op->cid);
2844 const ghobject_t &oid = i.get_oid(op->oid);
2845 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2846 _cid : _cid.get_temp();
2847 uint64_t off = op->off;
2848 tracepoint(objectstore, truncate_enter, osr_name, off);
2849 if (_check_replay_guard(cid, oid, spos) > 0)
2850 r = _truncate(cid, oid, off);
2851 tracepoint(objectstore, truncate_exit, r);
2852 }
2853 break;
2854
2855 case Transaction::OP_REMOVE:
2856 {
2857 const coll_t &_cid = i.get_cid(op->cid);
2858 const ghobject_t &oid = i.get_oid(op->oid);
2859 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2860 _cid : _cid.get_temp();
2861 tracepoint(objectstore, remove_enter, osr_name);
2862 if (_check_replay_guard(cid, oid, spos) > 0)
2863 r = _remove(cid, oid, spos);
2864 tracepoint(objectstore, remove_exit, r);
2865 }
2866 break;
2867
2868 case Transaction::OP_SETATTR:
2869 {
2870 const coll_t &_cid = i.get_cid(op->cid);
2871 const ghobject_t &oid = i.get_oid(op->oid);
2872 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2873 _cid : _cid.get_temp();
2874 string name = i.decode_string();
2875 bufferlist bl;
2876 i.decode_bl(bl);
2877 tracepoint(objectstore, setattr_enter, osr_name);
2878 if (_check_replay_guard(cid, oid, spos) > 0) {
2879 map<string, bufferptr> to_set;
2880 to_set[name] = bufferptr(bl.c_str(), bl.length());
2881 r = _setattrs(cid, oid, to_set, spos);
2882 if (r == -ENOSPC)
2883 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2884 << " name " << name << " size " << bl.length() << dendl;
2885 }
2886 tracepoint(objectstore, setattr_exit, r);
2887 }
2888 break;
2889
2890 case Transaction::OP_SETATTRS:
2891 {
2892 const coll_t &_cid = i.get_cid(op->cid);
2893 const ghobject_t &oid = i.get_oid(op->oid);
2894 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2895 _cid : _cid.get_temp();
2896 map<string, bufferptr> aset;
2897 i.decode_attrset(aset);
2898 tracepoint(objectstore, setattrs_enter, osr_name);
2899 if (_check_replay_guard(cid, oid, spos) > 0)
2900 r = _setattrs(cid, oid, aset, spos);
2901 tracepoint(objectstore, setattrs_exit, r);
2902 if (r == -ENOSPC)
2903 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2904 }
2905 break;
2906
2907 case Transaction::OP_RMATTR:
2908 {
2909 const coll_t &_cid = i.get_cid(op->cid);
2910 const ghobject_t &oid = i.get_oid(op->oid);
2911 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2912 _cid : _cid.get_temp();
2913 string name = i.decode_string();
2914 tracepoint(objectstore, rmattr_enter, osr_name);
2915 if (_check_replay_guard(cid, oid, spos) > 0)
2916 r = _rmattr(cid, oid, name.c_str(), spos);
2917 tracepoint(objectstore, rmattr_exit, r);
2918 }
2919 break;
2920
2921 case Transaction::OP_RMATTRS:
2922 {
2923 const coll_t &_cid = i.get_cid(op->cid);
2924 const ghobject_t &oid = i.get_oid(op->oid);
2925 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2926 _cid : _cid.get_temp();
2927 tracepoint(objectstore, rmattrs_enter, osr_name);
2928 if (_check_replay_guard(cid, oid, spos) > 0)
2929 r = _rmattrs(cid, oid, spos);
2930 tracepoint(objectstore, rmattrs_exit, r);
2931 }
2932 break;
2933
2934 case Transaction::OP_CLONE:
2935 {
2936 const coll_t &_cid = i.get_cid(op->cid);
2937 const ghobject_t &oid = i.get_oid(op->oid);
2938 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2939 _cid : _cid.get_temp();
2940 const ghobject_t &noid = i.get_oid(op->dest_oid);
2941 tracepoint(objectstore, clone_enter, osr_name);
2942 r = _clone(cid, oid, noid, spos);
2943 tracepoint(objectstore, clone_exit, r);
2944 }
2945 break;
2946
2947 case Transaction::OP_CLONERANGE:
2948 {
2949 const coll_t &_cid = i.get_cid(op->cid);
2950 const ghobject_t &oid = i.get_oid(op->oid);
2951 const ghobject_t &noid = i.get_oid(op->dest_oid);
2952 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2953 _cid : _cid.get_temp();
2954 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2955 _cid : _cid.get_temp();
2956 uint64_t off = op->off;
2957 uint64_t len = op->len;
2958 tracepoint(objectstore, clone_range_enter, osr_name, len);
2959 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2960 tracepoint(objectstore, clone_range_exit, r);
2961 }
2962 break;
2963
2964 case Transaction::OP_CLONERANGE2:
2965 {
2966 const coll_t &_cid = i.get_cid(op->cid);
2967 const ghobject_t &oid = i.get_oid(op->oid);
2968 const ghobject_t &noid = i.get_oid(op->dest_oid);
2969 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2970 _cid : _cid.get_temp();
2971 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2972 _cid : _cid.get_temp();
2973 uint64_t srcoff = op->off;
2974 uint64_t len = op->len;
2975 uint64_t dstoff = op->dest_off;
2976 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2977 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2978 tracepoint(objectstore, clone_range2_exit, r);
2979 }
2980 break;
2981
2982 case Transaction::OP_MKCOLL:
2983 {
2984 const coll_t &cid = i.get_cid(op->cid);
2985 tracepoint(objectstore, mkcoll_enter, osr_name);
2986 if (_check_replay_guard(cid, spos) > 0)
2987 r = _create_collection(cid, op->split_bits, spos);
2988 tracepoint(objectstore, mkcoll_exit, r);
2989 }
2990 break;
2991
2992 case Transaction::OP_COLL_SET_BITS:
2993 {
2994 const coll_t &cid = i.get_cid(op->cid);
2995 int bits = op->split_bits;
2996 r = _collection_set_bits(cid, bits);
2997 }
2998 break;
2999
3000 case Transaction::OP_COLL_HINT:
3001 {
3002 const coll_t &cid = i.get_cid(op->cid);
3003 uint32_t type = op->hint_type;
3004 bufferlist hint;
3005 i.decode_bl(hint);
3006 auto hiter = hint.cbegin();
3007 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
3008 uint32_t pg_num;
3009 uint64_t num_objs;
3010 decode(pg_num, hiter);
3011 decode(num_objs, hiter);
3012 if (_check_replay_guard(cid, spos) > 0) {
3013 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
3014 }
3015 } else {
3016 // Ignore the hint
3017 dout(10) << "Unrecognized collection hint type: " << type << dendl;
3018 }
3019 }
3020 break;
3021
3022 case Transaction::OP_RMCOLL:
3023 {
3024 const coll_t &cid = i.get_cid(op->cid);
3025 tracepoint(objectstore, rmcoll_enter, osr_name);
3026 if (_check_replay_guard(cid, spos) > 0)
3027 r = _destroy_collection(cid);
3028 tracepoint(objectstore, rmcoll_exit, r);
3029 }
3030 break;
3031
3032 case Transaction::OP_COLL_ADD:
3033 {
3034 const coll_t &ocid = i.get_cid(op->cid);
3035 const coll_t &ncid = i.get_cid(op->dest_cid);
3036 const ghobject_t &oid = i.get_oid(op->oid);
3037
3038 ceph_assert(oid.hobj.pool >= -1);
3039
3040 // always followed by OP_COLL_REMOVE
3041 Transaction::Op *op2 = i.decode_op();
3042 const coll_t &ocid2 = i.get_cid(op2->cid);
3043 const ghobject_t &oid2 = i.get_oid(op2->oid);
3044 ceph_assert(op2->op == Transaction::OP_COLL_REMOVE);
3045 ceph_assert(ocid2 == ocid);
3046 ceph_assert(oid2 == oid);
3047
3048 tracepoint(objectstore, coll_add_enter);
3049 r = _collection_add(ncid, ocid, oid, spos);
3050 tracepoint(objectstore, coll_add_exit, r);
3051 spos.op++;
3052 if (r < 0)
3053 break;
3054 tracepoint(objectstore, coll_remove_enter, osr_name);
3055 if (_check_replay_guard(ocid, oid, spos) > 0)
3056 r = _remove(ocid, oid, spos);
3057 tracepoint(objectstore, coll_remove_exit, r);
3058 }
3059 break;
3060
3061 case Transaction::OP_COLL_MOVE:
3062 {
3063 // WARNING: this is deprecated and buggy; only here to replay old journals.
3064 const coll_t &ocid = i.get_cid(op->cid);
3065 const coll_t &ncid = i.get_cid(op->dest_cid);
3066 const ghobject_t &oid = i.get_oid(op->oid);
3067 tracepoint(objectstore, coll_move_enter);
3068 r = _collection_add(ocid, ncid, oid, spos);
3069 if (r == 0 &&
3070 (_check_replay_guard(ocid, oid, spos) > 0))
3071 r = _remove(ocid, oid, spos);
3072 tracepoint(objectstore, coll_move_exit, r);
3073 }
3074 break;
3075
3076 case Transaction::OP_COLL_MOVE_RENAME:
3077 {
3078 const coll_t &_oldcid = i.get_cid(op->cid);
3079 const ghobject_t &oldoid = i.get_oid(op->oid);
3080 const coll_t &_newcid = i.get_cid(op->dest_cid);
3081 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3082 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
3083 _oldcid : _oldcid.get_temp();
3084 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
3085 _oldcid : _newcid.get_temp();
3086 tracepoint(objectstore, coll_move_rename_enter);
3087 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
3088 tracepoint(objectstore, coll_move_rename_exit, r);
3089 }
3090 break;
3091
3092 case Transaction::OP_TRY_RENAME:
3093 {
3094 const coll_t &_cid = i.get_cid(op->cid);
3095 const ghobject_t &oldoid = i.get_oid(op->oid);
3096 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3097 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
3098 _cid : _cid.get_temp();
3099 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
3100 _cid : _cid.get_temp();
3101 tracepoint(objectstore, coll_try_rename_enter);
3102 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
3103 tracepoint(objectstore, coll_try_rename_exit, r);
3104 }
3105 break;
3106
3107 case Transaction::OP_COLL_SETATTR:
3108 case Transaction::OP_COLL_RMATTR:
3109 ceph_abort_msg("collection attr methods no longer implemented");
3110 break;
3111
3112 case Transaction::OP_COLL_RENAME:
3113 {
3114 r = -EOPNOTSUPP;
3115 }
3116 break;
3117
3118 case Transaction::OP_OMAP_CLEAR:
3119 {
3120 const coll_t &_cid = i.get_cid(op->cid);
3121 const ghobject_t &oid = i.get_oid(op->oid);
3122 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3123 _cid : _cid.get_temp();
3124 tracepoint(objectstore, omap_clear_enter, osr_name);
3125 if (_check_replay_guard(cid, oid, spos) > 0)
3126 r = _omap_clear(cid, oid, spos);
3127 tracepoint(objectstore, omap_clear_exit, r);
3128 }
3129 break;
3130 case Transaction::OP_OMAP_SETKEYS:
3131 {
3132 const coll_t &_cid = i.get_cid(op->cid);
3133 const ghobject_t &oid = i.get_oid(op->oid);
3134 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3135 _cid : _cid.get_temp();
3136 map<string, bufferlist> aset;
3137 i.decode_attrset(aset);
3138 tracepoint(objectstore, omap_setkeys_enter, osr_name);
3139 if (_check_replay_guard(cid, oid, spos) > 0)
3140 r = _omap_setkeys(cid, oid, aset, spos);
3141 tracepoint(objectstore, omap_setkeys_exit, r);
3142 }
3143 break;
3144 case Transaction::OP_OMAP_RMKEYS:
3145 {
3146 const coll_t &_cid = i.get_cid(op->cid);
3147 const ghobject_t &oid = i.get_oid(op->oid);
3148 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3149 _cid : _cid.get_temp();
3150 set<string> keys;
3151 i.decode_keyset(keys);
3152 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
3153 if (_check_replay_guard(cid, oid, spos) > 0)
3154 r = _omap_rmkeys(cid, oid, keys, spos);
3155 tracepoint(objectstore, omap_rmkeys_exit, r);
3156 }
3157 break;
3158 case Transaction::OP_OMAP_RMKEYRANGE:
3159 {
3160 const coll_t &_cid = i.get_cid(op->cid);
3161 const ghobject_t &oid = i.get_oid(op->oid);
3162 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3163 _cid : _cid.get_temp();
3164 string first, last;
3165 first = i.decode_string();
3166 last = i.decode_string();
3167 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3168 if (_check_replay_guard(cid, oid, spos) > 0)
3169 r = _omap_rmkeyrange(cid, oid, first, last, spos);
3170 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3171 }
3172 break;
3173 case Transaction::OP_OMAP_SETHEADER:
3174 {
3175 const coll_t &_cid = i.get_cid(op->cid);
3176 const ghobject_t &oid = i.get_oid(op->oid);
3177 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3178 _cid : _cid.get_temp();
3179 bufferlist bl;
3180 i.decode_bl(bl);
3181 tracepoint(objectstore, omap_setheader_enter, osr_name);
3182 if (_check_replay_guard(cid, oid, spos) > 0)
3183 r = _omap_setheader(cid, oid, bl, spos);
3184 tracepoint(objectstore, omap_setheader_exit, r);
3185 }
3186 break;
3187 case Transaction::OP_SPLIT_COLLECTION:
3188 {
3189 ceph_abort_msg("not legacy journal; upgrade to firefly first");
3190 }
3191 break;
3192 case Transaction::OP_SPLIT_COLLECTION2:
3193 {
3194 coll_t cid = i.get_cid(op->cid);
3195 uint32_t bits = op->split_bits;
3196 uint32_t rem = op->split_rem;
3197 coll_t dest = i.get_cid(op->dest_cid);
3198 tracepoint(objectstore, split_coll2_enter, osr_name);
3199 r = _split_collection(cid, bits, rem, dest, spos);
3200 tracepoint(objectstore, split_coll2_exit, r);
3201 }
3202 break;
3203
3204 case Transaction::OP_MERGE_COLLECTION:
3205 {
3206 coll_t cid = i.get_cid(op->cid);
3207 uint32_t bits = op->split_bits;
3208 coll_t dest = i.get_cid(op->dest_cid);
3209 tracepoint(objectstore, merge_coll_enter, osr_name);
3210 r = _merge_collection(cid, bits, dest, spos);
3211 tracepoint(objectstore, merge_coll_exit, r);
3212 }
3213 break;
3214
3215 case Transaction::OP_SETALLOCHINT:
3216 {
3217 const coll_t &_cid = i.get_cid(op->cid);
3218 const ghobject_t &oid = i.get_oid(op->oid);
3219 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3220 _cid : _cid.get_temp();
3221 uint64_t expected_object_size = op->expected_object_size;
3222 uint64_t expected_write_size = op->expected_write_size;
3223 tracepoint(objectstore, setallochint_enter, osr_name);
3224 if (_check_replay_guard(cid, oid, spos) > 0)
3225 r = _set_alloc_hint(cid, oid, expected_object_size,
3226 expected_write_size);
3227 tracepoint(objectstore, setallochint_exit, r);
3228 }
3229 break;
3230
3231 default:
3232 derr << "bad op " << op->op << dendl;
3233 ceph_abort();
3234 }
3235
3236 if (r < 0) {
3237 bool ok = false;
3238
3239 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3240 op->op == Transaction::OP_CLONE ||
3241 op->op == Transaction::OP_CLONERANGE2 ||
3242 op->op == Transaction::OP_COLL_ADD ||
3243 op->op == Transaction::OP_SETATTR ||
3244 op->op == Transaction::OP_SETATTRS ||
3245 op->op == Transaction::OP_RMATTR ||
3246 op->op == Transaction::OP_OMAP_SETKEYS ||
3247 op->op == Transaction::OP_OMAP_RMKEYS ||
3248 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3249 op->op == Transaction::OP_OMAP_SETHEADER))
3250 // -ENOENT is normally okay
3251 // ...including on a replayed OP_RMCOLL with checkpoint mode
3252 ok = true;
3253 if (r == -ENODATA)
3254 ok = true;
3255
3256 if (op->op == Transaction::OP_SETALLOCHINT)
3257 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3258 // cases means invalid hint size (e.g. too big, not a multiple
3259 // of block size, etc) or, at least on xfs, an attempt to set
3260 // or change it when the file is not empty. However,
3261 // OP_SETALLOCHINT is advisory, so ignore all errors.
3262 ok = true;
3263
3264 if (replaying && !backend->can_checkpoint()) {
3265 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3266 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3267 ok = true;
3268 }
3269 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3270 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3271 ok = true;
3272 }
3273 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3274 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3275 ok = true;
3276 }
3277 if (r == -ERANGE) {
3278 dout(10) << "tolerating ERANGE on replay" << dendl;
3279 ok = true;
3280 }
3281 if (r == -ENOENT) {
3282 dout(10) << "tolerating ENOENT on replay" << dendl;
3283 ok = true;
3284 }
3285 }
3286
3287 if (!ok) {
3288 const char *msg = "unexpected error code";
3289
3290 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3291 op->op == Transaction::OP_CLONE ||
3292 op->op == Transaction::OP_CLONERANGE2)) {
3293 msg = "ENOENT on clone suggests osd bug";
3294 } else if (r == -ENOSPC) {
3295 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3296 // by partially applying transactions.
3297 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3298 } else if (r == -ENOTEMPTY) {
3299 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3300 } else if (r == -EPERM) {
3301 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3302 }
3303
3304 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3305 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3306 dout(0) << msg << dendl;
3307 dout(0) << " transaction dump:\n";
3308 JSONFormatter f(true);
3309 f.open_object_section("transaction");
3310 t.dump(&f);
3311 f.close_section();
3312 f.flush(*_dout);
3313 *_dout << dendl;
3314
3315 if (r == -EMFILE) {
3316 dump_open_fds(cct);
3317 }
3318
3319 ceph_abort_msg("unexpected error");
3320 }
3321 }
3322
3323 spos.op++;
3324 }
3325
3326 _inject_failure();
3327 }
3328
3329 /*********************************************/
3330
3331
3332
3333 // --------------------
3334 // objects
3335
3336 bool FileStore::exists(CollectionHandle& ch, const ghobject_t& oid)
3337 {
3338 tracepoint(objectstore, exists_enter, ch->cid.c_str());
3339 auto osr = static_cast<OpSequencer*>(ch.get());
3340 osr->wait_for_apply(oid);
3341 struct stat st;
3342 bool retval = stat(ch, oid, &st) == 0;
3343 tracepoint(objectstore, exists_exit, retval);
3344 return retval;
3345 }
3346
3347 int FileStore::stat(
3348 CollectionHandle& ch, const ghobject_t& oid, struct stat *st, bool allow_eio)
3349 {
3350 tracepoint(objectstore, stat_enter, ch->cid.c_str());
3351 auto osr = static_cast<OpSequencer*>(ch.get());
3352 osr->wait_for_apply(oid);
3353 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
3354 int r = lfn_stat(cid, oid, st);
3355 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3356 if (r < 0) {
3357 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
3358 << " = " << r << dendl;
3359 } else {
3360 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
3361 << " = " << r
3362 << " (size " << st->st_size << ")" << dendl;
3363 }
3364 if (cct->_conf->filestore_debug_inject_read_err &&
3365 debug_mdata_eio(oid)) {
3366 return -EIO;
3367 } else {
3368 tracepoint(objectstore, stat_exit, r);
3369 return r;
3370 }
3371 }
3372
3373 int FileStore::set_collection_opts(
3374 CollectionHandle& ch,
3375 const pool_opts_t& opts)
3376 {
3377 return -EOPNOTSUPP;
3378 }
3379
3380 int FileStore::read(
3381 CollectionHandle& ch,
3382 const ghobject_t& oid,
3383 uint64_t offset,
3384 size_t len,
3385 bufferlist& bl,
3386 uint32_t op_flags)
3387 {
3388 int got;
3389 tracepoint(objectstore, read_enter, ch->cid.c_str(), offset, len);
3390 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
3391
3392 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3393
3394 auto osr = static_cast<OpSequencer*>(ch.get());
3395 osr->wait_for_apply(oid);
3396
3397 FDRef fd;
3398 int r = lfn_open(cid, oid, false, &fd);
3399 if (r < 0) {
3400 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3401 << cpp_strerror(r) << dendl;
3402 return r;
3403 }
3404
3405 if (offset == 0 && len == 0) {
3406 struct stat st;
3407 memset(&st, 0, sizeof(struct stat));
3408 int r = ::fstat(**fd, &st);
3409 ceph_assert(r == 0);
3410 len = st.st_size;
3411 }
3412
3413 #ifdef HAVE_POSIX_FADVISE
3414 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3415 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3416 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3417 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3418 #endif
3419
3420 bufferptr bptr(len); // prealloc space for entire read
3421 got = safe_pread(**fd, bptr.c_str(), len, offset);
3422 if (got < 0) {
3423 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3424 lfn_close(fd);
3425 return got;
3426 }
3427 bptr.set_length(got); // properly size the buffer
3428 bl.clear();
3429 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3430
3431 #ifdef HAVE_POSIX_FADVISE
3432 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3433 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3434 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3435 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3436 #endif
3437
3438 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3439 ostringstream ss;
3440 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3441 if (errors != 0) {
3442 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3443 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3444 ceph_abort_msg("bad crc on read");
3445 }
3446 }
3447
3448 lfn_close(fd);
3449
3450 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3451 << got << "/" << len << dendl;
3452 if (cct->_conf->filestore_debug_inject_read_err &&
3453 debug_data_eio(oid)) {
3454 return -EIO;
3455 } else if (oid.hobj.pool > 0 && /* FIXME, see #23029 */
3456 cct->_conf->filestore_debug_random_read_err &&
3457 (rand() % (int)(cct->_conf->filestore_debug_random_read_err *
3458 100.0)) == 0) {
3459 dout(0) << __func__ << ": inject random EIO" << dendl;
3460 return -EIO;
3461 } else {
3462 tracepoint(objectstore, read_exit, got);
3463 return got;
3464 }
3465 }
3466
3467 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3468 map<uint64_t, uint64_t> *m)
3469 {
3470 uint64_t i;
3471 struct fiemap_extent *extent = nullptr;
3472 struct fiemap *fiemap = nullptr;
3473 int r = 0;
3474
3475 more:
3476 r = backend->do_fiemap(fd, offset, len, &fiemap);
3477 if (r < 0)
3478 return r;
3479
3480 if (fiemap->fm_mapped_extents == 0) {
3481 free(fiemap);
3482 return r;
3483 }
3484
3485 extent = &fiemap->fm_extents[0];
3486
3487 /* start where we were asked to start */
3488 if (extent->fe_logical < offset) {
3489 extent->fe_length -= offset - extent->fe_logical;
3490 extent->fe_logical = offset;
3491 }
3492
3493 i = 0;
3494
3495 struct fiemap_extent *last = nullptr;
3496 while (i < fiemap->fm_mapped_extents) {
3497 struct fiemap_extent *next = extent + 1;
3498
3499 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
3500 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3501
3502 /* try to merge extents */
3503 while ((i < fiemap->fm_mapped_extents - 1) &&
3504 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3505 next->fe_length += extent->fe_length;
3506 next->fe_logical = extent->fe_logical;
3507 extent = next;
3508 next = extent + 1;
3509 i++;
3510 }
3511
3512 if (extent->fe_logical + extent->fe_length > offset + len)
3513 extent->fe_length = offset + len - extent->fe_logical;
3514 (*m)[extent->fe_logical] = extent->fe_length;
3515 i++;
3516 last = extent++;
3517 }
3518 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3519 offset = last->fe_logical + last->fe_length;
3520 len -= xoffset;
3521 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3522 free(fiemap);
3523 if (!is_last) {
3524 goto more;
3525 }
3526
3527 return r;
3528 }
3529
3530 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3531 map<uint64_t, uint64_t> *m)
3532 {
3533 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3534 off_t hole_pos, data_pos;
3535 int r = 0;
3536
3537 // If lseek fails with errno setting to be ENXIO, this means the current
3538 // file offset is beyond the end of the file.
3539 off_t start = offset;
3540 while(start < (off_t)(offset + len)) {
3541 data_pos = lseek(fd, start, SEEK_DATA);
3542 if (data_pos < 0) {
3543 if (errno == ENXIO)
3544 break;
3545 else {
3546 r = -errno;
3547 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3548 return r;
3549 }
3550 } else if (data_pos > (off_t)(offset + len)) {
3551 break;
3552 }
3553
3554 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3555 if (hole_pos < 0) {
3556 if (errno == ENXIO) {
3557 break;
3558 } else {
3559 r = -errno;
3560 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3561 return r;
3562 }
3563 }
3564
3565 if (hole_pos >= (off_t)(offset + len)) {
3566 (*m)[data_pos] = offset + len - data_pos;
3567 break;
3568 }
3569 (*m)[data_pos] = hole_pos - data_pos;
3570 start = hole_pos;
3571 }
3572
3573 return r;
3574 #else
3575 (*m)[offset] = len;
3576 return 0;
3577 #endif
3578 }
3579
3580 int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
3581 uint64_t offset, size_t len,
3582 bufferlist& bl)
3583 {
3584 map<uint64_t, uint64_t> exomap;
3585 int r = fiemap(ch, oid, offset, len, exomap);
3586 if (r >= 0) {
3587 encode(exomap, bl);
3588 }
3589 return r;
3590 }
3591
3592 int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
3593 uint64_t offset, size_t len,
3594 map<uint64_t, uint64_t>& destmap)
3595 {
3596 tracepoint(objectstore, fiemap_enter, ch->cid.c_str(), offset, len);
3597 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
3598 destmap.clear();
3599
3600 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3601 len <= (size_t)m_filestore_fiemap_threshold) {
3602 destmap[offset] = len;
3603 return 0;
3604 }
3605
3606 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3607
3608 auto osr = static_cast<OpSequencer*>(ch.get());
3609 osr->wait_for_apply(oid);
3610
3611 FDRef fd;
3612
3613 int r = lfn_open(cid, oid, false, &fd);
3614 if (r < 0) {
3615 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3616 goto done;
3617 }
3618
3619 if (backend->has_seek_data_hole()) {
3620 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3621 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3622 } else if (backend->has_fiemap()) {
3623 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3624 r = _do_fiemap(**fd, offset, len, &destmap);
3625 }
3626
3627 lfn_close(fd);
3628
3629 done:
3630
3631 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3632 if (r == -EIO && m_filestore_fail_eio) handle_eio();
3633 tracepoint(objectstore, fiemap_exit, r);
3634 return r;
3635 }
3636
3637 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3638 const SequencerPosition &spos)
3639 {
3640 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3641 int r = lfn_unlink(cid, oid, spos);
3642 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3643 return r;
3644 }
3645
3646 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3647 {
3648 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3649 int r = lfn_truncate(cid, oid, size);
3650 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3651 return r;
3652 }
3653
3654
3655 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3656 {
3657 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3658
3659 FDRef fd;
3660 int r = lfn_open(cid, oid, true, &fd);
3661 if (r < 0) {
3662 return r;
3663 } else {
3664 lfn_close(fd);
3665 }
3666 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3667 return r;
3668 }
3669
3670 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3671 uint64_t offset, size_t len,
3672 const bufferlist& bl, uint32_t fadvise_flags)
3673 {
3674 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3675 int r;
3676
3677 FDRef fd;
3678 r = lfn_open(cid, oid, true, &fd);
3679 if (r < 0) {
3680 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3681 << oid << ": "
3682 << cpp_strerror(r) << dendl;
3683 goto out;
3684 }
3685
3686 // write
3687 r = bl.write_fd(**fd, offset);
3688 if (r < 0) {
3689 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3690 << " error: " << cpp_strerror(r) << dendl;
3691 lfn_close(fd);
3692 goto out;
3693 }
3694 r = bl.length();
3695
3696 if (r >= 0 && m_filestore_sloppy_crc) {
3697 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3698 ceph_assert(rc >= 0);
3699 }
3700
3701 if (replaying || m_disable_wbthrottle) {
3702 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3703 #ifdef HAVE_POSIX_FADVISE
3704 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3705 #endif
3706 }
3707 } else {
3708 wbthrottle.queue_wb(fd, oid, offset, len,
3709 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3710 }
3711
3712 lfn_close(fd);
3713
3714 out:
3715 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3716 return r;
3717 }
3718
3719 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3720 {
3721 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3722 int ret = 0;
3723
3724 if (cct->_conf->filestore_punch_hole) {
3725 #ifdef CEPH_HAVE_FALLOCATE
3726 # if !defined(__APPLE__) && !defined(__FreeBSD__)
3727 # ifdef FALLOC_FL_KEEP_SIZE
3728 // first try to punch a hole.
3729 FDRef fd;
3730 ret = lfn_open(cid, oid, false, &fd);
3731 if (ret < 0) {
3732 goto out;
3733 }
3734
3735 struct stat st;
3736 ret = ::fstat(**fd, &st);
3737 if (ret < 0) {
3738 ret = -errno;
3739 lfn_close(fd);
3740 goto out;
3741 }
3742
3743 // first try fallocate
3744 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3745 offset, len);
3746 if (ret < 0) {
3747 ret = -errno;
3748 } else {
3749 // ensure we extend file size, if needed
3750 if (len > 0 && offset + len > (uint64_t)st.st_size) {
3751 ret = ::ftruncate(**fd, offset + len);
3752 if (ret < 0) {
3753 ret = -errno;
3754 lfn_close(fd);
3755 goto out;
3756 }
3757 }
3758 }
3759 lfn_close(fd);
3760
3761 if (ret >= 0 && m_filestore_sloppy_crc) {
3762 int rc = backend->_crc_update_zero(**fd, offset, len);
3763 ceph_assert(rc >= 0);
3764 }
3765
3766 if (ret == 0)
3767 goto out; // yay!
3768 if (ret != -EOPNOTSUPP)
3769 goto out; // some other error
3770 # endif
3771 # endif
3772 #endif
3773 }
3774
3775 // lame, kernel is old and doesn't support it.
3776 // write zeros.. yuck!
3777 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3778 {
3779 bufferlist bl;
3780 bl.append_zero(len);
3781 ret = _write(cid, oid, offset, len, bl);
3782 }
3783
3784 #ifdef CEPH_HAVE_FALLOCATE
3785 # if !defined(__APPLE__) && !defined(__FreeBSD__)
3786 # ifdef FALLOC_FL_KEEP_SIZE
3787 out:
3788 # endif
3789 # endif
3790 #endif
3791 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3792 return ret;
3793 }
3794
3795 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3796 const SequencerPosition& spos)
3797 {
3798 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3799
3800 if (_check_replay_guard(cid, newoid, spos) < 0)
3801 return 0;
3802
3803 int r;
3804 FDRef o, n;
3805 {
3806 Index index;
3807 r = lfn_open(cid, oldoid, false, &o, &index);
3808 if (r < 0) {
3809 goto out2;
3810 }
3811 ceph_assert(index.index);
3812 std::unique_lock l{(index.index)->access_lock};
3813
3814 r = lfn_open(cid, newoid, true, &n, &index);
3815 if (r < 0) {
3816 goto out;
3817 }
3818 r = ::ftruncate(**n, 0);
3819 if (r < 0) {
3820 r = -errno;
3821 goto out3;
3822 }
3823 struct stat st;
3824 r = ::fstat(**o, &st);
3825 if (r < 0) {
3826 r = -errno;
3827 goto out3;
3828 }
3829
3830 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3831 if (r < 0) {
3832 goto out3;
3833 }
3834
3835 dout(20) << "objectmap clone" << dendl;
3836 r = object_map->clone(oldoid, newoid, &spos);
3837 if (r < 0 && r != -ENOENT)
3838 goto out3;
3839 }
3840
3841 {
3842 char buf[2];
3843 map<string, bufferptr> aset;
3844 r = _fgetattrs(**o, aset);
3845 if (r < 0)
3846 goto out3;
3847
3848 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3849 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3850 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3851 sizeof(XATTR_NO_SPILL_OUT));
3852 } else {
3853 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3854 sizeof(XATTR_SPILL_OUT));
3855 }
3856 if (r < 0)
3857 goto out3;
3858
3859 r = _fsetattrs(**n, aset);
3860 if (r < 0)
3861 goto out3;
3862 }
3863
3864 // clone is non-idempotent; record our work.
3865 _set_replay_guard(**n, spos, &newoid);
3866
3867 out3:
3868 lfn_close(n);
3869 out:
3870 lfn_close(o);
3871 out2:
3872 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3873 if (r == -EIO && m_filestore_fail_eio) handle_eio();
3874 return r;
3875 }
3876
3877 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3878 {
3879 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3880 return backend->clone_range(from, to, srcoff, len, dstoff);
3881 }
3882
3883 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3884 {
3885 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3886 int r = 0;
3887 map<uint64_t, uint64_t> exomap;
3888 // fiemap doesn't allow zero length
3889 if (len == 0)
3890 return 0;
3891
3892 if (backend->has_seek_data_hole()) {
3893 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3894 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3895 } else if (backend->has_fiemap()) {
3896 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3897 r = _do_fiemap(from, srcoff, len, &exomap);
3898 }
3899
3900
3901 int64_t written = 0;
3902 if (r < 0)
3903 goto out;
3904
3905 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3906 uint64_t it_off = miter->first - srcoff + dstoff;
3907 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3908 if (r < 0) {
3909 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3910 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3911 break;
3912 }
3913 written += miter->second;
3914 }
3915
3916 if (r >= 0) {
3917 if (m_filestore_sloppy_crc) {
3918 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3919 ceph_assert(rc >= 0);
3920 }
3921 struct stat st;
3922 r = ::fstat(to, &st);
3923 if (r < 0) {
3924 r = -errno;
3925 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3926 goto out;
3927 }
3928 if (st.st_size < (int)(dstoff + len)) {
3929 r = ::ftruncate(to, dstoff + len);
3930 if (r < 0) {
3931 r = -errno;
3932 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3933 goto out;
3934 }
3935 }
3936 r = written;
3937 }
3938
3939 out:
3940 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3941 return r;
3942 }
3943
3944 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3945 {
3946 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3947 int r = 0;
3948 loff_t pos = srcoff;
3949 loff_t end = srcoff + len;
3950 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3951
3952 #ifdef CEPH_HAVE_SPLICE
3953 if (backend->has_splice()) {
3954 int pipefd[2];
3955 if (pipe_cloexec(pipefd, 0) < 0) {
3956 int e = errno;
3957 derr << " pipe " << " got " << cpp_strerror(e) << dendl;
3958 return -e;
3959 }
3960
3961 loff_t dstpos = dstoff;
3962 while (pos < end) {
3963 int l = std::min<int>(end-pos, buflen);
3964 r = safe_splice(from, &pos, pipefd[1], nullptr, l, SPLICE_F_NONBLOCK);
3965 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3966 if (r < 0) {
3967 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3968 << ", " << cpp_strerror(r) << dendl;
3969 break;
3970 }
3971 if (r == 0) {
3972 // hrm, bad source range, wtf.
3973 r = -ERANGE;
3974 derr << __FUNC__ << ": got short read result at " << pos
3975 << " of fd " << from << " len " << len << dendl;
3976 break;
3977 }
3978
3979 r = safe_splice(pipefd[0], nullptr, to, &dstpos, r, 0);
3980 dout(10) << " safe_splice write to " << to << " len " << r
3981 << " got " << r << dendl;
3982 if (r < 0) {
3983 derr << __FUNC__ << ": write error at " << pos << "~"
3984 << r << ", " << cpp_strerror(r) << dendl;
3985 break;
3986 }
3987 }
3988 close(pipefd[0]);
3989 close(pipefd[1]);
3990 } else
3991 #endif
3992 {
3993 int64_t actual;
3994
3995 actual = ::lseek64(from, srcoff, SEEK_SET);
3996 if (actual != (int64_t)srcoff) {
3997 if (actual < 0)
3998 r = -errno;
3999 else
4000 r = -EINVAL;
4001 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
4002 return r;
4003 }
4004 actual = ::lseek64(to, dstoff, SEEK_SET);
4005 if (actual != (int64_t)dstoff) {
4006 if (actual < 0)
4007 r = -errno;
4008 else
4009 r = -EINVAL;
4010 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
4011 return r;
4012 }
4013
4014 char buf[buflen];
4015 while (pos < end) {
4016 int l = std::min<int>(end-pos, buflen);
4017 r = ::read(from, buf, l);
4018 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
4019 if (r < 0) {
4020 if (errno == EINTR) {
4021 continue;
4022 } else {
4023 r = -errno;
4024 derr << __FUNC__ << ": read error at " << pos << "~" << len
4025 << ", " << cpp_strerror(r) << dendl;
4026 break;
4027 }
4028 }
4029 if (r == 0) {
4030 // hrm, bad source range, wtf.
4031 r = -ERANGE;
4032 derr << __FUNC__ << ": got short read result at " << pos
4033 << " of fd " << from << " len " << len << dendl;
4034 break;
4035 }
4036 int op = 0;
4037 while (op < r) {
4038 int r2 = safe_write(to, buf+op, r-op);
4039 dout(25) << " write to " << to << " len " << (r-op)
4040 << " got " << r2 << dendl;
4041 if (r2 < 0) {
4042 r = r2;
4043 derr << __FUNC__ << ": write error at " << pos << "~"
4044 << r-op << ", " << cpp_strerror(r) << dendl;
4045
4046 break;
4047 }
4048 op += (r-op);
4049 }
4050 if (r < 0)
4051 break;
4052 pos += r;
4053 }
4054 }
4055
4056 if (r < 0 && replaying) {
4057 ceph_assert(r == -ERANGE);
4058 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
4059 r = len;
4060 }
4061 ceph_assert(replaying || pos == end);
4062 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
4063 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
4064 ceph_assert(rc >= 0);
4065 }
4066 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
4067 return r;
4068 }
4069
4070 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
4071 uint64_t srcoff, uint64_t len, uint64_t dstoff,
4072 const SequencerPosition& spos)
4073 {
4074 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
4075
4076 if (_check_replay_guard(newcid, newoid, spos) < 0)
4077 return 0;
4078
4079 int r;
4080 FDRef o, n;
4081 r = lfn_open(oldcid, oldoid, false, &o);
4082 if (r < 0) {
4083 goto out2;
4084 }
4085 r = lfn_open(newcid, newoid, true, &n);
4086 if (r < 0) {
4087 goto out;
4088 }
4089 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
4090 if (r < 0) {
4091 goto out3;
4092 }
4093
4094 // clone is non-idempotent; record our work.
4095 _set_replay_guard(**n, spos, &newoid);
4096
4097 out3:
4098 lfn_close(n);
4099 out:
4100 lfn_close(o);
4101 out2:
4102 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
4103 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
4104 return r;
4105 }
4106
4107 class SyncEntryTimeout : public Context {
4108 public:
4109 CephContext* cct;
4110 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
4111 : cct(cct), m_commit_timeo(commit_timeo)
4112 {
4113 }
4114
4115 void finish(int r) override {
4116 BackTrace *bt = new BackTrace(1);
4117 generic_dout(-1) << "FileStore: sync_entry timed out after "
4118 << m_commit_timeo << " seconds.\n";
4119 bt->print(*_dout);
4120 *_dout << dendl;
4121 delete bt;
4122 bt = nullptr;
4123 ceph_abort();
4124 }
4125 private:
4126 int m_commit_timeo;
4127 };
4128
4129 void FileStore::sync_entry()
4130 {
4131 std::unique_lock l{lock};
4132 while (!stop) {
4133 auto min_interval = ceph::make_timespan(m_filestore_min_sync_interval);
4134 auto max_interval = ceph::make_timespan(m_filestore_max_sync_interval);
4135 auto startwait = ceph::real_clock::now();
4136 if (!force_sync) {
4137 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
4138 sync_cond.wait_for(l, max_interval);
4139 } else {
4140 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
4141 }
4142
4143 if (force_sync) {
4144 dout(20) << __FUNC__ << ": force_sync set" << dendl;
4145 force_sync = false;
4146 } else if (stop) {
4147 dout(20) << __FUNC__ << ": stop set" << dendl;
4148 break;
4149 } else {
4150 // wait for at least the min interval
4151 auto woke = ceph::real_clock::now() - startwait;
4152 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
4153 if (woke < min_interval) {
4154 auto t = min_interval - woke;
4155 dout(20) << __FUNC__ << ": waiting for another " << t
4156 << " to reach min interval " << min_interval << dendl;
4157 sync_cond.wait_for(l, t);
4158 }
4159 }
4160
4161 list<Context*> fin;
4162 again:
4163 fin.swap(sync_waiters);
4164 l.unlock();
4165
4166 op_tp.pause();
4167 if (apply_manager.commit_start()) {
4168 auto start = ceph::real_clock::now();
4169 uint64_t cp = apply_manager.get_committing_seq();
4170
4171 sync_entry_timeo_lock.lock();
4172 SyncEntryTimeout *sync_entry_timeo =
4173 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
4174 if (!timer.add_event_after(m_filestore_commit_timeout,
4175 sync_entry_timeo)) {
4176 sync_entry_timeo = nullptr;
4177 }
4178 sync_entry_timeo_lock.unlock();
4179
4180 logger->set(l_filestore_committing, 1);
4181
4182 dout(15) << __FUNC__ << ": committing " << cp << dendl;
4183 stringstream errstream;
4184 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4185 derr << errstream.str() << dendl;
4186 ceph_abort();
4187 }
4188
4189 if (backend->can_checkpoint()) {
4190 int err = write_op_seq(op_fd, cp);
4191 if (err < 0) {
4192 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4193 ceph_abort_msg("error during write_op_seq");
4194 }
4195
4196 char s[NAME_MAX];
4197 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4198 uint64_t cid = 0;
4199 err = backend->create_checkpoint(s, &cid);
4200 if (err < 0) {
4201 int err = errno;
4202 derr << "snap create '" << s << "' got error " << err << dendl;
4203 ceph_assert(err == 0);
4204 }
4205
4206 snaps.push_back(cp);
4207 apply_manager.commit_started();
4208 op_tp.unpause();
4209
4210 if (cid > 0) {
4211 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4212 err = backend->sync_checkpoint(cid);
4213 if (err < 0) {
4214 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4215 ceph_abort_msg("wait_sync got error");
4216 }
4217 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4218 }
4219 } else {
4220 apply_manager.commit_started();
4221 op_tp.unpause();
4222
4223 int err = object_map->sync();
4224 if (err < 0) {
4225 derr << "object_map sync got " << cpp_strerror(err) << dendl;
4226 ceph_abort_msg("object_map sync returned error");
4227 }
4228
4229 err = backend->syncfs();
4230 if (err < 0) {
4231 derr << "syncfs got " << cpp_strerror(err) << dendl;
4232 ceph_abort_msg("syncfs returned error");
4233 }
4234
4235 err = write_op_seq(op_fd, cp);
4236 if (err < 0) {
4237 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4238 ceph_abort_msg("error during write_op_seq");
4239 }
4240 err = ::fsync(op_fd);
4241 if (err < 0) {
4242 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4243 ceph_abort_msg("error during fsync of op_seq");
4244 }
4245 }
4246
4247 auto done = ceph::real_clock::now();
4248 auto lat = done - start;
4249 auto dur = done - startwait;
4250 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
4251 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4252 if (max_pause_lat < utime_t{dur - lat}) {
4253 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4254 }
4255
4256 logger->inc(l_filestore_commitcycle);
4257 logger->tinc(l_filestore_commitcycle_latency, lat);
4258 logger->tinc(l_filestore_commitcycle_interval, dur);
4259
4260 apply_manager.commit_finish();
4261 if (!m_disable_wbthrottle) {
4262 wbthrottle.clear();
4263 }
4264
4265 logger->set(l_filestore_committing, 0);
4266
4267 // remove old snaps?
4268 if (backend->can_checkpoint()) {
4269 char s[NAME_MAX];
4270 while (snaps.size() > 2) {
4271 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4272 snaps.pop_front();
4273 dout(10) << "removing snap '" << s << "'" << dendl;
4274 int r = backend->destroy_checkpoint(s);
4275 if (r) {
4276 int err = errno;
4277 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4278 }
4279 }
4280 }
4281
4282 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
4283
4284 if (sync_entry_timeo) {
4285 std::lock_guard lock{sync_entry_timeo_lock};
4286 timer.cancel_event(sync_entry_timeo);
4287 }
4288 } else {
4289 op_tp.unpause();
4290 }
4291
4292 l.lock();
4293 finish_contexts(cct, fin, 0);
4294 fin.clear();
4295 if (!sync_waiters.empty()) {
4296 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
4297 goto again;
4298 }
4299 if (!stop && journal && journal->should_commit_now()) {
4300 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
4301 goto again;
4302 }
4303 }
4304 stop = false;
4305 }
4306
4307 void FileStore::do_force_sync()
4308 {
4309 dout(10) << __FUNC__ << dendl;
4310 std::lock_guard l{lock};
4311 force_sync = true;
4312 sync_cond.notify_all();
4313 }
4314
4315 void FileStore::start_sync(Context *onsafe)
4316 {
4317 std::lock_guard l{lock};
4318 sync_waiters.push_back(onsafe);
4319 sync_cond.notify_all();
4320 force_sync = true;
4321 dout(10) << __FUNC__ << dendl;
4322 }
4323
4324 void FileStore::sync()
4325 {
4326 ceph::mutex m = ceph::make_mutex("FileStore::sync");
4327 ceph::condition_variable c;
4328 bool done;
4329 C_SafeCond *fin = new C_SafeCond(m, c, &done);
4330
4331 start_sync(fin);
4332
4333 std::unique_lock l{m};
4334 c.wait(l, [&done, this] {
4335 if (!done) {
4336 dout(10) << "sync waiting" << dendl;
4337 }
4338 return done;
4339 });
4340 dout(10) << "sync done" << dendl;
4341 }
4342
4343 void FileStore::_flush_op_queue()
4344 {
4345 dout(10) << __FUNC__ << ": draining op tp" << dendl;
4346 op_wq.drain();
4347 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4348 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4349 (*it)->wait_for_empty();
4350 }
4351 }
4352
4353 /*
4354 * flush - make every queued write readable
4355 */
4356 void FileStore::flush()
4357 {
4358 dout(10) << __FUNC__ << dendl;
4359
4360 if (cct->_conf->filestore_blackhole) {
4361 // wait forever
4362 ceph::mutex lock = ceph::make_mutex("FileStore::flush::lock");
4363 ceph::condition_variable cond;
4364 std::unique_lock l{lock};
4365 cond.wait(l, [] {return false;} );
4366 ceph_abort();
4367 }
4368
4369 if (m_filestore_journal_writeahead) {
4370 if (journal)
4371 journal->flush();
4372 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4373 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4374 (*it)->wait_for_empty();
4375 }
4376 }
4377
4378 _flush_op_queue();
4379 dout(10) << __FUNC__ << ": complete" << dendl;
4380 }
4381
4382 /*
4383 * sync_and_flush - make every queued write readable AND committed to disk
4384 */
4385 void FileStore::sync_and_flush()
4386 {
4387 dout(10) << __FUNC__ << dendl;
4388
4389 if (m_filestore_journal_writeahead) {
4390 if (journal)
4391 journal->flush();
4392 _flush_op_queue();
4393 } else {
4394 // includes m_filestore_journal_parallel
4395 _flush_op_queue();
4396 sync();
4397 }
4398 dout(10) << __FUNC__ << ": done" << dendl;
4399 }
4400
4401 int FileStore::flush_journal()
4402 {
4403 dout(10) << __FUNC__ << dendl;
4404 sync_and_flush();
4405 sync();
4406 return 0;
4407 }
4408
4409 int FileStore::snapshot(const string& name)
4410 {
4411 dout(10) << __FUNC__ << ": " << name << dendl;
4412 sync_and_flush();
4413
4414 if (!backend->can_checkpoint()) {
4415 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4416 return -EOPNOTSUPP;
4417 }
4418
4419 char s[NAME_MAX];
4420 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4421
4422 int r = backend->create_checkpoint(s, nullptr);
4423 if (r) {
4424 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4425 }
4426
4427 return r;
4428 }
4429
4430 // -------------------------------
4431 // attributes
4432
4433 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4434 {
4435 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4436 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4437 if (l >= 0) {
4438 bp = buffer::create(l);
4439 memcpy(bp.c_str(), val, l);
4440 } else if (l == -ERANGE) {
4441 l = chain_fgetxattr(fd, name, 0, 0);
4442 if (l > 0) {
4443 bp = buffer::create(l);
4444 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4445 }
4446 }
4447 ceph_assert(!m_filestore_fail_eio || l != -EIO);
4448 return l;
4449 }
4450
4451 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4452 {
4453 // get attr list
4454 char names1[100];
4455 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4456 char *names2 = 0;
4457 char *name = 0;
4458 if (len == -ERANGE) {
4459 len = chain_flistxattr(fd, 0, 0);
4460 if (len < 0) {
4461 ceph_assert(!m_filestore_fail_eio || len != -EIO);
4462 return len;
4463 }
4464 dout(10) << " -ERANGE, len is " << len << dendl;
4465 names2 = new char[len+1];
4466 len = chain_flistxattr(fd, names2, len);
4467 dout(10) << " -ERANGE, got " << len << dendl;
4468 if (len < 0) {
4469 ceph_assert(!m_filestore_fail_eio || len != -EIO);
4470 delete[] names2;
4471 return len;
4472 }
4473 name = names2;
4474 } else if (len < 0) {
4475 ceph_assert(!m_filestore_fail_eio || len != -EIO);
4476 return len;
4477 } else {
4478 name = names1;
4479 }
4480 name[len] = 0;
4481
4482 char *end = name + len;
4483 while (name < end) {
4484 char *attrname = name;
4485 if (parse_attrname(&name)) {
4486 if (*name) {
4487 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4488 int r = _fgetattr(fd, attrname, aset[name]);
4489 if (r < 0) {
4490 delete[] names2;
4491 return r;
4492 }
4493 }
4494 }
4495 name += strlen(name) + 1;
4496 }
4497
4498 delete[] names2;
4499 return 0;
4500 }
4501
4502 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4503 {
4504 for (map<string, bufferptr>::iterator p = aset.begin();
4505 p != aset.end();
4506 ++p) {
4507 char n[CHAIN_XATTR_MAX_NAME_LEN];
4508 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4509 const char *val;
4510 if (p->second.length())
4511 val = p->second.c_str();
4512 else
4513 val = "";
4514 // ??? Why do we skip setting all the other attrs if one fails?
4515 int r = chain_fsetxattr(fd, n, val, p->second.length());
4516 if (r < 0) {
4517 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4518 return r;
4519 }
4520 }
4521 return 0;
4522 }
4523
4524 // debug EIO injection
4525 void FileStore::inject_data_error(const ghobject_t &oid) {
4526 std::lock_guard l{read_error_lock};
4527 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4528 data_error_set.insert(oid);
4529 }
4530 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4531 std::lock_guard l{read_error_lock};
4532 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4533 mdata_error_set.insert(oid);
4534 }
4535
4536 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4537 std::lock_guard l{read_error_lock};
4538 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4539 data_error_set.erase(oid);
4540 mdata_error_set.erase(oid);
4541 }
4542 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4543 std::lock_guard l{read_error_lock};
4544 if (data_error_set.count(oid)) {
4545 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4546 return true;
4547 } else {
4548 return false;
4549 }
4550 }
4551 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4552 std::lock_guard l{read_error_lock};
4553 if (mdata_error_set.count(oid)) {
4554 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4555 return true;
4556 } else {
4557 return false;
4558 }
4559 }
4560
4561
4562 // objects
4563
4564 int FileStore::getattr(CollectionHandle& ch, const ghobject_t& oid, const char *name, bufferptr &bp)
4565 {
4566 tracepoint(objectstore, getattr_enter, ch->cid.c_str());
4567 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
4568 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4569
4570 auto osr = static_cast<OpSequencer*>(ch.get());
4571 osr->wait_for_apply(oid);
4572
4573 FDRef fd;
4574 int r = lfn_open(cid, oid, false, &fd);
4575 if (r < 0) {
4576 goto out;
4577 }
4578 char n[CHAIN_XATTR_MAX_NAME_LEN];
4579 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4580 r = _fgetattr(**fd, n, bp);
4581 lfn_close(fd);
4582 if (r == -ENODATA) {
4583 map<string, bufferlist> got;
4584 set<string> to_get;
4585 to_get.insert(string(name));
4586 Index index;
4587 r = get_index(cid, &index);
4588 if (r < 0) {
4589 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4590 goto out;
4591 }
4592 r = object_map->get_xattrs(oid, to_get, &got);
4593 if (r < 0 && r != -ENOENT) {
4594 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4595 goto out;
4596 }
4597 if (got.empty()) {
4598 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4599 return -ENODATA;
4600 }
4601 bp = bufferptr(got.begin()->second.c_str(),
4602 got.begin()->second.length());
4603 r = bp.length();
4604 }
4605 out:
4606 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4607 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4608 if (cct->_conf->filestore_debug_inject_read_err &&
4609 debug_mdata_eio(oid)) {
4610 return -EIO;
4611 } else {
4612 tracepoint(objectstore, getattr_exit, r);
4613 return r < 0 ? r : 0;
4614 }
4615 }
4616
4617 int FileStore::getattrs(CollectionHandle& ch, const ghobject_t& oid, map<string,bufferptr>& aset)
4618 {
4619 tracepoint(objectstore, getattrs_enter, ch->cid.c_str());
4620 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
4621 set<string> omap_attrs;
4622 map<string, bufferlist> omap_aset;
4623 Index index;
4624 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4625
4626 auto osr = static_cast<OpSequencer*>(ch.get());
4627 osr->wait_for_apply(oid);
4628
4629 FDRef fd;
4630 bool spill_out = true;
4631 char buf[2];
4632
4633 int r = lfn_open(cid, oid, false, &fd);
4634 if (r < 0) {
4635 goto out;
4636 }
4637
4638 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4639 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4640 spill_out = false;
4641
4642 r = _fgetattrs(**fd, aset);
4643 lfn_close(fd);
4644 fd = FDRef(); // defensive
4645 if (r < 0) {
4646 goto out;
4647 }
4648
4649 if (!spill_out) {
4650 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4651 goto out;
4652 }
4653
4654 r = get_index(cid, &index);
4655 if (r < 0) {
4656 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4657 goto out;
4658 }
4659 {
4660 r = object_map->get_all_xattrs(oid, &omap_attrs);
4661 if (r < 0 && r != -ENOENT) {
4662 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4663 goto out;
4664 }
4665
4666 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4667 if (r < 0 && r != -ENOENT) {
4668 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4669 goto out;
4670 }
4671 if (r == -ENOENT)
4672 r = 0;
4673 }
4674 ceph_assert(omap_attrs.size() == omap_aset.size());
4675 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4676 i != omap_aset.end();
4677 ++i) {
4678 string key(i->first);
4679 aset.insert(make_pair(key,
4680 bufferptr(i->second.c_str(), i->second.length())));
4681 }
4682 out:
4683 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4684 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4685
4686 if (cct->_conf->filestore_debug_inject_read_err &&
4687 debug_mdata_eio(oid)) {
4688 return -EIO;
4689 } else {
4690 tracepoint(objectstore, getattrs_exit, r);
4691 return r;
4692 }
4693 }
4694
4695 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4696 const SequencerPosition &spos)
4697 {
4698 map<string, bufferlist> omap_set;
4699 set<string> omap_remove;
4700 map<string, bufferptr> inline_set;
4701 map<string, bufferptr> inline_to_set;
4702 FDRef fd;
4703 int spill_out = -1;
4704 bool incomplete_inline = false;
4705
4706 int r = lfn_open(cid, oid, false, &fd);
4707 if (r < 0) {
4708 goto out;
4709 }
4710
4711 char buf[2];
4712 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4713 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4714 spill_out = 0;
4715 else
4716 spill_out = 1;
4717
4718 r = _fgetattrs(**fd, inline_set);
4719 incomplete_inline = (r == -E2BIG);
4720 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4721 dout(15) << __FUNC__ << ": " << cid << "/" << oid
4722 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4723 << dendl;
4724
4725 for (map<string,bufferptr>::iterator p = aset.begin();
4726 p != aset.end();
4727 ++p) {
4728 char n[CHAIN_XATTR_MAX_NAME_LEN];
4729 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4730
4731 if (incomplete_inline) {
4732 chain_fremovexattr(**fd, n); // ignore any error
4733 omap_set[p->first].push_back(p->second);
4734 continue;
4735 }
4736
4737 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4738 if (inline_set.count(p->first)) {
4739 inline_set.erase(p->first);
4740 r = chain_fremovexattr(**fd, n);
4741 if (r < 0)
4742 goto out_close;
4743 }
4744 omap_set[p->first].push_back(p->second);
4745 continue;
4746 }
4747
4748 if (!inline_set.count(p->first) &&
4749 inline_set.size() >= m_filestore_max_inline_xattrs) {
4750 omap_set[p->first].push_back(p->second);
4751 continue;
4752 }
4753 omap_remove.insert(p->first);
4754 inline_set.insert(*p);
4755
4756 inline_to_set.insert(*p);
4757 }
4758
4759 if (spill_out != 1 && !omap_set.empty()) {
4760 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4761 sizeof(XATTR_SPILL_OUT));
4762 }
4763
4764 r = _fsetattrs(**fd, inline_to_set);
4765 if (r < 0)
4766 goto out_close;
4767
4768 if (spill_out && !omap_remove.empty()) {
4769 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4770 if (r < 0 && r != -ENOENT) {
4771 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4772 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4773 goto out_close;
4774 } else {
4775 r = 0; // don't confuse the debug output
4776 }
4777 }
4778
4779 if (!omap_set.empty()) {
4780 r = object_map->set_xattrs(oid, omap_set, &spos);
4781 if (r < 0) {
4782 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4783 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4784 goto out_close;
4785 }
4786 }
4787 out_close:
4788 lfn_close(fd);
4789 out:
4790 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4791 return r;
4792 }
4793
4794
4795 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4796 const SequencerPosition &spos)
4797 {
4798 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4799 FDRef fd;
4800 bool spill_out = true;
4801
4802 int r = lfn_open(cid, oid, false, &fd);
4803 if (r < 0) {
4804 goto out;
4805 }
4806
4807 char buf[2];
4808 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4809 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4810 spill_out = false;
4811 }
4812
4813 char n[CHAIN_XATTR_MAX_NAME_LEN];
4814 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4815 r = chain_fremovexattr(**fd, n);
4816 if (r == -ENODATA && spill_out) {
4817 Index index;
4818 r = get_index(cid, &index);
4819 if (r < 0) {
4820 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4821 goto out_close;
4822 }
4823 set<string> to_remove;
4824 to_remove.insert(string(name));
4825 r = object_map->remove_xattrs(oid, to_remove, &spos);
4826 if (r < 0 && r != -ENOENT) {
4827 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4828 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4829 goto out_close;
4830 }
4831 }
4832 out_close:
4833 lfn_close(fd);
4834 out:
4835 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4836 return r;
4837 }
4838
4839 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4840 const SequencerPosition &spos)
4841 {
4842 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4843
4844 map<string,bufferptr> aset;
4845 FDRef fd;
4846 set<string> omap_attrs;
4847 Index index;
4848 bool spill_out = true;
4849
4850 int r = lfn_open(cid, oid, false, &fd);
4851 if (r < 0) {
4852 goto out;
4853 }
4854
4855 char buf[2];
4856 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4857 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4858 spill_out = false;
4859 }
4860
4861 r = _fgetattrs(**fd, aset);
4862 if (r >= 0) {
4863 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4864 char n[CHAIN_XATTR_MAX_NAME_LEN];
4865 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4866 r = chain_fremovexattr(**fd, n);
4867 if (r < 0) {
4868 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4869 goto out_close;
4870 }
4871 }
4872 }
4873
4874 if (!spill_out) {
4875 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4876 goto out_close;
4877 }
4878
4879 r = get_index(cid, &index);
4880 if (r < 0) {
4881 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4882 goto out_close;
4883 }
4884 {
4885 r = object_map->get_all_xattrs(oid, &omap_attrs);
4886 if (r < 0 && r != -ENOENT) {
4887 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4888 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4889 goto out_close;
4890 }
4891 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4892 if (r < 0 && r != -ENOENT) {
4893 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4894 goto out_close;
4895 }
4896 if (r == -ENOENT)
4897 r = 0;
4898 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4899 sizeof(XATTR_NO_SPILL_OUT));
4900 }
4901
4902 out_close:
4903 lfn_close(fd);
4904 out:
4905 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4906 return r;
4907 }
4908
4909
4910
4911
4912 int FileStore::_collection_remove_recursive(const coll_t &cid,
4913 const SequencerPosition &spos)
4914 {
4915 struct stat st;
4916 int r = collection_stat(cid, &st);
4917 if (r < 0) {
4918 if (r == -ENOENT)
4919 return 0;
4920 return r;
4921 }
4922
4923 vector<ghobject_t> objects;
4924 ghobject_t max;
4925 while (!max.is_max()) {
4926 r = collection_list(cid, max, ghobject_t::get_max(),
4927 300, &objects, &max);
4928 if (r < 0)
4929 return r;
4930 for (vector<ghobject_t>::iterator i = objects.begin();
4931 i != objects.end();
4932 ++i) {
4933 ceph_assert(_check_replay_guard(cid, *i, spos));
4934 r = _remove(cid, *i, spos);
4935 if (r < 0)
4936 return r;
4937 }
4938 objects.clear();
4939 }
4940 return _destroy_collection(cid);
4941 }
4942
4943 // --------------------------
4944 // collections
4945
4946 int FileStore::list_collections(vector<coll_t>& ls)
4947 {
4948 return list_collections(ls, false);
4949 }
4950
4951 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4952 {
4953 tracepoint(objectstore, list_collections_enter);
4954 dout(10) << __FUNC__ << dendl;
4955
4956 char fn[PATH_MAX];
4957 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4958
4959 int r = 0;
4960 DIR *dir = ::opendir(fn);
4961 if (!dir) {
4962 r = -errno;
4963 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4964 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4965 return r;
4966 }
4967
4968 struct dirent *de = nullptr;
4969 while ((de = ::readdir(dir))) {
4970 if (de->d_type == DT_UNKNOWN) {
4971 // d_type not supported (non-ext[234], btrfs), must stat
4972 struct stat sb;
4973 char filename[PATH_MAX];
4974 if (int n = snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4975 n >= static_cast<int>(sizeof(filename))) {
4976 derr << __func__ << " path length overrun: " << n << dendl;
4977 ceph_abort();
4978 }
4979
4980 r = ::stat(filename, &sb);
4981 if (r < 0) {
4982 r = -errno;
4983 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4984 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4985 break;
4986 }
4987 if (!S_ISDIR(sb.st_mode)) {
4988 continue;
4989 }
4990 } else if (de->d_type != DT_DIR) {
4991 continue;
4992 }
4993 if (strcmp(de->d_name, "omap") == 0) {
4994 continue;
4995 }
4996 if (de->d_name[0] == '.' &&
4997 (de->d_name[1] == '\0' ||
4998 (de->d_name[1] == '.' &&
4999 de->d_name[2] == '\0')))
5000 continue;
5001 coll_t cid;
5002 if (!cid.parse(de->d_name)) {
5003 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
5004 continue;
5005 }
5006 if (!cid.is_temp() || include_temp)
5007 ls.push_back(cid);
5008 }
5009
5010 if (r > 0) {
5011 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
5012 r = -r;
5013 }
5014
5015 ::closedir(dir);
5016 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5017 tracepoint(objectstore, list_collections_exit, r);
5018 return r;
5019 }
5020
5021 int FileStore::collection_stat(const coll_t& c, struct stat *st)
5022 {
5023 tracepoint(objectstore, collection_stat_enter, c.c_str());
5024 char fn[PATH_MAX];
5025 get_cdir(c, fn, sizeof(fn));
5026 dout(15) << __FUNC__ << ": " << fn << dendl;
5027 int r = ::stat(fn, st);
5028 if (r < 0)
5029 r = -errno;
5030 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5031 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5032 tracepoint(objectstore, collection_stat_exit, r);
5033 return r;
5034 }
5035
5036 bool FileStore::collection_exists(const coll_t& c)
5037 {
5038 tracepoint(objectstore, collection_exists_enter, c.c_str());
5039 struct stat st;
5040 bool ret = collection_stat(c, &st) == 0;
5041 tracepoint(objectstore, collection_exists_exit, ret);
5042 return ret;
5043 }
5044
5045 int FileStore::collection_empty(const coll_t& cid, bool *empty)
5046 {
5047 tracepoint(objectstore, collection_empty_enter, cid.c_str());
5048 dout(15) << __FUNC__ << ": " << cid << dendl;
5049 Index index;
5050 int r = get_index(cid, &index);
5051 if (r < 0) {
5052 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
5053 << dendl;
5054 return r;
5055 }
5056
5057 ceph_assert(index.index);
5058 std::shared_lock l{(index.index)->access_lock};
5059
5060 vector<ghobject_t> ls;
5061 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
5062 1, &ls, nullptr);
5063 if (r < 0) {
5064 derr << __FUNC__ << ": collection_list_partial returned: "
5065 << cpp_strerror(r) << dendl;
5066 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5067 return r;
5068 }
5069 *empty = ls.empty();
5070 tracepoint(objectstore, collection_empty_exit, *empty);
5071 return 0;
5072 }
5073
5074 int FileStore::_collection_set_bits(const coll_t& c, int bits)
5075 {
5076 char fn[PATH_MAX];
5077 get_cdir(c, fn, sizeof(fn));
5078 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
5079 char n[PATH_MAX];
5080 int r;
5081 int32_t v = bits;
5082 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
5083 if (fd < 0) {
5084 r = -errno;
5085 goto out;
5086 }
5087 get_attrname("bits", n, PATH_MAX);
5088 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
5089 VOID_TEMP_FAILURE_RETRY(::close(fd));
5090 out:
5091 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
5092 return r;
5093 }
5094
5095 int FileStore::collection_bits(CollectionHandle& ch)
5096 {
5097 char fn[PATH_MAX];
5098 get_cdir(ch->cid, fn, sizeof(fn));
5099 dout(15) << __FUNC__ << ": " << fn << dendl;
5100 int r;
5101 char n[PATH_MAX];
5102 int32_t bits;
5103 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
5104 if (fd < 0) {
5105 bits = r = -errno;
5106 goto out;
5107 }
5108 get_attrname("bits", n, PATH_MAX);
5109 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
5110 VOID_TEMP_FAILURE_RETRY(::close(fd));
5111 if (r < 0) {
5112 bits = r;
5113 goto out;
5114 }
5115 out:
5116 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
5117 return bits;
5118 }
5119
5120 int FileStore::collection_list(const coll_t& c,
5121 const ghobject_t& orig_start,
5122 const ghobject_t& end,
5123 int max,
5124 vector<ghobject_t> *ls, ghobject_t *next)
5125 {
5126 ghobject_t start = orig_start;
5127 if (start.is_max())
5128 return 0;
5129
5130 ghobject_t temp_next;
5131 if (!next)
5132 next = &temp_next;
5133 // figure out the pool id. we need this in order to generate a
5134 // meaningful 'next' value.
5135 int64_t pool = -1;
5136 shard_id_t shard;
5137 {
5138 spg_t pgid;
5139 if (c.is_temp(&pgid)) {
5140 pool = -2 - pgid.pool();
5141 shard = pgid.shard;
5142 } else if (c.is_pg(&pgid)) {
5143 pool = pgid.pool();
5144 shard = pgid.shard;
5145 } else if (c.is_meta()) {
5146 pool = -1;
5147 shard = shard_id_t::NO_SHARD;
5148 } else {
5149 // hrm, the caller is test code! we should get kill it off. for now,
5150 // tolerate it.
5151 pool = 0;
5152 shard = shard_id_t::NO_SHARD;
5153 }
5154 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
5155 << " pgid " << pgid << dendl;
5156 }
5157 ghobject_t sep;
5158 sep.hobj.pool = -1;
5159 sep.set_shard(shard);
5160 if (!c.is_temp() && !c.is_meta()) {
5161 if (start < sep) {
5162 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
5163 coll_t temp = c.get_temp();
5164 int r = collection_list(temp, start, end, max, ls, next);
5165 if (r < 0)
5166 return r;
5167 if (*next != ghobject_t::get_max())
5168 return r;
5169 start = sep;
5170 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
5171 << start << dendl;
5172 } else {
5173 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
5174 }
5175 }
5176
5177 Index index;
5178 int r = get_index(c, &index);
5179 if (r < 0)
5180 return r;
5181
5182 ceph_assert(index.index);
5183 std::shared_lock l{(index.index)->access_lock};
5184
5185 r = index->collection_list_partial(start, end, max, ls, next);
5186
5187 if (r < 0) {
5188 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5189 return r;
5190 }
5191 dout(20) << "objects: " << *ls << dendl;
5192
5193 // HashIndex doesn't know the pool when constructing a 'next' value
5194 if (!next->is_max()) {
5195 next->hobj.pool = pool;
5196 next->set_shard(shard);
5197 dout(20) << " next " << *next << dendl;
5198 }
5199
5200 return 0;
5201 }
5202
5203 int FileStore::omap_get(CollectionHandle& ch, const ghobject_t &hoid,
5204 bufferlist *header,
5205 map<string, bufferlist> *out)
5206 {
5207 tracepoint(objectstore, omap_get_enter, ch->cid.c_str());
5208 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
5209 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5210
5211 auto osr = static_cast<OpSequencer*>(ch.get());
5212 osr->wait_for_apply(hoid);
5213
5214 Index index;
5215 int r = get_index(c, &index);
5216 if (r < 0)
5217 return r;
5218 {
5219 ceph_assert(index.index);
5220 std::shared_lock l{(index.index)->access_lock};
5221 r = lfn_find(hoid, index);
5222 if (r < 0)
5223 return r;
5224 }
5225 r = object_map->get(hoid, header, out);
5226 if (r < 0 && r != -ENOENT) {
5227 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5228 return r;
5229 }
5230 tracepoint(objectstore, omap_get_exit, 0);
5231 return 0;
5232 }
5233
5234 int FileStore::omap_get_header(
5235 CollectionHandle& ch,
5236 const ghobject_t &hoid,
5237 bufferlist *bl,
5238 bool allow_eio)
5239 {
5240 tracepoint(objectstore, omap_get_header_enter, ch->cid.c_str());
5241 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
5242 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5243
5244 auto osr = static_cast<OpSequencer*>(ch.get());
5245 osr->wait_for_apply(hoid);
5246
5247 Index index;
5248 int r = get_index(c, &index);
5249 if (r < 0)
5250 return r;
5251 {
5252 ceph_assert(index.index);
5253 std::shared_lock l{(index.index)->access_lock};
5254 r = lfn_find(hoid, index);
5255 if (r < 0)
5256 return r;
5257 }
5258 r = object_map->get_header(hoid, bl);
5259 if (r < 0 && r != -ENOENT) {
5260 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5261 return r;
5262 }
5263 tracepoint(objectstore, omap_get_header_exit, 0);
5264 return 0;
5265 }
5266
5267 int FileStore::omap_get_keys(CollectionHandle& ch, const ghobject_t &hoid, set<string> *keys)
5268 {
5269 tracepoint(objectstore, omap_get_keys_enter, ch->cid.c_str());
5270 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
5271 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5272
5273 auto osr = static_cast<OpSequencer*>(ch.get());
5274 osr->wait_for_apply(hoid);
5275
5276 Index index;
5277 int r = get_index(c, &index);
5278 if (r < 0)
5279 return r;
5280 {
5281 ceph_assert(index.index);
5282 std::shared_lock l{(index.index)->access_lock};
5283 r = lfn_find(hoid, index);
5284 if (r < 0)
5285 return r;
5286 }
5287 r = object_map->get_keys(hoid, keys);
5288 if (r < 0 && r != -ENOENT) {
5289 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5290 return r;
5291 }
5292 tracepoint(objectstore, omap_get_keys_exit, 0);
5293 return 0;
5294 }
5295
5296 int FileStore::omap_get_values(CollectionHandle& ch, const ghobject_t &hoid,
5297 const set<string> &keys,
5298 map<string, bufferlist> *out)
5299 {
5300 tracepoint(objectstore, omap_get_values_enter, ch->cid.c_str());
5301 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
5302 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5303
5304 auto osr = static_cast<OpSequencer*>(ch.get());
5305 osr->wait_for_apply(hoid);
5306
5307 Index index;
5308 const char *where = "()";
5309 int r = get_index(c, &index);
5310 if (r < 0) {
5311 where = " (get_index)";
5312 goto out;
5313 }
5314 {
5315 ceph_assert(index.index);
5316 std::shared_lock l{(index.index)->access_lock};
5317 r = lfn_find(hoid, index);
5318 if (r < 0) {
5319 where = " (lfn_find)";
5320 goto out;
5321 }
5322 }
5323 r = object_map->get_values(hoid, keys, out);
5324 if (r < 0 && r != -ENOENT) {
5325 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5326 where = " (get_values)";
5327 goto out;
5328 }
5329 r = 0;
5330 out:
5331 tracepoint(objectstore, omap_get_values_exit, r);
5332 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5333 << where << dendl;
5334 return r;
5335 }
5336
5337 int FileStore::omap_check_keys(CollectionHandle& ch, const ghobject_t &hoid,
5338 const set<string> &keys,
5339 set<string> *out)
5340 {
5341 tracepoint(objectstore, omap_check_keys_enter, ch->cid.c_str());
5342 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
5343 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5344
5345 auto osr = static_cast<OpSequencer*>(ch.get());
5346 osr->wait_for_apply(hoid);
5347
5348 Index index;
5349 int r = get_index(c, &index);
5350 if (r < 0)
5351 return r;
5352 {
5353 ceph_assert(index.index);
5354 std::shared_lock l{(index.index)->access_lock};
5355 r = lfn_find(hoid, index);
5356 if (r < 0)
5357 return r;
5358 }
5359 r = object_map->check_keys(hoid, keys, out);
5360 if (r < 0 && r != -ENOENT) {
5361 if (r == -EIO && m_filestore_fail_eio) handle_eio();
5362 return r;
5363 }
5364 tracepoint(objectstore, omap_check_keys_exit, 0);
5365 return 0;
5366 }
5367
5368 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(
5369 CollectionHandle& ch,
5370 const ghobject_t &oid)
5371 {
5372 auto osr = static_cast<OpSequencer*>(ch.get());
5373 osr->wait_for_apply(oid);
5374 return get_omap_iterator(ch->cid, oid);
5375 }
5376
5377 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5378 const ghobject_t &hoid)
5379 {
5380 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5381 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5382 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5383 Index index;
5384 int r = get_index(c, &index);
5385 if (r < 0) {
5386 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5387 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5388 return ObjectMap::ObjectMapIterator();
5389 }
5390 {
5391 ceph_assert(index.index);
5392 std::shared_lock l{(index.index)->access_lock};
5393 r = lfn_find(hoid, index);
5394 if (r < 0) {
5395 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5396 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5397 return ObjectMap::ObjectMapIterator();
5398 }
5399 }
5400 return object_map->get_iterator(hoid);
5401 }
5402
5403 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5404 uint64_t expected_num_objs,
5405 const SequencerPosition &spos)
5406 {
5407 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5408 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5409
5410 bool empty;
5411 int ret = collection_empty(c, &empty);
5412 if (ret < 0)
5413 return ret;
5414 if (!empty && !replaying) {
5415 dout(0) << "Failed to give an expected number of objects hint to collection : "
5416 << c << ", only empty collection can take such type of hint. " << dendl;
5417 return 0;
5418 }
5419
5420 Index index;
5421 ret = get_index(c, &index);
5422 if (ret < 0)
5423 return ret;
5424 // Pre-hash the collection
5425 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5426 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5427 if (ret < 0)
5428 return ret;
5429 _set_replay_guard(c, spos);
5430
5431 return 0;
5432 }
5433
5434 int FileStore::_create_collection(
5435 const coll_t& c,
5436 int bits,
5437 const SequencerPosition &spos)
5438 {
5439 char fn[PATH_MAX];
5440 get_cdir(c, fn, sizeof(fn));
5441 dout(15) << __FUNC__ << ": " << fn << dendl;
5442 int r = ::mkdir(fn, 0755);
5443 if (r < 0)
5444 r = -errno;
5445 if (r == -EEXIST && replaying)
5446 r = 0;
5447 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5448
5449 if (r < 0)
5450 return r;
5451 r = init_index(c);
5452 if (r < 0)
5453 return r;
5454 r = _collection_set_bits(c, bits);
5455 if (r < 0)
5456 return r;
5457 // create parallel temp collection, too
5458 if (!c.is_meta() && !c.is_temp()) {
5459 coll_t temp = c.get_temp();
5460 r = _create_collection(temp, 0, spos);
5461 if (r < 0)
5462 return r;
5463 }
5464
5465 _set_replay_guard(c, spos);
5466 return 0;
5467 }
5468
5469 int FileStore::_destroy_collection(const coll_t& c)
5470 {
5471 int r = 0;
5472 char fn[PATH_MAX];
5473 get_cdir(c, fn, sizeof(fn));
5474 dout(15) << __FUNC__ << ": " << fn << dendl;
5475 {
5476 Index from;
5477 r = get_index(c, &from);
5478 if (r < 0)
5479 goto out;
5480 ceph_assert(from.index);
5481 std::unique_lock l{(from.index)->access_lock};
5482
5483 r = from->prep_delete();
5484 if (r < 0)
5485 goto out;
5486 }
5487 r = ::rmdir(fn);
5488 if (r < 0) {
5489 r = -errno;
5490 goto out;
5491 }
5492
5493 out:
5494 // destroy parallel temp collection, too
5495 if (!c.is_meta() && !c.is_temp()) {
5496 coll_t temp = c.get_temp();
5497 int r2 = _destroy_collection(temp);
5498 if (r2 < 0) {
5499 r = r2;
5500 goto out_final;
5501 }
5502 }
5503
5504 out_final:
5505 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5506 return r;
5507 }
5508
5509
5510 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5511 const SequencerPosition& spos)
5512 {
5513 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5514
5515 int dstcmp = _check_replay_guard(c, o, spos);
5516 if (dstcmp < 0)
5517 return 0;
5518
5519 // check the src name too; it might have a newer guard, and we don't
5520 // want to clobber it
5521 int srccmp = _check_replay_guard(oldcid, o, spos);
5522 if (srccmp < 0)
5523 return 0;
5524
5525 // open guard on object so we don't any previous operations on the
5526 // new name that will modify the source inode.
5527 FDRef fd;
5528 int r = lfn_open(oldcid, o, 0, &fd);
5529 if (r < 0) {
5530 // the source collection/object does not exist. If we are replaying, we
5531 // should be safe, so just return 0 and move on.
5532 ceph_assert(replaying);
5533 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5534 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5535 return 0;
5536 }
5537 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5538 _set_replay_guard(**fd, spos, &o, true);
5539 }
5540
5541 r = lfn_link(oldcid, c, o, o);
5542 if (replaying && !backend->can_checkpoint() &&
5543 r == -EEXIST) // crashed between link() and set_replay_guard()
5544 r = 0;
5545
5546 _inject_failure();
5547
5548 // close guard on object so we don't do this again
5549 if (r == 0) {
5550 _close_replay_guard(**fd, spos);
5551 }
5552 lfn_close(fd);
5553
5554 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5555 return r;
5556 }
5557
5558 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5559 coll_t c, const ghobject_t& o,
5560 const SequencerPosition& spos,
5561 bool allow_enoent)
5562 {
5563 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5564 int r = 0;
5565 int dstcmp, srccmp;
5566
5567 if (replaying) {
5568 /* If the destination collection doesn't exist during replay,
5569 * we need to delete the src object and continue on
5570 */
5571 if (!collection_exists(c))
5572 goto out_rm_src;
5573 }
5574
5575 dstcmp = _check_replay_guard(c, o, spos);
5576 if (dstcmp < 0)
5577 goto out_rm_src;
5578
5579 // check the src name too; it might have a newer guard, and we don't
5580 // want to clobber it
5581 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5582 if (srccmp < 0)
5583 return 0;
5584
5585 {
5586 // open guard on object so we don't any previous operations on the
5587 // new name that will modify the source inode.
5588 FDRef fd;
5589 r = lfn_open(oldcid, oldoid, 0, &fd);
5590 if (r < 0) {
5591 // the source collection/object does not exist. If we are replaying, we
5592 // should be safe, so just return 0 and move on.
5593 if (replaying) {
5594 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5595 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5596 } else if (allow_enoent) {
5597 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5598 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5599 << dendl;
5600 } else {
5601 ceph_abort_msg("ERROR: source must exist");
5602 }
5603
5604 if (!replaying) {
5605 return 0;
5606 }
5607 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5608 return 0;
5609 }
5610
5611 r = 0; // don't know if object_map was cloned
5612 } else {
5613 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5614 _set_replay_guard(**fd, spos, &o, true);
5615 }
5616
5617 r = lfn_link(oldcid, c, oldoid, o);
5618 if (replaying && !backend->can_checkpoint() &&
5619 r == -EEXIST) // crashed between link() and set_replay_guard()
5620 r = 0;
5621
5622 lfn_close(fd);
5623 fd = FDRef();
5624
5625 _inject_failure();
5626 }
5627
5628 if (r == 0) {
5629 // the name changed; link the omap content
5630 r = object_map->rename(oldoid, o, &spos);
5631 if (r == -ENOENT)
5632 r = 0;
5633 }
5634
5635 _inject_failure();
5636
5637 if (r == 0)
5638 r = lfn_unlink(oldcid, oldoid, spos, true);
5639
5640 if (r == 0)
5641 r = lfn_open(c, o, 0, &fd);
5642
5643 // close guard on object so we don't do this again
5644 if (r == 0) {
5645 _close_replay_guard(**fd, spos, &o);
5646 lfn_close(fd);
5647 }
5648 }
5649
5650 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5651 << " = " << r << dendl;
5652 return r;
5653
5654 out_rm_src:
5655 // remove source
5656 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5657 r = lfn_unlink(oldcid, oldoid, spos, true);
5658 }
5659
5660 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5661 << " = " << r << dendl;
5662 return r;
5663 }
5664
5665 void FileStore::_inject_failure()
5666 {
5667 if (m_filestore_kill_at) {
5668 int final = --m_filestore_kill_at;
5669 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5670 if (final == 0) {
5671 derr << __FUNC__ << ": KILLING" << dendl;
5672 cct->_log->flush();
5673 _exit(1);
5674 }
5675 }
5676 }
5677
5678 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5679 const SequencerPosition &spos) {
5680 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5681 Index index;
5682 int r = get_index(cid, &index);
5683 if (r < 0)
5684 return r;
5685 {
5686 ceph_assert(index.index);
5687 std::shared_lock l{(index.index)->access_lock};
5688 r = lfn_find(hoid, index);
5689 if (r < 0)
5690 return r;
5691 }
5692 r = object_map->clear_keys_header(hoid, &spos);
5693 if (r < 0 && r != -ENOENT)
5694 return r;
5695 return 0;
5696 }
5697
5698 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5699 const map<string, bufferlist> &aset,
5700 const SequencerPosition &spos) {
5701 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5702 Index index;
5703 int r;
5704 //treat pgmeta as a logical object, skip to check exist
5705 if (hoid.is_pgmeta())
5706 goto skip;
5707
5708 r = get_index(cid, &index);
5709 if (r < 0) {
5710 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5711 return r;
5712 }
5713 {
5714 ceph_assert(index.index);
5715 std::shared_lock l{(index.index)->access_lock};
5716 r = lfn_find(hoid, index);
5717 if (r < 0) {
5718 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5719 return r;
5720 }
5721 }
5722 skip:
5723 if (g_conf()->subsys.should_gather<ceph_subsys_filestore, 20>()) {
5724 for (auto& p : aset) {
5725 dout(20) << __FUNC__ << ": set " << p.first << dendl;
5726 }
5727 }
5728 r = object_map->set_keys(hoid, aset, &spos);
5729 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5730 return r;
5731 }
5732
5733 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5734 const set<string> &keys,
5735 const SequencerPosition &spos) {
5736 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5737 Index index;
5738 int r;
5739 //treat pgmeta as a logical object, skip to check exist
5740 if (hoid.is_pgmeta())
5741 goto skip;
5742
5743 r = get_index(cid, &index);
5744 if (r < 0)
5745 return r;
5746 {
5747 ceph_assert(index.index);
5748 std::shared_lock l{(index.index)->access_lock};
5749 r = lfn_find(hoid, index);
5750 if (r < 0)
5751 return r;
5752 }
5753 skip:
5754 r = object_map->rm_keys(hoid, keys, &spos);
5755 if (r < 0 && r != -ENOENT)
5756 return r;
5757 return 0;
5758 }
5759
5760 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5761 const string& first, const string& last,
5762 const SequencerPosition &spos) {
5763 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5764 set<string> keys;
5765 {
5766 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5767 if (!iter)
5768 return -ENOENT;
5769 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5770 iter->next()) {
5771 keys.insert(iter->key());
5772 }
5773 }
5774 return _omap_rmkeys(cid, hoid, keys, spos);
5775 }
5776
5777 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5778 const bufferlist &bl,
5779 const SequencerPosition &spos)
5780 {
5781 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5782 Index index;
5783 int r = get_index(cid, &index);
5784 if (r < 0)
5785 return r;
5786 {
5787 ceph_assert(index.index);
5788 std::shared_lock l{(index.index)->access_lock};
5789 r = lfn_find(hoid, index);
5790 if (r < 0)
5791 return r;
5792 }
5793 return object_map->set_header(hoid, bl, &spos);
5794 }
5795
5796 int FileStore::_merge_collection(const coll_t& cid,
5797 uint32_t bits,
5798 coll_t dest,
5799 const SequencerPosition &spos)
5800 {
5801 dout(15) << __FUNC__ << ": " << cid << " " << dest
5802 << " bits " << bits << dendl;
5803 int r = 0;
5804
5805 if (!collection_exists(cid)) {
5806 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5807 ceph_assert(replaying);
5808 return 0;
5809 }
5810 if (!collection_exists(dest)) {
5811 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5812 ceph_assert(replaying);
5813 return 0;
5814 }
5815
5816 // set bits
5817 if (_check_replay_guard(cid, spos) > 0)
5818 _collection_set_bits(dest, bits);
5819
5820 spg_t pgid;
5821 bool is_pg = dest.is_pg(&pgid);
5822 ceph_assert(is_pg);
5823
5824 int dstcmp = _check_replay_guard(dest, spos);
5825 if (dstcmp < 0)
5826 return 0;
5827
5828 int srccmp = _check_replay_guard(cid, spos);
5829 if (srccmp < 0)
5830 return 0;
5831
5832 _set_global_replay_guard(cid, spos);
5833 _set_replay_guard(cid, spos, true);
5834 _set_replay_guard(dest, spos, true);
5835
5836 // main collection
5837 {
5838 Index from;
5839 r = get_index(cid, &from);
5840
5841 Index to;
5842 if (!r)
5843 r = get_index(dest, &to);
5844
5845 if (!r) {
5846 ceph_assert(from.index);
5847 std::unique_lock l1{(from.index)->access_lock};
5848
5849 ceph_assert(to.index);
5850 std::unique_lock l2{(to.index)->access_lock};
5851
5852 r = from->merge(bits, to.index);
5853 }
5854 }
5855
5856 // temp too
5857 {
5858 Index from;
5859 r = get_index(cid.get_temp(), &from);
5860
5861 Index to;
5862 if (!r)
5863 r = get_index(dest.get_temp(), &to);
5864
5865 if (!r) {
5866 ceph_assert(from.index);
5867 std::unique_lock l1{(from.index)->access_lock};
5868
5869 ceph_assert(to.index);
5870 std::unique_lock l2{(to.index)->access_lock};
5871
5872 r = from->merge(bits, to.index);
5873 }
5874 }
5875
5876 // remove source
5877 _destroy_collection(cid);
5878
5879 _close_replay_guard(dest, spos);
5880 _close_replay_guard(dest.get_temp(), spos);
5881 // no need to close guards on cid... it's removed.
5882
5883 if (!r && cct->_conf->filestore_debug_verify_split) {
5884 vector<ghobject_t> objects;
5885 ghobject_t next;
5886 while (1) {
5887 collection_list(
5888 dest,
5889 next, ghobject_t::get_max(),
5890 get_ideal_list_max(),
5891 &objects,
5892 &next);
5893 if (objects.empty())
5894 break;
5895 for (vector<ghobject_t>::iterator i = objects.begin();
5896 i != objects.end();
5897 ++i) {
5898 if (!i->match(bits, pgid.pgid.ps())) {
5899 dout(20) << __FUNC__ << ": " << *i << " does not belong in "
5900 << cid << dendl;
5901 ceph_assert(i->match(bits, pgid.pgid.ps()));
5902 }
5903 }
5904 objects.clear();
5905 }
5906 }
5907
5908 dout(15) << __FUNC__ << ": " << cid << " " << dest << " bits " << bits
5909 << " = " << r << dendl;
5910 return r;
5911 }
5912
5913 int FileStore::_split_collection(const coll_t& cid,
5914 uint32_t bits,
5915 uint32_t rem,
5916 coll_t dest,
5917 const SequencerPosition &spos)
5918 {
5919 int r;
5920 {
5921 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5922 if (!collection_exists(cid)) {
5923 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5924 ceph_assert(replaying);
5925 return 0;
5926 }
5927 if (!collection_exists(dest)) {
5928 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5929 ceph_assert(replaying);
5930 return 0;
5931 }
5932
5933 int dstcmp = _check_replay_guard(dest, spos);
5934 if (dstcmp < 0)
5935 return 0;
5936
5937 int srccmp = _check_replay_guard(cid, spos);
5938 if (srccmp < 0)
5939 return 0;
5940
5941 _set_global_replay_guard(cid, spos);
5942 _set_replay_guard(cid, spos, true);
5943 _set_replay_guard(dest, spos, true);
5944
5945 Index from;
5946 r = get_index(cid, &from);
5947
5948 Index to;
5949 if (!r)
5950 r = get_index(dest, &to);
5951
5952 if (!r) {
5953 ceph_assert(from.index);
5954 std::unique_lock l1{(from.index)->access_lock};
5955
5956 ceph_assert(to.index);
5957 std::unique_lock l2{(to.index)->access_lock};
5958
5959 r = from->split(rem, bits, to.index);
5960 }
5961
5962 _close_replay_guard(cid, spos);
5963 _close_replay_guard(dest, spos);
5964 }
5965 _collection_set_bits(cid, bits);
5966 if (!r && cct->_conf->filestore_debug_verify_split) {
5967 vector<ghobject_t> objects;
5968 ghobject_t next;
5969 while (1) {
5970 collection_list(
5971 cid,
5972 next, ghobject_t::get_max(),
5973 get_ideal_list_max(),
5974 &objects,
5975 &next);
5976 if (objects.empty())
5977 break;
5978 for (vector<ghobject_t>::iterator i = objects.begin();
5979 i != objects.end();
5980 ++i) {
5981 dout(20) << __FUNC__ << ": " << *i << " still in source "
5982 << cid << dendl;
5983 ceph_assert(!i->match(bits, rem));
5984 }
5985 objects.clear();
5986 }
5987 next = ghobject_t();
5988 while (1) {
5989 collection_list(
5990 dest,
5991 next, ghobject_t::get_max(),
5992 get_ideal_list_max(),
5993 &objects,
5994 &next);
5995 if (objects.empty())
5996 break;
5997 for (vector<ghobject_t>::iterator i = objects.begin();
5998 i != objects.end();
5999 ++i) {
6000 dout(20) << __FUNC__ << ": " << *i << " now in dest "
6001 << *i << dendl;
6002 ceph_assert(i->match(bits, rem));
6003 }
6004 objects.clear();
6005 }
6006 }
6007 return r;
6008 }
6009
6010 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
6011 uint64_t expected_object_size,
6012 uint64_t expected_write_size)
6013 {
6014 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
6015
6016 FDRef fd;
6017 int ret = 0;
6018
6019 if (expected_object_size == 0 || expected_write_size == 0)
6020 goto out;
6021
6022 ret = lfn_open(cid, oid, false, &fd);
6023 if (ret < 0)
6024 goto out;
6025
6026 {
6027 // TODO: a more elaborate hint calculation
6028 uint64_t hint = std::min<uint64_t>(expected_write_size, m_filestore_max_alloc_hint_size);
6029
6030 ret = backend->set_alloc_hint(**fd, hint);
6031 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
6032 }
6033
6034 lfn_close(fd);
6035 out:
6036 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
6037 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
6038 return ret;
6039 }
6040
6041 const char** FileStore::get_tracked_conf_keys() const
6042 {
6043 static const char* KEYS[] = {
6044 "filestore_max_inline_xattr_size",
6045 "filestore_max_inline_xattr_size_xfs",
6046 "filestore_max_inline_xattr_size_btrfs",
6047 "filestore_max_inline_xattr_size_other",
6048 "filestore_max_inline_xattrs",
6049 "filestore_max_inline_xattrs_xfs",
6050 "filestore_max_inline_xattrs_btrfs",
6051 "filestore_max_inline_xattrs_other",
6052 "filestore_max_xattr_value_size",
6053 "filestore_max_xattr_value_size_xfs",
6054 "filestore_max_xattr_value_size_btrfs",
6055 "filestore_max_xattr_value_size_other",
6056 "filestore_min_sync_interval",
6057 "filestore_max_sync_interval",
6058 "filestore_queue_max_ops",
6059 "filestore_queue_max_bytes",
6060 "filestore_expected_throughput_bytes",
6061 "filestore_expected_throughput_ops",
6062 "filestore_queue_low_threshhold",
6063 "filestore_queue_high_threshhold",
6064 "filestore_queue_high_delay_multiple",
6065 "filestore_queue_max_delay_multiple",
6066 "filestore_commit_timeout",
6067 "filestore_dump_file",
6068 "filestore_kill_at",
6069 "filestore_fail_eio",
6070 "filestore_fadvise",
6071 "filestore_sloppy_crc",
6072 "filestore_sloppy_crc_block_size",
6073 "filestore_max_alloc_hint_size",
6074 NULL
6075 };
6076 return KEYS;
6077 }
6078
6079 void FileStore::handle_conf_change(const ConfigProxy& conf,
6080 const std::set <std::string> &changed)
6081 {
6082 if (changed.count("filestore_max_inline_xattr_size") ||
6083 changed.count("filestore_max_inline_xattr_size_xfs") ||
6084 changed.count("filestore_max_inline_xattr_size_btrfs") ||
6085 changed.count("filestore_max_inline_xattr_size_other") ||
6086 changed.count("filestore_max_inline_xattrs") ||
6087 changed.count("filestore_max_inline_xattrs_xfs") ||
6088 changed.count("filestore_max_inline_xattrs_btrfs") ||
6089 changed.count("filestore_max_inline_xattrs_other") ||
6090 changed.count("filestore_max_xattr_value_size") ||
6091 changed.count("filestore_max_xattr_value_size_xfs") ||
6092 changed.count("filestore_max_xattr_value_size_btrfs") ||
6093 changed.count("filestore_max_xattr_value_size_other")) {
6094 if (backend) {
6095 std::lock_guard l(lock);
6096 set_xattr_limits_via_conf();
6097 }
6098 }
6099
6100 if (changed.count("filestore_queue_max_bytes") ||
6101 changed.count("filestore_queue_max_ops") ||
6102 changed.count("filestore_expected_throughput_bytes") ||
6103 changed.count("filestore_expected_throughput_ops") ||
6104 changed.count("filestore_queue_low_threshhold") ||
6105 changed.count("filestore_queue_high_threshhold") ||
6106 changed.count("filestore_queue_high_delay_multiple") ||
6107 changed.count("filestore_queue_max_delay_multiple")) {
6108 std::lock_guard l(lock);
6109 set_throttle_params();
6110 }
6111
6112 if (changed.count("filestore_min_sync_interval") ||
6113 changed.count("filestore_max_sync_interval") ||
6114 changed.count("filestore_kill_at") ||
6115 changed.count("filestore_fail_eio") ||
6116 changed.count("filestore_sloppy_crc") ||
6117 changed.count("filestore_sloppy_crc_block_size") ||
6118 changed.count("filestore_max_alloc_hint_size") ||
6119 changed.count("filestore_fadvise")) {
6120 std::lock_guard l(lock);
6121 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
6122 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
6123 m_filestore_kill_at = conf->filestore_kill_at;
6124 m_filestore_fail_eio = conf->filestore_fail_eio;
6125 m_filestore_fadvise = conf->filestore_fadvise;
6126 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
6127 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
6128 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
6129 }
6130 if (changed.count("filestore_commit_timeout")) {
6131 std::lock_guard l(sync_entry_timeo_lock);
6132 m_filestore_commit_timeout = conf->filestore_commit_timeout;
6133 }
6134 if (changed.count("filestore_dump_file")) {
6135 if (conf->filestore_dump_file.length() &&
6136 conf->filestore_dump_file != "-") {
6137 dump_start(conf->filestore_dump_file);
6138 } else {
6139 dump_stop();
6140 }
6141 }
6142 }
6143
6144 int FileStore::set_throttle_params()
6145 {
6146 stringstream ss;
6147 bool valid = throttle_bytes.set_params(
6148 cct->_conf->filestore_queue_low_threshhold,
6149 cct->_conf->filestore_queue_high_threshhold,
6150 cct->_conf->filestore_expected_throughput_bytes,
6151 cct->_conf->filestore_queue_high_delay_multiple?
6152 cct->_conf->filestore_queue_high_delay_multiple:
6153 cct->_conf->filestore_queue_high_delay_multiple_bytes,
6154 cct->_conf->filestore_queue_max_delay_multiple?
6155 cct->_conf->filestore_queue_max_delay_multiple:
6156 cct->_conf->filestore_queue_max_delay_multiple_bytes,
6157 cct->_conf->filestore_queue_max_bytes,
6158 &ss);
6159
6160 valid &= throttle_ops.set_params(
6161 cct->_conf->filestore_queue_low_threshhold,
6162 cct->_conf->filestore_queue_high_threshhold,
6163 cct->_conf->filestore_expected_throughput_ops,
6164 cct->_conf->filestore_queue_high_delay_multiple?
6165 cct->_conf->filestore_queue_high_delay_multiple:
6166 cct->_conf->filestore_queue_high_delay_multiple_ops,
6167 cct->_conf->filestore_queue_max_delay_multiple?
6168 cct->_conf->filestore_queue_max_delay_multiple:
6169 cct->_conf->filestore_queue_max_delay_multiple_ops,
6170 cct->_conf->filestore_queue_max_ops,
6171 &ss);
6172
6173 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
6174 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
6175
6176 if (!valid) {
6177 derr << "tried to set invalid params: "
6178 << ss.str()
6179 << dendl;
6180 }
6181 return valid ? 0 : -EINVAL;
6182 }
6183
6184 void FileStore::dump_start(const std::string& file)
6185 {
6186 dout(10) << __FUNC__ << ": " << file << dendl;
6187 if (m_filestore_do_dump) {
6188 dump_stop();
6189 }
6190 m_filestore_dump_fmt.reset();
6191 m_filestore_dump_fmt.open_array_section("dump");
6192 m_filestore_dump.open(file.c_str());
6193 m_filestore_do_dump = true;
6194 }
6195
6196 void FileStore::dump_stop()
6197 {
6198 dout(10) << __FUNC__ << dendl;
6199 m_filestore_do_dump = false;
6200 if (m_filestore_dump.is_open()) {
6201 m_filestore_dump_fmt.close_section();
6202 m_filestore_dump_fmt.flush(m_filestore_dump);
6203 m_filestore_dump.flush();
6204 m_filestore_dump.close();
6205 }
6206 }
6207
6208 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
6209 {
6210 m_filestore_dump_fmt.open_array_section("transactions");
6211 unsigned trans_num = 0;
6212 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
6213 m_filestore_dump_fmt.open_object_section("transaction");
6214 m_filestore_dump_fmt.dump_stream("osr") << osr->cid;
6215 m_filestore_dump_fmt.dump_unsigned("seq", seq);
6216 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
6217 (*i).dump(&m_filestore_dump_fmt);
6218 m_filestore_dump_fmt.close_section();
6219 }
6220 m_filestore_dump_fmt.close_section();
6221 m_filestore_dump_fmt.flush(m_filestore_dump);
6222 m_filestore_dump.flush();
6223 }
6224
6225 void FileStore::get_db_statistics(Formatter* f)
6226 {
6227 object_map->db->get_statistics(f);
6228 }
6229
6230 void FileStore::set_xattr_limits_via_conf()
6231 {
6232 uint32_t fs_xattr_size;
6233 uint32_t fs_xattrs;
6234 uint32_t fs_xattr_max_value_size;
6235
6236 switch (m_fs_type) {
6237 #if defined(__linux__)
6238 case XFS_SUPER_MAGIC:
6239 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
6240 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
6241 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
6242 break;
6243 case BTRFS_SUPER_MAGIC:
6244 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
6245 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
6246 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
6247 break;
6248 #endif
6249 default:
6250 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
6251 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
6252 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
6253 break;
6254 }
6255
6256 // Use override value if set
6257 if (cct->_conf->filestore_max_inline_xattr_size)
6258 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
6259 else
6260 m_filestore_max_inline_xattr_size = fs_xattr_size;
6261
6262 // Use override value if set
6263 if (cct->_conf->filestore_max_inline_xattrs)
6264 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
6265 else
6266 m_filestore_max_inline_xattrs = fs_xattrs;
6267
6268 // Use override value if set
6269 if (cct->_conf->filestore_max_xattr_value_size)
6270 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
6271 else
6272 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
6273
6274 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
6275 derr << "WARNING: max attr value size ("
6276 << m_filestore_max_xattr_value_size
6277 << ") is smaller than osd_max_object_name_len ("
6278 << cct->_conf->osd_max_object_name_len
6279 << "). Your backend filesystem appears to not support attrs large "
6280 << "enough to handle the configured max rados name size. You may get "
6281 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
6282 << "behavior"
6283 << dendl;
6284 }
6285 }
6286
6287 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
6288 {
6289 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
6290 return res;
6291 }
6292
6293 int FileStore::apply_layout_settings(const coll_t &cid, int target_level)
6294 {
6295 dout(20) << __FUNC__ << ": " << cid << " target level: "
6296 << target_level << dendl;
6297 Index index;
6298 int r = get_index(cid, &index);
6299 if (r < 0) {
6300 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
6301 << dendl;
6302 return r;
6303 }
6304
6305 return index->apply_layout_settings(target_level);
6306 }
6307
6308
6309 // -- FSSuperblock --
6310
6311 void FSSuperblock::encode(bufferlist &bl) const
6312 {
6313 ENCODE_START(2, 1, bl);
6314 compat_features.encode(bl);
6315 encode(omap_backend, bl);
6316 ENCODE_FINISH(bl);
6317 }
6318
6319 void FSSuperblock::decode(bufferlist::const_iterator &bl)
6320 {
6321 DECODE_START(2, bl);
6322 compat_features.decode(bl);
6323 if (struct_v >= 2)
6324 decode(omap_backend, bl);
6325 else
6326 omap_backend = "leveldb";
6327 DECODE_FINISH(bl);
6328 }
6329
6330 void FSSuperblock::dump(Formatter *f) const
6331 {
6332 f->open_object_section("compat");
6333 compat_features.dump(f);
6334 f->dump_string("omap_backend", omap_backend);
6335 f->close_section();
6336 }
6337
6338 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6339 {
6340 FSSuperblock z;
6341 o.push_back(new FSSuperblock(z));
6342 CompatSet::FeatureSet feature_compat;
6343 CompatSet::FeatureSet feature_ro_compat;
6344 CompatSet::FeatureSet feature_incompat;
6345 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6346 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6347 feature_incompat);
6348 o.push_back(new FSSuperblock(z));
6349 z.omap_backend = "rocksdb";
6350 o.push_back(new FSSuperblock(z));
6351 }
6352
6353 #undef dout_prefix
6354 #define dout_prefix *_dout << "filestore.osr(" << this << ") "
6355
6356 void FileStore::OpSequencer::_register_apply(Op *o)
6357 {
6358 if (o->registered_apply) {
6359 dout(20) << __func__ << " " << o << " already registered" << dendl;
6360 return;
6361 }
6362 o->registered_apply = true;
6363 for (auto& t : o->tls) {
6364 for (auto& i : t.get_object_index()) {
6365 uint32_t key = i.first.hobj.get_hash();
6366 applying.emplace(make_pair(key, &i.first));
6367 dout(20) << __func__ << " " << o << " " << i.first << " ("
6368 << &i.first << ")" << dendl;
6369 }
6370 }
6371 }
6372
6373 void FileStore::OpSequencer::_unregister_apply(Op *o)
6374 {
6375 ceph_assert(o->registered_apply);
6376 for (auto& t : o->tls) {
6377 for (auto& i : t.get_object_index()) {
6378 uint32_t key = i.first.hobj.get_hash();
6379 auto p = applying.find(key);
6380 bool removed = false;
6381 while (p != applying.end() &&
6382 p->first == key) {
6383 if (p->second == &i.first) {
6384 dout(20) << __func__ << " " << o << " " << i.first << " ("
6385 << &i.first << ")" << dendl;
6386 applying.erase(p);
6387 removed = true;
6388 break;
6389 }
6390 ++p;
6391 }
6392 ceph_assert(removed);
6393 }
6394 }
6395 }
6396
6397 void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
6398 {
6399 std::unique_lock l{qlock};
6400 uint32_t key = oid.hobj.get_hash();
6401 retry:
6402 while (true) {
6403 // search all items in hash slot for a matching object
6404 auto p = applying.find(key);
6405 while (p != applying.end() &&
6406 p->first == key) {
6407 if (*p->second == oid) {
6408 dout(20) << __func__ << " " << oid << " waiting on " << p->second
6409 << dendl;
6410 cond.wait(l);
6411 goto retry;
6412 }
6413 ++p;
6414 }
6415 break;
6416 }
6417 dout(20) << __func__ << " " << oid << " done" << dendl;
6418 }