1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
21 #include <sys/types.h>
27 #include <sys/ioctl.h>
29 #if defined(__linux__)
36 #include "include/linux_fiemap.h"
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1
;
78 #include "include/assert.h"
80 #include "common/config.h"
81 #include "common/blkdev.h"
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
90 #define tracepoint(...)
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
104 // XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
105 // spill over into DBObjectMap. If it exists in the file's xattrs with the
106 // value XATTR_NO_SPILL_OUT ("0"), no xattrs are stored in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
112 //Initial features in new superblock.
113 static CompatSet
get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat
;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat
;
116 CompatSet::FeatureSet ceph_osd_feature_incompat
;
117 return CompatSet(ceph_osd_feature_compat
, ceph_osd_feature_ro_compat
,
118 ceph_osd_feature_incompat
);
121 //Features are added here that this FileStore supports.
// Starts from get_fs_initial_compat_set() and inserts
// CEPH_FS_FEATURE_INCOMPAT_SHARDS into the incompat feature set.
// NOTE(review): the return statement is not visible in this excerpt --
// presumably `compat` is returned; confirm against the full file.
122 static CompatSet
get_fs_supported_compat_set() {
123 CompatSet compat
= get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat
.incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
129 int FileStore::validate_hobject_key(const hobject_t
&obj
) const
131 unsigned len
= LFNIndex::get_max_escaped_name_len(obj
);
132 return len
> m_filestore_max_xattr_value_size
? -ENAMETOOLONG
: 0;
// Reads the fsid stored in the journal at `path`: a temporary FileJournal is
// constructed on the path (last two constructor arguments are false, false)
// and the result of its peek_fsid(*fsid) is returned.
// NOTE(review): the remainder of the parameter list (the `fsid` out-pointer
// dereferenced below) is not visible in this excerpt.
135 int FileStore::get_block_device_fsid(CephContext
* cct
, const string
& path
,
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal
j(cct
, *fsid
, 0, 0, path
.c_str(), false, false);
142 return j
.peek_fsid(*fsid
);
// Feeds the tracker's two latency accumulators: os_commit_latency consumes a
// value keyed by l_filestore_journal_latency and os_apply_latency one keyed
// by l_filestore_apply_latency.
// NOTE(review): the inner call that reads those counters (presumably from
// `logger`) is not visible in this excerpt -- confirm against the full file.
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters
&logger
)
148 os_commit_latency
.consume_next(
150 l_filestore_journal_latency
));
151 os_apply_latency
.consume_next(
153 l_filestore_apply_latency
));
157 ostream
& operator<<(ostream
& out
, const FileStore::OpSequencer
& s
)
159 return out
<< *s
.parent
;
162 int FileStore::get_cdir(const coll_t
& cid
, char *s
, int len
)
164 const string
&cid_str(cid
.to_str());
165 return snprintf(s
, len
, "%s/current/%s", basedir
.c_str(), cid_str
.c_str());
// Fetches the Index for collection `cid` under basedir from index_manager
// into *index. Asserts that the result is not -EIO when
// m_filestore_fail_eio is set.
// NOTE(review): the return statement is not visible in this excerpt --
// presumably `r` is returned.
168 int FileStore::get_index(const coll_t
& cid
, Index
*index
)
170 int r
= index_manager
.get_index(cid
, basedir
, index
);
171 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// Initializes the on-disk index for collection `cid`: formats the collection
// directory with get_cdir() and passes it, with target_version, to
// index_manager.init_index(). Asserts that the result is not -EIO when
// m_filestore_fail_eio is set.
// NOTE(review): the declaration of `path` and the return statement are not
// visible in this excerpt.
175 int FileStore::init_index(const coll_t
& cid
)
178 get_cdir(cid
, path
, sizeof(path
));
179 int r
= index_manager
.init_index(cid
, path
, target_version
);
180 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// Resolves `oid` to an IndexedPath through the supplied (non-null) index by
// calling its lookup(), which also reports whether the object exists.
// Asserts that the lookup result is not -EIO when m_filestore_fail_eio is set.
// NOTE(review): the declarations of `r`/`exist` and the handling of the
// lookup result are not visible in this excerpt.
184 int FileStore::lfn_find(const ghobject_t
& oid
, const Index
& index
, IndexedPath
*path
)
190 assert(NULL
!= index
.index
);
191 r
= (index
.index
)->lookup(oid
, path
, &exist
);
193 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// Truncates the file backing `oid` in collection `cid` to `length` bytes.
// Opens the object with lfn_open() (second-to-last argument false), calls
// ::ftruncate() on the descriptor, and -- when the truncate succeeded and
// m_filestore_sloppy_crc is set -- asks the backend to update its CRC
// tracking. Asserts that the result is not -EIO when m_filestore_fail_eio
// is set.
// NOTE(review): error handling between the visible statements is not shown
// in this excerpt.
201 int FileStore::lfn_truncate(const coll_t
& cid
, const ghobject_t
& oid
, off_t length
)
204 int r
= lfn_open(cid
, oid
, false, &fd
);
207 r
= ::ftruncate(**fd
, length
);
210 if (r
>= 0 && m_filestore_sloppy_crc
) {
211 int rc
= backend
->_crc_update_truncate(**fd
, length
);
215 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// stat()s the file backing `oid` in collection `cid` into *buf.
// Gets the collection index, holds its access_lock as a reader for the rest
// of the function, resolves the object's path with lfn_find(), and calls
// ::stat() on that path.
// NOTE(review): local declarations and the error checks between steps are
// not visible in this excerpt.
219 int FileStore::lfn_stat(const coll_t
& cid
, const ghobject_t
& oid
, struct stat
*buf
)
223 int r
= get_index(cid
, &index
);
227 assert(NULL
!= index
.index
);
228 RWLock::RLocker
l((index
.index
)->access_lock
);
230 r
= lfn_find(oid
, index
, &path
);
233 r
= ::stat(path
->path(), buf
);
// Opens the file backing `oid` in collection `cid` and hands the descriptor
// back through *outfd.
// Steps visible in this excerpt:
//   - fetch the collection index if the caller did not supply one;
//   - take the index's access_lock for write and consult fdcache;
//   - look the object up in the index and ::open() its path with mode 0644;
//   - when `create` is set and the object did not exist, register it with
//     the index and stamp XATTR_SPILL_OUT_NAME = XATTR_NO_SPILL_OUT on it;
//   - publish the fd either through fdcache.add() or as a standalone
//     FDCache::FD;
//   - release the write lock on the exit paths.
// Asserts that the result is not -EIO when m_filestore_fail_eio is set.
// NOTE(review): most of the parameter list and the conditions guarding the
// branches (including when the lock is actually taken) are not visible here.
239 int FileStore::lfn_open(const coll_t
& cid
,
240 const ghobject_t
& oid
,
247 bool need_lock
= true;
252 if (cct
->_conf
->filestore_odsync_write
) {
260 if (!((*index
).index
)) {
261 r
= get_index(cid
, index
);
263 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
271 assert(NULL
!= (*index
).index
);
273 ((*index
).index
)->access_lock
.get_write();
276 *outfd
= fdcache
.lookup(oid
);
279 ((*index
).index
)->access_lock
.put_write();
287 IndexedPath
*path
= &path2
;
289 r
= (*index
)->lookup(oid
, path
, &exist
);
291 derr
<< "could not find " << oid
<< " in index: "
292 << cpp_strerror(-r
) << dendl
;
296 r
= ::open((*path
)->path(), flags
, 0644);
299 dout(10) << "error opening file " << (*path
)->path() << " with flags="
300 << flags
<< ": " << cpp_strerror(-r
) << dendl
;
304 if (create
&& (!exist
)) {
305 r
= (*index
)->created(oid
, (*path
)->path());
307 VOID_TEMP_FAILURE_RETRY(::close(fd
));
308 derr
<< "error creating " << oid
<< " (" << (*path
)->path()
309 << ") in index: " << cpp_strerror(-r
) << dendl
;
312 r
= chain_fsetxattr
<true, true>(
313 fd
, XATTR_SPILL_OUT_NAME
,
314 XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
));
316 VOID_TEMP_FAILURE_RETRY(::close(fd
));
317 derr
<< "error setting spillout xattr for oid " << oid
<< " (" << (*path
)->path()
318 << "):" << cpp_strerror(-r
) << dendl
;
325 *outfd
= fdcache
.add(oid
, fd
, &existed
);
// NOTE(review): every other close() in this function goes through
// VOID_TEMP_FAILURE_RETRY; this one uses TEMP_FAILURE_RETRY and drops the
// result -- consider making it consistent.
327 TEMP_FAILURE_RETRY(::close(fd
));
330 *outfd
= std::make_shared
<FDCache::FD
>(fd
);
334 ((*index
).index
)->access_lock
.put_write();
342 ((*index
).index
)->access_lock
.put_write();
345 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
349 void FileStore::lfn_close(FDRef fd
)
// Hard-links object `o` of collection `c` to `newoid` in collection `newcid`.
// Two code paths are visible:
//   - distinct indexes: index_old is read-locked (l1) and index_new is
//     write-locked (l2) while both paths are looked up, ::link()ed, and the
//     new name is recorded with index_new->created();
//   - shared index (index_new = index_old when c == newcid): a single write
//     lock on index_old covers the same steps, and the fdcache entry for
//     newoid is cleared before created() is called.
// Each index call is followed by an assert that the result is not -EIO when
// m_filestore_fail_eio is set.
// NOTE(review): the first branch condition (which decides the order the two
// indexes are fetched in) and the error checks are not visible in this
// excerpt.
353 int FileStore::lfn_link(const coll_t
& c
, const coll_t
& newcid
, const ghobject_t
& o
, const ghobject_t
& newoid
)
355 Index index_new
, index_old
;
356 IndexedPath path_new
, path_old
;
359 bool index_same
= false;
361 r
= get_index(newcid
, &index_new
);
364 r
= get_index(c
, &index_old
);
367 } else if (c
== newcid
) {
368 r
= get_index(c
, &index_old
);
371 index_new
= index_old
;
374 r
= get_index(c
, &index_old
);
377 r
= get_index(newcid
, &index_new
);
382 assert(NULL
!= index_old
.index
);
383 assert(NULL
!= index_new
.index
);
387 RWLock::RLocker
l1((index_old
.index
)->access_lock
);
389 r
= index_old
->lookup(o
, &path_old
, &exist
);
391 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
397 RWLock::WLocker
l2((index_new
.index
)->access_lock
);
399 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
401 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
407 dout(25) << __FUNC__
<< ": path_old: " << path_old
<< dendl
;
408 dout(25) << __FUNC__
<< ": path_new: " << path_new
<< dendl
;
409 r
= ::link(path_old
->path(), path_new
->path());
413 r
= index_new
->created(newoid
, path_new
->path());
415 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
419 RWLock::WLocker
l1((index_old
.index
)->access_lock
);
421 r
= index_old
->lookup(o
, &path_old
, &exist
);
423 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
429 r
= index_new
->lookup(newoid
, &path_new
, &exist
);
431 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
437 dout(25) << __FUNC__
<< ": path_old: " << path_old
<< dendl
;
438 dout(25) << __FUNC__
<< ": path_new: " << path_new
<< dendl
;
439 r
= ::link(path_old
->path(), path_new
->path());
443 // make sure old fd for unlinked/overwritten file is gone
444 fdcache
.clear(newoid
);
446 r
= index_new
->created(newoid
, path_new
->path());
448 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// Removes object `o` from collection `cid`.
// With the collection index write-locked, the object is looked up (the third
// lookup() argument receives `hardlink`). If the caller did not force omap
// clearing but `hardlink` is 0 or 1, omap clearing is forced anyway. When
// clearing, the object's omap is cleared via object_map->clear() (-ENOENT is
// tolerated), the read-error debug hook runs if configured, and the object
// is dropped from wbthrottle unless throttling is disabled. Finally the
// index entry is unlinked.
// NOTE(review): the branch structure between the two wbthrottle.clear_object
// calls, and the return paths, are not fully visible in this excerpt.
455 int FileStore::lfn_unlink(const coll_t
& cid
, const ghobject_t
& o
,
456 const SequencerPosition
&spos
,
457 bool force_clear_omap
)
460 int r
= get_index(cid
, &index
);
462 dout(25) << __FUNC__
<< ": get_index failed " << cpp_strerror(r
) << dendl
;
466 assert(NULL
!= index
.index
);
467 RWLock::WLocker
l((index
.index
)->access_lock
);
472 r
= index
->lookup(o
, &path
, &hardlink
);
474 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
478 if (!force_clear_omap
) {
479 if (hardlink
== 0 || hardlink
== 1) {
480 force_clear_omap
= true;
483 if (force_clear_omap
) {
484 dout(20) << __FUNC__
<< ": clearing omap on " << o
485 << " in cid " << cid
<< dendl
;
486 r
= object_map
->clear(o
, &spos
);
487 if (r
< 0 && r
!= -ENOENT
) {
488 dout(25) << __FUNC__
<< ": omap clear failed " << cpp_strerror(r
) << dendl
;
489 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
492 if (cct
->_conf
->filestore_debug_inject_read_err
) {
493 debug_obj_on_delete(o
);
495 if (!m_disable_wbthrottle
) {
496 wbthrottle
.clear_object(o
); // should be only non-cache ref
500 /* Ensure that replay of this op doesn't result in the object_map
503 if (!backend
->can_checkpoint())
504 object_map
->sync(&o
, &spos
);
507 if (!m_disable_wbthrottle
) {
508 wbthrottle
.clear_object(o
); // should be only non-cache ref
513 r
= index
->unlink(o
);
515 dout(25) << __FUNC__
<< ": index unlink failed " << cpp_strerror(r
) << dendl
;
// Constructs a FileStore rooted at `base` with its journal at `jdev`.
// The initializer list copies tunables out of cct->_conf into m_* members,
// sets the fsid/op/basedir/current descriptors to -1, and builds the op
// thread pool and work queue. wbthrottle is disabled when either
// filestore_odsync_write is set or filestore_wbthrottle_enable is unset.
// The body then:
//   - allocates m_ondisk_finisher_num / m_apply_finisher_num Finisher objects;
//   - derives current_fn ("<basedir>/current") and current_op_seq_fn
//     ("<basedir>/current/commit_op_seq");
//   - sets omap_dir to filestore_omap_backend_path when that is non-empty,
//     otherwise to "<basedir>/current/omap";
//   - registers the l_filestore_* perf counters and adds them to cct;
//   - registers this object as a config observer;
//   - seeds superblock.compat_features with get_fs_initial_compat_set().
// NOTE(review): several initializer-list entries and the stream declarations
// (oss/sss/omss) are not visible in this excerpt.
521 FileStore::FileStore(CephContext
* cct
, const std::string
&base
,
522 const std::string
&jdev
, osflagbits_t flags
,
523 const char *name
, bool do_update
) :
524 JournalingObjectStore(cct
, base
),
526 basedir(base
), journalpath(jdev
),
527 generic_flags(flags
),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
532 index_manager(cct
, do_update
),
533 lock("FileStore::lock"),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct
, sync_entry_timeo_lock
),
537 stop(false), sync_thread(this),
541 m_disable_wbthrottle(cct
->_conf
->filestore_odsync_write
||
542 !cct
->_conf
->filestore_wbthrottle_enable
),
543 throttle_ops(cct
, "filestore_ops", cct
->_conf
->filestore_caller_concurrency
),
544 throttle_bytes(cct
, "filestore_bytes", cct
->_conf
->filestore_caller_concurrency
),
545 m_ondisk_finisher_num(cct
->_conf
->filestore_ondisk_finisher_threads
),
546 m_apply_finisher_num(cct
->_conf
->filestore_apply_finisher_threads
),
547 op_tp(cct
, "FileStore::op_tp", "tp_fstore_op", cct
->_conf
->filestore_op_threads
, "filestore_op_threads"),
548 op_wq(this, cct
->_conf
->filestore_op_thread_timeout
,
549 cct
->_conf
->filestore_op_thread_suicide_timeout
, &op_tp
),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct
->_conf
->filestore_commit_timeout
),
554 m_filestore_journal_parallel(cct
->_conf
->filestore_journal_parallel
),
555 m_filestore_journal_trailing(cct
->_conf
->filestore_journal_trailing
),
556 m_filestore_journal_writeahead(cct
->_conf
->filestore_journal_writeahead
),
557 m_filestore_fiemap_threshold(cct
->_conf
->filestore_fiemap_threshold
),
558 m_filestore_max_sync_interval(cct
->_conf
->filestore_max_sync_interval
),
559 m_filestore_min_sync_interval(cct
->_conf
->filestore_min_sync_interval
),
560 m_filestore_fail_eio(cct
->_conf
->filestore_fail_eio
),
561 m_filestore_fadvise(cct
->_conf
->filestore_fadvise
),
562 do_update(do_update
),
563 m_journal_dio(cct
->_conf
->journal_dio
),
564 m_journal_aio(cct
->_conf
->journal_aio
),
565 m_journal_force_aio(cct
->_conf
->journal_force_aio
),
566 m_osd_rollback_to_cluster_snap(cct
->_conf
->osd_rollback_to_cluster_snap
),
567 m_osd_use_stale_snap(cct
->_conf
->osd_use_stale_snap
),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct
->_conf
->filestore_sloppy_crc
),
571 m_filestore_sloppy_crc_block_size(cct
->_conf
->filestore_sloppy_crc_block_size
),
572 m_filestore_max_alloc_hint_size(cct
->_conf
->filestore_max_alloc_hint_size
),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
578 m_filestore_kill_at
= cct
->_conf
->filestore_kill_at
;
579 for (int i
= 0; i
< m_ondisk_finisher_num
; ++i
) {
581 oss
<< "filestore-ondisk-" << i
;
582 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_odsk_fstore");
583 ondisk_finishers
.push_back(f
);
585 for (int i
= 0; i
< m_apply_finisher_num
; ++i
) {
587 oss
<< "filestore-apply-" << i
;
588 Finisher
*f
= new Finisher(cct
, oss
.str(), "fn_appl_fstore");
589 apply_finishers
.push_back(f
);
593 oss
<< basedir
<< "/current";
594 current_fn
= oss
.str();
597 sss
<< basedir
<< "/current/commit_op_seq";
598 current_op_seq_fn
= sss
.str();
601 if (cct
->_conf
->filestore_omap_backend_path
!= "") {
602 omap_dir
= cct
->_conf
->filestore_omap_backend_path
;
604 omss
<< basedir
<< "/current/omap";
605 omap_dir
= omss
.str();
609 PerfCountersBuilder
plb(cct
, internal_name
, l_filestore_first
, l_filestore_last
);
611 plb
.add_u64(l_filestore_journal_queue_ops
, "journal_queue_ops", "Operations in journal queue");
612 plb
.add_u64(l_filestore_journal_ops
, "journal_ops", "Active journal entries to be applied");
613 plb
.add_u64(l_filestore_journal_queue_bytes
, "journal_queue_bytes", "Size of journal queue");
614 plb
.add_u64(l_filestore_journal_bytes
, "journal_bytes", "Active journal operation size to be applied");
615 plb
.add_time_avg(l_filestore_journal_latency
, "journal_latency", "Average journal queue completing latency",
616 NULL
, PerfCountersBuilder::PRIO_USEFUL
);
617 plb
.add_u64_counter(l_filestore_journal_wr
, "journal_wr", "Journal write IOs");
618 plb
.add_u64_avg(l_filestore_journal_wr_bytes
, "journal_wr_bytes", "Journal data written");
619 plb
.add_u64(l_filestore_op_queue_max_ops
, "op_queue_max_ops", "Max operations in writing to FS queue");
620 plb
.add_u64(l_filestore_op_queue_ops
, "op_queue_ops", "Operations in writing to FS queue");
621 plb
.add_u64_counter(l_filestore_ops
, "ops", "Operations written to store");
622 plb
.add_u64(l_filestore_op_queue_max_bytes
, "op_queue_max_bytes", "Max data in writing to FS queue");
623 plb
.add_u64(l_filestore_op_queue_bytes
, "op_queue_bytes", "Size of writing to FS queue");
624 plb
.add_u64_counter(l_filestore_bytes
, "bytes", "Data written to store");
625 plb
.add_time_avg(l_filestore_apply_latency
, "apply_latency", "Apply latency");
626 plb
.add_u64(l_filestore_committing
, "committing", "Is currently committing");
628 plb
.add_u64_counter(l_filestore_commitcycle
, "commitcycle", "Commit cycles");
629 plb
.add_time_avg(l_filestore_commitcycle_interval
, "commitcycle_interval", "Average interval between commits");
630 plb
.add_time_avg(l_filestore_commitcycle_latency
, "commitcycle_latency", "Average latency of commit");
631 plb
.add_u64_counter(l_filestore_journal_full
, "journal_full", "Journal writes while full");
632 plb
.add_time_avg(l_filestore_queue_transaction_latency_avg
, "queue_transaction_latency_avg",
633 "Store operation queue latency", NULL
, PerfCountersBuilder::PRIO_USEFUL
);
634 plb
.add_time(l_filestore_sync_pause_max_lat
, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
636 logger
= plb
.create_perf_counters();
638 cct
->get_perfcounters_collection()->add(logger
);
639 cct
->_conf
->add_observer(this);
641 superblock
.compat_features
= get_fs_initial_compat_set();
// Tears the store down: walks ondisk_finishers and apply_finishers, removes
// this object as a config observer, removes `logger` from cct's perf-counter
// collection, and clears journal->logger.
// NOTE(review): the loop bodies (presumably freeing each Finisher allocated
// in the constructor), the guard around the journal access, and the body of
// the m_filestore_do_dump branch are not visible in this excerpt.
644 FileStore::~FileStore()
646 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
650 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
654 cct
->_conf
->remove_observer(this);
655 cct
->get_perfcounters_collection()->remove(logger
);
658 journal
->logger
= NULL
;
661 if (m_filestore_do_dump
) {
// Format the on-disk xattr key for the logical attribute `name` into `buf`
// as "user.ceph.<name>".
//
//   name  NUL-terminated attribute name (must not be NULL).
//   buf   destination buffer; left untouched when NULL.
//   len   capacity of `buf` in bytes; output is truncated (and still
//         NUL-terminated) when it does not fit.
//
// snprintf() takes its bound as size_t, so a negative `len` would silently
// become a huge limit and allow writes past the end of `buf`. Non-positive
// lengths are therefore rejected up front.
static void get_attrname(const char *name, char *buf, int len)
{
  if (buf == NULL || len <= 0)
    return;
  snprintf(buf, static_cast<size_t>(len), "user.ceph.%s", name);
}
// Tests whether *name begins with the 10-character prefix "user.ceph.".
// NOTE(review): the statements inside and after the `if` are not visible in
// this excerpt -- presumably *name is advanced past the prefix and true is
// returned on a match, false otherwise; confirm against the full file.
671 bool parse_attrname(char **name
)
673 if (strncmp(*name
, "user.ceph.", 10) == 0) {
// Fills *pm with descriptive key/value pairs about this store:
//   filestore_backend  - backend->get_name()
//   filestore_f_type   - m_fs_type rendered as "0x<hex>"
// and, when filestore_collect_device_partition_information is enabled, the
// partition path and device node reported by get_device_by_fd() for fsid_fd.
// Three outcomes are visible for that lookup: both values "unknown", only
// the partition path known, or both known.
// NOTE(review): the conditions on `rc` selecting between those outcomes are
// not visible in this excerpt.
680 void FileStore::collect_metadata(map
<string
,string
> *pm
)
682 char partition_path
[PATH_MAX
];
683 char dev_node
[PATH_MAX
];
686 (*pm
)["filestore_backend"] = backend
->get_name();
688 ss
<< "0x" << std::hex
<< m_fs_type
<< std::dec
;
689 (*pm
)["filestore_f_type"] = ss
.str();
691 if (cct
->_conf
->filestore_collect_device_partition_information
) {
692 rc
= get_device_by_fd(fsid_fd
, partition_path
, dev_node
, PATH_MAX
);
700 (*pm
)["backend_filestore_partition_path"] = "unknown";
701 (*pm
)["backend_filestore_dev_node"] = "unknown";
704 (*pm
)["backend_filestore_partition_path"] = string(partition_path
);
705 (*pm
)["backend_filestore_dev_node"] = "unknown";
708 (*pm
)["backend_filestore_partition_path"] = string(partition_path
);
709 (*pm
)["backend_filestore_dev_node"] = string(dev_node
);
// Reports capacity for the filesystem holding basedir into *buf0.
// Calls ::statfs() on basedir; on failure asserts that the error is not
// -ENOENT, and not -EIO when m_filestore_fail_eio is set. On success
// total = f_blocks * f_bsize and available = f_bavail * f_bsize, and the
// journal's size estimate is subtracted from `available` when `available`
// is larger than that estimate.
// NOTE(review): the guard around the journal access and the else-branch for
// available <= estimate are not visible in this excerpt.
713 int FileStore::statfs(struct store_statfs_t
*buf0
)
717 if (::statfs(basedir
.c_str(), &buf
) < 0) {
719 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
720 assert(r
!= -ENOENT
);
723 buf0
->total
= buf
.f_blocks
* buf
.f_bsize
;
724 buf0
->available
= buf
.f_bavail
* buf
.f_bsize
;
725 // Adjust for writes pending in the journal
727 uint64_t estimate
= journal
->get_journal_size_estimate();
728 if (buf0
->available
> estimate
)
729 buf0
->available
-= estimate
;
// Creates the FileJournal object when a journal path is configured
// (journalpath non-empty), passing the dio/aio/force_aio settings, and
// points the journal's `logger` at this store's perf counters.
// NOTE(review): one line of the FileJournal constructor call (between
// &sync_cond and m_journal_dio) is not visible in this excerpt.
737 void FileStore::new_journal()
739 if (journalpath
.length()) {
740 dout(10) << "open_journal at " << journalpath
<< dendl
;
741 journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
,
743 m_journal_dio
, m_journal_aio
,
744 m_journal_force_aio
);
746 journal
->logger
= logger
;
// Dumps the journal at journalpath to `out` by constructing a FileJournal
// and calling its dump(). The empty-journalpath case is checked first.
// NOTE(review): the local `journal` below is allocated with a raw `new` and
// uses the same name as the `journal` assigned in new_journal(); the
// matching delete and the return statements are not visible in this excerpt.
751 int FileStore::dump_journal(ostream
& out
)
755 if (!journalpath
.length())
758 FileJournal
*journal
= new FileJournal(cct
, fsid
, &finisher
, &sync_cond
, journalpath
.c_str(), m_journal_dio
);
759 r
= journal
->dump(out
);
// Factory: returns a newly allocated backend chosen by filesystem magic.
// BTRFS_SUPER_MAGIC -> BtrfsFileStoreBackend, XFS_SUPER_MAGIC ->
// XfsFileStoreBackend, ZFS_SUPER_MAGIC -> ZFSFileStoreBackend; a
// GenericFileStoreBackend is the remaining visible return.
// NOTE(review): the switch header and the preprocessor guards closing the
// __linux__ section / surrounding the ZFS case are not visible in this
// excerpt.
764 FileStoreBackend
*FileStoreBackend::create(long f_type
, FileStore
*fs
)
767 #if defined(__linux__)
768 case BTRFS_SUPER_MAGIC
:
769 return new BtrfsFileStoreBackend(fs
);
771 case XFS_SUPER_MAGIC
:
772 return new XfsFileStoreBackend(fs
);
776 case ZFS_SUPER_MAGIC
:
777 return new ZFSFileStoreBackend(fs
);
780 return new GenericFileStoreBackend(fs
);
// Instantiates `backend` for filesystem magic `f_type` (asserting none was
// created before), logs its name and magic, switches wbthrottle to BTRFS
// mode on btrfs when throttling is enabled, and finally applies the xattr
// limits via set_xattr_limits_via_conf().
// NOTE(review): the switch header and any statements preceding the assert
// are not visible in this excerpt.
784 void FileStore::create_backend(long f_type
)
788 assert(backend
== NULL
);
789 backend
= FileStoreBackend::create(f_type
, this);
791 dout(0) << "backend " << backend
->get_name()
792 << " (magic 0x" << std::hex
<< f_type
<< std::dec
<< ")"
796 #if defined(__linux__)
797 case BTRFS_SUPER_MAGIC
:
798 if (!m_disable_wbthrottle
){
799 wbthrottle
.set_fs(WBThrottle::BTRFS
);
803 case XFS_SUPER_MAGIC
:
804 // wbthrottle is constructed with fs(WBThrottle::XFS)
809 set_xattr_limits_via_conf();
// Initializes a new store under basedir. Steps visible in this excerpt:
//   1. open basedir and "<basedir>/fsid" (created if absent), take the fsid
//      lock;
//   2. if no valid fsid is on disk, generate one (or use the provided one),
//      then truncate, write and fsync the fsid file; otherwise verify that a
//      provided fsid matches the on-disk one;
//   3. write the version stamp and the superblock (omap_backend taken from
//      filestore_omap_backend);
//   4. fstatfs basedir, reject btrfs unless the experimental feature is
//      enabled, and create the backend;
//   5. create current/, read the op_seq and write 1 when it is 0, and create
//      the snap_1 checkpoint when the backend can checkpoint (-EEXIST is
//      tolerated);
//   6. test-initialize the omap KeyValueDB and write or verify
//      "<omap_dir>/osd_uuid";
//   7. record type=filestore with write_meta().
// The three close() calls at the end release omap_fsid_fd, fsid_fd and
// basedir_fd.
// NOTE(review): most branch conditions, `ret = -errno` assignments and the
// goto labels are not visible in this excerpt.
812 int FileStore::mkfs()
815 char fsid_fn
[PATH_MAX
];
818 uuid_d old_omap_fsid
;
820 dout(1) << "mkfs in " << basedir
<< dendl
;
821 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
);
822 if (basedir_fd
< 0) {
824 derr
<< __FUNC__
<< ": failed to open base dir " << basedir
<< ": " << cpp_strerror(ret
) << dendl
;
829 snprintf(fsid_fn
, sizeof(fsid_fn
), "%s/fsid", basedir
.c_str());
830 fsid_fd
= ::open(fsid_fn
, O_RDWR
|O_CREAT
, 0644);
833 derr
<< __FUNC__
<< ": failed to open " << fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
834 goto close_basedir_fd
;
837 if (lock_fsid() < 0) {
842 if (read_fsid(fsid_fd
, &old_fsid
) < 0 || old_fsid
.is_zero()) {
843 if (fsid
.is_zero()) {
844 fsid
.generate_random();
845 dout(1) << __FUNC__
<< ": generated fsid " << fsid
<< dendl
;
847 dout(1) << __FUNC__
<< ": using provided fsid " << fsid
<< dendl
;
850 fsid
.print(fsid_str
);
851 strcat(fsid_str
, "\n");
852 ret
= ::ftruncate(fsid_fd
, 0);
855 derr
<< __FUNC__
<< ": failed to truncate fsid: "
856 << cpp_strerror(ret
) << dendl
;
859 ret
= safe_write(fsid_fd
, fsid_str
, strlen(fsid_str
));
861 derr
<< __FUNC__
<< ": failed to write fsid: "
862 << cpp_strerror(ret
) << dendl
;
865 if (::fsync(fsid_fd
) < 0) {
867 derr
<< __FUNC__
<< ": close failed: can't write fsid: "
868 << cpp_strerror(ret
) << dendl
;
871 dout(10) << __FUNC__
<< ": fsid is " << fsid
<< dendl
;
873 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
874 derr
<< __FUNC__
<< ": on-disk fsid " << old_fsid
<< " != provided " << fsid
<< dendl
;
879 dout(1) << __FUNC__
<< ": fsid is already set to " << fsid
<< dendl
;
883 ret
= write_version_stamp();
885 derr
<< __FUNC__
<< ": write_version_stamp() failed: "
886 << cpp_strerror(ret
) << dendl
;
891 superblock
.omap_backend
= cct
->_conf
->filestore_omap_backend
;
892 ret
= write_superblock();
894 derr
<< __FUNC__
<< ": write_superblock() failed: "
895 << cpp_strerror(ret
) << dendl
;
899 struct statfs basefs
;
900 ret
= ::fstatfs(basedir_fd
, &basefs
);
903 derr
<< __FUNC__
<< ": cannot fstatfs basedir "
904 << cpp_strerror(ret
) << dendl
;
908 #if defined(__linux__)
// NOTE(review): this check goes through the global g_ceph_context rather
// than the `cct` used everywhere else in this function.
909 if (basefs
.f_type
== BTRFS_SUPER_MAGIC
&&
910 !g_ceph_context
->check_experimental_feature_enabled("btrfs")) {
911 derr
<< __FUNC__
<< ": deprecated btrfs support is not enabled" << dendl
;
916 create_backend(basefs
.f_type
);
918 ret
= backend
->create_current();
920 derr
<< __FUNC__
<< ": failed to create current/ " << cpp_strerror(ret
) << dendl
;
924 // write initial op_seq
926 uint64_t initial_seq
= 0;
927 int fd
= read_op_seq(&initial_seq
);
930 derr
<< __FUNC__
<< ": failed to create " << current_op_seq_fn
<< ": "
931 << cpp_strerror(ret
) << dendl
;
934 if (initial_seq
== 0) {
935 ret
= write_op_seq(fd
, 1);
937 VOID_TEMP_FAILURE_RETRY(::close(fd
));
938 derr
<< __FUNC__
<< ": failed to write to " << current_op_seq_fn
<< ": "
939 << cpp_strerror(ret
) << dendl
;
943 if (backend
->can_checkpoint()) {
945 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
);
946 assert(current_fd
>= 0);
948 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, 1ull);
949 ret
= backend
->create_checkpoint(s
, NULL
);
950 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
951 if (ret
< 0 && ret
!= -EEXIST
) {
952 VOID_TEMP_FAILURE_RETRY(::close(fd
));
953 derr
<< __FUNC__
<< ": failed to create snap_1: " << cpp_strerror(ret
) << dendl
;
958 VOID_TEMP_FAILURE_RETRY(::close(fd
));
960 ret
= KeyValueDB::test_init(superblock
.omap_backend
, omap_dir
);
962 derr
<< __FUNC__
<< ": failed to create " << cct
->_conf
->filestore_omap_backend
<< dendl
;
965 // create fsid under omap
968 char omap_fsid_fn
[PATH_MAX
];
969 snprintf(omap_fsid_fn
, sizeof(omap_fsid_fn
), "%s/osd_uuid", omap_dir
.c_str());
970 omap_fsid_fd
= ::open(omap_fsid_fn
, O_RDWR
|O_CREAT
, 0644);
971 if (omap_fsid_fd
< 0) {
973 derr
<< __FUNC__
<< ": failed to open " << omap_fsid_fn
<< ": " << cpp_strerror(ret
) << dendl
;
977 if (read_fsid(omap_fsid_fd
, &old_omap_fsid
) < 0 || old_omap_fsid
.is_zero()) {
978 assert(!fsid
.is_zero());
979 fsid
.print(fsid_str
);
980 strcat(fsid_str
, "\n");
981 ret
= ::ftruncate(omap_fsid_fd
, 0);
984 derr
<< __FUNC__
<< ": failed to truncate fsid: "
985 << cpp_strerror(ret
) << dendl
;
986 goto close_omap_fsid_fd
;
988 ret
= safe_write(omap_fsid_fd
, fsid_str
, strlen(fsid_str
));
990 derr
<< __FUNC__
<< ": failed to write fsid: "
991 << cpp_strerror(ret
) << dendl
;
992 goto close_omap_fsid_fd
;
994 dout(10) << __FUNC__
<< ": write success, fsid:" << fsid_str
<< ", ret:" << ret
<< dendl
;
995 if (::fsync(omap_fsid_fd
) < 0) {
997 derr
<< __FUNC__
<< ": close failed: can't write fsid: "
998 << cpp_strerror(ret
) << dendl
;
999 goto close_omap_fsid_fd
;
1001 dout(10) << "mkfs omap fsid is " << fsid
<< dendl
;
1003 if (fsid
!= old_omap_fsid
) {
1004 derr
<< __FUNC__
<< ": " << omap_fsid_fn
1005 << " has existed omap fsid " << old_omap_fsid
1006 << " != expected osd fsid " << fsid
1009 goto close_omap_fsid_fd
;
1011 dout(1) << __FUNC__
<< ": omap fsid is already set to " << fsid
<< dendl
;
1014 dout(1) << cct
->_conf
->filestore_omap_backend
<< " db exists/created" << dendl
;
1019 goto close_omap_fsid_fd
;
1021 ret
= write_meta("type", "filestore");
1023 goto close_omap_fsid_fd
;
1025 dout(1) << "mkfs done in " << basedir
<< dendl
;
1029 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1031 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1034 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
// Creates the journal for an existing store: reads the store's fsid from
// "<basedir>/fsid" into `fsid`, then runs journal->check() and
// journal->create(), logging the outcome.
// NOTE(review): the step that instantiates `journal`, the condition deciding
// whether create() is needed after check(), and the return statements are
// not visible in this excerpt.
1040 int FileStore::mkjournal()
1045 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1046 int fd
= ::open(fn
, O_RDONLY
, 0644);
1049 derr
<< __FUNC__
<< ": open error: " << cpp_strerror(err
) << dendl
;
1052 ret
= read_fsid(fd
, &fsid
);
1054 derr
<< __FUNC__
<< ": read error: " << cpp_strerror(ret
) << dendl
;
1055 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1058 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1064 ret
= journal
->check();
1066 ret
= journal
->create();
1068 derr
<< __FUNC__
<< ": error creating journal on " << journalpath
1069 << ": " << cpp_strerror(ret
) << dendl
;
1071 dout(0) << __FUNC__
<< ": created journal on " << journalpath
<< dendl
;
// Reads an fsid from `fd` into *uuid. The buffer is zeroed, filled with
// safe_read(), and then either treated as a legacy 64-bit fsid (the same
// eight bytes are copied into both halves of the uuid) or parsed as text
// with uuid->parse().
// NOTE(review): the declaration of fsid_str, the condition selecting the
// legacy path, the NUL-termination before parse(), and the return values are
// not visible in this excerpt.
1079 int FileStore::read_fsid(int fd
, uuid_d
*uuid
)
1082 memset(fsid_str
, 0, sizeof(fsid_str
));
1083 int ret
= safe_read(fd
, fsid_str
, sizeof(fsid_str
));
// NOTE(review): the two stores below type-pun character storage through
// uint64_t*; memcpy() would avoid alignment and strict-aliasing concerns.
1087 // old 64-bit fsid... mirror it.
1088 *(uint64_t*)&uuid
->bytes()[0] = *(uint64_t*)fsid_str
;
1089 *(uint64_t*)&uuid
->bytes()[8] = *(uint64_t*)fsid_str
;
1097 if (!uuid
->parse(fsid_str
))
// Takes a non-blocking fcntl(F_SETLK) record lock on fsid_fd, using a zeroed
// lock description with l_whence = SEEK_SET. On failure it logs that another
// ceph-osd may still be running.
// NOTE(review): the declaration of `l`, its other field assignments
// (presumably the lock type), and the return values are not visible in this
// excerpt.
1102 int FileStore::lock_fsid()
1105 memset(&l
, 0, sizeof(l
));
1107 l
.l_whence
= SEEK_SET
;
1110 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
1113 dout(0) << __FUNC__
<< ": failed to lock " << basedir
<< "/fsid, is another ceph-osd still running? "
1114 << cpp_strerror(err
) << dendl
;
// Reports whether another process holds the store: opens "<basedir>/fsid"
// read-write into fsid_fd, tries lock_fsid(), and treats a lock failure as
// "in use". The descriptor is closed again afterwards. One visible early
// return yields 0 (not in use) with the remark "no fsid, ok".
// NOTE(review): the condition guarding that early return, any reset of
// fsid_fd after the close, and the final return are not visible in this
// excerpt.
1120 bool FileStore::test_mount_in_use()
1122 dout(5) << __FUNC__
<< ": basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1124 snprintf(fn
, sizeof(fn
), "%s/fsid", basedir
.c_str());
1126 // verify fs isn't in use
1128 fsid_fd
= ::open(fn
, O_RDWR
, 0644);
1130 return 0; // no fsid, ok.
1131 bool inuse
= lock_fsid() < 0;
1132 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
// Asks the backend whether the data device is rotational. Two paths are
// visible: one queries `backend` directly; the other opens basedir, runs
// fstatfs() on it, calls create_backend() with the detected f_type and then
// queries the new backend. The result is logged at level 10.
// NOTE(review): the condition selecting between the two paths, the cleanup
// of the fd/backend on the second path, and the return statement are not
// visible in this excerpt.
1137 bool FileStore::is_rotational()
1141 rotational
= backend
->is_rotational();
1143 int fd
= ::open(basedir
.c_str(), O_RDONLY
);
1147 int r
= ::fstatfs(fd
, &st
);
1152 create_backend(st
.f_type
);
1153 rotational
= backend
->is_rotational();
1157 dout(10) << __func__
<< " " << (int)rotational
<< dendl
;
// Asks the backend whether the journal device is rotational. Two paths are
// visible: one queries `backend` directly; the other opens journalpath, runs
// fstatfs() on it, calls create_backend() with the detected f_type and then
// queries the new backend. The result is logged at level 10 and returned.
// NOTE(review): the condition selecting between the two paths and the
// cleanup of the fd/backend on the second path are not visible in this
// excerpt.
1161 bool FileStore::is_journal_rotational()
1163 bool journal_rotational
;
1165 journal_rotational
= backend
->is_journal_rotational();
1167 int fd
= ::open(journalpath
.c_str(), O_RDONLY
);
1171 int r
= ::fstatfs(fd
, &st
);
1176 create_backend(st
.f_type
);
1177 journal_rotational
= backend
->is_journal_rotational();
1181 dout(10) << __func__
<< " " << (int)journal_rotational
<< dendl
;
1182 return journal_rotational
;
// Probes the filesystem under basedir_fd: records its block size in
// blk_size, rejects btrfs unless the experimental feature is enabled,
// creates the backend and runs its detect_features(), then exercises xattrs
// on a scratch file "<basedir>/xattr_test":
//   - a set/get round trip on "user.test" must return the value written;
//   - five buffers are stored under user.test..user.test5, and -ENOSPC on
//     the fifth is logged as "limited size xattrs";
//   - the test xattrs are removed and the scratch fd is closed.
// NOTE(review): declarations of x/y/buf, the error returns, and removal of
// the scratch file itself are not visible in this excerpt.
1185 int FileStore::_detect_fs()
1188 int r
= ::fstatfs(basedir_fd
, &st
);
1192 blk_size
= st
.f_bsize
;
1194 #if defined(__linux__)
// NOTE(review): this check goes through the global g_ceph_context rather
// than the store's own `cct`.
1195 if (st
.f_type
== BTRFS_SUPER_MAGIC
&&
1196 !g_ceph_context
->check_experimental_feature_enabled("btrfs")) {
1197 derr
<<__FUNC__
<< ": deprecated btrfs support is not enabled" << dendl
;
1202 create_backend(st
.f_type
);
1204 r
= backend
->detect_features();
1206 derr
<< __FUNC__
<< ": detect_features error: " << cpp_strerror(r
) << dendl
;
1214 snprintf(fn
, sizeof(fn
), "%s/xattr_test", basedir
.c_str());
1215 int tmpfd
= ::open(fn
, O_CREAT
|O_WRONLY
|O_TRUNC
, 0700);
1218 derr
<< __FUNC__
<< ": unable to create " << fn
<< ": " << cpp_strerror(ret
) << dendl
;
1222 int ret
= chain_fsetxattr(tmpfd
, "user.test", &x
, sizeof(x
));
1224 ret
= chain_fgetxattr(tmpfd
, "user.test", &y
, sizeof(y
));
1225 if ((ret
< 0) || (x
!= y
)) {
1226 derr
<< "Extended attributes don't appear to work. ";
1228 *_dout
<< "Got error " + cpp_strerror(ret
) + ". ";
1229 *_dout
<< "If you are using ext3 or ext4, be sure to mount the underlying "
1230 << "file system with the 'user_xattr' option." << dendl
;
1232 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
1237 memset(buf
, 0, sizeof(buf
)); // shut up valgrind
1238 chain_fsetxattr(tmpfd
, "user.test", &buf
, sizeof(buf
));
1239 chain_fsetxattr(tmpfd
, "user.test2", &buf
, sizeof(buf
));
1240 chain_fsetxattr(tmpfd
, "user.test3", &buf
, sizeof(buf
));
1241 chain_fsetxattr(tmpfd
, "user.test4", &buf
, sizeof(buf
));
1242 ret
= chain_fsetxattr(tmpfd
, "user.test5", &buf
, sizeof(buf
));
1243 if (ret
== -ENOSPC
) {
1244 dout(0) << "limited size xattrs" << dendl
;
1246 chain_fremovexattr(tmpfd
, "user.test");
1247 chain_fremovexattr(tmpfd
, "user.test2");
1248 chain_fremovexattr(tmpfd
, "user.test3");
1249 chain_fremovexattr(tmpfd
, "user.test4");
1250 chain_fremovexattr(tmpfd
, "user.test5");
1253 VOID_TEMP_FAILURE_RETRY(::close(tmpfd
));
// Validates the journal configuration and emits operator-facing warnings:
//   - more than one of writeahead/parallel/trailing enabled is reported as a
//     mount ERROR;
//   - a backend that cannot checkpoint, combined with no journal or a
//     journal not in writeahead mode, is reported as a data-loss WARNING;
//   - a further message warns that no journal is configured.
// NOTE(review): the stream the multi-line banners are written to, the
// condition guarding the "no journal" warning, and the return values are
// not visible in this excerpt.
1258 int FileStore::_sanity_check_fs()
1262 if (((int)m_filestore_journal_writeahead
+
1263 (int)m_filestore_journal_parallel
+
1264 (int)m_filestore_journal_trailing
) > 1) {
1265 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl
;
1267 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1268 << " is enabled in ceph.conf. You must choose a single journal mode."
1269 << TEXT_NORMAL
<< std::endl
;
1273 if (!backend
->can_checkpoint()) {
1274 if (!journal
|| !m_filestore_journal_writeahead
) {
1275 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl
;
1277 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1278 << " For non-btrfs volumes, a writeahead journal is required to\n"
1279 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1280 << " should include something like:\n"
1281 << " osd journal = /path/to/journal_device_or_file\n"
1282 << " filestore journal writeahead = true\n"
1288 dout(0) << "mount WARNING: no journal" << dendl
;
1290 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1291 << " If you will not be using an osd journal, write latency may be\n"
1292 << " relatively high. It can be reduced somewhat by lowering\n"
1293 << " filestore_max_sync_interval, but lower values mean lower write\n"
1294 << " throughput, especially with spinning disks.\n"
// Encodes `superblock` into the buffer `bl` and writes it to the file
// "superblock" under basedir, returning safe_write_file()'s result.
// NOTE(review): the declaration of `bl` is not visible in this excerpt.
1301 int FileStore::write_superblock()
1304 ::encode(superblock
, bl
);
1305 return safe_write_file(basedir
.c_str(), "superblock",
1306 bl
.c_str(), bl
.length());
// Loads `superblock` from the file "superblock" under basedir. The file is
// read into a PATH_MAX-sized buffer; if it does not exist (-ENOENT) the
// current in-memory superblock is written out instead and that result is
// returned. Otherwise the buffer is wrapped in `bl` and decoded.
// NOTE(review): the declaration of `bl`, handling of other read errors and
// of the number of bytes actually read, and the final return are not
// visible in this excerpt.
1309 int FileStore::read_superblock()
1311 bufferptr
bp(PATH_MAX
);
1312 int ret
= safe_read_file(basedir
.c_str(), "superblock",
1313 bp
.c_str(), bp
.length());
1315 if (ret
== -ENOENT
) {
1316 // If the file doesn't exist write initial CompatSet
1317 return write_superblock();
1323 bl
.push_back(std::move(bp
));
1324 bufferlist::iterator i
= bl
.begin();
1325 ::decode(superblock
, i
);
1329 int FileStore::update_version_stamp()
1331 return write_version_stamp();
// Reads the "store_version" file under basedir, decodes it into *version,
// logs it next to target_version, and compares the two.
// NOTE(review): the declaration of `bl`, the handling of read errors, and
// the values returned for match / mismatch are not visible in this excerpt.
1334 int FileStore::version_stamp_is_valid(uint32_t *version
)
1336 bufferptr
bp(PATH_MAX
);
1337 int ret
= safe_read_file(basedir
.c_str(), "store_version",
1338 bp
.c_str(), bp
.length());
1343 bl
.push_back(std::move(bp
));
1344 bufferlist::iterator i
= bl
.begin();
1345 ::decode(*version
, i
);
1346 dout(10) << __FUNC__
<< ": was " << *version
<< " vs target "
1347 << target_version
<< dendl
;
1348 if (*version
== target_version
)
// Logs target_version at debug level 1, encodes it into `bl`, and writes the
// buffer to the "store_version" file under basedir.
// Returns the result of safe_write_file().
1354 int FileStore::write_version_stamp()
1356 dout(1) << __FUNC__
<< ": " << target_version
<< dendl
;
1358 ::encode(target_version
, bl
);
1360 return safe_write_file(basedir
.c_str(), "store_version",
1361 bl
.c_str(), bl
.length());
// Checks the on-disk version stamp with version_stamp_is_valid() and, going
// by the visible error messages, reports a missing store_version file or a
// store that is too old to convert; on the remaining path it refreshes the
// stamp with update_version_stamp().
// NOTE(review): the branch conditions and return values are on lines not
// visible in this listing.
1364 int FileStore::upgrade()
1366 dout(1) << __FUNC__
<< dendl
;
1368 int r
= version_stamp_is_valid(&version
);
1371 derr
<< "The store_version file doesn't exist." << dendl
;
1380 derr
<< "ObjectStore is old at version " << version
<< ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl
;
1384 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1385 // open up DBObjectMap with the do_upgrade flag, which we already did.
1386 update_version_stamp();
// Opens the file named by current_op_seq_fn (creating it with mode 0644 if
// needed) and reads up to sizeof(s) - 1 bytes of it into the zeroed buffer
// `s`, so the buffer always stays NUL-terminated.
// On a read error the failure is logged and op_fd is closed.
// NOTE(review): conversion of the buffer into *seq and the return value are
// on lines not visible here; the callers in mount() store the result as a
// file descriptor.
1390 int FileStore::read_op_seq(uint64_t *seq
)
1392 int op_fd
= ::open(current_op_seq_fn
.c_str(), O_CREAT
|O_RDWR
, 0644);
// abort on EIO only when m_filestore_fail_eio is set
1395 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
1399 memset(s
, 0, sizeof(s
));
1400 int ret
= safe_read(op_fd
, s
, sizeof(s
) - 1);
1402 derr
<< __FUNC__
<< ": error reading " << current_op_seq_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1403 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
1404 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Formats `seq` as a decimal number followed by '\n' and writes it with
// pwrite() at offset 0 of `fd`, retrying on EINTR via TEMP_FAILURE_RETRY.
// NOTE(review): `seq` is uint64_t but is formatted with PRId64 (signed);
// PRIu64 would match the type — confirm whether this is intentional.
// NOTE(review): the return value is on a line not visible in this listing.
1411 int FileStore::write_op_seq(int fd
, uint64_t seq
)
1414 snprintf(s
, sizeof(s
), "%" PRId64
"\n", seq
);
1415 int ret
= TEMP_FAILURE_RETRY(::pwrite(fd
, s
, strlen(s
), 0));
1418 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Brings the store online. Steps visible in this listing, in order:
//   1. apply throttle parameters and check that basedir is accessible;
//   2. open and read <basedir>/fsid, then take the fsid lock;
//   3. validate the version stamp and read the superblock, refusing to mount
//      when the superblock carries unsupported compat features;
//   4. open basedir, list backend checkpoints and, when the backend can
//      checkpoint, roll back to a cluster snapshot or the newest commit snap;
//   5. open current/, read the initial op sequence, maintain the nosnap
//      marker and compare the omap osd_uuid with the store fsid;
//   6. create and initialise the omap KeyValueDB / DBObjectMap;
//   7. select the journal mode, sanity-check the filesystem, clean up
//      collections, start the sync thread, replay the journal, set up temp
//      collections and finishers, and optionally run upgrade().
// The trailing close() calls belong to the error-unwind labels
// (close_current_fd, close_basedir_fd, ...) that the gotos above target.
// NOTE(review): many original lines (branch conditions, labels, returns) are
// absent from this listing; the comments describe only what is visible.
1423 int FileStore::mount()
1427 uint64_t initial_op_seq
;
1429 set
<string
> cluster_snaps
;
1430 CompatSet supported_compat_set
= get_fs_supported_compat_set();
1432 dout(5) << "basedir " << basedir
<< " journal " << journalpath
<< dendl
;
1434 ret
= set_throttle_params();
1438 // make sure global base dir exists
1439 if (::access(basedir
.c_str(), R_OK
| W_OK
)) {
1441 derr
<< __FUNC__
<< ": unable to access basedir '" << basedir
<< "': "
1442 << cpp_strerror(ret
) << dendl
;
// open the fsid file that identifies this store
1447 snprintf(buf
, sizeof(buf
), "%s/fsid", basedir
.c_str());
1448 fsid_fd
= ::open(buf
, O_RDWR
, 0644);
1451 derr
<< __FUNC__
<< ": error opening '" << buf
<< "': "
1452 << cpp_strerror(ret
) << dendl
;
1456 ret
= read_fsid(fsid_fd
, &fsid
);
1458 derr
<< __FUNC__
<< ": error reading fsid_fd: " << cpp_strerror(ret
)
1463 if (lock_fsid() < 0) {
1464 derr
<< __FUNC__
<< ": lock_fsid failed" << dendl
;
1469 dout(10) << "mount fsid is " << fsid
<< dendl
;
// on-disk format check: a result of 0 is treated as a stale version stamp
1472 uint32_t version_stamp
;
1473 ret
= version_stamp_is_valid(&version_stamp
);
1475 derr
<< __FUNC__
<< ": error in version_stamp_is_valid: "
1476 << cpp_strerror(ret
) << dendl
;
1478 } else if (ret
== 0) {
1479 if (do_update
|| (int)version_stamp
< cct
->_conf
->filestore_update_to
) {
1480 derr
<< __FUNC__
<< ": stale version stamp detected: "
1482 << ". Proceeding, do_update "
1483 << "is set, performing disk format upgrade."
1488 derr
<< __FUNC__
<< ": stale version stamp " << version_stamp
1489 << ". Please run the FileStore update script before starting the "
1490 << "OSD, or set filestore_update_to to " << target_version
1491 << " (currently " << cct
->_conf
->filestore_update_to
<< ")"
1497 ret
= read_superblock();
1502 // Check if this FileStore supports all the necessary features to mount
1503 if (supported_compat_set
.compare(superblock
.compat_features
) == -1) {
1504 derr
<< __FUNC__
<< ": Incompatible features set "
1505 << superblock
.compat_features
<< dendl
;
1510 // open some dir handles
1511 basedir_fd
= ::open(basedir
.c_str(), O_RDONLY
);
1512 if (basedir_fd
< 0) {
1514 derr
<< __FUNC__
<< ": failed to open " << basedir
<< ": "
1515 << cpp_strerror(ret
) << dendl
;
1520 // test for btrfs, xattrs, etc.
1523 derr
<< __FUNC__
<< ": error in _detect_fs: "
1524 << cpp_strerror(ret
) << dendl
;
1525 goto close_basedir_fd
;
// enumerate backend checkpoints; names are classified below by matching them
// against COMMIT_SNAP_ITEM and CLUSTER_SNAP_ITEM
1530 ret
= backend
->list_checkpoints(ls
);
// NOTE(review): the message names _list_snaps although the call above is
// backend->list_checkpoints().
1532 derr
<< __FUNC__
<< ": error in _list_snaps: "<< cpp_strerror(ret
) << dendl
;
1533 goto close_basedir_fd
;
1536 long long unsigned c
, prev
= 0;
1537 char clustersnap
[NAME_MAX
];
1538 for (list
<string
>::iterator it
= ls
.begin(); it
!= ls
.end(); ++it
) {
1539 if (sscanf(it
->c_str(), COMMIT_SNAP_ITEM
, &c
) == 1) {
1543 } else if (sscanf(it
->c_str(), CLUSTER_SNAP_ITEM
, clustersnap
) == 1)
1544 cluster_snaps
.insert(*it
);
// a requested cluster-snapshot rollback must name an existing snapshot
1548 if (m_osd_rollback_to_cluster_snap
.length() &&
1549 cluster_snaps
.count(m_osd_rollback_to_cluster_snap
) == 0) {
1550 derr
<< "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap
<< "': not found" << dendl
;
1552 goto close_basedir_fd
;
1556 snprintf(nosnapfn
, sizeof(nosnapfn
), "%s/nosnap", current_fn
.c_str());
1558 if (backend
->can_checkpoint()) {
1559 if (snaps
.empty()) {
1560 dout(0) << __FUNC__
<< ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl
;
1563 uint64_t curr_seq
= 0;
1565 if (m_osd_rollback_to_cluster_snap
.length()) {
1567 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap
<< " **"
1570 assert(cluster_snaps
.count(m_osd_rollback_to_cluster_snap
));
1571 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, m_osd_rollback_to_cluster_snap
.c_str());
// read the sequence recorded in current/; the descriptor is only needed for
// the read and is closed straight away
1574 int fd
= read_op_seq(&curr_seq
);
1576 VOID_TEMP_FAILURE_RETRY(::close(fd
));
1580 dout(10) << " current/ seq was " << curr_seq
<< dendl
;
1582 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl
;
1584 uint64_t cp
= snaps
.back();
1585 dout(10) << " most recent snap from " << snaps
<< " is " << cp
<< dendl
;
1587 // if current/ is marked as non-snapshotted, refuse to roll
1588 // back (without clear direction) to avoid throwing out new
1591 if (::stat(nosnapfn
, &st
) == 0) {
1592 if (!m_osd_use_stale_snap
) {
1593 derr
<< "ERROR: " << nosnapfn
<< " exists, not rolling back to avoid losing new data" << dendl
;
1594 derr
<< "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl
;
1595 derr
<< "config option for --osd-use-stale-snap startup argument." << dendl
;
1597 goto close_basedir_fd
;
1599 derr
<< "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1600 << ", newest snap is " << cp
<< dendl
;
1602 << " ** WARNING: forcing the use of stale snapshot data **"
1603 << TEXT_NORMAL
<< std::endl
;
1606 dout(10) << __FUNC__
<< ": rolling back to consistent snap " << cp
<< dendl
;
1607 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
1611 ret
= backend
->rollback_to(s
);
1613 derr
<< __FUNC__
<< ": error rolling back to " << s
<< ": "
1614 << cpp_strerror(ret
) << dendl
;
1615 goto close_basedir_fd
;
1621 current_fd
= ::open(current_fn
.c_str(), O_RDONLY
);
1622 if (current_fd
< 0) {
1624 derr
<< __FUNC__
<< ": error opening: " << current_fn
<< ": " << cpp_strerror(ret
) << dendl
;
1625 goto close_basedir_fd
;
1628 assert(current_fd
>= 0);
// op_fd holds the result of read_op_seq(); umount() closes op_fd later
1630 op_fd
= read_op_seq(&initial_op_seq
);
1633 derr
<< __FUNC__
<< ": read_op_seq failed" << dendl
;
1634 goto close_current_fd
;
1637 dout(5) << "mount op_seq is " << initial_op_seq
<< dendl
;
1638 if (initial_op_seq
== 0) {
1639 derr
<< "mount initial op seq is 0; something is wrong" << dendl
;
1641 goto close_current_fd
;
1644 if (!backend
->can_checkpoint()) {
1645 // mark current/ as non-snapshotted so that we don't rollback away
1647 int r
= ::creat(nosnapfn
, 0644);
1650 derr
<< __FUNC__
<< ": failed to create current/nosnap" << dendl
;
1651 goto close_current_fd
;
// creat() returned a descriptor for the marker file; it is not needed further
1653 VOID_TEMP_FAILURE_RETRY(::close(r
));
1655 // clear nosnap marker, if present.
1659 // check fsid with omap
1662 char omap_fsid_buf
[PATH_MAX
];
1663 struct ::stat omap_fsid_stat
;
1664 snprintf(omap_fsid_buf
, sizeof(omap_fsid_buf
), "%s/osd_uuid", omap_dir
.c_str());
1665 // if osd_uuid does not exist, assume this omap matches the corresponding osd
1666 if (::stat(omap_fsid_buf
, &omap_fsid_stat
) != 0){
1667 dout(10) << __FUNC__
<< ": osd_uuid not found under omap, "
1668 << "assume as matched."
1671 // if osd_uuid exists, compares osd_uuid with fsid
1672 omap_fsid_fd
= ::open(omap_fsid_buf
, O_RDONLY
, 0644);
1673 if (omap_fsid_fd
< 0) {
1675 derr
<< __FUNC__
<< ": error opening '" << omap_fsid_buf
<< "': "
1676 << cpp_strerror(ret
)
1678 goto close_current_fd
;
1680 ret
= read_fsid(omap_fsid_fd
, &omap_fsid
);
1681 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd
));
1682 omap_fsid_fd
= -1; // defensive
1684 derr
<< __FUNC__
<< ": error reading omap_fsid_fd"
1685 << ", omap_fsid = " << omap_fsid
1686 << cpp_strerror(ret
)
1688 goto close_current_fd
;
1690 if (fsid
!= omap_fsid
) {
1691 derr
<< __FUNC__
<< ": " << omap_fsid_buf
1692 << " has existed omap fsid " << omap_fsid
1693 << " != expected osd fsid " << fsid
1696 goto close_current_fd
;
1700 dout(0) << "start omap initiation" << dendl
;
// omap setup is skipped entirely when SKIP_MOUNT_OMAP is set in generic_flags
1701 if (!(generic_flags
& SKIP_MOUNT_OMAP
)) {
1702 KeyValueDB
* omap_store
= KeyValueDB::create(cct
,
1703 superblock
.omap_backend
,
1705 if (omap_store
== NULL
)
1707 derr
<< __FUNC__
<< ": Error creating " << superblock
.omap_backend
<< dendl
;
1709 goto close_current_fd
;
// only the rocksdb backend is initialised with the configured option string
1712 if (superblock
.omap_backend
== "rocksdb")
1713 ret
= omap_store
->init(cct
->_conf
->filestore_rocksdb_options
);
1715 ret
= omap_store
->init();
1718 derr
<< __FUNC__
<< ": Error initializing omap_store: " << cpp_strerror(ret
) << dendl
;
1719 goto close_current_fd
;
1723 if (omap_store
->create_and_open(err
)) {
1725 derr
<< __FUNC__
<< ": Error initializing " << superblock
.omap_backend
1726 << " : " << err
.str() << dendl
;
1728 goto close_current_fd
;
1731 DBObjectMap
*dbomap
= new DBObjectMap(cct
, omap_store
);
1732 ret
= dbomap
->init(do_update
);
1735 derr
<< __FUNC__
<< ": Error initializing DBObjectMap: " << ret
<< dendl
;
1736 goto close_current_fd
;
1740 if (cct
->_conf
->filestore_debug_omap_check
&& !dbomap
->check(err2
)) {
1741 derr
<< err2
.str() << dendl
;
1744 goto close_current_fd
;
// object_map takes over the DBObjectMap from here on
1746 object_map
.reset(dbomap
);
1752 // select journal mode?
// when no mode is configured: writeahead if the backend cannot checkpoint,
// parallel otherwise
1754 if (!m_filestore_journal_writeahead
&&
1755 !m_filestore_journal_parallel
&&
1756 !m_filestore_journal_trailing
) {
1757 if (!backend
->can_checkpoint()) {
1758 m_filestore_journal_writeahead
= true;
1759 dout(0) << __FUNC__
<< ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl
;
1761 m_filestore_journal_parallel
= true;
1762 dout(0) << __FUNC__
<< ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl
;
1765 if (m_filestore_journal_writeahead
)
1766 dout(0) << __FUNC__
<< ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl
;
1767 if (m_filestore_journal_parallel
)
1768 dout(0) << __FUNC__
<< ": PARALLEL journal mode explicitly enabled in conf" << dendl
;
1769 if (m_filestore_journal_trailing
)
1770 dout(0) << __FUNC__
<< ": TRAILING journal mode explicitly enabled in conf" << dendl
;
1772 if (m_filestore_journal_writeahead
)
1773 journal
->set_wait_on_full(true);
1775 dout(0) << __FUNC__
<< ": no journal" << dendl
;
1778 ret
= _sanity_check_fs();
1780 derr
<< __FUNC__
<< ": _sanity_check_fs failed with error "
1782 goto close_current_fd
;
1785 // Cleanup possibly invalid collections
1787 vector
<coll_t
> collections
;
1788 ret
= list_collections(collections
, true);
1790 derr
<< "Error " << ret
<< " while listing collections" << dendl
;
1791 goto close_current_fd
;
1793 for (vector
<coll_t
>::iterator i
= collections
.begin();
1794 i
!= collections
.end();
1797 ret
= get_index(*i
, &index
);
1799 derr
<< "Unable to mount index " << *i
1800 << " with error: " << ret
<< dendl
;
1801 goto close_current_fd
;
1803 assert(NULL
!= index
.index
);
// write-lock the index while it is processed
1804 RWLock::WLocker
l((index
.index
)->access_lock
);
1809 if (!m_disable_wbthrottle
) {
1812 dout(0) << __FUNC__
<< ": INFO: WbThrottle is disabled" << dendl
;
1813 if (cct
->_conf
->filestore_odsync_write
) {
1814 dout(0) << __FUNC__
<< ": INFO: O_DSYNC write is enabled" << dendl
;
1817 sync_thread
.create("filestore_sync");
1819 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
)) {
1820 ret
= journal_replay(initial_op_seq
);
// NOTE(review): the message says "failed to open journal" although it follows
// the journal_replay() call above.
1822 derr
<< __FUNC__
<< ": failed to open journal " << journalpath
<< ": " << cpp_strerror(ret
) << dendl
;
1823 if (ret
== -ENOTTY
) {
1824 derr
<< "maybe journal is not pointing to a block device and its size "
1825 << "wasn't configured?" << dendl
;
1834 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(err2
)) {
1835 derr
<< err2
.str() << dendl
;
1841 init_temp_collections();
1846 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
1849 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
// run the on-disk conversion when the configured update target allows it
1856 if (cct
->_conf
->filestore_update_to
>= (int)get_target_version()) {
1857 int err
= upgrade();
1859 derr
<< "error converting store" << dendl
;
1875 if (!m_disable_wbthrottle
) {
1879 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
1882 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1885 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1888 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Reconciles temp collections with the regular collections at mount time.
// Visible flow: list all collections into `ls`, walk them twice (the first
// pass and the `temps` bookkeeping are partly outside this listing), create
// the temp collection from get_temp() for entries that lack one, and finally
// remove every collection still left in `temps` as a stray.
// NOTE(review): how `temps` is filled and pruned is on lines not visible
// here; the description above follows the log messages.
1895 void FileStore::init_temp_collections()
1897 dout(10) << __FUNC__
<< dendl
;
1899 int r
= list_collections(ls
, true);
1902 dout(20) << " ls " << ls
<< dendl
;
1904 SequencerPosition spos
;
1907 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
)
1910 dout(20) << " temps " << temps
<< dendl
;
1912 for (vector
<coll_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
) {
1917 coll_t temp
= p
->get_temp();
1918 if (temps
.count(temp
)) {
1921 dout(10) << __FUNC__
<< ": creating " << temp
<< dendl
;
1922 r
= _create_collection(temp
, 0, spos
);
// whatever remains in `temps` at this point is removed recursively
1927 for (set
<coll_t
>::iterator p
= temps
.begin(); p
!= temps
.end(); ++p
) {
1928 dout(10) << __FUNC__
<< ": removing stray " << *p
<< dendl
;
1929 r
= _collection_remove_recursive(*p
, spos
);
// Shuts the store down. Visible steps: log the basedir, handle the
// write-back throttle when it is enabled, close the journal for writing
// unless SKIP_JOURNAL_REPLAY is set, walk the ondisk and apply finishers,
// close fsid_fd, op_fd, current_fd and basedir_fd, and finally take
// sync_entry_timeo_lock.
// NOTE(review): thread shutdown, the finisher loop bodies and the return
// value are on lines not visible in this listing.
1934 int FileStore::umount()
1936 dout(5) << __FUNC__
<< ": " << basedir
<< dendl
;
1947 if (!m_disable_wbthrottle
){
1953 if (!(generic_flags
& SKIP_JOURNAL_REPLAY
))
1954 journal_write_close();
1956 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
1959 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
1964 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
1968 VOID_TEMP_FAILURE_RETRY(::close(op_fd
));
// current_fd and basedir_fd are closed only when they hold a valid descriptor
1971 if (current_fd
>= 0) {
1972 VOID_TEMP_FAILURE_RETRY(::close(current_fd
));
1975 if (basedir_fd
>= 0) {
1976 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd
));
1988 Mutex::Locker
l(sync_entry_timeo_lock
);
1999 /// -----------------------------
// Builds an Op for a batch of transactions: sums get_num_bytes() and
// get_num_ops() over `tls`, stamps the start time, and stores the
// transactions and the onreadable / onreadable_sync callbacks on the Op.
// `tls` is moved into the Op (std::move), so the caller's vector is left in
// a moved-from state afterwards.
// NOTE(review): allocation of `o`, use of `osd_op`, and the return statement
// are on lines not visible in this listing.
2001 FileStore::Op
*FileStore::build_op(vector
<Transaction
>& tls
,
2002 Context
*onreadable
,
2003 Context
*onreadable_sync
,
2004 TrackedOpRef osd_op
)
2006 uint64_t bytes
= 0, ops
= 0;
2007 for (vector
<Transaction
>::iterator p
= tls
.begin();
2010 bytes
+= (*p
).get_num_bytes();
2011 ops
+= (*p
).get_num_ops();
2015 o
->start
= ceph_clock_now();
2016 o
->tls
= std::move(tls
);
2017 o
->onreadable
= onreadable
;
2018 o
->onreadable_sync
= onreadable_sync
;
// Queues an Op for application: records a "queued" trace event, bumps the
// l_filestore_ops and l_filestore_bytes counters, and logs the op together
// with the current throttle levels.
// NOTE(review): the actual queueing calls described by the comment below are
// on lines not visible in this listing.
2027 void FileStore::queue_op(OpSequencer
*osr
, Op
*o
)
2029 // queue op on sequencer, then queue sequencer for the threadpool,
2030 // so that regardless of which order the threads pick up the
2031 // sequencer, the op order will be preserved.
2034 o
->trace
.event("queued");
2036 logger
->inc(l_filestore_ops
);
2037 logger
->inc(l_filestore_bytes
, o
->bytes
);
2039 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
2041 << " " << o
->bytes
<< " bytes"
2042 << " (queue has " << throttle_ops
.get_current() << " ops and " << throttle_bytes
.get_current() << " bytes)"
// Reserves op-queue throttle capacity for `o` (o->bytes from throttle_bytes)
// and publishes the current op and byte throttle levels to the perf counters.
// NOTE(review): the matching throttle_ops reservation is presumably on a
// line not visible in this listing — confirm.
2047 void FileStore::op_queue_reserve_throttle(Op
*o
)
2050 throttle_bytes
.get(o
->bytes
);
2052 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
2053 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
// Counterpart of op_queue_reserve_throttle(): returns o->bytes to
// throttle_bytes and refreshes the op and byte throttle perf counters.
// NOTE(review): the matching throttle_ops release is presumably on a line
// not visible in this listing — confirm.
2056 void FileStore::op_queue_release_throttle(Op
*o
)
2059 throttle_bytes
.put(o
->bytes
);
2060 logger
->set(l_filestore_op_queue_ops
, throttle_ops
.get_current());
2061 logger
->set(l_filestore_op_queue_bytes
, throttle_bytes
.get_current());
// Applies the Op at the head of the sequencer's queue.
// Visible flow: apply the write-back throttle when enabled, honour the
// filestore_inject_stall debug option (which is reset to "0" afterwards),
// take osr->apply_lock, peek the head Op, and run its transactions between
// apply_manager.op_apply_start() and op_apply_finish().
// osr->apply_lock is still held when this function's visible part ends.
2064 void FileStore::_do_op(OpSequencer
*osr
, ThreadPool::TPHandle
&handle
)
2066 if (!m_disable_wbthrottle
) {
2067 wbthrottle
.throttle();
2070 if (cct
->_conf
->filestore_inject_stall
) {
2071 int orig
= cct
->_conf
->filestore_inject_stall
;
2072 dout(5) << __FUNC__
<< ": filestore_inject_stall " << orig
<< ", sleeping" << dendl
;
// clear the option so the stall is injected only once
2074 cct
->_conf
->set_val("filestore_inject_stall", "0");
2075 dout(5) << __FUNC__
<< ": done stalling" << dendl
;
2078 osr
->apply_lock
.Lock();
// peek (not dequeue): the Op stays on the queue while it is applied
2079 Op
*o
= osr
->peek_queue();
2080 o
->trace
.event("op_apply_start");
2081 apply_manager
.op_apply_start(o
->op
);
2082 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< "/" << osr
->parent
<< " start" << dendl
;
2083 o
->trace
.event("_do_transactions start");
2084 int r
= _do_transactions(o
->tls
, o
->op
, &handle
);
2085 o
->trace
.event("op_apply_finish");
2086 apply_manager
.op_apply_finish(o
->op
);
2087 dout(10) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " r = " << r
2088 << ", finisher " << o
->onreadable
<< " " << o
->onreadable_sync
<< dendl
;
// Completes the Op that _do_op() applied: dequeues it from the sequencer,
// releases osr->apply_lock, returns its throttle reservation, records the
// apply latency, and fires the completion callbacks.
// onreadable_sync is completed inline with result 0; onreadable and any
// contexts collected in `to_queue` are handed to the apply finisher selected
// by osr->id % m_apply_finisher_num.
// NOTE(review): `lat` is initialised to the current time here; the
// subtraction of the Op's start time is presumably on a line not visible in
// this listing.
2094 void FileStore::_finish_op(OpSequencer
*osr
)
2096 list
<Context
*> to_queue
;
2097 Op
*o
= osr
->dequeue(&to_queue
);
2099 utime_t lat
= ceph_clock_now();
2102 dout(10) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< "/" << osr
->parent
<< " lat " << lat
<< dendl
;
2103 osr
->apply_lock
.Unlock(); // locked in _do_op
2104 o
->trace
.event("_finish_op");
2106 // called with tp lock held
2107 op_queue_release_throttle(o
);
2109 logger
->tinc(l_filestore_apply_latency
, lat
);
2111 if (o
->onreadable_sync
) {
2112 o
->onreadable_sync
->complete(0);
2114 if (o
->onreadable
) {
2115 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(o
->onreadable
);
2117 if (!to_queue
.empty()) {
2118 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(to_queue
);
// Completion context used in writeahead journal mode: when finished it calls
// FileStore::_journaled_ahead() with the sequencer, the Op and the ondisk
// callback captured at construction. The result code `r` is not forwarded.
2124 struct C_JournaledAhead
: public Context
{
2126 FileStore::OpSequencer
*osr
;
2130 C_JournaledAhead(FileStore
*f
, FileStore::OpSequencer
*os
, FileStore::Op
*o
, Context
*ondisk
):
2131 fs(f
), osr(os
), o(o
), ondisk(ondisk
) { }
2132 void finish(int r
) override
{
2133 fs
->_journaled_ahead(osr
, o
, ondisk
);
// Entry point for submitting a batch of transactions on sequencer `posr`.
// Visible flow:
//   - collect_contexts() gathers the onreadable / ondisk / onreadable_sync
//     callbacks from `tls`;
//   - with objectstore_blackhole set, the transactions are dropped;
//   - the OpSequencer is taken from posr->p or newly created;
//   - journal present, writeable and not in trailing mode: build an Op,
//     encode the journal entry, reserve the throttles, then either journal
//     in parallel with queueing the op (parallel mode) or journal first and
//     continue from C_JournaledAhead (writeahead mode);
//   - the branch logged as "(no journal)": build an Op, reserve the op-queue
//     throttle and register `ondisk` as a waiter with apply_manager;
//   - the branch logged as "(trailing journal)": apply the transactions
//     inline with do_transactions() and journal them afterwards.
// Every path records l_filestore_queue_transaction_latency_avg.
// NOTE(review): branch conditions, the queue_op() calls and the return
// values are on lines not visible in this listing.
2137 int FileStore::queue_transactions(Sequencer
*posr
, vector
<Transaction
>& tls
,
2138 TrackedOpRef osd_op
,
2139 ThreadPool::TPHandle
*handle
)
2141 Context
*onreadable
;
2143 Context
*onreadable_sync
;
2144 ObjectStore::Transaction::collect_contexts(
2145 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
2147 if (cct
->_conf
->objectstore_blackhole
) {
2148 dout(0) << __FUNC__
<< ": objectstore_blackhole = TRUE, dropping transaction"
2152 delete onreadable_sync
;
2156 utime_t start
= ceph_clock_now();
2157 // set up the sequencer
2161 osr
= static_cast<OpSequencer
*>(posr
->p
.get());
2162 dout(5) << __FUNC__
<< ": existing " << osr
<< " " << *osr
<< dendl
;
2164 osr
= new OpSequencer(cct
, ++next_osr_id
);
2168 dout(5) << __FUNC__
<< ": new " << osr
<< " " << *osr
<< dendl
;
2171 // used to include osr information in tracepoints during transaction apply
2172 for (vector
<Transaction
>::iterator i
= tls
.begin(); i
!= tls
.end(); ++i
) {
2176 ZTracer::Trace trace
;
2177 if (osd_op
&& osd_op
->pg_trace
) {
2178 osd_op
->store_trace
.init("filestore op", &trace_endpoint
, &osd_op
->pg_trace
);
2179 trace
= osd_op
->store_trace
;
// --- journal available and not trailing: parallel or writeahead path ---
2182 if (journal
&& journal
->is_writeable() && !m_filestore_journal_trailing
) {
2183 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2185 //prepare and encode transactions data out of lock
2187 int orig_len
= journal
->prepare_entry(o
->tls
, &tbl
);
// the thread-pool timeout is suspended around the throttle reservations and
// reset afterwards
2190 handle
->suspend_tp_timeout();
2192 op_queue_reserve_throttle(o
);
2193 journal
->reserve_throttle_and_backoff(tbl
.length());
2196 handle
->reset_tp_timeout();
2198 uint64_t op_num
= submit_manager
.op_submit_start();
2200 trace
.keyval("opnum", op_num
);
2202 if (m_filestore_do_dump
)
2203 dump_transactions(o
->tls
, o
->op
, osr
);
2205 if (m_filestore_journal_parallel
) {
2206 dout(5) << __FUNC__
<< ": (parallel) " << o
->op
<< " " << o
->tls
<< dendl
;
2208 trace
.keyval("journal mode", "parallel");
2209 trace
.event("journal started");
2210 _op_journal_transactions(tbl
, orig_len
, o
->op
, ondisk
, osd_op
);
2212 // queue inside submit_manager op submission lock
2214 trace
.event("op queued");
2215 } else if (m_filestore_journal_writeahead
) {
2216 dout(5) << __FUNC__
<< ": (writeahead) " << o
->op
<< " " << o
->tls
<< dendl
;
2218 osr
->queue_journal(o
->op
);
2220 trace
.keyval("journal mode", "writeahead");
2221 trace
.event("journal started");
// the op is continued from C_JournaledAhead once the journal write completes
2222 _op_journal_transactions(tbl
, orig_len
, o
->op
,
2223 new C_JournaledAhead(this, osr
, o
, ondisk
),
2228 submit_manager
.op_submit_finish(op_num
);
2229 utime_t end
= ceph_clock_now();
2230 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// --- path logged as "(no journal)" ---
2235 Op
*o
= build_op(tls
, onreadable
, onreadable_sync
, osd_op
);
2236 dout(5) << __FUNC__
<< ": (no journal) " << o
<< " " << tls
<< dendl
;
2239 handle
->suspend_tp_timeout();
2241 op_queue_reserve_throttle(o
);
2244 handle
->reset_tp_timeout();
2246 uint64_t op_num
= submit_manager
.op_submit_start();
2249 if (m_filestore_do_dump
)
2250 dump_transactions(o
->tls
, o
->op
, osr
);
2253 trace
.keyval("opnum", op_num
);
2254 trace
.keyval("journal mode", "none");
2255 trace
.event("op queued");
// `ondisk` is registered as a waiter on op_num with apply_manager
2258 apply_manager
.add_waiter(op_num
, ondisk
);
2259 submit_manager
.op_submit_finish(op_num
);
2260 utime_t end
= ceph_clock_now();
2261 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// --- path logged as "(trailing journal)": apply inline, then journal ---
2266 //prepare and encode transactions data out of lock
2269 if (journal
->is_writeable()) {
2270 orig_len
= journal
->prepare_entry(tls
, &tbl
);
2272 uint64_t op
= submit_manager
.op_submit_start();
2273 dout(5) << __FUNC__
<< ": (trailing journal) " << op
<< " " << tls
<< dendl
;
2275 if (m_filestore_do_dump
)
2276 dump_transactions(tls
, op
, osr
);
2278 trace
.event("op_apply_start");
2279 trace
.keyval("opnum", op
);
2280 trace
.keyval("journal mode", "trailing");
2281 apply_manager
.op_apply_start(op
);
2282 trace
.event("do_transactions");
2283 int r
= do_transactions(tls
, op
);
2286 trace
.event("journal started");
2287 _op_journal_transactions(tbl
, orig_len
, op
, ondisk
, osd_op
);
2292 // start on_readable finisher after we queue journal item, as on_readable callback
2293 // is allowed to delete the Transaction
2294 if (onreadable_sync
) {
2295 onreadable_sync
->complete(r
);
2297 apply_finishers
[osr
->id
% m_apply_finisher_num
]->queue(onreadable
, r
);
2299 submit_manager
.op_submit_finish(op
);
2300 trace
.event("op_apply_finish");
2301 apply_manager
.op_apply_finish(op
);
2303 utime_t end
= ceph_clock_now();
2304 logger
->tinc(l_filestore_queue_transaction_latency_avg
, end
- start
);
// Called (via C_JournaledAhead) once a writeahead journal entry has been
// written: records the trace event, dequeues the journal entry from the
// sequencer, and hands `ondisk` plus any contexts collected in `to_queue` to
// the ondisk finisher selected by osr->id % m_ondisk_finisher_num.
// NOTE(review): the call that queues the Op for application is on a line not
// visible in this listing.
2308 void FileStore::_journaled_ahead(OpSequencer
*osr
, Op
*o
, Context
*ondisk
)
2310 dout(5) << __FUNC__
<< ": " << o
<< " seq " << o
->op
<< " " << *osr
<< " " << o
->tls
<< dendl
;
2312 o
->trace
.event("writeahead journal finished");
2314 // this should queue in order because the journal does its completions in order.
2317 list
<Context
*> to_queue
;
2318 osr
->dequeue_journal(&to_queue
);
2320 // do ondisk completions async, to prevent any onreadable_sync completions
2321 // getting blocked behind an ondisk completion.
2323 dout(10) << " queueing ondisk " << ondisk
<< dendl
;
2324 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(ondisk
);
2326 if (!to_queue
.empty()) {
2327 ondisk_finishers
[osr
->id
% m_ondisk_finisher_num
]->queue(to_queue
);
// Applies each transaction in `tls` in order by calling _do_transaction()
// with the op sequence number and a running transaction index, resetting the
// thread-pool timeout through `handle` along the way.
// NOTE(review): the op_seq parameter line, the null check on `handle`, the
// trans_num increment and the return value are on lines not visible here.
2331 int FileStore::_do_transactions(
2332 vector
<Transaction
> &tls
,
2334 ThreadPool::TPHandle
*handle
)
2338 for (vector
<Transaction
>::iterator p
= tls
.begin();
2341 _do_transaction(*p
, op_seq
, trans_num
, handle
);
2343 handle
->reset_tp_timeout();
// Stamps collection `cid` with a global replay guard at position `spos`.
// Visible flow: skip-check on backend->can_checkpoint(), sync the object map
// and the filesystem so earlier operations are durable, open the collection
// directory, and store the guard in the GLOBAL_REPLAY_GUARD_XATTR extended
// attribute. Any failure along the way is fatal (assert).
// NOTE(review): encoding of `spos` into `v` and the final fsync are on lines
// not visible in this listing.
2349 void FileStore::_set_global_replay_guard(const coll_t
& cid
,
2350 const SequencerPosition
&spos
)
2352 if (backend
->can_checkpoint())
2355 // sync all previous operations on this sequencer
2356 int ret
= object_map
->sync();
2358 derr
<< __FUNC__
<< ": omap sync error " << cpp_strerror(ret
) << dendl
;
2359 assert(0 == "_set_global_replay_guard failed");
2361 ret
= sync_filesystem(basedir_fd
);
2363 derr
<< __FUNC__
<< ": sync_filesystem error " << cpp_strerror(ret
) << dendl
;
2364 assert(0 == "_set_global_replay_guard failed");
// open the collection directory so the xattr can be set on it
2368 get_cdir(cid
, fn
, sizeof(fn
));
2369 int fd
= ::open(fn
, O_RDONLY
);
2372 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2373 assert(0 == "_set_global_replay_guard failed");
2378 // then record that we did it
2381 int r
= chain_fsetxattr
<true, true>(
2382 fd
, GLOBAL_REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2384 derr
<< __FUNC__
<< ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2385 << " got " << cpp_strerror(r
) << dendl
;
2386 assert(0 == "fsetxattr failed");
2389 // and make sure our xattr is durable.
2394 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2395 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
// Decides whether an operation at `spos` may be replayed against collection
// `cid`, based on the GLOBAL_REPLAY_GUARD_XATTR stored on the collection
// directory.
// Returns 1 (replay) when the collection does not exist, when no guard xattr
// is present, or when spos >= the recorded guard position; returns -1
// otherwise.
// NOTE(review): decoding of the xattr into `opos` is on lines not visible in
// this listing.
2398 int FileStore::_check_global_replay_guard(const coll_t
& cid
,
2399 const SequencerPosition
& spos
)
2402 get_cdir(cid
, fn
, sizeof(fn
));
2403 int fd
= ::open(fn
, O_RDONLY
);
2405 dout(10) << __FUNC__
<< ": " << cid
<< " dne" << dendl
;
2406 return 1; // if collection does not exist, there is no guard, and we can replay.
2410 int r
= chain_fgetxattr(fd
, GLOBAL_REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2412 dout(20) << __FUNC__
<< ": no xattr" << dendl
;
2413 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
2414 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2415 return 1; // no xattr
2420 SequencerPosition opos
;
2421 bufferlist::iterator p
= bl
.begin();
2424 VOID_TEMP_FAILURE_RETRY(::close(fd
));
2425 return spos
>= opos
? 1 : -1;
// Collection-level overload: opens the directory of `cid` and delegates to
// the fd-based _set_replay_guard() with a null object id, then closes the
// descriptor. Failure to open the directory is fatal (assert).
2429 void FileStore::_set_replay_guard(const coll_t
& cid
,
2430 const SequencerPosition
&spos
,
2431 bool in_progress
=false)
2434 get_cdir(cid
, fn
, sizeof(fn
));
2435 int fd
= ::open(fn
, O_RDONLY
);
2438 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2439 assert(0 == "_set_replay_guard failed");
// 0 is passed for the hoid pointer: the guard applies to the collection itself
2441 _set_replay_guard(fd
, spos
, 0, in_progress
);
2442 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// fd-level overload: records a replay guard for position `spos` in the
// REPLAY_GUARD_XATTR extended attribute of `fd`, together with the
// `in_progress` flag. The object map is synced for `hoid` first. A failing
// fsetxattr is fatal (assert).
// NOTE(review): the in_progress parameter line, the fsync calls and the
// encoding of `spos` are on lines not visible in this listing.
2446 void FileStore::_set_replay_guard(int fd
,
2447 const SequencerPosition
& spos
,
2448 const ghobject_t
*hoid
,
2451 if (backend
->can_checkpoint())
2454 dout(10) << __FUNC__
<< ": " << spos
<< (in_progress
? " START" : "") << dendl
;
2458 // first make sure the previous operation commits
2462 // sync object_map too. even if this object has a header or keys,
2463 // it may have had them in the past and then removed them, so always
2465 object_map
->sync(hoid
, &spos
);
2470 // then record that we did it
2473 ::encode(in_progress
, v
);
2474 int r
= chain_fsetxattr
<true, true>(
2475 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2477 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2478 assert(0 == "fsetxattr failed");
2481 // and make sure our xattr is durable.
2486 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
// Collection-level overload: opens the directory of `cid`, delegates to the
// fd-based _close_replay_guard(), then closes the descriptor. Failure to
// open the directory is fatal (assert).
2489 void FileStore::_close_replay_guard(const coll_t
& cid
,
2490 const SequencerPosition
&spos
)
2493 get_cdir(cid
, fn
, sizeof(fn
));
2494 int fd
= ::open(fn
, O_RDONLY
);
2497 derr
<< __FUNC__
<< ": " << cid
<< " error " << cpp_strerror(err
) << dendl
;
2498 assert(0 == "_close_replay_guard failed");
2500 _close_replay_guard(fd
, spos
);
2501 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// fd-level overload: marks the guarded operation at `spos` as finished by
// rewriting REPLAY_GUARD_XATTR on `fd` with in_progress = false. The object
// map is synced for `hoid` first. A failing fsetxattr is fatal (assert).
// NOTE(review): the fsync calls and the encoding of `spos` are on lines not
// visible in this listing.
2504 void FileStore::_close_replay_guard(int fd
, const SequencerPosition
& spos
,
2505 const ghobject_t
*hoid
)
2507 if (backend
->can_checkpoint())
2510 dout(10) << __FUNC__
<< ": " << spos
<< dendl
;
2514 // sync object_map too. even if this object has a header or keys,
2515 // it may have had them in the past and then removed them, so always
2517 object_map
->sync(hoid
, &spos
);
2519 // then record that we are done with this operation
2522 bool in_progress
= false;
2523 ::encode(in_progress
, v
);
2524 int r
= chain_fsetxattr
<true, true>(
2525 fd
, REPLAY_GUARD_XATTR
, v
.c_str(), v
.length());
2527 derr
<< "fsetxattr " << REPLAY_GUARD_XATTR
<< " got " << cpp_strerror(r
) << dendl
;
2528 assert(0 == "fsetxattr failed");
2531 // and make sure our xattr is durable.
2536 dout(10) << __FUNC__
<< ": " << spos
<< " done" << dendl
;
// Object-level check: consults the collection's global replay guard, then
// opens the object and checks the replay guard on its file descriptor.
// Returns 1 (replay) when the object file does not exist.
// NOTE(review): the early exit taken when not replaying or when the backend
// can checkpoint, the handling of the global guard result, and the final
// return are on lines not visible in this listing.
2539 int FileStore::_check_replay_guard(const coll_t
& cid
, const ghobject_t
&oid
,
2540 const SequencerPosition
& spos
)
2542 if (!replaying
|| backend
->can_checkpoint())
2545 int r
= _check_global_replay_guard(cid
, spos
);
2550 r
= lfn_open(cid
, oid
, false, &fd
);
2552 dout(10) << __FUNC__
<< ": " << cid
<< " " << oid
<< " dne" << dendl
;
2553 return 1; // if file does not exist, there is no guard, and we can replay.
// **fd yields the raw descriptor expected by the fd-based overload
2555 int ret
= _check_replay_guard(**fd
, spos
);
// Collection-level check: opens the directory of `cid` and checks the replay
// guard on it through the fd-based overload, closing the descriptor
// afterwards. Returns 1 (replay) when the collection does not exist.
// NOTE(review): the early exit taken when not replaying or when the backend
// can checkpoint, and the final return, are on lines not visible here.
2560 int FileStore::_check_replay_guard(const coll_t
& cid
, const SequencerPosition
& spos
)
2562 if (!replaying
|| backend
->can_checkpoint())
2566 get_cdir(cid
, fn
, sizeof(fn
));
2567 int fd
= ::open(fn
, O_RDONLY
);
2569 dout(10) << __FUNC__
<< ": " << cid
<< " dne" << dendl
;
2570 return 1; // if collection does not exist, there is no guard, and we can replay.
2572 int ret
= _check_replay_guard(fd
, spos
);
2573 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// fd-level check: reads REPLAY_GUARD_XATTR from `fd` and compares the
// recorded position `opos` (plus the optional in_progress flag) with `spos`.
// Returns 1 when no guard xattr is present. Per the log messages, the other
// outcomes are: guard position ahead of spos -> skip replay; equal position
// with in_progress set -> conditional replay; equal position with
// in_progress clear -> skip replay; guard position behind spos -> replay.
// NOTE(review): the comparison conditions and the return values of those
// branches are on lines not visible in this listing.
2577 int FileStore::_check_replay_guard(int fd
, const SequencerPosition
& spos
)
2579 if (!replaying
|| backend
->can_checkpoint())
2583 int r
= chain_fgetxattr(fd
, REPLAY_GUARD_XATTR
, buf
, sizeof(buf
));
2585 dout(20) << __FUNC__
<< ": no xattr" << dendl
;
2586 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
2587 return 1; // no xattr
2592 SequencerPosition opos
;
2593 bufferlist::iterator p
= bl
.begin();
2595 bool in_progress
= false;
2596 if (!p
.end()) // older journals don't have this
2597 ::decode(in_progress
, p
);
2599 dout(10) << __FUNC__
<< ": object has " << opos
<< " > current pos " << spos
2600 << ", now or in future, SKIPPING REPLAY" << dendl
;
2602 } else if (opos
== spos
) {
2604 dout(10) << __FUNC__
<< ": object has " << opos
<< " == current pos " << spos
2605 << ", in_progress=true, CONDITIONAL REPLAY" << dendl
;
2608 dout(10) << __FUNC__
<< ": object has " << opos
<< " == current pos " << spos
2609 << ", in_progress=false, SKIPPING REPLAY" << dendl
;
2613 dout(10) << __FUNC__
<< ": object has " << opos
<< " < current pos " << spos
2614 << ", in past, will replay" << dendl
;
2619 void FileStore::_do_transaction(
2620 Transaction
& t
, uint64_t op_seq
, int trans_num
,
2621 ThreadPool::TPHandle
*handle
)
2623 dout(10) << __FUNC__
<< ": on " << &t
<< dendl
;
2626 const char *osr_name
= t
.get_osr() ? static_cast<OpSequencer
*>(t
.get_osr())->get_name().c_str() : "<NULL>";
2629 Transaction::iterator i
= t
.begin();
2631 SequencerPosition
spos(op_seq
, trans_num
, 0);
2632 while (i
.have_op()) {
2634 handle
->reset_tp_timeout();
2636 Transaction::Op
*op
= i
.decode_op();
2642 case Transaction::OP_NOP
:
2644 case Transaction::OP_TOUCH
:
2646 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2647 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2648 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2649 _cid
: _cid
.get_temp();
2650 tracepoint(objectstore
, touch_enter
, osr_name
);
2651 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2652 r
= _touch(cid
, oid
);
2653 tracepoint(objectstore
, touch_exit
, r
);
2657 case Transaction::OP_WRITE
:
2659 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2660 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2661 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2662 _cid
: _cid
.get_temp();
2663 uint64_t off
= op
->off
;
2664 uint64_t len
= op
->len
;
2665 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2668 tracepoint(objectstore
, write_enter
, osr_name
, off
, len
);
2669 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2670 r
= _write(cid
, oid
, off
, len
, bl
, fadvise_flags
);
2671 tracepoint(objectstore
, write_exit
, r
);
2675 case Transaction::OP_ZERO
:
2677 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2678 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2679 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2680 _cid
: _cid
.get_temp();
2681 uint64_t off
= op
->off
;
2682 uint64_t len
= op
->len
;
2683 tracepoint(objectstore
, zero_enter
, osr_name
, off
, len
);
2684 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2685 r
= _zero(cid
, oid
, off
, len
);
2686 tracepoint(objectstore
, zero_exit
, r
);
2690 case Transaction::OP_TRIMCACHE
:
2692 // deprecated, no-op
2696 case Transaction::OP_TRUNCATE
:
2698 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2699 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2700 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2701 _cid
: _cid
.get_temp();
2702 uint64_t off
= op
->off
;
2703 tracepoint(objectstore
, truncate_enter
, osr_name
, off
);
2704 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2705 r
= _truncate(cid
, oid
, off
);
2706 tracepoint(objectstore
, truncate_exit
, r
);
2710 case Transaction::OP_REMOVE
:
2712 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2713 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2714 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2715 _cid
: _cid
.get_temp();
2716 tracepoint(objectstore
, remove_enter
, osr_name
);
2717 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2718 r
= _remove(cid
, oid
, spos
);
2719 tracepoint(objectstore
, remove_exit
, r
);
2723 case Transaction::OP_SETATTR
:
2725 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2726 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2727 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2728 _cid
: _cid
.get_temp();
2729 string name
= i
.decode_string();
2732 tracepoint(objectstore
, setattr_enter
, osr_name
);
2733 if (_check_replay_guard(cid
, oid
, spos
) > 0) {
2734 map
<string
, bufferptr
> to_set
;
2735 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2736 r
= _setattrs(cid
, oid
, to_set
, spos
);
2738 dout(0) << " ENOSPC on setxattr on " << cid
<< "/" << oid
2739 << " name " << name
<< " size " << bl
.length() << dendl
;
2741 tracepoint(objectstore
, setattr_exit
, r
);
2745 case Transaction::OP_SETATTRS
:
2747 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2748 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2749 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2750 _cid
: _cid
.get_temp();
2751 map
<string
, bufferptr
> aset
;
2752 i
.decode_attrset(aset
);
2753 tracepoint(objectstore
, setattrs_enter
, osr_name
);
2754 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2755 r
= _setattrs(cid
, oid
, aset
, spos
);
2756 tracepoint(objectstore
, setattrs_exit
, r
);
2758 dout(0) << " ENOSPC on setxattrs on " << cid
<< "/" << oid
<< dendl
;
2762 case Transaction::OP_RMATTR
:
2764 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2765 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2766 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2767 _cid
: _cid
.get_temp();
2768 string name
= i
.decode_string();
2769 tracepoint(objectstore
, rmattr_enter
, osr_name
);
2770 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2771 r
= _rmattr(cid
, oid
, name
.c_str(), spos
);
2772 tracepoint(objectstore
, rmattr_exit
, r
);
2776 case Transaction::OP_RMATTRS
:
2778 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2779 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2780 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2781 _cid
: _cid
.get_temp();
2782 tracepoint(objectstore
, rmattrs_enter
, osr_name
);
2783 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2784 r
= _rmattrs(cid
, oid
, spos
);
2785 tracepoint(objectstore
, rmattrs_exit
, r
);
2789 case Transaction::OP_CLONE
:
2791 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2792 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2793 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2794 _cid
: _cid
.get_temp();
2795 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2796 tracepoint(objectstore
, clone_enter
, osr_name
);
2797 r
= _clone(cid
, oid
, noid
, spos
);
2798 tracepoint(objectstore
, clone_exit
, r
);
2802 case Transaction::OP_CLONERANGE
:
2804 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2805 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2806 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2807 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2808 _cid
: _cid
.get_temp();
2809 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2810 _cid
: _cid
.get_temp();
2811 uint64_t off
= op
->off
;
2812 uint64_t len
= op
->len
;
2813 tracepoint(objectstore
, clone_range_enter
, osr_name
, len
);
2814 r
= _clone_range(cid
, oid
, ncid
, noid
, off
, len
, off
, spos
);
2815 tracepoint(objectstore
, clone_range_exit
, r
);
2819 case Transaction::OP_CLONERANGE2
:
2821 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2822 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2823 const ghobject_t
&noid
= i
.get_oid(op
->dest_oid
);
2824 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2825 _cid
: _cid
.get_temp();
2826 const coll_t
&ncid
= !_need_temp_object_collection(_cid
, noid
) ?
2827 _cid
: _cid
.get_temp();
2828 uint64_t srcoff
= op
->off
;
2829 uint64_t len
= op
->len
;
2830 uint64_t dstoff
= op
->dest_off
;
2831 tracepoint(objectstore
, clone_range2_enter
, osr_name
, len
);
2832 r
= _clone_range(cid
, oid
, ncid
, noid
, srcoff
, len
, dstoff
, spos
);
2833 tracepoint(objectstore
, clone_range2_exit
, r
);
2837 case Transaction::OP_MKCOLL
:
2839 const coll_t
&cid
= i
.get_cid(op
->cid
);
2840 tracepoint(objectstore
, mkcoll_enter
, osr_name
);
2841 if (_check_replay_guard(cid
, spos
) > 0)
2842 r
= _create_collection(cid
, op
->split_bits
, spos
);
2843 tracepoint(objectstore
, mkcoll_exit
, r
);
2847 case Transaction::OP_COLL_SET_BITS
:
2849 const coll_t
&cid
= i
.get_cid(op
->cid
);
2850 int bits
= op
->split_bits
;
2851 r
= _collection_set_bits(cid
, bits
);
2855 case Transaction::OP_COLL_HINT
:
2857 const coll_t
&cid
= i
.get_cid(op
->cid
);
2858 uint32_t type
= op
->hint_type
;
2861 bufferlist::iterator hiter
= hint
.begin();
2862 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
2865 ::decode(pg_num
, hiter
);
2866 ::decode(num_objs
, hiter
);
2867 if (_check_replay_guard(cid
, spos
) > 0) {
2868 r
= _collection_hint_expected_num_objs(cid
, pg_num
, num_objs
, spos
);
2872 dout(10) << "Unrecognized collection hint type: " << type
<< dendl
;
2877 case Transaction::OP_RMCOLL
:
2879 const coll_t
&cid
= i
.get_cid(op
->cid
);
2880 tracepoint(objectstore
, rmcoll_enter
, osr_name
);
2881 if (_check_replay_guard(cid
, spos
) > 0)
2882 r
= _destroy_collection(cid
);
2883 tracepoint(objectstore
, rmcoll_exit
, r
);
2887 case Transaction::OP_COLL_ADD
:
2889 const coll_t
&ocid
= i
.get_cid(op
->cid
);
2890 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
2891 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2893 assert(oid
.hobj
.pool
>= -1);
2895 // always followed by OP_COLL_REMOVE
2896 Transaction::Op
*op2
= i
.decode_op();
2897 const coll_t
&ocid2
= i
.get_cid(op2
->cid
);
2898 const ghobject_t
&oid2
= i
.get_oid(op2
->oid
);
2899 assert(op2
->op
== Transaction::OP_COLL_REMOVE
);
2900 assert(ocid2
== ocid
);
2901 assert(oid2
== oid
);
2903 tracepoint(objectstore
, coll_add_enter
);
2904 r
= _collection_add(ncid
, ocid
, oid
, spos
);
2905 tracepoint(objectstore
, coll_add_exit
, r
);
2909 tracepoint(objectstore
, coll_remove_enter
, osr_name
);
2910 if (_check_replay_guard(ocid
, oid
, spos
) > 0)
2911 r
= _remove(ocid
, oid
, spos
);
2912 tracepoint(objectstore
, coll_remove_exit
, r
);
2916 case Transaction::OP_COLL_MOVE
:
2918 // WARNING: this is deprecated and buggy; only here to replay old journals.
2919 const coll_t
&ocid
= i
.get_cid(op
->cid
);
2920 const coll_t
&ncid
= i
.get_cid(op
->dest_cid
);
2921 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2922 tracepoint(objectstore
, coll_move_enter
);
2923 r
= _collection_add(ocid
, ncid
, oid
, spos
);
2925 (_check_replay_guard(ocid
, oid
, spos
) > 0))
2926 r
= _remove(ocid
, oid
, spos
);
2927 tracepoint(objectstore
, coll_move_exit
, r
);
// OP_COLL_MOVE_RENAME: move/rename `oldoid` in the source collection to
// `newoid` in the destination collection via _collection_move_rename().
// Each side is redirected to its own collection's temp variant (get_temp())
// when _need_temp_object_collection() says the object belongs there.
2931 case Transaction::OP_COLL_MOVE_RENAME
:
2933 const coll_t
&_oldcid
= i
.get_cid(op
->cid
);
2934 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
2935 const coll_t
&_newcid
= i
.get_cid(op
->dest_cid
);
2936 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
2937 const coll_t
&oldcid
= !_need_temp_object_collection(_oldcid
, oldoid
) ?
2938 _oldcid
: _oldcid
.get_temp();
// The non-temp destination must be the destination collection (_newcid).
// This used to fall back to _oldcid, which targeted the wrong collection
// whenever the source and destination collections differ.
2939 const coll_t
&newcid
= !_need_temp_object_collection(_newcid
, newoid
) ?
2940 _newcid
: _newcid
.get_temp();
2941 tracepoint(objectstore
, coll_move_rename_enter
);
2942 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
);
2943 tracepoint(objectstore
, coll_move_rename_exit
, r
);
2947 case Transaction::OP_TRY_RENAME
:
2949 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2950 const ghobject_t
&oldoid
= i
.get_oid(op
->oid
);
2951 const ghobject_t
&newoid
= i
.get_oid(op
->dest_oid
);
2952 const coll_t
&oldcid
= !_need_temp_object_collection(_cid
, oldoid
) ?
2953 _cid
: _cid
.get_temp();
2954 const coll_t
&newcid
= !_need_temp_object_collection(_cid
, newoid
) ?
2955 _cid
: _cid
.get_temp();
2956 tracepoint(objectstore
, coll_try_rename_enter
);
2957 r
= _collection_move_rename(oldcid
, oldoid
, newcid
, newoid
, spos
, true);
2958 tracepoint(objectstore
, coll_try_rename_exit
, r
);
2962 case Transaction::OP_COLL_SETATTR
:
2963 case Transaction::OP_COLL_RMATTR
:
2964 assert(0 == "collection attr methods no longer implemented");
2967 case Transaction::OP_STARTSYNC
:
2968 tracepoint(objectstore
, startsync_enter
, osr_name
);
2970 tracepoint(objectstore
, startsync_exit
);
2973 case Transaction::OP_COLL_RENAME
:
2979 case Transaction::OP_OMAP_CLEAR
:
2981 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2982 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2983 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2984 _cid
: _cid
.get_temp();
2985 tracepoint(objectstore
, omap_clear_enter
, osr_name
);
2986 if (_check_replay_guard(cid
, oid
, spos
) > 0)
2987 r
= _omap_clear(cid
, oid
, spos
);
2988 tracepoint(objectstore
, omap_clear_exit
, r
);
2991 case Transaction::OP_OMAP_SETKEYS
:
2993 const coll_t
&_cid
= i
.get_cid(op
->cid
);
2994 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
2995 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
2996 _cid
: _cid
.get_temp();
2997 map
<string
, bufferlist
> aset
;
2998 i
.decode_attrset(aset
);
2999 tracepoint(objectstore
, omap_setkeys_enter
, osr_name
);
3000 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3001 r
= _omap_setkeys(cid
, oid
, aset
, spos
);
3002 tracepoint(objectstore
, omap_setkeys_exit
, r
);
3005 case Transaction::OP_OMAP_RMKEYS
:
3007 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3008 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3009 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3010 _cid
: _cid
.get_temp();
3012 i
.decode_keyset(keys
);
3013 tracepoint(objectstore
, omap_rmkeys_enter
, osr_name
);
3014 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3015 r
= _omap_rmkeys(cid
, oid
, keys
, spos
);
3016 tracepoint(objectstore
, omap_rmkeys_exit
, r
);
3019 case Transaction::OP_OMAP_RMKEYRANGE
:
3021 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3022 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3023 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3024 _cid
: _cid
.get_temp();
3026 first
= i
.decode_string();
3027 last
= i
.decode_string();
3028 tracepoint(objectstore
, omap_rmkeyrange_enter
, osr_name
);
3029 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3030 r
= _omap_rmkeyrange(cid
, oid
, first
, last
, spos
);
3031 tracepoint(objectstore
, omap_rmkeyrange_exit
, r
);
3034 case Transaction::OP_OMAP_SETHEADER
:
3036 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3037 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3038 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3039 _cid
: _cid
.get_temp();
3042 tracepoint(objectstore
, omap_setheader_enter
, osr_name
);
3043 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3044 r
= _omap_setheader(cid
, oid
, bl
, spos
);
3045 tracepoint(objectstore
, omap_setheader_exit
, r
);
3048 case Transaction::OP_SPLIT_COLLECTION
:
3050 assert(0 == "not legacy journal; upgrade to firefly first");
3053 case Transaction::OP_SPLIT_COLLECTION2
:
3055 coll_t cid
= i
.get_cid(op
->cid
);
3056 uint32_t bits
= op
->split_bits
;
3057 uint32_t rem
= op
->split_rem
;
3058 coll_t dest
= i
.get_cid(op
->dest_cid
);
3059 tracepoint(objectstore
, split_coll2_enter
, osr_name
);
3060 r
= _split_collection(cid
, bits
, rem
, dest
, spos
);
3061 tracepoint(objectstore
, split_coll2_exit
, r
);
3065 case Transaction::OP_SETALLOCHINT
:
3067 const coll_t
&_cid
= i
.get_cid(op
->cid
);
3068 const ghobject_t
&oid
= i
.get_oid(op
->oid
);
3069 const coll_t
&cid
= !_need_temp_object_collection(_cid
, oid
) ?
3070 _cid
: _cid
.get_temp();
3071 uint64_t expected_object_size
= op
->expected_object_size
;
3072 uint64_t expected_write_size
= op
->expected_write_size
;
3073 tracepoint(objectstore
, setallochint_enter
, osr_name
);
3074 if (_check_replay_guard(cid
, oid
, spos
) > 0)
3075 r
= _set_alloc_hint(cid
, oid
, expected_object_size
,
3076 expected_write_size
);
3077 tracepoint(objectstore
, setallochint_exit
, r
);
3082 derr
<< "bad op " << op
->op
<< dendl
;
3089 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
3090 op
->op
== Transaction::OP_CLONE
||
3091 op
->op
== Transaction::OP_CLONERANGE2
||
3092 op
->op
== Transaction::OP_COLL_ADD
||
3093 op
->op
== Transaction::OP_SETATTR
||
3094 op
->op
== Transaction::OP_SETATTRS
||
3095 op
->op
== Transaction::OP_RMATTR
||
3096 op
->op
== Transaction::OP_OMAP_SETKEYS
||
3097 op
->op
== Transaction::OP_OMAP_RMKEYS
||
3098 op
->op
== Transaction::OP_OMAP_RMKEYRANGE
||
3099 op
->op
== Transaction::OP_OMAP_SETHEADER
))
3100 // -ENOENT is normally okay
3101 // ...including on a replayed OP_RMCOLL with checkpoint mode
3106 if (op
->op
== Transaction::OP_SETALLOCHINT
)
3107 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3108 // cases means invalid hint size (e.g. too big, not a multiple
3109 // of block size, etc) or, at least on xfs, an attempt to set
3110 // or change it when the file is not empty. However,
3111 // OP_SETALLOCHINT is advisory, so ignore all errors.
3114 if (replaying
&& !backend
->can_checkpoint()) {
3115 if (r
== -EEXIST
&& op
->op
== Transaction::OP_MKCOLL
) {
3116 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3119 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_ADD
) {
3120 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3123 if (r
== -EEXIST
&& op
->op
== Transaction::OP_COLL_MOVE
) {
3124 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl
;
3128 dout(10) << "tolerating ERANGE on replay" << dendl
;
3132 dout(10) << "tolerating ENOENT on replay" << dendl
;
3138 const char *msg
= "unexpected error code";
3140 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
3141 op
->op
== Transaction::OP_CLONE
||
3142 op
->op
== Transaction::OP_CLONERANGE2
)) {
3143 msg
= "ENOENT on clone suggests osd bug";
3144 } else if (r
== -ENOSPC
) {
3145 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3146 // by partially applying transactions.
3147 msg
= "ENOSPC from disk filesystem, misconfigured cluster";
3148 } else if (r
== -ENOTEMPTY
) {
3149 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
3150 } else if (r
== -EPERM
) {
3151 msg
= "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3154 derr
<< " error " << cpp_strerror(r
) << " not handled on operation " << op
3155 << " (" << spos
<< ", or op " << spos
.op
<< ", counting from 0)" << dendl
;
3156 dout(0) << msg
<< dendl
;
3157 dout(0) << " transaction dump:\n";
3158 JSONFormatter
f(true);
3159 f
.open_object_section("transaction");
3169 assert(0 == "unexpected error");
3179 /*********************************************/
3183 // --------------------
// Report whether object `oid` exists in collection `_cid`.
//
// The lookup collection is `_cid` itself unless
// _need_temp_object_collection(_cid, oid) is true, in which case
// `_cid.get_temp()` is used. Existence is defined as stat() on that
// collection/object returning 0; the result is also handed to the
// exists_exit tracepoint.
// NOTE(review): the listing omits the declaration of `st` and the return
// statement -- presumably `retval` is returned; confirm in the full file.
3186 bool FileStore::exists(const coll_t
& _cid
, const ghobject_t
& oid
)
3188 tracepoint(objectstore
, exists_enter
, _cid
.c_str());
3189 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3191 bool retval
= stat(cid
, oid
, &st
) == 0;
3192 tracepoint(objectstore
, exists_exit
, retval
);
// stat() an object: fill `*st` through lfn_stat() for `oid` in the resolved
// collection.
//
//   _cid      - requested collection; replaced by `_cid.get_temp()` when
//               _need_temp_object_collection(_cid, oid) is true.
//   oid       - object to stat.
//   st        - output buffer handed to lfn_stat().
//   allow_eio - when false, an -EIO result trips the assert below if
//               m_filestore_fail_eio is set.
//
// The lfn_stat() result `r` is logged at debug level 10 and passed to the
// stat_exit tracepoint.
// NOTE(review): the branch conditions choosing between the two log lines,
// the body of the read-error injection branch and the return statement are
// not part of this listing.
3196 int FileStore::stat(
3197 const coll_t
& _cid
, const ghobject_t
& oid
, struct stat
*st
, bool allow_eio
)
3199 tracepoint(objectstore
, stat_enter
, _cid
.c_str());
3200 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3201 int r
= lfn_stat(cid
, oid
, st
);
// -EIO is only tolerated when the caller opted in (allow_eio) or when
// m_filestore_fail_eio is off.
3202 assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
3204 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
3205 << " = " << r
<< dendl
;
3207 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
3209 << " (size " << st
->st_size
<< ")" << dendl
;
// Debug hook: taken when filestore_debug_inject_read_err is configured and
// debug_mdata_eio(oid) is true.
3211 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3212 debug_mdata_eio(oid
)) {
3215 tracepoint(objectstore
, stat_exit
, r
);
3220 int FileStore::set_collection_opts(
3222 const pool_opts_t
& opts
)
// Read `len` bytes at `offset` from object `oid` into bufferlist `bl`.
//
// Visible steps:
//   1. Resolve the collection (`_cid`, or `_cid.get_temp()` when
//      _need_temp_object_collection(_cid, oid) is true) and open the object
//      with lfn_open(cid, oid, false, &fd); an open error is logged.
//   2. For offset == 0 && len == 0 the file is fstat()ed first
//      (presumably to read the whole object -- the use of `st` is not
//      visible in this listing; confirm).
//   3. With HAVE_POSIX_FADVISE, apply RANDOM / SEQUENTIAL hints requested
//      through `op_flags` before the read, and DONTNEED / NORMAL after it.
//   4. safe_pread() into a buffer preallocated to `len`, shrink it to the
//      `got` bytes actually read and append it to `bl`.
//   5. When m_filestore_sloppy_crc is set (and we are not replaying, or the
//      backend can checkpoint) verify the data with
//      backend->_crc_verify_read(); the visible failure path logs and
//      asserts.
//   6. Optional debug EIO injection, then the read_exit tracepoint with
//      `got`.
// NOTE(review): most of the signature, the error branches and the return
// statement are missing from this listing.
3227 int FileStore::read(
3229 const ghobject_t
& oid
,
3236 tracepoint(objectstore
, read_enter
, _cid
.c_str(), offset
, len
);
3237 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
3239 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3242 int r
= lfn_open(cid
, oid
, false, &fd
);
3244 dout(10) << __FUNC__
<< ": (" << cid
<< "/" << oid
<< ") open error: "
3245 << cpp_strerror(r
) << dendl
;
3249 if (offset
== 0 && len
== 0) {
3251 memset(&st
, 0, sizeof(struct stat
));
3252 int r
= ::fstat(**fd
, &st
);
3257 #ifdef HAVE_POSIX_FADVISE
3258 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_RANDOM
)
3259 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_RANDOM
);
3260 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
)
3261 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_SEQUENTIAL
);
3264 bufferptr
bptr(len
); // prealloc space for entire read
3265 got
= safe_pread(**fd
, bptr
.c_str(), len
, offset
);
3267 dout(10) << __FUNC__
<< ": (" << cid
<< "/" << oid
<< ") pread error: " << cpp_strerror(got
) << dendl
;
3271 bptr
.set_length(got
); // properly size the buffer
3273 bl
.push_back(std::move(bptr
)); // put it in the target bufferlist
3275 #ifdef HAVE_POSIX_FADVISE
3276 if (op_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
)
3277 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_DONTNEED
);
// Undo an access-pattern hint set before the read.
3278 if (op_flags
& (CEPH_OSD_OP_FLAG_FADVISE_RANDOM
| CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
))
3279 posix_fadvise(**fd
, offset
, len
, POSIX_FADV_NORMAL
);
3282 if (m_filestore_sloppy_crc
&& (!replaying
|| backend
->can_checkpoint())) {
3284 int errors
= backend
->_crc_verify_read(**fd
, offset
, got
, bl
, &ss
);
3286 dout(0) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~"
3287 << got
<< " ... BAD CRC:\n" << ss
.str() << dendl
;
3288 assert(0 == "bad crc on read");
3294 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~"
3295 << got
<< "/" << len
<< dendl
;
3296 if (cct
->_conf
->filestore_debug_inject_read_err
&&
3297 debug_data_eio(oid
)) {
// NOTE(review): if filestore_debug_random_read_err * 100.0 truncates to 0
// (a non-zero setting below 0.01) the modulo below divides by zero --
// confirm the option's valid range.
3299 } else if (cct
->_conf
->filestore_debug_random_read_err
&&
3300 (rand() % (int)(cct
->_conf
->filestore_debug_random_read_err
* 100.0)) == 0) {
3301 dout(0) << __func__
<< ": inject random EIO" << dendl
;
3304 tracepoint(objectstore
, read_exit
, got
);
// Collect the mapped extents of `fd` within [offset, offset + len) into `*m`
// (key: logical offset, value: length) using backend->do_fiemap().
//
// Visible steps:
//   - query the backend; a result with fm_mapped_extents == 0 is handled
//     separately (that branch's body is not in this listing);
//   - trim the first extent so it does not start before `offset`;
//   - walk the extents, folding each extent into its successor when the two
//     are logically contiguous, clamping an extent that runs past
//     offset + len, and recording it in `*m`;
//   - advance `offset` past the last extent seen and compute `is_last`
//     (FIEMAP_EXTENT_LAST set on that extent, or len == 0).
// NOTE(review): loop increments, the handling of `is_last`, any retry loop,
// cleanup of `fiemap` and the return statement are not visible here.
3309 int FileStore::_do_fiemap(int fd
, uint64_t offset
, size_t len
,
3310 map
<uint64_t, uint64_t> *m
)
3313 struct fiemap_extent
*extent
= NULL
;
3314 struct fiemap
*fiemap
= NULL
;
3318 r
= backend
->do_fiemap(fd
, offset
, len
, &fiemap
);
3322 if (fiemap
->fm_mapped_extents
== 0) {
3327 extent
= &fiemap
->fm_extents
[0];
3329 /* start where we were asked to start */
3330 if (extent
->fe_logical
< offset
) {
3331 extent
->fe_length
-= offset
- extent
->fe_logical
;
3332 extent
->fe_logical
= offset
;
3337 struct fiemap_extent
*last
= nullptr;
3338 while (i
< fiemap
->fm_mapped_extents
) {
3339 struct fiemap_extent
*next
= extent
+ 1;
3341 dout(10) << __FUNC__
<< ": fm_mapped_extents=" << fiemap
->fm_mapped_extents
3342 << " fe_logical=" << extent
->fe_logical
<< " fe_length=" << extent
->fe_length
<< dendl
;
3344 /* try to merge extents */
// Merging is done in place: the successor absorbs the current extent's
// start and length.
3345 while ((i
< fiemap
->fm_mapped_extents
- 1) &&
3346 (extent
->fe_logical
+ extent
->fe_length
== next
->fe_logical
)) {
3347 next
->fe_length
+= extent
->fe_length
;
3348 next
->fe_logical
= extent
->fe_logical
;
// Never report data beyond the requested range.
3354 if (extent
->fe_logical
+ extent
->fe_length
> offset
+ len
)
3355 extent
->fe_length
= offset
+ len
- extent
->fe_logical
;
3356 (*m
)[extent
->fe_logical
] = extent
->fe_length
;
3360 uint64_t xoffset
= last
->fe_logical
+ last
->fe_length
- offset
;
3361 offset
= last
->fe_logical
+ last
->fe_length
;
3363 const bool is_last
= (last
->fe_flags
& FIEMAP_EXTENT_LAST
) || (len
== 0);
// Collect the data extents of `fd` within [offset, offset + len) into `*m`
// (key: start of data, value: length) by alternating lseek(SEEK_DATA) and
// lseek(SEEK_HOLE). Only compiled in when __linux__, SEEK_HOLE and
// SEEK_DATA are all defined.
//
// Each iteration finds the next data position at or after `start`, then the
// hole that ends that data run; the run is clipped to offset + len before
// being stored.
// NOTE(review): error handling around the lseek calls, the update of
// `start`, the non-Linux branch and the return statement are missing from
// this listing.
// NOTE(review): the `data_pos > offset + len` test lets
// data_pos == offset + len through, which looks like it would record a
// zero-length extent below -- confirm whether `>=` was intended.
3372 int FileStore::_do_seek_hole_data(int fd
, uint64_t offset
, size_t len
,
3373 map
<uint64_t, uint64_t> *m
)
3375 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3376 off_t hole_pos
, data_pos
;
3379 // If lseek fails with errno setting to be ENXIO, this means the current
3380 // file offset is beyond the end of the file.
3381 off_t start
= offset
;
3382 while(start
< (off_t
)(offset
+ len
)) {
3383 data_pos
= lseek(fd
, start
, SEEK_DATA
);
3389 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
3392 } else if (data_pos
> (off_t
)(offset
+ len
)) {
3396 hole_pos
= lseek(fd
, data_pos
, SEEK_HOLE
);
3398 if (errno
== ENXIO
) {
3402 dout(10) << "failed to lseek: " << cpp_strerror(r
) << dendl
;
// Data run reaches (or passes) the end of the requested range: clip it.
3407 if (hole_pos
>= (off_t
)(offset
+ len
)) {
3408 (*m
)[data_pos
] = offset
+ len
- data_pos
;
3411 (*m
)[data_pos
] = hole_pos
- data_pos
;
// fiemap overload that returns the extent map encoded into a bufferlist:
// it runs the map-based fiemap() overload into a local `exomap` and
// ::encode()s that map into `bl`.
// NOTE(review): the remainder of the signature (the `bl` parameter), any
// check of `r` before encoding and the return statement are not visible in
// this listing.
3422 int FileStore::fiemap(const coll_t
& _cid
, const ghobject_t
& oid
,
3423 uint64_t offset
, size_t len
,
3426 map
<uint64_t, uint64_t> exomap
;
3427 int r
= fiemap(_cid
, oid
, offset
, len
, exomap
);
3429 ::encode(exomap
, bl
);
// Fill `destmap` (logical offset -> length) with the allocated extents of
// object `oid` within [offset, offset + len).
//
// When the backend supports neither SEEK_DATA/SEEK_HOLE nor FIEMAP, or when
// `len` is at most m_filestore_fiemap_threshold, the whole requested range
// is reported as one extent without opening the object. Otherwise the
// object is opened with lfn_open() and the extents come from
// _do_seek_hole_data() (preferred) or _do_fiemap().
// The result code must not be -EIO when m_filestore_fail_eio is set
// (asserted); it is passed to the fiemap_exit tracepoint.
// NOTE(review): early returns, closing of `fd` and the final return are
// missing from this listing.
3434 int FileStore::fiemap(const coll_t
& _cid
, const ghobject_t
& oid
,
3435 uint64_t offset
, size_t len
,
3436 map
<uint64_t, uint64_t>& destmap
)
3438 tracepoint(objectstore
, fiemap_enter
, _cid
.c_str(), offset
, len
);
3439 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
// Cheap path: no sparse-file support, or range too small to bother.
3442 if ((!backend
->has_seek_data_hole() && !backend
->has_fiemap()) ||
3443 len
<= (size_t)m_filestore_fiemap_threshold
) {
3444 destmap
[offset
] = len
;
3448 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3452 int r
= lfn_open(cid
, oid
, false, &fd
);
3454 dout(10) << "read couldn't open " << cid
<< "/" << oid
<< ": " << cpp_strerror(r
) << dendl
;
3458 if (backend
->has_seek_data_hole()) {
3459 dout(15) << "seek_data/seek_hole " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3460 r
= _do_seek_hole_data(**fd
, offset
, len
, &destmap
);
3461 } else if (backend
->has_fiemap()) {
3462 dout(15) << "fiemap ioctl" << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3463 r
= _do_fiemap(**fd
, offset
, len
, &destmap
);
3470 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< " num_extents=" << destmap
.size() << " " << destmap
<< dendl
;
3471 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
3472 tracepoint(objectstore
, fiemap_exit
, r
);
// Remove object `oid` from collection `cid` by calling
// lfn_unlink(cid, oid, spos); the request is logged at level 15 and the
// result at level 10.
// NOTE(review): the return statement is not in this listing -- presumably
// the lfn_unlink() result `r` is returned.
3476 int FileStore::_remove(const coll_t
& cid
, const ghobject_t
& oid
,
3477 const SequencerPosition
&spos
)
3479 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
3480 int r
= lfn_unlink(cid
, oid
, spos
);
3481 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
// Truncate object `oid` in collection `cid` to `size` bytes by calling
// lfn_truncate(cid, oid, size); the request is logged at level 15 and the
// result at level 10.
// NOTE(review): any post-processing of `r` and the return statement are not
// in this listing.
3485 int FileStore::_truncate(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t size
)
3487 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " size " << size
<< dendl
;
3488 int r
= lfn_truncate(cid
, oid
, size
);
3489 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " size " << size
<< " = " << r
<< dendl
;
// Touch object `oid` in collection `cid`: open it through
// lfn_open(cid, oid, true, &fd) and log the result.
// (The `true` argument presumably asks lfn_open() to create the object if
// it is missing -- confirm against lfn_open()'s signature.)
// NOTE(review): the declaration of `fd`, the handling of `r` (including
// closing `fd`) and the return statement are not in this listing.
3494 int FileStore::_touch(const coll_t
& cid
, const ghobject_t
& oid
)
3496 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
3499 int r
= lfn_open(cid
, oid
, true, &fd
);
3505 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
// Write bufferlist `bl` to object `oid` in collection `cid` at `offset`.
//
// Visible steps:
//   - open the object with lfn_open(cid, oid, true, &fd); a failure is
//     logged at level 0;
//   - write the data with bl.write_fd(**fd, offset); a failure is logged
//     via derr;
//   - on success, when m_filestore_sloppy_crc is set, update the CRC
//     bookkeeping with backend->_crc_update_write();
//   - when replaying or when the writeback throttle is disabled
//     (m_disable_wbthrottle), honour a DONTNEED fadvise flag directly with
//     posix_fadvise(); the wbthrottle.queue_wb() call below is presumably
//     the alternative branch -- the `else` is not visible; confirm.
// `len` is used for logging, CRC update and writeback accounting; the
// number of bytes written comes from `bl` itself.
// NOTE(review): error branches, closing of `fd` and the return statement
// are missing from this listing.
3509 int FileStore::_write(const coll_t
& cid
, const ghobject_t
& oid
,
3510 uint64_t offset
, size_t len
,
3511 const bufferlist
& bl
, uint32_t fadvise_flags
)
3513 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3517 r
= lfn_open(cid
, oid
, true, &fd
);
3519 dout(0) << __FUNC__
<< ": couldn't open " << cid
<< "/"
3521 << cpp_strerror(r
) << dendl
;
3526 r
= bl
.write_fd(**fd
, offset
);
3528 derr
<< __FUNC__
<< ": write_fd on " << cid
<< "/" << oid
3529 << " error: " << cpp_strerror(r
) << dendl
;
3535 if (r
>= 0 && m_filestore_sloppy_crc
) {
3536 int rc
= backend
->_crc_update_write(**fd
, offset
, len
, bl
);
3540 if (replaying
|| m_disable_wbthrottle
) {
3541 if (fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
) {
3542 #ifdef HAVE_POSIX_FADVISE
3543 posix_fadvise(**fd
, 0, 0, POSIX_FADV_DONTNEED
);
3547 wbthrottle
.queue_wb(fd
, oid
, offset
, len
,
3548 fadvise_flags
& CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
3554 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << r
<< dendl
;
// Zero the byte range [offset, offset + len) of object `oid` in `cid`.
//
// When filestore_punch_hole is configured and the platform provides
// fallocate() with FALLOC_FL_KEEP_SIZE (not DARWIN / FreeBSD), the range is
// punched out with FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE; because
// KEEP_SIZE never grows the file, the file is then extended with
// ftruncate() when the zeroed range ends beyond the current size, and the
// sloppy-CRC bookkeeping is updated on success. Any error other than
// -EOPNOTSUPP goes straight to `out`.
// Otherwise (or after -EOPNOTSUPP) the range is filled by writing a buffer
// of `len` zero bytes through _write().
// NOTE(review): several lines (declarations, remaining fallocate arguments,
// error conversion, the `out` label and the return) are missing from this
// listing.
3558 int FileStore::_zero(const coll_t
& cid
, const ghobject_t
& oid
, uint64_t offset
, size_t len
)
3560 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< dendl
;
3563 if (cct
->_conf
->filestore_punch_hole
) {
3564 #ifdef CEPH_HAVE_FALLOCATE
3565 # if !defined(DARWIN) && !defined(__FreeBSD__)
3566 # ifdef FALLOC_FL_KEEP_SIZE
3567 // first try to punch a hole.
3569 ret
= lfn_open(cid
, oid
, false, &fd
);
3575 ret
= ::fstat(**fd
, &st
);
3582 // first try fallocate
3583 ret
= fallocate(**fd
, FALLOC_FL_KEEP_SIZE
| FALLOC_FL_PUNCH_HOLE
,
3588 // ensure we extend file size, if needed
3589 if (len
> 0 && offset
+ len
> (uint64_t)st
.st_size
) {
3590 ret
= ::ftruncate(**fd
, offset
+ len
);
3600 if (ret
>= 0 && m_filestore_sloppy_crc
) {
3601 int rc
= backend
->_crc_update_zero(**fd
, offset
, len
);
3607 if (ret
!= -EOPNOTSUPP
)
3608 goto out
; // some other error
3614 // lame, kernel is old and doesn't support it.
3615 // write zeros.. yuck!
3616 dout(20) << __FUNC__
<< ": falling back to writing zeros" << dendl
;
3619 bl
.append_zero(len
);
3620 ret
= _write(cid
, oid
, offset
, len
, bl
);
3623 #ifdef CEPH_HAVE_FALLOCATE
3624 # if !defined(DARWIN) && !defined(__FreeBSD__)
3625 # ifdef FALLOC_FL_KEEP_SIZE
3630 dout(20) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " " << offset
<< "~" << len
<< " = " << ret
<< dendl
;
// Clone object `oldoid` to `newoid` inside collection `cid`.
//
// Visible sequence:
//   1. consult _check_replay_guard(cid, newoid, spos); a negative result
//      takes an early path (its body is not in this listing);
//   2. open the source, take the write lock on the collection index
//      (index.index->access_lock), then open/create the destination;
//   3. truncate the destination to 0, fstat() the source and copy
//      st.st_size bytes with _do_clone_range();
//   4. clone the object-map entry (object_map->clone); -ENOENT from it is
//      tolerated;
//   5. read the source xattrs (_fgetattrs), propagate the spill-out marker
//      (XATTR_NO_SPILL_OUT if the source carries it -- the other
//      chain_fsetxattr call is presumably the else branch, confirm), and
//      write the xattrs to the destination (_fsetattrs);
//   6. record a replay guard on the destination, since a clone is not
//      idempotent.
// The final result must not be -EIO when m_filestore_fail_eio is set
// (asserted).
// NOTE(review): declarations, error checks between the steps, cleanup and
// the return statement are missing from this listing.
3634 int FileStore::_clone(const coll_t
& cid
, const ghobject_t
& oldoid
, const ghobject_t
& newoid
,
3635 const SequencerPosition
& spos
)
3637 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< dendl
;
3639 if (_check_replay_guard(cid
, newoid
, spos
) < 0)
3646 r
= lfn_open(cid
, oldoid
, false, &o
, &index
);
3650 assert(NULL
!= (index
.index
));
3651 RWLock::WLocker
l((index
.index
)->access_lock
);
3653 r
= lfn_open(cid
, newoid
, true, &n
, &index
);
3657 r
= ::ftruncate(**n
, 0);
3663 r
= ::fstat(**o
, &st
);
3669 r
= _do_clone_range(**o
, **n
, 0, st
.st_size
, 0);
3674 dout(20) << "objectmap clone" << dendl
;
3675 r
= object_map
->clone(oldoid
, newoid
, &spos
);
3676 if (r
< 0 && r
!= -ENOENT
)
3682 map
<string
, bufferptr
> aset
;
3683 r
= _fgetattrs(**o
, aset
);
3687 r
= chain_fgetxattr(**o
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
3688 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
3689 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
3690 sizeof(XATTR_NO_SPILL_OUT
));
3692 r
= chain_fsetxattr
<true, true>(**n
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
3693 sizeof(XATTR_SPILL_OUT
));
3698 r
= _fsetattrs(**n
, aset
);
3703 // clone is non-idempotent; record our work.
3704 _set_replay_guard(**n
, spos
, &newoid
);
3711 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oldoid
<< " -> " << cid
<< "/" << newoid
<< " = " << r
<< dendl
;
3712 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
// Copy `len` bytes from descriptor `from` at `srcoff` to descriptor `to` at
// `dstoff`. This is a thin wrapper: it logs the request at level 20 and
// returns whatever backend->clone_range() returns, so the copy strategy is
// chosen by the active backend.
3716 int FileStore::_do_clone_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3718 dout(20) << __FUNC__
<< ": copy " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3719 return backend
->clone_range(from
, to
, srcoff
, len
, dstoff
);
// Copy [srcoff, srcoff + len) of descriptor `from` to `to` at `dstoff`,
// copying only the regions that actually hold data.
//
// The data extents of the source are discovered with _do_seek_hole_data()
// when the backend supports SEEK_DATA/SEEK_HOLE, otherwise with
// _do_fiemap() when FIEMAP is available. Each extent is copied with
// _do_copy_range() at the matching destination offset (sloppy-CRC update is
// suppressed per extent and done once for the whole range afterwards), and
// the copied byte count is accumulated in `written`. Because holes are
// skipped, the destination may end up shorter than dstoff + len; it is
// therefore extended with ftruncate() when needed.
// NOTE(review): this listing omits several original lines (declarations,
// error checks, labels and the return statement).
3722 int FileStore::_do_sparse_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
)
3724 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3726 map
<uint64_t, uint64_t> exomap
;
3727 // fiemap doesn't allow zero length
3731 if (backend
->has_seek_data_hole()) {
3732 dout(15) << "seek_data/seek_hole " << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3733 r
= _do_seek_hole_data(from
, srcoff
, len
, &exomap
);
3734 } else if (backend
->has_fiemap()) {
3735 dout(15) << "fiemap ioctl" << from
<< " " << srcoff
<< "~" << len
<< dendl
;
3736 r
= _do_fiemap(from
, srcoff
, len
, &exomap
);
3740 int64_t written
= 0;
3744 for (map
<uint64_t, uint64_t>::iterator miter
= exomap
.begin(); miter
!= exomap
.end(); ++miter
) {
// Translate the source extent start into the destination's offset space.
3745 uint64_t it_off
= miter
->first
- srcoff
+ dstoff
;
3746 r
= _do_copy_range(from
, to
, miter
->first
, miter
->second
, it_off
, true);
3748 derr
<< __FUNC__
<< ": copy error at " << miter
->first
<< "~" << miter
->second
3749 << " to " << it_off
<< ", " << cpp_strerror(r
) << dendl
;
3752 written
+= miter
->second
;
3756 if (m_filestore_sloppy_crc
) {
3757 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
3761 r
= ::fstat(to
, &st
);
3764 derr
<< __FUNC__
<< ": fstat error at " << to
<< " " << cpp_strerror(r
) << dendl
;
// Compare in off_t (st_size's own type): casting the 64-bit end offset to
// int truncated it, so the size check went wrong once dstoff + len reached
// 2 GiB and a destination ending in a hole could be left too short.
3767 if (st
.st_size
< (off_t)(dstoff
+ len
)) {
3768 r
= ::ftruncate(to
, dstoff
+ len
);
3771 derr
<< __FUNC__
<< ": ftruncate error at " << dstoff
+len
<< " " << cpp_strerror(r
) << dendl
;
3779 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
// Copy `len` bytes from descriptor `from` at `srcoff` to descriptor `to` at
// `dstoff`, in chunks of at most `buflen` (64 KiB) bytes.
//
// Two visible strategies:
//   - with CEPH_HAVE_SPLICE and backend->has_splice(): data is moved
//     through a pipe with safe_splice() (source -> pipe, pipe ->
//     destination), tracking the positions in `pos` and `dstpos`;
//   - otherwise: both descriptors are positioned with lseek64() and data is
//     moved with read() / safe_write() through a buffer, retrying a read
//     that was interrupted (EINTR).
// A short source is reported as an error; during journal replay the only
// error accepted at the end is -ERANGE ("short source tolerated"). Outside
// of replay the copy must have reached `end` (asserted).
// Unless `skip_sloppycrc` is set, the sloppy-CRC bookkeeping is updated for
// the copied range on success.
// NOTE(review): loop headers, several error branches, pipe cleanup and the
// return statement are missing from this listing.
3783 int FileStore::_do_copy_range(int from
, int to
, uint64_t srcoff
, uint64_t len
, uint64_t dstoff
, bool skip_sloppycrc
)
3785 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3787 loff_t pos
= srcoff
;
3788 loff_t end
= srcoff
+ len
;
3789 int buflen
= 4096 * 16; //limit by pipe max size.see fcntl
3791 #ifdef CEPH_HAVE_SPLICE
3792 if (backend
->has_splice()) {
3794 if (pipe(pipefd
) < 0) {
3796 derr
<< " pipe " << " got " << cpp_strerror(r
) << dendl
;
3800 loff_t dstpos
= dstoff
;
3802 int l
= MIN(end
-pos
, buflen
);
3803 r
= safe_splice(from
, &pos
, pipefd
[1], NULL
, l
, SPLICE_F_NONBLOCK
);
3804 dout(10) << " safe_splice read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
3806 derr
<< __FUNC__
<< ": safe_splice read error at " << pos
<< "~" << len
3807 << ", " << cpp_strerror(r
) << dendl
;
3811 // hrm, bad source range, wtf.
3813 derr
<< __FUNC__
<< ": got short read result at " << pos
3814 << " of fd " << from
<< " len " << len
<< dendl
;
// Drain exactly the `r` bytes just spliced into the pipe.
3818 r
= safe_splice(pipefd
[0], NULL
, to
, &dstpos
, r
, 0);
3819 dout(10) << " safe_splice write to " << to
<< " len " << r
3820 << " got " << r
<< dendl
;
3822 derr
<< __FUNC__
<< ": write error at " << pos
<< "~"
3823 << r
<< ", " << cpp_strerror(r
) << dendl
;
3834 actual
= ::lseek64(from
, srcoff
, SEEK_SET
);
3835 if (actual
!= (int64_t)srcoff
) {
3840 derr
<< "lseek64 to " << srcoff
<< " got " << cpp_strerror(r
) << dendl
;
3843 actual
= ::lseek64(to
, dstoff
, SEEK_SET
);
3844 if (actual
!= (int64_t)dstoff
) {
3849 derr
<< "lseek64 to " << dstoff
<< " got " << cpp_strerror(r
) << dendl
;
3855 int l
= MIN(end
-pos
, buflen
);
3856 r
= ::read(from
, buf
, l
);
3857 dout(25) << " read from " << pos
<< "~" << l
<< " got " << r
<< dendl
;
3859 if (errno
== EINTR
) {
3863 derr
<< __FUNC__
<< ": read error at " << pos
<< "~" << len
3864 << ", " << cpp_strerror(r
) << dendl
;
3869 // hrm, bad source range, wtf.
3871 derr
<< __FUNC__
<< ": got short read result at " << pos
3872 << " of fd " << from
<< " len " << len
<< dendl
;
// `op` counts the bytes of the current chunk already written.
3877 int r2
= safe_write(to
, buf
+op
, r
-op
);
3878 dout(25) << " write to " << to
<< " len " << (r
-op
)
3879 << " got " << r2
<< dendl
;
3882 derr
<< __FUNC__
<< ": write error at " << pos
<< "~"
3883 << r
-op
<< ", " << cpp_strerror(r
) << dendl
;
3895 if (r
< 0 && replaying
) {
3896 assert(r
== -ERANGE
);
3897 derr
<< __FUNC__
<< ": short source tolerated because we are replaying" << dendl
;
3900 assert(replaying
|| pos
== end
);
3901 if (r
>= 0 && !skip_sloppycrc
&& m_filestore_sloppy_crc
) {
3902 int rc
= backend
->_crc_update_clone_range(from
, to
, srcoff
, len
, dstoff
);
3905 dout(20) << __FUNC__
<< ": " << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
3909 int FileStore::_clone_range(const coll_t
& oldcid
, const ghobject_t
& oldoid
, const coll_t
& newcid
, const ghobject_t
& newoid
,
3910 uint64_t srcoff
, uint64_t len
, uint64_t dstoff
,
3911 const SequencerPosition
& spos
)
3913 dout(15) << __FUNC__
<< ": " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " " << srcoff
<< "~" << len
<< " to " << dstoff
<< dendl
;
3915 if (_check_replay_guard(newcid
, newoid
, spos
) < 0)
3920 r
= lfn_open(oldcid
, oldoid
, false, &o
);
3924 r
= lfn_open(newcid
, newoid
, true, &n
);
3928 r
= _do_clone_range(**o
, **n
, srcoff
, len
, dstoff
);
3933 // clone is non-idempotent; record our work.
3934 _set_replay_guard(**n
, spos
, &newoid
);
3941 dout(10) << __FUNC__
<< ": " << oldcid
<< "/" << oldoid
<< " -> " << newcid
<< "/" << newoid
<< " "
3942 << srcoff
<< "~" << len
<< " to " << dstoff
<< " = " << r
<< dendl
;
3946 class SyncEntryTimeout
: public Context
{
3949 explicit SyncEntryTimeout(CephContext
* cct
, int commit_timeo
)
3950 : cct(cct
), m_commit_timeo(commit_timeo
)
3954 void finish(int r
) override
{
3955 BackTrace
*bt
= new BackTrace(1);
3956 generic_dout(-1) << "FileStore: sync_entry timed out after "
3957 << m_commit_timeo
<< " seconds.\n";
3967 void FileStore::sync_entry()
3971 utime_t max_interval
;
3972 max_interval
.set_from_double(m_filestore_max_sync_interval
);
3973 utime_t min_interval
;
3974 min_interval
.set_from_double(m_filestore_min_sync_interval
);
3976 utime_t startwait
= ceph_clock_now();
3978 dout(20) << __FUNC__
<< ": waiting for max_interval " << max_interval
<< dendl
;
3979 sync_cond
.WaitInterval(lock
, max_interval
);
3981 dout(20) << __FUNC__
<< ": not waiting, force_sync set" << dendl
;
3985 dout(20) << __FUNC__
<< ": force_sync set" << dendl
;
3988 dout(20) << __FUNC__
<< ": stop set" << dendl
;
3991 // wait for at least the min interval
3992 utime_t woke
= ceph_clock_now();
3994 dout(20) << __FUNC__
<< ": woke after " << woke
<< dendl
;
3995 if (woke
< min_interval
) {
3996 utime_t t
= min_interval
;
3998 dout(20) << __FUNC__
<< ": waiting for another " << t
3999 << " to reach min interval " << min_interval
<< dendl
;
4000 sync_cond
.WaitInterval(lock
, t
);
4006 fin
.swap(sync_waiters
);
4010 if (apply_manager
.commit_start()) {
4011 utime_t start
= ceph_clock_now();
4012 uint64_t cp
= apply_manager
.get_committing_seq();
4014 sync_entry_timeo_lock
.Lock();
4015 SyncEntryTimeout
*sync_entry_timeo
=
4016 new SyncEntryTimeout(cct
, m_filestore_commit_timeout
);
4017 if (!timer
.add_event_after(m_filestore_commit_timeout
,
4018 sync_entry_timeo
)) {
4019 sync_entry_timeo
= nullptr;
4021 sync_entry_timeo_lock
.Unlock();
4023 logger
->set(l_filestore_committing
, 1);
4025 dout(15) << __FUNC__
<< ": committing " << cp
<< dendl
;
4026 stringstream errstream
;
4027 if (cct
->_conf
->filestore_debug_omap_check
&& !object_map
->check(errstream
)) {
4028 derr
<< errstream
.str() << dendl
;
4032 if (backend
->can_checkpoint()) {
4033 int err
= write_op_seq(op_fd
, cp
);
4035 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
4036 assert(0 == "error during write_op_seq");
4040 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)cp
);
4042 err
= backend
->create_checkpoint(s
, &cid
);
4045 derr
<< "snap create '" << s
<< "' got error " << err
<< dendl
;
4049 snaps
.push_back(cp
);
4050 apply_manager
.commit_started();
4054 dout(20) << " waiting for checkpoint " << cid
<< " to complete" << dendl
;
4055 err
= backend
->sync_checkpoint(cid
);
4057 derr
<< "ioctl WAIT_SYNC got " << cpp_strerror(err
) << dendl
;
4058 assert(0 == "wait_sync got error");
4060 dout(20) << " done waiting for checkpoint " << cid
<< " to complete" << dendl
;
4063 apply_manager
.commit_started();
4066 int err
= object_map
->sync();
4068 derr
<< "object_map sync got " << cpp_strerror(err
) << dendl
;
4069 assert(0 == "object_map sync returned error");
4072 err
= backend
->syncfs();
4074 derr
<< "syncfs got " << cpp_strerror(err
) << dendl
;
4075 assert(0 == "syncfs returned error");
4078 err
= write_op_seq(op_fd
, cp
);
4080 derr
<< "Error during write_op_seq: " << cpp_strerror(err
) << dendl
;
4081 assert(0 == "error during write_op_seq");
4083 err
= ::fsync(op_fd
);
4085 derr
<< "Error during fsync of op_seq: " << cpp_strerror(err
) << dendl
;
4086 assert(0 == "error during fsync of op_seq");
4090 utime_t done
= ceph_clock_now();
4091 utime_t lat
= done
- start
;
4092 utime_t dur
= done
- startwait
;
4093 dout(10) << __FUNC__
<< ": commit took " << lat
<< ", interval was " << dur
<< dendl
;
4094 utime_t max_pause_lat
= logger
->tget(l_filestore_sync_pause_max_lat
);
4095 if (max_pause_lat
< dur
- lat
) {
4096 logger
->tinc(l_filestore_sync_pause_max_lat
, dur
- lat
);
4099 logger
->inc(l_filestore_commitcycle
);
4100 logger
->tinc(l_filestore_commitcycle_latency
, lat
);
4101 logger
->tinc(l_filestore_commitcycle_interval
, dur
);
4103 apply_manager
.commit_finish();
4104 if (!m_disable_wbthrottle
) {
4108 logger
->set(l_filestore_committing
, 0);
4110 // remove old snaps?
4111 if (backend
->can_checkpoint()) {
4113 while (snaps
.size() > 2) {
4114 snprintf(s
, sizeof(s
), COMMIT_SNAP_ITEM
, (long long unsigned)snaps
.front());
4116 dout(10) << "removing snap '" << s
<< "'" << dendl
;
4117 int r
= backend
->destroy_checkpoint(s
);
4120 derr
<< "unable to destroy snap '" << s
<< "' got " << cpp_strerror(err
) << dendl
;
4125 dout(15) << __FUNC__
<< ": committed to op_seq " << cp
<< dendl
;
4127 if (sync_entry_timeo
) {
4128 Mutex::Locker
lock(sync_entry_timeo_lock
);
4129 timer
.cancel_event(sync_entry_timeo
);
4136 finish_contexts(cct
, fin
, 0);
4138 if (!sync_waiters
.empty()) {
4139 dout(10) << __FUNC__
<< ": more waiters, committing again" << dendl
;
4142 if (!stop
&& journal
&& journal
->should_commit_now()) {
4143 dout(10) << __FUNC__
<< ": journal says we should commit again (probably is/was full)" << dendl
;
4151 void FileStore::_start_sync()
4153 if (!journal
) { // don't do a big sync if the journal is on
4154 dout(10) << __FUNC__
<< dendl
;
4157 dout(10) << __FUNC__
<< ": - NOOP (journal is on)" << dendl
;
4161 void FileStore::do_force_sync()
4163 dout(10) << __FUNC__
<< dendl
;
4164 Mutex::Locker
l(lock
);
4169 void FileStore::start_sync(Context
*onsafe
)
4171 Mutex::Locker
l(lock
);
4172 sync_waiters
.push_back(onsafe
);
4175 dout(10) << __FUNC__
<< dendl
;
4178 void FileStore::sync()
4180 Mutex
l("FileStore::sync");
4183 C_SafeCond
*fin
= new C_SafeCond(&l
, &c
, &done
);
4189 dout(10) << "sync waiting" << dendl
;
4193 dout(10) << "sync done" << dendl
;
4196 void FileStore::_flush_op_queue()
4198 dout(10) << __FUNC__
<< ": draining op tp" << dendl
;
4200 dout(10) << __FUNC__
<< ": waiting for apply finisher" << dendl
;
4201 for (vector
<Finisher
*>::iterator it
= apply_finishers
.begin(); it
!= apply_finishers
.end(); ++it
) {
4202 (*it
)->wait_for_empty();
4207 * flush - make every queued write readable
4209 void FileStore::flush()
4211 dout(10) << __FUNC__
<< dendl
;
4213 if (cct
->_conf
->filestore_blackhole
) {
4215 Mutex
lock("FileStore::flush::lock");
4223 if (m_filestore_journal_writeahead
) {
4226 dout(10) << __FUNC__
<< ": draining ondisk finisher" << dendl
;
4227 for (vector
<Finisher
*>::iterator it
= ondisk_finishers
.begin(); it
!= ondisk_finishers
.end(); ++it
) {
4228 (*it
)->wait_for_empty();
4233 dout(10) << __FUNC__
<< ": complete" << dendl
;
4237 * sync_and_flush - make every queued write readable AND committed to disk
4239 void FileStore::sync_and_flush()
4241 dout(10) << __FUNC__
<< dendl
;
4243 if (m_filestore_journal_writeahead
) {
4248 // includes m_filestore_journal_parallel
4252 dout(10) << __FUNC__
<< ": done" << dendl
;
4255 int FileStore::flush_journal()
4257 dout(10) << __FUNC__
<< dendl
;
4263 int FileStore::snapshot(const string
& name
)
4265 dout(10) << __FUNC__
<< ": " << name
<< dendl
;
4268 if (!backend
->can_checkpoint()) {
4269 dout(0) << __FUNC__
<< ": " << name
<< " failed, not supported" << dendl
;
4274 snprintf(s
, sizeof(s
), CLUSTER_SNAP_ITEM
, name
.c_str());
4276 int r
= backend
->create_checkpoint(s
, NULL
);
4278 derr
<< __FUNC__
<< ": " << name
<< " failed: " << cpp_strerror(r
) << dendl
;
4284 // -------------------------------
4287 int FileStore::_fgetattr(int fd
, const char *name
, bufferptr
& bp
)
4289 char val
[CHAIN_XATTR_MAX_BLOCK_LEN
];
4290 int l
= chain_fgetxattr(fd
, name
, val
, sizeof(val
));
4292 bp
= buffer::create(l
);
4293 memcpy(bp
.c_str(), val
, l
);
4294 } else if (l
== -ERANGE
) {
4295 l
= chain_fgetxattr(fd
, name
, 0, 0);
4297 bp
= buffer::create(l
);
4298 l
= chain_fgetxattr(fd
, name
, bp
.c_str(), l
);
4301 assert(!m_filestore_fail_eio
|| l
!= -EIO
);
4305 int FileStore::_fgetattrs(int fd
, map
<string
,bufferptr
>& aset
)
4309 int len
= chain_flistxattr(fd
, names1
, sizeof(names1
)-1);
4312 if (len
== -ERANGE
) {
4313 len
= chain_flistxattr(fd
, 0, 0);
4315 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4318 dout(10) << " -ERANGE, len is " << len
<< dendl
;
4319 names2
= new char[len
+1];
4320 len
= chain_flistxattr(fd
, names2
, len
);
4321 dout(10) << " -ERANGE, got " << len
<< dendl
;
4323 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4328 } else if (len
< 0) {
4329 assert(!m_filestore_fail_eio
|| len
!= -EIO
);
4336 char *end
= name
+ len
;
4337 while (name
< end
) {
4338 char *attrname
= name
;
4339 if (parse_attrname(&name
)) {
4341 dout(20) << __FUNC__
<< ": " << fd
<< " getting '" << name
<< "'" << dendl
;
4342 int r
= _fgetattr(fd
, attrname
, aset
[name
]);
4349 name
+= strlen(name
) + 1;
4356 int FileStore::_fsetattrs(int fd
, map
<string
, bufferptr
> &aset
)
4358 for (map
<string
, bufferptr
>::iterator p
= aset
.begin();
4361 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4362 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4364 if (p
->second
.length())
4365 val
= p
->second
.c_str();
4368 // ??? Why do we skip setting all the other attrs if one fails?
4369 int r
= chain_fsetxattr(fd
, n
, val
, p
->second
.length());
4371 derr
<< __FUNC__
<< ": chain_setxattr returned " << r
<< dendl
;
4378 // debug EIO injection
4379 void FileStore::inject_data_error(const ghobject_t
&oid
) {
4380 Mutex::Locker
l(read_error_lock
);
4381 dout(10) << __FUNC__
<< ": init error on " << oid
<< dendl
;
4382 data_error_set
.insert(oid
);
4384 void FileStore::inject_mdata_error(const ghobject_t
&oid
) {
4385 Mutex::Locker
l(read_error_lock
);
4386 dout(10) << __FUNC__
<< ": init error on " << oid
<< dendl
;
4387 mdata_error_set
.insert(oid
);
4390 void FileStore::debug_obj_on_delete(const ghobject_t
&oid
) {
4391 Mutex::Locker
l(read_error_lock
);
4392 dout(10) << __FUNC__
<< ": clear error on " << oid
<< dendl
;
4393 data_error_set
.erase(oid
);
4394 mdata_error_set
.erase(oid
);
4396 bool FileStore::debug_data_eio(const ghobject_t
&oid
) {
4397 Mutex::Locker
l(read_error_lock
);
4398 if (data_error_set
.count(oid
)) {
4399 dout(10) << __FUNC__
<< ": inject error on " << oid
<< dendl
;
4405 bool FileStore::debug_mdata_eio(const ghobject_t
&oid
) {
4406 Mutex::Locker
l(read_error_lock
);
4407 if (mdata_error_set
.count(oid
)) {
4408 dout(10) << __FUNC__
<< ": inject error on " << oid
<< dendl
;
4418 int FileStore::getattr(const coll_t
& _cid
, const ghobject_t
& oid
, const char *name
, bufferptr
&bp
)
4420 tracepoint(objectstore
, getattr_enter
, _cid
.c_str());
4421 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
4422 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4424 int r
= lfn_open(cid
, oid
, false, &fd
);
4428 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4429 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4430 r
= _fgetattr(**fd
, n
, bp
);
4432 if (r
== -ENODATA
) {
4433 map
<string
, bufferlist
> got
;
4435 to_get
.insert(string(name
));
4437 r
= get_index(cid
, &index
);
4439 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4442 r
= object_map
->get_xattrs(oid
, to_get
, &got
);
4443 if (r
< 0 && r
!= -ENOENT
) {
4444 dout(10) << __FUNC__
<< ": get_xattrs err r =" << r
<< dendl
;
4448 dout(10) << __FUNC__
<< ": got.size() is 0" << dendl
;
4451 bp
= bufferptr(got
.begin()->second
.c_str(),
4452 got
.begin()->second
.length());
4456 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
4457 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4458 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4459 debug_mdata_eio(oid
)) {
4462 tracepoint(objectstore
, getattr_exit
, r
);
4463 return r
< 0 ? r
: 0;
4467 int FileStore::getattrs(const coll_t
& _cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
)
4469 tracepoint(objectstore
, getattrs_enter
, _cid
.c_str());
4470 const coll_t
& cid
= !_need_temp_object_collection(_cid
, oid
) ? _cid
: _cid
.get_temp();
4471 set
<string
> omap_attrs
;
4472 map
<string
, bufferlist
> omap_aset
;
4474 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
4476 bool spill_out
= true;
4479 int r
= lfn_open(cid
, oid
, false, &fd
);
4484 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4485 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4488 r
= _fgetattrs(**fd
, aset
);
4490 fd
= FDRef(); // defensive
4496 dout(10) << __FUNC__
<< ": no xattr exists in object_map r = " << r
<< dendl
;
4500 r
= get_index(cid
, &index
);
4502 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4506 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4507 if (r
< 0 && r
!= -ENOENT
) {
4508 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4512 r
= object_map
->get_xattrs(oid
, omap_attrs
, &omap_aset
);
4513 if (r
< 0 && r
!= -ENOENT
) {
4514 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4520 assert(omap_attrs
.size() == omap_aset
.size());
4521 for (map
<string
, bufferlist
>::iterator i
= omap_aset
.begin();
4522 i
!= omap_aset
.end();
4524 string
key(i
->first
);
4525 aset
.insert(make_pair(key
,
4526 bufferptr(i
->second
.c_str(), i
->second
.length())));
4529 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4530 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4532 if (cct
->_conf
->filestore_debug_inject_read_err
&&
4533 debug_mdata_eio(oid
)) {
4536 tracepoint(objectstore
, getattrs_exit
, r
);
4541 int FileStore::_setattrs(const coll_t
& cid
, const ghobject_t
& oid
, map
<string
,bufferptr
>& aset
,
4542 const SequencerPosition
&spos
)
4544 map
<string
, bufferlist
> omap_set
;
4545 set
<string
> omap_remove
;
4546 map
<string
, bufferptr
> inline_set
;
4547 map
<string
, bufferptr
> inline_to_set
;
4550 bool incomplete_inline
= false;
4552 int r
= lfn_open(cid
, oid
, false, &fd
);
4558 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4559 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
)))
4564 r
= _fgetattrs(**fd
, inline_set
);
4565 incomplete_inline
= (r
== -E2BIG
);
4566 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4567 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
4568 << (incomplete_inline
? " (incomplete_inline, forcing omap)" : "")
4571 for (map
<string
,bufferptr
>::iterator p
= aset
.begin();
4574 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4575 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4577 if (incomplete_inline
) {
4578 chain_fremovexattr(**fd
, n
); // ignore any error
4579 omap_set
[p
->first
].push_back(p
->second
);
4583 if (p
->second
.length() > m_filestore_max_inline_xattr_size
) {
4584 if (inline_set
.count(p
->first
)) {
4585 inline_set
.erase(p
->first
);
4586 r
= chain_fremovexattr(**fd
, n
);
4590 omap_set
[p
->first
].push_back(p
->second
);
4594 if (!inline_set
.count(p
->first
) &&
4595 inline_set
.size() >= m_filestore_max_inline_xattrs
) {
4596 omap_set
[p
->first
].push_back(p
->second
);
4599 omap_remove
.insert(p
->first
);
4600 inline_set
.insert(*p
);
4602 inline_to_set
.insert(*p
);
4605 if (spill_out
!= 1 && !omap_set
.empty()) {
4606 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_SPILL_OUT
,
4607 sizeof(XATTR_SPILL_OUT
));
4610 r
= _fsetattrs(**fd
, inline_to_set
);
4614 if (spill_out
&& !omap_remove
.empty()) {
4615 r
= object_map
->remove_xattrs(oid
, omap_remove
, &spos
);
4616 if (r
< 0 && r
!= -ENOENT
) {
4617 dout(10) << __FUNC__
<< ": could not remove_xattrs r = " << r
<< dendl
;
4618 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4621 r
= 0; // don't confuse the debug output
4625 if (!omap_set
.empty()) {
4626 r
= object_map
->set_xattrs(oid
, omap_set
, &spos
);
4628 dout(10) << __FUNC__
<< ": could not set_xattrs r = " << r
<< dendl
;
4629 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4636 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4641 int FileStore::_rmattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
,
4642 const SequencerPosition
&spos
)
4644 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "'" << dendl
;
4646 bool spill_out
= true;
4648 int r
= lfn_open(cid
, oid
, false, &fd
);
4654 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4655 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4659 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4660 get_attrname(name
, n
, CHAIN_XATTR_MAX_NAME_LEN
);
4661 r
= chain_fremovexattr(**fd
, n
);
4662 if (r
== -ENODATA
&& spill_out
) {
4664 r
= get_index(cid
, &index
);
4666 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4669 set
<string
> to_remove
;
4670 to_remove
.insert(string(name
));
4671 r
= object_map
->remove_xattrs(oid
, to_remove
, &spos
);
4672 if (r
< 0 && r
!= -ENOENT
) {
4673 dout(10) << __FUNC__
<< ": could not remove_xattrs index r = " << r
<< dendl
;
4674 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4681 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " '" << name
<< "' = " << r
<< dendl
;
4685 int FileStore::_rmattrs(const coll_t
& cid
, const ghobject_t
& oid
,
4686 const SequencerPosition
&spos
)
4688 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< dendl
;
4690 map
<string
,bufferptr
> aset
;
4692 set
<string
> omap_attrs
;
4694 bool spill_out
= true;
4696 int r
= lfn_open(cid
, oid
, false, &fd
);
4702 r
= chain_fgetxattr(**fd
, XATTR_SPILL_OUT_NAME
, buf
, sizeof(buf
));
4703 if (r
>= 0 && !strncmp(buf
, XATTR_NO_SPILL_OUT
, sizeof(XATTR_NO_SPILL_OUT
))) {
4707 r
= _fgetattrs(**fd
, aset
);
4709 for (map
<string
,bufferptr
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
4710 char n
[CHAIN_XATTR_MAX_NAME_LEN
];
4711 get_attrname(p
->first
.c_str(), n
, CHAIN_XATTR_MAX_NAME_LEN
);
4712 r
= chain_fremovexattr(**fd
, n
);
4714 dout(10) << __FUNC__
<< ": could not remove xattr r = " << r
<< dendl
;
4721 dout(10) << __FUNC__
<< ": no xattr exists in object_map r = " << r
<< dendl
;
4725 r
= get_index(cid
, &index
);
4727 dout(10) << __FUNC__
<< ": could not get index r = " << r
<< dendl
;
4731 r
= object_map
->get_all_xattrs(oid
, &omap_attrs
);
4732 if (r
< 0 && r
!= -ENOENT
) {
4733 dout(10) << __FUNC__
<< ": could not get omap_attrs r = " << r
<< dendl
;
4734 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4737 r
= object_map
->remove_xattrs(oid
, omap_attrs
, &spos
);
4738 if (r
< 0 && r
!= -ENOENT
) {
4739 dout(10) << __FUNC__
<< ": could not remove omap_attrs r = " << r
<< dendl
;
4744 chain_fsetxattr(**fd
, XATTR_SPILL_OUT_NAME
, XATTR_NO_SPILL_OUT
,
4745 sizeof(XATTR_NO_SPILL_OUT
));
4751 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " = " << r
<< dendl
;
4758 int FileStore::_collection_remove_recursive(const coll_t
&cid
,
4759 const SequencerPosition
&spos
)
4762 int r
= collection_stat(cid
, &st
);
4769 vector
<ghobject_t
> objects
;
4771 while (!max
.is_max()) {
4772 r
= collection_list(cid
, max
, ghobject_t::get_max(),
4773 300, &objects
, &max
);
4776 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
4779 assert(_check_replay_guard(cid
, *i
, spos
));
4780 r
= _remove(cid
, *i
, spos
);
4786 return _destroy_collection(cid
);
4789 // --------------------------
4792 int FileStore::list_collections(vector
<coll_t
>& ls
)
4794 return list_collections(ls
, false);
4797 int FileStore::list_collections(vector
<coll_t
>& ls
, bool include_temp
)
4799 tracepoint(objectstore
, list_collections_enter
);
4800 dout(10) << __FUNC__
<< dendl
;
4803 snprintf(fn
, sizeof(fn
), "%s/current", basedir
.c_str());
4806 DIR *dir
= ::opendir(fn
);
4809 derr
<< "tried opening directory " << fn
<< ": " << cpp_strerror(-r
) << dendl
;
4810 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4814 struct dirent
*de
= nullptr;
4815 while ((de
= ::readdir(dir
))) {
4816 if (de
->d_type
== DT_UNKNOWN
) {
4817 // d_type not supported (non-ext[234], btrfs), must stat
4819 char filename
[PATH_MAX
];
4820 snprintf(filename
, sizeof(filename
), "%s/%s", fn
, de
->d_name
);
4822 r
= ::stat(filename
, &sb
);
4825 derr
<< "stat on " << filename
<< ": " << cpp_strerror(-r
) << dendl
;
4826 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4829 if (!S_ISDIR(sb
.st_mode
)) {
4832 } else if (de
->d_type
!= DT_DIR
) {
4835 if (strcmp(de
->d_name
, "omap") == 0) {
4838 if (de
->d_name
[0] == '.' &&
4839 (de
->d_name
[1] == '\0' ||
4840 (de
->d_name
[1] == '.' &&
4841 de
->d_name
[2] == '\0')))
4844 if (!cid
.parse(de
->d_name
)) {
4845 derr
<< "ignoring invalid collection '" << de
->d_name
<< "'" << dendl
;
4848 if (!cid
.is_temp() || include_temp
)
4853 derr
<< "trying readdir " << fn
<< ": " << cpp_strerror(r
) << dendl
;
4858 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4859 tracepoint(objectstore
, list_collections_exit
, r
);
4863 int FileStore::collection_stat(const coll_t
& c
, struct stat
*st
)
4865 tracepoint(objectstore
, collection_stat_enter
, c
.c_str());
4867 get_cdir(c
, fn
, sizeof(fn
));
4868 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
4869 int r
= ::stat(fn
, st
);
4872 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
4873 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4874 tracepoint(objectstore
, collection_stat_exit
, r
);
4878 bool FileStore::collection_exists(const coll_t
& c
)
4880 tracepoint(objectstore
, collection_exists_enter
, c
.c_str());
4882 bool ret
= collection_stat(c
, &st
) == 0;
4883 tracepoint(objectstore
, collection_exists_exit
, ret
);
4887 int FileStore::collection_empty(const coll_t
& c
, bool *empty
)
4889 tracepoint(objectstore
, collection_empty_enter
, c
.c_str());
4890 dout(15) << __FUNC__
<< ": " << c
<< dendl
;
4892 int r
= get_index(c
, &index
);
4894 derr
<< __FUNC__
<< ": get_index returned: " << cpp_strerror(r
)
4899 assert(NULL
!= index
.index
);
4900 RWLock::RLocker
l((index
.index
)->access_lock
);
4902 vector
<ghobject_t
> ls
;
4903 r
= index
->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4906 derr
<< __FUNC__
<< ": collection_list_partial returned: "
4907 << cpp_strerror(r
) << dendl
;
4908 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
4911 *empty
= ls
.empty();
4912 tracepoint(objectstore
, collection_empty_exit
, *empty
);
4916 int FileStore::_collection_set_bits(const coll_t
& c
, int bits
)
4919 get_cdir(c
, fn
, sizeof(fn
));
4920 dout(10) << __FUNC__
<< ": " << fn
<< " " << bits
<< dendl
;
4924 int fd
= ::open(fn
, O_RDONLY
);
4929 get_attrname("bits", n
, PATH_MAX
);
4930 r
= chain_fsetxattr(fd
, n
, (char*)&v
, sizeof(v
));
4931 VOID_TEMP_FAILURE_RETRY(::close(fd
));
4933 dout(10) << __FUNC__
<< ": " << fn
<< " " << bits
<< " = " << r
<< dendl
;
4937 int FileStore::collection_bits(const coll_t
& c
)
4940 get_cdir(c
, fn
, sizeof(fn
));
4941 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
4945 int fd
= ::open(fn
, O_RDONLY
);
4950 get_attrname("bits", n
, PATH_MAX
);
4951 r
= chain_fgetxattr(fd
, n
, (char*)&bits
, sizeof(bits
));
4952 VOID_TEMP_FAILURE_RETRY(::close(fd
));
4958 dout(10) << __FUNC__
<< ": " << fn
<< " = " << bits
<< dendl
;
4962 int FileStore::collection_list(const coll_t
& c
,
4963 const ghobject_t
& orig_start
,
4964 const ghobject_t
& end
,
4966 vector
<ghobject_t
> *ls
, ghobject_t
*next
)
4968 ghobject_t start
= orig_start
;
4972 ghobject_t temp_next
;
4975 // figure out the pool id. we need this in order to generate a
4976 // meaningful 'next' value.
4981 if (c
.is_temp(&pgid
)) {
4982 pool
= -2 - pgid
.pool();
4984 } else if (c
.is_pg(&pgid
)) {
4987 } else if (c
.is_meta()) {
4989 shard
= shard_id_t::NO_SHARD
;
4991 // hrm, the caller is test code! we should kill it off. for now,
4994 shard
= shard_id_t::NO_SHARD
;
4996 dout(20) << __FUNC__
<< ": pool is " << pool
<< " shard is " << shard
4997 << " pgid " << pgid
<< dendl
;
5001 sep
.set_shard(shard
);
5002 if (!c
.is_temp() && !c
.is_meta()) {
5004 dout(10) << __FUNC__
<< ": first checking temp pool" << dendl
;
5005 coll_t temp
= c
.get_temp();
5006 int r
= collection_list(temp
, start
, end
, max
, ls
, next
);
5009 if (*next
!= ghobject_t::get_max())
5012 dout(10) << __FUNC__
<< ": fall through to non-temp collection, start "
5015 dout(10) << __FUNC__
<< ": start " << start
<< " >= sep " << sep
<< dendl
;
5020 int r
= get_index(c
, &index
);
5024 assert(NULL
!= index
.index
);
5025 RWLock::RLocker
l((index
.index
)->access_lock
);
5027 r
= index
->collection_list_partial(start
, end
, max
, ls
, next
);
5030 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5033 dout(20) << "objects: " << *ls
<< dendl
;
5035 // HashIndex doesn't know the pool when constructing a 'next' value
5036 if (next
&& !next
->is_max()) {
5037 next
->hobj
.pool
= pool
;
5038 next
->set_shard(shard
);
5039 dout(20) << " next " << *next
<< dendl
;
5045 int FileStore::omap_get(const coll_t
& _c
, const ghobject_t
&hoid
,
5047 map
<string
, bufferlist
> *out
)
5049 tracepoint(objectstore
, omap_get_enter
, _c
.c_str());
5050 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5051 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5053 int r
= get_index(c
, &index
);
5057 assert(NULL
!= index
.index
);
5058 RWLock::RLocker
l((index
.index
)->access_lock
);
5059 r
= lfn_find(hoid
, index
);
5063 r
= object_map
->get(hoid
, header
, out
);
5064 if (r
< 0 && r
!= -ENOENT
) {
5065 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5068 tracepoint(objectstore
, omap_get_exit
, 0);
5072 int FileStore::omap_get_header(
5074 const ghobject_t
&hoid
,
5078 tracepoint(objectstore
, omap_get_header_enter
, _c
.c_str());
5079 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5080 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5082 int r
= get_index(c
, &index
);
5086 assert(NULL
!= index
.index
);
5087 RWLock::RLocker
l((index
.index
)->access_lock
);
5088 r
= lfn_find(hoid
, index
);
5092 r
= object_map
->get_header(hoid
, bl
);
5093 if (r
< 0 && r
!= -ENOENT
) {
5094 assert(allow_eio
|| !m_filestore_fail_eio
|| r
!= -EIO
);
5097 tracepoint(objectstore
, omap_get_header_exit
, 0);
5101 int FileStore::omap_get_keys(const coll_t
& _c
, const ghobject_t
&hoid
, set
<string
> *keys
)
5103 tracepoint(objectstore
, omap_get_keys_enter
, _c
.c_str());
5104 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5105 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5107 int r
= get_index(c
, &index
);
5111 assert(NULL
!= index
.index
);
5112 RWLock::RLocker
l((index
.index
)->access_lock
);
5113 r
= lfn_find(hoid
, index
);
5117 r
= object_map
->get_keys(hoid
, keys
);
5118 if (r
< 0 && r
!= -ENOENT
) {
5119 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5122 tracepoint(objectstore
, omap_get_keys_exit
, 0);
5126 int FileStore::omap_get_values(const coll_t
& _c
, const ghobject_t
&hoid
,
5127 const set
<string
> &keys
,
5128 map
<string
, bufferlist
> *out
)
5130 tracepoint(objectstore
, omap_get_values_enter
, _c
.c_str());
5131 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5132 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5134 const char *where
= "()";
5135 int r
= get_index(c
, &index
);
5137 where
= " (get_index)";
5141 assert(NULL
!= index
.index
);
5142 RWLock::RLocker
l((index
.index
)->access_lock
);
5143 r
= lfn_find(hoid
, index
);
5145 where
= " (lfn_find)";
5149 r
= object_map
->get_values(hoid
, keys
, out
);
5150 if (r
< 0 && r
!= -ENOENT
) {
5151 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5152 where
= " (get_values)";
5157 tracepoint(objectstore
, omap_get_values_exit
, r
);
5158 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = " << r
5163 int FileStore::omap_check_keys(const coll_t
& _c
, const ghobject_t
&hoid
,
5164 const set
<string
> &keys
,
5167 tracepoint(objectstore
, omap_check_keys_enter
, _c
.c_str());
5168 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5169 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5172 int r
= get_index(c
, &index
);
5176 assert(NULL
!= index
.index
);
5177 RWLock::RLocker
l((index
.index
)->access_lock
);
5178 r
= lfn_find(hoid
, index
);
5182 r
= object_map
->check_keys(hoid
, keys
, out
);
5183 if (r
< 0 && r
!= -ENOENT
) {
5184 assert(!m_filestore_fail_eio
|| r
!= -EIO
);
5187 tracepoint(objectstore
, omap_check_keys_exit
, 0);
5191 ObjectMap::ObjectMapIterator
FileStore::get_omap_iterator(const coll_t
& _c
,
5192 const ghobject_t
&hoid
)
5194 tracepoint(objectstore
, get_omap_iterator
, _c
.c_str());
5195 const coll_t
& c
= !_need_temp_object_collection(_c
, hoid
) ? _c
: _c
.get_temp();
5196 dout(15) << __FUNC__
<< ": " << c
<< "/" << hoid
<< dendl
;
5198 int r
= get_index(c
, &index
);
5200 dout(10) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = 0 "
5201 << "(get_index failed with " << cpp_strerror(r
) << ")" << dendl
;
5202 return ObjectMap::ObjectMapIterator();
5205 assert(NULL
!= index
.index
);
5206 RWLock::RLocker
l((index
.index
)->access_lock
);
5207 r
= lfn_find(hoid
, index
);
5209 dout(10) << __FUNC__
<< ": " << c
<< "/" << hoid
<< " = 0 "
5210 << "(lfn_find failed with " << cpp_strerror(r
) << ")" << dendl
;
5211 return ObjectMap::ObjectMapIterator();
5214 return object_map
->get_iterator(hoid
);
// Apply an "expected number of objects" hint to collection `c`.
//
// collection_empty(c, &empty) is consulted first.  If the collection is not
// empty and we are not replaying, a level-0 message explains that only an
// empty collection can take this kind of hint.
// NOTE(review): presumably the hint is then skipped without error -- confirm.
//
// The collection index is obtained with get_index(), pre-hashed through
// pre_hash_collection(pg_num, expected_num_objs), the result is logged at
// level 10, and the replay guard for `spos` is set on the collection.
5217 int FileStore::_collection_hint_expected_num_objs(const coll_t
& c
, uint32_t pg_num
,
5218 uint64_t expected_num_objs
,
5219 const SequencerPosition
&spos
)
5221 dout(15) << __FUNC__
<< ": collection: " << c
<< " pg number: "
5222 << pg_num
<< " expected number of objects: " << expected_num_objs
<< dendl
;
5225 int ret
= collection_empty(c
, &empty
);
5228 if (!empty
&& !replaying
) {
5229 dout(0) << "Failed to give an expected number of objects hint to collection : "
5230 << c
<< ", only empty collection can take such type of hint. " << dendl
;
5235 ret
= get_index(c
, &index
);
5238 // Pre-hash the collection
5239 ret
= index
->pre_hash_collection(pg_num
, expected_num_objs
);
5240 dout(10) << "pre_hash_collection " << c
<< " = " << ret
<< dendl
;
5243 _set_replay_guard(c
, spos
);
// Create collection `c` on disk.
//
// Steps visible here:
//  - build the collection directory path with get_cdir() and ::mkdir() it
//    with mode 0755;
//  - special-case an -EEXIST result while replaying
//    (NOTE(review): presumably tolerated so that journal replay of an
//    already-created collection succeeds -- confirm);
//  - record the collection's bits via _collection_set_bits(c, bits);
//  - for a collection that is neither meta nor temp, recursively create its
//    parallel temp collection (c.get_temp()) with bits 0;
//  - set the replay guard for `spos` on the collection.
5248 int FileStore::_create_collection(
5251 const SequencerPosition
&spos
)
5254 get_cdir(c
, fn
, sizeof(fn
));
5255 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5256 int r
= ::mkdir(fn
, 0755);
5259 if (r
== -EEXIST
&& replaying
)
5261 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
5268 r
= _collection_set_bits(c
, bits
);
5271 // create parallel temp collection, too
5272 if (!c
.is_meta() && !c
.is_temp()) {
5273 coll_t temp
= c
.get_temp();
5274 r
= _create_collection(temp
, 0, spos
);
5279 _set_replay_guard(c
, spos
);
// Destroy collection `c`.
//
// The collection directory path is built with get_cdir().  The collection
// index is fetched with get_index() and, while an RWLock::WLocker is held on
// its access_lock, prep_delete() is called on it.  For a collection that is
// neither meta nor temp, the parallel temp collection (c.get_temp()) is
// destroyed as well through a recursive call whose result is kept in `r2`.
// The path and the result `r` are logged at level 10 before returning.
5283 int FileStore::_destroy_collection(const coll_t
& c
)
5287 get_cdir(c
, fn
, sizeof(fn
));
5288 dout(15) << __FUNC__
<< ": " << fn
<< dendl
;
5291 r
= get_index(c
, &from
);
5294 assert(NULL
!= from
.index
);
5295 RWLock::WLocker
l((from
.index
)->access_lock
);
5297 r
= from
->prep_delete();
5308 // destroy parallel temp collection, too
5309 if (!c
.is_meta() && !c
.is_temp()) {
5310 coll_t temp
= c
.get_temp();
5311 int r2
= _destroy_collection(temp
);
5319 dout(10) << __FUNC__
<< ": " << fn
<< " = " << r
<< dendl
;
// Link object `o` from collection `oldcid` into collection `c` under the
// same object name (lfn_link(oldcid, c, o, o)), with journal-replay guards.
//
// The replay guard is checked for the destination name (dstcmp) and then for
// the source name (srccmp).  The source object is opened with lfn_open(); if
// it does not exist the "(dne, continue replay)" message is logged.  When
// dstcmp > 0 an in-progress replay guard is set on the source fd before the
// link is made; dstcmp == 0 means the guard is already marked in-progress.
// An -EEXIST from lfn_link() while replaying on a backend that cannot
// checkpoint is special-cased, since the process may have crashed between
// link() and set_replay_guard().  The guard is closed again afterwards and
// the result `r` is logged at level 10.
5324 int FileStore::_collection_add(const coll_t
& c
, const coll_t
& oldcid
, const ghobject_t
& o
,
5325 const SequencerPosition
& spos
)
5327 dout(15) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< dendl
;
5329 int dstcmp
= _check_replay_guard(c
, o
, spos
);
5333 // check the src name too; it might have a newer guard, and we don't
5334 // want to clobber it
5335 int srccmp
= _check_replay_guard(oldcid
, o
, spos
);
5339 // open guard on object so we don't replay any previous operations on the
5340 // new name that will modify the source inode.
5342 int r
= lfn_open(oldcid
, o
, 0, &fd
);
5344 // the source collection/object does not exist. If we are replaying, we
5345 // should be safe, so just return 0 and move on.
5347 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5348 << oldcid
<< "/" << o
<< " (dne, continue replay) " << dendl
;
5351 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5352 _set_replay_guard(**fd
, spos
, &o
, true);
5355 r
= lfn_link(oldcid
, c
, o
, o
);
5356 if (replaying
&& !backend
->can_checkpoint() &&
5357 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5362 // close guard on object so we don't do this again
5364 _close_replay_guard(**fd
, spos
);
5368 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << o
<< " = " << r
<< dendl
;
// Move object `oldoid` in collection `oldcid` to object `o` in collection
// `c`, with journal-replay guards.
//
// Visible steps: the destination collection's existence is tested with
// collection_exists(c); replay guards are checked for the destination name
// (dstcmp) and the source name (srccmp); the source is opened with
// lfn_open().  A missing source is logged as "(dne, continue replay)", or as
// "(dne, ignoring enoent)" when `allow_enoent` is set, and otherwise trips
// the "ERROR: source must exist" assertion.  When dstcmp > 0 an in-progress
// guard is set on the source fd; the object is linked to its new name with
// lfn_link(), the omap content is renamed with object_map->rename(), the old
// name is removed with lfn_unlink(), and the new name is reopened so that
// the guard can be closed on it.  The trailing section re-checks the source
// guard and unlinks the source when that check returns > 0.
5372 int FileStore::_collection_move_rename(const coll_t
& oldcid
, const ghobject_t
& oldoid
,
5373 coll_t c
, const ghobject_t
& o
,
5374 const SequencerPosition
& spos
,
5377 dout(15) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
<< dendl
;
5382 /* If the destination collection doesn't exist during replay,
5383 * we need to delete the src object and continue on
5385 if (!collection_exists(c
))
5389 dstcmp
= _check_replay_guard(c
, o
, spos
);
5393 // check the src name too; it might have a newer guard, and we don't
5394 // want to clobber it
5395 srccmp
= _check_replay_guard(oldcid
, oldoid
, spos
);
5400 // open guard on object so we don't replay any previous operations on the
5401 // new name that will modify the source inode.
5403 r
= lfn_open(oldcid
, oldoid
, 0, &fd
);
5405 // the source collection/object does not exist. If we are replaying, we
5406 // should be safe, so just return 0 and move on.
5408 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5409 << oldcid
<< "/" << oldoid
<< " (dne, continue replay) " << dendl
;
5410 } else if (allow_enoent
) {
5411 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from "
5412 << oldcid
<< "/" << oldoid
<< " (dne, ignoring enoent)"
5415 assert(0 == "ERROR: source must exist");
5421 if (allow_enoent
&& dstcmp
> 0) { // if dstcmp == 0, try_rename was started.
5425 r
= 0; // don't know if object_map was cloned
5427 if (dstcmp
> 0) { // if dstcmp == 0 the guard already says "in-progress"
5428 _set_replay_guard(**fd
, spos
, &o
, true);
5431 r
= lfn_link(oldcid
, c
, oldoid
, o
);
5432 if (replaying
&& !backend
->can_checkpoint() &&
5433 r
== -EEXIST
) // crashed between link() and set_replay_guard()
5443 // the name changed; link the omap content
5444 r
= object_map
->rename(oldoid
, o
, &spos
);
5452 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5455 r
= lfn_open(c
, o
, 0, &fd
);
5457 // close guard on object so we don't do this again
5459 _close_replay_guard(**fd
, spos
, &o
);
5464 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5465 << " = " << r
<< dendl
;
5470 if (_check_replay_guard(oldcid
, oldoid
, spos
) > 0) {
5471 r
= lfn_unlink(oldcid
, oldoid
, spos
, true);
5474 dout(10) << __FUNC__
<< ": " << c
<< "/" << o
<< " from " << oldcid
<< "/" << oldoid
5475 << " = " << r
<< dendl
;
// Failure-injection hook driven by m_filestore_kill_at.
//
// When the counter is non-zero it is pre-decremented and the transition
// (previous value -> new value) is logged at level 5.  A "KILLING" message
// is written through derr.
// NOTE(review): presumably only once the counter has reached zero -- confirm.
//
// `final` is merely a contextual keyword in C++11, so it is a legal (if
// easily misread) local variable name here.
5479 void FileStore::_inject_failure()
5481 if (m_filestore_kill_at
) {
5482 int final
= --m_filestore_kill_at
;
5483 dout(5) << __FUNC__
<< ": " << (final
+1) << " -> " << final
<< dendl
;
5485 derr
<< __FUNC__
<< ": KILLING" << dendl
;
// Clear the omap of object `hoid` in collection `cid`.
//
// The object is first located: the collection index is fetched with
// get_index() and lfn_find() is run while an RWLock::RLocker is held on the
// index's access_lock.  The omap is then cleared with
// object_map->clear_keys_header(hoid, &spos).  The final test singles out
// negative results other than -ENOENT.
// NOTE(review): presumably -ENOENT from the object map is not reported as a
// failure -- confirm.
5492 int FileStore::_omap_clear(const coll_t
& cid
, const ghobject_t
&hoid
,
5493 const SequencerPosition
&spos
) {
5494 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5496 int r
= get_index(cid
, &index
);
5500 assert(NULL
!= index
.index
);
5501 RWLock::RLocker
l((index
.index
)->access_lock
);
5502 r
= lfn_find(hoid
, index
);
5506 r
= object_map
->clear_keys_header(hoid
, &spos
);
5507 if (r
< 0 && r
!= -ENOENT
)
// Set the omap key/value pairs in `aset` on object `hoid` in collection
// `cid`.
//
// A pgmeta object is treated as a logical object and handled specially
// before the existence check.  Otherwise the object is located via
// get_index() and lfn_find() (the latter under an RWLock::RLocker on the
// index's access_lock); a failure of either is logged at level 20.  When
// level-20 logging is gathered for the filestore subsystem every key being
// set is logged.  The keys are written with
// object_map->set_keys(hoid, aset, &spos) and the result is logged.
5512 int FileStore::_omap_setkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5513 const map
<string
, bufferlist
> &aset
,
5514 const SequencerPosition
&spos
) {
5515 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5518 //treat pgmeta as a logical object, skip to check exist
5519 if (hoid
.is_pgmeta())
5522 r
= get_index(cid
, &index
);
5524 dout(20) << __FUNC__
<< ": get_index got " << cpp_strerror(r
) << dendl
;
5528 assert(NULL
!= index
.index
);
5529 RWLock::RLocker
l((index
.index
)->access_lock
);
5530 r
= lfn_find(hoid
, index
);
5532 dout(20) << __FUNC__
<< ": lfn_find got " << cpp_strerror(r
) << dendl
;
// NOTE(review): this consults the process-global g_conf, whereas the rest of
// this file reads configuration through cct->_conf -- confirm that is
// intended.
5537 if (g_conf
->subsys
.should_gather(ceph_subsys_filestore
, 20)) {
5538 for (auto& p
: aset
) {
5539 dout(20) << __FUNC__
<< ": set " << p
.first
<< dendl
;
5542 r
= object_map
->set_keys(hoid
, aset
, &spos
);
5543 dout(20) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< " = " << r
<< dendl
;
// Remove the omap keys listed in `keys` from object `hoid` in collection
// `cid`.
//
// A pgmeta object is treated as a logical object and handled specially
// before the existence check.  Otherwise the object is located via
// get_index() and lfn_find() (the latter under an RWLock::RLocker on the
// index's access_lock).  The keys are removed with
// object_map->rm_keys(hoid, keys, &spos); the final test singles out
// negative results other than -ENOENT.
// NOTE(review): presumably -ENOENT from the object map is not reported as a
// failure -- confirm.
5547 int FileStore::_omap_rmkeys(const coll_t
& cid
, const ghobject_t
&hoid
,
5548 const set
<string
> &keys
,
5549 const SequencerPosition
&spos
) {
5550 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5553 //treat pgmeta as a logical object, skip to check exist
5554 if (hoid
.is_pgmeta())
5557 r
= get_index(cid
, &index
);
5561 assert(NULL
!= index
.index
);
5562 RWLock::RLocker
l((index
.index
)->access_lock
);
5563 r
= lfn_find(hoid
, index
);
5568 r
= object_map
->rm_keys(hoid
, keys
, &spos
);
5569 if (r
< 0 && r
!= -ENOENT
)
// Remove a range of omap keys from object `hoid` in collection `cid`.
//
// An iterator from get_omap_iterator() is positioned at lower_bound(first)
// and advanced while it is valid and its key compares less than `last`; each
// visited key is collected into `keys`, which is then handed to
// _omap_rmkeys().  The range is therefore half-open: `first` is included and
// `last` is excluded, even though the level-15 log line prints it as
// "[first,last]".
5574 int FileStore::_omap_rmkeyrange(const coll_t
& cid
, const ghobject_t
&hoid
,
5575 const string
& first
, const string
& last
,
5576 const SequencerPosition
&spos
) {
5577 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< " [" << first
<< "," << last
<< "]" << dendl
;
5580 ObjectMap::ObjectMapIterator iter
= get_omap_iterator(cid
, hoid
);
5583 for (iter
->lower_bound(first
); iter
->valid() && iter
->key() < last
;
5585 keys
.insert(iter
->key());
5588 return _omap_rmkeys(cid
, hoid
, keys
, spos
);
// Set the omap header of object `hoid` in collection `cid` to `bl`.
//
// The object is first located: the collection index is fetched with
// get_index() and lfn_find() is run while an RWLock::RLocker is held on the
// index's access_lock.  The function then returns the result of
// object_map->set_header(hoid, bl, &spos).
5591 int FileStore::_omap_setheader(const coll_t
& cid
, const ghobject_t
&hoid
,
5592 const bufferlist
&bl
,
5593 const SequencerPosition
&spos
)
5595 dout(15) << __FUNC__
<< ": " << cid
<< "/" << hoid
<< dendl
;
5597 int r
= get_index(cid
, &index
);
5601 assert(NULL
!= index
.index
);
5602 RWLock::RLocker
l((index
.index
)->access_lock
);
5603 r
= lfn_find(hoid
, index
);
5607 return object_map
->set_header(hoid
, bl
, &spos
);
// Split collection `cid`, moving part of its contents into collection
// `dest` via from->split(rem, bits, to.index).
//
// Visible steps: both collections are tested with collection_exists() and a
// missing one is logged as "DNE" at level 2; replay guards are checked for
// `dest` (dstcmp) and `cid` (srccmp); a global replay guard is set on `cid`
// and in-progress guards are set on both collections; both indexes are
// fetched and write-locked (source first, then destination) before the
// split; afterwards both guards are closed and the bits of `cid` are
// updated.
//
// When the split returned 0 and filestore_debug_verify_split is enabled, the
// `objects` vector is walked twice: entries logged as "still in source" are
// asserted NOT to match (bits, rem), and entries logged as "now in dest" are
// asserted to match.
5610 int FileStore::_split_collection(const coll_t
& cid
,
5614 const SequencerPosition
&spos
)
5618 dout(15) << __FUNC__
<< ": " << cid
<< " bits: " << bits
<< dendl
;
5619 if (!collection_exists(cid
)) {
5620 dout(2) << __FUNC__
<< ": " << cid
<< " DNE" << dendl
;
5624 if (!collection_exists(dest
)) {
5625 dout(2) << __FUNC__
<< ": " << dest
<< " DNE" << dendl
;
5630 int dstcmp
= _check_replay_guard(dest
, spos
);
5634 int srccmp
= _check_replay_guard(cid
, spos
);
5638 _set_global_replay_guard(cid
, spos
);
5639 _set_replay_guard(cid
, spos
, true);
5640 _set_replay_guard(dest
, spos
, true);
5643 r
= get_index(cid
, &from
);
5647 r
= get_index(dest
, &to
);
5650 assert(NULL
!= from
.index
);
5651 RWLock::WLocker
l1((from
.index
)->access_lock
);
5653 assert(NULL
!= to
.index
);
5654 RWLock::WLocker
l2((to
.index
)->access_lock
);
5656 r
= from
->split(rem
, bits
, to
.index
);
5659 _close_replay_guard(cid
, spos
);
5660 _close_replay_guard(dest
, spos
);
// NOTE(review): the result of _collection_set_bits() is discarded here,
// while _create_collection() assigns it to `r` -- confirm that ignoring a
// failure is intended.
5662 _collection_set_bits(cid
, bits
);
5663 if (!r
&& cct
->_conf
->filestore_debug_verify_split
) {
5664 vector
<ghobject_t
> objects
;
5669 next
, ghobject_t::get_max(),
5670 get_ideal_list_max(),
5673 if (objects
.empty())
5675 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
5678 dout(20) << __FUNC__
<< ": " << *i
<< " still in source "
5680 assert(!i
->match(bits
, rem
));
5684 next
= ghobject_t();
5688 next
, ghobject_t::get_max(),
5689 get_ideal_list_max(),
5692 if (objects
.empty())
5694 for (vector
<ghobject_t
>::iterator i
= objects
.begin();
5697 dout(20) << __FUNC__
<< ": " << *i
<< " now in dest "
5699 assert(i
->match(bits
, rem
));
// Pass an allocation-size hint for object `oid` in collection `cid` to the
// backend.
//
// A zero expected_object_size or expected_write_size is special-cased before
// the object is opened.
// NOTE(review): presumably no hint is applied in that case -- confirm.
//
// Otherwise the object is opened with lfn_open() and the hint handed to
// backend->set_alloc_hint() is the smaller of expected_write_size and
// m_filestore_max_alloc_hint_size.  The outcome is logged at level 10, and
// when m_filestore_fail_eio is set the function asserts that the result is
// not -EIO.
5707 int FileStore::_set_alloc_hint(const coll_t
& cid
, const ghobject_t
& oid
,
5708 uint64_t expected_object_size
,
5709 uint64_t expected_write_size
)
5711 dout(15) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< dendl
;
5716 if (expected_object_size
== 0 || expected_write_size
== 0)
5719 ret
= lfn_open(cid
, oid
, false, &fd
);
5724 // TODO: a more elaborate hint calculation
5725 uint64_t hint
= MIN(expected_write_size
, m_filestore_max_alloc_hint_size
);
5727 ret
= backend
->set_alloc_hint(**fd
, hint
);
5728 dout(20) << __FUNC__
<< ": hint " << hint
<< " ret " << ret
<< dendl
;
5733 dout(10) << __FUNC__
<< ": " << cid
<< "/" << oid
<< " object_size " << expected_object_size
<< " write_size " << expected_write_size
<< " = " << ret
<< dendl
;
5734 assert(!m_filestore_fail_eio
|| ret
!= -EIO
);
// Table of the configuration option names this store tracks.  Every name
// listed here is one that handle_conf_change() tests for in its `changed`
// set: the xattr limits, the sync intervals, the queue throttle parameters,
// the commit timeout, the dump file and the miscellaneous runtime tunables.
5738 const char** FileStore::get_tracked_conf_keys() const
5740 static const char* KEYS
[] = {
5741 "filestore_max_inline_xattr_size",
5742 "filestore_max_inline_xattr_size_xfs",
5743 "filestore_max_inline_xattr_size_btrfs",
5744 "filestore_max_inline_xattr_size_other",
5745 "filestore_max_inline_xattrs",
5746 "filestore_max_inline_xattrs_xfs",
5747 "filestore_max_inline_xattrs_btrfs",
5748 "filestore_max_inline_xattrs_other",
5749 "filestore_max_xattr_value_size",
5750 "filestore_max_xattr_value_size_xfs",
5751 "filestore_max_xattr_value_size_btrfs",
5752 "filestore_max_xattr_value_size_other",
5753 "filestore_min_sync_interval",
5754 "filestore_max_sync_interval",
5755 "filestore_queue_max_ops",
5756 "filestore_queue_max_bytes",
5757 "filestore_expected_throughput_bytes",
5758 "filestore_expected_throughput_ops",
5759 "filestore_queue_low_threshhold",
5760 "filestore_queue_high_threshhold",
5761 "filestore_queue_high_delay_multiple",
5762 "filestore_queue_max_delay_multiple",
5763 "filestore_commit_timeout",
5764 "filestore_dump_file",
5765 "filestore_kill_at",
5766 "filestore_fail_eio",
5767 "filestore_fadvise",
5768 "filestore_sloppy_crc",
5769 "filestore_sloppy_crc_block_size",
5770 "filestore_max_alloc_hint_size",
// React to a runtime configuration change.  `changed` holds the names of
// the options that changed and `conf` supplies their new values.
//
//  - Any xattr limit option: recompute the limits through
//    set_xattr_limits_via_conf() while holding `lock`.
//  - Any queue throttle option: re-apply them through set_throttle_params()
//    while holding `lock`.
//  - Sync interval / kill_at / fail_eio / sloppy_crc / alloc hint / fadvise
//    options: copy all eight cached m_filestore_* values from `conf` while
//    holding `lock` (all are refreshed even if only one changed).
//  - filestore_commit_timeout: copied while holding sync_entry_timeo_lock.
//  - filestore_dump_file: a non-empty value other than "-" starts dumping
//    to that file via dump_start().
5776 void FileStore::handle_conf_change(const struct md_config_t
*conf
,
5777 const std::set
<std::string
> &changed
)
5779 if (changed
.count("filestore_max_inline_xattr_size") ||
5780 changed
.count("filestore_max_inline_xattr_size_xfs") ||
5781 changed
.count("filestore_max_inline_xattr_size_btrfs") ||
5782 changed
.count("filestore_max_inline_xattr_size_other") ||
5783 changed
.count("filestore_max_inline_xattrs") ||
5784 changed
.count("filestore_max_inline_xattrs_xfs") ||
5785 changed
.count("filestore_max_inline_xattrs_btrfs") ||
5786 changed
.count("filestore_max_inline_xattrs_other") ||
5787 changed
.count("filestore_max_xattr_value_size") ||
5788 changed
.count("filestore_max_xattr_value_size_xfs") ||
5789 changed
.count("filestore_max_xattr_value_size_btrfs") ||
5790 changed
.count("filestore_max_xattr_value_size_other")) {
5792 Mutex::Locker
l(lock
);
5793 set_xattr_limits_via_conf();
5797 if (changed
.count("filestore_queue_max_bytes") ||
5798 changed
.count("filestore_queue_max_ops") ||
5799 changed
.count("filestore_expected_throughput_bytes") ||
5800 changed
.count("filestore_expected_throughput_ops") ||
5801 changed
.count("filestore_queue_low_threshhold") ||
5802 changed
.count("filestore_queue_high_threshhold") ||
5803 changed
.count("filestore_queue_high_delay_multiple") ||
5804 changed
.count("filestore_queue_max_delay_multiple")) {
5805 Mutex::Locker
l(lock
);
5806 set_throttle_params();
5809 if (changed
.count("filestore_min_sync_interval") ||
5810 changed
.count("filestore_max_sync_interval") ||
5811 changed
.count("filestore_kill_at") ||
5812 changed
.count("filestore_fail_eio") ||
5813 changed
.count("filestore_sloppy_crc") ||
5814 changed
.count("filestore_sloppy_crc_block_size") ||
5815 changed
.count("filestore_max_alloc_hint_size") ||
5816 changed
.count("filestore_fadvise")) {
5817 Mutex::Locker
l(lock
);
5818 m_filestore_min_sync_interval
= conf
->filestore_min_sync_interval
;
5819 m_filestore_max_sync_interval
= conf
->filestore_max_sync_interval
;
5820 m_filestore_kill_at
= conf
->filestore_kill_at
;
5821 m_filestore_fail_eio
= conf
->filestore_fail_eio
;
5822 m_filestore_fadvise
= conf
->filestore_fadvise
;
5823 m_filestore_sloppy_crc
= conf
->filestore_sloppy_crc
;
5824 m_filestore_sloppy_crc_block_size
= conf
->filestore_sloppy_crc_block_size
;
5825 m_filestore_max_alloc_hint_size
= conf
->filestore_max_alloc_hint_size
;
5827 if (changed
.count("filestore_commit_timeout")) {
5828 Mutex::Locker
l(sync_entry_timeo_lock
);
5829 m_filestore_commit_timeout
= conf
->filestore_commit_timeout
;
5831 if (changed
.count("filestore_dump_file")) {
5832 if (conf
->filestore_dump_file
.length() &&
5833 conf
->filestore_dump_file
!= "-") {
5834 dump_start(conf
->filestore_dump_file
);
// Push the current queue throttle configuration into throttle_bytes and
// throttle_ops.
//
// Both throttles receive the same low/high thresholds and the same
// high/max delay multiples; they differ only in the expected throughput
// (bytes vs. ops) and the maximum (queue_max_bytes vs. queue_max_ops).
// `valid` is the AND of both set_params() results.  The resulting maxima
// are published to the perf counters through `logger`.  Returns 0 when both
// parameter sets were accepted and -EINVAL otherwise; the "tried to set
// invalid params" message is written through derr.
5841 int FileStore::set_throttle_params()
5844 bool valid
= throttle_bytes
.set_params(
5845 cct
->_conf
->filestore_queue_low_threshhold
,
5846 cct
->_conf
->filestore_queue_high_threshhold
,
5847 cct
->_conf
->filestore_expected_throughput_bytes
,
5848 cct
->_conf
->filestore_queue_high_delay_multiple
,
5849 cct
->_conf
->filestore_queue_max_delay_multiple
,
5850 cct
->_conf
->filestore_queue_max_bytes
,
5853 valid
&= throttle_ops
.set_params(
5854 cct
->_conf
->filestore_queue_low_threshhold
,
5855 cct
->_conf
->filestore_queue_high_threshhold
,
5856 cct
->_conf
->filestore_expected_throughput_ops
,
5857 cct
->_conf
->filestore_queue_high_delay_multiple
,
5858 cct
->_conf
->filestore_queue_max_delay_multiple
,
5859 cct
->_conf
->filestore_queue_max_ops
,
5862 logger
->set(l_filestore_op_queue_max_ops
, throttle_ops
.get_max());
5863 logger
->set(l_filestore_op_queue_max_bytes
, throttle_bytes
.get_max());
5866 derr
<< "tried to set invalid params: "
5870 return valid
? 0 : -EINVAL
;
5873 void FileStore::dump_start(const std::string
& file
)
5875 dout(10) << __FUNC__
<< ": " << file
<< dendl
;
5876 if (m_filestore_do_dump
) {
5879 m_filestore_dump_fmt
.reset();
5880 m_filestore_dump_fmt
.open_array_section("dump");
5881 m_filestore_dump
.open(file
.c_str());
5882 m_filestore_do_dump
= true;
5885 void FileStore::dump_stop()
5887 dout(10) << __FUNC__
<< dendl
;
5888 m_filestore_do_dump
= false;
5889 if (m_filestore_dump
.is_open()) {
5890 m_filestore_dump_fmt
.close_section();
5891 m_filestore_dump_fmt
.flush(m_filestore_dump
);
5892 m_filestore_dump
.flush();
5893 m_filestore_dump
.close();
5897 void FileStore::dump_transactions(vector
<ObjectStore::Transaction
>& ls
, uint64_t seq
, OpSequencer
*osr
)
5899 m_filestore_dump_fmt
.open_array_section("transactions");
5900 unsigned trans_num
= 0;
5901 for (vector
<ObjectStore::Transaction
>::iterator i
= ls
.begin(); i
!= ls
.end(); ++i
, ++trans_num
) {
5902 m_filestore_dump_fmt
.open_object_section("transaction");
5903 m_filestore_dump_fmt
.dump_string("osr", osr
->get_name());
5904 m_filestore_dump_fmt
.dump_unsigned("seq", seq
);
5905 m_filestore_dump_fmt
.dump_unsigned("trans_num", trans_num
);
5906 (*i
).dump(&m_filestore_dump_fmt
);
5907 m_filestore_dump_fmt
.close_section();
5909 m_filestore_dump_fmt
.close_section();
5910 m_filestore_dump_fmt
.flush(m_filestore_dump
);
5911 m_filestore_dump
.flush();
// Recompute the cached xattr limits (m_filestore_max_inline_xattr_size,
// m_filestore_max_inline_xattrs, m_filestore_max_xattr_value_size) from the
// configuration.
//
// Per-filesystem defaults are picked by switching on m_fs_type: the *_xfs
// options for XFS_SUPER_MAGIC and the *_btrfs options for BTRFS_SUPER_MAGIC
// (both cases are compiled only on Linux), and the *_other options in the
// remaining group.  For each limit, a non-zero generic option
// (filestore_max_inline_xattr_size, filestore_max_inline_xattrs,
// filestore_max_xattr_value_size) overrides the per-filesystem default.
//
// A warning is written through derr when the resulting maximum xattr value
// size is smaller than osd_max_object_name_len.
5914 void FileStore::set_xattr_limits_via_conf()
5916 uint32_t fs_xattr_size
;
5918 uint32_t fs_xattr_max_value_size
;
5920 switch (m_fs_type
) {
5921 #if defined(__linux__)
5922 case XFS_SUPER_MAGIC
:
5923 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_xfs
;
5924 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_xfs
;
5925 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_xfs
;
5927 case BTRFS_SUPER_MAGIC
:
5928 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_btrfs
;
5929 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_btrfs
;
5930 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_btrfs
;
5934 fs_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size_other
;
5935 fs_xattrs
= cct
->_conf
->filestore_max_inline_xattrs_other
;
5936 fs_xattr_max_value_size
= cct
->_conf
->filestore_max_xattr_value_size_other
;
5940 // Use override value if set
5941 if (cct
->_conf
->filestore_max_inline_xattr_size
)
5942 m_filestore_max_inline_xattr_size
= cct
->_conf
->filestore_max_inline_xattr_size
;
5944 m_filestore_max_inline_xattr_size
= fs_xattr_size
;
5946 // Use override value if set
5947 if (cct
->_conf
->filestore_max_inline_xattrs
)
5948 m_filestore_max_inline_xattrs
= cct
->_conf
->filestore_max_inline_xattrs
;
5950 m_filestore_max_inline_xattrs
= fs_xattrs
;
5952 // Use override value if set
5953 if (cct
->_conf
->filestore_max_xattr_value_size
)
5954 m_filestore_max_xattr_value_size
= cct
->_conf
->filestore_max_xattr_value_size
;
5956 m_filestore_max_xattr_value_size
= fs_xattr_max_value_size
;
5958 if (m_filestore_max_xattr_value_size
< cct
->_conf
->osd_max_object_name_len
) {
5959 derr
<< "WARNING: max attr value size ("
5960 << m_filestore_max_xattr_value_size
5961 << ") is smaller than osd_max_object_name_len ("
5962 << cct
->_conf
->osd_max_object_name_len
5963 << "). Your backend filesystem appears to not support attrs large "
5964 << "enough to handle the configured max rados name size. You may get "
5965 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5971 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects
)
5973 uint64_t res
= num_objects
* blk_size
/ 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
// Apply the current layout settings to collection `cid` by delegating to
// the apply_layout_settings() method of the collection's index.  The index
// is obtained with get_index(); the "Error getting index" message is logged
// at level 10 together with the decoded error.
5977 int FileStore::apply_layout_settings(const coll_t
&cid
)
5979 dout(20) << __FUNC__
<< ": " << cid
<< dendl
;
5981 int r
= get_index(cid
, &index
);
5983 dout(10) << "Error getting index for " << cid
<< ": " << cpp_strerror(r
)
5988 return index
->apply_layout_settings();
5992 // -- FSSuperblock --
// Serialize the superblock into `bl`.  The encoding is started with
// ENCODE_START(2, 1, bl) and consists of compat_features followed by
// omap_backend.
5994 void FSSuperblock::encode(bufferlist
&bl
) const
5996 ENCODE_START(2, 1, bl
);
5997 compat_features
.encode(bl
);
5998 ::encode(omap_backend
, bl
);
// Deserialize the superblock from `bl` (DECODE_START(2, bl)).
// compat_features is decoded first; omap_backend is then either decoded
// from the buffer or set to the fallback value "leveldb".
// NOTE(review): presumably the fallback applies to encodings older than
// version 2, which carry no omap_backend field -- confirm.
6002 void FSSuperblock::decode(bufferlist::iterator
&bl
)
6004 DECODE_START(2, bl
);
6005 compat_features
.decode(bl
);
6007 ::decode(omap_backend
, bl
);
6009 omap_backend
= "leveldb";
// Dump the superblock through formatter `f`: compat_features is dumped
// after the "compat" object section is opened, and omap_backend is emitted
// as the string field "omap_backend".
6013 void FSSuperblock::dump(Formatter
*f
) const
6015 f
->open_object_section("compat");
6016 compat_features
.dump(f
);
6017 f
->dump_string("omap_backend", omap_backend
);
// Append three heap-allocated test superblocks to `o`, each a copy of `z`
// taken at a different point:
//   1. `z` in its initial state;
//   2. `z` after compat_features has been replaced by a CompatSet whose
//      incompat feature set contains CEPH_FS_FEATURE_INCOMPAT_SHARDS;
//   3. the same, with omap_backend additionally set to "rocksdb".
// The instances are created with `new` and stored as raw pointers in `o`.
6021 void FSSuperblock::generate_test_instances(list
<FSSuperblock
*>& o
)
6024 o
.push_back(new FSSuperblock(z
));
6025 CompatSet::FeatureSet feature_compat
;
6026 CompatSet::FeatureSet feature_ro_compat
;
6027 CompatSet::FeatureSet feature_incompat
;
6028 feature_incompat
.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS
);
6029 z
.compat_features
= CompatSet(feature_compat
, feature_ro_compat
,
6031 o
.push_back(new FSSuperblock(z
));
6032 z
.omap_backend
= "rocksdb";
6033 o
.push_back(new FSSuperblock(z
));