1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
21 #include <sys/types.h>
27 #include <sys/ioctl.h>
29 #if defined(__linux__)
31 #include <linux/falloc.h>
37 #include "include/linux_fiemap.h"
39 #include "chain_xattr.h"
41 #if defined(__APPLE__) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
75 #include "common/ceph_crypto.h"
77 #include "include/ceph_assert.h"
79 #include "common/config.h"
80 #include "common/blkdev.h"
83 #define TRACEPOINT_DEFINE
84 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
85 #include "tracing/objectstore.h"
86 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
87 #undef TRACEPOINT_DEFINE
89 #define tracepoint(...)
92 #define dout_context cct
93 #define dout_subsys ceph_subsys_filestore
95 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97 #define COMMIT_SNAP_ITEM "snap_%llu"
98 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100 #define REPLAY_GUARD_XATTR "user.cephos.seq"
101 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103 // XATTR_SPILL_OUT_NAME as a xattr is used to maintain that indicates whether
104 // xattrs spill over into DBObjectMap, if XATTR_SPILL_OUT_NAME exists in file
105 // xattrs and the value is "no", it indicates no xattrs in DBObjectMap
106 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
107 #define XATTR_NO_SPILL_OUT "0"
108 #define XATTR_SPILL_OUT "1"
109 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
113 using std::make_pair
;
116 using std::ostringstream
;
119 using std::stringstream
;
122 using ceph::crypto::SHA1
;
123 using ceph::BackTrace
;
124 using ceph::bufferlist
;
125 using ceph::bufferptr
;
128 using ceph::Formatter
;
129 using ceph::JSONFormatter
;
131 //Initial features in new superblock.
132 static CompatSet
get_fs_initial_compat_set() {
133 CompatSet::FeatureSet ceph_osd_feature_compat
;
134 CompatSet::FeatureSet ceph_osd_feature_ro_compat
;
135 CompatSet::FeatureSet ceph_osd_feature_incompat
;
136 return CompatSet(ceph_osd_feature_compat
, ceph_osd_feature_ro_compat
,
137 ceph_osd_feature_incompat
);
140 //Features are added here that this FileStore supports.
141 static CompatSet
get_fs_supported_compat_set() {
142 CompatSet compat
= get_fs_initial_compat_set();
143 //Any features here can be set in code, but not in initial superblock
144 compat
.incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
148 int FileStore::validate_hobject_key(const hobject_t
&obj
) const
150 unsigned len
= LFNIndex::get_max_escaped_name_len(obj
);
151 return len
> m_filestore_max_xattr_value_size
? -ENAMETOOLONG
: 0;
154 int FileStore::get_block_device_fsid(CephContext
* cct
, const string
& path
,
157 // make sure we don't try to use aio or direct_io (and get annoying
158 // error messages from failing to do so); performance implications
159 // should be irrelevant for this use
160 FileJournal
j(cct
, *fsid
, 0, 0, path
.c_str(), false, false);
161 return j
.peek_fsid(*fsid
);
164 void FileStore::FSPerfTracker::update_from_perfcounters(
165 PerfCounters
&logger
)
167 os_commit_latency_ns
.consume_next(
169 l_filestore_journal_latency
));
170 os_apply_latency_ns
.consume_next(
172 l_filestore_apply_latency
));
176 ostream
& operator<<(ostream
& out
, const FileStore::OpSequencer
& s
)
178 return out
<< "osr(" << s
.cid
<< ")";
181 int FileStore::get_cdir(const coll_t
& cid
, char *s
, int len
)
183 const string
&cid_str(cid
.to_str());
184 return snprintf(s
, len
, "%s/current/%s", basedir
.c_str(), cid_str
.c_str());
187 void FileStore::handle_eio()
189 // don't try to map this back to an offset; too hard since there is
190 // a file system in between. we also don't really know whether this
191 // was a read or a write, since we have so many layers beneath us.
193 note_io_error_event(devname
.c_str(), basedir
.c_str(), -EIO
, 0, 0, 0);
194 ceph_abort_msg("unexpected eio error");
197 int FileStore::get_index(const coll_t
& cid
, Index
*index
)
199 int r
= index_manager
.get_index(cid
, basedir
, index
);
200 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
204 int FileStore::init_index(const coll_t
& cid
)
207 get_cdir(cid
, path
, sizeof(path
));
208 int r
= index_manager
.init_index(cid
, path
, target_version
);
209 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
213 int FileStore::lfn_find(const ghobject_t
& oid
, const Index
& index
, IndexedPath
*path
)
219 ceph_assert(index
.index
);
220 r
= (index
.index
)->lookup(oid
, path
, &exist
);
222 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
230 int FileStore::lfn_truncate(const coll_t
& cid
, const ghobject_t
& oid
, off_t length
)
233 int r
= lfn_open(cid
, oid
, false, &fd
);
236 r
= ::ftruncate(**fd
, length
);
239 if (r
>= 0 && m_filestore_sloppy_crc
) {
240 int rc
= backend
->_crc_update_truncate(**fd
, length
);
241 ceph_assert(rc
>= 0);
244 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
248 int FileStore::lfn_stat(const coll_t
& cid
, const ghobject_t
& oid
, struct stat
*buf
)
252 int r
= get_index(cid
, &index
);
256 ceph_assert(index
.index
);
257 std::shared_lock l
{(index
.index
)->access_lock
};
259 r
= lfn_find(oid
, index
, &path
);
262 r
= ::stat(path
->path(), buf
);
268 int FileStore::lfn_open(const coll_t
& cid
,
269 const ghobject_t
& oid
,
276 bool need_lock
= true;
281 if (cct
->_conf
->filestore_odsync_write
) {
289 if (!((*index
).index
)) {
290 r
= get_index(cid
, index
);
292 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
300 ceph_assert((*index
).index
);
302 ((*index
).index
)->access_lock
.lock();
305 *outfd
= fdcache
.lookup(oid
);
308 ((*index
).index
)->access_lock
.unlock();
316 IndexedPath
*path
= &path2
;
318 r
= (*index
)->lookup(oid
, path
, &exist
);
320 derr
<< "could not find " << oid
<< " in index: "
321 << cpp_strerror(-r
) << dendl
;
325 r
= ::open((*path
)->path(), flags
|O_CLOEXEC
, 0644);
328 dout(10) << "error opening file " << (*path
)->path() << " with flags="
329 << flags
<< ": " << cpp_strerror(-r
) << dendl
;
333 if (create
&& (!exist
)) {
334 r
= (*index
)->created(oid
, (*path
)->path());
336 VOID_TEMP_FAILURE_RETRY(::close(fd
));
337 derr
<< "error creating " << oid
<< " (" << (*path
)->path()
338 << ") in index: " << cpp_strerror(-r
) << dendl
;
341 r
= chain_fsetxattr
<true, true>(
342 fd
, XATTR_SPILL_OUT_NAME
,
343 XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
));
345 VOID_TEMP_FAILURE_RETRY(::close(fd
));
346 derr
<< "error setting spillout xattr for oid " << oid
<< " (" << (*path
)->path()
347 << "):" << cpp_strerror(-r
) << dendl
;
354 *outfd
= fdcache
.add(oid
, fd
, &existed
);
356 TEMP_FAILURE_RETRY(::close(fd
));
359 *outfd
= std::make_shared
<FDCache::FD
>(fd
);
363 ((*index
).index
)->access_lock
.unlock();
371 ((*index
).index
)->access_lock
.unlock();
374 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
378 void FileStore::lfn_close(FDRef fd
)
382 int FileStore::lfn_link(const coll_t
& c
, const coll_t
& newcid
, const ghobject_t
& o
, const ghobject_t
& newoid
)
384 Index index_new
, index_old
;
385 IndexedPath path_new
, path_old
;
388 bool index_same
= false;
390 r
= get_index(newcid
, &index_new
);
393 r
= get_index(c
, &index_old
);
396 } else if (c
== newcid
) {
397 r
= get_index(c
, &index_old
);
400 index_new
= index_old
;
403 r
= get_index(c
, &index_old
);
406 r
= get_index(newcid
, &index_new
);
411 ceph_assert(index_old
.index
);
412 ceph_assert(index_new
.index
);
416 std::shared_lock l1
{(index_old
.index
)->access_lock
};
418 r
= index_old
->lookup(o
, &path_old
, &exist
);
420 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
426 std::unique_lock l2
{(index_new
.index
)->access_lock
};
428 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
430 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
436 dout(25) << __FUNC__
<< ": path_old: " << path_old
<< dendl
;
437 dout(25) << __FUNC__
<< ": path_new: " << path_new
<< dendl
;
438 r
= ::link(path_old
->path(), path_new
->path());
442 r
= index_new
->created(newoid
, path_new
->path());
444 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
448 std::unique_lock l1
{(index_old
.index
)->access_lock
};
450 r
= index_old
->lookup(o
, &path_old
, &exist
);
452 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
458 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
460 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
466 dout(25) << __FUNC__
<< ": path_old: " << path_old
<< dendl
;
467 dout(25) << __FUNC__
<< ": path_new: " << path_new
<< dendl
;
468 r
= ::link(path_old
->path(), path_new
->path());
472 // make sure old fd for unlinked/overwritten file is gone
473 fdcache
.clear(newoid
);
475 r
= index_new
->created(newoid
, path_new
->path());
477 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
484 int FileStore::lfn_unlink(const coll_t
& cid
, const ghobject_t
& o
,
485 const SequencerPosition
&spos
,
486 bool force_clear_omap
)
489 int r
= get_index(cid
, &index
);
491 dout(25) << __FUNC__
<< ": get_index failed " << cpp_strerror(r
) << dendl
;
495 ceph_assert(index
.index
);
496 std::unique_lock l
{(index
.index
)->access_lock
};
501 r
= index
->lookup(o
, &path
, &hardlink
);
503 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
507 if (!force_clear_omap
) {
508 if (hardlink
== 0 || hardlink
== 1) {
509 force_clear_omap
= true;
512 if (force_clear_omap
) {
513 dout(20) << __FUNC__
<< ": clearing omap on " << o
514 << " in cid " << cid
<< dendl
;
515 r
= object_map
->clear(o
, &spos
);
516 if (r
< 0 && r
!= -ENOENT
) {
517 dout(25) << __FUNC__
<< ": omap clear failed " << cpp_strerror(r
) << dendl
;
518 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
521 if (cct
->_conf
->filestore_debug_inject_read_err
) {
522 debug_obj_on_delete(o
);
524 if (!m_disable_wbthrottle
) {
525 wbthrottle
.clear_object(o
); // should be only non-cache ref
529 /* Ensure that replay of this op doesn't result in the object_map
532 if (!backend
->can_checkpoint())
533 object_map
->sync(&o
, &spos
);
536 if (!m_disable_wbthrottle
) {
537 wbthrottle
.clear_object(o
); // should be only non-cache ref
542 r
= index
->unlink(o
);
544 dout(25) << __FUNC__
<< ": index unlink failed " << cpp_strerror(r
) << dendl
;
550 FileStore::FileStore(CephContext
* cct
, const std::string
&base
,
551 const std::string
&jdev
, osflagbits_t flags
,
552 const char *name
, bool do_update
) :
553 JournalingObjectStore(cct
, base
),
555 basedir(base
), journalpath(jdev
),
556 generic_flags(flags
),
558 fsid_fd(-1), op_fd(-1),
559 basedir_fd(-1), current_fd(-1),
561 index_manager(cct
, do_update
),
563 timer(cct
, sync_entry_timeo_lock
),
564 stop(false), sync_thread(this),
568 m_disable_wbthrottle(cct
->_conf
->filestore_odsync_write
||
569 !cct
->_conf
->filestore_wbthrottle_enable
),
570 throttle_ops(cct
, "filestore_ops", cct
->_conf
->filestore_caller_concurrency
),
571 throttle_bytes(cct
, "filestore_bytes", cct
->_conf
->filestore_caller_concurrency
),
572 m_ondisk_finisher_num(cct
->_conf
->filestore_ondisk_finisher_threads
),
573 m_apply_finisher_num(cct
->_conf
->filestore_apply_finisher_threads
),
574 op_tp(cct
, "FileStore::op_tp", "tp_fstore_op", cct
->_conf
->filestore_op_threads
, "filestore_op_threads"),
576 ceph::make_timespan(cct
->_conf
->filestore_op_thread_timeout
),
577 ceph::make_timespan(cct
->_conf
->filestore_op_thread_suicide_timeout
),
580 trace_endpoint("0.0.0.0", 0, "FileStore"),
581 m_filestore_commit_timeout(cct
->_conf
->filestore_commit_timeout
),
582 m_filestore_journal_parallel(cct
->_conf
->filestore_journal_parallel
),
583 m_filestore_journal_trailing(cct
->_conf
->filestore_journal_trailing
),
584 m_filestore_journal_writeahead(cct
->_conf
->filestore_journal_writeahead
),
585 m_filestore_fiemap_threshold(cct
->_conf
->filestore_fiemap_threshold
),
586 m_filestore_max_sync_interval(cct
->_conf
->filestore_max_sync_interval
),
587 m_filestore_min_sync_interval(cct
->_conf
->filestore_min_sync_interval
),
588 m_filestore_fail_eio(cct
->_conf
->filestore_fail_eio
),
589 m_filestore_fadvise(cct
->_conf
->filestore_fadvise
),
590 do_update(do_update
),
591 m_journal_dio(cct
->_conf
->journal_dio
),
592 m_journal_aio(cct
->_conf
->journal_aio
),
593 m_journal_force_aio(cct
->_conf
->journal_force_aio
),
594 m_osd_rollback_to_cluster_snap(cct
->_conf
->osd_rollback_to_cluster_snap
),
595 m_osd_use_stale_snap(cct
->_conf
->osd_use_stale_snap
),
596 m_filestore_do_dump(false),
597 m_filestore_dump_fmt(true),
598 m_filestore_sloppy_crc(cct
->_conf
->filestore_sloppy_crc
),
599 m_filestore_sloppy_crc_block_size(cct
->_conf
->filestore_sloppy_crc_block_size
),
600 m_filestore_max_alloc_hint_size(cct
->_conf
->filestore_max_alloc_hint_size
),
602 m_filestore_max_inline_xattr_size(0),
603 m_filestore_max_inline_xattrs(0),
604 m_filestore_max_xattr_value_size(0)
606 m_filestore_kill_at
= cct
->_conf
->filestore_kill_at
;
607 for (int i
= 0; i
< m_ondisk_finisher_num
; ++i
) {
609 oss
<< "filestore-ondisk-" << i
;
610 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_odsk_fstore");
611 ondisk_finishers
.push_back(f
);
613 for (int i
= 0; i
< m_apply_finisher_num
; ++i
) {
615 oss
<< "filestore-apply-" << i
;
616 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_appl_fstore");
617 apply_finishers
.push_back(f
);
621 oss
<< basedir
<< "/current";
622 current_fn
= oss
.str();
625 sss
<< basedir
<< "/current/commit_op_seq";
626 current_op_seq_fn
= sss
.str();
629 if (cct
->_conf
->filestore_omap_backend_path
!= "") {
630 omap_dir
= cct
->_conf
->filestore_omap_backend_path
;
632 omss
<< basedir
<< "/current/omap";
633 omap_dir
= omss
.str();
637 PerfCountersBuilder
plb(cct
, internal_name
, l_filestore_first
, l_filestore_last
);
639 plb
.add_u64(l_filestore_journal_queue_ops
, "journal_queue_ops", "Operations in journal queue");
640 plb
.add_u64(l_filestore_journal_ops
, "journal_ops", "Active journal entries to be applied");
641 plb
.add_u64(l_filestore_journal_queue_bytes
, "journal_queue_bytes", "Size of journal queue");
642 plb
.add_u64(l_filestore_journal_bytes
, "journal_bytes", "Active journal operation size to be applied");
643 plb
.add_time_avg(l_filestore_journal_latency
, "journal_latency", "Average journal queue completing latency",
644 NULL
, PerfCountersBuilder::PRIO_USEFUL
);
645 plb
.add_u64_counter(l_filestore_journal_wr
, "journal_wr", "Journal write IOs");
646 plb
.add_u64_avg(l_filestore_journal_wr_bytes
, "journal_wr_bytes", "Journal data written");
647 plb
.add_u64(l_filestore_op_queue_max_ops
, "op_queue_max_ops", "Max operations in writing to FS queue");
648 plb
.add_u64(l_filestore_op_queue_ops
, "op_queue_ops", "Operations in writing to FS queue");
649 plb
.add_u64_counter(l_filestore_ops
, "ops", "Operations written to store");
650 plb
.add_u64(l_filestore_op_queue_max_bytes
, "op_queue_max_bytes", "Max data in writing to FS queue");
651 plb
.add_u64(l_filestore_op_queue_bytes
, "op_queue_bytes", "Size of writing to FS queue");
652 plb
.add_u64_counter(l_filestore_bytes
, "bytes", "Data written to store");
653 plb
.add_time_avg(l_filestore_apply_latency
, "apply_latency", "Apply latency");
654 plb
.add_u64(l_filestore_committing
, "committing", "Is currently committing");
656 plb
.add_u64_counter(l_filestore_commitcycle
, "commitcycle", "Commit cycles");
657 plb
.add_time_avg(l_filestore_commitcycle_interval
, "commitcycle_interval", "Average interval between commits");
658 plb
.add_time_avg(l_filestore_commitcycle_latency
, "commitcycle_latency", "Average latency of commit");
659 plb
.add_u64_counter(l_filestore_journal_full
, "journal_full", "Journal writes while full");
660 plb
.add_time_avg(l_filestore_queue_transaction_latency_avg
, "queue_transaction_latency_avg",
661 "Store operation queue latency", NULL
, PerfCountersBuilder::PRIO_USEFUL
);
662 plb
.add_time(l_filestore_sync_pause_max_lat
, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
664 logger
= plb
.create_perf_counters();
666 cct
->get_perfcounters_collection()->add(logger
);
667 cct
->_conf
.add_observer(this);
669 superblock
.compat_features
= get_fs_initial_compat_set();
672 FileStore::~FileStore()
674 for (auto it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
678 for (auto it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
682 cct
->_conf
.remove_observer(this);
683 cct
->get_perfcounters_collection()->remove(logger
);
686 journal
->logger
= nullptr;
690 if (m_filestore_do_dump
) {
695 static void get_attrname(const char *name
, char *buf
, int len
)
697 snprintf(buf
, len
, "user.ceph.%s", name
);
700 bool parse_attrname(char **name
)
702 if (strncmp(*name
, "user.ceph.", 10) == 0) {
709 void FileStore::collect_metadata(map
<string
,string
> *pm
)
711 char partition_path
[PATH_MAX
];
712 char dev_node
[PATH_MAX
];
714 (*pm
)["filestore_backend"] = backend
->get_name();
716 ss
<< "0x" << std::hex
<< m_fs_type
<< std::dec
;
717 (*pm
)["filestore_f_type"] = ss
.str();
719 if (cct
->_conf
->filestore_collect_device_partition_information
) {
721 BlkDev
blkdev(fsid_fd
);
722 if (rc
= blkdev
.partition(partition_path
, PATH_MAX
); rc
) {
723 (*pm
)["backend_filestore_partition_path"] = "unknown";
725 (*pm
)["backend_filestore_partition_path"] = string(partition_path
);
727 if (rc
= blkdev
.wholedisk(dev_node
, PATH_MAX
); rc
) {
728 (*pm
)["backend_filestore_dev_node"] = "unknown";
730 (*pm
)["backend_filestore_dev_node"] = string(dev_node
);
733 if (rc
== 0 && vdo_fd
>= 0) {
734 (*pm
)["vdo"] = "true";
735 (*pm
)["vdo_physical_size"] =
736 stringify(4096 * get_vdo_stat(vdo_fd
, "physical_blocks"));
739 journal
->collect_metadata(pm
);
744 int FileStore::get_devices(set
<string
> *ls
)
747 BlkDev
blkdev(fsid_fd
);
748 if (int rc
= blkdev
.wholedisk(&dev_node
); rc
) {
751 get_raw_devices(dev_node
, ls
);
753 journal
->get_devices(ls
);
758 int FileStore::statfs(struct store_statfs_t
*buf0
, osd_alert_list_t
* alerts
)
763 alerts
->clear(); // returns nothing for now
765 if (::statfs(basedir
.c_str(), &buf
) < 0) {
767 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
768 ceph_assert(r
!= -ENOENT
);
772 uint64_t bfree
= buf
.f_bavail
* buf
.f_bsize
;
774 // assume all of leveldb/rocksdb is omap.
776 map
<string
,uint64_t> kv_usage
;
777 buf0
->omap_allocated
+= object_map
->get_db()->get_estimated_size(kv_usage
);
780 uint64_t thin_total
, thin_avail
;
781 if (get_vdo_utilization(vdo_fd
, &thin_total
, &thin_avail
)) {
782 buf0
->total
= thin_total
;
783 bfree
= std::min(bfree
, thin_avail
);
784 buf0
->allocated
= thin_total
- thin_avail
;
785 buf0
->data_stored
= bfree
;
787 buf0
->total
= buf
.f_blocks
* buf
.f_bsize
;
788 buf0
->allocated
= bfree
;
789 buf0
->data_stored
= bfree
;
791 buf0
->available
= bfree
;
793 // FIXME: we don't know how to populate buf->internal_metadata; XFS doesn't
794 // tell us what its internal overhead is.
796 // Adjust for writes pending in the journal
798 uint64_t estimate
= journal
->get_journal_size_estimate();
799 buf0
->internally_reserved
= estimate
;
800 if (buf0
->available
> estimate
)
801 buf0
->available
-= estimate
;
809 int FileStore::pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
,
815 void FileStore::new_journal()
817 if (journalpath
.length()) {
818 dout(10) << "open_journal at " << journalpath
<< dendl
;
819 journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
,
821 m_journal_dio
, m_journal_aio
,
822 m_journal_force_aio
);
824 journal
->logger
= logger
;
829 int FileStore::dump_journal(ostream
& out
)
833 if (!journalpath
.length())
836 FileJournal
*journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
, journalpath
.c_str(), m_journal_dio
);
837 r
= journal
->dump(out
);
843 FileStoreBackend
*FileStoreBackend::create(unsigned long f_type
, FileStore
*fs
)
846 #if defined(__linux__)
847 case BTRFS_SUPER_MAGIC
:
848 return new BtrfsFileStoreBackend(fs
);
850 case XFS_SUPER_MAGIC
:
851 return new XfsFileStoreBackend(fs
);
855 case ZFS_SUPER_MAGIC
:
856 return new ZFSFileStoreBackend(fs
);
859 return new GenericFileStoreBackend(fs
);
863 void FileStore::create_backend(unsigned long f_type
)
867 ceph_assert(!backend
);
868 backend
= FileStoreBackend::create(f_type
, this);
870 dout(0) << "backend " << backend
->get_name()
871 << " (magic 0x" << std::hex
<< f_type
<< std::dec
<< ")"
875 #if defined(__linux__)
876 case BTRFS_SUPER_MAGIC
:
877 if (!m_disable_wbthrottle
){
878 wbthrottle
.set_fs(WBThrottle::BTRFS
);
882 case XFS_SUPER_MAGIC
:
883 // wbthrottle is constructed with fs(WBThrottle::XFS)
888 set_xattr_limits_via_conf();
891 int FileStore::mkfs()
894 char fsid_fn
[PATH_MAX
];
897 uuid_d old_omap_fsid
;
899 dout(1) << "mkfs in " << basedir
<< dendl
;
900 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
|O_CLOEXEC
);
901 if (basedir_fd
< 0) {
903 derr
<< __FUNC__
<< ": failed to open base dir " << basedir
<< ": " << cpp_strerror(ret
) << dendl
;
908 snprintf(fsid_fn
, sizeof(fsid_fn
), "%s/fsid", basedir
.c_str());
909 fsid_fd
= ::open(fsid_fn
, O_RDWR
|O_CREAT
|O_CLOEXEC
, 0644);
912 derr
<< __FUNC__
<< ": failed to open " << fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
913 goto close_basedir_fd
;
916 if (lock_fsid() < 0) {
921 if (read_fsid(fsid_fd
, &old_fsid
) < 0 || old_fsid
.is_zero()) {
922 if (fsid
.is_zero()) {
923 fsid
.generate_random();
924 dout(1) << __FUNC__
<< ": generated fsid " << fsid
<< dendl
;
926 dout(1) << __FUNC__
<< ": using provided fsid " << fsid
<< dendl
;
929 fsid
.print(fsid_str
);
930 strcat(fsid_str
, "\n");
931 ret
= ::ftruncate(fsid_fd
, 0);
934 derr
<< __FUNC__
<< ": failed to truncate fsid: "
935 << cpp_strerror(ret
) << dendl
;
938 ret
= safe_write(fsid_fd
, fsid_str
, strlen(fsid_str
));
940 derr
<< __FUNC__
<< ": failed to write fsid: "
941 << cpp_strerror(ret
) << dendl
;
944 if (::fsync(fsid_fd
) < 0) {
946 derr
<< __FUNC__
<< ": close failed: can't write fsid: "
947 << cpp_strerror(ret
) << dendl
;
950 dout(10) << __FUNC__
<< ": fsid is " << fsid
<< dendl
;
952 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
953 derr
<< __FUNC__
<< ": on-disk fsid " << old_fsid
<< " != provided " << fsid
<< dendl
;
958 dout(1) << __FUNC__
<< ": fsid is already set to " << fsid
<< dendl
;
962 ret
= write_version_stamp();
964 derr
<< __FUNC__
<< ": write_version_stamp() failed: "
965 << cpp_strerror(ret
) << dendl
;
970 superblock
.omap_backend
= cct
->_conf
->filestore_omap_backend
;
971 ret
= write_superblock();
973 derr
<< __FUNC__
<< ": write_superblock() failed: "
974 << cpp_strerror(ret
) << dendl
;
978 struct statfs basefs
;
979 ret
= ::fstatfs(basedir_fd
, &basefs
);
982 derr
<< __FUNC__
<< ": cannot fstatfs basedir "
983 << cpp_strerror(ret
) << dendl
;
987 #if defined(__linux__)
988 if (basefs
.f_type
== BTRFS_SUPER_MAGIC
&&
989 !g_ceph_context
->check_experimental_feature_enabled("btrfs")) {
990 derr
<< __FUNC__
<< ": deprecated btrfs support is not enabled" << dendl
;
995 create_backend(basefs
.f_type
);
997 ret
= backend
->create_current();
999 derr
<< __FUNC__
<< ": failed to create current/ " << cpp_strerror(ret
) << dendl
;
1003 // write initial op_seq
1005 uint64_t initial_seq
= 0;
1006 int fd
= read_op_seq(&initial_seq
);
1009 derr
<< __FUNC__
<< ": failed to create " << current_op_seq_fn
<< ": "
1010 << cpp_strerror(ret
) << dendl
;
1013 if (initial_seq
== 0) {
1014 ret
= write_op_seq(fd
, 1);
1016 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1017 derr
<< __FUNC__
<< ": failed to write to " << current_op_seq_fn
<< ": "
1018 << cpp_strerror(ret
) << dendl
;
1022 if (backend
->can_checkpoint()) {
1023 // create snap_1 too
1024 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
|O_CLOEXEC
);
1025 ceph_assert(current_fd
>= 0);
1027 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, 1ull);
1028 ret
= backend
->create_checkpoint(s
, nullptr);
1029 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
1030 if (ret
< 0 && ret
!= -EEXIST
) {
1031 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1032 derr
<< __FUNC__
<< ": failed to create snap_1: " << cpp_strerror(ret
) << dendl
;
1037 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1039 ret
= KeyValueDB::test_init(superblock
.omap_backend
, omap_dir
);
1041 derr
<< __FUNC__
<< ": failed to create " << cct
->_conf
->filestore_omap_backend
<< dendl
;
1044 // create fsid under omap
1047 char omap_fsid_fn
[PATH_MAX
];
1048 snprintf(omap_fsid_fn
, sizeof(omap_fsid_fn
), "%s/osd_uuid", omap_dir
.c_str());
1049 omap_fsid_fd
= ::open(omap_fsid_fn
, O_RDWR
|O_CREAT
|O_CLOEXEC
, 0644);
1050 if (omap_fsid_fd
< 0) {
1052 derr
<< __FUNC__
<< ": failed to open " << omap_fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1056 if (read_fsid(omap_fsid_fd
, &old_omap_fsid
) < 0 || old_omap_fsid
.is_zero()) {
1057 ceph_assert(!fsid
.is_zero());
1058 fsid
.print(fsid_str
);
1059 strcat(fsid_str
, "\n");
1060 ret
= ::ftruncate(omap_fsid_fd
, 0);
1063 derr
<< __FUNC__
<< ": failed to truncate fsid: "
1064 << cpp_strerror(ret
) << dendl
;
1065 goto close_omap_fsid_fd
;
1067 ret
= safe_write(omap_fsid_fd
, fsid_str
, strlen(fsid_str
));
1069 derr
<< __FUNC__
<< ": failed to write fsid: "
1070 << cpp_strerror(ret
) << dendl
;
1071 goto close_omap_fsid_fd
;
1073 dout(10) << __FUNC__
<< ": write success, fsid:" << fsid_str
<< ", ret:" << ret
<< dendl
;
1074 if (::fsync(omap_fsid_fd
) < 0) {
1076 derr
<< __FUNC__
<< ": close failed: can't write fsid: "
1077 << cpp_strerror(ret
) << dendl
;
1078 goto close_omap_fsid_fd
;
1080 dout(10) << "mkfs omap fsid is " << fsid
<< dendl
;
1082 if (fsid
!= old_omap_fsid
) {
1083 derr
<< __FUNC__
<< ": " << omap_fsid_fn
1084 << " has existed omap fsid " << old_omap_fsid
1085 << " != expected osd fsid " << fsid
1088 goto close_omap_fsid_fd
;
1090 dout(1) << __FUNC__
<< ": omap fsid is already set to " << fsid
<< dendl
;
1093 dout(1) << cct
->_conf
->filestore_omap_backend
<< " db exists/created" << dendl
;
1098 goto close_omap_fsid_fd
;
1100 ret
= write_meta("type", "filestore");
1102 goto close_omap_fsid_fd
;
1104 dout(1) << "mkfs done in " << basedir
<< dendl
;
1108 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1110 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1113 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1119 int FileStore::mkjournal()
1124 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1125 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
, 0644);
1128 derr
<< __FUNC__
<< ": open error: " << cpp_strerror(err
) << dendl
;
1131 ret
= read_fsid(fd
, &fsid
);
1133 derr
<< __FUNC__
<< ": read error: " << cpp_strerror(ret
) << dendl
;
1134 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1137 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1143 ret
= journal
->check();
1145 ret
= journal
->create();
1147 derr
<< __FUNC__
<< ": error creating journal on " << journalpath
1148 << ": " << cpp_strerror(ret
) << dendl
;
1150 dout(0) << __FUNC__
<< ": created journal on " << journalpath
<< dendl
;
1158 int FileStore::read_fsid(int fd
, uuid_d
*uuid
)
1161 memset(fsid_str
, 0, sizeof(fsid_str
));
1162 int ret
= safe_read(fd
, fsid_str
, sizeof(fsid_str
));
1166 // old 64-bit fsid... mirror it.
1167 *(uint64_t*)&uuid
->bytes()[0] = *(uint64_t*)fsid_str
;
1168 *(uint64_t*)&uuid
->bytes()[8] = *(uint64_t*)fsid_str
;
1176 if (!uuid
->parse(fsid_str
))
1181 int FileStore::lock_fsid()
1184 memset(&l
, 0, sizeof(l
));
1186 l
.l_whence
= SEEK_SET
;
1189 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
1192 dout(0) << __FUNC__
<< ": failed to lock " << basedir
<< "/fsid, is another ceph-osd still running? "
1193 << cpp_strerror(err
) << dendl
;
1199 bool FileStore::test_mount_in_use()
1201 dout(5) << __FUNC__
<< ": basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1203 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1205 // verify fs isn't in use
1207 fsid_fd
= ::open(fn
, O_RDWR
|O_CLOEXEC
, 0644);
1209 return 0; // no fsid, ok.
1210 bool inuse
= lock_fsid() < 0;
1211 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1216 bool FileStore::is_rotational()
1220 rotational
= backend
->is_rotational();
1222 int fd
= ::open(basedir
.c_str(), O_RDONLY
|O_CLOEXEC
);
1226 int r
= ::fstatfs(fd
, &st
);
1231 create_backend(st
.f_type
);
1232 rotational
= backend
->is_rotational();
1236 dout(10) << __func__
<< " " << (int)rotational
<< dendl
;
1240 bool FileStore::is_journal_rotational()
1242 bool journal_rotational
;
1244 journal_rotational
= backend
->is_journal_rotational();
1246 int fd
= ::open(journalpath
.c_str(), O_RDONLY
|O_CLOEXEC
);
1250 int r
= ::fstatfs(fd
, &st
);
1255 create_backend(st
.f_type
);
1256 journal_rotational
= backend
->is_journal_rotational();
1260 dout(10) << __func__
<< " " << (int)journal_rotational
<< dendl
;
1261 return journal_rotational
;
1264 int FileStore::_detect_fs()
1267 int r
= ::fstatfs(basedir_fd
, &st
);
1271 blk_size
= st
.f_bsize
;
1273 #if defined(__linux__)
1274 if (st
.f_type
== BTRFS_SUPER_MAGIC
&&
1275 !g_ceph_context
->check_experimental_feature_enabled("btrfs")) {
1276 derr
<<__FUNC__
<< ": deprecated btrfs support is not enabled" << dendl
;
1281 create_backend(st
.f_type
);
1283 r
= backend
->detect_features();
1285 derr
<< __FUNC__
<< ": detect_features error: " << cpp_strerror(r
) << dendl
;
1291 char dev_node
[PATH_MAX
];
1292 if (int rc
= BlkDev
{fsid_fd
}.wholedisk(dev_node
, PATH_MAX
); rc
== 0) {
1293 vdo_fd
= get_vdo_stats_handle(dev_node
, &vdo_name
);
1295 dout(0) << __func__
<< " VDO volume " << vdo_name
<< " for " << dev_node
1305 snprintf(fn
, sizeof(fn
), "%s/xattr_test", basedir
.c_str());
1306 int tmpfd
= ::open(fn
, O_CREAT
|O_WRONLY
|O_TRUNC
|O_CLOEXEC
, 0700);
1309 derr
<< __FUNC__
<< ": unable to create " << fn
<< ": " << cpp_strerror(ret
) << dendl
;
1313 int ret
= chain_fsetxattr(tmpfd
, "user.test", &x
, sizeof(x
));
1315 ret
= chain_fgetxattr(tmpfd
, "user.test", &y
, sizeof(y
));
1316 if ((ret
< 0) || (x
!= y
)) {
1317 derr
<< "Extended attributes don't appear to work. ";
1319 *_dout
<< "Got error " + cpp_strerror(ret
) + ". ";
1320 *_dout
<< "If you are using ext3 or ext4, be sure to mount the underlying "
1321 << "file system with the 'user_xattr' option." << dendl
;
1323 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
1328 memset(buf
, 0, sizeof(buf
)); // shut up valgrind
1329 chain_fsetxattr(tmpfd
, "user.test", &buf
, sizeof(buf
));
1330 chain_fsetxattr(tmpfd
, "user.test2", &buf
, sizeof(buf
));
1331 chain_fsetxattr(tmpfd
, "user.test3", &buf
, sizeof(buf
));
1332 chain_fsetxattr(tmpfd
, "user.test4", &buf
, sizeof(buf
));
1333 ret
= chain_fsetxattr(tmpfd
, "user.test5", &buf
, sizeof(buf
));
1334 if (ret
== -ENOSPC
) {
1335 dout(0) << "limited size xattrs" << dendl
;
1337 chain_fremovexattr(tmpfd
, "user.test");
1338 chain_fremovexattr(tmpfd
, "user.test2");
1339 chain_fremovexattr(tmpfd
, "user.test3");
1340 chain_fremovexattr(tmpfd
, "user.test4");
1341 chain_fremovexattr(tmpfd
, "user.test5");
1344 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
// Sanity-check the configured journal mode against the backend's
// capabilities and warn the operator (via dout/derr) about unsafe or
// ambiguous configurations.  Visible checks: at most one of
// {writeahead, parallel, trailing} journal modes may be enabled, and a
// non-checkpointing backend without a writeahead journal risks data loss.
// NOTE(review): this excerpt is an extraction with lines elided
// (original line numbers embedded); code bytes are preserved as-is.
1349 int FileStore::_sanity_check_fs()
// Cast each bool flag to int and sum: more than one mode set is an error.
1353 if (((int)m_filestore_journal_writeahead
+
1354 (int)m_filestore_journal_parallel
+
1355 (int)m_filestore_journal_trailing
) > 1) {
1356 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl
;
1358 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1359 << " is enabled in ceph.conf. You must choose a single journal mode."
1360 << TEXT_NORMAL
<< std::endl
;
// Non-checkpointing backends (i.e. not btrfs-style snapshots) need a
// writeahead journal for crash consistency.
1364 if (!backend
->can_checkpoint()) {
1365 if (!journal
|| !m_filestore_journal_writeahead
) {
1366 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl
;
1368 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1369 << " For non-btrfs volumes, a writeahead journal is required to\n"
1370 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1371 << " should include something like:\n"
1372 << " osd journal = /path/to/journal_device_or_file\n"
1373 << " filestore journal writeahead = true\n"
// No journal at all: legal, but write latency may suffer.
1379 dout(0) << "mount WARNING: no journal" << dendl
;
1381 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1382 << " If you will not be using an osd journal, write latency may be\n"
1383 << " relatively high. It can be reduced somewhat by lowering\n"
1384 << " filestore_max_sync_interval, but lower values mean lower write\n"
1385 << " throughput, especially with spinning disks.\n"
// Serialize the in-memory superblock and persist it to the file
// "<basedir>/superblock" (mode 0600) via safe_write_file.
// Returns safe_write_file's result (0 on success, negative errno on error).
1392 int FileStore::write_superblock()
1395 encode(superblock
, bl
);
1396 return safe_write_file(basedir
.c_str(), "superblock",
1397 bl
.c_str(), bl
.length(), 0600);
// Load and decode the on-disk superblock from "<basedir>/superblock".
// If the file does not exist (-ENOENT), write an initial superblock
// instead (first-mount bootstrap) and return that result.
1400 int FileStore::read_superblock()
// PATH_MAX-sized buffer is an upper bound for the (small) superblock.
1402 bufferptr
bp(PATH_MAX
);
1403 int ret
= safe_read_file(basedir
.c_str(), "superblock",
1404 bp
.c_str(), bp
.length());
1406 if (ret
== -ENOENT
) {
1407 // If the file doesn't exist write initial CompatSet
1408 return write_superblock();
// Wrap the raw bytes in a bufferlist and decode into `superblock`.
1414 bl
.push_back(std::move(bp
));
1415 auto i
= bl
.cbegin();
1416 decode(superblock
, i
);
// Rewrite the on-disk version stamp to the current target version.
// Thin wrapper over write_version_stamp(); returns its result.
1420 int FileStore::update_version_stamp()
1422 return write_version_stamp();
// Read "<basedir>/store_version", decode it into *version, and compare
// against target_version.  Visible contract: logs old-vs-target at
// debug level 10; the comparison result drives the return value
// (return statements elided in this excerpt).
1425 int FileStore::version_stamp_is_valid(uint32_t *version
)
1427 bufferptr
bp(PATH_MAX
);
1428 int ret
= safe_read_file(basedir
.c_str(), "store_version",
1429 bp
.c_str(), bp
.length());
1434 bl
.push_back(std::move(bp
));
1435 auto i
= bl
.cbegin();
1436 decode(*version
, i
);
1437 dout(10) << __FUNC__
<< ": was " << *version
<< " vs target "
1438 << target_version
<< dendl
;
1439 if (*version
== target_version
)
// Ask the kernel to drop its page/dentry caches by writing to
// /proc/sys/vm/drop_caches (Linux-only interface).  On open/write
// failure, logs via derr and, if `os` is non-null, mirrors the error
// message into the caller-provided stream.
1445 int FileStore::flush_cache(ostream
*os
)
1447 string drop_caches_file
= "/proc/sys/vm/drop_caches";
1448 int drop_caches_fd
= ::open(drop_caches_file
.c_str(), O_WRONLY
|O_CLOEXEC
), ret
= 0;
// `buf` holds the value to write (set on an elided line in this excerpt).
1450 size_t len
= strlen(buf
);
1452 if (drop_caches_fd
< 0) {
1454 derr
<< __FUNC__
<< ": failed to open " << drop_caches_file
<< ": " << cpp_strerror(ret
) << dendl
;
1456 *os
<< "FileStore flush_cache: failed to open " << drop_caches_file
<< ": " << cpp_strerror(ret
);
1461 if (::write(drop_caches_fd
, buf
, len
) < 0) {
1463 derr
<< __FUNC__
<< ": failed to write to " << drop_caches_file
<< ": " << cpp_strerror(ret
) << dendl
;
1465 *os
<< "FileStore flush_cache: failed to write to " << drop_caches_file
<< ": " << cpp_strerror(ret
);
1471 ::close(drop_caches_fd
);
// Encode target_version and persist it to "<basedir>/store_version"
// (mode 0600).  Returns safe_write_file's result.
1475 int FileStore::write_version_stamp()
1477 dout(1) << __FUNC__
<< ": " << target_version
<< dendl
;
1479 encode(target_version
, bl
);
1481 return safe_write_file(basedir
.c_str(), "store_version",
1482 bl
.c_str(), bl
.length(), 0600);
// Upgrade the on-disk format to the current target version.
// Visible behavior: validates the version stamp; refuses stores older
// than v3 (user must convert via firefly v0.80.x first); the v3->v4
// step needs no FileStore-side data migration, so it just refreshes
// the version stamp.
1485 int FileStore::upgrade()
1487 dout(1) << __FUNC__
<< dendl
;
1489 int r
= version_stamp_is_valid(&version
);
1492 derr
<< "The store_version file doesn't exist." << dendl
;
1501 derr
<< "ObjectStore is old at version " << version
<< ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl
;
1505 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1506 // open up DBObjectMap with the do_upgrade flag, which we already did.
1507 update_version_stamp();
// Open (creating if needed) the current op-sequence file and parse the
// stored sequence number into *seq.  On EIO with
// m_filestore_fail_eio set, escalates via handle_eio(); asserts that
// EIO is otherwise impossible.  The returned value appears to be the
// open fd on success (return lines elided in this excerpt) — callers
// such as mount() treat it as op_fd.
1511 int FileStore::read_op_seq(uint64_t *seq
)
1513 int op_fd
= ::open(current_op_seq_fn
.c_str(), O_CREAT
|O_RDWR
|O_CLOEXEC
, 0644);
1516 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
// Zero the text buffer, then read up to sizeof(s)-1 bytes so the
// result stays NUL-terminated for parsing.
1520 memset(s
, 0, sizeof(s
));
1521 int ret
= safe_read(op_fd
, s
, sizeof(s
) - 1);
1523 derr
<< __FUNC__
<< ": error reading " << current_op_seq_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1524 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
1525 ceph_assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Write `seq` as decimal text (newline-terminated) at offset 0 of the
// already-open op-sequence file `fd` via pwrite.  Asserts that EIO is
// impossible unless m_filestore_fail_eio allows it.
1532 int FileStore::write_op_seq(int fd
, uint64_t seq
)
1535 snprintf(s
, sizeof(s
), "%" PRId64
"\n", seq
);
1536 int ret
= TEMP_FAILURE_RETRY(::pwrite(fd
, s
, strlen(s
), 0));
1539 ceph_assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Mount the FileStore: validate basedir/fsid, check version stamp and
// superblock compat features, detect the backing filesystem, handle
// checkpoint/rollback for snapshotting backends, verify the omap fsid,
// initialize the omap KeyValueDB + DBObjectMap, pick a journal mode,
// sanity-check collections, start the sync thread, and replay the
// journal.  Error paths unwind through the close_* labels.
// NOTE(review): this excerpt elides many lines (embedded original line
// numbers jump); code bytes are preserved exactly as extracted.
1544 int FileStore::mount()
1548 uint64_t initial_op_seq
;
1550 set
<string
> cluster_snaps
;
1551 CompatSet supported_compat_set
= get_fs_supported_compat_set();
1553 dout(5) << "basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1555 ret
= set_throttle_params();
1559 // make sure global base dir exists
1560 if (::access(basedir
.c_str(), R_OK
| W_OK
)) {
1562 derr
<< __FUNC__
<< ": unable to access basedir '" << basedir
<< "': "
1563 << cpp_strerror(ret
) << dendl
;
// Open and read "<basedir>/fsid", then take the fsid lock so two OSDs
// cannot mount the same store concurrently.
1568 snprintf(buf
, sizeof(buf
), "%s/fsid", basedir
.c_str());
1569 fsid_fd
= ::open(buf
, O_RDWR
|O_CLOEXEC
, 0644);
1572 derr
<< __FUNC__
<< ": error opening '" << buf
<< "': "
1573 << cpp_strerror(ret
) << dendl
;
1577 ret
= read_fsid(fsid_fd
, &fsid
);
1579 derr
<< __FUNC__
<< ": error reading fsid_fd: " << cpp_strerror(ret
)
1584 if (lock_fsid() < 0) {
1585 derr
<< __FUNC__
<< ": lock_fsid failed" << dendl
;
1590 dout(10) << "mount fsid is " << fsid
<< dendl
;
// Version stamp: a stale stamp is tolerated only when do_update (or
// filestore_update_to) authorizes an on-disk format upgrade.
1593 uint32_t version_stamp
;
1594 ret
= version_stamp_is_valid(&version_stamp
);
1596 derr
<< __FUNC__
<< ": error in version_stamp_is_valid: "
1597 << cpp_strerror(ret
) << dendl
;
1599 } else if (ret
== 0) {
1600 if (do_update
|| (int)version_stamp
< cct
->_conf
->filestore_update_to
) {
1601 derr
<< __FUNC__
<< ": stale version stamp detected: "
1603 << ". Proceeding, do_update "
1604 << "is set, performing disk format upgrade."
1609 derr
<< __FUNC__
<< ": stale version stamp " << version_stamp
1610 << ". Please run the FileStore update script before starting the "
1611 << "OSD, or set filestore_update_to to " << target_version
1612 << " (currently " << cct
->_conf
->filestore_update_to
<< ")"
1618 ret
= read_superblock();
1623 // Check if this FileStore supports all the necessary features to mount
1624 if (supported_compat_set
.compare(superblock
.compat_features
) == -1) {
1625 derr
<< __FUNC__
<< ": Incompatible features set "
1626 << superblock
.compat_features
<< dendl
;
1631 // open some dir handles
1632 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
|O_CLOEXEC
);
1633 if (basedir_fd
< 0) {
1635 derr
<< __FUNC__
<< ": failed to open " << basedir
<< ": "
1636 << cpp_strerror(ret
) << dendl
;
1641 // test for btrfs, xattrs, etc.
1644 derr
<< __FUNC__
<< ": error in _detect_fs: "
1645 << cpp_strerror(ret
) << dendl
;
1646 goto close_basedir_fd
;
// Enumerate existing checkpoints/snapshots so we can pick a rollback
// target (commit snaps) and honor any requested cluster snapshot.
1651 ret
= backend
->list_checkpoints(ls
);
1653 derr
<< __FUNC__
<< ": error in _list_snaps: "<< cpp_strerror(ret
) << dendl
;
1654 goto close_basedir_fd
;
1657 long long unsigned c
, prev
= 0;
1658 char clustersnap
[NAME_MAX
];
1659 for (list
<string
>::iterator it
= ls
.begin(); it
!= ls
.end(); ++it
) {
1660 if (sscanf(it
->c_str(), COMMIT_SNAP_ITEM
, &c
) == 1) {
// Commit snaps must be strictly increasing.
1661 ceph_assert(c
> prev
);
1664 } else if (sscanf(it
->c_str(), CLUSTER_SNAP_ITEM
, clustersnap
) == 1)
1665 cluster_snaps
.insert(*it
);
1669 if (m_osd_rollback_to_cluster_snap
.length() &&
1670 cluster_snaps
.count(m_osd_rollback_to_cluster_snap
) == 0) {
1671 derr
<< "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap
<< "': not found" << dendl
;
1673 goto close_basedir_fd
;
1677 snprintf(nosnapfn
, sizeof(nosnapfn
), "%s/nosnap", current_fn
.c_str());
// Snapshotting backend: possibly roll current/ back to the most recent
// consistent snap (or a named cluster snap), guarded by the nosnap
// marker unless the operator forces it with osd_use_stale_snap.
1679 if (backend
->can_checkpoint()) {
1680 if (snaps
.empty()) {
1681 dout(0) << __FUNC__
<< ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl
;
1684 uint64_t curr_seq
= 0;
1686 if (m_osd_rollback_to_cluster_snap
.length()) {
1688 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap
<< " **"
1691 ceph_assert(cluster_snaps
.count(m_osd_rollback_to_cluster_snap
));
1692 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, m_osd_rollback_to_cluster_snap
.c_str());
1695 int fd
= read_op_seq(&curr_seq
);
1697 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1701 dout(10) << " current/ seq was " << curr_seq
<< dendl
;
1703 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl
;
1705 uint64_t cp
= snaps
.back();
1706 dout(10) << " most recent snap from " << snaps
<< " is " << cp
<< dendl
;
1708 // if current/ is marked as non-snapshotted, refuse to roll
1709 // back (without clear direction) to avoid throwing out new
1712 if (::stat(nosnapfn
, &st
) == 0) {
1713 if (!m_osd_use_stale_snap
) {
1714 derr
<< "ERROR: " << nosnapfn
<< " exists, not rolling back to avoid losing new data" << dendl
;
1715 derr
<< "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl
;
1716 derr
<< "config option for --osd-use-stale-snap startup argument." << dendl
;
1718 goto close_basedir_fd
;
1720 derr
<< "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1721 << ", newest snap is " << cp
<< dendl
;
1723 << " ** WARNING: forcing the use of stale snapshot data **"
1724 << TEXT_NORMAL
<< std::endl
;
1727 dout(10) << __FUNC__
<< ": rolling back to consistent snap " << cp
<< dendl
;
1728 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
1732 ret
= backend
->rollback_to(s
);
1734 derr
<< __FUNC__
<< ": error rolling back to " << s
<< ": "
1735 << cpp_strerror(ret
) << dendl
;
1736 goto close_basedir_fd
;
// Open current/ and read the initial op sequence; a zero seq means the
// store is corrupt or was never initialized.
1742 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
|O_CLOEXEC
);
1743 if (current_fd
< 0) {
1745 derr
<< __FUNC__
<< ": error opening: " << current_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1746 goto close_basedir_fd
;
1749 ceph_assert(current_fd
>= 0);
1751 op_fd
= read_op_seq(&initial_op_seq
);
1754 derr
<< __FUNC__
<< ": read_op_seq failed" << dendl
;
1755 goto close_current_fd
;
1758 dout(5) << "mount op_seq is " << initial_op_seq
<< dendl
;
1759 if (initial_op_seq
== 0) {
1760 derr
<< "mount initial op seq is 0; something is wrong" << dendl
;
1762 goto close_current_fd
;
1765 if (!backend
->can_checkpoint()) {
1766 // mark current/ as non-snapshotted so that we don't rollback away
1768 int r
= ::creat(nosnapfn
, 0644);
1771 derr
<< __FUNC__
<< ": failed to create current/nosnap" << dendl
;
1772 goto close_current_fd
;
1774 VOID_TEMP_FAILURE_RETRY(::close(r
));
1776 // clear nosnap marker, if present.
1780 // check fsid with omap
1782 char omap_fsid_buf
[PATH_MAX
];
1783 struct ::stat omap_fsid_stat
;
1784 snprintf(omap_fsid_buf
, sizeof(omap_fsid_buf
), "%s/osd_uuid", omap_dir
.c_str());
1785 // if osd_uuid does not exist, assume this omap matches the corresponding osd
1786 if (::stat(omap_fsid_buf
, &omap_fsid_stat
) != 0){
1787 dout(10) << __FUNC__
<< ": osd_uuid not found under omap, "
1788 << "assume as matched."
1792 // if osd_uuid exists, compare osd_uuid with fsid
1793 omap_fsid_fd
= ::open(omap_fsid_buf
, O_RDONLY
|O_CLOEXEC
, 0644);
1794 if (omap_fsid_fd
< 0) {
1796 derr
<< __FUNC__
<< ": error opening '" << omap_fsid_buf
<< "': "
1797 << cpp_strerror(ret
)
1799 goto close_current_fd
;
1801 ret
= read_fsid(omap_fsid_fd
, &omap_fsid
);
1802 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1804 derr
<< __FUNC__
<< ": error reading omap_fsid_fd"
1805 << ", omap_fsid = " << omap_fsid
1806 << cpp_strerror(ret
)
1808 goto close_current_fd
;
1810 if (fsid
!= omap_fsid
) {
1811 derr
<< __FUNC__
<< ": " << omap_fsid_buf
1812 << " has existed omap fsid " << omap_fsid
1813 << " != expected osd fsid " << fsid
1816 goto close_current_fd
;
// Bring up the omap backend (KeyValueDB, rocksdb options when
// applicable) and wrap it in a DBObjectMap.
1820 dout(0) << "start omap initiation" << dendl
;
1821 if (!(generic_flags
& SKIP_MOUNT_OMAP
)) {
1822 KeyValueDB
* omap_store
= KeyValueDB::create(cct
,
1823 superblock
.omap_backend
,
1827 derr
<< __FUNC__
<< ": Error creating " << superblock
.omap_backend
<< dendl
;
1829 goto close_current_fd
;
1832 if (superblock
.omap_backend
== "rocksdb")
1833 ret
= omap_store
->init(cct
->_conf
->filestore_rocksdb_options
);
1835 ret
= omap_store
->init();
1838 derr
<< __FUNC__
<< ": Error initializing omap_store: " << cpp_strerror(ret
) << dendl
;
1839 goto close_current_fd
;
1843 if (omap_store
->create_and_open(err
)) {
1845 omap_store
= nullptr;
1846 derr
<< __FUNC__
<< ": Error initializing " << superblock
.omap_backend
1847 << " : " << err
.str() << dendl
;
1849 goto close_current_fd
;
1852 DBObjectMap
*dbomap
= new DBObjectMap(cct
, omap_store
);
1853 ret
= dbomap
->init(do_update
);
1857 derr
<< __FUNC__
<< ": Error initializing DBObjectMap: " << ret
<< dendl
;
1858 goto close_current_fd
;
1862 if (cct
->_conf
->filestore_debug_omap_check
&& !dbomap
->check(err2
)) {
1863 derr
<< err2
.str() << dendl
;
1867 goto close_current_fd
;
1869 object_map
.reset(dbomap
);
1875 // select journal mode?
1877 if (!m_filestore_journal_writeahead
&&
1878 !m_filestore_journal_parallel
&&
1879 !m_filestore_journal_trailing
) {
1880 if (!backend
->can_checkpoint()) {
1881 m_filestore_journal_writeahead
= true;
1882 dout(0) << __FUNC__
<< ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl
;
1884 m_filestore_journal_parallel
= true;
1885 dout(0) << __FUNC__
<< ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl
;
1888 if (m_filestore_journal_writeahead
)
1889 dout(0) << __FUNC__
<< ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl
;
1890 if (m_filestore_journal_parallel
)
1891 dout(0) << __FUNC__
<< ": PARALLEL journal mode explicitly enabled in conf" << dendl
;
1892 if (m_filestore_journal_trailing
)
1893 dout(0) << __FUNC__
<< ": TRAILING journal mode explicitly enabled in conf" << dendl
;
1895 if (m_filestore_journal_writeahead
)
1896 journal
->set_wait_on_full(true);
1898 dout(0) << __FUNC__
<< ": no journal" << dendl
;
1901 ret
= _sanity_check_fs();
1903 derr
<< __FUNC__
<< ": _sanity_check_fs failed with error "
1905 goto close_current_fd
;
1908 // Cleanup possibly invalid collections
1910 vector
<coll_t
> collections
;
1911 ret
= list_collections(collections
, true);
1913 derr
<< "Error " << ret
<< " while listing collections" << dendl
;
1914 goto close_current_fd
;
1916 for (vector
<coll_t
>::iterator i
= collections
.begin();
1917 i
!= collections
.end();
1920 ret
= get_index(*i
, &index
);
1922 derr
<< "Unable to mount index " << *i
1923 << " with error: " << ret
<< dendl
;
1924 goto close_current_fd
;
1926 ceph_assert(index
.index
);
1927 std::unique_lock l
{(index
.index
)->access_lock
};
// Start auxiliary machinery: wbthrottle (unless disabled), sync thread,
// then journal replay from initial_op_seq (unless skipped by flags).
1932 if (!m_disable_wbthrottle
) {
1935 dout(0) << __FUNC__
<< ": INFO: WbThrottle is disabled" << dendl
;
1936 if (cct
->_conf
->filestore_odsync_write
) {
1937 dout(0) << __FUNC__
<< ": INFO: O_DSYNC write is enabled" << dendl
;
1940 sync_thread
.create("filestore_sync");
1942 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
)) {
1943 ret
= journal_replay(initial_op_seq
);
1945 derr
<< __FUNC__
<< ": failed to open journal " << journalpath
<< ": " << cpp_strerror(ret
) << dendl
;
1946 if (ret
== -ENOTTY
) {
1947 derr
<< "maybe journal is not pointing to a block device and its size "
1948 << "wasn't configured?" << dendl
;
1957 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(err2
)) {
1958 derr
<< err2
.str() << dendl
;
1964 init_temp_collections();
1969 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
1972 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
1979 if (cct
->_conf
->filestore_update_to
>= (int)get_target_version()) {
1980 int err
= upgrade();
1982 derr
<< "error converting store" << dendl
;
// Error-unwind tail: stop threads and close fds opened above
// (current_fd, basedir_fd, fsid_fd) before returning.
1994 std::lock_guard l
{lock
};
1996 sync_cond
.notify_all();
1999 if (!m_disable_wbthrottle
) {
2003 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
2006 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
2009 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
2012 ceph_assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// After mount/replay: register an OpSequencer for every collection,
// ensure each collection's temp companion exists (creating it when
// missing), and remove any stray temp collections that have no parent.
2019 void FileStore::init_temp_collections()
2021 dout(10) << __FUNC__
<< dendl
;
2023 int r
= list_collections(ls
, true);
2024 ceph_assert(r
>= 0);
2026 dout(20) << " ls " << ls
<< dendl
;
2028 SequencerPosition spos
;
// First pass (body elided here): collect existing temp collections.
2031 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
)
2034 dout(20) << " temps " << temps
<< dendl
;
// Second pass: create a sequencer per collection and make sure its
// temp collection exists.
2036 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
) {
2039 coll_map
[*p
] = ceph::make_ref
<OpSequencer
>(cct
, ++next_osr_id
, *p
);
2042 coll_t temp
= p
->get_temp();
2043 if (temps
.count(temp
)) {
2046 dout(10) << __FUNC__
<< ": creating " << temp
<< dendl
;
2047 r
= _create_collection(temp
, 0, spos
);
2048 ceph_assert(r
== 0);
// Anything left in `temps` has no owning collection: remove it.
2052 for (set
<coll_t
>::iterator p
= temps
.begin(); p
!= temps
.end(); ++p
) {
2053 dout(10) << __FUNC__
<< ": removing stray " << *p
<< dendl
;
2054 r
= _collection_remove_recursive(*p
, spos
);
2055 ceph_assert(r
== 0);
// Unmount the FileStore: wake and stop the sync machinery, close the
// journal (unless replay was skipped), stop the finisher threads, and
// close every fd opened at mount time (vdo_fd, fsid_fd, op_fd,
// current_fd, basedir_fd).
2059 int FileStore::umount()
2061 dout(5) << __FUNC__
<< ": " << basedir
<< dendl
;
2068 std::lock_guard
l(coll_lock
);
2073 std::lock_guard l
{lock
};
2075 sync_cond
.notify_all();
2078 if (!m_disable_wbthrottle
){
2084 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
))
2085 journal_write_close();
2087 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
2090 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
// Close fds; TEMP_FAILURE_RETRY guards against EINTR.
2095 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd
));
2099 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
2103 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
2106 if (current_fd
>= 0) {
2107 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
2110 if (basedir_fd
>= 0) {
2111 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
2123 std::lock_guard l
{sync_entry_timeo_lock
};
2132 /// -----------------------------
2134 // keep OpSequencer handles alive for all time so that a sequence
2135 // that removes a collection and creates a new one will not allow
2136 // two sequencers for the same collection to be alive at once.
// Look up an existing collection handle under coll_lock.  Returns an
// empty CollectionHandle when the collection is not in coll_map
// (the found-case return is elided in this excerpt).
2138 ObjectStore::CollectionHandle
FileStore::open_collection(const coll_t
& c
)
2140 std::lock_guard l
{coll_lock
};
2141 auto p
= coll_map
.find(c
);
2142 if (p
== coll_map
.end()) {
2143 return CollectionHandle();
// Get-or-create a collection handle under coll_lock: if `c` is not yet
// in coll_map, allocate a fresh OpSequencer for it (sequencers are kept
// alive for all time — see the comment above open_collection).
2148 ObjectStore::CollectionHandle
FileStore::create_new_collection(const coll_t
& c
)
2150 std::lock_guard l
{coll_lock
};
2151 auto p
= coll_map
.find(c
);
2152 if (p
== coll_map
.end()) {
2153 auto r
= ceph::make_ref
<OpSequencer
>(cct
, ++next_osr_id
, c
);
2162 /// -----------------------------
// Build an Op from a batch of transactions: sum bytes/ops across the
// batch, stamp the start time, move the transactions into the Op, and
// attach the completion contexts.  Ownership of `tls` contents is
// transferred into the returned Op.
2164 FileStore::Op
*FileStore::build_op(vector
<Transaction
>& tls
,
2165 Context
*onreadable
,
2166 Context
*onreadable_sync
,
2167 TrackedOpRef osd_op
)
2169 uint64_t bytes
= 0, ops
= 0;
2170 for (vector
<Transaction
>::iterator p
= tls
.begin();
2173 bytes
+= (*p
).get_num_bytes();
2174 ops
+= (*p
).get_num_ops();
2178 o
->start
= ceph_clock_now();
2179 o
->tls
= std::move(tls
);
2180 o
->onreadable
= onreadable
;
2181 o
->onreadable_sync
= onreadable_sync
;
// Queue `o` on its sequencer and then the sequencer on the threadpool;
// ordering comes from the sequencer, not from which thread runs first.
// Updates op/byte perf counters and logs current throttle occupancy.
2190 void FileStore::queue_op(OpSequencer
*osr
, Op
*o
)
2192 // queue op on sequencer, then queue sequencer for the threadpool,
2193 // so that regardless of which order the threads pick up the
2194 // sequencer, the op order will be preserved.
2197 o
->trace
.event("queued");
2199 logger
->inc(l_filestore_ops
);
2200 logger
->inc(l_filestore_bytes
, o
->bytes
);
2202 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
2204 << " " << o
->bytes
<< " bytes"
2205 << " (queue has " << throttle_ops
.get_current() << " ops and " << throttle_bytes
.get_current() << " bytes)"
// Acquire queue throttle budget for `o` (blocks until byte budget is
// available) and publish the new occupancy to the perf counters.
2210 void FileStore::op_queue_reserve_throttle(Op
*o
)
2213 throttle_bytes
.get(o
->bytes
);
2215 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
2216 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
// Return `o`'s throttle budget (counterpart of
// op_queue_reserve_throttle) and refresh the perf counters.
2219 void FileStore::op_queue_release_throttle(Op
*o
)
2222 throttle_bytes
.put(o
->bytes
);
2223 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
2224 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
// Threadpool worker: apply the next op of `osr`.  Applies wbthrottle,
// honors the filestore_inject_stall debug knob, takes osr->apply_lock
// (released later in _finish_op), and runs the op's transactions via
// _do_transactions, bracketed by apply_manager start/finish.
2227 void FileStore::_do_op(OpSequencer
*osr
, ThreadPool::TPHandle
&handle
)
2229 if (!m_disable_wbthrottle
) {
2230 wbthrottle
.throttle();
// Debug-only stall injection; the knob self-clears after one use.
2233 if (cct
->_conf
->filestore_inject_stall
) {
2234 int orig
= cct
->_conf
->filestore_inject_stall
;
2235 dout(5) << __FUNC__
<< ": filestore_inject_stall " << orig
<< ", sleeping" << dendl
;
2237 cct
->_conf
.set_val("filestore_inject_stall", "0");
2238 dout(5) << __FUNC__
<< ": done stalling" << dendl
;
// apply_lock is intentionally NOT released here — _finish_op unlocks it.
2241 osr
->apply_lock
.lock();
2242 Op
*o
= osr
->peek_queue();
2243 o
->trace
.event("op_apply_start");
2244 apply_manager
.op_apply_start(o
->op
);
2245 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< " start" << dendl
;
2246 o
->trace
.event("_do_transactions start");
2247 int r
= _do_transactions(o
->tls
, o
->op
, &handle
, osr
->osr_name
);
2248 o
->trace
.event("op_apply_finish");
2249 apply_manager
.op_apply_finish(o
->op
);
2250 dout(10) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " r = " << r
2251 << ", finisher " << o
->onreadable
<< " " << o
->onreadable_sync
<< dendl
;
// Completion half of _do_op: dequeue the finished op (collecting any
// flush waiters into to_queue), release apply_lock taken in _do_op,
// return throttle budget, record apply latency, then run the
// synchronous readable callback inline and hand the async ones to the
// per-sequencer apply finisher.
2254 void FileStore::_finish_op(OpSequencer
*osr
)
2256 list
<Context
*> to_queue
;
2257 Op
*o
= osr
->dequeue(&to_queue
);
2261 utime_t lat
= ceph_clock_now();
2264 dout(10) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< " lat " << lat
<< dendl
;
2265 osr
->apply_lock
.unlock(); // locked in _do_op
2266 o
->trace
.event("_finish_op");
2268 // called with tp lock held
2269 op_queue_release_throttle(o
);
2271 logger
->tinc(l_filestore_apply_latency
, lat
);
2273 if (o
->onreadable_sync
) {
2274 o
->onreadable_sync
->complete(0);
2276 if (o
->onreadable
) {
// Finisher selection is sharded by sequencer id to spread load.
2277 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(o
->onreadable
);
2279 if (!to_queue
.empty()) {
2280 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(to_queue
);
// Completion context fired when a writeahead-journaled entry commits:
// forwards to FileStore::_journaled_ahead with the sequencer, op and
// on-disk callback captured at construction time.
2286 struct C_JournaledAhead
: public Context
{
2288 FileStore::OpSequencer
*osr
;
2292 C_JournaledAhead(FileStore
*f
, FileStore::OpSequencer
*os
, FileStore::Op
*o
, Context
*ondisk
):
2293 fs(f
), osr(os
), o(o
), ondisk(ondisk
) { }
2294 void finish(int r
) override
{
2295 fs
->_journaled_ahead(osr
, o
, ondisk
);
// Submit a batch of transactions on collection handle `ch`.  Collects
// the three completion contexts from the transactions, honors the
// objectstore_blackhole debug knob (drops everything), then dispatches
// by journal mode: parallel (journal + apply concurrently), writeahead
// (journal first, apply on commit via C_JournaledAhead), no-journal
// (queue apply directly), or trailing (apply first, then journal).
// NOTE(review): this excerpt elides lines (embedded numbering jumps);
// code bytes preserved exactly.
2299 int FileStore::queue_transactions(CollectionHandle
& ch
, vector
<Transaction
>& tls
,
2300 TrackedOpRef osd_op
,
2301 ThreadPool::TPHandle
*handle
)
2303 Context
*onreadable
;
2305 Context
*onreadable_sync
;
2306 ObjectStore::Transaction::collect_contexts(
2307 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
// Debug blackhole: drop the transaction and its callbacks entirely.
2309 if (cct
->_conf
->objectstore_blackhole
) {
2310 dout(0) << __FUNC__
<< ": objectstore_blackhole = TRUE, dropping transaction"
2315 onreadable
= nullptr;
2316 delete onreadable_sync
;
2317 onreadable_sync
= nullptr;
2321 utime_t start
= ceph_clock_now();
2323 OpSequencer
*osr
= static_cast<OpSequencer
*>(ch
.get());
2324 dout(5) << __FUNC__
<< ": osr " << osr
<< " " << *osr
<< dendl
;
2326 ZTracer::Trace trace
;
2327 if (osd_op
&& osd_op
->pg_trace
) {
2328 osd_op
->store_trace
.init("filestore op", &trace_endpoint
, &osd_op
->pg_trace
);
2329 trace
= osd_op
->store_trace
;
// Writeahead/parallel path: journal entry is prepared and throttles
// taken outside the submission lock; tp timeout is suspended while
// potentially blocking on throttles.
2332 if (journal
&& journal
->is_writeable() && !m_filestore_journal_trailing
) {
2333 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2335 //prepare and encode transactions data out of lock
2337 int orig_len
= journal
->prepare_entry(o
->tls
, &tbl
);
2340 handle
->suspend_tp_timeout();
2342 op_queue_reserve_throttle(o
);
2343 journal
->reserve_throttle_and_backoff(tbl
.length());
2346 handle
->reset_tp_timeout();
2348 uint64_t op_num
= submit_manager
.op_submit_start();
2350 trace
.keyval("opnum", op_num
);
2352 if (m_filestore_do_dump
)
2353 dump_transactions(o
->tls
, o
->op
, osr
);
2355 if (m_filestore_journal_parallel
) {
2356 dout(5) << __FUNC__
<< ": (parallel) " << o
->op
<< " " << o
->tls
<< dendl
;
2358 trace
.keyval("journal mode", "parallel");
2359 trace
.event("journal started");
2360 _op_journal_transactions(tbl
, orig_len
, o
->op
, ondisk
, osd_op
);
2362 // queue inside submit_manager op submission lock
2364 trace
.event("op queued");
2365 } else if (m_filestore_journal_writeahead
) {
2366 dout(5) << __FUNC__
<< ": (writeahead) " << o
->op
<< " " << o
->tls
<< dendl
;
2368 osr
->queue_journal(o
);
2370 trace
.keyval("journal mode", "writeahead");
2371 trace
.event("journal started");
// On journal commit, C_JournaledAhead calls _journaled_ahead, which
// queues the apply and the ondisk callback.
2372 _op_journal_transactions(tbl
, orig_len
, o
->op
,
2373 new C_JournaledAhead(this, osr
, o
, ondisk
),
2378 submit_manager
.op_submit_finish(op_num
);
2379 utime_t end
= ceph_clock_now();
2380 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// No-journal path: queue the op for apply directly; ondisk waiter is
// registered with apply_manager.
2385 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2386 dout(5) << __FUNC__
<< ": (no journal) " << o
<< " " << tls
<< dendl
;
2389 handle
->suspend_tp_timeout();
2391 op_queue_reserve_throttle(o
);
2394 handle
->reset_tp_timeout();
2396 uint64_t op_num
= submit_manager
.op_submit_start();
2399 if (m_filestore_do_dump
)
2400 dump_transactions(o
->tls
, o
->op
, osr
);
2403 trace
.keyval("opnum", op_num
);
2404 trace
.keyval("journal mode", "none");
2405 trace
.event("op queued");
2408 apply_manager
.add_waiter(op_num
, ondisk
);
2409 submit_manager
.op_submit_finish(op_num
);
2410 utime_t end
= ceph_clock_now();
2411 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// Trailing-journal path: apply first (do_transactions), then journal.
2415 ceph_assert(journal
);
2416 //prepare and encode transactions data out of lock
2419 if (journal
->is_writeable()) {
2420 orig_len
= journal
->prepare_entry(tls
, &tbl
);
2422 uint64_t op
= submit_manager
.op_submit_start();
2423 dout(5) << __FUNC__
<< ": (trailing journal) " << op
<< " " << tls
<< dendl
;
2425 if (m_filestore_do_dump
)
2426 dump_transactions(tls
, op
, osr
);
2428 trace
.event("op_apply_start");
2429 trace
.keyval("opnum", op
);
2430 trace
.keyval("journal mode", "trailing");
2431 apply_manager
.op_apply_start(op
);
2432 trace
.event("do_transactions");
2433 int r
= do_transactions(tls
, op
);
2436 trace
.event("journal started");
2437 _op_journal_transactions(tbl
, orig_len
, op
, ondisk
, osd_op
);
2443 // start on_readable finisher after we queue journal item, as on_readable callback
2444 // is allowed to delete the Transaction
2445 if (onreadable_sync
) {
2446 onreadable_sync
->complete(r
);
2448 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(onreadable
, r
);
2450 submit_manager
.op_submit_finish(op
);
2451 trace
.event("op_apply_finish");
2452 apply_manager
.op_apply_finish(op
);
2454 utime_t end
= ceph_clock_now();
2455 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// Writeahead-journal commit callback (invoked via C_JournaledAhead):
// moves the op from the sequencer's journal queue to its apply queue
// and hands the ondisk callback(s) to the sharded ondisk finisher so
// they run asynchronously.
2459 void FileStore::_journaled_ahead(OpSequencer
*osr
, Op
*o
, Context
*ondisk
)
2461 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< " " << o
->tls
<< dendl
;
2463 o
->trace
.event("writeahead journal finished");
2465 // this should queue in order because the journal does its completions in order.
2468 list
<Context
*> to_queue
;
2469 osr
->dequeue_journal(&to_queue
);
2471 // do ondisk completions async, to prevent any onreadable_sync completions
2472 // getting blocked behind an ondisk completion.
2474 dout(10) << " queueing ondisk " << ondisk
<< dendl
;
2475 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(ondisk
);
2477 if (!to_queue
.empty()) {
2478 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(to_queue
);
// Apply each transaction in `tls` in order via _do_transaction,
// resetting the threadpool heartbeat timeout between transactions so
// long batches do not trip the suicide timer.
2482 int FileStore::_do_transactions(
2483 vector
<Transaction
> &tls
,
2485 ThreadPool::TPHandle
*handle
,
2486 const char *osr_name
)
2490 for (vector
<Transaction
>::iterator p
= tls
.begin();
2493 _do_transaction(*p
, op_seq
, trans_num
, handle
, osr_name
);
2495 handle
->reset_tp_timeout();
// Stamp a collection-wide replay guard: first force all prior work
// durable (omap sync + whole-filesystem sync), then record `spos` in
// the GLOBAL_REPLAY_GUARD_XATTR on the collection dir and fsync it.
// No-op on checkpointing backends (snapshots provide the guarantee).
// Any failure along the way aborts the process.
2501 void FileStore::_set_global_replay_guard(const coll_t
& cid
,
2502 const SequencerPosition
&spos
)
2504 if (backend
->can_checkpoint())
2507 // sync all previous operations on this sequencer
2508 int ret
= object_map
->sync();
2510 derr
<< __FUNC__
<< ": omap sync error " << cpp_strerror(ret
) << dendl
;
2511 ceph_abort_msg("_set_global_replay_guard failed");
2513 ret
= sync_filesystem(basedir_fd
);
2515 derr
<< __FUNC__
<< ": sync_filesystem error " << cpp_strerror(ret
) << dendl
;
2516 ceph_abort_msg("_set_global_replay_guard failed");
2520 get_cdir(cid
, fn
, sizeof(fn
));
2521 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
2524 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2525 ceph_abort_msg("_set_global_replay_guard failed");
2530 // then record that we did it
2533 int r
= chain_fsetxattr
<true, true>(
2534 fd
, GLOBAL_REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2536 derr
<< __FUNC__
<< ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2537 << " got " << cpp_strerror(r
) << dendl
;
2538 ceph_abort_msg("fsetxattr failed");
2541 // and make sure our xattr is durable.
2544 derr
<< __func__
<< " fsync failed: " << cpp_strerror(errno
) << dendl
;
2550 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2551 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
// Compare `spos` against the collection's global replay guard xattr.
// Returns 1 (OK to replay) when the collection or the xattr is absent,
// and otherwise 1 if spos >= recorded position, -1 if it precedes it.
2554 int FileStore::_check_global_replay_guard(const coll_t
& cid
,
2555 const SequencerPosition
& spos
)
2558 get_cdir(cid
, fn
, sizeof(fn
));
2559 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
2561 dout(10) << __FUNC__
<< ": " << cid
<< " dne" << dendl
;
2562 return 1; // if collection does not exist, there is no guard, and we can replay.
2566 int r
= chain_fgetxattr(fd
, GLOBAL_REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2568 dout(20) << __FUNC__
<< ": no xattr" << dendl
;
2569 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
2570 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2571 return 1; // no xattr
// Decode the stored SequencerPosition and compare.
2576 SequencerPosition opos
;
2577 auto p
= bl
.cbegin();
2580 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2581 return spos
>= opos
? 1 : -1;
// Collection-level convenience wrapper: open the collection dir and
// delegate to the fd-based _set_replay_guard (hoid = 0).  Aborts if the
// collection cannot be opened.
2585 void FileStore::_set_replay_guard(const coll_t
& cid
,
2586 const SequencerPosition
&spos
,
2587 bool in_progress
=false)
2590 get_cdir(cid
, fn
, sizeof(fn
));
2591 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
2594 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2595 ceph_abort_msg("_set_replay_guard failed");
2597 _set_replay_guard(fd
, spos
, 0, in_progress
);
2598 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Record a replay guard on an open fd: fsync the previous operation,
// sync the object_map for `hoid`, write spos (+ in_progress flag) into
// REPLAY_GUARD_XATTR, and fsync again so the guard itself is durable.
// No-op on checkpointing backends.  xattr failure aborts the process.
2602 void FileStore::_set_replay_guard(int fd
,
2603 const SequencerPosition
& spos
,
2604 const ghobject_t
*hoid
,
2607 if (backend
->can_checkpoint())
2610 dout(10) << __FUNC__
<< ": " << spos
<< (in_progress
? " START" : "") << dendl
;
2614 // first make sure the previous operation commits
2615 int r
= ::fsync(fd
);
2617 derr
<< __func__
<< " fsync failed: " << cpp_strerror(errno
) << dendl
;
2622 // sync object_map too. even if this object has a header or keys,
2623 // it may have had them in the past and then removed them, so always
2625 object_map
->sync(hoid
, &spos
);
2630 // then record that we did it
2633 encode(in_progress
, v
);
2634 r
= chain_fsetxattr
<true, true>(
2635 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2637 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2638 ceph_abort_msg("fsetxattr failed");
2641 // and make sure our xattr is durable.
2644 derr
<< __func__
<< " fsync failed: " << cpp_strerror(errno
) << dendl
;
2650 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
// Collection-level convenience wrapper: open the collection dir and
// delegate to the fd-based _close_replay_guard.  Aborts if the
// collection cannot be opened.
2653 void FileStore::_close_replay_guard(const coll_t
& cid
,
2654 const SequencerPosition
&spos
)
2657 get_cdir(cid
, fn
, sizeof(fn
));
2658 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
2661 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2662 ceph_abort_msg("_close_replay_guard failed");
2664 _close_replay_guard(fd
, spos
);
2665 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Mark the replay guard on `fd` as complete: sync the object_map for
// `hoid`, rewrite REPLAY_GUARD_XATTR with in_progress=false, and fsync
// so the closed state is durable.  No-op on checkpointing backends.
2668 void FileStore::_close_replay_guard(int fd
, const SequencerPosition
& spos
,
2669 const ghobject_t
*hoid
)
2671 if (backend
->can_checkpoint())
2674 dout(10) << __FUNC__
<< ": " << spos
<< dendl
;
2678 // sync object_map too. even if this object has a header or keys,
2679 // it may have had them in the past and then removed them, so always
2681 object_map
->sync(hoid
, &spos
);
2683 // then record that we are done with this operation
2686 bool in_progress
= false;
2687 encode(in_progress
, v
);
2688 int r
= chain_fsetxattr
<true, true>(
2689 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2691 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2692 ceph_abort_msg("fsetxattr failed");
2695 // and make sure our xattr is durable.
2698 derr
<< __func__
<< " fsync failed: " << cpp_strerror(errno
) << dendl
;
2704 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
2707 int FileStore::_check_replay_guard(const coll_t
& cid
, const ghobject_t
&oid
,
2708 const SequencerPosition
& spos
)
2710 if (!replaying
|| backend
->can_checkpoint())
2713 int r
= _check_global_replay_guard(cid
, spos
);
2718 r
= lfn_open(cid
, oid
, false, &fd
);
2720 dout(10) << __FUNC__
<< ": " << cid
<< " " << oid
<< " dne" << dendl
;
2721 return 1; // if file does not exist, there is no guard, and we can replay.
2723 int ret
= _check_replay_guard(**fd
, spos
);
2728 int FileStore::_check_replay_guard(const coll_t
& cid
, const SequencerPosition
& spos
)
2730 if (!replaying
|| backend
->can_checkpoint())
2734 get_cdir(cid
, fn
, sizeof(fn
));
2735 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
2737 dout(10) << __FUNC__
<< ": " << cid
<< " dne" << dendl
;
2738 return 1; // if collection does not exist, there is no guard, and we can replay.
2740 int ret
= _check_replay_guard(fd
, spos
);
2741 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2745 int FileStore::_check_replay_guard(int fd
, const SequencerPosition
& spos
)
2747 if (!replaying
|| backend
->can_checkpoint())
2751 int r
= chain_fgetxattr(fd
, REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2753 dout(20) << __FUNC__
<< ": no xattr" << dendl
;
2754 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
2755 return 1; // no xattr
2760 SequencerPosition opos
;
2761 auto p
= bl
.cbegin();
2763 bool in_progress
= false;
2764 if (!p
.end()) // older journals don't have this
2765 decode(in_progress
, p
);
2767 dout(10) << __FUNC__
<< ": object has " << opos
<< " > current pos " << spos
2768 << ", now or in future, SKIPPING REPLAY" << dendl
;
2770 } else if (opos
== spos
) {
2772 dout(10) << __FUNC__
<< ": object has " << opos
<< " == current pos " << spos
2773 << ", in_progress=true, CONDITIONAL REPLAY" << dendl
;
2776 dout(10) << __FUNC__
<< ": object has " << opos
<< " == current pos " << spos
2777 << ", in_progress=false, SKIPPING REPLAY" << dendl
;
2781 dout(10) << __FUNC__
<< ": object has " << opos
<< " < current pos " << spos
2782 << ", in past, will replay" << dendl
;
2787 void FileStore::_do_transaction(
2788 Transaction
& t
, uint64_t op_seq
, int trans_num
,
2789 ThreadPool::TPHandle
*handle
,
2790 const char *osr_name
)
2792 dout(10) << __FUNC__
<< ": on " << &t
<< dendl
;
2794 Transaction::iterator i
= t
.begin();
2796 SequencerPosition
spos(op_seq
, trans_num
, 0);
2797 while (i
.have_op()) {
2799 handle
->reset_tp_timeout();
2801 Transaction::Op
*op
= i
.decode_op();
2807 case Transaction::OP_NOP
:
2809 case Transaction::OP_TOUCH
:
2810 case Transaction::OP_CREATE
:
2812 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2813 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2814 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2815 _cid
: _cid
.get_temp();
2816 tracepoint(objectstore
, touch_enter
, osr_name
);
2817 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2818 r
= _touch(cid
, oid
);
2819 tracepoint(objectstore
, touch_exit
, r
);
2823 case Transaction::OP_WRITE
:
2825 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2826 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2827 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2828 _cid
: _cid
.get_temp();
2829 uint64_t off
= op
->off
;
2830 uint64_t len
= op
->len
;
2831 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2834 tracepoint(objectstore
, write_enter
, osr_name
, off
, len
);
2835 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2836 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
2837 tracepoint(objectstore
, write_exit
, r
);
2841 case Transaction::OP_ZERO
:
2843 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2844 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2845 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2846 _cid
: _cid
.get_temp();
2847 uint64_t off
= op
->off
;
2848 uint64_t len
= op
->len
;
2849 tracepoint(objectstore
, zero_enter
, osr_name
, off
, len
);
2850 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2851 r
= _zero(cid
, oid
, off
, len
);
2852 tracepoint(objectstore
, zero_exit
, r
);
2856 case Transaction::OP_TRIMCACHE
:
2858 // deprecated, no-op
2862 case Transaction::OP_TRUNCATE
:
2864 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2865 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2866 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2867 _cid
: _cid
.get_temp();
2868 uint64_t off
= op
->off
;
2869 tracepoint(objectstore
, truncate_enter
, osr_name
, off
);
2870 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2871 r
= _truncate(cid
, oid
, off
);
2872 tracepoint(objectstore
, truncate_exit
, r
);
2876 case Transaction::OP_REMOVE
:
2878 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2879 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2880 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2881 _cid
: _cid
.get_temp();
2882 tracepoint(objectstore
, remove_enter
, osr_name
);
2883 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2884 r
= _remove(cid
, oid
, spos
);
2885 tracepoint(objectstore
, remove_exit
, r
);
2889 case Transaction::OP_SETATTR
:
2891 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2892 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2893 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2894 _cid
: _cid
.get_temp();
2895 string name
= i
.decode_string();
2898 tracepoint(objectstore
, setattr_enter
, osr_name
);
2899 if (_check_replay_guard(cid
, oid
, spos
) > 0) {
2900 map
<string
, bufferptr
> to_set
;
2901 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2902 r
= _setattrs(cid
, oid
, to_set
, spos
);
2904 dout(0) << " ENOSPC on setxattr on " << cid
<< "/" << oid
2905 << " name " << name
<< " size " << bl
.length() << dendl
;
2907 tracepoint(objectstore
, setattr_exit
, r
);
2911 case Transaction::OP_SETATTRS
:
2913 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2914 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2915 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2916 _cid
: _cid
.get_temp();
2917 map
<string
, bufferptr
> aset
;
2918 i
.decode_attrset(aset
);
2919 tracepoint(objectstore
, setattrs_enter
, osr_name
);
2920 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2921 r
= _setattrs(cid
, oid
, aset
, spos
);
2922 tracepoint(objectstore
, setattrs_exit
, r
);
2924 dout(0) << " ENOSPC on setxattrs on " << cid
<< "/" << oid
<< dendl
;
2928 case Transaction::OP_RMATTR
:
2930 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2931 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2932 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2933 _cid
: _cid
.get_temp();
2934 string name
= i
.decode_string();
2935 tracepoint(objectstore
, rmattr_enter
, osr_name
);
2936 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2937 r
= _rmattr(cid
, oid
, name
.c_str(), spos
);
2938 tracepoint(objectstore
, rmattr_exit
, r
);
2942 case Transaction::OP_RMATTRS
:
2944 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2945 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2946 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2947 _cid
: _cid
.get_temp();
2948 tracepoint(objectstore
, rmattrs_enter
, osr_name
);
2949 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2950 r
= _rmattrs(cid
, oid
, spos
);
2951 tracepoint(objectstore
, rmattrs_exit
, r
);
2955 case Transaction::OP_CLONE
:
2957 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2958 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2959 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2960 _cid
: _cid
.get_temp();
2961 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2962 tracepoint(objectstore
, clone_enter
, osr_name
);
2963 r
= _clone(cid
, oid
, noid
, spos
);
2964 tracepoint(objectstore
, clone_exit
, r
);
2968 case Transaction::OP_CLONERANGE
:
2970 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2971 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2972 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2973 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2974 _cid
: _cid
.get_temp();
2975 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2976 _cid
: _cid
.get_temp();
2977 uint64_t off
= op
->off
;
2978 uint64_t len
= op
->len
;
2979 tracepoint(objectstore
, clone_range_enter
, osr_name
, len
);
2980 r
= _clone_range(cid
, oid
, ncid
, noid
, off
, len
, off
, spos
);
2981 tracepoint(objectstore
, clone_range_exit
, r
);
2985 case Transaction::OP_CLONERANGE2
:
2987 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2988 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2989 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2990 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2991 _cid
: _cid
.get_temp();
2992 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2993 _cid
: _cid
.get_temp();
2994 uint64_t srcoff
= op
->off
;
2995 uint64_t len
= op
->len
;
2996 uint64_t dstoff
= op
->dest_off
;
2997 tracepoint(objectstore
, clone_range2_enter
, osr_name
, len
);
2998 r
= _clone_range(cid
, oid
, ncid
, noid
, srcoff
, len
, dstoff
, spos
);
2999 tracepoint(objectstore
, clone_range2_exit
, r
);
3003 case Transaction::OP_MKCOLL
:
3005 const coll_t
&cid
= i
.get_cid(op
->cid
);
3006 tracepoint(objectstore
, mkcoll_enter
, osr_name
);
3007 if (_check_replay_guard(cid
, spos
) > 0)
3008 r
= _create_collection(cid
, op
->split_bits
, spos
);
3009 tracepoint(objectstore
, mkcoll_exit
, r
);
3013 case Transaction::OP_COLL_SET_BITS
:
3015 const coll_t
&cid
= i
.get_cid(op
->cid
);
3016 int bits
= op
->split_bits
;
3017 r
= _collection_set_bits(cid
, bits
);
3021 case Transaction::OP_COLL_HINT
:
3023 const coll_t
&cid
= i
.get_cid(op
->cid
);
3024 uint32_t type
= op
->hint
;
3027 auto hiter
= hint
.cbegin();
3028 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
3031 decode(pg_num
, hiter
);
3032 decode(num_objs
, hiter
);
3033 if (_check_replay_guard(cid
, spos
) > 0) {
3034 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
, spos
);
3038 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
3043 case Transaction::OP_RMCOLL
:
3045 const coll_t
&cid
= i
.get_cid(op
->cid
);
3046 tracepoint(objectstore
, rmcoll_enter
, osr_name
);
3047 if (_check_replay_guard(cid
, spos
) > 0)
3048 r
= _destroy_collection(cid
);
3049 tracepoint(objectstore
, rmcoll_exit
, r
);
3053 case Transaction::OP_COLL_ADD
:
3055 const coll_t
&ocid
= i
.get_cid(op
->cid
);
3056 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
3057 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3059 ceph_assert(oid
.hobj
.pool
>= -1);
3061 // always followed by OP_COLL_REMOVE
3062 Transaction::Op
*op2
= i
.decode_op();
3063 const coll_t
&ocid2
= i
.get_cid(op2
->cid
);
3064 const ghobject_t
&oid2
= i
.get_oid(op2
->oid
);
3065 ceph_assert(op2
->op
== Transaction::OP_COLL_REMOVE
);
3066 ceph_assert(ocid2
== ocid
);
3067 ceph_assert(oid2
== oid
);
3069 tracepoint(objectstore
, coll_add_enter
);
3070 r
= _collection_add(ncid
, ocid
, oid
, spos
);
3071 tracepoint(objectstore
, coll_add_exit
, r
);
3075 tracepoint(objectstore
, coll_remove_enter
, osr_name
);
3076 if (_check_replay_guard(ocid
, oid
, spos
) > 0)
3077 r
= _remove(ocid
, oid
, spos
);
3078 tracepoint(objectstore
, coll_remove_exit
, r
);
3082 case Transaction::OP_COLL_MOVE
:
3084 // WARNING: this is deprecated and buggy; only here to replay old journals.
3085 const coll_t
&ocid
= i
.get_cid(op
->cid
);
3086 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
3087 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3088 tracepoint(objectstore
, coll_move_enter
);
3089 r
= _collection_add(ocid
, ncid
, oid
, spos
);
3091 (_check_replay_guard(ocid
, oid
, spos
) > 0))
3092 r
= _remove(ocid
, oid
, spos
);
3093 tracepoint(objectstore
, coll_move_exit
, r
);
3097 case Transaction::OP_COLL_MOVE_RENAME
:
3099 const coll_t
&_oldcid
= i
.get_cid(op
->cid
);
3100 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
3101 const coll_t
&_newcid
= i
.get_cid(op
->dest_cid
);
3102 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
3103 const coll_t
&oldcid
= !_need_temp_object_collection(_oldcid
, oldoid
) ?
3104 _oldcid
: _oldcid
.get_temp();
3105 const coll_t
&newcid
= !_need_temp_object_collection(_newcid
, newoid
) ?
3106 _oldcid
: _newcid
.get_temp();
3107 tracepoint(objectstore
, coll_move_rename_enter
);
3108 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
);
3109 tracepoint(objectstore
, coll_move_rename_exit
, r
);
3113 case Transaction::OP_TRY_RENAME
:
3115 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3116 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
3117 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
3118 const coll_t
&oldcid
= !_need_temp_object_collection(_cid
, oldoid
) ?
3119 _cid
: _cid
.get_temp();
3120 const coll_t
&newcid
= !_need_temp_object_collection(_cid
, newoid
) ?
3121 _cid
: _cid
.get_temp();
3122 tracepoint(objectstore
, coll_try_rename_enter
);
3123 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
, true);
3124 tracepoint(objectstore
, coll_try_rename_exit
, r
);
3128 case Transaction::OP_COLL_SETATTR
:
3129 case Transaction::OP_COLL_RMATTR
:
3130 ceph_abort_msg("collection attr methods no longer implemented");
3133 case Transaction::OP_COLL_RENAME
:
3139 case Transaction::OP_OMAP_CLEAR
:
3141 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3142 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3143 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3144 _cid
: _cid
.get_temp();
3145 tracepoint(objectstore
, omap_clear_enter
, osr_name
);
3146 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3147 r
= _omap_clear(cid
, oid
, spos
);
3148 tracepoint(objectstore
, omap_clear_exit
, r
);
3151 case Transaction::OP_OMAP_SETKEYS
:
3153 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3154 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3155 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3156 _cid
: _cid
.get_temp();
3157 map
<string
, bufferlist
> aset
;
3158 i
.decode_attrset(aset
);
3159 tracepoint(objectstore
, omap_setkeys_enter
, osr_name
);
3160 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3161 r
= _omap_setkeys(cid
, oid
, aset
, spos
);
3162 tracepoint(objectstore
, omap_setkeys_exit
, r
);
3165 case Transaction::OP_OMAP_RMKEYS
:
3167 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3168 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3169 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3170 _cid
: _cid
.get_temp();
3172 i
.decode_keyset(keys
);
3173 tracepoint(objectstore
, omap_rmkeys_enter
, osr_name
);
3174 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3175 r
= _omap_rmkeys(cid
, oid
, keys
, spos
);
3176 tracepoint(objectstore
, omap_rmkeys_exit
, r
);
3179 case Transaction::OP_OMAP_RMKEYRANGE
:
3181 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3182 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3183 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3184 _cid
: _cid
.get_temp();
3186 first
= i
.decode_string();
3187 last
= i
.decode_string();
3188 tracepoint(objectstore
, omap_rmkeyrange_enter
, osr_name
);
3189 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3190 r
= _omap_rmkeyrange(cid
, oid
, first
, last
, spos
);
3191 tracepoint(objectstore
, omap_rmkeyrange_exit
, r
);
3194 case Transaction::OP_OMAP_SETHEADER
:
3196 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3197 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3198 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3199 _cid
: _cid
.get_temp();
3202 tracepoint(objectstore
, omap_setheader_enter
, osr_name
);
3203 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3204 r
= _omap_setheader(cid
, oid
, bl
, spos
);
3205 tracepoint(objectstore
, omap_setheader_exit
, r
);
3208 case Transaction::OP_SPLIT_COLLECTION
:
3210 ceph_abort_msg("not legacy journal; upgrade to firefly first");
3213 case Transaction::OP_SPLIT_COLLECTION2
:
3215 coll_t cid
= i
.get_cid(op
->cid
);
3216 uint32_t bits
= op
->split_bits
;
3217 uint32_t rem
= op
->split_rem
;
3218 coll_t dest
= i
.get_cid(op
->dest_cid
);
3219 tracepoint(objectstore
, split_coll2_enter
, osr_name
);
3220 r
= _split_collection(cid
, bits
, rem
, dest
, spos
);
3221 tracepoint(objectstore
, split_coll2_exit
, r
);
3225 case Transaction::OP_MERGE_COLLECTION
:
3227 coll_t cid
= i
.get_cid(op
->cid
);
3228 uint32_t bits
= op
->split_bits
;
3229 coll_t dest
= i
.get_cid(op
->dest_cid
);
3230 tracepoint(objectstore
, merge_coll_enter
, osr_name
);
3231 r
= _merge_collection(cid
, bits
, dest
, spos
);
3232 tracepoint(objectstore
, merge_coll_exit
, r
);
3236 case Transaction::OP_SETALLOCHINT
:
3238 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3239 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3240 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3241 _cid
: _cid
.get_temp();
3242 uint64_t expected_object_size
= op
->expected_object_size
;
3243 uint64_t expected_write_size
= op
->expected_write_size
;
3244 tracepoint(objectstore
, setallochint_enter
, osr_name
);
3245 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3246 r
= _set_alloc_hint(cid
, oid
, expected_object_size
,
3247 expected_write_size
);
3248 tracepoint(objectstore
, setallochint_exit
, r
);
3253 derr
<< "bad op " << op
->op
<< dendl
;
3260 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
3261 op
->op
== Transaction::OP_CLONE
||
3262 op
->op
== Transaction::OP_CLONERANGE2
||
3263 op
->op
== Transaction::OP_COLL_ADD
||
3264 op
->op
== Transaction::OP_SETATTR
||
3265 op
->op
== Transaction::OP_SETATTRS
||
3266 op
->op
== Transaction::OP_RMATTR
||
3267 op
->op
== Transaction::OP_OMAP_SETKEYS
||
3268 op
->op
== Transaction::OP_OMAP_RMKEYS
||
3269 op
->op
== Transaction::OP_OMAP_RMKEYRANGE
||
3270 op
->op
== Transaction::OP_OMAP_SETHEADER
))
3271 // -ENOENT is normally okay
3272 // ...including on a replayed OP_RMCOLL with checkpoint mode
3277 if (op
->op
== Transaction::OP_SETALLOCHINT
)
3278 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3279 // cases means invalid hint size (e.g. too big, not a multiple
3280 // of block size, etc) or, at least on xfs, an attempt to set
3281 // or change it when the file is not empty. However,
3282 // OP_SETALLOCHINT is advisory, so ignore all errors.
3285 if (replaying
&& !backend
->can_checkpoint()) {
3286 if (r
== -EEXIST
&& op
->op
== Transaction::OP_MKCOLL
) {
3287 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3290 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_ADD
) {
3291 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3294 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_MOVE
) {
3295 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3299 dout(10) << "tolerating ERANGE on replay" << dendl
;
3303 dout(10) << "tolerating ENOENT on replay" << dendl
;
3309 const char *msg
= "unexpected error code";
3311 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
3312 op
->op
== Transaction::OP_CLONE
||
3313 op
->op
== Transaction::OP_CLONERANGE2
)) {
3314 msg
= "ENOENT on clone suggests osd bug";
3315 } else if (r
== -ENOSPC
) {
3316 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3317 // by partially applying transactions.
3318 msg
= "ENOSPC from disk filesystem, misconfigured cluster";
3319 } else if (r
== -ENOTEMPTY
) {
3320 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
3321 } else if (r
== -EPERM
) {
3322 msg
= "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3325 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
3326 << " (" << spos
<< ", or op " << spos
.op
<< ", counting from 0)" << dendl
;
3327 dout(0) << msg
<< dendl
;
3328 dout(0) << " transaction dump:\n";
3329 JSONFormatter
f(true);
3330 f
.open_object_section("transaction");
3340 ceph_abort_msg("unexpected error");
3350 /*********************************************/
3354 // --------------------
3357 bool FileStore::exists(CollectionHandle
& ch
, const ghobject_t
& oid
)
3359 tracepoint(objectstore
, exists_enter
, ch
->cid
.c_str());
3360 auto osr
= static_cast<OpSequencer
*>(ch
.get());
3361 osr
->wait_for_apply(oid
);
3363 bool retval
= stat(ch
, oid
, &st
) == 0;
3364 tracepoint(objectstore
, exists_exit
, retval
);
3368 int FileStore::stat(
3369 CollectionHandle
& ch
, const ghobject_t
& oid
, struct stat
*st
, bool allow_eio
)
3371 tracepoint(objectstore
, stat_enter
, ch
->cid
.c_str());
3372 auto osr
= static_cast<OpSequencer
*>(ch
.get());
3373 osr
->wait_for_apply(oid
);
3374 const coll_t
& cid
= !_need_temp_object_collection(ch
->cid
, oid
) ? ch
->cid
: ch
->cid
.get_temp();
3375 int r
= lfn_stat(cid
, oid
, st
);
3376 ceph_assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
3378 dout(10) << __FUNC__
<< ": " << ch
->cid
<< "/" << oid
3379 << " = " << r
<< dendl
;
3381 dout(10) << __FUNC__
<< ": " << ch
->cid
<< "/" << oid
3383 << " (size " << st
->st_size
<< ")" << dendl
;
3385 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3386 debug_mdata_eio(oid
)) {
3389 tracepoint(objectstore
, stat_exit
, r
);
3394 int FileStore::set_collection_opts(
3395 CollectionHandle
& ch
,
3396 const pool_opts_t
& opts
)
3401 int FileStore::read(
3402 CollectionHandle
& ch
,
3403 const ghobject_t
& oid
,
3410 tracepoint(objectstore
, read_enter
, ch
->cid
.c_str(), offset
, len
);
3411 const coll_t
& cid
= !_need_temp_object_collection(ch
->cid
, oid
) ? ch
->cid
: ch
->cid
.get_temp();
3413 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3415 auto osr
= static_cast<OpSequencer
*>(ch
.get());
3416 osr
->wait_for_apply(oid
);
3419 int r
= lfn_open(cid
, oid
, false, &fd
);
3421 dout(10) << __FUNC__
<< ": (" << cid
<< "/" << oid
<< ") open error: "
3422 << cpp_strerror(r
) << dendl
;
3426 if (offset
== 0 && len
== 0) {
3428 memset(&st
, 0, sizeof(struct stat
));
3429 int r
= ::fstat(**fd
, &st
);
3430 ceph_assert(r
== 0);
3434 #ifdef HAVE_POSIX_FADVISE
3435 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_RANDOM
)
3436 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_RANDOM
);
3437 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
)
3438 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_SEQUENTIAL
);
3441 bufferptr
bptr(len
); // prealloc space for entire read
3442 got
= safe_pread(**fd
, bptr
.c_str(), len
, offset
);
3444 dout(10) << __FUNC__
<< ": (" << cid
<< "/" << oid
<< ") pread error: " << cpp_strerror(got
) << dendl
;
3448 bptr
.set_length(got
); // properly size the buffer
3450 bl
.push_back(std::move(bptr
)); // put it in the target bufferlist
3452 #ifdef HAVE_POSIX_FADVISE
3453 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
)
3454 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_DONTNEED
);
3455 if (op_flags
& (CEPH_OSD_OP_FLAG_FADVISE_RANDOM
| CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
))
3456 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_NORMAL
);
3459 if (m_filestore_sloppy_crc
&& (!replaying
|| backend
->can_checkpoint())) {
3461 int errors
= backend
->_crc_verify_read(**fd
, offset
, got
, bl
, &ss
);
3463 dout(0) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~"
3464 << got
<< " ... BAD CRC:\n" << ss
.str() << dendl
;
3465 ceph_abort_msg("bad crc on read");
3471 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~"
3472 << got
<< "/" << len
<< dendl
;
3473 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3474 debug_data_eio(oid
)) {
3476 } else if (oid
.hobj
.pool
> 0 && /* FIXME, see #23029 */
3477 cct
->_conf
->filestore_debug_random_read_err
&&
3478 (rand() % (int)(cct
->_conf
->filestore_debug_random_read_err
*
3480 dout(0) << __func__
<< ": inject random EIO" << dendl
;
3483 tracepoint(objectstore
, read_exit
, got
);
3488 int FileStore::_do_fiemap(int fd
, uint64_t offset
, size_t len
,
3489 map
<uint64_t, uint64_t> *m
)
3492 struct fiemap_extent
*extent
= nullptr;
3493 struct fiemap
*fiemap
= nullptr;
3497 r
= backend
->do_fiemap(fd
, offset
, len
, &fiemap
);
3501 if (fiemap
->fm_mapped_extents
== 0) {
3506 extent
= &fiemap
->fm_extents
[0];
3508 /* start where we were asked to start */
3509 if (extent
->fe_logical
< offset
) {
3510 extent
->fe_length
-= offset
- extent
->fe_logical
;
3511 extent
->fe_logical
= offset
;
3516 struct fiemap_extent
*last
= nullptr;
3517 while (i
< fiemap
->fm_mapped_extents
) {
3518 struct fiemap_extent
*next
= extent
+ 1;
3520 dout(10) << __FUNC__
<< ": fm_mapped_extents=" << fiemap
->fm_mapped_extents
3521 << " fe_logical=" << extent
->fe_logical
<< " fe_length=" << extent
->fe_length
<< dendl
;
3523 /* try to merge extents */
3524 while ((i
< fiemap
->fm_mapped_extents
- 1) &&
3525 (extent
->fe_logical
+ extent
->fe_length
== next
->fe_logical
)) {
3526 next
->fe_length
+= extent
->fe_length
;
3527 next
->fe_logical
= extent
->fe_logical
;
3533 if (extent
->fe_logical
+ extent
->fe_length
> offset
+ len
)
3534 extent
->fe_length
= offset
+ len
- extent
->fe_logical
;
3535 (*m
)[extent
->fe_logical
] = extent
->fe_length
;
3539 uint64_t xoffset
= last
->fe_logical
+ last
->fe_length
- offset
;
3540 offset
= last
->fe_logical
+ last
->fe_length
;
3542 const bool is_last
= (last
->fe_flags
& FIEMAP_EXTENT_LAST
) || (len
== 0);
3551 int FileStore::_do_seek_hole_data(int fd
, uint64_t offset
, size_t len
,
3552 map
<uint64_t, uint64_t> *m
)
3554 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3555 off_t hole_pos
, data_pos
;
3558 // If lseek fails with errno setting to be ENXIO, this means the current
3559 // file offset is beyond the end of the file.
3560 off_t start
= offset
;
3561 while(start
< (off_t
)(offset
+ len
)) {
3562 data_pos
= lseek(fd
, start
, SEEK_DATA
);
3568 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
3571 } else if (data_pos
> (off_t
)(offset
+ len
)) {
3575 hole_pos
= lseek(fd
, data_pos
, SEEK_HOLE
);
3577 if (errno
== ENXIO
) {
3581 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
3586 if (hole_pos
>= (off_t
)(offset
+ len
)) {
3587 (*m
)[data_pos
] = offset
+ len
- data_pos
;
3590 (*m
)[data_pos
] = hole_pos
- data_pos
;
3601 int FileStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
3602 uint64_t offset
, size_t len
,
3605 map
<uint64_t, uint64_t> exomap
;
3606 int r
= fiemap(ch
, oid
, offset
, len
, exomap
);
3613 int FileStore::fiemap(CollectionHandle
& ch
, const ghobject_t
& oid
,
3614 uint64_t offset
, size_t len
,
3615 map
<uint64_t, uint64_t>& destmap
)
3617 tracepoint(objectstore
, fiemap_enter
, ch
->cid
.c_str(), offset
, len
);
3618 const coll_t
& cid
= !_need_temp_object_collection(ch
->cid
, oid
) ? ch
->cid
: ch
->cid
.get_temp();
3621 if ((!backend
->has_seek_data_hole() && !backend
->has_fiemap()) ||
3622 len
<= (size_t)m_filestore_fiemap_threshold
) {
3623 destmap
[offset
] = len
;
3627 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3629 auto osr
= static_cast<OpSequencer
*>(ch
.get());
3630 osr
->wait_for_apply(oid
);
3634 int r
= lfn_open(cid
, oid
, false, &fd
);
3636 dout(10) << "read couldn't open " << cid
<< "/" << oid
<< ": " << cpp_strerror(r
) << dendl
;
3640 if (backend
->has_seek_data_hole()) {
3641 dout(15) << "seek_data/seek_hole " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3642 r
= _do_seek_hole_data(**fd
, offset
, len
, &destmap
);
3643 } else if (backend
->has_fiemap()) {
3644 dout(15) << "fiemap ioctl" << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3645 r
= _do_fiemap(**fd
, offset
, len
, &destmap
);
3652 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< " num_extents=" << destmap
.size() << " " << destmap
<< dendl
;
3653 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
3654 tracepoint(objectstore
, fiemap_exit
, r
);
3658 int FileStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
,
3659 const SequencerPosition
&spos
)
3661 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
3662 int r
= lfn_unlink(cid
, oid
, spos
);
3663 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
3667 int FileStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
3669 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " size " << size
<< dendl
;
3670 int r
= lfn_truncate(cid
, oid
, size
);
3671 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " size " << size
<< " = " << r
<< dendl
;
3676 int FileStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
3678 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
3681 int r
= lfn_open(cid
, oid
, true, &fd
);
3687 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
3691 int FileStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
3692 uint64_t offset
, size_t len
,
3693 const bufferlist
& bl
, uint32_t fadvise_flags
)
3695 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3699 r
= lfn_open(cid
, oid
, true, &fd
);
3701 dout(0) << __FUNC__
<< ": couldn't open " << cid
<< "/"
3703 << cpp_strerror(r
) << dendl
;
3708 r
= bl
.write_fd(**fd
, offset
);
3710 derr
<< __FUNC__
<< ": write_fd on " << cid
<< "/" << oid
3711 << " error: " << cpp_strerror(r
) << dendl
;
3717 if (r
>= 0 && m_filestore_sloppy_crc
) {
3718 int rc
= backend
->_crc_update_write(**fd
, offset
, len
, bl
);
3719 ceph_assert(rc
>= 0);
3722 if (replaying
|| m_disable_wbthrottle
) {
3723 if (fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
) {
3724 #ifdef HAVE_POSIX_FADVISE
3725 posix_fadvise(**fd
, 0, 0, POSIX_FADV_DONTNEED
);
3729 wbthrottle
.queue_wb(fd
, oid
, offset
, len
,
3730 fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
3736 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< dendl
;
3740 int FileStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t offset
, size_t len
)
3742 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3745 if (cct
->_conf
->filestore_punch_hole
) {
3746 #ifdef CEPH_HAVE_FALLOCATE
3747 # if !defined(__APPLE__) && !defined(__FreeBSD__)
3748 # ifdef FALLOC_FL_KEEP_SIZE
3749 // first try to punch a hole.
3751 ret
= lfn_open(cid
, oid
, false, &fd
);
3757 ret
= ::fstat(**fd
, &st
);
3764 // first try fallocate
3765 ret
= fallocate(**fd
, FALLOC_FL_KEEP_SIZE
| FALLOC_FL_PUNCH_HOLE
,
3770 // ensure we extend file size, if needed
3771 if (len
> 0 && offset
+ len
> (uint64_t)st
.st_size
) {
3772 ret
= ::ftruncate(**fd
, offset
+ len
);
3782 if (ret
>= 0 && m_filestore_sloppy_crc
) {
3783 int rc
= backend
->_crc_update_zero(**fd
, offset
, len
);
3784 ceph_assert(rc
>= 0);
3789 if (ret
!= -EOPNOTSUPP
)
3790 goto out
; // some other error
3796 // lame, kernel is old and doesn't support it.
3797 // write zeros.. yuck!
3798 dout(20) << __FUNC__
<< ": falling back to writing zeros" << dendl
;
3801 bl
.append_zero(len
);
3802 ret
= _write(cid
, oid
, offset
, len
, bl
);
3805 #ifdef CEPH_HAVE_FALLOCATE
3806 # if !defined(__APPLE__) && !defined(__FreeBSD__)
3807 # ifdef FALLOC_FL_KEEP_SIZE
3812 dout(20) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << ret
<< dendl
;
3816 int FileStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
, const ghobject_t
& newoid
,
3817 const SequencerPosition
& spos
)
3819 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< dendl
;
3821 if (_check_replay_guard(cid
, newoid
, spos
) < 0)
3828 r
= lfn_open(cid
, oldoid
, false, &o
, &index
);
3832 ceph_assert(index
.index
);
3833 std::unique_lock l
{(index
.index
)->access_lock
};
3835 r
= lfn_open(cid
, newoid
, true, &n
, &index
);
3839 r
= ::ftruncate(**n
, 0);
3845 r
= ::fstat(**o
, &st
);
3851 r
= _do_clone_range(**o
, **n
, 0, st
.st_size
, 0);
3856 dout(20) << "objectmap clone" << dendl
;
3857 r
= object_map
->clone(oldoid
, newoid
, &spos
);
3858 if (r
< 0 && r
!= -ENOENT
)
3864 map
<string
, bufferptr
> aset
;
3865 r
= _fgetattrs(**o
, aset
);
3869 r
= chain_fgetxattr(**o
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
3870 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
3871 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
3872 sizeof(XATTR_NO_SPILL_OUT
));
3874 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
3875 sizeof(XATTR_SPILL_OUT
));
3880 r
= _fsetattrs(**n
, aset
);
3885 // clone is non-idempotent; record our work.
3886 _set_replay_guard(**n
, spos
, &newoid
);
3893 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< " = " << r
<< dendl
;
3894 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
3898 int FileStore::_do_clone_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3900 dout(20) << __FUNC__
<< ": copy " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3901 return backend
->clone_range(from
, to
, srcoff
, len
, dstoff
);
3904 int FileStore::_do_sparse_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3906 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3908 map
<uint64_t, uint64_t> exomap
;
3909 // fiemap doesn't allow zero length
3913 if (backend
->has_seek_data_hole()) {
3914 dout(15) << "seek_data/seek_hole " << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3915 r
= _do_seek_hole_data(from
, srcoff
, len
, &exomap
);
3916 } else if (backend
->has_fiemap()) {
3917 dout(15) << "fiemap ioctl" << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3918 r
= _do_fiemap(from
, srcoff
, len
, &exomap
);
3922 int64_t written
= 0;
3926 for (map
<uint64_t, uint64_t>::iterator miter
= exomap
.begin(); miter
!= exomap
.end(); ++miter
) {
3927 uint64_t it_off
= miter
->first
- srcoff
+ dstoff
;
3928 r
= _do_copy_range(from
, to
, miter
->first
, miter
->second
, it_off
, true);
3930 derr
<< __FUNC__
<< ": copy error at " << miter
->first
<< "~" << miter
->second
3931 << " to " << it_off
<< ", " << cpp_strerror(r
) << dendl
;
3934 written
+= miter
->second
;
3938 if (m_filestore_sloppy_crc
) {
3939 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
3940 ceph_assert(rc
>= 0);
3943 r
= ::fstat(to
, &st
);
3946 derr
<< __FUNC__
<< ": fstat error at " << to
<< " " << cpp_strerror(r
) << dendl
;
3949 if (st
.st_size
< (int)(dstoff
+ len
)) {
3950 r
= ::ftruncate(to
, dstoff
+ len
);
3953 derr
<< __FUNC__
<< ": ftruncate error at " << dstoff
+len
<< " " << cpp_strerror(r
) << dendl
;
3961 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
3965 int FileStore::_do_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
, bool skip_sloppycrc
)
3967 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3969 loff_t pos
= srcoff
;
3970 loff_t end
= srcoff
+ len
;
3971 int buflen
= 4096 * 16; //limit by pipe max size.see fcntl
3973 #ifdef CEPH_HAVE_SPLICE
3974 if (backend
->has_splice()) {
3976 if (pipe_cloexec(pipefd
, 0) < 0) {
3978 derr
<< " pipe " << " got " << cpp_strerror(e
) << dendl
;
3982 loff_t dstpos
= dstoff
;
3984 int l
= std::min
<int>(end
-pos
, buflen
);
3985 r
= safe_splice(from
, &pos
, pipefd
[1], nullptr, l
, SPLICE_F_NONBLOCK
);
3986 dout(10) << " safe_splice read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
3988 derr
<< __FUNC__
<< ": safe_splice read error at " << pos
<< "~" << len
3989 << ", " << cpp_strerror(r
) << dendl
;
3993 // hrm, bad source range, wtf.
3995 derr
<< __FUNC__
<< ": got short read result at " << pos
3996 << " of fd " << from
<< " len " << len
<< dendl
;
4000 r
= safe_splice(pipefd
[0], nullptr, to
, &dstpos
, r
, 0);
4001 dout(10) << " safe_splice write to " << to
<< " len " << r
4002 << " got " << r
<< dendl
;
4004 derr
<< __FUNC__
<< ": write error at " << pos
<< "~"
4005 << r
<< ", " << cpp_strerror(r
) << dendl
;
4016 actual
= ::lseek64(from
, srcoff
, SEEK_SET
);
4017 if (actual
!= (int64_t)srcoff
) {
4022 derr
<< "lseek64 to " << srcoff
<< " got " << cpp_strerror(r
) << dendl
;
4025 actual
= ::lseek64(to
, dstoff
, SEEK_SET
);
4026 if (actual
!= (int64_t)dstoff
) {
4031 derr
<< "lseek64 to " << dstoff
<< " got " << cpp_strerror(r
) << dendl
;
4037 int l
= std::min
<int>(end
-pos
, buflen
);
4038 r
= ::read(from
, buf
, l
);
4039 dout(25) << " read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
4041 if (errno
== EINTR
) {
4045 derr
<< __FUNC__
<< ": read error at " << pos
<< "~" << len
4046 << ", " << cpp_strerror(r
) << dendl
;
4051 // hrm, bad source range, wtf.
4053 derr
<< __FUNC__
<< ": got short read result at " << pos
4054 << " of fd " << from
<< " len " << len
<< dendl
;
4059 int r2
= safe_write(to
, buf
+op
, r
-op
);
4060 dout(25) << " write to " << to
<< " len " << (r
-op
)
4061 << " got " << r2
<< dendl
;
4064 derr
<< __FUNC__
<< ": write error at " << pos
<< "~"
4065 << r
-op
<< ", " << cpp_strerror(r
) << dendl
;
4077 if (r
< 0 && replaying
) {
4078 ceph_assert(r
== -ERANGE
);
4079 derr
<< __FUNC__
<< ": short source tolerated because we are replaying" << dendl
;
4082 ceph_assert(replaying
|| pos
== end
);
4083 if (r
>= 0 && !skip_sloppycrc
&& m_filestore_sloppy_crc
) {
4084 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
4085 ceph_assert(rc
>= 0);
4087 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
4091 int FileStore::_clone_range(const coll_t
& oldcid
, const ghobject_t
& oldoid
, const coll_t
& newcid
, const ghobject_t
& newoid
,
4092 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
,
4093 const SequencerPosition
& spos
)
4095 dout(15) << __FUNC__
<< ": " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
4097 if (_check_replay_guard(newcid
, newoid
, spos
) < 0)
4102 r
= lfn_open(oldcid
, oldoid
, false, &o
);
4106 r
= lfn_open(newcid
, newoid
, true, &n
);
4110 r
= _do_clone_range(**o
, **n
, srcoff
, len
, dstoff
);
4115 // clone is non-idempotent; record our work.
4116 _set_replay_guard(**n
, spos
, &newoid
);
4123 dout(10) << __FUNC__
<< ": " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " "
4124 << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
4128 class SyncEntryTimeout
: public Context
{
4131 explicit SyncEntryTimeout(CephContext
* cct
, int commit_timeo
)
4132 : cct(cct
), m_commit_timeo(commit_timeo
)
4136 void finish(int r
) override
{
4137 BackTrace
*bt
= new BackTrace(1);
4138 generic_dout(-1) << "FileStore: sync_entry timed out after "
4139 << m_commit_timeo
<< " seconds.\n";
4150 void FileStore::sync_entry()
4152 std::unique_lock l
{lock
};
4154 auto min_interval
= ceph::make_timespan(m_filestore_min_sync_interval
);
4155 auto max_interval
= ceph::make_timespan(m_filestore_max_sync_interval
);
4156 auto startwait
= ceph::real_clock::now();
4158 dout(20) << __FUNC__
<< ": waiting for max_interval " << max_interval
<< dendl
;
4159 sync_cond
.wait_for(l
, max_interval
);
4161 dout(20) << __FUNC__
<< ": not waiting, force_sync set" << dendl
;
4165 dout(20) << __FUNC__
<< ": force_sync set" << dendl
;
4168 dout(20) << __FUNC__
<< ": stop set" << dendl
;
4171 // wait for at least the min interval
4172 auto woke
= ceph::real_clock::now() - startwait
;
4173 dout(20) << __FUNC__
<< ": woke after " << woke
<< dendl
;
4174 if (woke
< min_interval
) {
4175 auto t
= min_interval
- woke
;
4176 dout(20) << __FUNC__
<< ": waiting for another " << t
4177 << " to reach min interval " << min_interval
<< dendl
;
4178 sync_cond
.wait_for(l
, t
);
4184 fin
.swap(sync_waiters
);
4188 if (apply_manager
.commit_start()) {
4189 auto start
= ceph::real_clock::now();
4190 uint64_t cp
= apply_manager
.get_committing_seq();
4192 sync_entry_timeo_lock
.lock();
4193 SyncEntryTimeout
*sync_entry_timeo
=
4194 new SyncEntryTimeout(cct
, m_filestore_commit_timeout
);
4195 if (!timer
.add_event_after(m_filestore_commit_timeout
,
4196 sync_entry_timeo
)) {
4197 sync_entry_timeo
= nullptr;
4199 sync_entry_timeo_lock
.unlock();
4201 logger
->set(l_filestore_committing
, 1);
4203 dout(15) << __FUNC__
<< ": committing " << cp
<< dendl
;
4204 stringstream errstream
;
4205 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(errstream
)) {
4206 derr
<< errstream
.str() << dendl
;
4210 if (backend
->can_checkpoint()) {
4211 int err
= write_op_seq(op_fd
, cp
);
4213 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
4214 ceph_abort_msg("error during write_op_seq");
4218 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
4220 err
= backend
->create_checkpoint(s
, &cid
);
4223 derr
<< "snap create '" << s
<< "' got error " << err
<< dendl
;
4224 ceph_assert(err
== 0);
4227 snaps
.push_back(cp
);
4228 apply_manager
.commit_started();
4232 dout(20) << " waiting for checkpoint " << cid
<< " to complete" << dendl
;
4233 err
= backend
->sync_checkpoint(cid
);
4235 derr
<< "ioctl WAIT_SYNC got " << cpp_strerror(err
) << dendl
;
4236 ceph_abort_msg("wait_sync got error");
4238 dout(20) << " done waiting for checkpoint " << cid
<< " to complete" << dendl
;
4241 apply_manager
.commit_started();
4244 int err
= object_map
->sync();
4246 derr
<< "object_map sync got " << cpp_strerror(err
) << dendl
;
4247 ceph_abort_msg("object_map sync returned error");
4250 err
= backend
->syncfs();
4252 derr
<< "syncfs got " << cpp_strerror(err
) << dendl
;
4253 ceph_abort_msg("syncfs returned error");
4256 err
= write_op_seq(op_fd
, cp
);
4258 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
4259 ceph_abort_msg("error during write_op_seq");
4261 err
= ::fsync(op_fd
);
4263 derr
<< "Error during fsync of op_seq: " << cpp_strerror(err
) << dendl
;
4264 ceph_abort_msg("error during fsync of op_seq");
4268 auto done
= ceph::real_clock::now();
4269 auto lat
= done
- start
;
4270 auto dur
= done
- startwait
;
4271 dout(10) << __FUNC__
<< ": commit took " << lat
<< ", interval was " << dur
<< dendl
;
4272 utime_t max_pause_lat
= logger
->tget(l_filestore_sync_pause_max_lat
);
4273 if (max_pause_lat
< utime_t
{dur
- lat
}) {
4274 logger
->tinc(l_filestore_sync_pause_max_lat
, dur
- lat
);
4277 logger
->inc(l_filestore_commitcycle
);
4278 logger
->tinc(l_filestore_commitcycle_latency
, lat
);
4279 logger
->tinc(l_filestore_commitcycle_interval
, dur
);
4281 apply_manager
.commit_finish();
4282 if (!m_disable_wbthrottle
) {
4286 logger
->set(l_filestore_committing
, 0);
4288 // remove old snaps?
4289 if (backend
->can_checkpoint()) {
4291 while (snaps
.size() > 2) {
4292 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)snaps
.front());
4294 dout(10) << "removing snap '" << s
<< "'" << dendl
;
4295 int r
= backend
->destroy_checkpoint(s
);
4298 derr
<< "unable to destroy snap '" << s
<< "' got " << cpp_strerror(err
) << dendl
;
4303 dout(15) << __FUNC__
<< ": committed to op_seq " << cp
<< dendl
;
4305 if (sync_entry_timeo
) {
4306 std::lock_guard lock
{sync_entry_timeo_lock
};
4307 timer
.cancel_event(sync_entry_timeo
);
4314 finish_contexts(cct
, fin
, 0);
4316 if (!sync_waiters
.empty()) {
4317 dout(10) << __FUNC__
<< ": more waiters, committing again" << dendl
;
4320 if (!stop
&& journal
&& journal
->should_commit_now()) {
4321 dout(10) << __FUNC__
<< ": journal says we should commit again (probably is/was full)" << dendl
;
4328 void FileStore::do_force_sync()
4330 dout(10) << __FUNC__
<< dendl
;
4331 std::lock_guard l
{lock
};
4333 sync_cond
.notify_all();
4336 void FileStore::start_sync(Context
*onsafe
)
4338 std::lock_guard l
{lock
};
4339 sync_waiters
.push_back(onsafe
);
4340 sync_cond
.notify_all();
4342 dout(10) << __FUNC__
<< dendl
;
4345 void FileStore::sync()
4347 ceph::mutex m
= ceph::make_mutex("FileStore::sync");
4348 ceph::condition_variable c
;
4350 C_SafeCond
*fin
= new C_SafeCond(m
, c
, &done
);
4354 std::unique_lock l
{m
};
4355 c
.wait(l
, [&done
, this] {
4357 dout(10) << "sync waiting" << dendl
;
4361 dout(10) << "sync done" << dendl
;
4364 void FileStore::_flush_op_queue()
4366 dout(10) << __FUNC__
<< ": draining op tp" << dendl
;
4368 dout(10) << __FUNC__
<< ": waiting for apply finisher" << dendl
;
4369 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
4370 (*it
)->wait_for_empty();
4375 * flush - make every queued write readable
4377 void FileStore::flush()
4379 dout(10) << __FUNC__
<< dendl
;
4381 if (cct
->_conf
->filestore_blackhole
) {
4383 ceph::mutex lock
= ceph::make_mutex("FileStore::flush::lock");
4384 ceph::condition_variable cond
;
4385 std::unique_lock l
{lock
};
4386 cond
.wait(l
, [] {return false;} );
4390 if (m_filestore_journal_writeahead
) {
4393 dout(10) << __FUNC__
<< ": draining ondisk finisher" << dendl
;
4394 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
4395 (*it
)->wait_for_empty();
4400 dout(10) << __FUNC__
<< ": complete" << dendl
;
4404 * sync_and_flush - make every queued write readable AND committed to disk
4406 void FileStore::sync_and_flush()
4408 dout(10) << __FUNC__
<< dendl
;
4410 if (m_filestore_journal_writeahead
) {
4415 // includes m_filestore_journal_parallel
4419 dout(10) << __FUNC__
<< ": done" << dendl
;
4422 int FileStore::flush_journal()
4424 dout(10) << __FUNC__
<< dendl
;
4430 int FileStore::snapshot(const string
& name
)
4432 dout(10) << __FUNC__
<< ": " << name
<< dendl
;
4435 if (!backend
->can_checkpoint()) {
4436 dout(0) << __FUNC__
<< ": " << name
<< " failed, not supported" << dendl
;
4441 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, name
.c_str());
4443 int r
= backend
->create_checkpoint(s
, nullptr);
4445 derr
<< __FUNC__
<< ": " << name
<< " failed: " << cpp_strerror(r
) << dendl
;
4451 // -------------------------------
4454 int FileStore::_fgetattr(int fd
, const char *name
, bufferptr
& bp
)
4456 char val
[CHAIN_XATTR_MAX_BLOCK_LEN
];
4457 int l
= chain_fgetxattr(fd
, name
, val
, sizeof(val
));
4459 bp
= ceph::buffer::create(l
);
4460 memcpy(bp
.c_str(), val
, l
);
4461 } else if (l
== -ERANGE
) {
4462 l
= chain_fgetxattr(fd
, name
, 0, 0);
4464 bp
= ceph::buffer::create(l
);
4465 l
= chain_fgetxattr(fd
, name
, bp
.c_str(), l
);
4468 ceph_assert(!m_filestore_fail_eio
|| l
!= -EIO
);
4472 int FileStore::_fgetattrs(int fd
, map
<string
,bufferptr
>& aset
)
4476 int len
= chain_flistxattr(fd
, names1
, sizeof(names1
)-1);
4479 if (len
== -ERANGE
) {
4480 len
= chain_flistxattr(fd
, 0, 0);
4482 ceph_assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4485 dout(10) << " -ERANGE, len is " << len
<< dendl
;
4486 names2
= new char[len
+1];
4487 len
= chain_flistxattr(fd
, names2
, len
);
4488 dout(10) << " -ERANGE, got " << len
<< dendl
;
4490 ceph_assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4495 } else if (len
< 0) {
4496 ceph_assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4503 char *end
= name
+ len
;
4504 while (name
< end
) {
4505 char *attrname
= name
;
4506 if (parse_attrname(&name
)) {
4508 dout(20) << __FUNC__
<< ": " << fd
<< " getting '" << name
<< "'" << dendl
;
4509 int r
= _fgetattr(fd
, attrname
, aset
[name
]);
4516 name
+= strlen(name
) + 1;
4523 int FileStore::_fsetattrs(int fd
, map
<string
, bufferptr
> &aset
)
4525 for (map
<string
, bufferptr
>::iterator p
= aset
.begin();
4528 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4529 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4531 if (p
->second
.length())
4532 val
= p
->second
.c_str();
4535 // ??? Why do we skip setting all the other attrs if one fails?
4536 int r
= chain_fsetxattr(fd
, n
, val
, p
->second
.length());
4538 derr
<< __FUNC__
<< ": chain_setxattr returned " << r
<< dendl
;
4545 // debug EIO injection
4546 void FileStore::inject_data_error(const ghobject_t
&oid
) {
4547 std::lock_guard l
{read_error_lock
};
4548 dout(10) << __FUNC__
<< ": init error on " << oid
<< dendl
;
4549 data_error_set
.insert(oid
);
4551 void FileStore::inject_mdata_error(const ghobject_t
&oid
) {
4552 std::lock_guard l
{read_error_lock
};
4553 dout(10) << __FUNC__
<< ": init error on " << oid
<< dendl
;
4554 mdata_error_set
.insert(oid
);
4557 void FileStore::debug_obj_on_delete(const ghobject_t
&oid
) {
4558 std::lock_guard l
{read_error_lock
};
4559 dout(10) << __FUNC__
<< ": clear error on " << oid
<< dendl
;
4560 data_error_set
.erase(oid
);
4561 mdata_error_set
.erase(oid
);
4563 bool FileStore::debug_data_eio(const ghobject_t
&oid
) {
4564 std::lock_guard l
{read_error_lock
};
4565 if (data_error_set
.count(oid
)) {
4566 dout(10) << __FUNC__
<< ": inject error on " << oid
<< dendl
;
4572 bool FileStore::debug_mdata_eio(const ghobject_t
&oid
) {
4573 std::lock_guard l
{read_error_lock
};
4574 if (mdata_error_set
.count(oid
)) {
4575 dout(10) << __FUNC__
<< ": inject error on " << oid
<< dendl
;
4585 int FileStore::getattr(CollectionHandle
& ch
, const ghobject_t
& oid
, const char *name
, bufferptr
&bp
)
4587 tracepoint(objectstore
, getattr_enter
, ch
->cid
.c_str());
4588 const coll_t
& cid
= !_need_temp_object_collection(ch
->cid
, oid
) ? ch
->cid
: ch
->cid
.get_temp();
4589 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4591 auto osr
= static_cast<OpSequencer
*>(ch
.get());
4592 osr
->wait_for_apply(oid
);
4595 int r
= lfn_open(cid
, oid
, false, &fd
);
4599 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4600 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4601 r
= _fgetattr(**fd
, n
, bp
);
4603 if (r
== -ENODATA
) {
4604 map
<string
, bufferlist
> got
;
4606 to_get
.insert(string(name
));
4608 r
= get_index(cid
, &index
);
4610 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4613 r
= object_map
->get_xattrs(oid
, to_get
, &got
);
4614 if (r
< 0 && r
!= -ENOENT
) {
4615 dout(10) << __FUNC__
<< ": get_xattrs err r =" << r
<< dendl
;
4619 dout(10) << __FUNC__
<< ": got.size() is 0" << dendl
;
4622 bp
= bufferptr(got
.begin()->second
.c_str(),
4623 got
.begin()->second
.length());
4627 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
4628 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4629 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4630 debug_mdata_eio(oid
)) {
4633 tracepoint(objectstore
, getattr_exit
, r
);
4634 return r
< 0 ? r
: 0;
4638 int FileStore::getattrs(CollectionHandle
& ch
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
)
4640 tracepoint(objectstore
, getattrs_enter
, ch
->cid
.c_str());
4641 const coll_t
& cid
= !_need_temp_object_collection(ch
->cid
, oid
) ? ch
->cid
: ch
->cid
.get_temp();
4642 set
<string
> omap_attrs
;
4643 map
<string
, bufferlist
> omap_aset
;
4645 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
4647 auto osr
= static_cast<OpSequencer
*>(ch
.get());
4648 osr
->wait_for_apply(oid
);
4651 bool spill_out
= true;
4654 int r
= lfn_open(cid
, oid
, false, &fd
);
4659 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4660 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4663 r
= _fgetattrs(**fd
, aset
);
4665 fd
= FDRef(); // defensive
4671 dout(10) << __FUNC__
<< ": no xattr exists in object_map r = " << r
<< dendl
;
4675 r
= get_index(cid
, &index
);
4677 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4681 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4682 if (r
< 0 && r
!= -ENOENT
) {
4683 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4687 r
= object_map
->get_xattrs(oid
, omap_attrs
, &omap_aset
);
4688 if (r
< 0 && r
!= -ENOENT
) {
4689 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4695 ceph_assert(omap_attrs
.size() == omap_aset
.size());
4696 for (map
<string
, bufferlist
>::iterator i
= omap_aset
.begin();
4697 i
!= omap_aset
.end();
4699 string
key(i
->first
);
4700 aset
.insert(make_pair(key
,
4701 bufferptr(i
->second
.c_str(), i
->second
.length())));
4704 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4705 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4707 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4708 debug_mdata_eio(oid
)) {
4711 tracepoint(objectstore
, getattrs_exit
, r
);
4716 int FileStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
,
4717 const SequencerPosition
&spos
)
4719 map
<string
, bufferlist
> omap_set
;
4720 set
<string
> omap_remove
;
4721 map
<string
, bufferptr
> inline_set
;
4722 map
<string
, bufferptr
> inline_to_set
;
4725 bool incomplete_inline
= false;
4727 int r
= lfn_open(cid
, oid
, false, &fd
);
4733 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4734 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4739 r
= _fgetattrs(**fd
, inline_set
);
4740 incomplete_inline
= (r
== -E2BIG
);
4741 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4742 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
4743 << (incomplete_inline
? " (incomplete_inline, forcing omap)" : "")
4746 for (map
<string
,bufferptr
>::iterator p
= aset
.begin();
4749 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4750 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4752 if (incomplete_inline
) {
4753 chain_fremovexattr(**fd
, n
); // ignore any error
4754 omap_set
[p
->first
].push_back(p
->second
);
4758 if (p
->second
.length() > m_filestore_max_inline_xattr_size
) {
4759 if (inline_set
.count(p
->first
)) {
4760 inline_set
.erase(p
->first
);
4761 r
= chain_fremovexattr(**fd
, n
);
4765 omap_set
[p
->first
].push_back(p
->second
);
4769 if (!inline_set
.count(p
->first
) &&
4770 inline_set
.size() >= m_filestore_max_inline_xattrs
) {
4771 omap_set
[p
->first
].push_back(p
->second
);
4774 omap_remove
.insert(p
->first
);
4775 inline_set
.insert(*p
);
4777 inline_to_set
.insert(*p
);
4780 if (spill_out
!= 1 && !omap_set
.empty()) {
4781 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
4782 sizeof(XATTR_SPILL_OUT
));
4785 r
= _fsetattrs(**fd
, inline_to_set
);
4789 if (spill_out
&& !omap_remove
.empty()) {
4790 r
= object_map
->remove_xattrs(oid
, omap_remove
, &spos
);
4791 if (r
< 0 && r
!= -ENOENT
) {
4792 dout(10) << __FUNC__
<< ": could not remove_xattrs r = " << r
<< dendl
;
4793 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4796 r
= 0; // don't confuse the debug output
4800 if (!omap_set
.empty()) {
4801 r
= object_map
->set_xattrs(oid
, omap_set
, &spos
);
4803 dout(10) << __FUNC__
<< ": could not set_xattrs r = " << r
<< dendl
;
4804 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4811 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4816 int FileStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
,
4817 const SequencerPosition
&spos
)
4819 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4821 bool spill_out
= true;
4823 int r
= lfn_open(cid
, oid
, false, &fd
);
4829 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4830 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4834 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4835 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4836 r
= chain_fremovexattr(**fd
, n
);
4837 if (r
== -ENODATA
&& spill_out
) {
4839 r
= get_index(cid
, &index
);
4841 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4844 set
<string
> to_remove
;
4845 to_remove
.insert(string(name
));
4846 r
= object_map
->remove_xattrs(oid
, to_remove
, &spos
);
4847 if (r
< 0 && r
!= -ENOENT
) {
4848 dout(10) << __FUNC__
<< ": could not remove_xattrs index r = " << r
<< dendl
;
4849 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4856 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
4860 int FileStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
,
4861 const SequencerPosition
&spos
)
4863 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
4865 map
<string
,bufferptr
> aset
;
4867 set
<string
> omap_attrs
;
4869 bool spill_out
= true;
4871 int r
= lfn_open(cid
, oid
, false, &fd
);
4877 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4878 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4882 r
= _fgetattrs(**fd
, aset
);
4884 for (map
<string
,bufferptr
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
4885 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4886 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4887 r
= chain_fremovexattr(**fd
, n
);
4889 dout(10) << __FUNC__
<< ": could not remove xattr r = " << r
<< dendl
;
4896 dout(10) << __FUNC__
<< ": no xattr exists in object_map r = " << r
<< dendl
;
4900 r
= get_index(cid
, &index
);
4902 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4906 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4907 if (r
< 0 && r
!= -ENOENT
) {
4908 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4909 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4912 r
= object_map
->remove_xattrs(oid
, omap_attrs
, &spos
);
4913 if (r
< 0 && r
!= -ENOENT
) {
4914 dout(10) << __FUNC__
<< ": could not remove omap_attrs r = " << r
<< dendl
;
4919 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
4920 sizeof(XATTR_NO_SPILL_OUT
));
4926 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4933 int FileStore::_collection_remove_recursive(const coll_t
&cid
,
4934 const SequencerPosition
&spos
)
4937 int r
= collection_stat(cid
, &st
);
4944 vector
<ghobject_t
> objects
;
4946 while (!max
.is_max()) {
4947 r
= collection_list(cid
, max
, ghobject_t::get_max(),
4948 300, &objects
, &max
);
4951 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
4954 ceph_assert(_check_replay_guard(cid
, *i
, spos
));
4955 r
= _remove(cid
, *i
, spos
);
4961 return _destroy_collection(cid
);
4964 // --------------------------
4967 int FileStore::list_collections(vector
<coll_t
>& ls
)
4969 return list_collections(ls
, false);
4972 int FileStore::list_collections(vector
<coll_t
>& ls
, bool include_temp
)
4974 tracepoint(objectstore
, list_collections_enter
);
4975 dout(10) << __FUNC__
<< dendl
;
4978 snprintf(fn
, sizeof(fn
), "%s/current", basedir
.c_str());
4981 DIR *dir
= ::opendir(fn
);
4984 derr
<< "tried opening directory " << fn
<< ": " << cpp_strerror(-r
) << dendl
;
4985 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
4989 struct dirent
*de
= nullptr;
4990 while ((de
= ::readdir(dir
))) {
4991 if (de
->d_type
== DT_UNKNOWN
) {
4992 // d_type not supported (non-ext[234], btrfs), must stat
4994 char filename
[PATH_MAX
];
4995 if (int n
= snprintf(filename
, sizeof(filename
), "%s/%s", fn
, de
->d_name
);
4996 n
>= static_cast<int>(sizeof(filename
))) {
4997 derr
<< __func__
<< " path length overrun: " << n
<< dendl
;
5001 r
= ::stat(filename
, &sb
);
5004 derr
<< "stat on " << filename
<< ": " << cpp_strerror(-r
) << dendl
;
5005 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5008 if (!S_ISDIR(sb
.st_mode
)) {
5011 } else if (de
->d_type
!= DT_DIR
) {
5014 if (strcmp(de
->d_name
, "omap") == 0) {
5017 if (de
->d_name
[0] == '.' &&
5018 (de
->d_name
[1] == '\0' ||
5019 (de
->d_name
[1] == '.' &&
5020 de
->d_name
[2] == '\0')))
5023 if (!cid
.parse(de
->d_name
)) {
5024 derr
<< "ignoring invalid collection '" << de
->d_name
<< "'" << dendl
;
5027 if (!cid
.is_temp() || include_temp
)
5032 derr
<< "trying readdir " << fn
<< ": " << cpp_strerror(r
) << dendl
;
5037 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5038 tracepoint(objectstore
, list_collections_exit
, r
);
5042 int FileStore::collection_stat(const coll_t
& c
, struct stat
*st
)
5044 tracepoint(objectstore
, collection_stat_enter
, c
.c_str());
5046 get_cdir(c
, fn
, sizeof(fn
));
5047 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5048 int r
= ::stat(fn
, st
);
5051 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
5052 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5053 tracepoint(objectstore
, collection_stat_exit
, r
);
5057 bool FileStore::collection_exists(const coll_t
& c
)
5059 tracepoint(objectstore
, collection_exists_enter
, c
.c_str());
5061 bool ret
= collection_stat(c
, &st
) == 0;
5062 tracepoint(objectstore
, collection_exists_exit
, ret
);
5066 int FileStore::collection_empty(const coll_t
& cid
, bool *empty
)
5068 tracepoint(objectstore
, collection_empty_enter
, cid
.c_str());
5069 dout(15) << __FUNC__
<< ": " << cid
<< dendl
;
5071 int r
= get_index(cid
, &index
);
5073 derr
<< __FUNC__
<< ": get_index returned: " << cpp_strerror(r
)
5078 ceph_assert(index
.index
);
5079 std::shared_lock l
{(index
.index
)->access_lock
};
5081 vector
<ghobject_t
> ls
;
5082 r
= index
->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
5085 derr
<< __FUNC__
<< ": collection_list_partial returned: "
5086 << cpp_strerror(r
) << dendl
;
5087 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5090 *empty
= ls
.empty();
5091 tracepoint(objectstore
, collection_empty_exit
, *empty
);
5095 int FileStore::_collection_set_bits(const coll_t
& c
, int bits
)
5098 get_cdir(c
, fn
, sizeof(fn
));
5099 dout(10) << __FUNC__
<< ": " << fn
<< " " << bits
<< dendl
;
5103 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
5108 get_attrname("bits", n
, PATH_MAX
);
5109 r
= chain_fsetxattr(fd
, n
, (char*)&v
, sizeof(v
));
5110 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5112 dout(10) << __FUNC__
<< ": " << fn
<< " " << bits
<< " = " << r
<< dendl
;
5116 int FileStore::collection_bits(CollectionHandle
& ch
)
5119 get_cdir(ch
->cid
, fn
, sizeof(fn
));
5120 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5124 int fd
= ::open(fn
, O_RDONLY
|O_CLOEXEC
);
5129 get_attrname("bits", n
, PATH_MAX
);
5130 r
= chain_fgetxattr(fd
, n
, (char*)&bits
, sizeof(bits
));
5131 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5137 dout(10) << __FUNC__
<< ": " << fn
<< " = " << bits
<< dendl
;
5141 int FileStore::collection_list(const coll_t
& c
,
5142 const ghobject_t
& orig_start
,
5143 const ghobject_t
& end
,
5145 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
5147 ghobject_t start
= orig_start
;
5151 ghobject_t temp_next
;
5154 // figure out the pool id. we need this in order to generate a
5155 // meaningful 'next' value.
5160 if (c
.is_temp(&pgid
)) {
5161 pool
= -2 - pgid
.pool();
5163 } else if (c
.is_pg(&pgid
)) {
5166 } else if (c
.is_meta()) {
5168 shard
= shard_id_t::NO_SHARD
;
5170 // hrm, the caller is test code! we should get kill it off. for now,
5173 shard
= shard_id_t::NO_SHARD
;
5175 dout(20) << __FUNC__
<< ": pool is " << pool
<< " shard is " << shard
5176 << " pgid " << pgid
<< dendl
;
5180 sep
.set_shard(shard
);
5181 if (!c
.is_temp() && !c
.is_meta()) {
5183 dout(10) << __FUNC__
<< ": first checking temp pool" << dendl
;
5184 coll_t temp
= c
.get_temp();
5185 int r
= collection_list(temp
, start
, end
, max
, ls
, next
);
5188 if (*next
!= ghobject_t::get_max())
5191 dout(10) << __FUNC__
<< ": fall through to non-temp collection, start "
5194 dout(10) << __FUNC__
<< ": start " << start
<< " >= sep " << sep
<< dendl
;
5199 int r
= get_index(c
, &index
);
5203 ceph_assert(index
.index
);
5204 std::shared_lock l
{(index
.index
)->access_lock
};
5206 r
= index
->collection_list_partial(start
, end
, max
, ls
, next
);
5209 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5212 dout(20) << "objects: " << *ls
<< dendl
;
5214 // HashIndex doesn't know the pool when constructing a 'next' value
5215 if (!next
->is_max()) {
5216 next
->hobj
.pool
= pool
;
5217 next
->set_shard(shard
);
5218 dout(20) << " next " << *next
<< dendl
;
5224 int FileStore::omap_get(CollectionHandle
& ch
, const ghobject_t
&hoid
,
5226 map
<string
, bufferlist
> *out
)
5228 tracepoint(objectstore
, omap_get_enter
, ch
->cid
.c_str());
5229 const coll_t
& c
= !_need_temp_object_collection(ch
->cid
, hoid
) ? ch
->cid
: ch
->cid
.get_temp();
5230 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5232 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5233 osr
->wait_for_apply(hoid
);
5236 int r
= get_index(c
, &index
);
5240 ceph_assert(index
.index
);
5241 std::shared_lock l
{(index
.index
)->access_lock
};
5242 r
= lfn_find(hoid
, index
);
5246 r
= object_map
->get(hoid
, header
, out
);
5247 if (r
< 0 && r
!= -ENOENT
) {
5248 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5251 tracepoint(objectstore
, omap_get_exit
, 0);
5255 int FileStore::omap_get_header(
5256 CollectionHandle
& ch
,
5257 const ghobject_t
&hoid
,
5261 tracepoint(objectstore
, omap_get_header_enter
, ch
->cid
.c_str());
5262 const coll_t
& c
= !_need_temp_object_collection(ch
->cid
, hoid
) ? ch
->cid
: ch
->cid
.get_temp();
5263 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5265 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5266 osr
->wait_for_apply(hoid
);
5269 int r
= get_index(c
, &index
);
5273 ceph_assert(index
.index
);
5274 std::shared_lock l
{(index
.index
)->access_lock
};
5275 r
= lfn_find(hoid
, index
);
5279 r
= object_map
->get_header(hoid
, bl
);
5280 if (r
< 0 && r
!= -ENOENT
) {
5281 ceph_assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
5284 tracepoint(objectstore
, omap_get_header_exit
, 0);
5288 int FileStore::omap_get_keys(CollectionHandle
& ch
, const ghobject_t
&hoid
, set
<string
> *keys
)
5290 tracepoint(objectstore
, omap_get_keys_enter
, ch
->cid
.c_str());
5291 const coll_t
& c
= !_need_temp_object_collection(ch
->cid
, hoid
) ? ch
->cid
: ch
->cid
.get_temp();
5292 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5294 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5295 osr
->wait_for_apply(hoid
);
5298 int r
= get_index(c
, &index
);
5302 ceph_assert(index
.index
);
5303 std::shared_lock l
{(index
.index
)->access_lock
};
5304 r
= lfn_find(hoid
, index
);
5308 r
= object_map
->get_keys(hoid
, keys
);
5309 if (r
< 0 && r
!= -ENOENT
) {
5310 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5313 tracepoint(objectstore
, omap_get_keys_exit
, 0);
5317 int FileStore::omap_get_values(CollectionHandle
& ch
, const ghobject_t
&hoid
,
5318 const set
<string
> &keys
,
5319 map
<string
, bufferlist
> *out
)
5321 tracepoint(objectstore
, omap_get_values_enter
, ch
->cid
.c_str());
5322 const coll_t
& c
= !_need_temp_object_collection(ch
->cid
, hoid
) ? ch
->cid
: ch
->cid
.get_temp();
5323 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5325 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5326 osr
->wait_for_apply(hoid
);
5329 const char *where
= "()";
5330 int r
= get_index(c
, &index
);
5332 where
= " (get_index)";
5336 ceph_assert(index
.index
);
5337 std::shared_lock l
{(index
.index
)->access_lock
};
5338 r
= lfn_find(hoid
, index
);
5340 where
= " (lfn_find)";
5344 r
= object_map
->get_values(hoid
, keys
, out
);
5345 if (r
< 0 && r
!= -ENOENT
) {
5346 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5347 where
= " (get_values)";
5352 tracepoint(objectstore
, omap_get_values_exit
, r
);
5353 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = " << r
5358 int FileStore::omap_check_keys(CollectionHandle
& ch
, const ghobject_t
&hoid
,
5359 const set
<string
> &keys
,
5362 tracepoint(objectstore
, omap_check_keys_enter
, ch
->cid
.c_str());
5363 const coll_t
& c
= !_need_temp_object_collection(ch
->cid
, hoid
) ? ch
->cid
: ch
->cid
.get_temp();
5364 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5366 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5367 osr
->wait_for_apply(hoid
);
5370 int r
= get_index(c
, &index
);
5374 ceph_assert(index
.index
);
5375 std::shared_lock l
{(index
.index
)->access_lock
};
5376 r
= lfn_find(hoid
, index
);
5380 r
= object_map
->check_keys(hoid
, keys
, out
);
5381 if (r
< 0 && r
!= -ENOENT
) {
5382 if (r
== -EIO
&& m_filestore_fail_eio
) handle_eio();
5385 tracepoint(objectstore
, omap_check_keys_exit
, 0);
5389 ObjectMap::ObjectMapIterator
FileStore::get_omap_iterator(
5390 CollectionHandle
& ch
,
5391 const ghobject_t
&oid
)
5393 auto osr
= static_cast<OpSequencer
*>(ch
.get());
5394 osr
->wait_for_apply(oid
);
5395 return get_omap_iterator(ch
->cid
, oid
);
5398 ObjectMap::ObjectMapIterator
FileStore::get_omap_iterator(const coll_t
& _c
,
5399 const ghobject_t
&hoid
)
5401 tracepoint(objectstore
, get_omap_iterator
, _c
.c_str());
5402 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5403 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5405 int r
= get_index(c
, &index
);
5407 dout(10) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = 0 "
5408 << "(get_index failed with " << cpp_strerror(r
) << ")" << dendl
;
5409 return ObjectMap::ObjectMapIterator();
5412 ceph_assert(index
.index
);
5413 std::shared_lock l
{(index
.index
)->access_lock
};
5414 r
= lfn_find(hoid
, index
);
5416 dout(10) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = 0 "
5417 << "(lfn_find failed with " << cpp_strerror(r
) << ")" << dendl
;
5418 return ObjectMap::ObjectMapIterator();
5421 return object_map
->get_iterator(hoid
);
5424 int FileStore::_collection_hint_expected_num_objs(const coll_t
& c
, uint32_t pg_num
,
5425 uint64_t expected_num_objs
,
5426 const SequencerPosition
&spos
)
5428 dout(15) << __FUNC__
<< ": collection: " << c
<< " pg number: "
5429 << pg_num
<< " expected number of objects: " << expected_num_objs
<< dendl
;
5432 int ret
= collection_empty(c
, &empty
);
5435 if (!empty
&& !replaying
) {
5436 dout(0) << "Failed to give an expected number of objects hint to collection : "
5437 << c
<< ", only empty collection can take such type of hint. " << dendl
;
5442 ret
= get_index(c
, &index
);
5445 // Pre-hash the collection
5446 ret
= index
->pre_hash_collection(pg_num
, expected_num_objs
);
5447 dout(10) << "pre_hash_collection " << c
<< " = " << ret
<< dendl
;
5450 _set_replay_guard(c
, spos
);
5455 int FileStore::_create_collection(
5458 const SequencerPosition
&spos
)
5461 get_cdir(c
, fn
, sizeof(fn
));
5462 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5463 int r
= ::mkdir(fn
, 0755);
5466 if (r
== -EEXIST
&& replaying
)
5468 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
5475 r
= _collection_set_bits(c
, bits
);
5478 // create parallel temp collection, too
5479 if (!c
.is_meta() && !c
.is_temp()) {
5480 coll_t temp
= c
.get_temp();
5481 r
= _create_collection(temp
, 0, spos
);
5486 _set_replay_guard(c
, spos
);
5490 int FileStore::_destroy_collection(const coll_t
& c
)
5494 get_cdir(c
, fn
, sizeof(fn
));
5495 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5498 r
= get_index(c
, &from
);
5501 ceph_assert(from
.index
);
5502 std::unique_lock l
{(from
.index
)->access_lock
};
5504 r
= from
->prep_delete();
5515 // destroy parallel temp collection, too
5516 if (!c
.is_meta() && !c
.is_temp()) {
5517 coll_t temp
= c
.get_temp();
5518 int r2
= _destroy_collection(temp
);
5526 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
5531 int FileStore::_collection_add(const coll_t
& c
, const coll_t
& oldcid
, const ghobject_t
& o
,
5532 const SequencerPosition
& spos
)
5534 dout(15) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< dendl
;
5536 int dstcmp
= _check_replay_guard(c
, o
, spos
);
5540 // check the src name too; it might have a newer guard, and we don't
5541 // want to clobber it
5542 int srccmp
= _check_replay_guard(oldcid
, o
, spos
);
5546 // open guard on object so we don't any previous operations on the
5547 // new name that will modify the source inode.
5549 int r
= lfn_open(oldcid
, o
, 0, &fd
);
5551 // the source collection/object does not exist. If we are replaying, we
5552 // should be safe, so just return 0 and move on.
5553 ceph_assert(replaying
);
5554 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5555 << oldcid
<< "/" << o
<< " (dne, continue replay) " << dendl
;
5558 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5559 _set_replay_guard(**fd
, spos
, &o
, true);
5562 r
= lfn_link(oldcid
, c
, o
, o
);
5563 if (replaying
&& !backend
->can_checkpoint() &&
5564 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5569 // close guard on object so we don't do this again
5571 _close_replay_guard(**fd
, spos
);
5575 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< " = " << r
<< dendl
;
5579 int FileStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
5580 coll_t c
, const ghobject_t
& o
,
5581 const SequencerPosition
& spos
,
5584 dout(15) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
<< dendl
;
5589 /* If the destination collection doesn't exist during replay,
5590 * we need to delete the src object and continue on
5592 if (!collection_exists(c
))
5596 dstcmp
= _check_replay_guard(c
, o
, spos
);
5600 // check the src name too; it might have a newer guard, and we don't
5601 // want to clobber it
5602 srccmp
= _check_replay_guard(oldcid
, oldoid
, spos
);
5607 // open guard on object so we don't any previous operations on the
5608 // new name that will modify the source inode.
5610 r
= lfn_open(oldcid
, oldoid
, 0, &fd
);
5612 // the source collection/object does not exist. If we are replaying, we
5613 // should be safe, so just return 0 and move on.
5615 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5616 << oldcid
<< "/" << oldoid
<< " (dne, continue replay) " << dendl
;
5617 } else if (allow_enoent
) {
5618 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5619 << oldcid
<< "/" << oldoid
<< " (dne, ignoring enoent)"
5622 ceph_abort_msg("ERROR: source must exist");
5628 if (allow_enoent
&& dstcmp
> 0) { // if dstcmp == 0, try_rename was started.
5632 r
= 0; // don't know if object_map was cloned
5634 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5635 _set_replay_guard(**fd
, spos
, &o
, true);
5638 r
= lfn_link(oldcid
, c
, oldoid
, o
);
5639 if (replaying
&& !backend
->can_checkpoint() &&
5640 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5650 // the name changed; link the omap content
5651 r
= object_map
->rename(oldoid
, o
, &spos
);
5659 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5662 r
= lfn_open(c
, o
, 0, &fd
);
5664 // close guard on object so we don't do this again
5666 _close_replay_guard(**fd
, spos
, &o
);
5671 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5672 << " = " << r
<< dendl
;
5677 if (_check_replay_guard(oldcid
, oldoid
, spos
) > 0) {
5678 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5681 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5682 << " = " << r
<< dendl
;
// Test-only failure injection: each call decrements the
// m_filestore_kill_at countdown (set via the "filestore_kill_at"
// config option — see get_tracked_conf_keys/handle_conf_change) and
// logs the transition.  NOTE(review): the branch that actually kills
// the process when the countdown hits zero is not visible in this
// extract — confirm against the full source.
5686 void FileStore::_inject_failure()
5688 if (m_filestore_kill_at
) {
// Decrement the remaining-calls-before-kill counter.
5689 int final
= --m_filestore_kill_at
;
5690 dout(5) << __FUNC__
<< ": " << (final
+1) << " -> " << final
<< dendl
;
// Presumably reached only when the countdown hits zero (the guarding
// conditional is not visible here): log loudly before dying.
5692 derr
<< __FUNC__
<< ": KILLING" << dendl
;
5699 int FileStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&hoid
,
5700 const SequencerPosition
&spos
) {
5701 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5703 int r
= get_index(cid
, &index
);
5707 ceph_assert(index
.index
);
5708 std::shared_lock l
{(index
.index
)->access_lock
};
5709 r
= lfn_find(hoid
, index
);
5713 r
= object_map
->clear_keys_header(hoid
, &spos
);
5714 if (r
< 0 && r
!= -ENOENT
)
5719 int FileStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5720 const map
<string
, bufferlist
> &aset
,
5721 const SequencerPosition
&spos
) {
5722 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5725 //treat pgmeta as a logical object, skip to check exist
5726 if (hoid
.is_pgmeta())
5729 r
= get_index(cid
, &index
);
5731 dout(20) << __FUNC__
<< ": get_index got " << cpp_strerror(r
) << dendl
;
5735 ceph_assert(index
.index
);
5736 std::shared_lock l
{(index
.index
)->access_lock
};
5737 r
= lfn_find(hoid
, index
);
5739 dout(20) << __FUNC__
<< ": lfn_find got " << cpp_strerror(r
) << dendl
;
5744 if (g_conf()->subsys
.should_gather
<ceph_subsys_filestore
, 20>()) {
5745 for (auto& p
: aset
) {
5746 dout(20) << __FUNC__
<< ": set " << p
.first
<< dendl
;
5749 r
= object_map
->set_keys(hoid
, aset
, &spos
);
5750 dout(20) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< " = " << r
<< dendl
;
5754 int FileStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5755 const set
<string
> &keys
,
5756 const SequencerPosition
&spos
) {
5757 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5760 //treat pgmeta as a logical object, skip to check exist
5761 if (hoid
.is_pgmeta())
5764 r
= get_index(cid
, &index
);
5768 ceph_assert(index
.index
);
5769 std::shared_lock l
{(index
.index
)->access_lock
};
5770 r
= lfn_find(hoid
, index
);
5775 r
= object_map
->rm_keys(hoid
, keys
, &spos
);
5776 if (r
< 0 && r
!= -ENOENT
)
5781 int FileStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&hoid
,
5782 const string
& first
, const string
& last
,
5783 const SequencerPosition
&spos
) {
5784 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< " [" << first
<< "," << last
<< "]" << dendl
;
5787 ObjectMap::ObjectMapIterator iter
= get_omap_iterator(cid
, hoid
);
5790 for (iter
->lower_bound(first
); iter
->valid() && iter
->key() < last
;
5792 keys
.insert(iter
->key());
5795 return _omap_rmkeys(cid
, hoid
, keys
, spos
);
5798 int FileStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&hoid
,
5799 const bufferlist
&bl
,
5800 const SequencerPosition
&spos
)
5802 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5804 int r
= get_index(cid
, &index
);
5808 ceph_assert(index
.index
);
5809 std::shared_lock l
{(index
.index
)->access_lock
};
5810 r
= lfn_find(hoid
, index
);
5814 return object_map
->set_header(hoid
, bl
, &spos
);
5817 int FileStore::_merge_collection(const coll_t
& cid
,
5820 const SequencerPosition
&spos
)
5822 dout(15) << __FUNC__
<< ": " << cid
<< " " << dest
5823 << " bits " << bits
<< dendl
;
5826 if (!collection_exists(cid
)) {
5827 dout(2) << __FUNC__
<< ": " << cid
<< " DNE" << dendl
;
5828 ceph_assert(replaying
);
5831 if (!collection_exists(dest
)) {
5832 dout(2) << __FUNC__
<< ": " << dest
<< " DNE" << dendl
;
5833 ceph_assert(replaying
);
5838 if (_check_replay_guard(cid
, spos
) > 0)
5839 _collection_set_bits(dest
, bits
);
5842 bool is_pg
= dest
.is_pg(&pgid
);
5845 int dstcmp
= _check_replay_guard(dest
, spos
);
5849 int srccmp
= _check_replay_guard(cid
, spos
);
5853 _set_global_replay_guard(cid
, spos
);
5854 _set_replay_guard(cid
, spos
, true);
5855 _set_replay_guard(dest
, spos
, true);
5860 r
= get_index(cid
, &from
);
5864 r
= get_index(dest
, &to
);
5867 ceph_assert(from
.index
);
5868 std::unique_lock l1
{(from
.index
)->access_lock
};
5870 ceph_assert(to
.index
);
5871 std::unique_lock l2
{(to
.index
)->access_lock
};
5873 r
= from
->merge(bits
, to
.index
);
5880 r
= get_index(cid
.get_temp(), &from
);
5884 r
= get_index(dest
.get_temp(), &to
);
5887 ceph_assert(from
.index
);
5888 std::unique_lock l1
{(from
.index
)->access_lock
};
5890 ceph_assert(to
.index
);
5891 std::unique_lock l2
{(to
.index
)->access_lock
};
5893 r
= from
->merge(bits
, to
.index
);
5898 _destroy_collection(cid
);
5900 _close_replay_guard(dest
, spos
);
5901 _close_replay_guard(dest
.get_temp(), spos
);
5902 // no need to close guards on cid... it's removed.
5904 if (!r
&& cct
->_conf
->filestore_debug_verify_split
) {
5905 vector
<ghobject_t
> objects
;
5910 next
, ghobject_t::get_max(),
5911 get_ideal_list_max(),
5914 if (objects
.empty())
5916 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
5919 if (!i
->match(bits
, pgid
.pgid
.ps())) {
5920 dout(20) << __FUNC__
<< ": " << *i
<< " does not belong in "
5922 ceph_assert(i
->match(bits
, pgid
.pgid
.ps()));
5929 dout(15) << __FUNC__
<< ": " << cid
<< " " << dest
<< " bits " << bits
5930 << " = " << r
<< dendl
;
5934 int FileStore::_split_collection(const coll_t
& cid
,
5938 const SequencerPosition
&spos
)
5942 dout(15) << __FUNC__
<< ": " << cid
<< " bits: " << bits
<< dendl
;
5943 if (!collection_exists(cid
)) {
5944 dout(2) << __FUNC__
<< ": " << cid
<< " DNE" << dendl
;
5945 ceph_assert(replaying
);
5948 if (!collection_exists(dest
)) {
5949 dout(2) << __FUNC__
<< ": " << dest
<< " DNE" << dendl
;
5950 ceph_assert(replaying
);
5954 int dstcmp
= _check_replay_guard(dest
, spos
);
5958 int srccmp
= _check_replay_guard(cid
, spos
);
5962 _set_global_replay_guard(cid
, spos
);
5963 _set_replay_guard(cid
, spos
, true);
5964 _set_replay_guard(dest
, spos
, true);
5967 r
= get_index(cid
, &from
);
5971 r
= get_index(dest
, &to
);
5974 ceph_assert(from
.index
);
5975 std::unique_lock l1
{(from
.index
)->access_lock
};
5977 ceph_assert(to
.index
);
5978 std::unique_lock l2
{(to
.index
)->access_lock
};
5980 r
= from
->split(rem
, bits
, to
.index
);
5983 _close_replay_guard(cid
, spos
);
5984 _close_replay_guard(dest
, spos
);
5986 _collection_set_bits(cid
, bits
);
5987 if (!r
&& cct
->_conf
->filestore_debug_verify_split
) {
5988 vector
<ghobject_t
> objects
;
5993 next
, ghobject_t::get_max(),
5994 get_ideal_list_max(),
5997 if (objects
.empty())
5999 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
6002 dout(20) << __FUNC__
<< ": " << *i
<< " still in source "
6004 ceph_assert(!i
->match(bits
, rem
));
6008 next
= ghobject_t();
6012 next
, ghobject_t::get_max(),
6013 get_ideal_list_max(),
6016 if (objects
.empty())
6018 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
6021 dout(20) << __FUNC__
<< ": " << *i
<< " now in dest "
6023 ceph_assert(i
->match(bits
, rem
));
6031 int FileStore::_set_alloc_hint(const coll_t
& cid
, const ghobject_t
& oid
,
6032 uint64_t expected_object_size
,
6033 uint64_t expected_write_size
)
6035 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< dendl
;
6040 if (expected_object_size
== 0 || expected_write_size
== 0)
6043 ret
= lfn_open(cid
, oid
, false, &fd
);
6048 // TODO: a more elaborate hint calculation
6049 uint64_t hint
= std::min
<uint64_t>(expected_write_size
, m_filestore_max_alloc_hint_size
);
6051 ret
= backend
->set_alloc_hint(**fd
, hint
);
6052 dout(20) << __FUNC__
<< ": hint " << hint
<< " ret " << ret
<< dendl
;
6057 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< " = " << ret
<< dendl
;
6058 ceph_assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Returns the static list of config option names whose runtime changes
// FileStore wants to be notified about; handle_conf_change() (below in
// this file) reacts to exactly these keys (xattr limits, throttle
// params, sync intervals, dump/kill/fail-eio debug knobs).
// NOTE(review): the array's NULL terminator and the return statement
// are outside this extracted view.
6062 const char** FileStore::get_tracked_conf_keys() const
6064 static const char* KEYS
[] = {
6065 "filestore_max_inline_xattr_size",
6066 "filestore_max_inline_xattr_size_xfs",
6067 "filestore_max_inline_xattr_size_btrfs",
6068 "filestore_max_inline_xattr_size_other",
6069 "filestore_max_inline_xattrs",
6070 "filestore_max_inline_xattrs_xfs",
6071 "filestore_max_inline_xattrs_btrfs",
6072 "filestore_max_inline_xattrs_other",
6073 "filestore_max_xattr_value_size",
6074 "filestore_max_xattr_value_size_xfs",
6075 "filestore_max_xattr_value_size_btrfs",
6076 "filestore_max_xattr_value_size_other",
6077 "filestore_min_sync_interval",
6078 "filestore_max_sync_interval",
6079 "filestore_queue_max_ops",
6080 "filestore_queue_max_bytes",
6081 "filestore_expected_throughput_bytes",
6082 "filestore_expected_throughput_ops",
6083 "filestore_queue_low_threshhold",
6084 "filestore_queue_high_threshhold",
6085 "filestore_queue_high_delay_multiple",
6086 "filestore_queue_max_delay_multiple",
6087 "filestore_commit_timeout",
6088 "filestore_dump_file",
6089 "filestore_kill_at",
6090 "filestore_fail_eio",
6091 "filestore_fadvise",
6092 "filestore_sloppy_crc",
6093 "filestore_sloppy_crc_block_size",
6094 "filestore_max_alloc_hint_size",
6100 void FileStore::handle_conf_change(const ConfigProxy
& conf
,
6101 const std::set
<std::string
> &changed
)
6103 if (changed
.count("filestore_max_inline_xattr_size") ||
6104 changed
.count("filestore_max_inline_xattr_size_xfs") ||
6105 changed
.count("filestore_max_inline_xattr_size_btrfs") ||
6106 changed
.count("filestore_max_inline_xattr_size_other") ||
6107 changed
.count("filestore_max_inline_xattrs") ||
6108 changed
.count("filestore_max_inline_xattrs_xfs") ||
6109 changed
.count("filestore_max_inline_xattrs_btrfs") ||
6110 changed
.count("filestore_max_inline_xattrs_other") ||
6111 changed
.count("filestore_max_xattr_value_size") ||
6112 changed
.count("filestore_max_xattr_value_size_xfs") ||
6113 changed
.count("filestore_max_xattr_value_size_btrfs") ||
6114 changed
.count("filestore_max_xattr_value_size_other")) {
6116 std::lock_guard
l(lock
);
6117 set_xattr_limits_via_conf();
6121 if (changed
.count("filestore_queue_max_bytes") ||
6122 changed
.count("filestore_queue_max_ops") ||
6123 changed
.count("filestore_expected_throughput_bytes") ||
6124 changed
.count("filestore_expected_throughput_ops") ||
6125 changed
.count("filestore_queue_low_threshhold") ||
6126 changed
.count("filestore_queue_high_threshhold") ||
6127 changed
.count("filestore_queue_high_delay_multiple") ||
6128 changed
.count("filestore_queue_max_delay_multiple")) {
6129 std::lock_guard
l(lock
);
6130 set_throttle_params();
6133 if (changed
.count("filestore_min_sync_interval") ||
6134 changed
.count("filestore_max_sync_interval") ||
6135 changed
.count("filestore_kill_at") ||
6136 changed
.count("filestore_fail_eio") ||
6137 changed
.count("filestore_sloppy_crc") ||
6138 changed
.count("filestore_sloppy_crc_block_size") ||
6139 changed
.count("filestore_max_alloc_hint_size") ||
6140 changed
.count("filestore_fadvise")) {
6141 std::lock_guard
l(lock
);
6142 m_filestore_min_sync_interval
= conf
->filestore_min_sync_interval
;
6143 m_filestore_max_sync_interval
= conf
->filestore_max_sync_interval
;
6144 m_filestore_kill_at
= conf
->filestore_kill_at
;
6145 m_filestore_fail_eio
= conf
->filestore_fail_eio
;
6146 m_filestore_fadvise
= conf
->filestore_fadvise
;
6147 m_filestore_sloppy_crc
= conf
->filestore_sloppy_crc
;
6148 m_filestore_sloppy_crc_block_size
= conf
->filestore_sloppy_crc_block_size
;
6149 m_filestore_max_alloc_hint_size
= conf
->filestore_max_alloc_hint_size
;
6151 if (changed
.count("filestore_commit_timeout")) {
6152 std::lock_guard
l(sync_entry_timeo_lock
);
6153 m_filestore_commit_timeout
= conf
->filestore_commit_timeout
;
6155 if (changed
.count("filestore_dump_file")) {
6156 if (conf
->filestore_dump_file
.length() &&
6157 conf
->filestore_dump_file
!= "-") {
6158 dump_start(conf
->filestore_dump_file
);
6165 int FileStore::set_throttle_params()
6168 bool valid
= throttle_bytes
.set_params(
6169 cct
->_conf
->filestore_queue_low_threshhold
,
6170 cct
->_conf
->filestore_queue_high_threshhold
,
6171 cct
->_conf
->filestore_expected_throughput_bytes
,
6172 cct
->_conf
->filestore_queue_high_delay_multiple
?
6173 cct
->_conf
->filestore_queue_high_delay_multiple
:
6174 cct
->_conf
->filestore_queue_high_delay_multiple_bytes
,
6175 cct
->_conf
->filestore_queue_max_delay_multiple
?
6176 cct
->_conf
->filestore_queue_max_delay_multiple
:
6177 cct
->_conf
->filestore_queue_max_delay_multiple_bytes
,
6178 cct
->_conf
->filestore_queue_max_bytes
,
6181 valid
&= throttle_ops
.set_params(
6182 cct
->_conf
->filestore_queue_low_threshhold
,
6183 cct
->_conf
->filestore_queue_high_threshhold
,
6184 cct
->_conf
->filestore_expected_throughput_ops
,
6185 cct
->_conf
->filestore_queue_high_delay_multiple
?
6186 cct
->_conf
->filestore_queue_high_delay_multiple
:
6187 cct
->_conf
->filestore_queue_high_delay_multiple_ops
,
6188 cct
->_conf
->filestore_queue_max_delay_multiple
?
6189 cct
->_conf
->filestore_queue_max_delay_multiple
:
6190 cct
->_conf
->filestore_queue_max_delay_multiple_ops
,
6191 cct
->_conf
->filestore_queue_max_ops
,
6194 logger
->set(l_filestore_op_queue_max_ops
, throttle_ops
.get_max());
6195 logger
->set(l_filestore_op_queue_max_bytes
, throttle_bytes
.get_max());
6198 derr
<< "tried to set invalid params: "
6202 return valid
? 0 : -EINVAL
;
// Begin dumping applied transactions to `file` (debug facility driven
// by the "filestore_dump_file" option): reset the formatter, open a
// top-level "dump" array, open the output stream, and enable the flag
// that dump_transactions() presumably checks.  NOTE(review): the body
// of the `if (m_filestore_do_dump)` branch — likely stopping a
// previous dump first — is not visible in this extract.
6205 void FileStore::dump_start(const std::string
& file
)
6207 dout(10) << __FUNC__
<< ": " << file
<< dendl
;
// A dump is already in progress (handling elided from this view).
6208 if (m_filestore_do_dump
) {
6211 m_filestore_dump_fmt
.reset();
6212 m_filestore_dump_fmt
.open_array_section("dump");
6213 m_filestore_dump
.open(file
.c_str());
6214 m_filestore_do_dump
= true;
// Stop the transaction dump started by dump_start(): clear the enable
// flag first (so no further transactions are appended), then — if the
// stream is actually open — close the "dump" array, flush the
// formatter into the stream, and flush/close the file.
6217 void FileStore::dump_stop()
6219 dout(10) << __FUNC__
<< dendl
;
6220 m_filestore_do_dump
= false;
6221 if (m_filestore_dump
.is_open()) {
// Close the top-level array opened in dump_start().
6222 m_filestore_dump_fmt
.close_section();
6223 m_filestore_dump_fmt
.flush(m_filestore_dump
);
6224 m_filestore_dump
.flush();
6225 m_filestore_dump
.close();
// Append every transaction in `ls` to the debug dump stream as a
// "transactions" array: one object per transaction carrying the
// sequencer's collection id, the op sequence number `seq`, the
// transaction's index within this op, and the transaction's own dump.
// Flushes formatter and stream at the end so the dump survives a
// subsequent crash (this is a crash-debugging facility).
6229 void FileStore::dump_transactions(vector
<ObjectStore::Transaction
>& ls
, uint64_t seq
, OpSequencer
*osr
)
6231 m_filestore_dump_fmt
.open_array_section("transactions");
6232 unsigned trans_num
= 0;
6233 for (vector
<ObjectStore::Transaction
>::iterator i
= ls
.begin(); i
!= ls
.end(); ++i
, ++trans_num
) {
6234 m_filestore_dump_fmt
.open_object_section("transaction");
6235 m_filestore_dump_fmt
.dump_stream("osr") << osr
->cid
;
6236 m_filestore_dump_fmt
.dump_unsigned("seq", seq
);
6237 m_filestore_dump_fmt
.dump_unsigned("trans_num", trans_num
);
// Let the transaction serialize itself into the formatter.
6238 (*i
).dump(&m_filestore_dump_fmt
);
6239 m_filestore_dump_fmt
.close_section();
6241 m_filestore_dump_fmt
.close_section();
6242 m_filestore_dump_fmt
.flush(m_filestore_dump
);
6243 m_filestore_dump
.flush();
// Forward the statistics request to the omap backend's KeyValueDB,
// which emits its stats into the caller-supplied Formatter.
6246 void FileStore::get_db_statistics(Formatter
* f
)
6248 object_map
->db
->get_statistics(f
);
6251 void FileStore::set_xattr_limits_via_conf()
6253 uint32_t fs_xattr_size
;
6255 uint32_t fs_xattr_max_value_size
;
6257 switch (m_fs_type
) {
6258 #if defined(__linux__)
6259 case XFS_SUPER_MAGIC
:
6260 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_xfs
;
6261 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_xfs
;
6262 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_xfs
;
6264 case BTRFS_SUPER_MAGIC
:
6265 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_btrfs
;
6266 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_btrfs
;
6267 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_btrfs
;
6271 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_other
;
6272 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_other
;
6273 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_other
;
6277 // Use override value if set
6278 if (cct
->_conf
->filestore_max_inline_xattr_size
)
6279 m_filestore_max_inline_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size
;
6281 m_filestore_max_inline_xattr_size
= fs_xattr_size
;
6283 // Use override value if set
6284 if (cct
->_conf
->filestore_max_inline_xattrs
)
6285 m_filestore_max_inline_xattrs
= cct
->_conf
->filestore_max_inline_xattrs
;
6287 m_filestore_max_inline_xattrs
= fs_xattrs
;
6289 // Use override value if set
6290 if (cct
->_conf
->filestore_max_xattr_value_size
)
6291 m_filestore_max_xattr_value_size
= cct
->_conf
->filestore_max_xattr_value_size
;
6293 m_filestore_max_xattr_value_size
= fs_xattr_max_value_size
;
6295 if (m_filestore_max_xattr_value_size
< cct
->_conf
->osd_max_object_name_len
) {
6296 derr
<< "WARNING: max attr value size ("
6297 << m_filestore_max_xattr_value_size
6298 << ") is smaller than osd_max_object_name_len ("
6299 << cct
->_conf
->osd_max_object_name_len
6300 << "). Your backend filesystem appears to not support attrs large "
6301 << "enough to handle the configured max rados name size. You may get "
6302 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
// Estimate the filesystem-allocation overhead (in bytes) for storing
// `num_objects` objects: on average each object wastes half a block to
// allocation granularity, hence num_objects * blk_size / 2.
// NOTE(review): the `return res;` line falls outside this extract.
6308 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects
)
6310 uint64_t res
= num_objects
* blk_size
/ 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
// Look up the on-disk index for collection `cid` and delegate
// apply_layout_settings(target_level) to it (used to re-apply HashIndex
// layout parameters).  NOTE(review): the error-return after the failed
// get_index log message is not visible in this extract.
6314 int FileStore::apply_layout_settings(const coll_t
&cid
, int target_level
)
6316 dout(20) << __FUNC__
<< ": " << cid
<< " target level: "
6317 << target_level
<< dendl
;
6319 int r
= get_index(cid
, &index
)
;
// get_index failed: log the errno before bailing out (return elided
// from this view).
6321 dout(10) << "Error getting index for " << cid
<< ": " << cpp_strerror(r
)
6326 return index
->apply_layout_settings(target_level
);
6330 // -- FSSuperblock --
// Serialize the FSSuperblock: versioned encoding v2 (compat v1) —
// compat_features first, then the omap_backend string (the v2
// addition, mirrored by decode() below).  NOTE(review): the matching
// ENCODE_FINISH is outside this extracted view.
6332 void FSSuperblock::encode(bufferlist
&bl
) const
6334 ENCODE_START(2, 1, bl
);
6335 compat_features
.encode(bl
);
6336 encode(omap_backend
, bl
);
// Deserialize the FSSuperblock (counterpart of encode() above):
// compat_features, then — presumably only for struct_v >= 2, the
// guarding conditional is not visible in this extract — the
// omap_backend string; older superblocks fall back to the historical
// default backend "leveldb".
6340 void FSSuperblock::decode(bufferlist::const_iterator
&bl
)
6342 DECODE_START(2, bl
);
6343 compat_features
.decode(bl
);
6345 decode(omap_backend
, bl
);
// Legacy (pre-v2) superblock: assume the original leveldb backend.
6347 omap_backend
= "leveldb";
// Emit a human/JSON-readable view of the superblock: the compat
// feature set under a "compat" section plus the omap backend name.
6351 void FSSuperblock::dump(Formatter
*f
) const
6353 f
->open_object_section("compat");
6354 compat_features
.dump(f
);
6355 f
->dump_string("omap_backend", omap_backend
);
// Produce sample FSSuperblock instances for encode/decode round-trip
// tests: a default one, one whose CompatSet carries the
// CEPH_FS_FEATURE_INCOMPAT_SHARDS incompat feature, and one that also
// sets omap_backend to "rocksdb".  NOTE(review): the declaration of
// the working instance `z` is outside this extracted view.
6359 void FSSuperblock::generate_test_instances(list
<FSSuperblock
*>& o
)
6362 o
.push_back(new FSSuperblock(z
));
6363 CompatSet::FeatureSet feature_compat
;
6364 CompatSet::FeatureSet feature_ro_compat
;
6365 CompatSet::FeatureSet feature_incompat
;
6366 feature_incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
6367 z
.compat_features
= CompatSet(feature_compat
, feature_ro_compat
,
6369 o
.push_back(new FSSuperblock(z
));
6370 z
.omap_backend
= "rocksdb";
6371 o
.push_back(new FSSuperblock(z
));
6375 #define dout_prefix *_dout << "filestore.osr(" << this << ") "
// Record every object touched by op `o` into the `applying` container,
// keyed by the object's hobj hash and storing a pointer to the
// ghobject_t inside the transaction's object index; wait_for_apply()
// (below) scans these entries to block readers until the op's apply
// completes.  Idempotent via the registered_apply flag.
// NOTE(review): the early `return` in the already-registered branch is
// not visible in this extract.
6377 void FileStore::OpSequencer::_register_apply(Op
*o
)
// Already registered: skip re-inserting (return elided from view).
6379 if (o
->registered_apply
) {
6380 dout(20) << __func__
<< " " << o
<< " already registered" << dendl
;
6383 o
->registered_apply
= true;
// Walk every transaction in the op and every object it references.
6384 for (auto& t
: o
->tls
) {
6385 for (auto& i
: t
.get_object_index()) {
6386 uint32_t key
= i
.first
.hobj
.get_hash();
// Multiple objects may share a hash; entries are disambiguated by
// the stored ghobject_t pointer (see _unregister_apply).
6387 applying
.emplace(make_pair(key
, &i
.first
));
6388 dout(20) << __func__
<< " " << o
<< " " << i
.first
<< " ("
6389 << &i
.first
<< ")" << dendl
;
// Remove the `applying` entries inserted by _register_apply() for op
// `o`: for each referenced object, scan the hash bucket for the entry
// whose stored pointer matches this exact ghobject_t and drop it,
// asserting that a matching entry was found.  NOTE(review): the
// erase call, loop-advance, and the waiter wakeup (if any) are not
// visible in this extract.
6394 void FileStore::OpSequencer::_unregister_apply(Op
*o
)
// Must only be called for ops that were previously registered.
6396 ceph_assert(o
->registered_apply
);
6397 for (auto& t
: o
->tls
) {
6398 for (auto& i
: t
.get_object_index()) {
6399 uint32_t key
= i
.first
.hobj
.get_hash();
6400 auto p
= applying
.find(key
);
6401 bool removed
= false;
// Hash collisions are possible: walk all entries in this bucket and
// match on the stored pointer identity.
6402 while (p
!= applying
.end() &&
6404 if (p
->second
== &i
.first
) {
6405 dout(20) << __func__
<< " " << o
<< " " << i
.first
<< " ("
6406 << &i
.first
<< ")" << dendl
;
// Every registered object must have had a matching entry.
6413 ceph_assert(removed
);
6418 void FileStore::OpSequencer::wait_for_apply(const ghobject_t
& oid
)
6420 std::unique_lock l
{qlock
};
6421 uint32_t key
= oid
.hobj
.get_hash();
6424 // search all items in hash slot for a matching object
6425 auto p
= applying
.find(key
);
6426 while (p
!= applying
.end() &&
6428 if (*p
->second
== oid
) {
6429 dout(20) << __func__
<< " " << oid
<< " waiting on " << p
->second
6438 dout(20) << __func__
<< " " << oid
<< " done" << dendl
;