1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
21 #include <sys/types.h>
27 #include <sys/ioctl.h>
29 #if defined(__linux__)
36 #include "include/linux_fiemap.h"
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1
;
78 #include "include/assert.h"
80 #include "common/config.h"
81 #include "common/blkdev.h"
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
90 #define tracepoint(...)
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
104 // XATTR_SPILL_OUT_NAME is an xattr used to record whether an object's
105 // xattrs have spilled over into DBObjectMap; if XATTR_SPILL_OUT_NAME exists
106 // on the file with the no-spill-out value, no xattrs are in DBObjectMap
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
111 //Initial features in new superblock.
// Build the feature set recorded in a freshly created superblock.
// All three FeatureSets are default-constructed (empty): a brand new
// on-disk FileStore starts with no compat/ro-compat/incompat features.
112 static CompatSet
get_fs_initial_compat_set() {
113 CompatSet::FeatureSet ceph_osd_feature_compat
;
114 CompatSet::FeatureSet ceph_osd_feature_ro_compat
;
115 CompatSet::FeatureSet ceph_osd_feature_incompat
;
116 return CompatSet(ceph_osd_feature_compat
, ceph_osd_feature_ro_compat
,
117 ceph_osd_feature_incompat
);
120 //Features are added here that this FileStore supports.
// Build the full set of features this FileStore build can handle:
// start from the initial (empty) set and add every feature the
// current code understands.
121 static CompatSet
get_fs_supported_compat_set() {
122 CompatSet compat
= get_fs_initial_compat_set();
123 // Features added here are supported by the code, but are not stamped
// into a newly created superblock (see get_fs_initial_compat_set()).
124 compat
.incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
// Reject objects whose escaped on-disk name would be too long to index.
// Returns 0 when the name fits, -ENAMETOOLONG otherwise.
128 int FileStore::validate_hobject_key(const hobject_t
&obj
) const
// Worst-case escaped filename length LFNIndex could generate for obj.
130 unsigned len
= LFNIndex::get_max_escaped_name_len(obj
);
// NOTE(review): the bound used is the max xattr *value* size — presumably
// because overlong names spill into an xattr; confirm against LFNIndex.
131 return len
> m_filestore_max_xattr_value_size
? -ENAMETOOLONG
: 0;
134 int FileStore::get_block_device_fsid(CephContext
* cct
, const string
& path
,
137 // make sure we don't try to use aio or direct_io (and get annoying
138 // error messages from failing to do so); performance implications
139 // should be irrelevant for this use
140 FileJournal
j(cct
, *fsid
, 0, 0, path
.c_str(), false, false);
141 return j
.peek_fsid(*fsid
);
144 void FileStore::FSPerfTracker::update_from_perfcounters(
145 PerfCounters
&logger
)
147 os_commit_latency
.consume_next(
149 l_filestore_journal_latency
));
150 os_apply_latency
.consume_next(
152 l_filestore_apply_latency
));
// Debug formatter for an OpSequencer: delegates to the parent
// sequencer's operator<<.
// NOTE(review): dereferences s.parent unconditionally — assumes it is
// always non-null; confirm at call sites.
156 ostream
& operator<<(ostream
& out
, const FileStore::OpSequencer
& s
)
158 return out
<< *s
.parent
;
// Format the on-disk path of collection 'cid' ("<basedir>/current/<cid>")
// into the caller-supplied buffer 's' of capacity 'len'.
// Returns snprintf()'s result: the length the full path needs, which can
// exceed 'len' when the output was truncated — callers must check.
161 int FileStore::get_cdir(const coll_t
& cid
, char *s
, int len
)
163 const string
&cid_str(cid
.to_str());
164 return snprintf(s
, len
, "%s/current/%s", basedir
.c_str(), cid_str
.c_str());
// Look up (or open) the collection index for 'cid' via the shared
// index_manager, rooted at this store's basedir.
167 int FileStore::get_index(const coll_t
& cid
, Index
*index
)
169 int r
= index_manager
.get_index(cid
, basedir
, index
);
// EIO is tolerated only when the filestore_fail_eio option allows it;
// otherwise an I/O error here aborts by design.
170 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
174 int FileStore::init_index(const coll_t
& cid
)
177 get_cdir(cid
, path
, sizeof(path
));
178 int r
= index_manager
.init_index(cid
, path
, target_version
);
179 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
183 int FileStore::lfn_find(const ghobject_t
& oid
, const Index
& index
, IndexedPath
*path
)
189 assert(NULL
!= index
.index
);
190 r
= (index
.index
)->lookup(oid
, path
, &exist
);
192 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
200 int FileStore::lfn_truncate(const coll_t
& cid
, const ghobject_t
& oid
, off_t length
)
203 int r
= lfn_open(cid
, oid
, false, &fd
);
206 r
= ::ftruncate(**fd
, length
);
209 if (r
>= 0 && m_filestore_sloppy_crc
) {
210 int rc
= backend
->_crc_update_truncate(**fd
, length
);
214 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
218 int FileStore::lfn_stat(const coll_t
& cid
, const ghobject_t
& oid
, struct stat
*buf
)
222 int r
= get_index(cid
, &index
);
226 assert(NULL
!= index
.index
);
227 RWLock::RLocker
l((index
.index
)->access_lock
);
229 r
= lfn_find(oid
, index
, &path
);
232 r
= ::stat(path
->path(), buf
);
238 int FileStore::lfn_open(const coll_t
& cid
,
239 const ghobject_t
& oid
,
246 bool need_lock
= true;
251 if (cct
->_conf
->filestore_odsync_write
) {
259 if (!((*index
).index
)) {
260 r
= get_index(cid
, index
);
262 dout(10) << __func__
<< " could not get index r = " << r
<< dendl
;
270 assert(NULL
!= (*index
).index
);
272 ((*index
).index
)->access_lock
.get_write();
275 *outfd
= fdcache
.lookup(oid
);
278 ((*index
).index
)->access_lock
.put_write();
286 IndexedPath
*path
= &path2
;
288 r
= (*index
)->lookup(oid
, path
, &exist
);
290 derr
<< "could not find " << oid
<< " in index: "
291 << cpp_strerror(-r
) << dendl
;
295 r
= ::open((*path
)->path(), flags
, 0644);
298 dout(10) << "error opening file " << (*path
)->path() << " with flags="
299 << flags
<< ": " << cpp_strerror(-r
) << dendl
;
303 if (create
&& (!exist
)) {
304 r
= (*index
)->created(oid
, (*path
)->path());
306 VOID_TEMP_FAILURE_RETRY(::close(fd
));
307 derr
<< "error creating " << oid
<< " (" << (*path
)->path()
308 << ") in index: " << cpp_strerror(-r
) << dendl
;
311 r
= chain_fsetxattr
<true, true>(
312 fd
, XATTR_SPILL_OUT_NAME
,
313 XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
));
315 VOID_TEMP_FAILURE_RETRY(::close(fd
));
316 derr
<< "error setting spillout xattr for oid " << oid
<< " (" << (*path
)->path()
317 << "):" << cpp_strerror(-r
) << dendl
;
324 *outfd
= fdcache
.add(oid
, fd
, &existed
);
326 TEMP_FAILURE_RETRY(::close(fd
));
329 *outfd
= std::make_shared
<FDCache::FD
>(fd
);
333 ((*index
).index
)->access_lock
.put_write();
341 ((*index
).index
)->access_lock
.put_write();
344 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
348 void FileStore::lfn_close(FDRef fd
)
352 int FileStore::lfn_link(const coll_t
& c
, const coll_t
& newcid
, const ghobject_t
& o
, const ghobject_t
& newoid
)
354 Index index_new
, index_old
;
355 IndexedPath path_new
, path_old
;
358 bool index_same
= false;
360 r
= get_index(newcid
, &index_new
);
363 r
= get_index(c
, &index_old
);
366 } else if (c
== newcid
) {
367 r
= get_index(c
, &index_old
);
370 index_new
= index_old
;
373 r
= get_index(c
, &index_old
);
376 r
= get_index(newcid
, &index_new
);
381 assert(NULL
!= index_old
.index
);
382 assert(NULL
!= index_new
.index
);
386 RWLock::RLocker
l1((index_old
.index
)->access_lock
);
388 r
= index_old
->lookup(o
, &path_old
, &exist
);
390 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
396 RWLock::WLocker
l2((index_new
.index
)->access_lock
);
398 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
400 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
406 dout(25) << "lfn_link path_old: " << path_old
<< dendl
;
407 dout(25) << "lfn_link path_new: " << path_new
<< dendl
;
408 r
= ::link(path_old
->path(), path_new
->path());
412 r
= index_new
->created(newoid
, path_new
->path());
414 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
418 RWLock::WLocker
l1((index_old
.index
)->access_lock
);
420 r
= index_old
->lookup(o
, &path_old
, &exist
);
422 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
428 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
430 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
436 dout(25) << "lfn_link path_old: " << path_old
<< dendl
;
437 dout(25) << "lfn_link path_new: " << path_new
<< dendl
;
438 r
= ::link(path_old
->path(), path_new
->path());
442 // make sure old fd for unlinked/overwritten file is gone
443 fdcache
.clear(newoid
);
445 r
= index_new
->created(newoid
, path_new
->path());
447 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
454 int FileStore::lfn_unlink(const coll_t
& cid
, const ghobject_t
& o
,
455 const SequencerPosition
&spos
,
456 bool force_clear_omap
)
459 int r
= get_index(cid
, &index
);
461 dout(25) << __func__
<< " get_index failed " << cpp_strerror(r
) << dendl
;
465 assert(NULL
!= index
.index
);
466 RWLock::WLocker
l((index
.index
)->access_lock
);
471 r
= index
->lookup(o
, &path
, &hardlink
);
473 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
477 if (!force_clear_omap
) {
478 if (hardlink
== 0 || hardlink
== 1) {
479 force_clear_omap
= true;
482 if (force_clear_omap
) {
483 dout(20) << __func__
<< ": clearing omap on " << o
484 << " in cid " << cid
<< dendl
;
485 r
= object_map
->clear(o
, &spos
);
486 if (r
< 0 && r
!= -ENOENT
) {
487 dout(25) << __func__
<< " omap clear failed " << cpp_strerror(r
) << dendl
;
488 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
491 if (cct
->_conf
->filestore_debug_inject_read_err
) {
492 debug_obj_on_delete(o
);
494 if (!m_disable_wbthrottle
) {
495 wbthrottle
.clear_object(o
); // should be only non-cache ref
499 /* Ensure that replay of this op doesn't result in the object_map
502 if (!backend
->can_checkpoint())
503 object_map
->sync(&o
, &spos
);
506 if (!m_disable_wbthrottle
) {
507 wbthrottle
.clear_object(o
); // should be only non-cache ref
512 r
= index
->unlink(o
);
514 dout(25) << __func__
<< " index unlink failed " << cpp_strerror(r
) << dendl
;
520 FileStore::FileStore(CephContext
* cct
, const std::string
&base
,
521 const std::string
&jdev
, osflagbits_t flags
,
522 const char *name
, bool do_update
) :
523 JournalingObjectStore(cct
, base
),
525 basedir(base
), journalpath(jdev
),
526 generic_flags(flags
),
528 fsid_fd(-1), op_fd(-1),
529 basedir_fd(-1), current_fd(-1),
531 index_manager(cct
, do_update
),
532 lock("FileStore::lock"),
534 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
535 timer(cct
, sync_entry_timeo_lock
),
536 stop(false), sync_thread(this),
540 m_disable_wbthrottle(cct
->_conf
->filestore_odsync_write
||
541 !cct
->_conf
->filestore_wbthrottle_enable
),
542 throttle_ops(cct
, "filestore_ops", cct
->_conf
->filestore_caller_concurrency
),
543 throttle_bytes(cct
, "filestore_bytes", cct
->_conf
->filestore_caller_concurrency
),
544 m_ondisk_finisher_num(cct
->_conf
->filestore_ondisk_finisher_threads
),
545 m_apply_finisher_num(cct
->_conf
->filestore_apply_finisher_threads
),
546 op_tp(cct
, "FileStore::op_tp", "tp_fstore_op", cct
->_conf
->filestore_op_threads
, "filestore_op_threads"),
547 op_wq(this, cct
->_conf
->filestore_op_thread_timeout
,
548 cct
->_conf
->filestore_op_thread_suicide_timeout
, &op_tp
),
550 trace_endpoint("0.0.0.0", 0, "FileStore"),
551 read_error_lock("FileStore::read_error_lock"),
552 m_filestore_commit_timeout(cct
->_conf
->filestore_commit_timeout
),
553 m_filestore_journal_parallel(cct
->_conf
->filestore_journal_parallel
),
554 m_filestore_journal_trailing(cct
->_conf
->filestore_journal_trailing
),
555 m_filestore_journal_writeahead(cct
->_conf
->filestore_journal_writeahead
),
556 m_filestore_fiemap_threshold(cct
->_conf
->filestore_fiemap_threshold
),
557 m_filestore_max_sync_interval(cct
->_conf
->filestore_max_sync_interval
),
558 m_filestore_min_sync_interval(cct
->_conf
->filestore_min_sync_interval
),
559 m_filestore_fail_eio(cct
->_conf
->filestore_fail_eio
),
560 m_filestore_fadvise(cct
->_conf
->filestore_fadvise
),
561 do_update(do_update
),
562 m_journal_dio(cct
->_conf
->journal_dio
),
563 m_journal_aio(cct
->_conf
->journal_aio
),
564 m_journal_force_aio(cct
->_conf
->journal_force_aio
),
565 m_osd_rollback_to_cluster_snap(cct
->_conf
->osd_rollback_to_cluster_snap
),
566 m_osd_use_stale_snap(cct
->_conf
->osd_use_stale_snap
),
567 m_filestore_do_dump(false),
568 m_filestore_dump_fmt(true),
569 m_filestore_sloppy_crc(cct
->_conf
->filestore_sloppy_crc
),
570 m_filestore_sloppy_crc_block_size(cct
->_conf
->filestore_sloppy_crc_block_size
),
571 m_filestore_max_alloc_hint_size(cct
->_conf
->filestore_max_alloc_hint_size
),
573 m_filestore_max_inline_xattr_size(0),
574 m_filestore_max_inline_xattrs(0),
575 m_filestore_max_xattr_value_size(0)
577 m_filestore_kill_at
.set(cct
->_conf
->filestore_kill_at
);
578 for (int i
= 0; i
< m_ondisk_finisher_num
; ++i
) {
580 oss
<< "filestore-ondisk-" << i
;
581 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_odsk_fstore");
582 ondisk_finishers
.push_back(f
);
584 for (int i
= 0; i
< m_apply_finisher_num
; ++i
) {
586 oss
<< "filestore-apply-" << i
;
587 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_appl_fstore");
588 apply_finishers
.push_back(f
);
592 oss
<< basedir
<< "/current";
593 current_fn
= oss
.str();
596 sss
<< basedir
<< "/current/commit_op_seq";
597 current_op_seq_fn
= sss
.str();
600 if (cct
->_conf
->filestore_omap_backend_path
!= "") {
601 omap_dir
= cct
->_conf
->filestore_omap_backend_path
;
603 omss
<< basedir
<< "/current/omap";
604 omap_dir
= omss
.str();
608 PerfCountersBuilder
plb(cct
, internal_name
, l_filestore_first
, l_filestore_last
);
610 plb
.add_u64(l_filestore_journal_queue_ops
, "journal_queue_ops", "Operations in journal queue");
611 plb
.add_u64(l_filestore_journal_ops
, "journal_ops", "Active journal entries to be applied");
612 plb
.add_u64(l_filestore_journal_queue_bytes
, "journal_queue_bytes", "Size of journal queue");
613 plb
.add_u64(l_filestore_journal_bytes
, "journal_bytes", "Active journal operation size to be applied");
614 plb
.add_time_avg(l_filestore_journal_latency
, "journal_latency", "Average journal queue completing latency");
615 plb
.add_u64_counter(l_filestore_journal_wr
, "journal_wr", "Journal write IOs");
616 plb
.add_u64_avg(l_filestore_journal_wr_bytes
, "journal_wr_bytes", "Journal data written");
617 plb
.add_u64(l_filestore_op_queue_max_ops
, "op_queue_max_ops", "Max operations in writing to FS queue");
618 plb
.add_u64(l_filestore_op_queue_ops
, "op_queue_ops", "Operations in writing to FS queue");
619 plb
.add_u64_counter(l_filestore_ops
, "ops", "Operations written to store");
620 plb
.add_u64(l_filestore_op_queue_max_bytes
, "op_queue_max_bytes", "Max data in writing to FS queue");
621 plb
.add_u64(l_filestore_op_queue_bytes
, "op_queue_bytes", "Size of writing to FS queue");
622 plb
.add_u64_counter(l_filestore_bytes
, "bytes", "Data written to store");
623 plb
.add_time_avg(l_filestore_apply_latency
, "apply_latency", "Apply latency");
624 plb
.add_u64(l_filestore_committing
, "committing", "Is currently committing");
626 plb
.add_u64_counter(l_filestore_commitcycle
, "commitcycle", "Commit cycles");
627 plb
.add_time_avg(l_filestore_commitcycle_interval
, "commitcycle_interval", "Average interval between commits");
628 plb
.add_time_avg(l_filestore_commitcycle_latency
, "commitcycle_latency", "Average latency of commit");
629 plb
.add_u64_counter(l_filestore_journal_full
, "journal_full", "Journal writes while full");
630 plb
.add_time_avg(l_filestore_queue_transaction_latency_avg
, "queue_transaction_latency_avg", "Store operation queue latency");
632 logger
= plb
.create_perf_counters();
634 cct
->get_perfcounters_collection()->add(logger
);
635 cct
->_conf
->add_observer(this);
637 superblock
.compat_features
= get_fs_initial_compat_set();
640 FileStore::~FileStore()
642 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
646 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
650 cct
->_conf
->remove_observer(this);
651 cct
->get_perfcounters_collection()->remove(logger
);
654 journal
->logger
= NULL
;
657 if (m_filestore_do_dump
) {
// Map a user-visible attribute name onto its on-disk xattr key by
// prepending the "user.ceph." namespace prefix.  Output is silently
// truncated if 'buf' (capacity 'len') is too small.
662 static void get_attrname(const char *name
, char *buf
, int len
)
664 snprintf(buf
, len
, "user.ceph.%s", name
);
667 bool parse_attrname(char **name
)
669 if (strncmp(*name
, "user.ceph.", 10) == 0) {
676 void FileStore::collect_metadata(map
<string
,string
> *pm
)
678 char partition_path
[PATH_MAX
];
679 char dev_node
[PATH_MAX
];
682 (*pm
)["filestore_backend"] = backend
->get_name();
684 ss
<< "0x" << std::hex
<< m_fs_type
<< std::dec
;
685 (*pm
)["filestore_f_type"] = ss
.str();
687 if (cct
->_conf
->filestore_collect_device_partition_information
) {
688 rc
= get_device_by_uuid(get_fsid(), "PARTUUID", partition_path
,
697 (*pm
)["backend_filestore_partition_path"] = "unknown";
698 (*pm
)["backend_filestore_dev_node"] = "unknown";
701 (*pm
)["backend_filestore_partition_path"] = string(partition_path
);
702 (*pm
)["backend_filestore_dev_node"] = "unknown";
705 (*pm
)["backend_filestore_partition_path"] = string(partition_path
);
706 (*pm
)["backend_filestore_dev_node"] = string(dev_node
);
710 int FileStore::statfs(struct store_statfs_t
*buf0
)
714 if (::statfs(basedir
.c_str(), &buf
) < 0) {
716 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
717 assert(r
!= -ENOENT
);
720 buf0
->total
= buf
.f_blocks
* buf
.f_bsize
;
721 buf0
->available
= buf
.f_bavail
* buf
.f_bsize
;
722 // Adjust for writes pending in the journal
724 uint64_t estimate
= journal
->get_journal_size_estimate();
725 if (buf0
->available
> estimate
)
726 buf0
->available
-= estimate
;
734 void FileStore::new_journal()
736 if (journalpath
.length()) {
737 dout(10) << "open_journal at " << journalpath
<< dendl
;
738 journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
,
740 m_journal_dio
, m_journal_aio
,
741 m_journal_force_aio
);
743 journal
->logger
= logger
;
748 int FileStore::dump_journal(ostream
& out
)
752 if (!journalpath
.length())
755 FileJournal
*journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
, journalpath
.c_str(), m_journal_dio
);
756 r
= journal
->dump(out
);
761 FileStoreBackend
*FileStoreBackend::create(long f_type
, FileStore
*fs
)
764 #if defined(__linux__)
765 case BTRFS_SUPER_MAGIC
:
766 return new BtrfsFileStoreBackend(fs
);
768 case XFS_SUPER_MAGIC
:
769 return new XfsFileStoreBackend(fs
);
773 case ZFS_SUPER_MAGIC
:
774 return new ZFSFileStoreBackend(fs
);
777 return new GenericFileStoreBackend(fs
);
781 void FileStore::create_backend(long f_type
)
785 assert(backend
== NULL
);
786 backend
= FileStoreBackend::create(f_type
, this);
788 dout(0) << "backend " << backend
->get_name()
789 << " (magic 0x" << std::hex
<< f_type
<< std::dec
<< ")"
793 #if defined(__linux__)
794 case BTRFS_SUPER_MAGIC
:
795 if (!m_disable_wbthrottle
){
796 wbthrottle
.set_fs(WBThrottle::BTRFS
);
800 case XFS_SUPER_MAGIC
:
801 // wbthrottle is constructed with fs(WBThrottle::XFS)
806 set_xattr_limits_via_conf();
809 int FileStore::mkfs()
812 char fsid_fn
[PATH_MAX
];
815 uuid_d old_omap_fsid
;
817 dout(1) << "mkfs in " << basedir
<< dendl
;
818 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
);
819 if (basedir_fd
< 0) {
821 derr
<< "mkfs failed to open base dir " << basedir
<< ": " << cpp_strerror(ret
) << dendl
;
826 snprintf(fsid_fn
, sizeof(fsid_fn
), "%s/fsid", basedir
.c_str());
827 fsid_fd
= ::open(fsid_fn
, O_RDWR
|O_CREAT
, 0644);
830 derr
<< "mkfs: failed to open " << fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
831 goto close_basedir_fd
;
834 if (lock_fsid() < 0) {
839 if (read_fsid(fsid_fd
, &old_fsid
) < 0 || old_fsid
.is_zero()) {
840 if (fsid
.is_zero()) {
841 fsid
.generate_random();
842 dout(1) << "mkfs generated fsid " << fsid
<< dendl
;
844 dout(1) << "mkfs using provided fsid " << fsid
<< dendl
;
847 fsid
.print(fsid_str
);
848 strcat(fsid_str
, "\n");
849 ret
= ::ftruncate(fsid_fd
, 0);
852 derr
<< "mkfs: failed to truncate fsid: "
853 << cpp_strerror(ret
) << dendl
;
856 ret
= safe_write(fsid_fd
, fsid_str
, strlen(fsid_str
));
858 derr
<< "mkfs: failed to write fsid: "
859 << cpp_strerror(ret
) << dendl
;
862 if (::fsync(fsid_fd
) < 0) {
864 derr
<< "mkfs: close failed: can't write fsid: "
865 << cpp_strerror(ret
) << dendl
;
868 dout(10) << "mkfs fsid is " << fsid
<< dendl
;
870 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
871 derr
<< "mkfs on-disk fsid " << old_fsid
<< " != provided " << fsid
<< dendl
;
876 dout(1) << "mkfs fsid is already set to " << fsid
<< dendl
;
880 ret
= write_version_stamp();
882 derr
<< "mkfs: write_version_stamp() failed: "
883 << cpp_strerror(ret
) << dendl
;
888 superblock
.omap_backend
= cct
->_conf
->filestore_omap_backend
;
889 ret
= write_superblock();
891 derr
<< "mkfs: write_superblock() failed: "
892 << cpp_strerror(ret
) << dendl
;
896 struct statfs basefs
;
897 ret
= ::fstatfs(basedir_fd
, &basefs
);
900 derr
<< "mkfs cannot fstatfs basedir "
901 << cpp_strerror(ret
) << dendl
;
905 create_backend(basefs
.f_type
);
907 ret
= backend
->create_current();
909 derr
<< "mkfs: failed to create current/ " << cpp_strerror(ret
) << dendl
;
913 // write initial op_seq
915 uint64_t initial_seq
= 0;
916 int fd
= read_op_seq(&initial_seq
);
919 derr
<< "mkfs: failed to create " << current_op_seq_fn
<< ": "
920 << cpp_strerror(ret
) << dendl
;
923 if (initial_seq
== 0) {
924 ret
= write_op_seq(fd
, 1);
926 VOID_TEMP_FAILURE_RETRY(::close(fd
));
927 derr
<< "mkfs: failed to write to " << current_op_seq_fn
<< ": "
928 << cpp_strerror(ret
) << dendl
;
932 if (backend
->can_checkpoint()) {
934 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
);
935 assert(current_fd
>= 0);
937 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, 1ull);
938 ret
= backend
->create_checkpoint(s
, NULL
);
939 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
940 if (ret
< 0 && ret
!= -EEXIST
) {
941 VOID_TEMP_FAILURE_RETRY(::close(fd
));
942 derr
<< "mkfs: failed to create snap_1: " << cpp_strerror(ret
) << dendl
;
947 VOID_TEMP_FAILURE_RETRY(::close(fd
));
949 ret
= KeyValueDB::test_init(superblock
.omap_backend
, omap_dir
);
951 derr
<< "mkfs failed to create " << cct
->_conf
->filestore_omap_backend
<< dendl
;
954 // create fsid under omap
957 char omap_fsid_fn
[PATH_MAX
];
958 snprintf(omap_fsid_fn
, sizeof(omap_fsid_fn
), "%s/osd_uuid", omap_dir
.c_str());
959 omap_fsid_fd
= ::open(omap_fsid_fn
, O_RDWR
|O_CREAT
, 0644);
960 if (omap_fsid_fd
< 0) {
962 derr
<< "mkfs: failed to open " << omap_fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
966 if (read_fsid(omap_fsid_fd
, &old_omap_fsid
) < 0 || old_omap_fsid
.is_zero()) {
967 assert(!fsid
.is_zero());
968 fsid
.print(fsid_str
);
969 strcat(fsid_str
, "\n");
970 ret
= ::ftruncate(omap_fsid_fd
, 0);
973 derr
<< "mkfs: failed to truncate fsid: "
974 << cpp_strerror(ret
) << dendl
;
975 goto close_omap_fsid_fd
;
977 ret
= safe_write(omap_fsid_fd
, fsid_str
, strlen(fsid_str
));
979 derr
<< "mkfs: failed to write fsid: "
980 << cpp_strerror(ret
) << dendl
;
981 goto close_omap_fsid_fd
;
983 dout(10) << "mkfs: write success, fsid:" << fsid_str
<< ", ret:" << ret
<< dendl
;
984 if (::fsync(omap_fsid_fd
) < 0) {
986 derr
<< "mkfs: close failed: can't write fsid: "
987 << cpp_strerror(ret
) << dendl
;
988 goto close_omap_fsid_fd
;
990 dout(10) << "mkfs omap fsid is " << fsid
<< dendl
;
992 if (fsid
!= old_omap_fsid
) {
993 derr
<< "FileStore::mkfs: " << omap_fsid_fn
994 << " has existed omap fsid " << old_omap_fsid
995 << " != expected osd fsid " << fsid
998 goto close_omap_fsid_fd
;
1000 dout(1) << "FileStore::mkfs: omap fsid is already set to " << fsid
<< dendl
;
1003 dout(1) << cct
->_conf
->filestore_omap_backend
<< " db exists/created" << dendl
;
1008 goto close_omap_fsid_fd
;
1010 ret
= write_meta("type", "filestore");
1012 goto close_omap_fsid_fd
;
1014 dout(1) << "mkfs done in " << basedir
<< dendl
;
1018 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1020 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1023 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1029 int FileStore::mkjournal()
1034 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1035 int fd
= ::open(fn
, O_RDONLY
, 0644);
1038 derr
<< "FileStore::mkjournal: open error: " << cpp_strerror(err
) << dendl
;
1041 ret
= read_fsid(fd
, &fsid
);
1043 derr
<< "FileStore::mkjournal: read error: " << cpp_strerror(ret
) << dendl
;
1044 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1047 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1053 ret
= journal
->check();
1055 ret
= journal
->create();
1057 derr
<< "mkjournal error creating journal on " << journalpath
1058 << ": " << cpp_strerror(ret
) << dendl
;
1060 dout(0) << "mkjournal created journal on " << journalpath
<< dendl
;
1068 int FileStore::read_fsid(int fd
, uuid_d
*uuid
)
1071 memset(fsid_str
, 0, sizeof(fsid_str
));
1072 int ret
= safe_read(fd
, fsid_str
, sizeof(fsid_str
));
1076 // old 64-bit fsid... mirror it.
1077 *(uint64_t*)&uuid
->bytes()[0] = *(uint64_t*)fsid_str
;
1078 *(uint64_t*)&uuid
->bytes()[8] = *(uint64_t*)fsid_str
;
1086 if (!uuid
->parse(fsid_str
))
1091 int FileStore::lock_fsid()
1094 memset(&l
, 0, sizeof(l
));
1096 l
.l_whence
= SEEK_SET
;
1099 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
1102 dout(0) << "lock_fsid failed to lock " << basedir
<< "/fsid, is another ceph-osd still running? "
1103 << cpp_strerror(err
) << dendl
;
1109 bool FileStore::test_mount_in_use()
1111 dout(5) << "test_mount basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1113 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1115 // verify fs isn't in use
1117 fsid_fd
= ::open(fn
, O_RDWR
, 0644);
1119 return 0; // no fsid, ok.
1120 bool inuse
= lock_fsid() < 0;
1121 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1126 int FileStore::_detect_fs()
1129 int r
= ::fstatfs(basedir_fd
, &st
);
1133 blk_size
= st
.f_bsize
;
1135 create_backend(st
.f_type
);
1137 r
= backend
->detect_features();
1139 derr
<< "_detect_fs: detect_features error: " << cpp_strerror(r
) << dendl
;
1147 snprintf(fn
, sizeof(fn
), "%s/xattr_test", basedir
.c_str());
1148 int tmpfd
= ::open(fn
, O_CREAT
|O_WRONLY
|O_TRUNC
, 0700);
1151 derr
<< "_detect_fs unable to create " << fn
<< ": " << cpp_strerror(ret
) << dendl
;
1155 int ret
= chain_fsetxattr(tmpfd
, "user.test", &x
, sizeof(x
));
1157 ret
= chain_fgetxattr(tmpfd
, "user.test", &y
, sizeof(y
));
1158 if ((ret
< 0) || (x
!= y
)) {
1159 derr
<< "Extended attributes don't appear to work. ";
1161 *_dout
<< "Got error " + cpp_strerror(ret
) + ". ";
1162 *_dout
<< "If you are using ext3 or ext4, be sure to mount the underlying "
1163 << "file system with the 'user_xattr' option." << dendl
;
1165 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
1170 memset(buf
, 0, sizeof(buf
)); // shut up valgrind
1171 chain_fsetxattr(tmpfd
, "user.test", &buf
, sizeof(buf
));
1172 chain_fsetxattr(tmpfd
, "user.test2", &buf
, sizeof(buf
));
1173 chain_fsetxattr(tmpfd
, "user.test3", &buf
, sizeof(buf
));
1174 chain_fsetxattr(tmpfd
, "user.test4", &buf
, sizeof(buf
));
1175 ret
= chain_fsetxattr(tmpfd
, "user.test5", &buf
, sizeof(buf
));
1176 if (ret
== -ENOSPC
) {
1177 dout(0) << "limited size xattrs" << dendl
;
1179 chain_fremovexattr(tmpfd
, "user.test");
1180 chain_fremovexattr(tmpfd
, "user.test2");
1181 chain_fremovexattr(tmpfd
, "user.test3");
1182 chain_fremovexattr(tmpfd
, "user.test4");
1183 chain_fremovexattr(tmpfd
, "user.test5");
1186 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
1191 int FileStore::_sanity_check_fs()
1195 if (((int)m_filestore_journal_writeahead
+
1196 (int)m_filestore_journal_parallel
+
1197 (int)m_filestore_journal_trailing
) > 1) {
1198 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl
;
1200 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1201 << " is enabled in ceph.conf. You must choose a single journal mode."
1202 << TEXT_NORMAL
<< std::endl
;
1206 if (!backend
->can_checkpoint()) {
1207 if (!journal
|| !m_filestore_journal_writeahead
) {
1208 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl
;
1210 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1211 << " For non-btrfs volumes, a writeahead journal is required to\n"
1212 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1213 << " should include something like:\n"
1214 << " osd journal = /path/to/journal_device_or_file\n"
1215 << " filestore journal writeahead = true\n"
1221 dout(0) << "mount WARNING: no journal" << dendl
;
1223 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1224 << " If you will not be using an osd journal, write latency may be\n"
1225 << " relatively high. It can be reduced somewhat by lowering\n"
1226 << " filestore_max_sync_interval, but lower values mean lower write\n"
1227 << " throughput, especially with spinning disks.\n"
1234 int FileStore::write_superblock()
1237 ::encode(superblock
, bl
);
1238 return safe_write_file(basedir
.c_str(), "superblock",
1239 bl
.c_str(), bl
.length());
1242 int FileStore::read_superblock()
1244 bufferptr
bp(PATH_MAX
);
1245 int ret
= safe_read_file(basedir
.c_str(), "superblock",
1246 bp
.c_str(), bp
.length());
1248 if (ret
== -ENOENT
) {
1249 // If the file doesn't exist write initial CompatSet
1250 return write_superblock();
1256 bl
.push_back(std::move(bp
));
1257 bufferlist::iterator i
= bl
.begin();
1258 ::decode(superblock
, i
);
1262 int FileStore::update_version_stamp()
1264 return write_version_stamp();
1267 int FileStore::version_stamp_is_valid(uint32_t *version
)
1269 bufferptr
bp(PATH_MAX
);
1270 int ret
= safe_read_file(basedir
.c_str(), "store_version",
1271 bp
.c_str(), bp
.length());
1276 bl
.push_back(std::move(bp
));
1277 bufferlist::iterator i
= bl
.begin();
1278 ::decode(*version
, i
);
1279 dout(10) << __func__
<< " was " << *version
<< " vs target "
1280 << target_version
<< dendl
;
1281 if (*version
== target_version
)
1287 int FileStore::write_version_stamp()
1289 dout(1) << __func__
<< " " << target_version
<< dendl
;
1291 ::encode(target_version
, bl
);
1293 return safe_write_file(basedir
.c_str(), "store_version",
1294 bl
.c_str(), bl
.length());
1297 int FileStore::upgrade()
1299 dout(1) << "upgrade" << dendl
;
1301 int r
= version_stamp_is_valid(&version
);
1304 derr
<< "The store_version file doesn't exist." << dendl
;
1313 derr
<< "ObjectStore is old at version " << version
<< ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl
;
1317 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1318 // open up DBObjectMap with the do_upgrade flag, which we already did.
1319 update_version_stamp();
1323 int FileStore::read_op_seq(uint64_t *seq
)
1325 int op_fd
= ::open(current_op_seq_fn
.c_str(), O_CREAT
|O_RDWR
, 0644);
1328 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
1332 memset(s
, 0, sizeof(s
));
1333 int ret
= safe_read(op_fd
, s
, sizeof(s
) - 1);
1335 derr
<< "error reading " << current_op_seq_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1336 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
1337 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Persist the committed op sequence number: format 'seq' as a decimal
// line and pwrite() it at offset 0 of 'fd' (the commit_op_seq file),
// overwriting the previous value in place.
1344 int FileStore::write_op_seq(int fd
, uint64_t seq
)
// NOTE(review): PRId64 formats a signed value but 'seq' is uint64_t;
// harmless until seq exceeds INT64_MAX, but PRIu64 would be correct.
1347 snprintf(s
, sizeof(s
), "%" PRId64
"\n", seq
);
1348 int ret
= TEMP_FAILURE_RETRY(::pwrite(fd
, s
, strlen(s
), 0));
// EIO is fatal unless explicitly tolerated via filestore_fail_eio.
1351 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
1356 int FileStore::mount()
1360 uint64_t initial_op_seq
;
1362 set
<string
> cluster_snaps
;
1363 CompatSet supported_compat_set
= get_fs_supported_compat_set();
1365 dout(5) << "basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1367 ret
= set_throttle_params();
1371 // make sure global base dir exists
1372 if (::access(basedir
.c_str(), R_OK
| W_OK
)) {
1374 derr
<< "FileStore::mount: unable to access basedir '" << basedir
<< "': "
1375 << cpp_strerror(ret
) << dendl
;
1380 snprintf(buf
, sizeof(buf
), "%s/fsid", basedir
.c_str());
1381 fsid_fd
= ::open(buf
, O_RDWR
, 0644);
1384 derr
<< "FileStore::mount: error opening '" << buf
<< "': "
1385 << cpp_strerror(ret
) << dendl
;
1389 ret
= read_fsid(fsid_fd
, &fsid
);
1391 derr
<< "FileStore::mount: error reading fsid_fd: " << cpp_strerror(ret
)
1396 if (lock_fsid() < 0) {
1397 derr
<< "FileStore::mount: lock_fsid failed" << dendl
;
1402 dout(10) << "mount fsid is " << fsid
<< dendl
;
1405 uint32_t version_stamp
;
1406 ret
= version_stamp_is_valid(&version_stamp
);
1408 derr
<< "FileStore::mount: error in version_stamp_is_valid: "
1409 << cpp_strerror(ret
) << dendl
;
1411 } else if (ret
== 0) {
1412 if (do_update
|| (int)version_stamp
< cct
->_conf
->filestore_update_to
) {
1413 derr
<< "FileStore::mount: stale version stamp detected: "
1415 << ". Proceeding, do_update "
1416 << "is set, performing disk format upgrade."
1421 derr
<< "FileStore::mount: stale version stamp " << version_stamp
1422 << ". Please run the FileStore update script before starting the "
1423 << "OSD, or set filestore_update_to to " << target_version
1424 << " (currently " << cct
->_conf
->filestore_update_to
<< ")"
1430 ret
= read_superblock();
1435 // Check if this FileStore supports all the necessary features to mount
1436 if (supported_compat_set
.compare(superblock
.compat_features
) == -1) {
1437 derr
<< "FileStore::mount: Incompatible features set "
1438 << superblock
.compat_features
<< dendl
;
1443 // open some dir handles
1444 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
);
1445 if (basedir_fd
< 0) {
1447 derr
<< "FileStore::mount: failed to open " << basedir
<< ": "
1448 << cpp_strerror(ret
) << dendl
;
1453 // test for btrfs, xattrs, etc.
1456 derr
<< "FileStore::mount: error in _detect_fs: "
1457 << cpp_strerror(ret
) << dendl
;
1458 goto close_basedir_fd
;
1463 ret
= backend
->list_checkpoints(ls
);
1465 derr
<< "FileStore::mount: error in _list_snaps: "<< cpp_strerror(ret
) << dendl
;
1466 goto close_basedir_fd
;
1469 long long unsigned c
, prev
= 0;
1470 char clustersnap
[NAME_MAX
];
1471 for (list
<string
>::iterator it
= ls
.begin(); it
!= ls
.end(); ++it
) {
1472 if (sscanf(it
->c_str(), COMMIT_SNAP_ITEM
, &c
) == 1) {
1476 } else if (sscanf(it
->c_str(), CLUSTER_SNAP_ITEM
, clustersnap
) == 1)
1477 cluster_snaps
.insert(*it
);
1481 if (m_osd_rollback_to_cluster_snap
.length() &&
1482 cluster_snaps
.count(m_osd_rollback_to_cluster_snap
) == 0) {
1483 derr
<< "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap
<< "': not found" << dendl
;
1485 goto close_basedir_fd
;
1489 snprintf(nosnapfn
, sizeof(nosnapfn
), "%s/nosnap", current_fn
.c_str());
1491 if (backend
->can_checkpoint()) {
1492 if (snaps
.empty()) {
1493 dout(0) << "mount WARNING: no consistent snaps found, store may be in inconsistent state" << dendl
;
1496 uint64_t curr_seq
= 0;
1498 if (m_osd_rollback_to_cluster_snap
.length()) {
1500 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap
<< " **"
1503 assert(cluster_snaps
.count(m_osd_rollback_to_cluster_snap
));
1504 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, m_osd_rollback_to_cluster_snap
.c_str());
1507 int fd
= read_op_seq(&curr_seq
);
1509 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1513 dout(10) << " current/ seq was " << curr_seq
<< dendl
;
1515 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl
;
1517 uint64_t cp
= snaps
.back();
1518 dout(10) << " most recent snap from " << snaps
<< " is " << cp
<< dendl
;
1520 // if current/ is marked as non-snapshotted, refuse to roll
1521 // back (without clear direction) to avoid throwing out new
1524 if (::stat(nosnapfn
, &st
) == 0) {
1525 if (!m_osd_use_stale_snap
) {
1526 derr
<< "ERROR: " << nosnapfn
<< " exists, not rolling back to avoid losing new data" << dendl
;
1527 derr
<< "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl
;
1528 derr
<< "config option for --osd-use-stale-snap startup argument." << dendl
;
1530 goto close_basedir_fd
;
1532 derr
<< "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1533 << ", newest snap is " << cp
<< dendl
;
1535 << " ** WARNING: forcing the use of stale snapshot data **"
1536 << TEXT_NORMAL
<< std::endl
;
1539 dout(10) << "mount rolling back to consistent snap " << cp
<< dendl
;
1540 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
1544 ret
= backend
->rollback_to(s
);
1546 derr
<< "FileStore::mount: error rolling back to " << s
<< ": "
1547 << cpp_strerror(ret
) << dendl
;
1548 goto close_basedir_fd
;
1554 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
);
1555 if (current_fd
< 0) {
1557 derr
<< "FileStore::mount: error opening: " << current_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1558 goto close_basedir_fd
;
1561 assert(current_fd
>= 0);
1563 op_fd
= read_op_seq(&initial_op_seq
);
1566 derr
<< "FileStore::mount: read_op_seq failed" << dendl
;
1567 goto close_current_fd
;
1570 dout(5) << "mount op_seq is " << initial_op_seq
<< dendl
;
1571 if (initial_op_seq
== 0) {
1572 derr
<< "mount initial op seq is 0; something is wrong" << dendl
;
1574 goto close_current_fd
;
1577 if (!backend
->can_checkpoint()) {
1578 // mark current/ as non-snapshotted so that we don't rollback away
1580 int r
= ::creat(nosnapfn
, 0644);
1583 derr
<< "FileStore::mount: failed to create current/nosnap" << dendl
;
1584 goto close_current_fd
;
1586 VOID_TEMP_FAILURE_RETRY(::close(r
));
1588 // clear nosnap marker, if present.
1592 // check fsid with omap
1595 char omap_fsid_buf
[PATH_MAX
];
1596 struct ::stat omap_fsid_stat
;
1597 snprintf(omap_fsid_buf
, sizeof(omap_fsid_buf
), "%s/osd_uuid", omap_dir
.c_str());
1598 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1599 if (::stat(omap_fsid_buf
, &omap_fsid_stat
) != 0){
1600 dout(10) << "Filestore::mount osd_uuid not found under omap, "
1601 << "assume as matched."
1604 // if osd_uuid exists, compares osd_uuid with fsid
1605 omap_fsid_fd
= ::open(omap_fsid_buf
, O_RDONLY
, 0644);
1606 if (omap_fsid_fd
< 0) {
1608 derr
<< "FileStore::mount: error opening '" << omap_fsid_buf
<< "': "
1609 << cpp_strerror(ret
)
1611 goto close_current_fd
;
1613 ret
= read_fsid(omap_fsid_fd
, &omap_fsid
);
1614 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1615 omap_fsid_fd
= -1; // defensive
1617 derr
<< "FileStore::mount: error reading omap_fsid_fd"
1618 << ", omap_fsid = " << omap_fsid
1619 << cpp_strerror(ret
)
1621 goto close_current_fd
;
1623 if (fsid
!= omap_fsid
) {
1624 derr
<< "FileStore::mount: " << omap_fsid_buf
1625 << " has existed omap fsid " << omap_fsid
1626 << " != expected osd fsid " << fsid
1629 goto close_current_fd
;
1633 dout(0) << "start omap initiation" << dendl
;
1634 if (!(generic_flags
& SKIP_MOUNT_OMAP
)) {
1635 KeyValueDB
* omap_store
= KeyValueDB::create(cct
,
1636 superblock
.omap_backend
,
1638 if (omap_store
== NULL
)
1640 derr
<< "Error creating " << superblock
.omap_backend
<< dendl
;
1642 goto close_current_fd
;
1645 if (superblock
.omap_backend
== "rocksdb")
1646 ret
= omap_store
->init(cct
->_conf
->filestore_rocksdb_options
);
1648 ret
= omap_store
->init();
1651 derr
<< "Error initializing omap_store: " << cpp_strerror(ret
) << dendl
;
1652 goto close_current_fd
;
1656 if (omap_store
->create_and_open(err
)) {
1658 derr
<< "Error initializing " << superblock
.omap_backend
1659 << " : " << err
.str() << dendl
;
1661 goto close_current_fd
;
1664 DBObjectMap
*dbomap
= new DBObjectMap(cct
, omap_store
);
1665 ret
= dbomap
->init(do_update
);
1668 derr
<< "Error initializing DBObjectMap: " << ret
<< dendl
;
1669 goto close_current_fd
;
1673 if (cct
->_conf
->filestore_debug_omap_check
&& !dbomap
->check(err2
)) {
1674 derr
<< err2
.str() << dendl
;
1677 goto close_current_fd
;
1679 object_map
.reset(dbomap
);
1685 // select journal mode?
1687 if (!m_filestore_journal_writeahead
&&
1688 !m_filestore_journal_parallel
&&
1689 !m_filestore_journal_trailing
) {
1690 if (!backend
->can_checkpoint()) {
1691 m_filestore_journal_writeahead
= true;
1692 dout(0) << "mount: enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl
;
1694 m_filestore_journal_parallel
= true;
1695 dout(0) << "mount: enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl
;
1698 if (m_filestore_journal_writeahead
)
1699 dout(0) << "mount: WRITEAHEAD journal mode explicitly enabled in conf" << dendl
;
1700 if (m_filestore_journal_parallel
)
1701 dout(0) << "mount: PARALLEL journal mode explicitly enabled in conf" << dendl
;
1702 if (m_filestore_journal_trailing
)
1703 dout(0) << "mount: TRAILING journal mode explicitly enabled in conf" << dendl
;
1705 if (m_filestore_journal_writeahead
)
1706 journal
->set_wait_on_full(true);
1708 dout(0) << "mount: no journal" << dendl
;
1711 ret
= _sanity_check_fs();
1713 derr
<< "FileStore::mount: _sanity_check_fs failed with error "
1715 goto close_current_fd
;
1718 // Cleanup possibly invalid collections
1720 vector
<coll_t
> collections
;
1721 ret
= list_collections(collections
, true);
1723 derr
<< "Error " << ret
<< " while listing collections" << dendl
;
1724 goto close_current_fd
;
1726 for (vector
<coll_t
>::iterator i
= collections
.begin();
1727 i
!= collections
.end();
1730 ret
= get_index(*i
, &index
);
1732 derr
<< "Unable to mount index " << *i
1733 << " with error: " << ret
<< dendl
;
1734 goto close_current_fd
;
1736 assert(NULL
!= index
.index
);
1737 RWLock::WLocker
l((index
.index
)->access_lock
);
1742 if (!m_disable_wbthrottle
) {
1745 dout(0) << "mount INFO: WbThrottle is disabled" << dendl
;
1746 if (cct
->_conf
->filestore_odsync_write
) {
1747 dout(0) << "mount INFO: O_DSYNC write is enabled" << dendl
;
1750 sync_thread
.create("filestore_sync");
1752 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
)) {
1753 ret
= journal_replay(initial_op_seq
);
1755 derr
<< "mount failed to open journal " << journalpath
<< ": " << cpp_strerror(ret
) << dendl
;
1756 if (ret
== -ENOTTY
) {
1757 derr
<< "maybe journal is not pointing to a block device and its size "
1758 << "wasn't configured?" << dendl
;
1767 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(err2
)) {
1768 derr
<< err2
.str() << dendl
;
1774 init_temp_collections();
1779 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
1782 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
1789 if (cct
->_conf
->filestore_update_to
>= (int)get_target_version()) {
1790 int err
= upgrade();
1792 derr
<< "error converting store" << dendl
;
1808 if (!m_disable_wbthrottle
) {
1812 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
1815 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1818 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1821 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
1828 void FileStore::init_temp_collections()
1830 dout(10) << __func__
<< dendl
;
1832 int r
= list_collections(ls
, true);
1835 dout(20) << " ls " << ls
<< dendl
;
1837 SequencerPosition spos
;
1840 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
)
1843 dout(20) << " temps " << temps
<< dendl
;
1845 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
) {
1850 coll_t temp
= p
->get_temp();
1851 if (temps
.count(temp
)) {
1854 dout(10) << __func__
<< " creating " << temp
<< dendl
;
1855 r
= _create_collection(temp
, 0, spos
);
1860 for (set
<coll_t
>::iterator p
= temps
.begin(); p
!= temps
.end(); ++p
) {
1861 dout(10) << __func__
<< " removing stray " << *p
<< dendl
;
1862 r
= _collection_remove_recursive(*p
, spos
);
1867 int FileStore::umount()
1869 dout(5) << "umount " << basedir
<< dendl
;
1880 if (!m_disable_wbthrottle
){
1886 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
))
1887 journal_write_close();
1889 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
1892 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
1897 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1901 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
1904 if (current_fd
>= 0) {
1905 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
1908 if (basedir_fd
>= 0) {
1909 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1921 Mutex::Locker
l(sync_entry_timeo_lock
);
1932 /// -----------------------------
1934 FileStore::Op
*FileStore::build_op(vector
<Transaction
>& tls
,
1935 Context
*onreadable
,
1936 Context
*onreadable_sync
,
1937 TrackedOpRef osd_op
)
1939 uint64_t bytes
= 0, ops
= 0;
1940 for (vector
<Transaction
>::iterator p
= tls
.begin();
1943 bytes
+= (*p
).get_num_bytes();
1944 ops
+= (*p
).get_num_ops();
1948 o
->start
= ceph_clock_now();
1949 o
->tls
= std::move(tls
);
1950 o
->onreadable
= onreadable
;
1951 o
->onreadable_sync
= onreadable_sync
;
1960 void FileStore::queue_op(OpSequencer
*osr
, Op
*o
)
1962 // queue op on sequencer, then queue sequencer for the threadpool,
1963 // so that regardless of which order the threads pick up the
1964 // sequencer, the op order will be preserved.
1967 o
->trace
.event("queued");
1969 logger
->inc(l_filestore_ops
);
1970 logger
->inc(l_filestore_bytes
, o
->bytes
);
1972 dout(5) << "queue_op " << o
<< " seq " << o
->op
1974 << " " << o
->bytes
<< " bytes"
1975 << " (queue has " << throttle_ops
.get_current() << " ops and " << throttle_bytes
.get_current() << " bytes)"
1980 void FileStore::op_queue_reserve_throttle(Op
*o
)
1983 throttle_bytes
.get(o
->bytes
);
1985 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
1986 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
1989 void FileStore::op_queue_release_throttle(Op
*o
)
1992 throttle_bytes
.put(o
->bytes
);
1993 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
1994 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
1997 void FileStore::_do_op(OpSequencer
*osr
, ThreadPool::TPHandle
&handle
)
1999 if (!m_disable_wbthrottle
) {
2000 wbthrottle
.throttle();
2003 if (cct
->_conf
->filestore_inject_stall
) {
2004 int orig
= cct
->_conf
->filestore_inject_stall
;
2005 dout(5) << "_do_op filestore_inject_stall " << orig
<< ", sleeping" << dendl
;
2007 cct
->_conf
->set_val("filestore_inject_stall", "0");
2008 dout(5) << "_do_op done stalling" << dendl
;
2011 osr
->apply_lock
.Lock();
2012 Op
*o
= osr
->peek_queue();
2013 o
->trace
.event("op_apply_start");
2014 apply_manager
.op_apply_start(o
->op
);
2015 dout(5) << "_do_op " << o
<< " seq " << o
->op
<< " " << *osr
<< "/" << osr
->parent
<< " start" << dendl
;
2016 o
->trace
.event("_do_transactions start");
2017 int r
= _do_transactions(o
->tls
, o
->op
, &handle
);
2018 o
->trace
.event("op_apply_finish");
2019 apply_manager
.op_apply_finish(o
->op
);
2020 dout(10) << "_do_op " << o
<< " seq " << o
->op
<< " r = " << r
2021 << ", finisher " << o
->onreadable
<< " " << o
->onreadable_sync
<< dendl
;
2027 void FileStore::_finish_op(OpSequencer
*osr
)
2029 list
<Context
*> to_queue
;
2030 Op
*o
= osr
->dequeue(&to_queue
);
2032 utime_t lat
= ceph_clock_now();
2035 dout(10) << "_finish_op " << o
<< " seq " << o
->op
<< " " << *osr
<< "/" << osr
->parent
<< " lat " << lat
<< dendl
;
2036 osr
->apply_lock
.Unlock(); // locked in _do_op
2037 o
->trace
.event("_finish_op");
2039 // called with tp lock held
2040 op_queue_release_throttle(o
);
2042 logger
->tinc(l_filestore_apply_latency
, lat
);
2044 if (o
->onreadable_sync
) {
2045 o
->onreadable_sync
->complete(0);
2047 if (o
->onreadable
) {
2048 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(o
->onreadable
);
2050 if (!to_queue
.empty()) {
2051 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(to_queue
);
2057 struct C_JournaledAhead
: public Context
{
2059 FileStore::OpSequencer
*osr
;
2063 C_JournaledAhead(FileStore
*f
, FileStore::OpSequencer
*os
, FileStore::Op
*o
, Context
*ondisk
):
2064 fs(f
), osr(os
), o(o
), ondisk(ondisk
) { }
2065 void finish(int r
) override
{
2066 fs
->_journaled_ahead(osr
, o
, ondisk
);
2070 int FileStore::queue_transactions(Sequencer
*posr
, vector
<Transaction
>& tls
,
2071 TrackedOpRef osd_op
,
2072 ThreadPool::TPHandle
*handle
)
2074 Context
*onreadable
;
2076 Context
*onreadable_sync
;
2077 ObjectStore::Transaction::collect_contexts(
2078 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
2080 if (cct
->_conf
->objectstore_blackhole
) {
2081 dout(0) << __func__
<< " objectstore_blackhole = TRUE, dropping transaction"
2085 delete onreadable_sync
;
2089 utime_t start
= ceph_clock_now();
2090 // set up the sequencer
2094 osr
= static_cast<OpSequencer
*>(posr
->p
.get());
2095 dout(5) << "queue_transactions existing " << osr
<< " " << *osr
<< dendl
;
2097 osr
= new OpSequencer(cct
, next_osr_id
.inc());
2101 dout(5) << "queue_transactions new " << osr
<< " " << *osr
<< dendl
;
2104 // used to include osr information in tracepoints during transaction apply
2105 for (vector
<Transaction
>::iterator i
= tls
.begin(); i
!= tls
.end(); ++i
) {
2109 ZTracer::Trace trace
;
2110 if (osd_op
&& osd_op
->pg_trace
) {
2111 osd_op
->store_trace
.init("filestore op", &trace_endpoint
, &osd_op
->pg_trace
);
2112 trace
= osd_op
->store_trace
;
2115 if (journal
&& journal
->is_writeable() && !m_filestore_journal_trailing
) {
2116 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2118 //prepare and encode transactions data out of lock
2120 int orig_len
= journal
->prepare_entry(o
->tls
, &tbl
);
2123 handle
->suspend_tp_timeout();
2125 op_queue_reserve_throttle(o
);
2126 journal
->reserve_throttle_and_backoff(tbl
.length());
2129 handle
->reset_tp_timeout();
2131 uint64_t op_num
= submit_manager
.op_submit_start();
2133 trace
.keyval("opnum", op_num
);
2135 if (m_filestore_do_dump
)
2136 dump_transactions(o
->tls
, o
->op
, osr
);
2138 if (m_filestore_journal_parallel
) {
2139 dout(5) << "queue_transactions (parallel) " << o
->op
<< " " << o
->tls
<< dendl
;
2141 trace
.keyval("journal mode", "parallel");
2142 trace
.event("journal started");
2143 _op_journal_transactions(tbl
, orig_len
, o
->op
, ondisk
, osd_op
);
2145 // queue inside submit_manager op submission lock
2147 trace
.event("op queued");
2148 } else if (m_filestore_journal_writeahead
) {
2149 dout(5) << "queue_transactions (writeahead) " << o
->op
<< " " << o
->tls
<< dendl
;
2151 osr
->queue_journal(o
->op
);
2153 trace
.keyval("journal mode", "writeahead");
2154 trace
.event("journal started");
2155 _op_journal_transactions(tbl
, orig_len
, o
->op
,
2156 new C_JournaledAhead(this, osr
, o
, ondisk
),
2161 submit_manager
.op_submit_finish(op_num
);
2162 utime_t end
= ceph_clock_now();
2163 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
2168 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2169 dout(5) << __func__
<< " (no journal) " << o
<< " " << tls
<< dendl
;
2172 handle
->suspend_tp_timeout();
2174 op_queue_reserve_throttle(o
);
2177 handle
->reset_tp_timeout();
2179 uint64_t op_num
= submit_manager
.op_submit_start();
2182 if (m_filestore_do_dump
)
2183 dump_transactions(o
->tls
, o
->op
, osr
);
2186 trace
.keyval("opnum", op_num
);
2187 trace
.keyval("journal mode", "none");
2188 trace
.event("op queued");
2191 apply_manager
.add_waiter(op_num
, ondisk
);
2192 submit_manager
.op_submit_finish(op_num
);
2193 utime_t end
= ceph_clock_now();
2194 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
2199 //prepare and encode transactions data out of lock
2202 if (journal
->is_writeable()) {
2203 orig_len
= journal
->prepare_entry(tls
, &tbl
);
2205 uint64_t op
= submit_manager
.op_submit_start();
2206 dout(5) << "queue_transactions (trailing journal) " << op
<< " " << tls
<< dendl
;
2208 if (m_filestore_do_dump
)
2209 dump_transactions(tls
, op
, osr
);
2211 trace
.event("op_apply_start");
2212 trace
.keyval("opnum", op
);
2213 trace
.keyval("journal mode", "trailing");
2214 apply_manager
.op_apply_start(op
);
2215 trace
.event("do_transactions");
2216 int r
= do_transactions(tls
, op
);
2219 trace
.event("journal started");
2220 _op_journal_transactions(tbl
, orig_len
, op
, ondisk
, osd_op
);
2225 // start on_readable finisher after we queue journal item, as on_readable callback
2226 // is allowed to delete the Transaction
2227 if (onreadable_sync
) {
2228 onreadable_sync
->complete(r
);
2230 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(onreadable
, r
);
2232 submit_manager
.op_submit_finish(op
);
2233 trace
.event("op_apply_finish");
2234 apply_manager
.op_apply_finish(op
);
2236 utime_t end
= ceph_clock_now();
2237 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
2241 void FileStore::_journaled_ahead(OpSequencer
*osr
, Op
*o
, Context
*ondisk
)
2243 dout(5) << "_journaled_ahead " << o
<< " seq " << o
->op
<< " " << *osr
<< " " << o
->tls
<< dendl
;
2245 o
->trace
.event("writeahead journal finished");
2247 // this should queue in order because the journal does it's completions in order.
2250 list
<Context
*> to_queue
;
2251 osr
->dequeue_journal(&to_queue
);
2253 // do ondisk completions async, to prevent any onreadable_sync completions
2254 // getting blocked behind an ondisk completion.
2256 dout(10) << " queueing ondisk " << ondisk
<< dendl
;
2257 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(ondisk
);
2259 if (!to_queue
.empty()) {
2260 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(to_queue
);
2264 int FileStore::_do_transactions(
2265 vector
<Transaction
> &tls
,
2267 ThreadPool::TPHandle
*handle
)
2271 for (vector
<Transaction
>::iterator p
= tls
.begin();
2274 _do_transaction(*p
, op_seq
, trans_num
, handle
);
2276 handle
->reset_tp_timeout();
2282 void FileStore::_set_global_replay_guard(const coll_t
& cid
,
2283 const SequencerPosition
&spos
)
2285 if (backend
->can_checkpoint())
2288 // sync all previous operations on this sequencer
2289 int ret
= object_map
->sync();
2291 derr
<< __func__
<< " : omap sync error " << cpp_strerror(ret
) << dendl
;
2292 assert(0 == "_set_global_replay_guard failed");
2294 ret
= sync_filesystem(basedir_fd
);
2296 derr
<< __func__
<< " : sync_filesystem error " << cpp_strerror(ret
) << dendl
;
2297 assert(0 == "_set_global_replay_guard failed");
2301 get_cdir(cid
, fn
, sizeof(fn
));
2302 int fd
= ::open(fn
, O_RDONLY
);
2305 derr
<< __func__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2306 assert(0 == "_set_global_replay_guard failed");
2311 // then record that we did it
2314 int r
= chain_fsetxattr
<true, true>(
2315 fd
, GLOBAL_REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2317 derr
<< __func__
<< ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2318 << " got " << cpp_strerror(r
) << dendl
;
2319 assert(0 == "fsetxattr failed");
2322 // and make sure our xattr is durable.
2327 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2328 dout(10) << __func__
<< ": " << spos
<< " done" << dendl
;
2331 int FileStore::_check_global_replay_guard(const coll_t
& cid
,
2332 const SequencerPosition
& spos
)
2335 get_cdir(cid
, fn
, sizeof(fn
));
2336 int fd
= ::open(fn
, O_RDONLY
);
2338 dout(10) << __func__
<< ": " << cid
<< " dne" << dendl
;
2339 return 1; // if collection does not exist, there is no guard, and we can replay.
2343 int r
= chain_fgetxattr(fd
, GLOBAL_REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2345 dout(20) << __func__
<< " no xattr" << dendl
;
2346 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
2347 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2348 return 1; // no xattr
2353 SequencerPosition opos
;
2354 bufferlist::iterator p
= bl
.begin();
2357 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2358 return spos
>= opos
? 1 : -1;
2362 void FileStore::_set_replay_guard(const coll_t
& cid
,
2363 const SequencerPosition
&spos
,
2364 bool in_progress
=false)
2367 get_cdir(cid
, fn
, sizeof(fn
));
2368 int fd
= ::open(fn
, O_RDONLY
);
2371 derr
<< "_set_replay_guard " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2372 assert(0 == "_set_replay_guard failed");
2374 _set_replay_guard(fd
, spos
, 0, in_progress
);
2375 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2379 void FileStore::_set_replay_guard(int fd
,
2380 const SequencerPosition
& spos
,
2381 const ghobject_t
*hoid
,
2384 if (backend
->can_checkpoint())
2387 dout(10) << "_set_replay_guard " << spos
<< (in_progress
? " START" : "") << dendl
;
2391 // first make sure the previous operation commits
2395 // sync object_map too. even if this object has a header or keys,
2396 // it have had them in the past and then removed them, so always
2398 object_map
->sync(hoid
, &spos
);
2403 // then record that we did it
2406 ::encode(in_progress
, v
);
2407 int r
= chain_fsetxattr
<true, true>(
2408 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2410 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2411 assert(0 == "fsetxattr failed");
2414 // and make sure our xattr is durable.
2419 dout(10) << "_set_replay_guard " << spos
<< " done" << dendl
;
2422 void FileStore::_close_replay_guard(const coll_t
& cid
,
2423 const SequencerPosition
&spos
)
2426 get_cdir(cid
, fn
, sizeof(fn
));
2427 int fd
= ::open(fn
, O_RDONLY
);
2430 derr
<< "_close_replay_guard " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2431 assert(0 == "_close_replay_guard failed");
2433 _close_replay_guard(fd
, spos
);
2434 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2437 void FileStore::_close_replay_guard(int fd
, const SequencerPosition
& spos
,
2438 const ghobject_t
*hoid
)
2440 if (backend
->can_checkpoint())
2443 dout(10) << "_close_replay_guard " << spos
<< dendl
;
2447 // sync object_map too. even if this object has a header or keys,
2448 // it have had them in the past and then removed them, so always
2450 object_map
->sync(hoid
, &spos
);
2452 // then record that we are done with this operation
2455 bool in_progress
= false;
2456 ::encode(in_progress
, v
);
2457 int r
= chain_fsetxattr
<true, true>(
2458 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2460 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2461 assert(0 == "fsetxattr failed");
2464 // and make sure our xattr is durable.
2469 dout(10) << "_close_replay_guard " << spos
<< " done" << dendl
;
2472 int FileStore::_check_replay_guard(const coll_t
& cid
, const ghobject_t
&oid
,
2473 const SequencerPosition
& spos
)
2475 if (!replaying
|| backend
->can_checkpoint())
2478 int r
= _check_global_replay_guard(cid
, spos
);
2483 r
= lfn_open(cid
, oid
, false, &fd
);
2485 dout(10) << "_check_replay_guard " << cid
<< " " << oid
<< " dne" << dendl
;
2486 return 1; // if file does not exist, there is no guard, and we can replay.
2488 int ret
= _check_replay_guard(**fd
, spos
);
2493 int FileStore::_check_replay_guard(const coll_t
& cid
, const SequencerPosition
& spos
)
2495 if (!replaying
|| backend
->can_checkpoint())
2499 get_cdir(cid
, fn
, sizeof(fn
));
2500 int fd
= ::open(fn
, O_RDONLY
);
2502 dout(10) << "_check_replay_guard " << cid
<< " dne" << dendl
;
2503 return 1; // if collection does not exist, there is no guard, and we can replay.
2505 int ret
= _check_replay_guard(fd
, spos
);
2506 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2510 int FileStore::_check_replay_guard(int fd
, const SequencerPosition
& spos
)
2512 if (!replaying
|| backend
->can_checkpoint())
2516 int r
= chain_fgetxattr(fd
, REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2518 dout(20) << "_check_replay_guard no xattr" << dendl
;
2519 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
2520 return 1; // no xattr
2525 SequencerPosition opos
;
2526 bufferlist::iterator p
= bl
.begin();
2528 bool in_progress
= false;
2529 if (!p
.end()) // older journals don't have this
2530 ::decode(in_progress
, p
);
2532 dout(10) << "_check_replay_guard object has " << opos
<< " > current pos " << spos
2533 << ", now or in future, SKIPPING REPLAY" << dendl
;
2535 } else if (opos
== spos
) {
2537 dout(10) << "_check_replay_guard object has " << opos
<< " == current pos " << spos
2538 << ", in_progress=true, CONDITIONAL REPLAY" << dendl
;
2541 dout(10) << "_check_replay_guard object has " << opos
<< " == current pos " << spos
2542 << ", in_progress=false, SKIPPING REPLAY" << dendl
;
2546 dout(10) << "_check_replay_guard object has " << opos
<< " < current pos " << spos
2547 << ", in past, will replay" << dendl
;
2552 void FileStore::_do_transaction(
2553 Transaction
& t
, uint64_t op_seq
, int trans_num
,
2554 ThreadPool::TPHandle
*handle
)
2556 dout(10) << "_do_transaction on " << &t
<< dendl
;
2559 const char *osr_name
= t
.get_osr() ? static_cast<OpSequencer
*>(t
.get_osr())->get_name().c_str() : "<NULL>";
2562 Transaction::iterator i
= t
.begin();
2564 SequencerPosition
spos(op_seq
, trans_num
, 0);
2565 while (i
.have_op()) {
2567 handle
->reset_tp_timeout();
2569 Transaction::Op
*op
= i
.decode_op();
2575 case Transaction::OP_NOP
:
2577 case Transaction::OP_TOUCH
:
2579 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2580 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2581 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2582 _cid
: _cid
.get_temp();
2583 tracepoint(objectstore
, touch_enter
, osr_name
);
2584 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2585 r
= _touch(cid
, oid
);
2586 tracepoint(objectstore
, touch_exit
, r
);
2590 case Transaction::OP_WRITE
:
2592 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2593 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2594 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2595 _cid
: _cid
.get_temp();
2596 uint64_t off
= op
->off
;
2597 uint64_t len
= op
->len
;
2598 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2601 tracepoint(objectstore
, write_enter
, osr_name
, off
, len
);
2602 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2603 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
2604 tracepoint(objectstore
, write_exit
, r
);
2608 case Transaction::OP_ZERO
:
2610 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2611 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2612 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2613 _cid
: _cid
.get_temp();
2614 uint64_t off
= op
->off
;
2615 uint64_t len
= op
->len
;
2616 tracepoint(objectstore
, zero_enter
, osr_name
, off
, len
);
2617 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2618 r
= _zero(cid
, oid
, off
, len
);
2619 tracepoint(objectstore
, zero_exit
, r
);
2623 case Transaction::OP_TRIMCACHE
:
2625 // deprecated, no-op
2629 case Transaction::OP_TRUNCATE
:
2631 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2632 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2633 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2634 _cid
: _cid
.get_temp();
2635 uint64_t off
= op
->off
;
2636 tracepoint(objectstore
, truncate_enter
, osr_name
, off
);
2637 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2638 r
= _truncate(cid
, oid
, off
);
2639 tracepoint(objectstore
, truncate_exit
, r
);
2643 case Transaction::OP_REMOVE
:
2645 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2646 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2647 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2648 _cid
: _cid
.get_temp();
2649 tracepoint(objectstore
, remove_enter
, osr_name
);
2650 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2651 r
= _remove(cid
, oid
, spos
);
2652 tracepoint(objectstore
, remove_exit
, r
);
2656 case Transaction::OP_SETATTR
:
2658 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2659 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2660 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2661 _cid
: _cid
.get_temp();
2662 string name
= i
.decode_string();
2665 tracepoint(objectstore
, setattr_enter
, osr_name
);
2666 if (_check_replay_guard(cid
, oid
, spos
) > 0) {
2667 map
<string
, bufferptr
> to_set
;
2668 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2669 r
= _setattrs(cid
, oid
, to_set
, spos
);
2671 dout(0) << " ENOSPC on setxattr on " << cid
<< "/" << oid
2672 << " name " << name
<< " size " << bl
.length() << dendl
;
2674 tracepoint(objectstore
, setattr_exit
, r
);
2678 case Transaction::OP_SETATTRS
:
2680 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2681 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2682 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2683 _cid
: _cid
.get_temp();
2684 map
<string
, bufferptr
> aset
;
2685 i
.decode_attrset(aset
);
2686 tracepoint(objectstore
, setattrs_enter
, osr_name
);
2687 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2688 r
= _setattrs(cid
, oid
, aset
, spos
);
2689 tracepoint(objectstore
, setattrs_exit
, r
);
2691 dout(0) << " ENOSPC on setxattrs on " << cid
<< "/" << oid
<< dendl
;
2695 case Transaction::OP_RMATTR
:
2697 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2698 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2699 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2700 _cid
: _cid
.get_temp();
2701 string name
= i
.decode_string();
2702 tracepoint(objectstore
, rmattr_enter
, osr_name
);
2703 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2704 r
= _rmattr(cid
, oid
, name
.c_str(), spos
);
2705 tracepoint(objectstore
, rmattr_exit
, r
);
2709 case Transaction::OP_RMATTRS
:
2711 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2712 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2713 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2714 _cid
: _cid
.get_temp();
2715 tracepoint(objectstore
, rmattrs_enter
, osr_name
);
2716 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2717 r
= _rmattrs(cid
, oid
, spos
);
2718 tracepoint(objectstore
, rmattrs_exit
, r
);
2722 case Transaction::OP_CLONE
:
2724 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2725 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2726 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2727 _cid
: _cid
.get_temp();
2728 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2729 tracepoint(objectstore
, clone_enter
, osr_name
);
2730 r
= _clone(cid
, oid
, noid
, spos
);
2731 tracepoint(objectstore
, clone_exit
, r
);
2735 case Transaction::OP_CLONERANGE
:
2737 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2738 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2739 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2740 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2741 _cid
: _cid
.get_temp();
2742 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2743 _cid
: _cid
.get_temp();
2744 uint64_t off
= op
->off
;
2745 uint64_t len
= op
->len
;
2746 tracepoint(objectstore
, clone_range_enter
, osr_name
, len
);
2747 r
= _clone_range(cid
, oid
, ncid
, noid
, off
, len
, off
, spos
);
2748 tracepoint(objectstore
, clone_range_exit
, r
);
2752 case Transaction::OP_CLONERANGE2
:
2754 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2755 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2756 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2757 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2758 _cid
: _cid
.get_temp();
2759 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2760 _cid
: _cid
.get_temp();
2761 uint64_t srcoff
= op
->off
;
2762 uint64_t len
= op
->len
;
2763 uint64_t dstoff
= op
->dest_off
;
2764 tracepoint(objectstore
, clone_range2_enter
, osr_name
, len
);
2765 r
= _clone_range(cid
, oid
, ncid
, noid
, srcoff
, len
, dstoff
, spos
);
2766 tracepoint(objectstore
, clone_range2_exit
, r
);
2770 case Transaction::OP_MKCOLL
:
2772 const coll_t
&cid
= i
.get_cid(op
->cid
);
2773 tracepoint(objectstore
, mkcoll_enter
, osr_name
);
2774 if (_check_replay_guard(cid
, spos
) > 0)
2775 r
= _create_collection(cid
, op
->split_bits
, spos
);
2776 tracepoint(objectstore
, mkcoll_exit
, r
);
2780 case Transaction::OP_COLL_SET_BITS
:
2782 const coll_t
&cid
= i
.get_cid(op
->cid
);
2783 int bits
= op
->split_bits
;
2784 r
= _collection_set_bits(cid
, bits
);
2788 case Transaction::OP_COLL_HINT
:
2790 const coll_t
&cid
= i
.get_cid(op
->cid
);
2791 uint32_t type
= op
->hint_type
;
2794 bufferlist::iterator hiter
= hint
.begin();
2795 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
2798 ::decode(pg_num
, hiter
);
2799 ::decode(num_objs
, hiter
);
2800 if (_check_replay_guard(cid
, spos
) > 0) {
2801 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
, spos
);
2805 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
2810 case Transaction::OP_RMCOLL
:
2812 const coll_t
&cid
= i
.get_cid(op
->cid
);
2813 tracepoint(objectstore
, rmcoll_enter
, osr_name
);
2814 if (_check_replay_guard(cid
, spos
) > 0)
2815 r
= _destroy_collection(cid
);
2816 tracepoint(objectstore
, rmcoll_exit
, r
);
2820 case Transaction::OP_COLL_ADD
:
2822 const coll_t
&ocid
= i
.get_cid(op
->cid
);
2823 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
2824 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2826 assert(oid
.hobj
.pool
>= -1);
2828 // always followed by OP_COLL_REMOVE
2829 Transaction::Op
*op2
= i
.decode_op();
2830 const coll_t
&ocid2
= i
.get_cid(op2
->cid
);
2831 const ghobject_t
&oid2
= i
.get_oid(op2
->oid
);
2832 assert(op2
->op
== Transaction::OP_COLL_REMOVE
);
2833 assert(ocid2
== ocid
);
2834 assert(oid2
== oid
);
2836 tracepoint(objectstore
, coll_add_enter
);
2837 r
= _collection_add(ncid
, ocid
, oid
, spos
);
2838 tracepoint(objectstore
, coll_add_exit
, r
);
2842 tracepoint(objectstore
, coll_remove_enter
, osr_name
);
2843 if (_check_replay_guard(ocid
, oid
, spos
) > 0)
2844 r
= _remove(ocid
, oid
, spos
);
2845 tracepoint(objectstore
, coll_remove_exit
, r
);
2849 case Transaction::OP_COLL_MOVE
:
2851 // WARNING: this is deprecated and buggy; only here to replay old journals.
2852 const coll_t
&ocid
= i
.get_cid(op
->cid
);
2853 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
2854 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2855 tracepoint(objectstore
, coll_move_enter
);
2856 r
= _collection_add(ocid
, ncid
, oid
, spos
);
2858 (_check_replay_guard(ocid
, oid
, spos
) > 0))
2859 r
= _remove(ocid
, oid
, spos
);
2860 tracepoint(objectstore
, coll_move_exit
, r
);
2864 case Transaction::OP_COLL_MOVE_RENAME
:
2866 const coll_t
&_oldcid
= i
.get_cid(op
->cid
);
2867 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
2868 const coll_t
&_newcid
= i
.get_cid(op
->dest_cid
);
2869 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
2870 const coll_t
&oldcid
= !_need_temp_object_collection(_oldcid
, oldoid
) ?
2871 _oldcid
: _oldcid
.get_temp();
2872 const coll_t
&newcid
= !_need_temp_object_collection(_newcid
, newoid
) ?
2873 _oldcid
: _newcid
.get_temp();
2874 tracepoint(objectstore
, coll_move_rename_enter
);
2875 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
);
2876 tracepoint(objectstore
, coll_move_rename_exit
, r
);
2880 case Transaction::OP_TRY_RENAME
:
2882 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2883 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
2884 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
2885 const coll_t
&oldcid
= !_need_temp_object_collection(_cid
, oldoid
) ?
2886 _cid
: _cid
.get_temp();
2887 const coll_t
&newcid
= !_need_temp_object_collection(_cid
, newoid
) ?
2888 _cid
: _cid
.get_temp();
2889 tracepoint(objectstore
, coll_try_rename_enter
);
2890 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
, true);
2891 tracepoint(objectstore
, coll_try_rename_exit
, r
);
2895 case Transaction::OP_COLL_SETATTR
:
2896 case Transaction::OP_COLL_RMATTR
:
2897 assert(0 == "collection attr methods no longer implemented");
2900 case Transaction::OP_STARTSYNC
:
2901 tracepoint(objectstore
, startsync_enter
, osr_name
);
2903 tracepoint(objectstore
, startsync_exit
);
2906 case Transaction::OP_COLL_RENAME
:
2912 case Transaction::OP_OMAP_CLEAR
:
2914 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2915 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2916 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2917 _cid
: _cid
.get_temp();
2918 tracepoint(objectstore
, omap_clear_enter
, osr_name
);
2919 r
= _omap_clear(cid
, oid
, spos
);
2920 tracepoint(objectstore
, omap_clear_exit
, r
);
2923 case Transaction::OP_OMAP_SETKEYS
:
2925 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2926 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2927 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2928 _cid
: _cid
.get_temp();
2929 map
<string
, bufferlist
> aset
;
2930 i
.decode_attrset(aset
);
2931 tracepoint(objectstore
, omap_setkeys_enter
, osr_name
);
2932 r
= _omap_setkeys(cid
, oid
, aset
, spos
);
2933 tracepoint(objectstore
, omap_setkeys_exit
, r
);
2936 case Transaction::OP_OMAP_RMKEYS
:
2938 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2939 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2940 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2941 _cid
: _cid
.get_temp();
2943 i
.decode_keyset(keys
);
2944 tracepoint(objectstore
, omap_rmkeys_enter
, osr_name
);
2945 r
= _omap_rmkeys(cid
, oid
, keys
, spos
);
2946 tracepoint(objectstore
, omap_rmkeys_exit
, r
);
2949 case Transaction::OP_OMAP_RMKEYRANGE
:
2951 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2952 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2953 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2954 _cid
: _cid
.get_temp();
2956 first
= i
.decode_string();
2957 last
= i
.decode_string();
2958 tracepoint(objectstore
, omap_rmkeyrange_enter
, osr_name
);
2959 r
= _omap_rmkeyrange(cid
, oid
, first
, last
, spos
);
2960 tracepoint(objectstore
, omap_rmkeyrange_exit
, r
);
2963 case Transaction::OP_OMAP_SETHEADER
:
2965 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2966 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2967 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2968 _cid
: _cid
.get_temp();
2971 tracepoint(objectstore
, omap_setheader_enter
, osr_name
);
2972 r
= _omap_setheader(cid
, oid
, bl
, spos
);
2973 tracepoint(objectstore
, omap_setheader_exit
, r
);
2976 case Transaction::OP_SPLIT_COLLECTION
:
2978 assert(0 == "not legacy journal; upgrade to firefly first");
2981 case Transaction::OP_SPLIT_COLLECTION2
:
2983 coll_t cid
= i
.get_cid(op
->cid
);
2984 uint32_t bits
= op
->split_bits
;
2985 uint32_t rem
= op
->split_rem
;
2986 coll_t dest
= i
.get_cid(op
->dest_cid
);
2987 tracepoint(objectstore
, split_coll2_enter
, osr_name
);
2988 r
= _split_collection(cid
, bits
, rem
, dest
, spos
);
2989 tracepoint(objectstore
, split_coll2_exit
, r
);
2993 case Transaction::OP_SETALLOCHINT
:
2995 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2996 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2997 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2998 _cid
: _cid
.get_temp();
2999 uint64_t expected_object_size
= op
->expected_object_size
;
3000 uint64_t expected_write_size
= op
->expected_write_size
;
3001 tracepoint(objectstore
, setallochint_enter
, osr_name
);
3002 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3003 r
= _set_alloc_hint(cid
, oid
, expected_object_size
,
3004 expected_write_size
);
3005 tracepoint(objectstore
, setallochint_exit
, r
);
3010 derr
<< "bad op " << op
->op
<< dendl
;
3017 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
3018 op
->op
== Transaction::OP_CLONE
||
3019 op
->op
== Transaction::OP_CLONERANGE2
||
3020 op
->op
== Transaction::OP_COLL_ADD
||
3021 op
->op
== Transaction::OP_SETATTR
||
3022 op
->op
== Transaction::OP_SETATTRS
||
3023 op
->op
== Transaction::OP_RMATTR
||
3024 op
->op
== Transaction::OP_OMAP_SETKEYS
||
3025 op
->op
== Transaction::OP_OMAP_RMKEYS
||
3026 op
->op
== Transaction::OP_OMAP_RMKEYRANGE
||
3027 op
->op
== Transaction::OP_OMAP_SETHEADER
))
3028 // -ENOENT is normally okay
3029 // ...including on a replayed OP_RMCOLL with checkpoint mode
3034 if (op
->op
== Transaction::OP_SETALLOCHINT
)
3035 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3036 // cases means invalid hint size (e.g. too big, not a multiple
3037 // of block size, etc) or, at least on xfs, an attempt to set
3038 // or change it when the file is not empty. However,
3039 // OP_SETALLOCHINT is advisory, so ignore all errors.
3042 if (replaying
&& !backend
->can_checkpoint()) {
3043 if (r
== -EEXIST
&& op
->op
== Transaction::OP_MKCOLL
) {
3044 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3047 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_ADD
) {
3048 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3051 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_MOVE
) {
3052 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3056 dout(10) << "tolerating ERANGE on replay" << dendl
;
3060 dout(10) << "tolerating ENOENT on replay" << dendl
;
3066 const char *msg
= "unexpected error code";
3068 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
3069 op
->op
== Transaction::OP_CLONE
||
3070 op
->op
== Transaction::OP_CLONERANGE2
)) {
3071 msg
= "ENOENT on clone suggests osd bug";
3072 } else if (r
== -ENOSPC
) {
3073 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3074 // by partially applying transactions.
3075 msg
= "ENOSPC from disk filesystem, misconfigured cluster";
3076 } else if (r
== -ENOTEMPTY
) {
3077 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
3078 } else if (r
== -EPERM
) {
3079 msg
= "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3082 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
3083 << " (" << spos
<< ", or op " << spos
.op
<< ", counting from 0)" << dendl
;
3084 dout(0) << msg
<< dendl
;
3085 dout(0) << " transaction dump:\n";
3086 JSONFormatter
f(true);
3087 f
.open_object_section("transaction");
3097 assert(0 == "unexpected error");
3107 /*********************************************/
3111 // --------------------
3114 bool FileStore::exists(const coll_t
& _cid
, const ghobject_t
& oid
)
3116 tracepoint(objectstore
, exists_enter
, _cid
.c_str());
3117 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3119 bool retval
= stat(cid
, oid
, &st
) == 0;
3120 tracepoint(objectstore
, exists_exit
, retval
);
3124 int FileStore::stat(
3125 const coll_t
& _cid
, const ghobject_t
& oid
, struct stat
*st
, bool allow_eio
)
3127 tracepoint(objectstore
, stat_enter
, _cid
.c_str());
3128 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3129 int r
= lfn_stat(cid
, oid
, st
);
3130 assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
3132 dout(10) << "stat " << cid
<< "/" << oid
3133 << " = " << r
<< dendl
;
3135 dout(10) << "stat " << cid
<< "/" << oid
3137 << " (size " << st
->st_size
<< ")" << dendl
;
3139 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3140 debug_mdata_eio(oid
)) {
3143 tracepoint(objectstore
, stat_exit
, r
);
3148 int FileStore::set_collection_opts(
3150 const pool_opts_t
& opts
)
3155 int FileStore::read(
3157 const ghobject_t
& oid
,
3165 tracepoint(objectstore
, read_enter
, _cid
.c_str(), offset
, len
);
3166 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3168 dout(15) << "read " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3171 int r
= lfn_open(cid
, oid
, false, &fd
);
3173 dout(10) << "FileStore::read(" << cid
<< "/" << oid
<< ") open error: "
3174 << cpp_strerror(r
) << dendl
;
3178 if (offset
== 0 && len
== 0) {
3180 memset(&st
, 0, sizeof(struct stat
));
3181 int r
= ::fstat(**fd
, &st
);
3186 #ifdef HAVE_POSIX_FADVISE
3187 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_RANDOM
)
3188 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_RANDOM
);
3189 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
)
3190 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_SEQUENTIAL
);
3193 bufferptr
bptr(len
); // prealloc space for entire read
3194 got
= safe_pread(**fd
, bptr
.c_str(), len
, offset
);
3196 dout(10) << "FileStore::read(" << cid
<< "/" << oid
<< ") pread error: " << cpp_strerror(got
) << dendl
;
3198 if (!(allow_eio
|| !m_filestore_fail_eio
|| got
!= -EIO
)) {
3199 derr
<< "FileStore::read(" << cid
<< "/" << oid
<< ") pread error: " << cpp_strerror(got
) << dendl
;
3200 assert(0 == "eio on pread");
3204 bptr
.set_length(got
); // properly size the buffer
3206 bl
.push_back(std::move(bptr
)); // put it in the target bufferlist
3208 #ifdef HAVE_POSIX_FADVISE
3209 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
)
3210 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_DONTNEED
);
3211 if (op_flags
& (CEPH_OSD_OP_FLAG_FADVISE_RANDOM
| CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
))
3212 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_NORMAL
);
3215 if (m_filestore_sloppy_crc
&& (!replaying
|| backend
->can_checkpoint())) {
3217 int errors
= backend
->_crc_verify_read(**fd
, offset
, got
, bl
, &ss
);
3219 dout(0) << "FileStore::read " << cid
<< "/" << oid
<< " " << offset
<< "~"
3220 << got
<< " ... BAD CRC:\n" << ss
.str() << dendl
;
3221 assert(0 == "bad crc on read");
3227 dout(10) << "FileStore::read " << cid
<< "/" << oid
<< " " << offset
<< "~"
3228 << got
<< "/" << len
<< dendl
;
3229 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3230 debug_data_eio(oid
)) {
3233 tracepoint(objectstore
, read_exit
, got
);
3238 int FileStore::_do_fiemap(int fd
, uint64_t offset
, size_t len
,
3239 map
<uint64_t, uint64_t> *m
)
3242 struct fiemap_extent
*extent
= NULL
;
3243 struct fiemap
*fiemap
= NULL
;
3247 r
= backend
->do_fiemap(fd
, offset
, len
, &fiemap
);
3251 if (fiemap
->fm_mapped_extents
== 0) {
3256 extent
= &fiemap
->fm_extents
[0];
3258 /* start where we were asked to start */
3259 if (extent
->fe_logical
< offset
) {
3260 extent
->fe_length
-= offset
- extent
->fe_logical
;
3261 extent
->fe_logical
= offset
;
3266 struct fiemap_extent
*last
= nullptr;
3267 while (i
< fiemap
->fm_mapped_extents
) {
3268 struct fiemap_extent
*next
= extent
+ 1;
3270 dout(10) << "FileStore::fiemap() fm_mapped_extents=" << fiemap
->fm_mapped_extents
3271 << " fe_logical=" << extent
->fe_logical
<< " fe_length=" << extent
->fe_length
<< dendl
;
3273 /* try to merge extents */
3274 while ((i
< fiemap
->fm_mapped_extents
- 1) &&
3275 (extent
->fe_logical
+ extent
->fe_length
== next
->fe_logical
)) {
3276 next
->fe_length
+= extent
->fe_length
;
3277 next
->fe_logical
= extent
->fe_logical
;
3283 if (extent
->fe_logical
+ extent
->fe_length
> offset
+ len
)
3284 extent
->fe_length
= offset
+ len
- extent
->fe_logical
;
3285 (*m
)[extent
->fe_logical
] = extent
->fe_length
;
3289 uint64_t xoffset
= last
->fe_logical
+ last
->fe_length
- offset
;
3290 offset
= last
->fe_logical
+ last
->fe_length
;
3292 const bool is_last
= (last
->fe_flags
& FIEMAP_EXTENT_LAST
) || (len
== 0);
3301 int FileStore::_do_seek_hole_data(int fd
, uint64_t offset
, size_t len
,
3302 map
<uint64_t, uint64_t> *m
)
3304 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3305 off_t hole_pos
, data_pos
;
3308 // If lseek fails with errno setting to be ENXIO, this means the current
3309 // file offset is beyond the end of the file.
3310 off_t start
= offset
;
3311 while(start
< (off_t
)(offset
+ len
)) {
3312 data_pos
= lseek(fd
, start
, SEEK_DATA
);
3318 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
3321 } else if (data_pos
> (off_t
)(offset
+ len
)) {
3325 hole_pos
= lseek(fd
, data_pos
, SEEK_HOLE
);
3327 if (errno
== ENXIO
) {
3331 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
3336 if (hole_pos
>= (off_t
)(offset
+ len
)) {
3337 (*m
)[data_pos
] = offset
+ len
- data_pos
;
3340 (*m
)[data_pos
] = hole_pos
- data_pos
;
3351 int FileStore::fiemap(const coll_t
& _cid
, const ghobject_t
& oid
,
3352 uint64_t offset
, size_t len
,
3355 map
<uint64_t, uint64_t> exomap
;
3356 int r
= fiemap(_cid
, oid
, offset
, len
, exomap
);
3358 ::encode(exomap
, bl
);
3363 int FileStore::fiemap(const coll_t
& _cid
, const ghobject_t
& oid
,
3364 uint64_t offset
, size_t len
,
3365 map
<uint64_t, uint64_t>& destmap
)
3367 tracepoint(objectstore
, fiemap_enter
, _cid
.c_str(), offset
, len
);
3368 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3371 if ((!backend
->has_seek_data_hole() && !backend
->has_fiemap()) ||
3372 len
<= (size_t)m_filestore_fiemap_threshold
) {
3373 destmap
[offset
] = len
;
3377 dout(15) << "fiemap " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3381 int r
= lfn_open(cid
, oid
, false, &fd
);
3383 dout(10) << "read couldn't open " << cid
<< "/" << oid
<< ": " << cpp_strerror(r
) << dendl
;
3387 if (backend
->has_seek_data_hole()) {
3388 dout(15) << "seek_data/seek_hole " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3389 r
= _do_seek_hole_data(**fd
, offset
, len
, &destmap
);
3390 } else if (backend
->has_fiemap()) {
3391 dout(15) << "fiemap ioctl" << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3392 r
= _do_fiemap(**fd
, offset
, len
, &destmap
);
3399 dout(10) << "fiemap " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< " num_extents=" << destmap
.size() << " " << destmap
<< dendl
;
3400 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
3401 tracepoint(objectstore
, fiemap_exit
, r
);
3405 int FileStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
,
3406 const SequencerPosition
&spos
)
3408 dout(15) << "remove " << cid
<< "/" << oid
<< dendl
;
3409 int r
= lfn_unlink(cid
, oid
, spos
);
3410 dout(10) << "remove " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
3414 int FileStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
3416 dout(15) << "truncate " << cid
<< "/" << oid
<< " size " << size
<< dendl
;
3417 int r
= lfn_truncate(cid
, oid
, size
);
3418 dout(10) << "truncate " << cid
<< "/" << oid
<< " size " << size
<< " = " << r
<< dendl
;
3423 int FileStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
3425 dout(15) << "touch " << cid
<< "/" << oid
<< dendl
;
3428 int r
= lfn_open(cid
, oid
, true, &fd
);
3434 dout(10) << "touch " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
3438 int FileStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
3439 uint64_t offset
, size_t len
,
3440 const bufferlist
& bl
, uint32_t fadvise_flags
)
3442 dout(15) << "write " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3446 r
= lfn_open(cid
, oid
, true, &fd
);
3448 dout(0) << "write couldn't open " << cid
<< "/"
3450 << cpp_strerror(r
) << dendl
;
3455 r
= bl
.write_fd(**fd
, offset
);
3457 derr
<< __func__
<< " write_fd on " << cid
<< "/" << oid
3458 << " error: " << cpp_strerror(r
) << dendl
;
3464 if (r
>= 0 && m_filestore_sloppy_crc
) {
3465 int rc
= backend
->_crc_update_write(**fd
, offset
, len
, bl
);
3469 if (replaying
|| m_disable_wbthrottle
) {
3470 if (fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
) {
3471 #ifdef HAVE_POSIX_FADVISE
3472 posix_fadvise(**fd
, 0, 0, POSIX_FADV_DONTNEED
);
3476 wbthrottle
.queue_wb(fd
, oid
, offset
, len
,
3477 fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
3483 dout(10) << "write " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< dendl
;
3487 int FileStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t offset
, size_t len
)
3489 dout(15) << "zero " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3492 if (cct
->_conf
->filestore_punch_hole
) {
3493 #ifdef CEPH_HAVE_FALLOCATE
3494 # if !defined(DARWIN) && !defined(__FreeBSD__)
3495 # ifdef FALLOC_FL_KEEP_SIZE
3496 // first try to punch a hole.
3498 ret
= lfn_open(cid
, oid
, false, &fd
);
3504 ret
= ::fstat(**fd
, &st
);
3511 // first try fallocate
3512 ret
= fallocate(**fd
, FALLOC_FL_KEEP_SIZE
| FALLOC_FL_PUNCH_HOLE
,
3517 // ensure we extent file size, if needed
3518 if (offset
+ len
> (uint64_t)st
.st_size
) {
3519 ret
= ::ftruncate(**fd
, offset
+ len
);
3529 if (ret
>= 0 && m_filestore_sloppy_crc
) {
3530 int rc
= backend
->_crc_update_zero(**fd
, offset
, len
);
3536 if (ret
!= -EOPNOTSUPP
)
3537 goto out
; // some other error
3543 // lame, kernel is old and doesn't support it.
3544 // write zeros.. yuck!
3545 dout(20) << "zero falling back to writing zeros" << dendl
;
3548 bl
.append_zero(len
);
3549 ret
= _write(cid
, oid
, offset
, len
, bl
);
3552 #ifdef CEPH_HAVE_FALLOCATE
3553 # if !defined(DARWIN) && !defined(__FreeBSD__)
3554 # ifdef FALLOC_FL_KEEP_SIZE
3559 dout(20) << "zero " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << ret
<< dendl
;
3563 int FileStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
, const ghobject_t
& newoid
,
3564 const SequencerPosition
& spos
)
3566 dout(15) << "clone " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< dendl
;
3568 if (_check_replay_guard(cid
, newoid
, spos
) < 0)
3575 r
= lfn_open(cid
, oldoid
, false, &o
, &index
);
3579 assert(NULL
!= (index
.index
));
3580 RWLock::WLocker
l((index
.index
)->access_lock
);
3582 r
= lfn_open(cid
, newoid
, true, &n
, &index
);
3586 r
= ::ftruncate(**n
, 0);
3592 r
= ::fstat(**o
, &st
);
3598 r
= _do_clone_range(**o
, **n
, 0, st
.st_size
, 0);
3603 dout(20) << "objectmap clone" << dendl
;
3604 r
= object_map
->clone(oldoid
, newoid
, &spos
);
3605 if (r
< 0 && r
!= -ENOENT
)
3611 map
<string
, bufferptr
> aset
;
3612 r
= _fgetattrs(**o
, aset
);
3616 r
= chain_fgetxattr(**o
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
3617 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
3618 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
3619 sizeof(XATTR_NO_SPILL_OUT
));
3621 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
3622 sizeof(XATTR_SPILL_OUT
));
3627 r
= _fsetattrs(**n
, aset
);
3632 // clone is non-idempotent; record our work.
3633 _set_replay_guard(**n
, spos
, &newoid
);
3640 dout(10) << "clone " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< " = " << r
<< dendl
;
3641 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
3645 int FileStore::_do_clone_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3647 dout(20) << "_do_clone_range copy " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3648 return backend
->clone_range(from
, to
, srcoff
, len
, dstoff
);
3651 int FileStore::_do_sparse_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3653 dout(20) << __func__
<< " " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3655 map
<uint64_t, uint64_t> exomap
;
3656 // fiemap doesn't allow zero length
3660 if (backend
->has_seek_data_hole()) {
3661 dout(15) << "seek_data/seek_hole " << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3662 r
= _do_seek_hole_data(from
, srcoff
, len
, &exomap
);
3663 } else if (backend
->has_fiemap()) {
3664 dout(15) << "fiemap ioctl" << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3665 r
= _do_fiemap(from
, srcoff
, len
, &exomap
);
3669 int64_t written
= 0;
3673 for (map
<uint64_t, uint64_t>::iterator miter
= exomap
.begin(); miter
!= exomap
.end(); ++miter
) {
3674 uint64_t it_off
= miter
->first
- srcoff
+ dstoff
;
3675 r
= _do_copy_range(from
, to
, miter
->first
, miter
->second
, it_off
, true);
3677 derr
<< "FileStore::_do_copy_range: copy error at " << miter
->first
<< "~" << miter
->second
3678 << " to " << it_off
<< ", " << cpp_strerror(r
) << dendl
;
3681 written
+= miter
->second
;
3685 if (m_filestore_sloppy_crc
) {
3686 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
3690 r
= ::fstat(to
, &st
);
3693 derr
<< __func__
<< ": fstat error at " << to
<< " " << cpp_strerror(r
) << dendl
;
3696 if (st
.st_size
< (int)(dstoff
+ len
)) {
3697 r
= ::ftruncate(to
, dstoff
+ len
);
3700 derr
<< __func__
<< ": ftruncate error at " << dstoff
+len
<< " " << cpp_strerror(r
) << dendl
;
3708 dout(20) << __func__
<< " " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
3712 int FileStore::_do_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
, bool skip_sloppycrc
)
3714 dout(20) << "_do_copy_range " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3716 loff_t pos
= srcoff
;
3717 loff_t end
= srcoff
+ len
;
3718 int buflen
= 4096 * 16; //limit by pipe max size.see fcntl
3720 #ifdef CEPH_HAVE_SPLICE
3721 if (backend
->has_splice()) {
3723 if (pipe(pipefd
) < 0) {
3725 derr
<< " pipe " << " got " << cpp_strerror(r
) << dendl
;
3729 loff_t dstpos
= dstoff
;
3731 int l
= MIN(end
-pos
, buflen
);
3732 r
= safe_splice(from
, &pos
, pipefd
[1], NULL
, l
, SPLICE_F_NONBLOCK
);
3733 dout(10) << " safe_splice read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
3735 derr
<< "FileStore::_do_copy_range: safe_splice read error at " << pos
<< "~" << len
3736 << ", " << cpp_strerror(r
) << dendl
;
3740 // hrm, bad source range, wtf.
3742 derr
<< "FileStore::_do_copy_range got short read result at " << pos
3743 << " of fd " << from
<< " len " << len
<< dendl
;
3747 r
= safe_splice(pipefd
[0], NULL
, to
, &dstpos
, r
, 0);
3748 dout(10) << " safe_splice write to " << to
<< " len " << r
3749 << " got " << r
<< dendl
;
3751 derr
<< "FileStore::_do_copy_range: write error at " << pos
<< "~"
3752 << r
<< ", " << cpp_strerror(r
) << dendl
;
3763 actual
= ::lseek64(from
, srcoff
, SEEK_SET
);
3764 if (actual
!= (int64_t)srcoff
) {
3769 derr
<< "lseek64 to " << srcoff
<< " got " << cpp_strerror(r
) << dendl
;
3772 actual
= ::lseek64(to
, dstoff
, SEEK_SET
);
3773 if (actual
!= (int64_t)dstoff
) {
3778 derr
<< "lseek64 to " << dstoff
<< " got " << cpp_strerror(r
) << dendl
;
3784 int l
= MIN(end
-pos
, buflen
);
3785 r
= ::read(from
, buf
, l
);
3786 dout(25) << " read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
3788 if (errno
== EINTR
) {
3792 derr
<< "FileStore::_do_copy_range: read error at " << pos
<< "~" << len
3793 << ", " << cpp_strerror(r
) << dendl
;
3798 // hrm, bad source range, wtf.
3800 derr
<< "FileStore::_do_copy_range got short read result at " << pos
3801 << " of fd " << from
<< " len " << len
<< dendl
;
3806 int r2
= safe_write(to
, buf
+op
, r
-op
);
3807 dout(25) << " write to " << to
<< " len " << (r
-op
)
3808 << " got " << r2
<< dendl
;
3811 derr
<< "FileStore::_do_copy_range: write error at " << pos
<< "~"
3812 << r
-op
<< ", " << cpp_strerror(r
) << dendl
;
3824 if (r
< 0 && replaying
) {
3825 assert(r
== -ERANGE
);
3826 derr
<< "Filestore: short source tolerated because we are replaying" << dendl
;
3829 assert(replaying
|| pos
== end
);
3830 if (r
>= 0 && !skip_sloppycrc
&& m_filestore_sloppy_crc
) {
3831 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
3834 dout(20) << "_do_copy_range " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
// NOTE(review): fragmentary extraction — interior lines (braces, returns, error
// branches) are elided; code below kept byte-identical. Visible logic: open the
// source (oldcid/oldoid) and destination (newcid/newoid) objects, delegate the
// byte copy to _do_clone_range, then record a replay guard on the destination
// because clone is non-idempotent. Honors _check_replay_guard on the dst first.
3838 int FileStore::_clone_range(const coll_t
& oldcid
, const ghobject_t
& oldoid
, const coll_t
& newcid
, const ghobject_t
& newoid
,
3839 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
,
3840 const SequencerPosition
& spos
)
3842 dout(15) << "clone_range " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3844 if (_check_replay_guard(newcid
, newoid
, spos
) < 0)
3849 r
= lfn_open(oldcid
, oldoid
, false, &o
);
3853 r
= lfn_open(newcid
, newoid
, true, &n
);
3857 r
= _do_clone_range(**o
, **n
, srcoff
, len
, dstoff
);
3862 // clone is non-idempotent; record our work.
3863 _set_replay_guard(**n
, spos
, &newoid
);
3870 dout(10) << "clone_range " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " "
3871 << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
// NOTE(review): truncated class — finish() body and the class close are elided.
// Timer callback Context armed around a commit in sync_entry(); on expiry it
// captures a BackTrace and logs that sync_entry exceeded m_commit_timeo seconds.
3875 class SyncEntryTimeout
: public Context
{
3878 explicit SyncEntryTimeout(CephContext
* cct
, int commit_timeo
)
3879 : cct(cct
), m_commit_timeo(commit_timeo
)
3883 void finish(int r
) override
{
3884 BackTrace
*bt
= new BackTrace(1);
3885 generic_dout(-1) << "FileStore: sync_entry timed out after "
3886 << m_commit_timeo
<< " seconds.\n";
// NOTE(review): fragmentary extraction — loop structure, lock scopes and many
// branches are elided; code kept byte-identical. Visible flow of the background
// sync thread: wait between min/max sync intervals on sync_cond (or wake early
// on force_sync/stop), swap out sync_waiters, then if apply_manager.commit_start()
// succeeds: arm a SyncEntryTimeout under sync_entry_timeo_lock, and either take a
// btrfs-style checkpoint (write_op_seq + create_checkpoint + sync_checkpoint,
// pruning to the 2 newest snaps afterwards) or do the generic path
// (object_map->sync, backend->syncfs, write_op_seq, fsync(op_fd)). Each failure
// is fatal via assert(0 == "..."). Finishes with perf-counter updates,
// apply_manager.commit_finish(), timer cancel, and finish_contexts on the
// swapped-out waiters; re-commits immediately if more waiters arrived or the
// journal says it should (presumably because it is/was full — see log text).
3896 void FileStore::sync_entry()
3900 utime_t max_interval
;
3901 max_interval
.set_from_double(m_filestore_max_sync_interval
);
3902 utime_t min_interval
;
3903 min_interval
.set_from_double(m_filestore_min_sync_interval
);
3905 utime_t startwait
= ceph_clock_now();
3907 dout(20) << "sync_entry waiting for max_interval " << max_interval
<< dendl
;
3908 sync_cond
.WaitInterval(lock
, max_interval
);
3910 dout(20) << "sync_entry not waiting, force_sync set" << dendl
;
3914 dout(20) << "sync_entry force_sync set" << dendl
;
3917 dout(20) << __func__
<< " stop set" << dendl
;
3920 // wait for at least the min interval
3921 utime_t woke
= ceph_clock_now();
3923 dout(20) << "sync_entry woke after " << woke
<< dendl
;
3924 if (woke
< min_interval
) {
3925 utime_t t
= min_interval
;
3927 dout(20) << "sync_entry waiting for another " << t
3928 << " to reach min interval " << min_interval
<< dendl
;
3929 sync_cond
.WaitInterval(lock
, t
);
3935 fin
.swap(sync_waiters
);
3939 if (apply_manager
.commit_start()) {
3940 utime_t start
= ceph_clock_now();
3941 uint64_t cp
= apply_manager
.get_committing_seq();
3943 sync_entry_timeo_lock
.Lock();
3944 SyncEntryTimeout
*sync_entry_timeo
=
3945 new SyncEntryTimeout(cct
, m_filestore_commit_timeout
);
3946 timer
.add_event_after(m_filestore_commit_timeout
, sync_entry_timeo
);
3947 sync_entry_timeo_lock
.Unlock();
3949 logger
->set(l_filestore_committing
, 1);
3951 dout(15) << "sync_entry committing " << cp
<< dendl
;
3952 stringstream errstream
;
3953 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(errstream
)) {
3954 derr
<< errstream
.str() << dendl
;
3958 if (backend
->can_checkpoint()) {
3959 int err
= write_op_seq(op_fd
, cp
);
3961 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
3962 assert(0 == "error during write_op_seq");
3966 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
3968 err
= backend
->create_checkpoint(s
, &cid
);
3971 derr
<< "snap create '" << s
<< "' got error " << err
<< dendl
;
3975 snaps
.push_back(cp
);
3976 apply_manager
.commit_started();
3980 dout(20) << " waiting for checkpoint " << cid
<< " to complete" << dendl
;
3981 err
= backend
->sync_checkpoint(cid
);
3983 derr
<< "ioctl WAIT_SYNC got " << cpp_strerror(err
) << dendl
;
3984 assert(0 == "wait_sync got error");
3986 dout(20) << " done waiting for checkpoint " << cid
<< " to complete" << dendl
;
3990 apply_manager
.commit_started();
3993 int err
= object_map
->sync();
3995 derr
<< "object_map sync got " << cpp_strerror(err
) << dendl
;
3996 assert(0 == "object_map sync returned error");
3999 err
= backend
->syncfs();
4001 derr
<< "syncfs got " << cpp_strerror(err
) << dendl
;
4002 assert(0 == "syncfs returned error");
4005 err
= write_op_seq(op_fd
, cp
);
4007 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
4008 assert(0 == "error during write_op_seq");
4010 err
= ::fsync(op_fd
);
4012 derr
<< "Error during fsync of op_seq: " << cpp_strerror(err
) << dendl
;
4013 assert(0 == "error during fsync of op_seq");
4017 utime_t done
= ceph_clock_now();
4018 utime_t lat
= done
- start
;
4019 utime_t dur
= done
- startwait
;
4020 dout(10) << "sync_entry commit took " << lat
<< ", interval was " << dur
<< dendl
;
4022 logger
->inc(l_filestore_commitcycle
);
4023 logger
->tinc(l_filestore_commitcycle_latency
, lat
);
4024 logger
->tinc(l_filestore_commitcycle_interval
, dur
);
4026 apply_manager
.commit_finish();
4027 if (!m_disable_wbthrottle
) {
4031 logger
->set(l_filestore_committing
, 0);
4033 // remove old snaps?
4034 if (backend
->can_checkpoint()) {
4036 while (snaps
.size() > 2) {
4037 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)snaps
.front());
4039 dout(10) << "removing snap '" << s
<< "'" << dendl
;
4040 int r
= backend
->destroy_checkpoint(s
);
4043 derr
<< "unable to destroy snap '" << s
<< "' got " << cpp_strerror(err
) << dendl
;
4048 dout(15) << "sync_entry committed to op_seq " << cp
<< dendl
;
4050 sync_entry_timeo_lock
.Lock();
4051 timer
.cancel_event(sync_entry_timeo
);
4052 sync_entry_timeo_lock
.Unlock();
4058 finish_contexts(cct
, fin
, 0);
4060 if (!sync_waiters
.empty()) {
4061 dout(10) << "sync_entry more waiters, committing again" << dendl
;
4064 if (!stop
&& journal
&& journal
->should_commit_now()) {
4065 dout(10) << "sync_entry journal says we should commit again (probably is/was full)" << dendl
;
// NOTE(review): fragment. Kicks a sync only when no journal is configured;
// with a journal present this is a logged no-op (the wake-up body is elided).
4073 void FileStore::_start_sync()
4075 if (!journal
) { // don't do a big sync if the journal is on
4076 dout(10) << "start_sync" << dendl
;
4079 dout(10) << "start_sync - NOOP (journal is on)" << dendl
;
// NOTE(review): fragment — the force_sync set/signal lines are elided.
// Takes the FileStore lock; presumably sets force_sync and wakes sync_entry.
4083 void FileStore::do_force_sync()
4085 dout(10) << __func__
<< dendl
;
4086 Mutex::Locker
l(lock
);
// NOTE(review): fragment. Registers onsafe as a completion to be fired after
// the next commit (queued on sync_waiters under lock); the wake-up of the
// sync thread is elided.
4091 void FileStore::start_sync(Context
*onsafe
)
4093 Mutex::Locker
l(lock
);
4094 sync_waiters
.push_back(onsafe
);
4097 dout(10) << "start_sync" << dendl
;
// NOTE(review): fragment. Synchronous sync(): builds a local mutex/cond/done
// flag, wraps them in a C_SafeCond handed to the commit machinery (the
// start_sync call and wait loop are elided), and blocks until signalled.
4100 void FileStore::sync()
4102 Mutex
l("FileStore::sync");
4105 C_SafeCond
*fin
= new C_SafeCond(&l
, &c
, &done
);
4111 dout(10) << "sync waiting" << dendl
;
4115 dout(10) << "sync done" << dendl
;
// NOTE(review): fragment. Drains the op threadpool (drain call elided), then
// waits for every apply finisher to empty so all queued ops are readable.
4118 void FileStore::_flush_op_queue()
4120 dout(10) << "_flush_op_queue draining op tp" << dendl
;
4122 dout(10) << "_flush_op_queue waiting for apply finisher" << dendl
;
4123 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
4124 (*it
)->wait_for_empty();
4129 * flush - make every queued write readable
// NOTE(review): fragment. Makes queued writes readable: short-circuits under
// filestore_blackhole (body elided), and in writeahead-journal mode also
// drains every ondisk finisher before returning.
4131 void FileStore::flush()
4133 dout(10) << "flush" << dendl
;
4135 if (cct
->_conf
->filestore_blackhole
) {
4137 Mutex
lock("FileStore::flush::lock");
4145 if (m_filestore_journal_writeahead
) {
4148 dout(10) << "flush draining ondisk finisher" << dendl
;
4149 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
4150 (*it
)->wait_for_empty();
4155 dout(10) << "flush complete" << dendl
;
4159 * sync_and_flush - make every queued write readable AND committed to disk
// NOTE(review): fragment — the actual sync/flush call sequence per journal
// mode is elided; only the mode branch and logging remain visible.
4161 void FileStore::sync_and_flush()
4163 dout(10) << "sync_and_flush" << dendl
;
4165 if (m_filestore_journal_writeahead
) {
4170 // includes m_filestore_journal_parallel
4174 dout(10) << "sync_and_flush done" << dendl
;
// NOTE(review): fragment — body beyond the entry log is elided.
4177 int FileStore::flush_journal()
4179 dout(10) << __func__
<< dendl
;
// NOTE(review): fragment. Creates a named cluster snapshot via the backend:
// fails fast when the backend cannot checkpoint, otherwise formats the snap
// name with CLUSTER_SNAP_ITEM and calls backend->create_checkpoint.
4185 int FileStore::snapshot(const string
& name
)
4187 dout(10) << "snapshot " << name
<< dendl
;
4190 if (!backend
->can_checkpoint()) {
4191 dout(0) << "snapshot " << name
<< " failed, not supported" << dendl
;
4196 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, name
.c_str());
4198 int r
= backend
->create_checkpoint(s
, NULL
);
4200 derr
<< "snapshot " << name
<< " failed: " << cpp_strerror(r
) << dendl
;
4206 // -------------------------------
// NOTE(review): fragment. Reads one chained xattr into bp: first tries a
// fixed CHAIN_XATTR_MAX_BLOCK_LEN stack buffer; on -ERANGE re-queries the
// size (chain_fgetxattr with 0,0) and reads directly into a right-sized
// bufferptr. EIO escalates via the m_filestore_fail_eio assert.
4209 int FileStore::_fgetattr(int fd
, const char *name
, bufferptr
& bp
)
4211 char val
[CHAIN_XATTR_MAX_BLOCK_LEN
];
4212 int l
= chain_fgetxattr(fd
, name
, val
, sizeof(val
));
4214 bp
= buffer::create(l
);
4215 memcpy(bp
.c_str(), val
, l
);
4216 } else if (l
== -ERANGE
) {
4217 l
= chain_fgetxattr(fd
, name
, 0, 0);
4219 bp
= buffer::create(l
);
4220 l
= chain_fgetxattr(fd
, name
, bp
.c_str(), l
);
4223 assert(!m_filestore_fail_eio
|| l
!= -EIO
);
// NOTE(review): fragment — buffer declarations and cleanup are elided.
// Lists all chained xattr names on fd (retrying with a heap buffer on
// -ERANGE), then walks the NUL-separated name list and, for each name that
// parse_attrname accepts, fetches its value into aset via _fgetattr.
4227 int FileStore::_fgetattrs(int fd
, map
<string
,bufferptr
>& aset
)
4231 int len
= chain_flistxattr(fd
, names1
, sizeof(names1
)-1);
4234 if (len
== -ERANGE
) {
4235 len
= chain_flistxattr(fd
, 0, 0);
4237 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4240 dout(10) << " -ERANGE, len is " << len
<< dendl
;
4241 names2
= new char[len
+1];
4242 len
= chain_flistxattr(fd
, names2
, len
);
4243 dout(10) << " -ERANGE, got " << len
<< dendl
;
4245 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4250 } else if (len
< 0) {
4251 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4258 char *end
= name
+ len
;
4259 while (name
< end
) {
4260 char *attrname
= name
;
4261 if (parse_attrname(&name
)) {
4263 dout(20) << "fgetattrs " << fd
<< " getting '" << name
<< "'" << dendl
;
4264 int r
= _fgetattr(fd
, attrname
, aset
[name
]);
4271 name
+= strlen(name
) + 1;
// NOTE(review): fragment. Writes each entry of aset as a chained xattr on fd:
// maps the logical name through get_attrname, passes the value pointer only
// when non-empty, and logs on chain_fsetxattr failure. The original author's
// own question about aborting on first failure is preserved below.
4278 int FileStore::_fsetattrs(int fd
, map
<string
, bufferptr
> &aset
)
4280 for (map
<string
, bufferptr
>::iterator p
= aset
.begin();
4283 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4284 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4286 if (p
->second
.length())
4287 val
= p
->second
.c_str();
4290 // ??? Why do we skip setting all the other attrs if one fails?
4291 int r
= chain_fsetxattr(fd
, n
, val
, p
->second
.length());
4293 derr
<< "FileStore::_setattrs: chain_setxattr returned " << r
<< dendl
;
4300 // debug EIO injection
// NOTE(review): debug EIO injection — marks oid so later data reads report an
// injected error (checked by debug_data_eio). Guarded by read_error_lock.
4301 void FileStore::inject_data_error(const ghobject_t
&oid
) {
4302 Mutex::Locker
l(read_error_lock
);
4303 dout(10) << __func__
<< ": init error on " << oid
<< dendl
;
4304 data_error_set
.insert(oid
);
// NOTE(review): debug EIO injection for metadata reads — mirrors
// inject_data_error but records into mdata_error_set.
4306 void FileStore::inject_mdata_error(const ghobject_t
&oid
) {
4307 Mutex::Locker
l(read_error_lock
);
4308 dout(10) << __func__
<< ": init error on " << oid
<< dendl
;
4309 mdata_error_set
.insert(oid
);
// NOTE(review): clears any injected data/metadata error state for oid when the
// object is deleted, under read_error_lock.
4311 void FileStore::debug_obj_on_delete(const ghobject_t
&oid
) {
4312 Mutex::Locker
l(read_error_lock
);
4313 dout(10) << __func__
<< ": clear error on " << oid
<< dendl
;
4314 data_error_set
.erase(oid
);
4315 mdata_error_set
.erase(oid
);
// NOTE(review): fragment — return statements elided. Queries whether a data
// EIO was injected for oid (membership in data_error_set, under lock).
4317 bool FileStore::debug_data_eio(const ghobject_t
&oid
) {
4318 Mutex::Locker
l(read_error_lock
);
4319 if (data_error_set
.count(oid
)) {
4320 dout(10) << __func__
<< ": inject error on " << oid
<< dendl
;
// NOTE(review): fragment — return statements elided. Metadata counterpart of
// debug_data_eio; checks mdata_error_set under read_error_lock.
4326 bool FileStore::debug_mdata_eio(const ghobject_t
&oid
) {
4327 Mutex::Locker
l(read_error_lock
);
4328 if (mdata_error_set
.count(oid
)) {
4329 dout(10) << __func__
<< ": inject error on " << oid
<< dendl
;
// NOTE(review): fragment — error-path braces/returns elided. Reads one xattr
// of cid/oid: redirects to the temp collection when needed, tries the inline
// (filesystem) xattr via _fgetattr first, and on -ENODATA falls back to the
// omap (object_map->get_xattrs) copying the value into bp. Honors injected
// metadata EIO (debug_mdata_eio) and normalizes positive r to 0 on return.
4339 int FileStore::getattr(const coll_t
& _cid
, const ghobject_t
& oid
, const char *name
, bufferptr
&bp
)
4341 tracepoint(objectstore
, getattr_enter
, _cid
.c_str());
4342 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
4343 dout(15) << "getattr " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4345 int r
= lfn_open(cid
, oid
, false, &fd
);
4349 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4350 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4351 r
= _fgetattr(**fd
, n
, bp
);
4353 if (r
== -ENODATA
) {
4354 map
<string
, bufferlist
> got
;
4356 to_get
.insert(string(name
));
4358 r
= get_index(cid
, &index
);
4360 dout(10) << __func__
<< " could not get index r = " << r
<< dendl
;
4363 r
= object_map
->get_xattrs(oid
, to_get
, &got
);
4364 if (r
< 0 && r
!= -ENOENT
) {
4365 dout(10) << __func__
<< " get_xattrs err r =" << r
<< dendl
;
4369 dout(10) << __func__
<< " got.size() is 0" << dendl
;
4372 bp
= bufferptr(got
.begin()->second
.c_str(),
4373 got
.begin()->second
.length());
4377 dout(10) << "getattr " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
4378 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4379 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4380 debug_mdata_eio(oid
)) {
4383 tracepoint(objectstore
, getattr_exit
, r
);
4384 return r
< 0 ? r
: 0;
// NOTE(review): fragment — several branch bodies elided. Fetches ALL xattrs of
// cid/oid: detects spill-out state via XATTR_SPILL_OUT_NAME, collects inline
// xattrs with _fgetattrs, then merges omap-stored xattrs
// (get_all_xattrs + get_xattrs) into aset as bufferptrs. Asserts the omap
// name/value sets stay in step; honors injected metadata EIO on exit.
4388 int FileStore::getattrs(const coll_t
& _cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
)
4390 tracepoint(objectstore
, getattrs_enter
, _cid
.c_str());
4391 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
4392 set
<string
> omap_attrs
;
4393 map
<string
, bufferlist
> omap_aset
;
4395 dout(15) << "getattrs " << cid
<< "/" << oid
<< dendl
;
4397 bool spill_out
= true;
4400 int r
= lfn_open(cid
, oid
, false, &fd
);
4405 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4406 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4409 r
= _fgetattrs(**fd
, aset
);
4411 fd
= FDRef(); // defensive
4417 dout(10) << __func__
<< " no xattr exists in object_map r = " << r
<< dendl
;
4421 r
= get_index(cid
, &index
);
4423 dout(10) << __func__
<< " could not get index r = " << r
<< dendl
;
4427 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4428 if (r
< 0 && r
!= -ENOENT
) {
4429 dout(10) << __func__
<< " could not get omap_attrs r = " << r
<< dendl
;
4433 r
= object_map
->get_xattrs(oid
, omap_attrs
, &omap_aset
);
4434 if (r
< 0 && r
!= -ENOENT
) {
4435 dout(10) << __func__
<< " could not get omap_attrs r = " << r
<< dendl
;
4441 assert(omap_attrs
.size() == omap_aset
.size());
4442 for (map
<string
, bufferlist
>::iterator i
= omap_aset
.begin();
4443 i
!= omap_aset
.end();
4445 string
key(i
->first
);
4446 aset
.insert(make_pair(key
,
4447 bufferptr(i
->second
.c_str(), i
->second
.length())));
4450 dout(10) << "getattrs " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4451 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4453 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4454 debug_mdata_eio(oid
)) {
4457 tracepoint(objectstore
, getattrs_exit
, r
);
// NOTE(review): fragment — loop/branch closers elided. Sets xattrs on cid/oid,
// splitting between inline filesystem xattrs and omap storage: values larger
// than m_filestore_max_inline_xattr_size, values beyond the
// m_filestore_max_inline_xattrs count, and everything when the inline listing
// was incomplete (-E2BIG) go to omap (omap_set); the rest go inline
// (inline_to_set via _fsetattrs). Marks the object spilled-out with
// XATTR_SPILL_OUT_NAME before writing omap entries, and removes superseded
// omap entries (omap_remove) when a key moves back inline. -ENOENT from
// remove_xattrs is tolerated and r reset to 0.
4462 int FileStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
,
4463 const SequencerPosition
&spos
)
4465 map
<string
, bufferlist
> omap_set
;
4466 set
<string
> omap_remove
;
4467 map
<string
, bufferptr
> inline_set
;
4468 map
<string
, bufferptr
> inline_to_set
;
4471 bool incomplete_inline
= false;
4473 int r
= lfn_open(cid
, oid
, false, &fd
);
4479 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4480 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4485 r
= _fgetattrs(**fd
, inline_set
);
4486 incomplete_inline
= (r
== -E2BIG
);
4487 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4488 dout(15) << "setattrs " << cid
<< "/" << oid
4489 << (incomplete_inline
? " (incomplete_inline, forcing omap)" : "")
4492 for (map
<string
,bufferptr
>::iterator p
= aset
.begin();
4495 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4496 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4498 if (incomplete_inline
) {
4499 chain_fremovexattr(**fd
, n
); // ignore any error
4500 omap_set
[p
->first
].push_back(p
->second
);
4504 if (p
->second
.length() > m_filestore_max_inline_xattr_size
) {
4505 if (inline_set
.count(p
->first
)) {
4506 inline_set
.erase(p
->first
);
4507 r
= chain_fremovexattr(**fd
, n
);
4511 omap_set
[p
->first
].push_back(p
->second
);
4515 if (!inline_set
.count(p
->first
) &&
4516 inline_set
.size() >= m_filestore_max_inline_xattrs
) {
4517 omap_set
[p
->first
].push_back(p
->second
);
4520 omap_remove
.insert(p
->first
);
4521 inline_set
.insert(*p
);
4523 inline_to_set
.insert(*p
);
4526 if (spill_out
!= 1 && !omap_set
.empty()) {
4527 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
4528 sizeof(XATTR_SPILL_OUT
));
4531 r
= _fsetattrs(**fd
, inline_to_set
);
4535 if (spill_out
&& !omap_remove
.empty()) {
4536 r
= object_map
->remove_xattrs(oid
, omap_remove
, &spos
);
4537 if (r
< 0 && r
!= -ENOENT
) {
4538 dout(10) << __func__
<< " could not remove_xattrs r = " << r
<< dendl
;
4539 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4542 r
= 0; // don't confuse the debug output
4546 if (!omap_set
.empty()) {
4547 r
= object_map
->set_xattrs(oid
, omap_set
, &spos
);
4549 dout(10) << __func__
<< " could not set_xattrs r = " << r
<< dendl
;
4550 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4557 dout(10) << "setattrs " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
// NOTE(review): fragment. Removes one xattr from cid/oid: tries the inline
// chained xattr first (chain_fremovexattr); if that reports -ENODATA and the
// object has spilled out to omap, removes the name from the omap instead
// (object_map->remove_xattrs, tolerating -ENOENT).
4562 int FileStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
,
4563 const SequencerPosition
&spos
)
4565 dout(15) << "rmattr " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4567 bool spill_out
= true;
4569 int r
= lfn_open(cid
, oid
, false, &fd
);
4575 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4576 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4580 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4581 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4582 r
= chain_fremovexattr(**fd
, n
);
4583 if (r
== -ENODATA
&& spill_out
) {
4585 r
= get_index(cid
, &index
);
4587 dout(10) << __func__
<< " could not get index r = " << r
<< dendl
;
4590 set
<string
> to_remove
;
4591 to_remove
.insert(string(name
));
4592 r
= object_map
->remove_xattrs(oid
, to_remove
, &spos
);
4593 if (r
< 0 && r
!= -ENOENT
) {
4594 dout(10) << __func__
<< " could not remove_xattrs index r = " << r
<< dendl
;
4595 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4602 dout(10) << "rmattr " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
// NOTE(review): fragment. Removes ALL xattrs from cid/oid: enumerates and
// deletes every inline chained xattr, then (for spilled-out objects) removes
// every omap-stored xattr, and finally resets the spill-out marker to
// XATTR_NO_SPILL_OUT. -ENOENT from omap removal is tolerated.
4606 int FileStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
,
4607 const SequencerPosition
&spos
)
4609 dout(15) << "rmattrs " << cid
<< "/" << oid
<< dendl
;
4611 map
<string
,bufferptr
> aset
;
4613 set
<string
> omap_attrs
;
4615 bool spill_out
= true;
4617 int r
= lfn_open(cid
, oid
, false, &fd
);
4623 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4624 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4628 r
= _fgetattrs(**fd
, aset
);
4630 for (map
<string
,bufferptr
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
4631 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4632 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4633 r
= chain_fremovexattr(**fd
, n
);
4635 dout(10) << __func__
<< " could not remove xattr r = " << r
<< dendl
;
4642 dout(10) << __func__
<< " no xattr exists in object_map r = " << r
<< dendl
;
4646 r
= get_index(cid
, &index
);
4648 dout(10) << __func__
<< " could not get index r = " << r
<< dendl
;
4652 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4653 if (r
< 0 && r
!= -ENOENT
) {
4654 dout(10) << __func__
<< " could not get omap_attrs r = " << r
<< dendl
;
4655 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4658 r
= object_map
->remove_xattrs(oid
, omap_attrs
, &spos
);
4659 if (r
< 0 && r
!= -ENOENT
) {
4660 dout(10) << __func__
<< " could not remove omap_attrs r = " << r
<< dendl
;
4665 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
4666 sizeof(XATTR_NO_SPILL_OUT
));
4672 dout(10) << "rmattrs " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
// NOTE(review): fragment. Empties then destroys a collection: pages through
// collection_list in batches of 300, asserts each object passes the replay
// guard, removes each via _remove, and finishes with _destroy_collection.
4679 int FileStore::_collection_remove_recursive(const coll_t
&cid
,
4680 const SequencerPosition
&spos
)
4683 int r
= collection_stat(cid
, &st
);
4690 vector
<ghobject_t
> objects
;
4692 while (!max
.is_max()) {
4693 r
= collection_list(cid
, max
, ghobject_t::get_max(),
4694 300, &objects
, &max
);
4697 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
4700 assert(_check_replay_guard(cid
, *i
, spos
));
4701 r
= _remove(cid
, *i
, spos
);
4707 return _destroy_collection(cid
);
4710 // --------------------------
// Convenience overload: list collections excluding temp collections.
4713 int FileStore::list_collections(vector
<coll_t
>& ls
)
4715 return list_collections(ls
, false);
// NOTE(review): fragment — opendir error handling and the ls append are
// elided. Scans basedir/current with readdir: stats entries whose d_type is
// DT_UNKNOWN (filesystems without d_type), skips non-directories, "omap",
// "." and "..", parses each remaining name into a coll_t (logging invalid
// ones), and keeps temp collections only when include_temp is set.
4718 int FileStore::list_collections(vector
<coll_t
>& ls
, bool include_temp
)
4720 tracepoint(objectstore
, list_collections_enter
);
4721 dout(10) << "list_collections" << dendl
;
4724 snprintf(fn
, sizeof(fn
), "%s/current", basedir
.c_str());
4727 DIR *dir
= ::opendir(fn
);
4730 derr
<< "tried opening directory " << fn
<< ": " << cpp_strerror(-r
) << dendl
;
4731 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4735 struct dirent
*de
= nullptr;
4736 while ((de
= ::readdir(dir
))) {
4737 if (de
->d_type
== DT_UNKNOWN
) {
4738 // d_type not supported (non-ext[234], btrfs), must stat
4740 char filename
[PATH_MAX
];
4741 snprintf(filename
, sizeof(filename
), "%s/%s", fn
, de
->d_name
);
4743 r
= ::stat(filename
, &sb
);
4746 derr
<< "stat on " << filename
<< ": " << cpp_strerror(-r
) << dendl
;
4747 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4750 if (!S_ISDIR(sb
.st_mode
)) {
4753 } else if (de
->d_type
!= DT_DIR
) {
4756 if (strcmp(de
->d_name
, "omap") == 0) {
4759 if (de
->d_name
[0] == '.' &&
4760 (de
->d_name
[1] == '\0' ||
4761 (de
->d_name
[1] == '.' &&
4762 de
->d_name
[2] == '\0')))
4765 if (!cid
.parse(de
->d_name
)) {
4766 derr
<< "ignoring invalid collection '" << de
->d_name
<< "'" << dendl
;
4769 if (!cid
.is_temp() || include_temp
)
4774 derr
<< "trying readdir " << fn
<< ": " << cpp_strerror(r
) << dendl
;
4779 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4780 tracepoint(objectstore
, list_collections_exit
, r
);
// NOTE(review): fragment. stat()s the collection's directory (path from
// get_cdir) into *st; negative errno handling elided.
4784 int FileStore::collection_stat(const coll_t
& c
, struct stat
*st
)
4786 tracepoint(objectstore
, collection_stat_enter
, c
.c_str());
4788 get_cdir(c
, fn
, sizeof(fn
));
4789 dout(15) << "collection_stat " << fn
<< dendl
;
4790 int r
= ::stat(fn
, st
);
4793 dout(10) << "collection_stat " << fn
<< " = " << r
<< dendl
;
4794 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4795 tracepoint(objectstore
, collection_stat_exit
, r
);
// NOTE(review): fragment — return elided. A collection exists iff its
// directory stats cleanly (collection_stat == 0).
4799 bool FileStore::collection_exists(const coll_t
& c
)
4801 tracepoint(objectstore
, collection_exists_enter
, c
.c_str());
4803 bool ret
= collection_stat(c
, &st
) == 0;
4804 tracepoint(objectstore
, collection_exists_exit
, ret
);
// NOTE(review): fragment. Reports emptiness by asking the collection's index
// for a partial listing over the full ghobject range (limit arg elided) under
// the index access_lock read lock; *empty = no objects returned.
4808 int FileStore::collection_empty(const coll_t
& c
, bool *empty
)
4810 tracepoint(objectstore
, collection_empty_enter
, c
.c_str());
4811 dout(15) << "collection_empty " << c
<< dendl
;
4813 int r
= get_index(c
, &index
);
4815 derr
<< __func__
<< " get_index returned: " << cpp_strerror(r
)
4820 assert(NULL
!= index
.index
);
4821 RWLock::RLocker
l((index
.index
)->access_lock
);
4823 vector
<ghobject_t
> ls
;
4824 r
= index
->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4827 derr
<< __func__
<< " collection_list_partial returned: "
4828 << cpp_strerror(r
) << dendl
;
4829 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4832 *empty
= ls
.empty();
4833 tracepoint(objectstore
, collection_empty_exit
, *empty
);
// NOTE(review): fragment — the encoding of bits into v is elided. Persists the
// collection's split-bits value as a "bits" xattr on the collection dir; the
// fd is opened read-only (xattr writes don't need write access) and closed
// with the EINTR-safe wrapper.
4837 int FileStore::_collection_set_bits(const coll_t
& c
, int bits
)
4840 get_cdir(c
, fn
, sizeof(fn
));
4841 dout(10) << "collection_set_bits " << fn
<< " " << bits
<< dendl
;
4845 int fd
= ::open(fn
, O_RDONLY
);
4850 get_attrname("bits", n
, PATH_MAX
);
4851 r
= chain_fsetxattr(fd
, n
, (char*)&v
, sizeof(v
));
4852 VOID_TEMP_FAILURE_RETRY(::close(fd
));
4854 dout(10) << "collection_setattr " << fn
<< " " << bits
<< " = " << r
<< dendl
;
// NOTE(review): fragment — decode/return of bits elided. Reads back the
// "bits" xattr written by _collection_set_bits from the collection directory.
4858 int FileStore::collection_bits(const coll_t
& c
)
4861 get_cdir(c
, fn
, sizeof(fn
));
4862 dout(15) << "collection_bits " << fn
<< dendl
;
4866 int fd
= ::open(fn
, O_RDONLY
);
4871 get_attrname("bits", n
, PATH_MAX
);
4872 r
= chain_fgetxattr(fd
, n
, (char*)&bits
, sizeof(bits
));
4873 VOID_TEMP_FAILURE_RETRY(::close(fd
));
4879 dout(10) << "collection_bits " << fn
<< " = " << bits
<< dendl
;
// NOTE(review): fragment — several branch bodies and the return are elided.
// Lists up to max objects in [orig_start, end): derives the pool id and shard
// from the collection type (temp pools map to -2 - pool, meta uses NO_SHARD),
// transparently lists the parallel temp collection first for regular PG
// collections, then queries the index (collection_list_partial) under its
// read lock. Because HashIndex can't know the pool, the pool/shard of the
// returned 'next' cursor are patched in afterwards.
4883 int FileStore::collection_list(const coll_t
& c
,
4884 const ghobject_t
& orig_start
,
4885 const ghobject_t
& end
,
4887 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
4889 ghobject_t start
= orig_start
;
4893 ghobject_t temp_next
;
4896 // figure out the pool id. we need this in order to generate a
4897 // meaningful 'next' value.
4902 if (c
.is_temp(&pgid
)) {
4903 pool
= -2 - pgid
.pool();
4905 } else if (c
.is_pg(&pgid
)) {
4908 } else if (c
.is_meta()) {
4910 shard
= shard_id_t::NO_SHARD
;
4912 // hrm, the caller is test code! we should get kill it off. for now,
4915 shard
= shard_id_t::NO_SHARD
;
4917 dout(20) << __func__
<< " pool is " << pool
<< " shard is " << shard
4918 << " pgid " << pgid
<< dendl
;
4922 sep
.set_shard(shard
);
4923 if (!c
.is_temp() && !c
.is_meta()) {
4925 dout(10) << __func__
<< " first checking temp pool" << dendl
;
4926 coll_t temp
= c
.get_temp();
4927 int r
= collection_list(temp
, start
, end
, max
, ls
, next
);
4930 if (*next
!= ghobject_t::get_max())
4933 dout(10) << __func__
<< " fall through to non-temp collection, start "
4936 dout(10) << __func__
<< " start " << start
<< " >= sep " << sep
<< dendl
;
4941 int r
= get_index(c
, &index
);
4945 assert(NULL
!= index
.index
);
4946 RWLock::RLocker
l((index
.index
)->access_lock
);
4948 r
= index
->collection_list_partial(start
, end
, max
, ls
, next
);
4951 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4954 dout(20) << "objects: " << *ls
<< dendl
;
4956 // HashIndex doesn't know the pool when constructing a 'next' value
4957 if (next
&& !next
->is_max()) {
4958 next
->hobj
.pool
= pool
;
4959 next
->set_shard(shard
);
4960 dout(20) << " next " << *next
<< dendl
;
// NOTE(review): fragment. Reads the full omap (header param elided from view)
// of c/hoid: resolves the temp collection, verifies the object exists via
// lfn_find under the index read lock, then delegates to object_map->get.
// -ENOENT from the omap is tolerated.
4966 int FileStore::omap_get(const coll_t
& _c
, const ghobject_t
&hoid
,
4968 map
<string
, bufferlist
> *out
)
4970 tracepoint(objectstore
, omap_get_enter
, _c
.c_str());
4971 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
4972 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
4974 int r
= get_index(c
, &index
);
4978 assert(NULL
!= index
.index
);
4979 RWLock::RLocker
l((index
.index
)->access_lock
);
4980 r
= lfn_find(hoid
, index
);
4984 r
= object_map
->get(hoid
, header
, out
);
4985 if (r
< 0 && r
!= -ENOENT
) {
4986 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4989 tracepoint(objectstore
, omap_get_exit
, 0);
// NOTE(review): fragment — collection param and bl/allow_eio params elided
// from view. Same lookup pattern as omap_get, but fetches only the omap
// header; the allow_eio flag additionally suppresses the EIO assert.
4993 int FileStore::omap_get_header(
4995 const ghobject_t
&hoid
,
4999 tracepoint(objectstore
, omap_get_header_enter
, _c
.c_str());
5000 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5001 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
5003 int r
= get_index(c
, &index
);
5007 assert(NULL
!= index
.index
);
5008 RWLock::RLocker
l((index
.index
)->access_lock
);
5009 r
= lfn_find(hoid
, index
);
5013 r
= object_map
->get_header(hoid
, bl
);
5014 if (r
< 0 && r
!= -ENOENT
) {
5015 assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
5018 tracepoint(objectstore
, omap_get_header_exit
, 0);
// NOTE(review): fragment. Same lookup pattern as omap_get; fetches only the
// omap key set via object_map->get_keys, tolerating -ENOENT.
5022 int FileStore::omap_get_keys(const coll_t
& _c
, const ghobject_t
&hoid
, set
<string
> *keys
)
5024 tracepoint(objectstore
, omap_get_keys_enter
, _c
.c_str());
5025 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5026 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
5028 int r
= get_index(c
, &index
);
5032 assert(NULL
!= index
.index
);
5033 RWLock::RLocker
l((index
.index
)->access_lock
);
5034 r
= lfn_find(hoid
, index
);
5038 r
= object_map
->get_keys(hoid
, keys
);
5039 if (r
< 0 && r
!= -ENOENT
) {
5040 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5043 tracepoint(objectstore
, omap_get_keys_exit
, 0);
// NOTE(review): fragment. Fetches the values for a requested key set via
// object_map->get_values; tracks which stage failed in 'where' for the exit
// log line. -ENOENT is tolerated.
5047 int FileStore::omap_get_values(const coll_t
& _c
, const ghobject_t
&hoid
,
5048 const set
<string
> &keys
,
5049 map
<string
, bufferlist
> *out
)
5051 tracepoint(objectstore
, omap_get_values_enter
, _c
.c_str());
5052 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5053 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
5055 const char *where
= "()";
5056 int r
= get_index(c
, &index
);
5058 where
= " (get_index)";
5062 assert(NULL
!= index
.index
);
5063 RWLock::RLocker
l((index
.index
)->access_lock
);
5064 r
= lfn_find(hoid
, index
);
5066 where
= " (lfn_find)";
5070 r
= object_map
->get_values(hoid
, keys
, out
);
5071 if (r
< 0 && r
!= -ENOENT
) {
5072 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5073 where
= " (get_values)";
5078 tracepoint(objectstore
, omap_get_values_exit
, r
);
5079 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< " = " << r
// NOTE(review): fragment — 'out' param type elided from view. Same lookup
// pattern as omap_get_keys; reports which of the given keys exist via
// object_map->check_keys, tolerating -ENOENT.
5084 int FileStore::omap_check_keys(const coll_t
& _c
, const ghobject_t
&hoid
,
5085 const set
<string
> &keys
,
5088 tracepoint(objectstore
, omap_check_keys_enter
, _c
.c_str());
5089 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5090 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
5093 int r
= get_index(c
, &index
);
5097 assert(NULL
!= index
.index
);
5098 RWLock::RLocker
l((index
.index
)->access_lock
);
5099 r
= lfn_find(hoid
, index
);
5103 r
= object_map
->check_keys(hoid
, keys
, out
);
5104 if (r
< 0 && r
!= -ENOENT
) {
5105 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5108 tracepoint(objectstore
, omap_check_keys_exit
, 0);
// NOTE(review): fragment. Produces an omap iterator for c/hoid; on any lookup
// failure (get_index or lfn_find) logs and returns a default-constructed
// (empty) ObjectMapIterator rather than an error code.
5112 ObjectMap::ObjectMapIterator
FileStore::get_omap_iterator(const coll_t
& _c
,
5113 const ghobject_t
&hoid
)
5115 tracepoint(objectstore
, get_omap_iterator
, _c
.c_str());
5116 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5117 dout(15) << __func__
<< " " << c
<< "/" << hoid
<< dendl
;
5119 int r
= get_index(c
, &index
);
5121 dout(10) << __func__
<< " " << c
<< "/" << hoid
<< " = 0 "
5122 << "(get_index failed with " << cpp_strerror(r
) << ")" << dendl
;
5123 return ObjectMap::ObjectMapIterator();
5126 assert(NULL
!= index
.index
);
5127 RWLock::RLocker
l((index
.index
)->access_lock
);
5128 r
= lfn_find(hoid
, index
);
5130 dout(10) << __func__
<< " " << c
<< "/" << hoid
<< " = 0 "
5131 << "(lfn_find failed with " << cpp_strerror(r
) << ")" << dendl
;
5132 return ObjectMap::ObjectMapIterator();
5135 return object_map
->get_iterator(hoid
);
// NOTE(review): fragment. Applies an expected-object-count hint: only an
// empty collection (or one seen during replay) may take it; pre-hashes the
// collection's index accordingly and records a replay guard afterwards.
5138 int FileStore::_collection_hint_expected_num_objs(const coll_t
& c
, uint32_t pg_num
,
5139 uint64_t expected_num_objs
,
5140 const SequencerPosition
&spos
)
5142 dout(15) << __func__
<< " collection: " << c
<< " pg number: "
5143 << pg_num
<< " expected number of objects: " << expected_num_objs
<< dendl
;
5146 int ret
= collection_empty(c
, &empty
);
5149 if (!empty
&& !replaying
) {
5150 dout(0) << "Failed to give an expected number of objects hint to collection : "
5151 << c
<< ", only empty collection can take such type of hint. " << dendl
;
5156 ret
= get_index(c
, &index
);
5159 // Pre-hash the collection
5160 ret
= index
->pre_hash_collection(pg_num
, expected_num_objs
);
5161 dout(10) << "pre_hash_collection " << c
<< " = " << ret
<< dendl
;
5164 _set_replay_guard(c
, spos
);
// NOTE(review): fragment — collection/bits params elided from view. mkdirs the
// collection directory (tolerating EEXIST during replay), persists the split
// bits, recursively creates the parallel temp collection for regular
// collections, and sets a replay guard.
5169 int FileStore::_create_collection(
5172 const SequencerPosition
&spos
)
5175 get_cdir(c
, fn
, sizeof(fn
));
5176 dout(15) << "create_collection " << fn
<< dendl
;
5177 int r
= ::mkdir(fn
, 0755);
5180 if (r
== -EEXIST
&& replaying
)
5182 dout(10) << "create_collection " << fn
<< " = " << r
<< dendl
;
5189 r
= _collection_set_bits(c
, bits
);
5192 // create parallel temp collection, too
5193 if (!c
.is_meta() && !c
.is_temp()) {
5194 coll_t temp
= c
.get_temp();
5195 r
= _create_collection(temp
, 0, spos
);
5200 _set_replay_guard(c
, spos
);
// NOTE(review): fragment. Destroys a collection: prep_delete on its index
// under the index write lock (actual rmdir elided), then recursively destroys
// the parallel temp collection for regular collections.
5204 int FileStore::_destroy_collection(const coll_t
& c
)
5208 get_cdir(c
, fn
, sizeof(fn
));
5209 dout(15) << "_destroy_collection " << fn
<< dendl
;
5212 r
= get_index(c
, &from
);
5215 assert(NULL
!= from
.index
);
5216 RWLock::WLocker
l((from
.index
)->access_lock
);
5218 r
= from
->prep_delete();
5229 // destroy parallel temp collection, too
5230 if (!c
.is_meta() && !c
.is_temp()) {
5231 coll_t temp
= c
.get_temp();
5232 int r2
= _destroy_collection(temp
);
5240 dout(10) << "_destroy_collection " << fn
<< " = " << r
<< dendl
;
// NOTE(review): fragment. Links object o from oldcid into collection c under
// replay-guard protection: checks guards on both dst and src, opens the
// source (tolerating a missing source during replay), sets an in-progress
// guard on the inode, hard-links via lfn_link (tolerating EEXIST when a prior
// crash landed between link() and set_replay_guard()), then closes the guard.
5245 int FileStore::_collection_add(const coll_t
& c
, const coll_t
& oldcid
, const ghobject_t
& o
,
5246 const SequencerPosition
& spos
)
5248 dout(15) << "collection_add " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< dendl
;
5250 int dstcmp
= _check_replay_guard(c
, o
, spos
);
5254 // check the src name too; it might have a newer guard, and we don't
5255 // want to clobber it
5256 int srccmp
= _check_replay_guard(oldcid
, o
, spos
);
5260 // open guard on object so we don't any previous operations on the
5261 // new name that will modify the source inode.
5263 int r
= lfn_open(oldcid
, o
, 0, &fd
);
5265 // the source collection/object does not exist. If we are replaying, we
5266 // should be safe, so just return 0 and move on.
5268 dout(10) << "collection_add " << c
<< "/" << o
<< " from "
5269 << oldcid
<< "/" << o
<< " (dne, continue replay) " << dendl
;
5272 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5273 _set_replay_guard(**fd
, spos
, &o
, true);
5276 r
= lfn_link(oldcid
, c
, o
, o
);
5277 if (replaying
&& !backend
->can_checkpoint() &&
5278 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5283 // close guard on object so we don't do this again
5285 _close_replay_guard(**fd
, spos
);
5289 dout(10) << "collection_add " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< " = " << r
<< dendl
;
// _collection_move_rename: move object `oldcid/oldoid` to `c/o` — link to
// the new name, rename the omap content when the oid changed, then unlink
// the old name.  All steps are bracketed with replay guards so a crash at
// any point replays idempotently.
// NOTE(review): line-shredded extract — many original lines are elided at
// the numbering gaps (guard early-returns, braces, final returns); the
// statements below are fragments, verify against the full source.
5293 int FileStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
5294 coll_t c
, const ghobject_t
& o
,
5295 const SequencerPosition
& spos
,
5298 dout(15) << __func__
<< " " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
<< dendl
;
5303 /* If the destination collection doesn't exist during replay,
5304 * we need to delete the src object and continue on
5306 if (!collection_exists(c
))
// Replay guard on the destination name.
5310 dstcmp
= _check_replay_guard(c
, o
, spos
);
5314 // check the src name too; it might have a newer guard, and we don't
5315 // want to clobber it
5316 srccmp
= _check_replay_guard(oldcid
, oldoid
, spos
);
5321 // open guard on object so we don't any previous operations on the
5322 // new name that will modify the source inode.
5324 r
= lfn_open(oldcid
, oldoid
, 0, &fd
);
5326 // the source collection/object does not exist. If we are replaying, we
5327 // should be safe, so just return 0 and move on.
5329 dout(10) << __func__
<< " " << c
<< "/" << o
<< " from "
5330 << oldcid
<< "/" << oldoid
<< " (dne, continue replay) " << dendl
;
5331 } else if (allow_enoent
) {
5332 dout(10) << __func__
<< " " << c
<< "/" << o
<< " from "
5333 << oldcid
<< "/" << oldoid
<< " (dne, ignoring enoent)"
// A missing source outside replay (and without allow_enoent) is fatal.
5336 assert(0 == "ERROR: source must exist");
5342 if (allow_enoent
&& dstcmp
> 0) { // if dstcmp == 0, try_rename was started.
5346 r
= 0; // don't know if object_map was cloned
5348 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5349 _set_replay_guard(**fd
, spos
, &o
, true);
// Link the object under its new collection/name.
5352 r
= lfn_link(oldcid
, c
, oldoid
, o
);
5353 if (replaying
&& !backend
->can_checkpoint() &&
5354 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5364 // the name changed; link the omap content
5365 r
= object_map
->rename(oldoid
, o
, &spos
);
// Remove the old link now that the new one (and its omap) exists.
5373 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5376 r
= lfn_open(c
, o
, 0, &fd
);
5378 // close guard on object so we don't do this again
5380 _close_replay_guard(**fd
, spos
, &o
);
5385 dout(10) << __func__
<< " " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5386 << " = " << r
<< dendl
;
// NOTE(review): this second unlink/log pair appears to belong to an error
// or replay-cleanup path whose surrounding branch is elided — confirm.
5391 if (_check_replay_guard(oldcid
, oldoid
, spos
) > 0) {
5392 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5395 dout(10) << __func__
<< " " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5396 << " = " << r
<< dendl
;
// _inject_failure: test hook driven by filestore_kill_at — decrements the
// atomic countdown on each call and, when it reaches the kill point, logs
// and aborts the process (kill path partially elided from this extract).
// NOTE(review): line-shredded extract; the branch/exit lines between the
// dout and derr statements are not visible here.
5400 void FileStore::_inject_failure()
5402 if (m_filestore_kill_at
.read()) {
// Atomically count down; `final` is the post-decrement value.
5403 int final
= m_filestore_kill_at
.dec();
5404 dout(5) << "_inject_failure " << (final
+1) << " -> " << final
<< dendl
;
5406 derr
<< "_inject_failure KILLING" << dendl
;
// _omap_clear: delete all omap keys and the omap header for `cid/hoid`,
// recording the sequencer position for replay idempotency.
// NOTE(review): line-shredded extract — error-return lines elided at the
// numbering gaps.
5413 int FileStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&hoid
,
5414 const SequencerPosition
&spos
) {
5415 dout(15) << __func__
<< " " << cid
<< "/" << hoid
<< dendl
;
5417 int r
= get_index(cid
, &index
);
5421 assert(NULL
!= index
.index
);
// Read lock is sufficient: only the index structure is read; the omap
// mutation goes through object_map below.
5422 RWLock::RLocker
l((index
.index
)->access_lock
);
5423 r
= lfn_find(hoid
, index
);
5427 r
= object_map
->clear_keys_header(hoid
, &spos
);
// ENOENT (no omap yet) is not treated as an error.
5428 if (r
< 0 && r
!= -ENOENT
)
// _omap_setkeys: set the key/value pairs in `aset` in the omap of
// `cid/hoid` via the DBObjectMap, after verifying the object exists
// (pgmeta objects are exempt from the existence check).
// NOTE(review): line-shredded extract — early returns and braces elided
// at the numbering gaps.
5433 int FileStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5434 const map
<string
, bufferlist
> &aset
,
5435 const SequencerPosition
&spos
) {
5436 dout(15) << __func__
<< " " << cid
<< "/" << hoid
<< dendl
;
5439 //treat pgmeta as a logical object, skip to check exist
5440 if (hoid
.is_pgmeta())
5443 r
= get_index(cid
, &index
);
5445 dout(20) << __func__
<< " get_index got " << cpp_strerror(r
) << dendl
;
5449 assert(NULL
!= index
.index
);
5450 RWLock::RLocker
l((index
.index
)->access_lock
);
// Confirm the object exists in the index before touching its omap.
5451 r
= lfn_find(hoid
, index
);
5453 dout(20) << __func__
<< " lfn_find got " << cpp_strerror(r
) << dendl
;
// Only enumerate keys in the log at debug level 20 (it's O(aset)).
5458 if (g_conf
->subsys
.should_gather(ceph_subsys_filestore
, 20)) {
5459 for (auto& p
: aset
) {
5460 dout(20) << __func__
<< " set " << p
.first
<< dendl
;
5463 r
= object_map
->set_keys(hoid
, aset
, &spos
);
5464 dout(20) << __func__
<< " " << cid
<< "/" << hoid
<< " = " << r
<< dendl
;
// _omap_rmkeys: remove the given omap keys from `cid/hoid`; pgmeta
// objects skip the index existence check; ENOENT from the omap backend
// is tolerated.
// NOTE(review): line-shredded extract — early returns and braces elided
// at the numbering gaps.
5468 int FileStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5469 const set
<string
> &keys
,
5470 const SequencerPosition
&spos
) {
5471 dout(15) << __func__
<< " " << cid
<< "/" << hoid
<< dendl
;
5474 //treat pgmeta as a logical object, skip to check exist
5475 if (hoid
.is_pgmeta())
5478 r
= get_index(cid
, &index
);
5482 assert(NULL
!= index
.index
);
5483 RWLock::RLocker
l((index
.index
)->access_lock
);
5484 r
= lfn_find(hoid
, index
);
5489 r
= object_map
->rm_keys(hoid
, keys
, &spos
);
// ENOENT (key/omap absent) is not an error for a removal.
5490 if (r
< 0 && r
!= -ENOENT
)
// _omap_rmkeyrange: remove all omap keys of `cid/hoid` in [first, last) by
// iterating the omap and collecting matching keys, then delegating to
// _omap_rmkeys.
// NOTE(review): line-shredded extract — the set declaration and loop
// increment lines are elided at the numbering gaps.
5495 int FileStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&hoid
,
5496 const string
& first
, const string
& last
,
5497 const SequencerPosition
&spos
) {
5498 dout(15) << __func__
<< " " << cid
<< "/" << hoid
<< " [" << first
<< "," << last
<< "]" << dendl
;
5501 ObjectMap::ObjectMapIterator iter
= get_omap_iterator(cid
, hoid
);
// Walk keys from `first` (inclusive) up to but excluding `last`.
5504 for (iter
->lower_bound(first
); iter
->valid() && iter
->key() < last
;
5506 keys
.insert(iter
->key());
5509 return _omap_rmkeys(cid
, hoid
, keys
, spos
);
// _omap_setheader: replace the omap header blob of `cid/hoid` with `bl`,
// after verifying the object exists in the collection index.
// NOTE(review): line-shredded extract — error-return lines elided at the
// numbering gaps.
5512 int FileStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&hoid
,
5513 const bufferlist
&bl
,
5514 const SequencerPosition
&spos
)
5516 dout(15) << __func__
<< " " << cid
<< "/" << hoid
<< dendl
;
5518 int r
= get_index(cid
, &index
);
5522 assert(NULL
!= index
.index
);
5523 RWLock::RLocker
l((index
.index
)->access_lock
);
5524 r
= lfn_find(hoid
, index
);
// Delegate the actual header write to the DBObjectMap backend.
5528 return object_map
->set_header(hoid
, bl
, &spos
);
// _split_collection: split collection `cid` into `dest` by hash `bits`/
// `rem` — set replay guards on both collections, take both index write
// locks, delegate the split to the index, then (optionally, under
// filestore_debug_verify_split) re-list both collections and assert each
// object landed on the correct side of the split.
// NOTE(review): line-shredded extract — many lines elided at the numbering
// gaps (guard early-returns, collection_list calls' leading arguments,
// loop headers/braces); treat the statements below as fragments.
5531 int FileStore::_split_collection(const coll_t
& cid
,
5535 const SequencerPosition
&spos
)
5539 dout(15) << __func__
<< " " << cid
<< " bits: " << bits
<< dendl
;
// Both source and destination must already exist.
5540 if (!collection_exists(cid
)) {
5541 dout(2) << __func__
<< ": " << cid
<< " DNE" << dendl
;
5545 if (!collection_exists(dest
)) {
5546 dout(2) << __func__
<< ": " << dest
<< " DNE" << dendl
;
5551 int dstcmp
= _check_replay_guard(dest
, spos
);
5555 int srccmp
= _check_replay_guard(cid
, spos
);
// Guards: global guard on the source, per-collection guards on both.
5559 _set_global_replay_guard(cid
, spos
);
5560 _set_replay_guard(cid
, spos
, true);
5561 _set_replay_guard(dest
, spos
, true);
5564 r
= get_index(cid
, &from
);
5568 r
= get_index(dest
, &to
);
5571 assert(NULL
!= from
.index
);
5572 RWLock::WLocker
l1((from
.index
)->access_lock
);
5574 assert(NULL
!= to
.index
);
5575 RWLock::WLocker
l2((to
.index
)->access_lock
);
// The index implementation moves matching objects into `to`.
5577 r
= from
->split(rem
, bits
, to
.index
);
5580 _close_replay_guard(cid
, spos
);
5581 _close_replay_guard(dest
, spos
);
5583 _collection_set_bits(cid
, bits
);
// Optional debug pass: verify no object remaining in the source matches
// the split, and every object in the destination does.
5584 if (!r
&& cct
->_conf
->filestore_debug_verify_split
) {
5585 vector
<ghobject_t
> objects
;
5590 next
, ghobject_t::get_max(),
5591 get_ideal_list_max(),
5594 if (objects
.empty())
5596 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
5599 dout(20) << __func__
<< ": " << *i
<< " still in source "
5601 assert(!i
->match(bits
, rem
));
5605 next
= ghobject_t();
5609 next
, ghobject_t::get_max(),
5610 get_ideal_list_max(),
5613 if (objects
.empty())
5615 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
5618 dout(20) << __func__
<< ": " << *i
<< " now in dest "
5620 assert(i
->match(bits
, rem
));
// _set_alloc_hint: pass an allocation hint for `cid/oid` down to the
// filesystem backend; the hint is the expected write size clamped to
// filestore_max_alloc_hint_size.  A zero object/write size means "no
// hint" (branch body elided in this extract).
// NOTE(review): line-shredded extract — early returns and braces elided
// at the numbering gaps.
5628 int FileStore::_set_alloc_hint(const coll_t
& cid
, const ghobject_t
& oid
,
5629 uint64_t expected_object_size
,
5630 uint64_t expected_write_size
)
5632 dout(15) << "set_alloc_hint " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< dendl
;
5637 if (expected_object_size
== 0 || expected_write_size
== 0)
5640 ret
= lfn_open(cid
, oid
, false, &fd
);
5645 // TODO: a more elaborate hint calculation
5646 uint64_t hint
= MIN(expected_write_size
, m_filestore_max_alloc_hint_size
);
5648 ret
= backend
->set_alloc_hint(**fd
, hint
);
5649 dout(20) << "set_alloc_hint hint " << hint
<< " ret " << ret
<< dendl
;
5654 dout(10) << "set_alloc_hint " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< " = " << ret
<< dendl
;
// EIO is fatal unless filestore_fail_eio is disabled.
5655 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// get_tracked_conf_keys: list of config option names this FileStore
// observes; handle_conf_change() below reacts to changes in exactly these
// keys.  Keep the two lists in sync.
// NOTE(review): the array's NULL terminator, closing brace and `return`
// are elided from this extract (original lines 5692-5696).
5659 const char** FileStore::get_tracked_conf_keys() const
5661 static const char* KEYS
[] = {
5662 "filestore_max_inline_xattr_size",
5663 "filestore_max_inline_xattr_size_xfs",
5664 "filestore_max_inline_xattr_size_btrfs",
5665 "filestore_max_inline_xattr_size_other",
5666 "filestore_max_inline_xattrs",
5667 "filestore_max_inline_xattrs_xfs",
5668 "filestore_max_inline_xattrs_btrfs",
5669 "filestore_max_inline_xattrs_other",
5670 "filestore_max_xattr_value_size",
5671 "filestore_max_xattr_value_size_xfs",
5672 "filestore_max_xattr_value_size_btrfs",
5673 "filestore_max_xattr_value_size_other",
5674 "filestore_min_sync_interval",
5675 "filestore_max_sync_interval",
5676 "filestore_queue_max_ops",
5677 "filestore_queue_max_bytes",
5678 "filestore_expected_throughput_bytes",
5679 "filestore_expected_throughput_ops",
5680 "filestore_queue_low_threshhold",
5681 "filestore_queue_high_threshhold",
5682 "filestore_queue_high_delay_multiple",
5683 "filestore_queue_max_delay_multiple",
5684 "filestore_commit_timeout",
5685 "filestore_dump_file",
5686 "filestore_kill_at",
5687 "filestore_fail_eio",
5688 "filestore_fadvise",
5689 "filestore_sloppy_crc",
5690 "filestore_sloppy_crc_block_size",
5691 "filestore_max_alloc_hint_size",
// handle_conf_change: react to runtime changes of the options listed in
// get_tracked_conf_keys().  Groups: xattr limits (recomputed under `lock`),
// throttle params, cached m_filestore_* mirrors of conf values, commit
// timeout, and the transaction-dump file.
// NOTE(review): line-shredded extract — closing braces and the dump_stop
// else-branch are elided at the numbering gaps.
5697 void FileStore::handle_conf_change(const struct md_config_t
*conf
,
5698 const std::set
<std::string
> &changed
)
5700 if (changed
.count("filestore_max_inline_xattr_size") ||
5701 changed
.count("filestore_max_inline_xattr_size_xfs") ||
5702 changed
.count("filestore_max_inline_xattr_size_btrfs") ||
5703 changed
.count("filestore_max_inline_xattr_size_other") ||
5704 changed
.count("filestore_max_inline_xattrs") ||
5705 changed
.count("filestore_max_inline_xattrs_xfs") ||
5706 changed
.count("filestore_max_inline_xattrs_btrfs") ||
5707 changed
.count("filestore_max_inline_xattrs_other") ||
5708 changed
.count("filestore_max_xattr_value_size") ||
5709 changed
.count("filestore_max_xattr_value_size_xfs") ||
5710 changed
.count("filestore_max_xattr_value_size_btrfs") ||
5711 changed
.count("filestore_max_xattr_value_size_other")) {
// Recompute the effective xattr limits from the new config.
5713 Mutex::Locker
l(lock
);
5714 set_xattr_limits_via_conf();
5718 if (changed
.count("filestore_queue_max_bytes") ||
5719 changed
.count("filestore_queue_max_ops") ||
5720 changed
.count("filestore_expected_throughput_bytes") ||
5721 changed
.count("filestore_expected_throughput_ops") ||
5722 changed
.count("filestore_queue_low_threshhold") ||
5723 changed
.count("filestore_queue_high_threshhold") ||
5724 changed
.count("filestore_queue_high_delay_multiple") ||
5725 changed
.count("filestore_queue_max_delay_multiple")) {
5726 Mutex::Locker
l(lock
);
5727 set_throttle_params();
5730 if (changed
.count("filestore_min_sync_interval") ||
5731 changed
.count("filestore_max_sync_interval") ||
5732 changed
.count("filestore_kill_at") ||
5733 changed
.count("filestore_fail_eio") ||
5734 changed
.count("filestore_sloppy_crc") ||
5735 changed
.count("filestore_sloppy_crc_block_size") ||
5736 changed
.count("filestore_max_alloc_hint_size") ||
5737 changed
.count("filestore_fadvise")) {
// Refresh the cached m_filestore_* mirrors under the main lock.
5738 Mutex::Locker
l(lock
);
5739 m_filestore_min_sync_interval
= conf
->filestore_min_sync_interval
;
5740 m_filestore_max_sync_interval
= conf
->filestore_max_sync_interval
;
5741 m_filestore_kill_at
.set(conf
->filestore_kill_at
);
5742 m_filestore_fail_eio
= conf
->filestore_fail_eio
;
5743 m_filestore_fadvise
= conf
->filestore_fadvise
;
5744 m_filestore_sloppy_crc
= conf
->filestore_sloppy_crc
;
5745 m_filestore_sloppy_crc_block_size
= conf
->filestore_sloppy_crc_block_size
;
5746 m_filestore_max_alloc_hint_size
= conf
->filestore_max_alloc_hint_size
;
// Commit timeout has its own lock (sync_entry_timeo_lock).
5748 if (changed
.count("filestore_commit_timeout")) {
5749 Mutex::Locker
l(sync_entry_timeo_lock
);
5750 m_filestore_commit_timeout
= conf
->filestore_commit_timeout
;
// "-" (or empty) disables the dump file; disable path elided here.
5752 if (changed
.count("filestore_dump_file")) {
5753 if (conf
->filestore_dump_file
.length() &&
5754 conf
->filestore_dump_file
!= "-") {
5755 dump_start(conf
->filestore_dump_file
);
// set_throttle_params: push the current throttle-related conf values into
// the bytes and ops BackoffThrottles, update the perf counters with the
// resulting maxima, and return -EINVAL if either set_params rejected the
// combination.
// NOTE(review): line-shredded extract — the trailing error-stream argument
// of each set_params call and parts of the derr message are elided.
5762 int FileStore::set_throttle_params()
5765 bool valid
= throttle_bytes
.set_params(
5766 cct
->_conf
->filestore_queue_low_threshhold
,
5767 cct
->_conf
->filestore_queue_high_threshhold
,
5768 cct
->_conf
->filestore_expected_throughput_bytes
,
5769 cct
->_conf
->filestore_queue_high_delay_multiple
,
5770 cct
->_conf
->filestore_queue_max_delay_multiple
,
5771 cct
->_conf
->filestore_queue_max_bytes
,
// Both throttles must accept their params for the result to be valid.
5774 valid
&= throttle_ops
.set_params(
5775 cct
->_conf
->filestore_queue_low_threshhold
,
5776 cct
->_conf
->filestore_queue_high_threshhold
,
5777 cct
->_conf
->filestore_expected_throughput_ops
,
5778 cct
->_conf
->filestore_queue_high_delay_multiple
,
5779 cct
->_conf
->filestore_queue_max_delay_multiple
,
5780 cct
->_conf
->filestore_queue_max_ops
,
// Expose the effective limits through the perf counters.
5783 logger
->set(l_filestore_op_queue_max_ops
, throttle_ops
.get_max());
5784 logger
->set(l_filestore_op_queue_max_bytes
, throttle_bytes
.get_max());
5787 derr
<< "tried to set invalid params: "
5791 return valid
? 0 : -EINVAL
;
// dump_start: begin dumping applied transactions as JSON to `file` —
// resets the formatter, opens the top-level "dump" array, opens the output
// stream, and enables m_filestore_do_dump.  If a dump was already active
// the previous one is presumably stopped first (branch body elided here).
5794 void FileStore::dump_start(const std::string
& file
)
5796 dout(10) << "dump_start " << file
<< dendl
;
5797 if (m_filestore_do_dump
) {
5800 m_filestore_dump_fmt
.reset();
5801 m_filestore_dump_fmt
.open_array_section("dump");
5802 m_filestore_dump
.open(file
.c_str());
// Enable the per-transaction dumping done in dump_transactions().
5803 m_filestore_do_dump
= true;
// dump_stop: disable transaction dumping and, if the dump stream is open,
// close the top-level JSON section, flush the formatter into the stream,
// and flush/close the stream.
5806 void FileStore::dump_stop()
5808 dout(10) << "dump_stop" << dendl
;
// Disable first so no further transactions are appended mid-close.
5809 m_filestore_do_dump
= false;
5810 if (m_filestore_dump
.is_open()) {
5811 m_filestore_dump_fmt
.close_section();
5812 m_filestore_dump_fmt
.flush(m_filestore_dump
);
5813 m_filestore_dump
.flush();
5814 m_filestore_dump
.close();
// dump_transactions: append one JSON "transactions" array entry per
// transaction in `ls` (tagged with the sequencer name, op seq and index
// within the batch) to the active dump stream, then flush.
5818 void FileStore::dump_transactions(vector
<ObjectStore::Transaction
>& ls
, uint64_t seq
, OpSequencer
*osr
)
5820 m_filestore_dump_fmt
.open_array_section("transactions");
5821 unsigned trans_num
= 0;
5822 for (vector
<ObjectStore::Transaction
>::iterator i
= ls
.begin(); i
!= ls
.end(); ++i
, ++trans_num
) {
5823 m_filestore_dump_fmt
.open_object_section("transaction");
5824 m_filestore_dump_fmt
.dump_string("osr", osr
->get_name());
5825 m_filestore_dump_fmt
.dump_unsigned("seq", seq
);
5826 m_filestore_dump_fmt
.dump_unsigned("trans_num", trans_num
);
// The Transaction itself knows how to serialize its ops.
5827 (*i
).dump(&m_filestore_dump_fmt
);
5828 m_filestore_dump_fmt
.close_section();
5830 m_filestore_dump_fmt
.close_section();
5831 m_filestore_dump_fmt
.flush(m_filestore_dump
);
5832 m_filestore_dump
.flush();
// set_xattr_limits_via_conf: pick per-filesystem defaults for the inline
// xattr size/count and max xattr value size based on m_fs_type (XFS,
// btrfs, or other), let explicit non-zero overrides win, and warn when the
// resulting max xattr value size cannot hold osd_max_object_name_len.
// NOTE(review): line-shredded extract — `break`s, `else` lines, `#endif`
// and closing braces are elided at the numbering gaps.
5835 void FileStore::set_xattr_limits_via_conf()
5837 uint32_t fs_xattr_size
;
5839 uint32_t fs_xattr_max_value_size
;
5841 switch (m_fs_type
) {
5842 #if defined(__linux__)
5843 case XFS_SUPER_MAGIC
:
5844 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_xfs
;
5845 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_xfs
;
5846 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_xfs
;
5848 case BTRFS_SUPER_MAGIC
:
5849 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_btrfs
;
5850 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_btrfs
;
5851 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_btrfs
;
// Fallback (default case) for any other filesystem type.
5855 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_other
;
5856 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_other
;
5857 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_other
;
5861 // Use override value if set
5862 if (cct
->_conf
->filestore_max_inline_xattr_size
)
5863 m_filestore_max_inline_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size
;
5865 m_filestore_max_inline_xattr_size
= fs_xattr_size
;
5867 // Use override value if set
5868 if (cct
->_conf
->filestore_max_inline_xattrs
)
5869 m_filestore_max_inline_xattrs
= cct
->_conf
->filestore_max_inline_xattrs
;
5871 m_filestore_max_inline_xattrs
= fs_xattrs
;
5873 // Use override value if set
5874 if (cct
->_conf
->filestore_max_xattr_value_size
)
5875 m_filestore_max_xattr_value_size
= cct
->_conf
->filestore_max_xattr_value_size
;
5877 m_filestore_max_xattr_value_size
= fs_xattr_max_value_size
;
// Sanity warning: long rados names are stored in xattrs, so the xattr
// value limit must accommodate osd_max_object_name_len.
5879 if (m_filestore_max_xattr_value_size
< cct
->_conf
->osd_max_object_name_len
) {
5880 derr
<< "WARNING: max attr value size ("
5881 << m_filestore_max_xattr_value_size
5882 << ") is smaller than osd_max_object_name_len ("
5883 << cct
->_conf
->osd_max_object_name_len
5884 << "). Your backend filesystem appears to not support attrs large "
5885 << "enough to handle the configured max rados name size. You may get "
5886 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
// estimate_objects_overhead: rough estimate of filesystem space overhead
// for storing `num_objects` objects — half a block per object.
// NOTE(review): the `return res;` line is elided from this extract.
5892 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects
)
5894 uint64_t res
= num_objects
* blk_size
/ 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
// apply_layout_settings: look up the index for collection `cid` and ask it
// to (re)apply the configured directory-layout settings.
// NOTE(review): the error-return after the dout(10) is elided from this
// extract.
5898 int FileStore::apply_layout_settings(const coll_t
&cid
)
5900 dout(20) << __func__
<< " " << cid
<< dendl
;
5902 int r
= get_index(cid
, &index
);
5904 dout(10) << "Error getting index for " << cid
<< ": " << cpp_strerror(r
)
5909 return index
->apply_layout_settings();
5913 // -- FSSuperblock --
// FSSuperblock::encode: serialize the superblock (version 2, compat 1):
// compat feature set followed by the omap backend name.
// NOTE(review): the ENCODE_FINISH line is elided from this extract.
5915 void FSSuperblock::encode(bufferlist
&bl
) const
5917 ENCODE_START(2, 1, bl
);
5918 compat_features
.encode(bl
);
5919 ::encode(omap_backend
, bl
);
// FSSuperblock::decode: counterpart of encode(); decodes the compat set,
// then the omap backend name, falling back to "leveldb" (the surrounding
// version branch — presumably struct_v < 2 — is elided from this extract).
5923 void FSSuperblock::decode(bufferlist::iterator
&bl
)
5925 DECODE_START(2, bl
);
5926 compat_features
.decode(bl
);
5928 ::decode(omap_backend
, bl
);
// Default for superblocks written before the omap_backend field existed.
5930 omap_backend
= "leveldb";
// FSSuperblock::dump: emit the superblock for display — the compat feature
// set in a "compat" section plus the omap backend name.
// NOTE(review): the close_section() call is elided from this extract.
5934 void FSSuperblock::dump(Formatter
*f
) const
5936 f
->open_object_section("compat");
5937 compat_features
.dump(f
);
5938 f
->dump_string("omap_backend", omap_backend
);
5942 void FSSuperblock::generate_test_instances(list
<FSSuperblock
*>& o
)
5945 o
.push_back(new FSSuperblock(z
));
5946 CompatSet::FeatureSet feature_compat
;
5947 CompatSet::FeatureSet feature_ro_compat
;
5948 CompatSet::FeatureSet feature_incompat
;
5949 feature_incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
5950 z
.compat_features
= CompatSet(feature_compat
, feature_ro_compat
,
5952 o
.push_back(new FSSuperblock(z
));
5953 z
.omap_backend
= "rocksdb";
5954 o
.push_back(new FSSuperblock(z
));