]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/FileStore.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / os / filestore / FileStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15#include "include/compat.h"
16#include "include/int_types.h"
17#include "boost/tuple/tuple.hpp"
18
19#include <unistd.h>
20#include <stdlib.h>
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <fcntl.h>
24#include <sys/file.h>
25#include <errno.h>
26#include <dirent.h>
27#include <sys/ioctl.h>
28
29#if defined(__linux__)
30#include <linux/fs.h>
11fdf7f2 31#include <linux/falloc.h>
7c673cae
FG
32#endif
33
34#include <iostream>
35#include <map>
36
37#include "include/linux_fiemap.h"
38
7c673cae
FG
39#include "chain_xattr.h"
40
11fdf7f2 41#if defined(__APPLE__) || defined(__FreeBSD__)
7c673cae
FG
42#include <sys/param.h>
43#include <sys/mount.h>
11fdf7f2 44#endif
7c673cae
FG
45
46
47#include <fstream>
48#include <sstream>
49
50#include "FileStore.h"
51#include "GenericFileStoreBackend.h"
52#include "BtrfsFileStoreBackend.h"
53#include "XfsFileStoreBackend.h"
54#include "ZFSFileStoreBackend.h"
55#include "common/BackTrace.h"
56#include "include/types.h"
57#include "FileJournal.h"
58
59#include "osd/osd_types.h"
60#include "include/color.h"
61#include "include/buffer.h"
62
63#include "common/Timer.h"
64#include "common/debug.h"
65#include "common/errno.h"
66#include "common/run_cmd.h"
67#include "common/safe_io.h"
68#include "common/perf_counters.h"
69#include "common/sync_filesystem.h"
70#include "common/fd.h"
71#include "HashIndex.h"
72#include "DBObjectMap.h"
73#include "kv/KeyValueDB.h"
74
75#include "common/ceph_crypto.h"
7c673cae 76
11fdf7f2 77#include "include/ceph_assert.h"
7c673cae
FG
78
79#include "common/config.h"
80#include "common/blkdev.h"
81
82#ifdef WITH_LTTNG
83#define TRACEPOINT_DEFINE
84#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
85#include "tracing/objectstore.h"
86#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
87#undef TRACEPOINT_DEFINE
88#else
89#define tracepoint(...)
90#endif
91
92#define dout_context cct
93#define dout_subsys ceph_subsys_filestore
94#undef dout_prefix
95#define dout_prefix *_dout << "filestore(" << basedir << ") "
96
97#define COMMIT_SNAP_ITEM "snap_%llu"
98#define CLUSTER_SNAP_ITEM "clustersnap_%s"
99
100#define REPLAY_GUARD_XATTR "user.cephos.seq"
101#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
102
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// have spilled over into DBObjectMap. If it is present on the file and its
// value is XATTR_NO_SPILL_OUT ("0"), no xattrs are stored in DBObjectMap.
106#define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
107#define XATTR_NO_SPILL_OUT "0"
108#define XATTR_SPILL_OUT "1"
31f18b77 109#define __FUNC__ __func__ << "(" << __LINE__ << ")"
7c673cae 110
f67539c2
TL
111using std::cerr;
112using std::list;
113using std::make_pair;
114using std::map;
115using std::ostream;
116using std::ostringstream;
117using std::set;
118using std::string;
119using std::stringstream;
120using std::vector;
121
122using ceph::crypto::SHA1;
123using ceph::BackTrace;
124using ceph::bufferlist;
125using ceph::bufferptr;
126using ceph::decode;
127using ceph::encode;
128using ceph::Formatter;
129using ceph::JSONFormatter;
130
7c673cae
FG
131//Initial features in new superblock.
132static CompatSet get_fs_initial_compat_set() {
133 CompatSet::FeatureSet ceph_osd_feature_compat;
134 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
135 CompatSet::FeatureSet ceph_osd_feature_incompat;
136 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
137 ceph_osd_feature_incompat);
138}
139
140//Features are added here that this FileStore supports.
141static CompatSet get_fs_supported_compat_set() {
142 CompatSet compat = get_fs_initial_compat_set();
143 //Any features here can be set in code, but not in initial superblock
144 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
145 return compat;
146}
147
148int FileStore::validate_hobject_key(const hobject_t &obj) const
149{
150 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
151 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
152}
153
154int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
155 uuid_d *fsid)
156{
157 // make sure we don't try to use aio or direct_io (and get annoying
158 // error messages from failing to do so); performance implications
159 // should be irrelevant for this use
160 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
161 return j.peek_fsid(*fsid);
162}
163
164void FileStore::FSPerfTracker::update_from_perfcounters(
165 PerfCounters &logger)
166{
11fdf7f2
TL
167 os_commit_latency_ns.consume_next(
168 logger.get_tavg_ns(
7c673cae 169 l_filestore_journal_latency));
11fdf7f2
TL
170 os_apply_latency_ns.consume_next(
171 logger.get_tavg_ns(
7c673cae
FG
172 l_filestore_apply_latency));
173}
174
175
176ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
177{
11fdf7f2 178 return out << "osr(" << s.cid << ")";
7c673cae
FG
179}
180
181int FileStore::get_cdir(const coll_t& cid, char *s, int len)
182{
183 const string &cid_str(cid.to_str());
184 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
185}
186
11fdf7f2
TL
187void FileStore::handle_eio()
188{
189 // don't try to map this back to an offset; too hard since there is
190 // a file system in between. we also don't really know whether this
191 // was a read or a write, since we have so many layers beneath us.
192 // don't even try.
193 note_io_error_event(devname.c_str(), basedir.c_str(), -EIO, 0, 0, 0);
194 ceph_abort_msg("unexpected eio error");
195}
196
7c673cae
FG
197int FileStore::get_index(const coll_t& cid, Index *index)
198{
199 int r = index_manager.get_index(cid, basedir, index);
11fdf7f2 200 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
201 return r;
202}
203
204int FileStore::init_index(const coll_t& cid)
205{
206 char path[PATH_MAX];
207 get_cdir(cid, path, sizeof(path));
208 int r = index_manager.init_index(cid, path, target_version);
11fdf7f2 209 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
210 return r;
211}
212
213int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
214{
215 IndexedPath path2;
216 if (!path)
217 path = &path2;
218 int r, exist;
11fdf7f2 219 ceph_assert(index.index);
7c673cae
FG
220 r = (index.index)->lookup(oid, path, &exist);
221 if (r < 0) {
11fdf7f2 222 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
223 return r;
224 }
225 if (!exist)
226 return -ENOENT;
227 return 0;
228}
229
230int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
231{
232 FDRef fd;
233 int r = lfn_open(cid, oid, false, &fd);
234 if (r < 0)
235 return r;
236 r = ::ftruncate(**fd, length);
237 if (r < 0)
238 r = -errno;
239 if (r >= 0 && m_filestore_sloppy_crc) {
240 int rc = backend->_crc_update_truncate(**fd, length);
11fdf7f2 241 ceph_assert(rc >= 0);
7c673cae
FG
242 }
243 lfn_close(fd);
11fdf7f2 244 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
245 return r;
246}
247
248int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
249{
250 IndexedPath path;
251 Index index;
252 int r = get_index(cid, &index);
253 if (r < 0)
254 return r;
255
11fdf7f2 256 ceph_assert(index.index);
9f95a23c 257 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
258
259 r = lfn_find(oid, index, &path);
260 if (r < 0)
261 return r;
262 r = ::stat(path->path(), buf);
263 if (r < 0)
264 r = -errno;
265 return r;
266}
267
268int FileStore::lfn_open(const coll_t& cid,
269 const ghobject_t& oid,
270 bool create,
271 FDRef *outfd,
272 Index *index)
273{
11fdf7f2 274 ceph_assert(outfd);
7c673cae
FG
275 int r = 0;
276 bool need_lock = true;
277 int flags = O_RDWR;
278
279 if (create)
280 flags |= O_CREAT;
281 if (cct->_conf->filestore_odsync_write) {
282 flags |= O_DSYNC;
283 }
284
285 Index index2;
286 if (!index) {
287 index = &index2;
288 }
289 if (!((*index).index)) {
290 r = get_index(cid, index);
291 if (r < 0) {
31f18b77 292 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
293 return r;
294 }
295 } else {
296 need_lock = false;
297 }
298
299 int fd, exist;
11fdf7f2 300 ceph_assert((*index).index);
7c673cae 301 if (need_lock) {
9f95a23c 302 ((*index).index)->access_lock.lock();
7c673cae
FG
303 }
304 if (!replaying) {
305 *outfd = fdcache.lookup(oid);
306 if (*outfd) {
307 if (need_lock) {
9f95a23c 308 ((*index).index)->access_lock.unlock();
7c673cae
FG
309 }
310 return 0;
311 }
312 }
313
314
315 IndexedPath path2;
316 IndexedPath *path = &path2;
317
318 r = (*index)->lookup(oid, path, &exist);
319 if (r < 0) {
320 derr << "could not find " << oid << " in index: "
321 << cpp_strerror(-r) << dendl;
322 goto fail;
323 }
324
91327a77 325 r = ::open((*path)->path(), flags|O_CLOEXEC, 0644);
7c673cae
FG
326 if (r < 0) {
327 r = -errno;
328 dout(10) << "error opening file " << (*path)->path() << " with flags="
329 << flags << ": " << cpp_strerror(-r) << dendl;
330 goto fail;
331 }
332 fd = r;
333 if (create && (!exist)) {
334 r = (*index)->created(oid, (*path)->path());
335 if (r < 0) {
336 VOID_TEMP_FAILURE_RETRY(::close(fd));
337 derr << "error creating " << oid << " (" << (*path)->path()
338 << ") in index: " << cpp_strerror(-r) << dendl;
339 goto fail;
340 }
341 r = chain_fsetxattr<true, true>(
342 fd, XATTR_SPILL_OUT_NAME,
343 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
344 if (r < 0) {
345 VOID_TEMP_FAILURE_RETRY(::close(fd));
346 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
347 << "):" << cpp_strerror(-r) << dendl;
348 goto fail;
349 }
350 }
351
352 if (!replaying) {
353 bool existed;
354 *outfd = fdcache.add(oid, fd, &existed);
355 if (existed) {
356 TEMP_FAILURE_RETRY(::close(fd));
357 }
358 } else {
359 *outfd = std::make_shared<FDCache::FD>(fd);
360 }
361
362 if (need_lock) {
9f95a23c 363 ((*index).index)->access_lock.unlock();
7c673cae
FG
364 }
365
366 return 0;
367
368 fail:
369
370 if (need_lock) {
9f95a23c 371 ((*index).index)->access_lock.unlock();
7c673cae
FG
372 }
373
11fdf7f2 374 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
375 return r;
376}
377
378void FileStore::lfn_close(FDRef fd)
379{
380}
381
382int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
383{
384 Index index_new, index_old;
385 IndexedPath path_new, path_old;
386 int exist;
387 int r;
388 bool index_same = false;
389 if (c < newcid) {
390 r = get_index(newcid, &index_new);
391 if (r < 0)
392 return r;
393 r = get_index(c, &index_old);
394 if (r < 0)
395 return r;
396 } else if (c == newcid) {
397 r = get_index(c, &index_old);
398 if (r < 0)
399 return r;
400 index_new = index_old;
401 index_same = true;
402 } else {
403 r = get_index(c, &index_old);
404 if (r < 0)
405 return r;
406 r = get_index(newcid, &index_new);
407 if (r < 0)
408 return r;
409 }
410
11fdf7f2
TL
411 ceph_assert(index_old.index);
412 ceph_assert(index_new.index);
7c673cae
FG
413
414 if (!index_same) {
415
9f95a23c 416 std::shared_lock l1{(index_old.index)->access_lock};
7c673cae
FG
417
418 r = index_old->lookup(o, &path_old, &exist);
419 if (r < 0) {
11fdf7f2 420 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
421 return r;
422 }
423 if (!exist)
424 return -ENOENT;
425
9f95a23c 426 std::unique_lock l2{(index_new.index)->access_lock};
7c673cae
FG
427
428 r = index_new->lookup(newoid, &path_new, &exist);
429 if (r < 0) {
11fdf7f2 430 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
431 return r;
432 }
433 if (exist)
434 return -EEXIST;
435
31f18b77
FG
436 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
437 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
438 r = ::link(path_old->path(), path_new->path());
439 if (r < 0)
440 return -errno;
441
442 r = index_new->created(newoid, path_new->path());
443 if (r < 0) {
11fdf7f2 444 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
445 return r;
446 }
447 } else {
9f95a23c 448 std::unique_lock l1{(index_old.index)->access_lock};
7c673cae
FG
449
450 r = index_old->lookup(o, &path_old, &exist);
451 if (r < 0) {
11fdf7f2 452 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
453 return r;
454 }
455 if (!exist)
456 return -ENOENT;
457
458 r = index_new->lookup(newoid, &path_new, &exist);
459 if (r < 0) {
11fdf7f2 460 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
461 return r;
462 }
463 if (exist)
464 return -EEXIST;
465
31f18b77
FG
466 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
467 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
468 r = ::link(path_old->path(), path_new->path());
469 if (r < 0)
470 return -errno;
471
472 // make sure old fd for unlinked/overwritten file is gone
473 fdcache.clear(newoid);
474
475 r = index_new->created(newoid, path_new->path());
476 if (r < 0) {
11fdf7f2 477 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
478 return r;
479 }
480 }
481 return 0;
482}
483
484int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
485 const SequencerPosition &spos,
486 bool force_clear_omap)
487{
488 Index index;
489 int r = get_index(cid, &index);
490 if (r < 0) {
31f18b77 491 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
7c673cae
FG
492 return r;
493 }
494
11fdf7f2 495 ceph_assert(index.index);
9f95a23c 496 std::unique_lock l{(index.index)->access_lock};
7c673cae
FG
497
498 {
499 IndexedPath path;
500 int hardlink;
501 r = index->lookup(o, &path, &hardlink);
502 if (r < 0) {
11fdf7f2 503 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
504 return r;
505 }
506
507 if (!force_clear_omap) {
508 if (hardlink == 0 || hardlink == 1) {
509 force_clear_omap = true;
510 }
511 }
512 if (force_clear_omap) {
31f18b77 513 dout(20) << __FUNC__ << ": clearing omap on " << o
7c673cae
FG
514 << " in cid " << cid << dendl;
515 r = object_map->clear(o, &spos);
516 if (r < 0 && r != -ENOENT) {
31f18b77 517 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
11fdf7f2 518 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
519 return r;
520 }
521 if (cct->_conf->filestore_debug_inject_read_err) {
522 debug_obj_on_delete(o);
523 }
524 if (!m_disable_wbthrottle) {
525 wbthrottle.clear_object(o); // should be only non-cache ref
526 }
527 fdcache.clear(o);
528 } else {
529 /* Ensure that replay of this op doesn't result in the object_map
530 * going away.
531 */
532 if (!backend->can_checkpoint())
533 object_map->sync(&o, &spos);
534 }
535 if (hardlink == 0) {
536 if (!m_disable_wbthrottle) {
537 wbthrottle.clear_object(o); // should be only non-cache ref
538 }
539 return 0;
540 }
541 }
542 r = index->unlink(o);
543 if (r < 0) {
31f18b77 544 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
7c673cae
FG
545 return r;
546 }
547 return 0;
548}
549
550FileStore::FileStore(CephContext* cct, const std::string &base,
551 const std::string &jdev, osflagbits_t flags,
552 const char *name, bool do_update) :
553 JournalingObjectStore(cct, base),
554 internal_name(name),
555 basedir(base), journalpath(jdev),
556 generic_flags(flags),
557 blk_size(0),
558 fsid_fd(-1), op_fd(-1),
559 basedir_fd(-1), current_fd(-1),
11fdf7f2 560 backend(nullptr),
7c673cae 561 index_manager(cct, do_update),
7c673cae 562 force_sync(false),
7c673cae
FG
563 timer(cct, sync_entry_timeo_lock),
564 stop(false), sync_thread(this),
565 fdcache(cct),
566 wbthrottle(cct),
567 next_osr_id(0),
568 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
569 !cct->_conf->filestore_wbthrottle_enable),
570 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
571 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
572 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
573 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
574 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
f67539c2
TL
575 op_wq(this,
576 ceph::make_timespan(cct->_conf->filestore_op_thread_timeout),
577 ceph::make_timespan(cct->_conf->filestore_op_thread_suicide_timeout),
578 &op_tp),
11fdf7f2 579 logger(nullptr),
7c673cae 580 trace_endpoint("0.0.0.0", 0, "FileStore"),
7c673cae
FG
581 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
582 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
583 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
584 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
585 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
586 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
587 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
588 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
589 m_filestore_fadvise(cct->_conf->filestore_fadvise),
590 do_update(do_update),
591 m_journal_dio(cct->_conf->journal_dio),
592 m_journal_aio(cct->_conf->journal_aio),
593 m_journal_force_aio(cct->_conf->journal_force_aio),
594 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
595 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
596 m_filestore_do_dump(false),
597 m_filestore_dump_fmt(true),
598 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
599 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
600 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
601 m_fs_type(0),
602 m_filestore_max_inline_xattr_size(0),
603 m_filestore_max_inline_xattrs(0),
604 m_filestore_max_xattr_value_size(0)
605{
31f18b77 606 m_filestore_kill_at = cct->_conf->filestore_kill_at;
7c673cae
FG
607 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
608 ostringstream oss;
609 oss << "filestore-ondisk-" << i;
610 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
611 ondisk_finishers.push_back(f);
612 }
613 for (int i = 0; i < m_apply_finisher_num; ++i) {
614 ostringstream oss;
615 oss << "filestore-apply-" << i;
616 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
617 apply_finishers.push_back(f);
618 }
619
620 ostringstream oss;
621 oss << basedir << "/current";
622 current_fn = oss.str();
623
624 ostringstream sss;
625 sss << basedir << "/current/commit_op_seq";
626 current_op_seq_fn = sss.str();
627
628 ostringstream omss;
629 if (cct->_conf->filestore_omap_backend_path != "") {
630 omap_dir = cct->_conf->filestore_omap_backend_path;
631 } else {
632 omss << basedir << "/current/omap";
633 omap_dir = omss.str();
634 }
635
636 // initialize logger
637 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
638
639 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
640 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
641 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
642 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
28e407b8
AA
643 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency",
644 NULL, PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
645 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
646 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
647 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
648 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
649 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
650 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
651 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
652 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
653 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
654 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
655
656 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
657 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
658 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
659 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
28e407b8
AA
660 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg",
661 "Store operation queue latency", NULL, PerfCountersBuilder::PRIO_USEFUL);
224ce89b 662 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
7c673cae
FG
663
664 logger = plb.create_perf_counters();
665
666 cct->get_perfcounters_collection()->add(logger);
11fdf7f2 667 cct->_conf.add_observer(this);
7c673cae
FG
668
669 superblock.compat_features = get_fs_initial_compat_set();
670}
671
672FileStore::~FileStore()
673{
f67539c2 674 for (auto it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
7c673cae 675 delete *it;
11fdf7f2 676 *it = nullptr;
7c673cae 677 }
f67539c2 678 for (auto it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
7c673cae 679 delete *it;
11fdf7f2 680 *it = nullptr;
7c673cae 681 }
11fdf7f2 682 cct->_conf.remove_observer(this);
7c673cae
FG
683 cct->get_perfcounters_collection()->remove(logger);
684
685 if (journal)
11fdf7f2 686 journal->logger = nullptr;
7c673cae 687 delete logger;
11fdf7f2 688 logger = nullptr;
7c673cae
FG
689
690 if (m_filestore_do_dump) {
691 dump_stop();
692 }
693}
694
// Write the on-disk xattr name for `name` ("user.ceph.<name>") into `buf`,
// which holds `len` bytes; snprintf() truncates if it does not fit.
static void get_attrname(const char *name, char *buf, int len)
{
  snprintf(buf, len, "user.ceph.%s", name);
}
699
// If *name starts with the "user.ceph." prefix, advance *name past the
// prefix and return true; otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  if (strncmp(*name, "user.ceph.", 10) != 0)
    return false;

  *name += 10;
  return true;
}
708
709void FileStore::collect_metadata(map<string,string> *pm)
710{
711 char partition_path[PATH_MAX];
712 char dev_node[PATH_MAX];
7c673cae
FG
713
714 (*pm)["filestore_backend"] = backend->get_name();
715 ostringstream ss;
716 ss << "0x" << std::hex << m_fs_type << std::dec;
717 (*pm)["filestore_f_type"] = ss.str();
718
719 if (cct->_conf->filestore_collect_device_partition_information) {
11fdf7f2
TL
720 int rc = 0;
721 BlkDev blkdev(fsid_fd);
722 if (rc = blkdev.partition(partition_path, PATH_MAX); rc) {
7c673cae 723 (*pm)["backend_filestore_partition_path"] = "unknown";
11fdf7f2 724 } else {
7c673cae 725 (*pm)["backend_filestore_partition_path"] = string(partition_path);
11fdf7f2
TL
726 }
727 if (rc = blkdev.wholedisk(dev_node, PATH_MAX); rc) {
7c673cae 728 (*pm)["backend_filestore_dev_node"] = "unknown";
11fdf7f2 729 } else {
7c673cae 730 (*pm)["backend_filestore_dev_node"] = string(dev_node);
11fdf7f2
TL
731 devname = dev_node;
732 }
733 if (rc == 0 && vdo_fd >= 0) {
734 (*pm)["vdo"] = "true";
735 (*pm)["vdo_physical_size"] =
736 stringify(4096 * get_vdo_stat(vdo_fd, "physical_blocks"));
737 }
738 if (journal) {
739 journal->collect_metadata(pm);
740 }
7c673cae
FG
741 }
742}
743
11fdf7f2
TL
744int FileStore::get_devices(set<string> *ls)
745{
746 string dev_node;
747 BlkDev blkdev(fsid_fd);
748 if (int rc = blkdev.wholedisk(&dev_node); rc) {
749 return rc;
750 }
751 get_raw_devices(dev_node, ls);
752 if (journal) {
753 journal->get_devices(ls);
754 }
755 return 0;
756}
757
758int FileStore::statfs(struct store_statfs_t *buf0, osd_alert_list_t* alerts)
7c673cae
FG
759{
760 struct statfs buf;
761 buf0->reset();
11fdf7f2
TL
762 if (alerts) {
763 alerts->clear(); // returns nothing for now
764 }
7c673cae
FG
765 if (::statfs(basedir.c_str(), &buf) < 0) {
766 int r = -errno;
11fdf7f2
TL
767 if (r == -EIO && m_filestore_fail_eio) handle_eio();
768 ceph_assert(r != -ENOENT);
7c673cae
FG
769 return r;
770 }
11fdf7f2
TL
771
772 uint64_t bfree = buf.f_bavail * buf.f_bsize;
773
774 // assume all of leveldb/rocksdb is omap.
775 {
776 map<string,uint64_t> kv_usage;
777 buf0->omap_allocated += object_map->get_db()->get_estimated_size(kv_usage);
778 }
779
780 uint64_t thin_total, thin_avail;
781 if (get_vdo_utilization(vdo_fd, &thin_total, &thin_avail)) {
782 buf0->total = thin_total;
783 bfree = std::min(bfree, thin_avail);
784 buf0->allocated = thin_total - thin_avail;
785 buf0->data_stored = bfree;
786 } else {
787 buf0->total = buf.f_blocks * buf.f_bsize;
788 buf0->allocated = bfree;
789 buf0->data_stored = bfree;
790 }
791 buf0->available = bfree;
792
793 // FIXME: we don't know how to populate buf->internal_metadata; XFS doesn't
794 // tell us what its internal overhead is.
795
7c673cae
FG
796 // Adjust for writes pending in the journal
797 if (journal) {
798 uint64_t estimate = journal->get_journal_size_estimate();
11fdf7f2 799 buf0->internally_reserved = estimate;
7c673cae
FG
800 if (buf0->available > estimate)
801 buf0->available -= estimate;
802 else
803 buf0->available = 0;
804 }
11fdf7f2 805
7c673cae
FG
806 return 0;
807}
808
9f95a23c
TL
809int FileStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
810 bool *per_pool_omap)
11fdf7f2
TL
811{
812 return -ENOTSUP;
813}
7c673cae
FG
814
815void FileStore::new_journal()
816{
817 if (journalpath.length()) {
818 dout(10) << "open_journal at " << journalpath << dendl;
819 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
820 journalpath.c_str(),
821 m_journal_dio, m_journal_aio,
822 m_journal_force_aio);
823 if (journal)
824 journal->logger = logger;
825 }
826 return;
827}
828
829int FileStore::dump_journal(ostream& out)
830{
831 int r;
832
833 if (!journalpath.length())
834 return -EINVAL;
835
836 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
837 r = journal->dump(out);
838 delete journal;
11fdf7f2 839 journal = nullptr;
7c673cae
FG
840 return r;
841}
842
11fdf7f2 843FileStoreBackend *FileStoreBackend::create(unsigned long f_type, FileStore *fs)
7c673cae
FG
844{
845 switch (f_type) {
846#if defined(__linux__)
847 case BTRFS_SUPER_MAGIC:
848 return new BtrfsFileStoreBackend(fs);
849# ifdef HAVE_LIBXFS
850 case XFS_SUPER_MAGIC:
851 return new XfsFileStoreBackend(fs);
852# endif
853#endif
854#ifdef HAVE_LIBZFS
855 case ZFS_SUPER_MAGIC:
856 return new ZFSFileStoreBackend(fs);
857#endif
858 default:
859 return new GenericFileStoreBackend(fs);
860 }
861}
862
11fdf7f2 863void FileStore::create_backend(unsigned long f_type)
7c673cae
FG
864{
865 m_fs_type = f_type;
866
11fdf7f2 867 ceph_assert(!backend);
7c673cae
FG
868 backend = FileStoreBackend::create(f_type, this);
869
870 dout(0) << "backend " << backend->get_name()
871 << " (magic 0x" << std::hex << f_type << std::dec << ")"
872 << dendl;
873
874 switch (f_type) {
875#if defined(__linux__)
876 case BTRFS_SUPER_MAGIC:
877 if (!m_disable_wbthrottle){
878 wbthrottle.set_fs(WBThrottle::BTRFS);
879 }
880 break;
881
882 case XFS_SUPER_MAGIC:
883 // wbthrottle is constructed with fs(WBThrottle::XFS)
884 break;
885#endif
886 }
887
888 set_xattr_limits_via_conf();
889}
890
891int FileStore::mkfs()
892{
893 int ret = 0;
894 char fsid_fn[PATH_MAX];
895 char fsid_str[40];
896 uuid_d old_fsid;
897 uuid_d old_omap_fsid;
898
899 dout(1) << "mkfs in " << basedir << dendl;
91327a77 900 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
901 if (basedir_fd < 0) {
902 ret = -errno;
224ce89b 903 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
904 return ret;
905 }
906
907 // open+lock fsid
908 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
91327a77 909 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
7c673cae
FG
910 if (fsid_fd < 0) {
911 ret = -errno;
224ce89b 912 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
913 goto close_basedir_fd;
914 }
915
916 if (lock_fsid() < 0) {
917 ret = -EBUSY;
918 goto close_fsid_fd;
919 }
920
921 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
922 if (fsid.is_zero()) {
923 fsid.generate_random();
224ce89b 924 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
7c673cae 925 } else {
224ce89b 926 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
7c673cae
FG
927 }
928
929 fsid.print(fsid_str);
930 strcat(fsid_str, "\n");
931 ret = ::ftruncate(fsid_fd, 0);
932 if (ret < 0) {
933 ret = -errno;
31f18b77 934 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
935 << cpp_strerror(ret) << dendl;
936 goto close_fsid_fd;
937 }
938 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
939 if (ret < 0) {
31f18b77 940 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
941 << cpp_strerror(ret) << dendl;
942 goto close_fsid_fd;
943 }
944 if (::fsync(fsid_fd) < 0) {
945 ret = -errno;
31f18b77 946 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
947 << cpp_strerror(ret) << dendl;
948 goto close_fsid_fd;
949 }
224ce89b 950 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
7c673cae
FG
951 } else {
952 if (!fsid.is_zero() && fsid != old_fsid) {
31f18b77 953 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
7c673cae
FG
954 ret = -EINVAL;
955 goto close_fsid_fd;
956 }
957 fsid = old_fsid;
31f18b77 958 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
7c673cae
FG
959 }
960
961 // version stamp
962 ret = write_version_stamp();
963 if (ret < 0) {
31f18b77 964 derr << __FUNC__ << ": write_version_stamp() failed: "
7c673cae
FG
965 << cpp_strerror(ret) << dendl;
966 goto close_fsid_fd;
967 }
968
969 // superblock
970 superblock.omap_backend = cct->_conf->filestore_omap_backend;
971 ret = write_superblock();
972 if (ret < 0) {
31f18b77 973 derr << __FUNC__ << ": write_superblock() failed: "
7c673cae
FG
974 << cpp_strerror(ret) << dendl;
975 goto close_fsid_fd;
976 }
977
978 struct statfs basefs;
979 ret = ::fstatfs(basedir_fd, &basefs);
980 if (ret < 0) {
981 ret = -errno;
31f18b77 982 derr << __FUNC__ << ": cannot fstatfs basedir "
7c673cae
FG
983 << cpp_strerror(ret) << dendl;
984 goto close_fsid_fd;
985 }
986
224ce89b
WB
987#if defined(__linux__)
988 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
989 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
990 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
991 goto close_fsid_fd;
992 }
993#endif
994
7c673cae
FG
995 create_backend(basefs.f_type);
996
997 ret = backend->create_current();
998 if (ret < 0) {
31f18b77 999 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
7c673cae
FG
1000 goto close_fsid_fd;
1001 }
1002
1003 // write initial op_seq
1004 {
1005 uint64_t initial_seq = 0;
1006 int fd = read_op_seq(&initial_seq);
1007 if (fd < 0) {
1008 ret = fd;
31f18b77 1009 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
7c673cae
FG
1010 << cpp_strerror(ret) << dendl;
1011 goto close_fsid_fd;
1012 }
1013 if (initial_seq == 0) {
1014 ret = write_op_seq(fd, 1);
1015 if (ret < 0) {
1016 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 1017 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
7c673cae
FG
1018 << cpp_strerror(ret) << dendl;
1019 goto close_fsid_fd;
1020 }
1021
1022 if (backend->can_checkpoint()) {
1023 // create snap_1 too
91327a77 1024 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
11fdf7f2 1025 ceph_assert(current_fd >= 0);
7c673cae
FG
1026 char s[NAME_MAX];
1027 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
11fdf7f2 1028 ret = backend->create_checkpoint(s, nullptr);
7c673cae
FG
1029 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1030 if (ret < 0 && ret != -EEXIST) {
1031 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 1032 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1033 goto close_fsid_fd;
1034 }
1035 }
1036 }
1037 VOID_TEMP_FAILURE_RETRY(::close(fd));
1038 }
1039 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
1040 if (ret < 0) {
31f18b77 1041 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
7c673cae
FG
1042 goto close_fsid_fd;
1043 }
1044 // create fsid under omap
1045 // open+lock fsid
1046 int omap_fsid_fd;
1047 char omap_fsid_fn[PATH_MAX];
1048 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
91327a77 1049 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
7c673cae
FG
1050 if (omap_fsid_fd < 0) {
1051 ret = -errno;
31f18b77 1052 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1053 goto close_fsid_fd;
1054 }
1055
1056 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
11fdf7f2 1057 ceph_assert(!fsid.is_zero());
7c673cae
FG
1058 fsid.print(fsid_str);
1059 strcat(fsid_str, "\n");
1060 ret = ::ftruncate(omap_fsid_fd, 0);
1061 if (ret < 0) {
1062 ret = -errno;
31f18b77 1063 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
1064 << cpp_strerror(ret) << dendl;
1065 goto close_omap_fsid_fd;
1066 }
1067 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
1068 if (ret < 0) {
31f18b77 1069 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
1070 << cpp_strerror(ret) << dendl;
1071 goto close_omap_fsid_fd;
1072 }
31f18b77 1073 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
7c673cae
FG
1074 if (::fsync(omap_fsid_fd) < 0) {
1075 ret = -errno;
31f18b77 1076 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
1077 << cpp_strerror(ret) << dendl;
1078 goto close_omap_fsid_fd;
1079 }
1080 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1081 } else {
1082 if (fsid != old_omap_fsid) {
31f18b77 1083 derr << __FUNC__ << ": " << omap_fsid_fn
7c673cae
FG
1084 << " has existed omap fsid " << old_omap_fsid
1085 << " != expected osd fsid " << fsid
1086 << dendl;
1087 ret = -EINVAL;
1088 goto close_omap_fsid_fd;
1089 }
31f18b77 1090 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
7c673cae
FG
1091 }
1092
1093 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1094
1095 // journal?
1096 ret = mkjournal();
1097 if (ret)
1098 goto close_omap_fsid_fd;
1099
1100 ret = write_meta("type", "filestore");
1101 if (ret)
1102 goto close_omap_fsid_fd;
1103
1104 dout(1) << "mkfs done in " << basedir << dendl;
1105 ret = 0;
1106
1107 close_omap_fsid_fd:
1108 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1109 close_fsid_fd:
1110 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1111 fsid_fd = -1;
1112 close_basedir_fd:
1113 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1114 delete backend;
11fdf7f2 1115 backend = nullptr;
7c673cae
FG
1116 return ret;
1117}
1118
1119int FileStore::mkjournal()
1120{
1121 // read fsid
1122 int ret;
1123 char fn[PATH_MAX];
1124 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
91327a77 1125 int fd = ::open(fn, O_RDONLY|O_CLOEXEC, 0644);
7c673cae
FG
1126 if (fd < 0) {
1127 int err = errno;
31f18b77 1128 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
7c673cae
FG
1129 return -err;
1130 }
1131 ret = read_fsid(fd, &fsid);
1132 if (ret < 0) {
31f18b77 1133 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1134 VOID_TEMP_FAILURE_RETRY(::close(fd));
1135 return ret;
1136 }
1137 VOID_TEMP_FAILURE_RETRY(::close(fd));
1138
1139 ret = 0;
1140
1141 new_journal();
1142 if (journal) {
1143 ret = journal->check();
1144 if (ret < 0) {
1145 ret = journal->create();
1146 if (ret)
31f18b77 1147 derr << __FUNC__ << ": error creating journal on " << journalpath
7c673cae
FG
1148 << ": " << cpp_strerror(ret) << dendl;
1149 else
31f18b77 1150 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
7c673cae
FG
1151 }
1152 delete journal;
11fdf7f2 1153 journal = nullptr;
7c673cae
FG
1154 }
1155 return ret;
1156}
1157
1158int FileStore::read_fsid(int fd, uuid_d *uuid)
1159{
1160 char fsid_str[40];
1161 memset(fsid_str, 0, sizeof(fsid_str));
1162 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1163 if (ret < 0)
1164 return ret;
1165 if (ret == 8) {
1166 // old 64-bit fsid... mirror it.
1167 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1168 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1169 return 0;
1170 }
1171
1172 if (ret > 36)
1173 fsid_str[36] = 0;
1174 else
1175 fsid_str[ret] = 0;
1176 if (!uuid->parse(fsid_str))
1177 return -EINVAL;
1178 return 0;
1179}
1180
1181int FileStore::lock_fsid()
1182{
1183 struct flock l;
1184 memset(&l, 0, sizeof(l));
1185 l.l_type = F_WRLCK;
1186 l.l_whence = SEEK_SET;
1187 l.l_start = 0;
1188 l.l_len = 0;
1189 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1190 if (r < 0) {
1191 int err = errno;
31f18b77 1192 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
7c673cae
FG
1193 << cpp_strerror(err) << dendl;
1194 return -err;
1195 }
1196 return 0;
1197}
1198
1199bool FileStore::test_mount_in_use()
1200{
31f18b77 1201 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
7c673cae
FG
1202 char fn[PATH_MAX];
1203 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1204
1205 // verify fs isn't in use
1206
91327a77 1207 fsid_fd = ::open(fn, O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1208 if (fsid_fd < 0)
1209 return 0; // no fsid, ok.
1210 bool inuse = lock_fsid() < 0;
1211 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1212 fsid_fd = -1;
1213 return inuse;
1214}
1215
31f18b77
FG
1216bool FileStore::is_rotational()
1217{
1218 bool rotational;
1219 if (backend) {
1220 rotational = backend->is_rotational();
1221 } else {
91327a77 1222 int fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
31f18b77
FG
1223 if (fd < 0)
1224 return true;
1225 struct statfs st;
1226 int r = ::fstatfs(fd, &st);
1227 ::close(fd);
1228 if (r < 0) {
1229 return true;
1230 }
1231 create_backend(st.f_type);
1232 rotational = backend->is_rotational();
1233 delete backend;
11fdf7f2 1234 backend = nullptr;
31f18b77
FG
1235 }
1236 dout(10) << __func__ << " " << (int)rotational << dendl;
1237 return rotational;
1238}
1239
d2e6a577
FG
1240bool FileStore::is_journal_rotational()
1241{
1242 bool journal_rotational;
1243 if (backend) {
1244 journal_rotational = backend->is_journal_rotational();
1245 } else {
91327a77 1246 int fd = ::open(journalpath.c_str(), O_RDONLY|O_CLOEXEC);
d2e6a577
FG
1247 if (fd < 0)
1248 return true;
1249 struct statfs st;
1250 int r = ::fstatfs(fd, &st);
1251 ::close(fd);
1252 if (r < 0) {
1253 return true;
1254 }
1255 create_backend(st.f_type);
1256 journal_rotational = backend->is_journal_rotational();
1257 delete backend;
11fdf7f2 1258 backend = nullptr;
d2e6a577
FG
1259 }
1260 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1261 return journal_rotational;
1262}
1263
7c673cae
FG
1264int FileStore::_detect_fs()
1265{
1266 struct statfs st;
1267 int r = ::fstatfs(basedir_fd, &st);
1268 if (r < 0)
1269 return -errno;
1270
1271 blk_size = st.f_bsize;
1272
224ce89b
WB
1273#if defined(__linux__)
1274 if (st.f_type == BTRFS_SUPER_MAGIC &&
1275 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1276 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1277 return -EPERM;
1278 }
1279#endif
1280
7c673cae
FG
1281 create_backend(st.f_type);
1282
1283 r = backend->detect_features();
1284 if (r < 0) {
31f18b77 1285 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
7c673cae
FG
1286 return r;
1287 }
1288
11fdf7f2
TL
1289 // vdo
1290 {
1291 char dev_node[PATH_MAX];
1292 if (int rc = BlkDev{fsid_fd}.wholedisk(dev_node, PATH_MAX); rc == 0) {
1293 vdo_fd = get_vdo_stats_handle(dev_node, &vdo_name);
1294 if (vdo_fd >= 0) {
1295 dout(0) << __func__ << " VDO volume " << vdo_name << " for " << dev_node
1296 << dendl;
1297 }
1298 }
1299 }
1300
7c673cae
FG
1301 // test xattrs
1302 char fn[PATH_MAX];
1303 int x = rand();
1304 int y = x+1;
1305 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
91327a77 1306 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC|O_CLOEXEC, 0700);
7c673cae
FG
1307 if (tmpfd < 0) {
1308 int ret = -errno;
31f18b77 1309 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1310 return ret;
1311 }
1312
1313 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1314 if (ret >= 0)
1315 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1316 if ((ret < 0) || (x != y)) {
1317 derr << "Extended attributes don't appear to work. ";
1318 if (ret)
1319 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1320 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1321 << "file system with the 'user_xattr' option." << dendl;
1322 ::unlink(fn);
1323 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1324 return -ENOTSUP;
1325 }
1326
1327 char buf[1000];
1328 memset(buf, 0, sizeof(buf)); // shut up valgrind
1329 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1330 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1331 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1332 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1333 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1334 if (ret == -ENOSPC) {
1335 dout(0) << "limited size xattrs" << dendl;
1336 }
1337 chain_fremovexattr(tmpfd, "user.test");
1338 chain_fremovexattr(tmpfd, "user.test2");
1339 chain_fremovexattr(tmpfd, "user.test3");
1340 chain_fremovexattr(tmpfd, "user.test4");
1341 chain_fremovexattr(tmpfd, "user.test5");
1342
1343 ::unlink(fn);
1344 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1345
1346 return 0;
1347}
1348
1349int FileStore::_sanity_check_fs()
1350{
1351 // sanity check(s)
1352
1353 if (((int)m_filestore_journal_writeahead +
1354 (int)m_filestore_journal_parallel +
1355 (int)m_filestore_journal_trailing) > 1) {
1356 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1357 cerr << TEXT_RED
1358 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1359 << " is enabled in ceph.conf. You must choose a single journal mode."
1360 << TEXT_NORMAL << std::endl;
1361 return -EINVAL;
1362 }
1363
1364 if (!backend->can_checkpoint()) {
1365 if (!journal || !m_filestore_journal_writeahead) {
1366 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1367 cerr << TEXT_RED
1368 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1369 << " For non-btrfs volumes, a writeahead journal is required to\n"
1370 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1371 << " should include something like:\n"
1372 << " osd journal = /path/to/journal_device_or_file\n"
1373 << " filestore journal writeahead = true\n"
1374 << TEXT_NORMAL;
1375 }
1376 }
1377
1378 if (!journal) {
1379 dout(0) << "mount WARNING: no journal" << dendl;
1380 cerr << TEXT_YELLOW
1381 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1382 << " If you will not be using an osd journal, write latency may be\n"
1383 << " relatively high. It can be reduced somewhat by lowering\n"
1384 << " filestore_max_sync_interval, but lower values mean lower write\n"
1385 << " throughput, especially with spinning disks.\n"
1386 << TEXT_NORMAL;
1387 }
1388
1389 return 0;
1390}
1391
1392int FileStore::write_superblock()
1393{
1394 bufferlist bl;
11fdf7f2 1395 encode(superblock, bl);
7c673cae 1396 return safe_write_file(basedir.c_str(), "superblock",
eafe8130 1397 bl.c_str(), bl.length(), 0600);
7c673cae
FG
1398}
1399
1400int FileStore::read_superblock()
1401{
1402 bufferptr bp(PATH_MAX);
1403 int ret = safe_read_file(basedir.c_str(), "superblock",
1404 bp.c_str(), bp.length());
1405 if (ret < 0) {
1406 if (ret == -ENOENT) {
1407 // If the file doesn't exist write initial CompatSet
1408 return write_superblock();
1409 }
1410 return ret;
1411 }
1412
1413 bufferlist bl;
1414 bl.push_back(std::move(bp));
11fdf7f2
TL
1415 auto i = bl.cbegin();
1416 decode(superblock, i);
7c673cae
FG
1417 return 0;
1418}
1419
1420int FileStore::update_version_stamp()
1421{
1422 return write_version_stamp();
1423}
1424
1425int FileStore::version_stamp_is_valid(uint32_t *version)
1426{
1427 bufferptr bp(PATH_MAX);
1428 int ret = safe_read_file(basedir.c_str(), "store_version",
1429 bp.c_str(), bp.length());
1430 if (ret < 0) {
1431 return ret;
1432 }
1433 bufferlist bl;
1434 bl.push_back(std::move(bp));
11fdf7f2
TL
1435 auto i = bl.cbegin();
1436 decode(*version, i);
31f18b77 1437 dout(10) << __FUNC__ << ": was " << *version << " vs target "
7c673cae
FG
1438 << target_version << dendl;
1439 if (*version == target_version)
1440 return 1;
1441 else
1442 return 0;
1443}
1444
11fdf7f2
TL
1445int FileStore::flush_cache(ostream *os)
1446{
1447 string drop_caches_file = "/proc/sys/vm/drop_caches";
1448 int drop_caches_fd = ::open(drop_caches_file.c_str(), O_WRONLY|O_CLOEXEC), ret = 0;
1449 char buf[2] = "3";
1450 size_t len = strlen(buf);
1451
1452 if (drop_caches_fd < 0) {
1453 ret = -errno;
1454 derr << __FUNC__ << ": failed to open " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1455 if (os) {
1456 *os << "FileStore flush_cache: failed to open " << drop_caches_file << ": " << cpp_strerror(ret);
1457 }
1458 return ret;
1459 }
1460
1461 if (::write(drop_caches_fd, buf, len) < 0) {
1462 ret = -errno;
1463 derr << __FUNC__ << ": failed to write to " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1464 if (os) {
1465 *os << "FileStore flush_cache: failed to write to " << drop_caches_file << ": " << cpp_strerror(ret);
1466 }
1467 goto out;
1468 }
1469
1470out:
1471 ::close(drop_caches_fd);
1472 return ret;
1473}
1474
7c673cae
FG
1475int FileStore::write_version_stamp()
1476{
31f18b77 1477 dout(1) << __FUNC__ << ": " << target_version << dendl;
7c673cae 1478 bufferlist bl;
11fdf7f2 1479 encode(target_version, bl);
7c673cae
FG
1480
1481 return safe_write_file(basedir.c_str(), "store_version",
eafe8130 1482 bl.c_str(), bl.length(), 0600);
7c673cae
FG
1483}
1484
1485int FileStore::upgrade()
1486{
31f18b77 1487 dout(1) << __FUNC__ << dendl;
7c673cae
FG
1488 uint32_t version;
1489 int r = version_stamp_is_valid(&version);
1490
1491 if (r == -ENOENT) {
1492 derr << "The store_version file doesn't exist." << dendl;
1493 return -EINVAL;
1494 }
1495 if (r < 0)
1496 return r;
1497 if (r == 1)
1498 return 0;
1499
1500 if (version < 3) {
1501 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1502 return -EINVAL;
1503 }
1504
1505 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1506 // open up DBObjectMap with the do_upgrade flag, which we already did.
1507 update_version_stamp();
1508 return 0;
1509}
1510
1511int FileStore::read_op_seq(uint64_t *seq)
1512{
91327a77 1513 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1514 if (op_fd < 0) {
1515 int r = -errno;
11fdf7f2 1516 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
1517 return r;
1518 }
1519 char s[40];
1520 memset(s, 0, sizeof(s));
1521 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1522 if (ret < 0) {
31f18b77 1523 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae 1524 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
11fdf7f2 1525 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
1526 return ret;
1527 }
1528 *seq = atoll(s);
1529 return op_fd;
1530}
1531
1532int FileStore::write_op_seq(int fd, uint64_t seq)
1533{
1534 char s[30];
1535 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1536 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1537 if (ret < 0) {
1538 ret = -errno;
11fdf7f2 1539 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
1540 }
1541 return ret;
1542}
1543
1544int FileStore::mount()
1545{
1546 int ret;
1547 char buf[PATH_MAX];
1548 uint64_t initial_op_seq;
1549 uuid_d omap_fsid;
1550 set<string> cluster_snaps;
1551 CompatSet supported_compat_set = get_fs_supported_compat_set();
1552
1553 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1554
1555 ret = set_throttle_params();
1556 if (ret != 0)
1557 goto done;
1558
1559 // make sure global base dir exists
1560 if (::access(basedir.c_str(), R_OK | W_OK)) {
1561 ret = -errno;
31f18b77 1562 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
7c673cae
FG
1563 << cpp_strerror(ret) << dendl;
1564 goto done;
1565 }
1566
1567 // get fsid
1568 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
91327a77 1569 fsid_fd = ::open(buf, O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1570 if (fsid_fd < 0) {
1571 ret = -errno;
31f18b77 1572 derr << __FUNC__ << ": error opening '" << buf << "': "
7c673cae
FG
1573 << cpp_strerror(ret) << dendl;
1574 goto done;
1575 }
1576
1577 ret = read_fsid(fsid_fd, &fsid);
1578 if (ret < 0) {
31f18b77 1579 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
7c673cae
FG
1580 << dendl;
1581 goto close_fsid_fd;
1582 }
1583
1584 if (lock_fsid() < 0) {
31f18b77 1585 derr << __FUNC__ << ": lock_fsid failed" << dendl;
7c673cae
FG
1586 ret = -EBUSY;
1587 goto close_fsid_fd;
1588 }
1589
1590 dout(10) << "mount fsid is " << fsid << dendl;
1591
1592
1593 uint32_t version_stamp;
1594 ret = version_stamp_is_valid(&version_stamp);
1595 if (ret < 0) {
31f18b77 1596 derr << __FUNC__ << ": error in version_stamp_is_valid: "
7c673cae
FG
1597 << cpp_strerror(ret) << dendl;
1598 goto close_fsid_fd;
1599 } else if (ret == 0) {
1600 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
31f18b77 1601 derr << __FUNC__ << ": stale version stamp detected: "
7c673cae
FG
1602 << version_stamp
1603 << ". Proceeding, do_update "
1604 << "is set, performing disk format upgrade."
1605 << dendl;
1606 do_update = true;
1607 } else {
1608 ret = -EINVAL;
31f18b77 1609 derr << __FUNC__ << ": stale version stamp " << version_stamp
7c673cae
FG
1610 << ". Please run the FileStore update script before starting the "
1611 << "OSD, or set filestore_update_to to " << target_version
1612 << " (currently " << cct->_conf->filestore_update_to << ")"
1613 << dendl;
1614 goto close_fsid_fd;
1615 }
1616 }
1617
1618 ret = read_superblock();
1619 if (ret < 0) {
1620 goto close_fsid_fd;
1621 }
1622
1623 // Check if this FileStore supports all the necessary features to mount
1624 if (supported_compat_set.compare(superblock.compat_features) == -1) {
31f18b77 1625 derr << __FUNC__ << ": Incompatible features set "
7c673cae
FG
1626 << superblock.compat_features << dendl;
1627 ret = -EINVAL;
1628 goto close_fsid_fd;
1629 }
1630
1631 // open some dir handles
91327a77 1632 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
1633 if (basedir_fd < 0) {
1634 ret = -errno;
31f18b77 1635 derr << __FUNC__ << ": failed to open " << basedir << ": "
7c673cae
FG
1636 << cpp_strerror(ret) << dendl;
1637 basedir_fd = -1;
1638 goto close_fsid_fd;
1639 }
1640
1641 // test for btrfs, xattrs, etc.
1642 ret = _detect_fs();
1643 if (ret < 0) {
31f18b77 1644 derr << __FUNC__ << ": error in _detect_fs: "
7c673cae
FG
1645 << cpp_strerror(ret) << dendl;
1646 goto close_basedir_fd;
1647 }
1648
1649 {
1650 list<string> ls;
1651 ret = backend->list_checkpoints(ls);
1652 if (ret < 0) {
31f18b77 1653 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
7c673cae
FG
1654 goto close_basedir_fd;
1655 }
1656
1657 long long unsigned c, prev = 0;
1658 char clustersnap[NAME_MAX];
1659 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1660 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
11fdf7f2 1661 ceph_assert(c > prev);
7c673cae
FG
1662 prev = c;
1663 snaps.push_back(c);
1664 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1665 cluster_snaps.insert(*it);
1666 }
1667 }
1668
1669 if (m_osd_rollback_to_cluster_snap.length() &&
1670 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1671 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1672 ret = -ENOENT;
1673 goto close_basedir_fd;
1674 }
1675
1676 char nosnapfn[200];
1677 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1678
1679 if (backend->can_checkpoint()) {
1680 if (snaps.empty()) {
31f18b77 1681 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
7c673cae
FG
1682 } else {
1683 char s[NAME_MAX];
1684 uint64_t curr_seq = 0;
1685
1686 if (m_osd_rollback_to_cluster_snap.length()) {
1687 derr << TEXT_RED
1688 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1689 << TEXT_NORMAL
1690 << dendl;
11fdf7f2 1691 ceph_assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
7c673cae
FG
1692 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1693 } else {
1694 {
1695 int fd = read_op_seq(&curr_seq);
1696 if (fd >= 0) {
1697 VOID_TEMP_FAILURE_RETRY(::close(fd));
1698 }
1699 }
1700 if (curr_seq)
1701 dout(10) << " current/ seq was " << curr_seq << dendl;
1702 else
1703 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1704
1705 uint64_t cp = snaps.back();
1706 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1707
1708 // if current/ is marked as non-snapshotted, refuse to roll
1709 // back (without clear direction) to avoid throwing out new
1710 // data.
1711 struct stat st;
1712 if (::stat(nosnapfn, &st) == 0) {
1713 if (!m_osd_use_stale_snap) {
1714 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1715 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1716 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1717 ret = -ENOTSUP;
1718 goto close_basedir_fd;
1719 }
1720 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1721 << ", newest snap is " << cp << dendl;
1722 cerr << TEXT_YELLOW
1723 << " ** WARNING: forcing the use of stale snapshot data **"
1724 << TEXT_NORMAL << std::endl;
1725 }
1726
31f18b77 1727 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
7c673cae
FG
1728 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1729 }
1730
1731 // drop current?
1732 ret = backend->rollback_to(s);
1733 if (ret) {
31f18b77 1734 derr << __FUNC__ << ": error rolling back to " << s << ": "
7c673cae
FG
1735 << cpp_strerror(ret) << dendl;
1736 goto close_basedir_fd;
1737 }
1738 }
1739 }
1740 initial_op_seq = 0;
1741
91327a77 1742 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
1743 if (current_fd < 0) {
1744 ret = -errno;
31f18b77 1745 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1746 goto close_basedir_fd;
1747 }
1748
11fdf7f2 1749 ceph_assert(current_fd >= 0);
7c673cae
FG
1750
1751 op_fd = read_op_seq(&initial_op_seq);
1752 if (op_fd < 0) {
1753 ret = op_fd;
31f18b77 1754 derr << __FUNC__ << ": read_op_seq failed" << dendl;
7c673cae
FG
1755 goto close_current_fd;
1756 }
1757
1758 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1759 if (initial_op_seq == 0) {
1760 derr << "mount initial op seq is 0; something is wrong" << dendl;
1761 ret = -EINVAL;
1762 goto close_current_fd;
1763 }
1764
1765 if (!backend->can_checkpoint()) {
1766 // mark current/ as non-snapshotted so that we don't rollback away
1767 // from it.
1768 int r = ::creat(nosnapfn, 0644);
1769 if (r < 0) {
1770 ret = -errno;
31f18b77 1771 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
7c673cae
FG
1772 goto close_current_fd;
1773 }
1774 VOID_TEMP_FAILURE_RETRY(::close(r));
1775 } else {
1776 // clear nosnap marker, if present.
1777 ::unlink(nosnapfn);
1778 }
1779
1780 // check fsid with omap
1781 // get omap fsid
7c673cae
FG
1782 char omap_fsid_buf[PATH_MAX];
1783 struct ::stat omap_fsid_stat;
1784 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1785 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1786 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
31f18b77 1787 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
7c673cae
FG
1788 << "assume as matched."
1789 << dendl;
11fdf7f2
TL
1790 } else {
1791 int omap_fsid_fd;
7c673cae 1792 // if osd_uuid exists, compares osd_uuid with fsid
91327a77 1793 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY|O_CLOEXEC, 0644);
7c673cae
FG
1794 if (omap_fsid_fd < 0) {
1795 ret = -errno;
31f18b77 1796 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
7c673cae
FG
1797 << cpp_strerror(ret)
1798 << dendl;
1799 goto close_current_fd;
1800 }
1801 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1802 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
7c673cae 1803 if (ret < 0) {
31f18b77 1804 derr << __FUNC__ << ": error reading omap_fsid_fd"
7c673cae
FG
1805 << ", omap_fsid = " << omap_fsid
1806 << cpp_strerror(ret)
1807 << dendl;
1808 goto close_current_fd;
1809 }
1810 if (fsid != omap_fsid) {
31f18b77 1811 derr << __FUNC__ << ": " << omap_fsid_buf
7c673cae
FG
1812 << " has existed omap fsid " << omap_fsid
1813 << " != expected osd fsid " << fsid
1814 << dendl;
1815 ret = -EINVAL;
1816 goto close_current_fd;
1817 }
1818 }
1819
1820 dout(0) << "start omap initiation" << dendl;
1821 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1822 KeyValueDB * omap_store = KeyValueDB::create(cct,
1823 superblock.omap_backend,
1824 omap_dir);
11fdf7f2 1825 if (!omap_store)
7c673cae 1826 {
31f18b77 1827 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
7c673cae
FG
1828 ret = -1;
1829 goto close_current_fd;
1830 }
1831
1832 if (superblock.omap_backend == "rocksdb")
1833 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1834 else
1835 ret = omap_store->init();
1836
1837 if (ret < 0) {
31f18b77 1838 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1839 goto close_current_fd;
1840 }
1841
1842 stringstream err;
1843 if (omap_store->create_and_open(err)) {
1844 delete omap_store;
11fdf7f2 1845 omap_store = nullptr;
31f18b77 1846 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
7c673cae
FG
1847 << " : " << err.str() << dendl;
1848 ret = -1;
1849 goto close_current_fd;
1850 }
1851
1852 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1853 ret = dbomap->init(do_update);
1854 if (ret < 0) {
1855 delete dbomap;
11fdf7f2 1856 dbomap = nullptr;
31f18b77 1857 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
7c673cae
FG
1858 goto close_current_fd;
1859 }
1860 stringstream err2;
1861
1862 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1863 derr << err2.str() << dendl;
1864 delete dbomap;
11fdf7f2 1865 dbomap = nullptr;
7c673cae
FG
1866 ret = -EINVAL;
1867 goto close_current_fd;
1868 }
1869 object_map.reset(dbomap);
1870 }
1871
1872 // journal
1873 new_journal();
1874
1875 // select journal mode?
1876 if (journal) {
1877 if (!m_filestore_journal_writeahead &&
1878 !m_filestore_journal_parallel &&
1879 !m_filestore_journal_trailing) {
1880 if (!backend->can_checkpoint()) {
1881 m_filestore_journal_writeahead = true;
31f18b77 1882 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
7c673cae
FG
1883 } else {
1884 m_filestore_journal_parallel = true;
31f18b77 1885 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
7c673cae
FG
1886 }
1887 } else {
1888 if (m_filestore_journal_writeahead)
31f18b77 1889 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
7c673cae 1890 if (m_filestore_journal_parallel)
31f18b77 1891 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
7c673cae 1892 if (m_filestore_journal_trailing)
31f18b77 1893 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
7c673cae
FG
1894 }
1895 if (m_filestore_journal_writeahead)
1896 journal->set_wait_on_full(true);
1897 } else {
31f18b77 1898 dout(0) << __FUNC__ << ": no journal" << dendl;
7c673cae
FG
1899 }
1900
1901 ret = _sanity_check_fs();
1902 if (ret) {
31f18b77 1903 derr << __FUNC__ << ": _sanity_check_fs failed with error "
7c673cae
FG
1904 << ret << dendl;
1905 goto close_current_fd;
1906 }
1907
1908 // Cleanup possibly invalid collections
1909 {
1910 vector<coll_t> collections;
1911 ret = list_collections(collections, true);
1912 if (ret < 0) {
1913 derr << "Error " << ret << " while listing collections" << dendl;
1914 goto close_current_fd;
1915 }
1916 for (vector<coll_t>::iterator i = collections.begin();
1917 i != collections.end();
1918 ++i) {
1919 Index index;
1920 ret = get_index(*i, &index);
1921 if (ret < 0) {
1922 derr << "Unable to mount index " << *i
1923 << " with error: " << ret << dendl;
1924 goto close_current_fd;
1925 }
11fdf7f2 1926 ceph_assert(index.index);
9f95a23c 1927 std::unique_lock l{(index.index)->access_lock};
7c673cae
FG
1928
1929 index->cleanup();
1930 }
1931 }
1932 if (!m_disable_wbthrottle) {
1933 wbthrottle.start();
1934 } else {
31f18b77 1935 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
7c673cae 1936 if (cct->_conf->filestore_odsync_write) {
31f18b77 1937 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
7c673cae
FG
1938 }
1939 }
1940 sync_thread.create("filestore_sync");
1941
1942 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1943 ret = journal_replay(initial_op_seq);
1944 if (ret < 0) {
31f18b77 1945 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1946 if (ret == -ENOTTY) {
1947 derr << "maybe journal is not pointing to a block device and its size "
1948 << "wasn't configured?" << dendl;
1949 }
1950
1951 goto stop_sync;
1952 }
1953 }
1954
1955 {
1956 stringstream err2;
1957 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1958 derr << err2.str() << dendl;
1959 ret = -EINVAL;
1960 goto stop_sync;
1961 }
1962 }
1963
1964 init_temp_collections();
1965
1966 journal_start();
1967
1968 op_tp.start();
1969 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1970 (*it)->start();
1971 }
1972 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1973 (*it)->start();
1974 }
1975
1976 timer.init();
1977
1978 // upgrade?
1979 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1980 int err = upgrade();
1981 if (err < 0) {
1982 derr << "error converting store" << dendl;
1983 umount();
1984 return err;
1985 }
1986 }
1987
1988 // all okay.
1989 return 0;
1990
1991stop_sync:
1992 // stop sync thread
9f95a23c
TL
1993 {
1994 std::lock_guard l{lock};
1995 stop = true;
1996 sync_cond.notify_all();
1997 }
7c673cae
FG
1998 sync_thread.join();
1999 if (!m_disable_wbthrottle) {
2000 wbthrottle.stop();
2001 }
2002close_current_fd:
2003 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
2004 current_fd = -1;
2005close_basedir_fd:
2006 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
2007 basedir_fd = -1;
2008close_fsid_fd:
2009 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
2010 fsid_fd = -1;
2011done:
11fdf7f2 2012 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae 2013 delete backend;
11fdf7f2 2014 backend = nullptr;
7c673cae
FG
2015 object_map.reset();
2016 return ret;
2017}
2018
2019void FileStore::init_temp_collections()
2020{
31f18b77 2021 dout(10) << __FUNC__ << dendl;
7c673cae
FG
2022 vector<coll_t> ls;
2023 int r = list_collections(ls, true);
11fdf7f2 2024 ceph_assert(r >= 0);
7c673cae
FG
2025
2026 dout(20) << " ls " << ls << dendl;
2027
2028 SequencerPosition spos;
2029
2030 set<coll_t> temps;
2031 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2032 if (p->is_temp())
2033 temps.insert(*p);
2034 dout(20) << " temps " << temps << dendl;
2035
2036 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
2037 if (p->is_temp())
2038 continue;
9f95a23c 2039 coll_map[*p] = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, *p);
7c673cae
FG
2040 if (p->is_meta())
2041 continue;
2042 coll_t temp = p->get_temp();
2043 if (temps.count(temp)) {
2044 temps.erase(temp);
2045 } else {
31f18b77 2046 dout(10) << __FUNC__ << ": creating " << temp << dendl;
7c673cae 2047 r = _create_collection(temp, 0, spos);
11fdf7f2 2048 ceph_assert(r == 0);
7c673cae
FG
2049 }
2050 }
2051
2052 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
31f18b77 2053 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
7c673cae 2054 r = _collection_remove_recursive(*p, spos);
11fdf7f2 2055 ceph_assert(r == 0);
7c673cae
FG
2056 }
2057}
2058
2059int FileStore::umount()
2060{
31f18b77 2061 dout(5) << __FUNC__ << ": " << basedir << dendl;
7c673cae
FG
2062
2063 flush();
2064 sync();
2065 do_force_sync();
2066
11fdf7f2 2067 {
9f95a23c 2068 std::lock_guard l(coll_lock);
11fdf7f2
TL
2069 coll_map.clear();
2070 }
2071
9f95a23c
TL
2072 {
2073 std::lock_guard l{lock};
2074 stop = true;
2075 sync_cond.notify_all();
2076 }
7c673cae
FG
2077 sync_thread.join();
2078 if (!m_disable_wbthrottle){
2079 wbthrottle.stop();
2080 }
2081 op_tp.stop();
2082
2083 journal_stop();
2084 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
2085 journal_write_close();
2086
2087 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
2088 (*it)->stop();
2089 }
2090 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
2091 (*it)->stop();
2092 }
2093
11fdf7f2
TL
2094 if (vdo_fd >= 0) {
2095 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
2096 vdo_fd = -1;
2097 }
7c673cae
FG
2098 if (fsid_fd >= 0) {
2099 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
2100 fsid_fd = -1;
2101 }
2102 if (op_fd >= 0) {
2103 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
2104 op_fd = -1;
2105 }
2106 if (current_fd >= 0) {
2107 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
2108 current_fd = -1;
2109 }
2110 if (basedir_fd >= 0) {
2111 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
2112 basedir_fd = -1;
2113 }
2114
2115 force_sync = false;
2116
2117 delete backend;
11fdf7f2 2118 backend = nullptr;
7c673cae
FG
2119
2120 object_map.reset();
2121
2122 {
9f95a23c 2123 std::lock_guard l{sync_entry_timeo_lock};
7c673cae
FG
2124 timer.shutdown();
2125 }
2126
2127 // nothing
2128 return 0;
2129}
2130
2131
11fdf7f2
TL
2132/// -----------------------------
2133
2134// keep OpSequencer handles alive for all time so that a sequence
2135// that removes a collection and creates a new one will not allow
2136// two sequencers for the same collection to be alive at once.
2137
2138ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
2139{
9f95a23c 2140 std::lock_guard l{coll_lock};
11fdf7f2
TL
2141 auto p = coll_map.find(c);
2142 if (p == coll_map.end()) {
2143 return CollectionHandle();
2144 }
2145 return p->second;
2146}
2147
2148ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
2149{
9f95a23c 2150 std::lock_guard l{coll_lock};
11fdf7f2
TL
2151 auto p = coll_map.find(c);
2152 if (p == coll_map.end()) {
9f95a23c 2153 auto r = ceph::make_ref<OpSequencer>(cct, ++next_osr_id, c);
11fdf7f2
TL
2154 coll_map[c] = r;
2155 return r;
2156 } else {
2157 return p->second;
2158 }
2159}
7c673cae
FG
2160
2161
2162/// -----------------------------
2163
2164FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2165 Context *onreadable,
2166 Context *onreadable_sync,
2167 TrackedOpRef osd_op)
2168{
2169 uint64_t bytes = 0, ops = 0;
2170 for (vector<Transaction>::iterator p = tls.begin();
2171 p != tls.end();
2172 ++p) {
2173 bytes += (*p).get_num_bytes();
2174 ops += (*p).get_num_ops();
2175 }
2176
2177 Op *o = new Op;
2178 o->start = ceph_clock_now();
2179 o->tls = std::move(tls);
2180 o->onreadable = onreadable;
2181 o->onreadable_sync = onreadable_sync;
2182 o->ops = ops;
2183 o->bytes = bytes;
2184 o->osd_op = osd_op;
2185 return o;
2186}
2187
2188
2189
2190void FileStore::queue_op(OpSequencer *osr, Op *o)
2191{
2192 // queue op on sequencer, then queue sequencer for the threadpool,
2193 // so that regardless of which order the threads pick up the
2194 // sequencer, the op order will be preserved.
2195
2196 osr->queue(o);
2197 o->trace.event("queued");
2198
2199 logger->inc(l_filestore_ops);
2200 logger->inc(l_filestore_bytes, o->bytes);
2201
31f18b77 2202 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
7c673cae
FG
2203 << " " << *osr
2204 << " " << o->bytes << " bytes"
2205 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2206 << dendl;
2207 op_wq.queue(osr);
2208}
2209
2210void FileStore::op_queue_reserve_throttle(Op *o)
2211{
2212 throttle_ops.get();
2213 throttle_bytes.get(o->bytes);
2214
2215 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2216 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2217}
2218
2219void FileStore::op_queue_release_throttle(Op *o)
2220{
2221 throttle_ops.put();
2222 throttle_bytes.put(o->bytes);
2223 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2224 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2225}
2226
2227void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2228{
2229 if (!m_disable_wbthrottle) {
2230 wbthrottle.throttle();
2231 }
2232 // inject a stall?
2233 if (cct->_conf->filestore_inject_stall) {
2234 int orig = cct->_conf->filestore_inject_stall;
31f18b77 2235 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
7c673cae 2236 sleep(orig);
11fdf7f2 2237 cct->_conf.set_val("filestore_inject_stall", "0");
31f18b77 2238 dout(5) << __FUNC__ << ": done stalling" << dendl;
7c673cae
FG
2239 }
2240
9f95a23c 2241 osr->apply_lock.lock();
7c673cae
FG
2242 Op *o = osr->peek_queue();
2243 o->trace.event("op_apply_start");
2244 apply_manager.op_apply_start(o->op);
11fdf7f2 2245 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " start" << dendl;
7c673cae 2246 o->trace.event("_do_transactions start");
11fdf7f2 2247 int r = _do_transactions(o->tls, o->op, &handle, osr->osr_name);
7c673cae
FG
2248 o->trace.event("op_apply_finish");
2249 apply_manager.op_apply_finish(o->op);
31f18b77 2250 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
7c673cae 2251 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
7c673cae
FG
2252}
2253
2254void FileStore::_finish_op(OpSequencer *osr)
2255{
2256 list<Context*> to_queue;
2257 Op *o = osr->dequeue(&to_queue);
2258
11fdf7f2
TL
2259 o->tls.clear();
2260
7c673cae
FG
2261 utime_t lat = ceph_clock_now();
2262 lat -= o->start;
2263
11fdf7f2 2264 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " lat " << lat << dendl;
9f95a23c 2265 osr->apply_lock.unlock(); // locked in _do_op
7c673cae
FG
2266 o->trace.event("_finish_op");
2267
2268 // called with tp lock held
2269 op_queue_release_throttle(o);
2270
2271 logger->tinc(l_filestore_apply_latency, lat);
2272
2273 if (o->onreadable_sync) {
2274 o->onreadable_sync->complete(0);
2275 }
2276 if (o->onreadable) {
2277 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2278 }
2279 if (!to_queue.empty()) {
2280 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2281 }
2282 delete o;
11fdf7f2 2283 o = nullptr;
7c673cae
FG
2284}
2285
7c673cae
FG
2286struct C_JournaledAhead : public Context {
2287 FileStore *fs;
2288 FileStore::OpSequencer *osr;
2289 FileStore::Op *o;
2290 Context *ondisk;
2291
2292 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2293 fs(f), osr(os), o(o), ondisk(ondisk) { }
2294 void finish(int r) override {
2295 fs->_journaled_ahead(osr, o, ondisk);
2296 }
2297};
2298
11fdf7f2 2299int FileStore::queue_transactions(CollectionHandle& ch, vector<Transaction>& tls,
7c673cae
FG
2300 TrackedOpRef osd_op,
2301 ThreadPool::TPHandle *handle)
2302{
2303 Context *onreadable;
2304 Context *ondisk;
2305 Context *onreadable_sync;
2306 ObjectStore::Transaction::collect_contexts(
2307 tls, &onreadable, &ondisk, &onreadable_sync);
2308
2309 if (cct->_conf->objectstore_blackhole) {
31f18b77 2310 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
7c673cae
FG
2311 << dendl;
2312 delete ondisk;
11fdf7f2 2313 ondisk = nullptr;
7c673cae 2314 delete onreadable;
11fdf7f2 2315 onreadable = nullptr;
7c673cae 2316 delete onreadable_sync;
11fdf7f2 2317 onreadable_sync = nullptr;
7c673cae
FG
2318 return 0;
2319 }
2320
2321 utime_t start = ceph_clock_now();
7c673cae 2322
11fdf7f2
TL
2323 OpSequencer *osr = static_cast<OpSequencer*>(ch.get());
2324 dout(5) << __FUNC__ << ": osr " << osr << " " << *osr << dendl;
7c673cae
FG
2325
2326 ZTracer::Trace trace;
2327 if (osd_op && osd_op->pg_trace) {
2328 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2329 trace = osd_op->store_trace;
2330 }
2331
2332 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2333 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2334
2335 //prepare and encode transactions data out of lock
2336 bufferlist tbl;
2337 int orig_len = journal->prepare_entry(o->tls, &tbl);
2338
2339 if (handle)
2340 handle->suspend_tp_timeout();
2341
2342 op_queue_reserve_throttle(o);
2343 journal->reserve_throttle_and_backoff(tbl.length());
2344
2345 if (handle)
2346 handle->reset_tp_timeout();
2347
2348 uint64_t op_num = submit_manager.op_submit_start();
2349 o->op = op_num;
2350 trace.keyval("opnum", op_num);
2351
2352 if (m_filestore_do_dump)
2353 dump_transactions(o->tls, o->op, osr);
2354
2355 if (m_filestore_journal_parallel) {
31f18b77 2356 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
7c673cae
FG
2357
2358 trace.keyval("journal mode", "parallel");
2359 trace.event("journal started");
2360 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2361
2362 // queue inside submit_manager op submission lock
2363 queue_op(osr, o);
2364 trace.event("op queued");
2365 } else if (m_filestore_journal_writeahead) {
31f18b77 2366 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
7c673cae 2367
11fdf7f2 2368 osr->queue_journal(o);
7c673cae
FG
2369
2370 trace.keyval("journal mode", "writeahead");
2371 trace.event("journal started");
2372 _op_journal_transactions(tbl, orig_len, o->op,
2373 new C_JournaledAhead(this, osr, o, ondisk),
2374 osd_op);
2375 } else {
2376 ceph_abort();
2377 }
2378 submit_manager.op_submit_finish(op_num);
2379 utime_t end = ceph_clock_now();
2380 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2381 return 0;
2382 }
2383
2384 if (!journal) {
2385 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
31f18b77 2386 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
7c673cae
FG
2387
2388 if (handle)
2389 handle->suspend_tp_timeout();
2390
2391 op_queue_reserve_throttle(o);
2392
2393 if (handle)
2394 handle->reset_tp_timeout();
2395
2396 uint64_t op_num = submit_manager.op_submit_start();
2397 o->op = op_num;
2398
2399 if (m_filestore_do_dump)
2400 dump_transactions(o->tls, o->op, osr);
2401
2402 queue_op(osr, o);
2403 trace.keyval("opnum", op_num);
2404 trace.keyval("journal mode", "none");
2405 trace.event("op queued");
2406
2407 if (ondisk)
2408 apply_manager.add_waiter(op_num, ondisk);
2409 submit_manager.op_submit_finish(op_num);
2410 utime_t end = ceph_clock_now();
2411 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2412 return 0;
2413 }
2414
11fdf7f2 2415 ceph_assert(journal);
7c673cae
FG
2416 //prepare and encode transactions data out of lock
2417 bufferlist tbl;
2418 int orig_len = -1;
2419 if (journal->is_writeable()) {
2420 orig_len = journal->prepare_entry(tls, &tbl);
2421 }
2422 uint64_t op = submit_manager.op_submit_start();
31f18b77 2423 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
7c673cae
FG
2424
2425 if (m_filestore_do_dump)
2426 dump_transactions(tls, op, osr);
2427
2428 trace.event("op_apply_start");
2429 trace.keyval("opnum", op);
2430 trace.keyval("journal mode", "trailing");
2431 apply_manager.op_apply_start(op);
2432 trace.event("do_transactions");
2433 int r = do_transactions(tls, op);
2434
2435 if (r >= 0) {
2436 trace.event("journal started");
2437 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2438 } else {
2439 delete ondisk;
11fdf7f2 2440 ondisk = nullptr;
7c673cae
FG
2441 }
2442
2443 // start on_readable finisher after we queue journal item, as on_readable callback
2444 // is allowed to delete the Transaction
2445 if (onreadable_sync) {
2446 onreadable_sync->complete(r);
2447 }
2448 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2449
2450 submit_manager.op_submit_finish(op);
2451 trace.event("op_apply_finish");
2452 apply_manager.op_apply_finish(op);
2453
2454 utime_t end = ceph_clock_now();
2455 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2456 return r;
2457}
2458
2459void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2460{
31f18b77 2461 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
7c673cae
FG
2462
2463 o->trace.event("writeahead journal finished");
2464
2465 // this should queue in order because the journal does it's completions in order.
2466 queue_op(osr, o);
2467
2468 list<Context*> to_queue;
2469 osr->dequeue_journal(&to_queue);
2470
2471 // do ondisk completions async, to prevent any onreadable_sync completions
2472 // getting blocked behind an ondisk completion.
2473 if (ondisk) {
2474 dout(10) << " queueing ondisk " << ondisk << dendl;
2475 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2476 }
2477 if (!to_queue.empty()) {
2478 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2479 }
2480}
2481
2482int FileStore::_do_transactions(
2483 vector<Transaction> &tls,
2484 uint64_t op_seq,
11fdf7f2
TL
2485 ThreadPool::TPHandle *handle,
2486 const char *osr_name)
7c673cae
FG
2487{
2488 int trans_num = 0;
2489
2490 for (vector<Transaction>::iterator p = tls.begin();
2491 p != tls.end();
2492 ++p, trans_num++) {
11fdf7f2 2493 _do_transaction(*p, op_seq, trans_num, handle, osr_name);
7c673cae
FG
2494 if (handle)
2495 handle->reset_tp_timeout();
2496 }
2497
2498 return 0;
2499}
2500
2501void FileStore::_set_global_replay_guard(const coll_t& cid,
2502 const SequencerPosition &spos)
2503{
2504 if (backend->can_checkpoint())
2505 return;
2506
2507 // sync all previous operations on this sequencer
2508 int ret = object_map->sync();
2509 if (ret < 0) {
31f18b77 2510 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
11fdf7f2 2511 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2512 }
2513 ret = sync_filesystem(basedir_fd);
2514 if (ret < 0) {
31f18b77 2515 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
11fdf7f2 2516 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2517 }
2518
2519 char fn[PATH_MAX];
2520 get_cdir(cid, fn, sizeof(fn));
91327a77 2521 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2522 if (fd < 0) {
2523 int err = errno;
31f18b77 2524 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2525 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2526 }
2527
2528 _inject_failure();
2529
2530 // then record that we did it
2531 bufferlist v;
11fdf7f2 2532 encode(spos, v);
7c673cae
FG
2533 int r = chain_fsetxattr<true, true>(
2534 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2535 if (r < 0) {
31f18b77 2536 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
7c673cae 2537 << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2538 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2539 }
2540
2541 // and make sure our xattr is durable.
a8e16298
TL
2542 r = ::fsync(fd);
2543 if (r < 0) {
2544 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2545 ceph_abort();
2546 }
7c673cae
FG
2547
2548 _inject_failure();
2549
2550 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 2551 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2552}
2553
2554int FileStore::_check_global_replay_guard(const coll_t& cid,
2555 const SequencerPosition& spos)
2556{
2557 char fn[PATH_MAX];
2558 get_cdir(cid, fn, sizeof(fn));
91327a77 2559 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae 2560 if (fd < 0) {
31f18b77 2561 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2562 return 1; // if collection does not exist, there is no guard, and we can replay.
2563 }
2564
2565 char buf[100];
2566 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2567 if (r < 0) {
31f18b77 2568 dout(20) << __FUNC__ << ": no xattr" << dendl;
11fdf7f2 2569 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
2570 VOID_TEMP_FAILURE_RETRY(::close(fd));
2571 return 1; // no xattr
2572 }
2573 bufferlist bl;
2574 bl.append(buf, r);
2575
2576 SequencerPosition opos;
11fdf7f2
TL
2577 auto p = bl.cbegin();
2578 decode(opos, p);
7c673cae
FG
2579
2580 VOID_TEMP_FAILURE_RETRY(::close(fd));
2581 return spos >= opos ? 1 : -1;
2582}
2583
2584
2585void FileStore::_set_replay_guard(const coll_t& cid,
2586 const SequencerPosition &spos,
2587 bool in_progress=false)
2588{
2589 char fn[PATH_MAX];
2590 get_cdir(cid, fn, sizeof(fn));
91327a77 2591 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2592 if (fd < 0) {
2593 int err = errno;
31f18b77 2594 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2595 ceph_abort_msg("_set_replay_guard failed");
7c673cae
FG
2596 }
2597 _set_replay_guard(fd, spos, 0, in_progress);
2598 VOID_TEMP_FAILURE_RETRY(::close(fd));
2599}
2600
2601
2602void FileStore::_set_replay_guard(int fd,
2603 const SequencerPosition& spos,
2604 const ghobject_t *hoid,
2605 bool in_progress)
2606{
2607 if (backend->can_checkpoint())
2608 return;
2609
31f18b77 2610 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
7c673cae
FG
2611
2612 _inject_failure();
2613
2614 // first make sure the previous operation commits
a8e16298
TL
2615 int r = ::fsync(fd);
2616 if (r < 0) {
2617 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2618 ceph_abort();
2619 }
7c673cae
FG
2620
2621 if (!in_progress) {
2622 // sync object_map too. even if this object has a header or keys,
2623 // it have had them in the past and then removed them, so always
2624 // sync.
2625 object_map->sync(hoid, &spos);
2626 }
2627
2628 _inject_failure();
2629
2630 // then record that we did it
2631 bufferlist v(40);
11fdf7f2
TL
2632 encode(spos, v);
2633 encode(in_progress, v);
a8e16298 2634 r = chain_fsetxattr<true, true>(
7c673cae
FG
2635 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2636 if (r < 0) {
2637 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2638 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2639 }
2640
2641 // and make sure our xattr is durable.
a8e16298
TL
2642 r = ::fsync(fd);
2643 if (r < 0) {
2644 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2645 ceph_abort();
2646 }
7c673cae
FG
2647
2648 _inject_failure();
2649
31f18b77 2650 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2651}
2652
2653void FileStore::_close_replay_guard(const coll_t& cid,
2654 const SequencerPosition &spos)
2655{
2656 char fn[PATH_MAX];
2657 get_cdir(cid, fn, sizeof(fn));
91327a77 2658 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2659 if (fd < 0) {
2660 int err = errno;
31f18b77 2661 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2662 ceph_abort_msg("_close_replay_guard failed");
7c673cae
FG
2663 }
2664 _close_replay_guard(fd, spos);
2665 VOID_TEMP_FAILURE_RETRY(::close(fd));
2666}
2667
2668void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2669 const ghobject_t *hoid)
2670{
2671 if (backend->can_checkpoint())
2672 return;
2673
31f18b77 2674 dout(10) << __FUNC__ << ": " << spos << dendl;
7c673cae
FG
2675
2676 _inject_failure();
2677
2678 // sync object_map too. even if this object has a header or keys,
2679 // it have had them in the past and then removed them, so always
2680 // sync.
2681 object_map->sync(hoid, &spos);
2682
2683 // then record that we are done with this operation
2684 bufferlist v(40);
11fdf7f2 2685 encode(spos, v);
7c673cae 2686 bool in_progress = false;
11fdf7f2 2687 encode(in_progress, v);
7c673cae
FG
2688 int r = chain_fsetxattr<true, true>(
2689 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2690 if (r < 0) {
2691 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2692 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2693 }
2694
2695 // and make sure our xattr is durable.
a8e16298
TL
2696 r = ::fsync(fd);
2697 if (r < 0) {
2698 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2699 ceph_abort();
2700 }
7c673cae
FG
2701
2702 _inject_failure();
2703
31f18b77 2704 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2705}
2706
2707int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2708 const SequencerPosition& spos)
2709{
2710 if (!replaying || backend->can_checkpoint())
2711 return 1;
2712
2713 int r = _check_global_replay_guard(cid, spos);
2714 if (r < 0)
2715 return r;
2716
2717 FDRef fd;
2718 r = lfn_open(cid, oid, false, &fd);
2719 if (r < 0) {
31f18b77 2720 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
7c673cae
FG
2721 return 1; // if file does not exist, there is no guard, and we can replay.
2722 }
2723 int ret = _check_replay_guard(**fd, spos);
2724 lfn_close(fd);
2725 return ret;
2726}
2727
2728int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2729{
2730 if (!replaying || backend->can_checkpoint())
2731 return 1;
2732
2733 char fn[PATH_MAX];
2734 get_cdir(cid, fn, sizeof(fn));
91327a77 2735 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae 2736 if (fd < 0) {
31f18b77 2737 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2738 return 1; // if collection does not exist, there is no guard, and we can replay.
2739 }
2740 int ret = _check_replay_guard(fd, spos);
2741 VOID_TEMP_FAILURE_RETRY(::close(fd));
2742 return ret;
2743}
2744
2745int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2746{
2747 if (!replaying || backend->can_checkpoint())
2748 return 1;
2749
2750 char buf[100];
2751 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2752 if (r < 0) {
31f18b77 2753 dout(20) << __FUNC__ << ": no xattr" << dendl;
11fdf7f2 2754 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
2755 return 1; // no xattr
2756 }
2757 bufferlist bl;
2758 bl.append(buf, r);
2759
2760 SequencerPosition opos;
11fdf7f2
TL
2761 auto p = bl.cbegin();
2762 decode(opos, p);
7c673cae
FG
2763 bool in_progress = false;
2764 if (!p.end()) // older journals don't have this
11fdf7f2 2765 decode(in_progress, p);
7c673cae 2766 if (opos > spos) {
31f18b77 2767 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
7c673cae
FG
2768 << ", now or in future, SKIPPING REPLAY" << dendl;
2769 return -1;
2770 } else if (opos == spos) {
2771 if (in_progress) {
31f18b77 2772 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2773 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2774 return 0;
2775 } else {
31f18b77 2776 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2777 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2778 return -1;
2779 }
2780 } else {
31f18b77 2781 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
7c673cae
FG
2782 << ", in past, will replay" << dendl;
2783 return 1;
2784 }
2785}
2786
2787void FileStore::_do_transaction(
2788 Transaction& t, uint64_t op_seq, int trans_num,
11fdf7f2
TL
2789 ThreadPool::TPHandle *handle,
2790 const char *osr_name)
7c673cae 2791{
31f18b77 2792 dout(10) << __FUNC__ << ": on " << &t << dendl;
7c673cae 2793
7c673cae
FG
2794 Transaction::iterator i = t.begin();
2795
2796 SequencerPosition spos(op_seq, trans_num, 0);
2797 while (i.have_op()) {
2798 if (handle)
2799 handle->reset_tp_timeout();
2800
2801 Transaction::Op *op = i.decode_op();
2802 int r = 0;
2803
2804 _inject_failure();
2805
2806 switch (op->op) {
2807 case Transaction::OP_NOP:
2808 break;
2809 case Transaction::OP_TOUCH:
9f95a23c 2810 case Transaction::OP_CREATE:
7c673cae
FG
2811 {
2812 const coll_t &_cid = i.get_cid(op->cid);
2813 const ghobject_t &oid = i.get_oid(op->oid);
2814 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2815 _cid : _cid.get_temp();
2816 tracepoint(objectstore, touch_enter, osr_name);
2817 if (_check_replay_guard(cid, oid, spos) > 0)
2818 r = _touch(cid, oid);
2819 tracepoint(objectstore, touch_exit, r);
2820 }
2821 break;
2822
2823 case Transaction::OP_WRITE:
2824 {
2825 const coll_t &_cid = i.get_cid(op->cid);
2826 const ghobject_t &oid = i.get_oid(op->oid);
2827 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2828 _cid : _cid.get_temp();
2829 uint64_t off = op->off;
2830 uint64_t len = op->len;
2831 uint32_t fadvise_flags = i.get_fadvise_flags();
2832 bufferlist bl;
2833 i.decode_bl(bl);
2834 tracepoint(objectstore, write_enter, osr_name, off, len);
2835 if (_check_replay_guard(cid, oid, spos) > 0)
2836 r = _write(cid, oid, off, len, bl, fadvise_flags);
2837 tracepoint(objectstore, write_exit, r);
2838 }
2839 break;
2840
2841 case Transaction::OP_ZERO:
2842 {
2843 const coll_t &_cid = i.get_cid(op->cid);
2844 const ghobject_t &oid = i.get_oid(op->oid);
2845 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2846 _cid : _cid.get_temp();
2847 uint64_t off = op->off;
2848 uint64_t len = op->len;
2849 tracepoint(objectstore, zero_enter, osr_name, off, len);
2850 if (_check_replay_guard(cid, oid, spos) > 0)
2851 r = _zero(cid, oid, off, len);
2852 tracepoint(objectstore, zero_exit, r);
2853 }
2854 break;
2855
2856 case Transaction::OP_TRIMCACHE:
2857 {
2858 // deprecated, no-op
2859 }
2860 break;
2861
2862 case Transaction::OP_TRUNCATE:
2863 {
2864 const coll_t &_cid = i.get_cid(op->cid);
2865 const ghobject_t &oid = i.get_oid(op->oid);
2866 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2867 _cid : _cid.get_temp();
2868 uint64_t off = op->off;
2869 tracepoint(objectstore, truncate_enter, osr_name, off);
2870 if (_check_replay_guard(cid, oid, spos) > 0)
2871 r = _truncate(cid, oid, off);
2872 tracepoint(objectstore, truncate_exit, r);
2873 }
2874 break;
2875
2876 case Transaction::OP_REMOVE:
2877 {
2878 const coll_t &_cid = i.get_cid(op->cid);
2879 const ghobject_t &oid = i.get_oid(op->oid);
2880 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2881 _cid : _cid.get_temp();
2882 tracepoint(objectstore, remove_enter, osr_name);
2883 if (_check_replay_guard(cid, oid, spos) > 0)
2884 r = _remove(cid, oid, spos);
2885 tracepoint(objectstore, remove_exit, r);
2886 }
2887 break;
2888
2889 case Transaction::OP_SETATTR:
2890 {
2891 const coll_t &_cid = i.get_cid(op->cid);
2892 const ghobject_t &oid = i.get_oid(op->oid);
2893 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2894 _cid : _cid.get_temp();
2895 string name = i.decode_string();
2896 bufferlist bl;
2897 i.decode_bl(bl);
2898 tracepoint(objectstore, setattr_enter, osr_name);
2899 if (_check_replay_guard(cid, oid, spos) > 0) {
2900 map<string, bufferptr> to_set;
2901 to_set[name] = bufferptr(bl.c_str(), bl.length());
2902 r = _setattrs(cid, oid, to_set, spos);
2903 if (r == -ENOSPC)
2904 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2905 << " name " << name << " size " << bl.length() << dendl;
2906 }
2907 tracepoint(objectstore, setattr_exit, r);
2908 }
2909 break;
2910
2911 case Transaction::OP_SETATTRS:
2912 {
2913 const coll_t &_cid = i.get_cid(op->cid);
2914 const ghobject_t &oid = i.get_oid(op->oid);
2915 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2916 _cid : _cid.get_temp();
2917 map<string, bufferptr> aset;
2918 i.decode_attrset(aset);
2919 tracepoint(objectstore, setattrs_enter, osr_name);
2920 if (_check_replay_guard(cid, oid, spos) > 0)
2921 r = _setattrs(cid, oid, aset, spos);
2922 tracepoint(objectstore, setattrs_exit, r);
2923 if (r == -ENOSPC)
2924 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2925 }
2926 break;
2927
2928 case Transaction::OP_RMATTR:
2929 {
2930 const coll_t &_cid = i.get_cid(op->cid);
2931 const ghobject_t &oid = i.get_oid(op->oid);
2932 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2933 _cid : _cid.get_temp();
2934 string name = i.decode_string();
2935 tracepoint(objectstore, rmattr_enter, osr_name);
2936 if (_check_replay_guard(cid, oid, spos) > 0)
2937 r = _rmattr(cid, oid, name.c_str(), spos);
2938 tracepoint(objectstore, rmattr_exit, r);
2939 }
2940 break;
2941
2942 case Transaction::OP_RMATTRS:
2943 {
2944 const coll_t &_cid = i.get_cid(op->cid);
2945 const ghobject_t &oid = i.get_oid(op->oid);
2946 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2947 _cid : _cid.get_temp();
2948 tracepoint(objectstore, rmattrs_enter, osr_name);
2949 if (_check_replay_guard(cid, oid, spos) > 0)
2950 r = _rmattrs(cid, oid, spos);
2951 tracepoint(objectstore, rmattrs_exit, r);
2952 }
2953 break;
2954
2955 case Transaction::OP_CLONE:
2956 {
2957 const coll_t &_cid = i.get_cid(op->cid);
2958 const ghobject_t &oid = i.get_oid(op->oid);
2959 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2960 _cid : _cid.get_temp();
2961 const ghobject_t &noid = i.get_oid(op->dest_oid);
2962 tracepoint(objectstore, clone_enter, osr_name);
2963 r = _clone(cid, oid, noid, spos);
2964 tracepoint(objectstore, clone_exit, r);
2965 }
2966 break;
2967
2968 case Transaction::OP_CLONERANGE:
2969 {
2970 const coll_t &_cid = i.get_cid(op->cid);
2971 const ghobject_t &oid = i.get_oid(op->oid);
2972 const ghobject_t &noid = i.get_oid(op->dest_oid);
2973 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2974 _cid : _cid.get_temp();
2975 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2976 _cid : _cid.get_temp();
2977 uint64_t off = op->off;
2978 uint64_t len = op->len;
2979 tracepoint(objectstore, clone_range_enter, osr_name, len);
2980 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2981 tracepoint(objectstore, clone_range_exit, r);
2982 }
2983 break;
2984
2985 case Transaction::OP_CLONERANGE2:
2986 {
2987 const coll_t &_cid = i.get_cid(op->cid);
2988 const ghobject_t &oid = i.get_oid(op->oid);
2989 const ghobject_t &noid = i.get_oid(op->dest_oid);
2990 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2991 _cid : _cid.get_temp();
2992 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2993 _cid : _cid.get_temp();
2994 uint64_t srcoff = op->off;
2995 uint64_t len = op->len;
2996 uint64_t dstoff = op->dest_off;
2997 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2998 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2999 tracepoint(objectstore, clone_range2_exit, r);
3000 }
3001 break;
3002
3003 case Transaction::OP_MKCOLL:
3004 {
3005 const coll_t &cid = i.get_cid(op->cid);
3006 tracepoint(objectstore, mkcoll_enter, osr_name);
3007 if (_check_replay_guard(cid, spos) > 0)
3008 r = _create_collection(cid, op->split_bits, spos);
3009 tracepoint(objectstore, mkcoll_exit, r);
3010 }
3011 break;
3012
3013 case Transaction::OP_COLL_SET_BITS:
3014 {
3015 const coll_t &cid = i.get_cid(op->cid);
3016 int bits = op->split_bits;
3017 r = _collection_set_bits(cid, bits);
3018 }
3019 break;
3020
3021 case Transaction::OP_COLL_HINT:
3022 {
3023 const coll_t &cid = i.get_cid(op->cid);
f67539c2 3024 uint32_t type = op->hint;
7c673cae
FG
3025 bufferlist hint;
3026 i.decode_bl(hint);
11fdf7f2 3027 auto hiter = hint.cbegin();
7c673cae
FG
3028 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
3029 uint32_t pg_num;
3030 uint64_t num_objs;
11fdf7f2
TL
3031 decode(pg_num, hiter);
3032 decode(num_objs, hiter);
7c673cae
FG
3033 if (_check_replay_guard(cid, spos) > 0) {
3034 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
3035 }
3036 } else {
3037 // Ignore the hint
3038 dout(10) << "Unrecognized collection hint type: " << type << dendl;
3039 }
3040 }
3041 break;
3042
3043 case Transaction::OP_RMCOLL:
3044 {
3045 const coll_t &cid = i.get_cid(op->cid);
3046 tracepoint(objectstore, rmcoll_enter, osr_name);
3047 if (_check_replay_guard(cid, spos) > 0)
3048 r = _destroy_collection(cid);
3049 tracepoint(objectstore, rmcoll_exit, r);
3050 }
3051 break;
3052
3053 case Transaction::OP_COLL_ADD:
3054 {
3055 const coll_t &ocid = i.get_cid(op->cid);
3056 const coll_t &ncid = i.get_cid(op->dest_cid);
3057 const ghobject_t &oid = i.get_oid(op->oid);
3058
11fdf7f2 3059 ceph_assert(oid.hobj.pool >= -1);
7c673cae
FG
3060
3061 // always followed by OP_COLL_REMOVE
3062 Transaction::Op *op2 = i.decode_op();
3063 const coll_t &ocid2 = i.get_cid(op2->cid);
3064 const ghobject_t &oid2 = i.get_oid(op2->oid);
11fdf7f2
TL
3065 ceph_assert(op2->op == Transaction::OP_COLL_REMOVE);
3066 ceph_assert(ocid2 == ocid);
3067 ceph_assert(oid2 == oid);
7c673cae
FG
3068
3069 tracepoint(objectstore, coll_add_enter);
3070 r = _collection_add(ncid, ocid, oid, spos);
3071 tracepoint(objectstore, coll_add_exit, r);
3072 spos.op++;
3073 if (r < 0)
3074 break;
3075 tracepoint(objectstore, coll_remove_enter, osr_name);
3076 if (_check_replay_guard(ocid, oid, spos) > 0)
3077 r = _remove(ocid, oid, spos);
3078 tracepoint(objectstore, coll_remove_exit, r);
3079 }
3080 break;
3081
3082 case Transaction::OP_COLL_MOVE:
3083 {
3084 // WARNING: this is deprecated and buggy; only here to replay old journals.
3085 const coll_t &ocid = i.get_cid(op->cid);
3086 const coll_t &ncid = i.get_cid(op->dest_cid);
3087 const ghobject_t &oid = i.get_oid(op->oid);
3088 tracepoint(objectstore, coll_move_enter);
3089 r = _collection_add(ocid, ncid, oid, spos);
3090 if (r == 0 &&
3091 (_check_replay_guard(ocid, oid, spos) > 0))
3092 r = _remove(ocid, oid, spos);
3093 tracepoint(objectstore, coll_move_exit, r);
3094 }
3095 break;
3096
3097 case Transaction::OP_COLL_MOVE_RENAME:
3098 {
3099 const coll_t &_oldcid = i.get_cid(op->cid);
3100 const ghobject_t &oldoid = i.get_oid(op->oid);
3101 const coll_t &_newcid = i.get_cid(op->dest_cid);
3102 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3103 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
3104 _oldcid : _oldcid.get_temp();
3105 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
3106 _oldcid : _newcid.get_temp();
3107 tracepoint(objectstore, coll_move_rename_enter);
3108 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
3109 tracepoint(objectstore, coll_move_rename_exit, r);
3110 }
3111 break;
3112
3113 case Transaction::OP_TRY_RENAME:
3114 {
3115 const coll_t &_cid = i.get_cid(op->cid);
3116 const ghobject_t &oldoid = i.get_oid(op->oid);
3117 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3118 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
3119 _cid : _cid.get_temp();
3120 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
3121 _cid : _cid.get_temp();
3122 tracepoint(objectstore, coll_try_rename_enter);
3123 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
3124 tracepoint(objectstore, coll_try_rename_exit, r);
3125 }
3126 break;
3127
3128 case Transaction::OP_COLL_SETATTR:
3129 case Transaction::OP_COLL_RMATTR:
11fdf7f2 3130 ceph_abort_msg("collection attr methods no longer implemented");
7c673cae
FG
3131 break;
3132
3133 case Transaction::OP_COLL_RENAME:
3134 {
3135 r = -EOPNOTSUPP;
3136 }
3137 break;
3138
3139 case Transaction::OP_OMAP_CLEAR:
3140 {
3141 const coll_t &_cid = i.get_cid(op->cid);
3142 const ghobject_t &oid = i.get_oid(op->oid);
3143 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3144 _cid : _cid.get_temp();
3145 tracepoint(objectstore, omap_clear_enter, osr_name);
28e407b8
AA
3146 if (_check_replay_guard(cid, oid, spos) > 0)
3147 r = _omap_clear(cid, oid, spos);
7c673cae
FG
3148 tracepoint(objectstore, omap_clear_exit, r);
3149 }
3150 break;
3151 case Transaction::OP_OMAP_SETKEYS:
3152 {
3153 const coll_t &_cid = i.get_cid(op->cid);
3154 const ghobject_t &oid = i.get_oid(op->oid);
3155 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3156 _cid : _cid.get_temp();
3157 map<string, bufferlist> aset;
3158 i.decode_attrset(aset);
3159 tracepoint(objectstore, omap_setkeys_enter, osr_name);
28e407b8
AA
3160 if (_check_replay_guard(cid, oid, spos) > 0)
3161 r = _omap_setkeys(cid, oid, aset, spos);
7c673cae
FG
3162 tracepoint(objectstore, omap_setkeys_exit, r);
3163 }
3164 break;
3165 case Transaction::OP_OMAP_RMKEYS:
3166 {
3167 const coll_t &_cid = i.get_cid(op->cid);
3168 const ghobject_t &oid = i.get_oid(op->oid);
3169 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3170 _cid : _cid.get_temp();
3171 set<string> keys;
3172 i.decode_keyset(keys);
3173 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
28e407b8
AA
3174 if (_check_replay_guard(cid, oid, spos) > 0)
3175 r = _omap_rmkeys(cid, oid, keys, spos);
7c673cae
FG
3176 tracepoint(objectstore, omap_rmkeys_exit, r);
3177 }
3178 break;
3179 case Transaction::OP_OMAP_RMKEYRANGE:
3180 {
3181 const coll_t &_cid = i.get_cid(op->cid);
3182 const ghobject_t &oid = i.get_oid(op->oid);
3183 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3184 _cid : _cid.get_temp();
3185 string first, last;
3186 first = i.decode_string();
3187 last = i.decode_string();
3188 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
28e407b8
AA
3189 if (_check_replay_guard(cid, oid, spos) > 0)
3190 r = _omap_rmkeyrange(cid, oid, first, last, spos);
7c673cae
FG
3191 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3192 }
3193 break;
3194 case Transaction::OP_OMAP_SETHEADER:
3195 {
3196 const coll_t &_cid = i.get_cid(op->cid);
3197 const ghobject_t &oid = i.get_oid(op->oid);
3198 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3199 _cid : _cid.get_temp();
3200 bufferlist bl;
3201 i.decode_bl(bl);
3202 tracepoint(objectstore, omap_setheader_enter, osr_name);
28e407b8
AA
3203 if (_check_replay_guard(cid, oid, spos) > 0)
3204 r = _omap_setheader(cid, oid, bl, spos);
7c673cae
FG
3205 tracepoint(objectstore, omap_setheader_exit, r);
3206 }
3207 break;
3208 case Transaction::OP_SPLIT_COLLECTION:
3209 {
11fdf7f2 3210 ceph_abort_msg("not legacy journal; upgrade to firefly first");
7c673cae
FG
3211 }
3212 break;
3213 case Transaction::OP_SPLIT_COLLECTION2:
3214 {
3215 coll_t cid = i.get_cid(op->cid);
3216 uint32_t bits = op->split_bits;
3217 uint32_t rem = op->split_rem;
3218 coll_t dest = i.get_cid(op->dest_cid);
3219 tracepoint(objectstore, split_coll2_enter, osr_name);
3220 r = _split_collection(cid, bits, rem, dest, spos);
3221 tracepoint(objectstore, split_coll2_exit, r);
3222 }
3223 break;
3224
11fdf7f2
TL
3225 case Transaction::OP_MERGE_COLLECTION:
3226 {
3227 coll_t cid = i.get_cid(op->cid);
3228 uint32_t bits = op->split_bits;
3229 coll_t dest = i.get_cid(op->dest_cid);
3230 tracepoint(objectstore, merge_coll_enter, osr_name);
3231 r = _merge_collection(cid, bits, dest, spos);
3232 tracepoint(objectstore, merge_coll_exit, r);
3233 }
3234 break;
3235
7c673cae
FG
3236 case Transaction::OP_SETALLOCHINT:
3237 {
3238 const coll_t &_cid = i.get_cid(op->cid);
3239 const ghobject_t &oid = i.get_oid(op->oid);
3240 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3241 _cid : _cid.get_temp();
3242 uint64_t expected_object_size = op->expected_object_size;
3243 uint64_t expected_write_size = op->expected_write_size;
3244 tracepoint(objectstore, setallochint_enter, osr_name);
3245 if (_check_replay_guard(cid, oid, spos) > 0)
3246 r = _set_alloc_hint(cid, oid, expected_object_size,
3247 expected_write_size);
3248 tracepoint(objectstore, setallochint_exit, r);
3249 }
3250 break;
3251
3252 default:
3253 derr << "bad op " << op->op << dendl;
3254 ceph_abort();
3255 }
3256
3257 if (r < 0) {
3258 bool ok = false;
3259
3260 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3261 op->op == Transaction::OP_CLONE ||
3262 op->op == Transaction::OP_CLONERANGE2 ||
3263 op->op == Transaction::OP_COLL_ADD ||
3264 op->op == Transaction::OP_SETATTR ||
3265 op->op == Transaction::OP_SETATTRS ||
3266 op->op == Transaction::OP_RMATTR ||
3267 op->op == Transaction::OP_OMAP_SETKEYS ||
3268 op->op == Transaction::OP_OMAP_RMKEYS ||
3269 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3270 op->op == Transaction::OP_OMAP_SETHEADER))
3271 // -ENOENT is normally okay
3272 // ...including on a replayed OP_RMCOLL with checkpoint mode
3273 ok = true;
3274 if (r == -ENODATA)
3275 ok = true;
3276
3277 if (op->op == Transaction::OP_SETALLOCHINT)
3278 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3279 // cases means invalid hint size (e.g. too big, not a multiple
3280 // of block size, etc) or, at least on xfs, an attempt to set
3281 // or change it when the file is not empty. However,
3282 // OP_SETALLOCHINT is advisory, so ignore all errors.
3283 ok = true;
3284
3285 if (replaying && !backend->can_checkpoint()) {
3286 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3287 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3288 ok = true;
3289 }
3290 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3291 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3292 ok = true;
3293 }
3294 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3295 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3296 ok = true;
3297 }
3298 if (r == -ERANGE) {
3299 dout(10) << "tolerating ERANGE on replay" << dendl;
3300 ok = true;
3301 }
3302 if (r == -ENOENT) {
3303 dout(10) << "tolerating ENOENT on replay" << dendl;
3304 ok = true;
3305 }
3306 }
3307
3308 if (!ok) {
3309 const char *msg = "unexpected error code";
3310
3311 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3312 op->op == Transaction::OP_CLONE ||
3313 op->op == Transaction::OP_CLONERANGE2)) {
3314 msg = "ENOENT on clone suggests osd bug";
3315 } else if (r == -ENOSPC) {
3316 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3317 // by partially applying transactions.
3318 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3319 } else if (r == -ENOTEMPTY) {
3320 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3321 } else if (r == -EPERM) {
3322 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3323 }
3324
3325 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3326 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3327 dout(0) << msg << dendl;
3328 dout(0) << " transaction dump:\n";
3329 JSONFormatter f(true);
3330 f.open_object_section("transaction");
3331 t.dump(&f);
3332 f.close_section();
3333 f.flush(*_dout);
3334 *_dout << dendl;
3335
3336 if (r == -EMFILE) {
3337 dump_open_fds(cct);
3338 }
3339
11fdf7f2 3340 ceph_abort_msg("unexpected error");
7c673cae
FG
3341 }
3342 }
3343
3344 spos.op++;
3345 }
3346
3347 _inject_failure();
3348}
3349
3350 /*********************************************/
3351
3352
3353
3354// --------------------
3355// objects
3356
11fdf7f2 3357bool FileStore::exists(CollectionHandle& ch, const ghobject_t& oid)
7c673cae 3358{
11fdf7f2
TL
3359 tracepoint(objectstore, exists_enter, ch->cid.c_str());
3360 auto osr = static_cast<OpSequencer*>(ch.get());
3361 osr->wait_for_apply(oid);
7c673cae 3362 struct stat st;
11fdf7f2 3363 bool retval = stat(ch, oid, &st) == 0;
7c673cae
FG
3364 tracepoint(objectstore, exists_exit, retval);
3365 return retval;
3366}
3367
3368int FileStore::stat(
11fdf7f2 3369 CollectionHandle& ch, const ghobject_t& oid, struct stat *st, bool allow_eio)
7c673cae 3370{
11fdf7f2
TL
3371 tracepoint(objectstore, stat_enter, ch->cid.c_str());
3372 auto osr = static_cast<OpSequencer*>(ch.get());
3373 osr->wait_for_apply(oid);
3374 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae 3375 int r = lfn_stat(cid, oid, st);
11fdf7f2 3376 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
7c673cae 3377 if (r < 0) {
11fdf7f2 3378 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
7c673cae
FG
3379 << " = " << r << dendl;
3380 } else {
11fdf7f2 3381 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
7c673cae
FG
3382 << " = " << r
3383 << " (size " << st->st_size << ")" << dendl;
3384 }
3385 if (cct->_conf->filestore_debug_inject_read_err &&
3386 debug_mdata_eio(oid)) {
3387 return -EIO;
3388 } else {
3389 tracepoint(objectstore, stat_exit, r);
3390 return r;
3391 }
3392}
3393
3394int FileStore::set_collection_opts(
11fdf7f2 3395 CollectionHandle& ch,
7c673cae
FG
3396 const pool_opts_t& opts)
3397{
3398 return -EOPNOTSUPP;
3399}
3400
3401int FileStore::read(
11fdf7f2 3402 CollectionHandle& ch,
7c673cae
FG
3403 const ghobject_t& oid,
3404 uint64_t offset,
3405 size_t len,
3406 bufferlist& bl,
224ce89b 3407 uint32_t op_flags)
7c673cae
FG
3408{
3409 int got;
11fdf7f2
TL
3410 tracepoint(objectstore, read_enter, ch->cid.c_str(), offset, len);
3411 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae 3412
31f18b77 3413 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae 3414
11fdf7f2
TL
3415 auto osr = static_cast<OpSequencer*>(ch.get());
3416 osr->wait_for_apply(oid);
3417
7c673cae
FG
3418 FDRef fd;
3419 int r = lfn_open(cid, oid, false, &fd);
3420 if (r < 0) {
31f18b77 3421 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
7c673cae
FG
3422 << cpp_strerror(r) << dendl;
3423 return r;
3424 }
3425
3426 if (offset == 0 && len == 0) {
3427 struct stat st;
3428 memset(&st, 0, sizeof(struct stat));
3429 int r = ::fstat(**fd, &st);
11fdf7f2 3430 ceph_assert(r == 0);
7c673cae
FG
3431 len = st.st_size;
3432 }
3433
3434#ifdef HAVE_POSIX_FADVISE
3435 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3436 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3437 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3438 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3439#endif
3440
3441 bufferptr bptr(len); // prealloc space for entire read
3442 got = safe_pread(**fd, bptr.c_str(), len, offset);
3443 if (got < 0) {
31f18b77 3444 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
7c673cae 3445 lfn_close(fd);
7c673cae
FG
3446 return got;
3447 }
3448 bptr.set_length(got); // properly size the buffer
3449 bl.clear();
3450 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3451
3452#ifdef HAVE_POSIX_FADVISE
3453 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3454 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3455 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3456 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3457#endif
3458
3459 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3460 ostringstream ss;
3461 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3462 if (errors != 0) {
31f18b77 3463 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae 3464 << got << " ... BAD CRC:\n" << ss.str() << dendl;
11fdf7f2 3465 ceph_abort_msg("bad crc on read");
7c673cae
FG
3466 }
3467 }
3468
3469 lfn_close(fd);
3470
31f18b77 3471 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae
FG
3472 << got << "/" << len << dendl;
3473 if (cct->_conf->filestore_debug_inject_read_err &&
3474 debug_data_eio(oid)) {
3475 return -EIO;
11fdf7f2
TL
3476 } else if (oid.hobj.pool > 0 && /* FIXME, see #23029 */
3477 cct->_conf->filestore_debug_random_read_err &&
3478 (rand() % (int)(cct->_conf->filestore_debug_random_read_err *
3479 100.0)) == 0) {
224ce89b
WB
3480 dout(0) << __func__ << ": inject random EIO" << dendl;
3481 return -EIO;
7c673cae
FG
3482 } else {
3483 tracepoint(objectstore, read_exit, got);
3484 return got;
3485 }
3486}
3487
3488int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3489 map<uint64_t, uint64_t> *m)
3490{
3491 uint64_t i;
11fdf7f2
TL
3492 struct fiemap_extent *extent = nullptr;
3493 struct fiemap *fiemap = nullptr;
7c673cae
FG
3494 int r = 0;
3495
3496more:
3497 r = backend->do_fiemap(fd, offset, len, &fiemap);
3498 if (r < 0)
3499 return r;
3500
3501 if (fiemap->fm_mapped_extents == 0) {
3502 free(fiemap);
3503 return r;
3504 }
3505
3506 extent = &fiemap->fm_extents[0];
3507
3508 /* start where we were asked to start */
3509 if (extent->fe_logical < offset) {
3510 extent->fe_length -= offset - extent->fe_logical;
3511 extent->fe_logical = offset;
3512 }
3513
3514 i = 0;
3515
3516 struct fiemap_extent *last = nullptr;
3517 while (i < fiemap->fm_mapped_extents) {
3518 struct fiemap_extent *next = extent + 1;
3519
31f18b77 3520 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
7c673cae
FG
3521 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3522
3523 /* try to merge extents */
3524 while ((i < fiemap->fm_mapped_extents - 1) &&
3525 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3526 next->fe_length += extent->fe_length;
3527 next->fe_logical = extent->fe_logical;
3528 extent = next;
3529 next = extent + 1;
3530 i++;
3531 }
3532
3533 if (extent->fe_logical + extent->fe_length > offset + len)
3534 extent->fe_length = offset + len - extent->fe_logical;
3535 (*m)[extent->fe_logical] = extent->fe_length;
3536 i++;
3537 last = extent++;
3538 }
3539 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3540 offset = last->fe_logical + last->fe_length;
3541 len -= xoffset;
3542 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3543 free(fiemap);
3544 if (!is_last) {
3545 goto more;
3546 }
3547
3548 return r;
3549}
3550
3551int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3552 map<uint64_t, uint64_t> *m)
3553{
3554#if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3555 off_t hole_pos, data_pos;
3556 int r = 0;
3557
3558 // If lseek fails with errno setting to be ENXIO, this means the current
3559 // file offset is beyond the end of the file.
3560 off_t start = offset;
3561 while(start < (off_t)(offset + len)) {
3562 data_pos = lseek(fd, start, SEEK_DATA);
3563 if (data_pos < 0) {
3564 if (errno == ENXIO)
3565 break;
3566 else {
3567 r = -errno;
3568 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3569 return r;
3570 }
3571 } else if (data_pos > (off_t)(offset + len)) {
3572 break;
3573 }
3574
3575 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3576 if (hole_pos < 0) {
3577 if (errno == ENXIO) {
3578 break;
3579 } else {
3580 r = -errno;
3581 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3582 return r;
3583 }
3584 }
3585
3586 if (hole_pos >= (off_t)(offset + len)) {
3587 (*m)[data_pos] = offset + len - data_pos;
3588 break;
3589 }
3590 (*m)[data_pos] = hole_pos - data_pos;
3591 start = hole_pos;
3592 }
3593
3594 return r;
3595#else
3596 (*m)[offset] = len;
3597 return 0;
3598#endif
3599}
3600
11fdf7f2 3601int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
3602 uint64_t offset, size_t len,
3603 bufferlist& bl)
3604{
3605 map<uint64_t, uint64_t> exomap;
11fdf7f2 3606 int r = fiemap(ch, oid, offset, len, exomap);
7c673cae 3607 if (r >= 0) {
11fdf7f2 3608 encode(exomap, bl);
7c673cae
FG
3609 }
3610 return r;
3611}
3612
11fdf7f2 3613int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
3614 uint64_t offset, size_t len,
3615 map<uint64_t, uint64_t>& destmap)
3616{
11fdf7f2
TL
3617 tracepoint(objectstore, fiemap_enter, ch->cid.c_str(), offset, len);
3618 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae
FG
3619 destmap.clear();
3620
3621 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3622 len <= (size_t)m_filestore_fiemap_threshold) {
3623 destmap[offset] = len;
3624 return 0;
3625 }
3626
31f18b77 3627 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae 3628
11fdf7f2
TL
3629 auto osr = static_cast<OpSequencer*>(ch.get());
3630 osr->wait_for_apply(oid);
3631
7c673cae
FG
3632 FDRef fd;
3633
3634 int r = lfn_open(cid, oid, false, &fd);
3635 if (r < 0) {
3636 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3637 goto done;
3638 }
3639
3640 if (backend->has_seek_data_hole()) {
3641 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3642 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3643 } else if (backend->has_fiemap()) {
3644 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3645 r = _do_fiemap(**fd, offset, len, &destmap);
3646 }
3647
3648 lfn_close(fd);
3649
3650done:
3651
31f18b77 3652 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
11fdf7f2 3653 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
3654 tracepoint(objectstore, fiemap_exit, r);
3655 return r;
3656}
3657
3658int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3659 const SequencerPosition &spos)
3660{
31f18b77 3661 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae 3662 int r = lfn_unlink(cid, oid, spos);
31f18b77 3663 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3664 return r;
3665}
3666
3667int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3668{
31f18b77 3669 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
7c673cae 3670 int r = lfn_truncate(cid, oid, size);
31f18b77 3671 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
7c673cae
FG
3672 return r;
3673}
3674
3675
3676int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3677{
31f18b77 3678 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
3679
3680 FDRef fd;
3681 int r = lfn_open(cid, oid, true, &fd);
3682 if (r < 0) {
3683 return r;
3684 } else {
3685 lfn_close(fd);
3686 }
31f18b77 3687 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3688 return r;
3689}
3690
3691int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3692 uint64_t offset, size_t len,
3693 const bufferlist& bl, uint32_t fadvise_flags)
3694{
31f18b77 3695 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3696 int r;
3697
3698 FDRef fd;
3699 r = lfn_open(cid, oid, true, &fd);
3700 if (r < 0) {
31f18b77 3701 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
7c673cae
FG
3702 << oid << ": "
3703 << cpp_strerror(r) << dendl;
3704 goto out;
3705 }
3706
3707 // write
3708 r = bl.write_fd(**fd, offset);
3709 if (r < 0) {
31f18b77 3710 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
7c673cae
FG
3711 << " error: " << cpp_strerror(r) << dendl;
3712 lfn_close(fd);
3713 goto out;
3714 }
3715 r = bl.length();
3716
3717 if (r >= 0 && m_filestore_sloppy_crc) {
3718 int rc = backend->_crc_update_write(**fd, offset, len, bl);
11fdf7f2 3719 ceph_assert(rc >= 0);
7c673cae
FG
3720 }
3721
3722 if (replaying || m_disable_wbthrottle) {
3723 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3724#ifdef HAVE_POSIX_FADVISE
3725 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3726#endif
3727 }
3728 } else {
3729 wbthrottle.queue_wb(fd, oid, offset, len,
3730 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3731 }
3732
3733 lfn_close(fd);
3734
3735 out:
31f18b77 3736 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
7c673cae
FG
3737 return r;
3738}
3739
3740int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3741{
31f18b77 3742 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3743 int ret = 0;
3744
3745 if (cct->_conf->filestore_punch_hole) {
3746#ifdef CEPH_HAVE_FALLOCATE
11fdf7f2 3747# if !defined(__APPLE__) && !defined(__FreeBSD__)
7c673cae
FG
3748# ifdef FALLOC_FL_KEEP_SIZE
3749 // first try to punch a hole.
3750 FDRef fd;
3751 ret = lfn_open(cid, oid, false, &fd);
3752 if (ret < 0) {
3753 goto out;
3754 }
3755
3756 struct stat st;
3757 ret = ::fstat(**fd, &st);
3758 if (ret < 0) {
3759 ret = -errno;
3760 lfn_close(fd);
3761 goto out;
3762 }
3763
3764 // first try fallocate
3765 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3766 offset, len);
3767 if (ret < 0) {
3768 ret = -errno;
3769 } else {
b32b8144
FG
3770 // ensure we extend file size, if needed
3771 if (len > 0 && offset + len > (uint64_t)st.st_size) {
7c673cae
FG
3772 ret = ::ftruncate(**fd, offset + len);
3773 if (ret < 0) {
3774 ret = -errno;
3775 lfn_close(fd);
3776 goto out;
3777 }
3778 }
3779 }
3780 lfn_close(fd);
3781
3782 if (ret >= 0 && m_filestore_sloppy_crc) {
3783 int rc = backend->_crc_update_zero(**fd, offset, len);
11fdf7f2 3784 ceph_assert(rc >= 0);
7c673cae
FG
3785 }
3786
3787 if (ret == 0)
3788 goto out; // yay!
3789 if (ret != -EOPNOTSUPP)
3790 goto out; // some other error
3791# endif
3792# endif
3793#endif
3794 }
3795
3796 // lame, kernel is old and doesn't support it.
3797 // write zeros.. yuck!
31f18b77 3798 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
7c673cae
FG
3799 {
3800 bufferlist bl;
3801 bl.append_zero(len);
3802 ret = _write(cid, oid, offset, len, bl);
3803 }
3804
3805#ifdef CEPH_HAVE_FALLOCATE
11fdf7f2 3806# if !defined(__APPLE__) && !defined(__FreeBSD__)
7c673cae
FG
3807# ifdef FALLOC_FL_KEEP_SIZE
3808 out:
3809# endif
3810# endif
3811#endif
31f18b77 3812 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
7c673cae
FG
3813 return ret;
3814}
3815
3816int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3817 const SequencerPosition& spos)
3818{
31f18b77 3819 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
7c673cae
FG
3820
3821 if (_check_replay_guard(cid, newoid, spos) < 0)
3822 return 0;
3823
3824 int r;
3825 FDRef o, n;
3826 {
3827 Index index;
3828 r = lfn_open(cid, oldoid, false, &o, &index);
3829 if (r < 0) {
3830 goto out2;
3831 }
11fdf7f2 3832 ceph_assert(index.index);
9f95a23c 3833 std::unique_lock l{(index.index)->access_lock};
7c673cae
FG
3834
3835 r = lfn_open(cid, newoid, true, &n, &index);
3836 if (r < 0) {
3837 goto out;
3838 }
3839 r = ::ftruncate(**n, 0);
3840 if (r < 0) {
3841 r = -errno;
3842 goto out3;
3843 }
3844 struct stat st;
3845 r = ::fstat(**o, &st);
3846 if (r < 0) {
3847 r = -errno;
3848 goto out3;
3849 }
3850
3851 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3852 if (r < 0) {
3853 goto out3;
3854 }
3855
3856 dout(20) << "objectmap clone" << dendl;
3857 r = object_map->clone(oldoid, newoid, &spos);
3858 if (r < 0 && r != -ENOENT)
3859 goto out3;
3860 }
3861
3862 {
3863 char buf[2];
3864 map<string, bufferptr> aset;
3865 r = _fgetattrs(**o, aset);
3866 if (r < 0)
3867 goto out3;
3868
3869 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3870 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3871 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3872 sizeof(XATTR_NO_SPILL_OUT));
3873 } else {
3874 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3875 sizeof(XATTR_SPILL_OUT));
3876 }
3877 if (r < 0)
3878 goto out3;
3879
3880 r = _fsetattrs(**n, aset);
3881 if (r < 0)
3882 goto out3;
3883 }
3884
3885 // clone is non-idempotent; record our work.
3886 _set_replay_guard(**n, spos, &newoid);
3887
3888 out3:
3889 lfn_close(n);
3890 out:
3891 lfn_close(o);
3892 out2:
31f18b77 3893 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
11fdf7f2 3894 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
3895 return r;
3896}
3897
3898int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3899{
31f18b77 3900 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3901 return backend->clone_range(from, to, srcoff, len, dstoff);
3902}
3903
3904int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3905{
31f18b77 3906 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3907 int r = 0;
3908 map<uint64_t, uint64_t> exomap;
3909 // fiemap doesn't allow zero length
3910 if (len == 0)
3911 return 0;
3912
3913 if (backend->has_seek_data_hole()) {
3914 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3915 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3916 } else if (backend->has_fiemap()) {
3917 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3918 r = _do_fiemap(from, srcoff, len, &exomap);
3919 }
3920
3921
3922 int64_t written = 0;
3923 if (r < 0)
3924 goto out;
3925
3926 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3927 uint64_t it_off = miter->first - srcoff + dstoff;
3928 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3929 if (r < 0) {
31f18b77 3930 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
7c673cae
FG
3931 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3932 break;
3933 }
3934 written += miter->second;
3935 }
3936
3937 if (r >= 0) {
3938 if (m_filestore_sloppy_crc) {
3939 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
11fdf7f2 3940 ceph_assert(rc >= 0);
7c673cae
FG
3941 }
3942 struct stat st;
3943 r = ::fstat(to, &st);
3944 if (r < 0) {
3945 r = -errno;
31f18b77 3946 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3947 goto out;
3948 }
3949 if (st.st_size < (int)(dstoff + len)) {
3950 r = ::ftruncate(to, dstoff + len);
3951 if (r < 0) {
3952 r = -errno;
31f18b77 3953 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3954 goto out;
3955 }
3956 }
3957 r = written;
3958 }
3959
3960 out:
31f18b77 3961 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
3962 return r;
3963}
3964
3965int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3966{
31f18b77 3967 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3968 int r = 0;
3969 loff_t pos = srcoff;
3970 loff_t end = srcoff + len;
3971 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3972
3973#ifdef CEPH_HAVE_SPLICE
3974 if (backend->has_splice()) {
3975 int pipefd[2];
9f95a23c 3976 if (pipe_cloexec(pipefd, 0) < 0) {
91327a77
AA
3977 int e = errno;
3978 derr << " pipe " << " got " << cpp_strerror(e) << dendl;
3979 return -e;
7c673cae
FG
3980 }
3981
3982 loff_t dstpos = dstoff;
3983 while (pos < end) {
11fdf7f2
TL
3984 int l = std::min<int>(end-pos, buflen);
3985 r = safe_splice(from, &pos, pipefd[1], nullptr, l, SPLICE_F_NONBLOCK);
7c673cae
FG
3986 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3987 if (r < 0) {
31f18b77 3988 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
7c673cae
FG
3989 << ", " << cpp_strerror(r) << dendl;
3990 break;
3991 }
3992 if (r == 0) {
3993 // hrm, bad source range, wtf.
3994 r = -ERANGE;
31f18b77 3995 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
3996 << " of fd " << from << " len " << len << dendl;
3997 break;
3998 }
3999
11fdf7f2 4000 r = safe_splice(pipefd[0], nullptr, to, &dstpos, r, 0);
7c673cae
FG
4001 dout(10) << " safe_splice write to " << to << " len " << r
4002 << " got " << r << dendl;
4003 if (r < 0) {
31f18b77 4004 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
4005 << r << ", " << cpp_strerror(r) << dendl;
4006 break;
4007 }
4008 }
4009 close(pipefd[0]);
4010 close(pipefd[1]);
4011 } else
4012#endif
4013 {
4014 int64_t actual;
4015
4016 actual = ::lseek64(from, srcoff, SEEK_SET);
4017 if (actual != (int64_t)srcoff) {
4018 if (actual < 0)
4019 r = -errno;
4020 else
4021 r = -EINVAL;
4022 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
4023 return r;
4024 }
4025 actual = ::lseek64(to, dstoff, SEEK_SET);
4026 if (actual != (int64_t)dstoff) {
4027 if (actual < 0)
4028 r = -errno;
4029 else
4030 r = -EINVAL;
4031 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
4032 return r;
4033 }
4034
4035 char buf[buflen];
4036 while (pos < end) {
11fdf7f2 4037 int l = std::min<int>(end-pos, buflen);
7c673cae
FG
4038 r = ::read(from, buf, l);
4039 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
4040 if (r < 0) {
4041 if (errno == EINTR) {
4042 continue;
4043 } else {
4044 r = -errno;
31f18b77 4045 derr << __FUNC__ << ": read error at " << pos << "~" << len
7c673cae
FG
4046 << ", " << cpp_strerror(r) << dendl;
4047 break;
4048 }
4049 }
4050 if (r == 0) {
4051 // hrm, bad source range, wtf.
4052 r = -ERANGE;
31f18b77 4053 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
4054 << " of fd " << from << " len " << len << dendl;
4055 break;
4056 }
4057 int op = 0;
4058 while (op < r) {
4059 int r2 = safe_write(to, buf+op, r-op);
4060 dout(25) << " write to " << to << " len " << (r-op)
4061 << " got " << r2 << dendl;
4062 if (r2 < 0) {
4063 r = r2;
31f18b77 4064 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
4065 << r-op << ", " << cpp_strerror(r) << dendl;
4066
4067 break;
4068 }
4069 op += (r-op);
4070 }
4071 if (r < 0)
4072 break;
4073 pos += r;
4074 }
4075 }
4076
4077 if (r < 0 && replaying) {
11fdf7f2 4078 ceph_assert(r == -ERANGE);
31f18b77 4079 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
94b18763 4080 r = len;
7c673cae 4081 }
11fdf7f2 4082 ceph_assert(replaying || pos == end);
7c673cae
FG
4083 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
4084 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
11fdf7f2 4085 ceph_assert(rc >= 0);
7c673cae 4086 }
31f18b77 4087 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
4088 return r;
4089}
4090
4091int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
4092 uint64_t srcoff, uint64_t len, uint64_t dstoff,
4093 const SequencerPosition& spos)
4094{
31f18b77 4095 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
4096
4097 if (_check_replay_guard(newcid, newoid, spos) < 0)
4098 return 0;
4099
4100 int r;
4101 FDRef o, n;
4102 r = lfn_open(oldcid, oldoid, false, &o);
4103 if (r < 0) {
4104 goto out2;
4105 }
4106 r = lfn_open(newcid, newoid, true, &n);
4107 if (r < 0) {
4108 goto out;
4109 }
4110 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
4111 if (r < 0) {
4112 goto out3;
4113 }
4114
4115 // clone is non-idempotent; record our work.
4116 _set_replay_guard(**n, spos, &newoid);
4117
4118 out3:
4119 lfn_close(n);
4120 out:
4121 lfn_close(o);
4122 out2:
31f18b77 4123 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
7c673cae
FG
4124 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
4125 return r;
4126}
4127
4128class SyncEntryTimeout : public Context {
4129public:
4130 CephContext* cct;
4131 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
4132 : cct(cct), m_commit_timeo(commit_timeo)
4133 {
4134 }
4135
4136 void finish(int r) override {
4137 BackTrace *bt = new BackTrace(1);
4138 generic_dout(-1) << "FileStore: sync_entry timed out after "
4139 << m_commit_timeo << " seconds.\n";
4140 bt->print(*_dout);
4141 *_dout << dendl;
4142 delete bt;
11fdf7f2 4143 bt = nullptr;
7c673cae
FG
4144 ceph_abort();
4145 }
4146private:
4147 int m_commit_timeo;
4148};
4149
4150void FileStore::sync_entry()
4151{
9f95a23c 4152 std::unique_lock l{lock};
7c673cae 4153 while (!stop) {
9f95a23c
TL
4154 auto min_interval = ceph::make_timespan(m_filestore_min_sync_interval);
4155 auto max_interval = ceph::make_timespan(m_filestore_max_sync_interval);
4156 auto startwait = ceph::real_clock::now();
7c673cae 4157 if (!force_sync) {
31f18b77 4158 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
9f95a23c 4159 sync_cond.wait_for(l, max_interval);
7c673cae 4160 } else {
31f18b77 4161 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
7c673cae
FG
4162 }
4163
4164 if (force_sync) {
31f18b77 4165 dout(20) << __FUNC__ << ": force_sync set" << dendl;
7c673cae
FG
4166 force_sync = false;
4167 } else if (stop) {
31f18b77 4168 dout(20) << __FUNC__ << ": stop set" << dendl;
7c673cae
FG
4169 break;
4170 } else {
4171 // wait for at least the min interval
9f95a23c 4172 auto woke = ceph::real_clock::now() - startwait;
31f18b77 4173 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
7c673cae 4174 if (woke < min_interval) {
9f95a23c 4175 auto t = min_interval - woke;
31f18b77 4176 dout(20) << __FUNC__ << ": waiting for another " << t
7c673cae 4177 << " to reach min interval " << min_interval << dendl;
9f95a23c 4178 sync_cond.wait_for(l, t);
7c673cae
FG
4179 }
4180 }
4181
4182 list<Context*> fin;
4183 again:
4184 fin.swap(sync_waiters);
9f95a23c 4185 l.unlock();
7c673cae
FG
4186
4187 op_tp.pause();
4188 if (apply_manager.commit_start()) {
9f95a23c 4189 auto start = ceph::real_clock::now();
7c673cae
FG
4190 uint64_t cp = apply_manager.get_committing_seq();
4191
9f95a23c 4192 sync_entry_timeo_lock.lock();
7c673cae
FG
4193 SyncEntryTimeout *sync_entry_timeo =
4194 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
224ce89b
WB
4195 if (!timer.add_event_after(m_filestore_commit_timeout,
4196 sync_entry_timeo)) {
4197 sync_entry_timeo = nullptr;
4198 }
9f95a23c 4199 sync_entry_timeo_lock.unlock();
7c673cae
FG
4200
4201 logger->set(l_filestore_committing, 1);
4202
31f18b77 4203 dout(15) << __FUNC__ << ": committing " << cp << dendl;
7c673cae
FG
4204 stringstream errstream;
4205 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4206 derr << errstream.str() << dendl;
4207 ceph_abort();
4208 }
4209
4210 if (backend->can_checkpoint()) {
4211 int err = write_op_seq(op_fd, cp);
4212 if (err < 0) {
4213 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4214 ceph_abort_msg("error during write_op_seq");
7c673cae
FG
4215 }
4216
4217 char s[NAME_MAX];
4218 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4219 uint64_t cid = 0;
4220 err = backend->create_checkpoint(s, &cid);
4221 if (err < 0) {
4222 int err = errno;
4223 derr << "snap create '" << s << "' got error " << err << dendl;
11fdf7f2 4224 ceph_assert(err == 0);
7c673cae
FG
4225 }
4226
4227 snaps.push_back(cp);
4228 apply_manager.commit_started();
4229 op_tp.unpause();
4230
4231 if (cid > 0) {
4232 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4233 err = backend->sync_checkpoint(cid);
4234 if (err < 0) {
4235 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
11fdf7f2 4236 ceph_abort_msg("wait_sync got error");
7c673cae
FG
4237 }
4238 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4239 }
224ce89b 4240 } else {
7c673cae
FG
4241 apply_manager.commit_started();
4242 op_tp.unpause();
4243
4244 int err = object_map->sync();
4245 if (err < 0) {
4246 derr << "object_map sync got " << cpp_strerror(err) << dendl;
11fdf7f2 4247 ceph_abort_msg("object_map sync returned error");
7c673cae
FG
4248 }
4249
4250 err = backend->syncfs();
4251 if (err < 0) {
4252 derr << "syncfs got " << cpp_strerror(err) << dendl;
11fdf7f2 4253 ceph_abort_msg("syncfs returned error");
7c673cae
FG
4254 }
4255
4256 err = write_op_seq(op_fd, cp);
4257 if (err < 0) {
4258 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4259 ceph_abort_msg("error during write_op_seq");
7c673cae
FG
4260 }
4261 err = ::fsync(op_fd);
4262 if (err < 0) {
4263 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4264 ceph_abort_msg("error during fsync of op_seq");
7c673cae
FG
4265 }
4266 }
4267
9f95a23c
TL
4268 auto done = ceph::real_clock::now();
4269 auto lat = done - start;
4270 auto dur = done - startwait;
31f18b77 4271 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
224ce89b 4272 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
9f95a23c 4273 if (max_pause_lat < utime_t{dur - lat}) {
224ce89b
WB
4274 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4275 }
7c673cae
FG
4276
4277 logger->inc(l_filestore_commitcycle);
4278 logger->tinc(l_filestore_commitcycle_latency, lat);
4279 logger->tinc(l_filestore_commitcycle_interval, dur);
4280
4281 apply_manager.commit_finish();
4282 if (!m_disable_wbthrottle) {
4283 wbthrottle.clear();
4284 }
4285
4286 logger->set(l_filestore_committing, 0);
4287
4288 // remove old snaps?
4289 if (backend->can_checkpoint()) {
4290 char s[NAME_MAX];
4291 while (snaps.size() > 2) {
4292 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4293 snaps.pop_front();
4294 dout(10) << "removing snap '" << s << "'" << dendl;
4295 int r = backend->destroy_checkpoint(s);
4296 if (r) {
4297 int err = errno;
4298 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4299 }
4300 }
4301 }
4302
31f18b77 4303 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
7c673cae 4304
224ce89b 4305 if (sync_entry_timeo) {
9f95a23c 4306 std::lock_guard lock{sync_entry_timeo_lock};
224ce89b
WB
4307 timer.cancel_event(sync_entry_timeo);
4308 }
7c673cae
FG
4309 } else {
4310 op_tp.unpause();
4311 }
4312
9f95a23c 4313 l.lock();
7c673cae
FG
4314 finish_contexts(cct, fin, 0);
4315 fin.clear();
4316 if (!sync_waiters.empty()) {
31f18b77 4317 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
7c673cae
FG
4318 goto again;
4319 }
4320 if (!stop && journal && journal->should_commit_now()) {
31f18b77 4321 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
7c673cae
FG
4322 goto again;
4323 }
4324 }
4325 stop = false;
7c673cae
FG
4326}
4327
7c673cae
FG
4328void FileStore::do_force_sync()
4329{
31f18b77 4330 dout(10) << __FUNC__ << dendl;
9f95a23c 4331 std::lock_guard l{lock};
7c673cae 4332 force_sync = true;
9f95a23c 4333 sync_cond.notify_all();
7c673cae
FG
4334}
4335
4336void FileStore::start_sync(Context *onsafe)
4337{
9f95a23c 4338 std::lock_guard l{lock};
7c673cae 4339 sync_waiters.push_back(onsafe);
9f95a23c 4340 sync_cond.notify_all();
7c673cae 4341 force_sync = true;
31f18b77 4342 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4343}
4344
4345void FileStore::sync()
4346{
9f95a23c
TL
4347 ceph::mutex m = ceph::make_mutex("FileStore::sync");
4348 ceph::condition_variable c;
7c673cae 4349 bool done;
9f95a23c 4350 C_SafeCond *fin = new C_SafeCond(m, c, &done);
7c673cae
FG
4351
4352 start_sync(fin);
4353
9f95a23c
TL
4354 std::unique_lock l{m};
4355 c.wait(l, [&done, this] {
4356 if (!done) {
4357 dout(10) << "sync waiting" << dendl;
4358 }
4359 return done;
4360 });
7c673cae
FG
4361 dout(10) << "sync done" << dendl;
4362}
4363
4364void FileStore::_flush_op_queue()
4365{
31f18b77 4366 dout(10) << __FUNC__ << ": draining op tp" << dendl;
7c673cae 4367 op_wq.drain();
31f18b77 4368 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
7c673cae
FG
4369 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4370 (*it)->wait_for_empty();
4371 }
4372}
4373
4374/*
4375 * flush - make every queued write readable
4376 */
4377void FileStore::flush()
4378{
31f18b77 4379 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4380
4381 if (cct->_conf->filestore_blackhole) {
4382 // wait forever
9f95a23c
TL
4383 ceph::mutex lock = ceph::make_mutex("FileStore::flush::lock");
4384 ceph::condition_variable cond;
4385 std::unique_lock l{lock};
4386 cond.wait(l, [] {return false;} );
7c673cae
FG
4387 ceph_abort();
4388 }
4389
4390 if (m_filestore_journal_writeahead) {
4391 if (journal)
4392 journal->flush();
31f18b77 4393 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
7c673cae
FG
4394 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4395 (*it)->wait_for_empty();
4396 }
4397 }
4398
4399 _flush_op_queue();
31f18b77 4400 dout(10) << __FUNC__ << ": complete" << dendl;
7c673cae
FG
4401}
4402
4403/*
4404 * sync_and_flush - make every queued write readable AND committed to disk
4405 */
4406void FileStore::sync_and_flush()
4407{
31f18b77 4408 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4409
4410 if (m_filestore_journal_writeahead) {
4411 if (journal)
4412 journal->flush();
4413 _flush_op_queue();
4414 } else {
4415 // includes m_filestore_journal_parallel
4416 _flush_op_queue();
4417 sync();
4418 }
31f18b77 4419 dout(10) << __FUNC__ << ": done" << dendl;
7c673cae
FG
4420}
4421
4422int FileStore::flush_journal()
4423{
31f18b77 4424 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4425 sync_and_flush();
4426 sync();
4427 return 0;
4428}
4429
4430int FileStore::snapshot(const string& name)
4431{
31f18b77 4432 dout(10) << __FUNC__ << ": " << name << dendl;
7c673cae
FG
4433 sync_and_flush();
4434
4435 if (!backend->can_checkpoint()) {
31f18b77 4436 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
7c673cae
FG
4437 return -EOPNOTSUPP;
4438 }
4439
4440 char s[NAME_MAX];
4441 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4442
11fdf7f2 4443 int r = backend->create_checkpoint(s, nullptr);
7c673cae 4444 if (r) {
31f18b77 4445 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
7c673cae
FG
4446 }
4447
4448 return r;
4449}
4450
4451// -------------------------------
4452// attributes
4453
4454int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4455{
4456 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4457 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4458 if (l >= 0) {
f67539c2 4459 bp = ceph::buffer::create(l);
7c673cae
FG
4460 memcpy(bp.c_str(), val, l);
4461 } else if (l == -ERANGE) {
4462 l = chain_fgetxattr(fd, name, 0, 0);
4463 if (l > 0) {
f67539c2 4464 bp = ceph::buffer::create(l);
7c673cae
FG
4465 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4466 }
4467 }
11fdf7f2 4468 ceph_assert(!m_filestore_fail_eio || l != -EIO);
7c673cae
FG
4469 return l;
4470}
4471
4472int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4473{
4474 // get attr list
4475 char names1[100];
4476 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4477 char *names2 = 0;
4478 char *name = 0;
4479 if (len == -ERANGE) {
4480 len = chain_flistxattr(fd, 0, 0);
4481 if (len < 0) {
11fdf7f2 4482 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4483 return len;
4484 }
4485 dout(10) << " -ERANGE, len is " << len << dendl;
4486 names2 = new char[len+1];
4487 len = chain_flistxattr(fd, names2, len);
4488 dout(10) << " -ERANGE, got " << len << dendl;
4489 if (len < 0) {
11fdf7f2 4490 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4491 delete[] names2;
4492 return len;
4493 }
4494 name = names2;
4495 } else if (len < 0) {
11fdf7f2 4496 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4497 return len;
4498 } else {
4499 name = names1;
4500 }
4501 name[len] = 0;
4502
4503 char *end = name + len;
4504 while (name < end) {
4505 char *attrname = name;
4506 if (parse_attrname(&name)) {
4507 if (*name) {
31f18b77 4508 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
7c673cae
FG
4509 int r = _fgetattr(fd, attrname, aset[name]);
4510 if (r < 0) {
4511 delete[] names2;
4512 return r;
4513 }
4514 }
4515 }
4516 name += strlen(name) + 1;
4517 }
4518
4519 delete[] names2;
4520 return 0;
4521}
4522
4523int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4524{
4525 for (map<string, bufferptr>::iterator p = aset.begin();
4526 p != aset.end();
4527 ++p) {
4528 char n[CHAIN_XATTR_MAX_NAME_LEN];
4529 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4530 const char *val;
4531 if (p->second.length())
4532 val = p->second.c_str();
4533 else
4534 val = "";
4535 // ??? Why do we skip setting all the other attrs if one fails?
4536 int r = chain_fsetxattr(fd, n, val, p->second.length());
4537 if (r < 0) {
31f18b77 4538 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
7c673cae
FG
4539 return r;
4540 }
4541 }
4542 return 0;
4543}
4544
4545// debug EIO injection
4546void FileStore::inject_data_error(const ghobject_t &oid) {
9f95a23c 4547 std::lock_guard l{read_error_lock};
31f18b77 4548 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4549 data_error_set.insert(oid);
4550}
4551void FileStore::inject_mdata_error(const ghobject_t &oid) {
9f95a23c 4552 std::lock_guard l{read_error_lock};
31f18b77 4553 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4554 mdata_error_set.insert(oid);
4555}
224ce89b 4556
7c673cae 4557void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
9f95a23c 4558 std::lock_guard l{read_error_lock};
31f18b77 4559 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
7c673cae
FG
4560 data_error_set.erase(oid);
4561 mdata_error_set.erase(oid);
4562}
4563bool FileStore::debug_data_eio(const ghobject_t &oid) {
9f95a23c 4564 std::lock_guard l{read_error_lock};
7c673cae 4565 if (data_error_set.count(oid)) {
31f18b77 4566 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4567 return true;
4568 } else {
4569 return false;
4570 }
4571}
4572bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
9f95a23c 4573 std::lock_guard l{read_error_lock};
7c673cae 4574 if (mdata_error_set.count(oid)) {
31f18b77 4575 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4576 return true;
4577 } else {
4578 return false;
4579 }
4580}
4581
4582
4583// objects
4584
11fdf7f2 4585int FileStore::getattr(CollectionHandle& ch, const ghobject_t& oid, const char *name, bufferptr &bp)
7c673cae 4586{
11fdf7f2
TL
4587 tracepoint(objectstore, getattr_enter, ch->cid.c_str());
4588 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
31f18b77 4589 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
11fdf7f2
TL
4590
4591 auto osr = static_cast<OpSequencer*>(ch.get());
4592 osr->wait_for_apply(oid);
4593
7c673cae
FG
4594 FDRef fd;
4595 int r = lfn_open(cid, oid, false, &fd);
4596 if (r < 0) {
4597 goto out;
4598 }
4599 char n[CHAIN_XATTR_MAX_NAME_LEN];
4600 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4601 r = _fgetattr(**fd, n, bp);
4602 lfn_close(fd);
4603 if (r == -ENODATA) {
4604 map<string, bufferlist> got;
4605 set<string> to_get;
4606 to_get.insert(string(name));
4607 Index index;
4608 r = get_index(cid, &index);
4609 if (r < 0) {
31f18b77 4610 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4611 goto out;
4612 }
4613 r = object_map->get_xattrs(oid, to_get, &got);
4614 if (r < 0 && r != -ENOENT) {
31f18b77 4615 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
7c673cae
FG
4616 goto out;
4617 }
4618 if (got.empty()) {
31f18b77 4619 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
7c673cae
FG
4620 return -ENODATA;
4621 }
4622 bp = bufferptr(got.begin()->second.c_str(),
4623 got.begin()->second.length());
4624 r = bp.length();
4625 }
4626 out:
31f18b77 4627 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
11fdf7f2 4628 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4629 if (cct->_conf->filestore_debug_inject_read_err &&
4630 debug_mdata_eio(oid)) {
4631 return -EIO;
4632 } else {
4633 tracepoint(objectstore, getattr_exit, r);
4634 return r < 0 ? r : 0;
4635 }
4636}
4637
11fdf7f2 4638int FileStore::getattrs(CollectionHandle& ch, const ghobject_t& oid, map<string,bufferptr>& aset)
7c673cae 4639{
11fdf7f2
TL
4640 tracepoint(objectstore, getattrs_enter, ch->cid.c_str());
4641 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae
FG
4642 set<string> omap_attrs;
4643 map<string, bufferlist> omap_aset;
4644 Index index;
31f18b77 4645 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
11fdf7f2
TL
4646
4647 auto osr = static_cast<OpSequencer*>(ch.get());
4648 osr->wait_for_apply(oid);
4649
7c673cae
FG
4650 FDRef fd;
4651 bool spill_out = true;
4652 char buf[2];
4653
4654 int r = lfn_open(cid, oid, false, &fd);
4655 if (r < 0) {
4656 goto out;
4657 }
4658
4659 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4660 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4661 spill_out = false;
4662
4663 r = _fgetattrs(**fd, aset);
4664 lfn_close(fd);
4665 fd = FDRef(); // defensive
4666 if (r < 0) {
4667 goto out;
4668 }
4669
4670 if (!spill_out) {
31f18b77 4671 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4672 goto out;
4673 }
4674
4675 r = get_index(cid, &index);
4676 if (r < 0) {
31f18b77 4677 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4678 goto out;
4679 }
4680 {
4681 r = object_map->get_all_xattrs(oid, &omap_attrs);
4682 if (r < 0 && r != -ENOENT) {
31f18b77 4683 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4684 goto out;
4685 }
4686
4687 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4688 if (r < 0 && r != -ENOENT) {
31f18b77 4689 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4690 goto out;
4691 }
4692 if (r == -ENOENT)
4693 r = 0;
4694 }
11fdf7f2 4695 ceph_assert(omap_attrs.size() == omap_aset.size());
7c673cae
FG
4696 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4697 i != omap_aset.end();
4698 ++i) {
4699 string key(i->first);
4700 aset.insert(make_pair(key,
4701 bufferptr(i->second.c_str(), i->second.length())));
4702 }
4703 out:
31f18b77 4704 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
11fdf7f2 4705 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4706
4707 if (cct->_conf->filestore_debug_inject_read_err &&
4708 debug_mdata_eio(oid)) {
4709 return -EIO;
4710 } else {
4711 tracepoint(objectstore, getattrs_exit, r);
4712 return r;
4713 }
4714}
4715
4716int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4717 const SequencerPosition &spos)
4718{
4719 map<string, bufferlist> omap_set;
4720 set<string> omap_remove;
4721 map<string, bufferptr> inline_set;
4722 map<string, bufferptr> inline_to_set;
4723 FDRef fd;
4724 int spill_out = -1;
4725 bool incomplete_inline = false;
4726
4727 int r = lfn_open(cid, oid, false, &fd);
4728 if (r < 0) {
4729 goto out;
4730 }
4731
4732 char buf[2];
4733 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4734 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4735 spill_out = 0;
4736 else
4737 spill_out = 1;
4738
4739 r = _fgetattrs(**fd, inline_set);
4740 incomplete_inline = (r == -E2BIG);
11fdf7f2 4741 if (r == -EIO && m_filestore_fail_eio) handle_eio();
31f18b77 4742 dout(15) << __FUNC__ << ": " << cid << "/" << oid
7c673cae
FG
4743 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4744 << dendl;
4745
4746 for (map<string,bufferptr>::iterator p = aset.begin();
4747 p != aset.end();
4748 ++p) {
4749 char n[CHAIN_XATTR_MAX_NAME_LEN];
4750 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4751
4752 if (incomplete_inline) {
4753 chain_fremovexattr(**fd, n); // ignore any error
4754 omap_set[p->first].push_back(p->second);
4755 continue;
4756 }
4757
4758 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4759 if (inline_set.count(p->first)) {
4760 inline_set.erase(p->first);
4761 r = chain_fremovexattr(**fd, n);
4762 if (r < 0)
4763 goto out_close;
4764 }
4765 omap_set[p->first].push_back(p->second);
4766 continue;
4767 }
4768
4769 if (!inline_set.count(p->first) &&
4770 inline_set.size() >= m_filestore_max_inline_xattrs) {
4771 omap_set[p->first].push_back(p->second);
4772 continue;
4773 }
4774 omap_remove.insert(p->first);
4775 inline_set.insert(*p);
4776
4777 inline_to_set.insert(*p);
4778 }
4779
4780 if (spill_out != 1 && !omap_set.empty()) {
4781 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4782 sizeof(XATTR_SPILL_OUT));
4783 }
4784
4785 r = _fsetattrs(**fd, inline_to_set);
4786 if (r < 0)
4787 goto out_close;
4788
4789 if (spill_out && !omap_remove.empty()) {
4790 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4791 if (r < 0 && r != -ENOENT) {
31f18b77 4792 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
11fdf7f2 4793 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4794 goto out_close;
4795 } else {
4796 r = 0; // don't confuse the debug output
4797 }
4798 }
4799
4800 if (!omap_set.empty()) {
4801 r = object_map->set_xattrs(oid, omap_set, &spos);
4802 if (r < 0) {
31f18b77 4803 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
11fdf7f2 4804 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4805 goto out_close;
4806 }
4807 }
4808 out_close:
4809 lfn_close(fd);
4810 out:
31f18b77 4811 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4812 return r;
4813}
4814
4815
4816int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4817 const SequencerPosition &spos)
4818{
31f18b77 4819 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
7c673cae
FG
4820 FDRef fd;
4821 bool spill_out = true;
4822
4823 int r = lfn_open(cid, oid, false, &fd);
4824 if (r < 0) {
4825 goto out;
4826 }
4827
4828 char buf[2];
4829 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4830 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4831 spill_out = false;
4832 }
4833
4834 char n[CHAIN_XATTR_MAX_NAME_LEN];
4835 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4836 r = chain_fremovexattr(**fd, n);
4837 if (r == -ENODATA && spill_out) {
4838 Index index;
4839 r = get_index(cid, &index);
4840 if (r < 0) {
31f18b77 4841 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4842 goto out_close;
4843 }
4844 set<string> to_remove;
4845 to_remove.insert(string(name));
4846 r = object_map->remove_xattrs(oid, to_remove, &spos);
4847 if (r < 0 && r != -ENOENT) {
31f18b77 4848 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
11fdf7f2 4849 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4850 goto out_close;
4851 }
4852 }
4853 out_close:
4854 lfn_close(fd);
4855 out:
31f18b77 4856 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
7c673cae
FG
4857 return r;
4858}
4859
4860int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4861 const SequencerPosition &spos)
4862{
31f18b77 4863 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
4864
4865 map<string,bufferptr> aset;
4866 FDRef fd;
4867 set<string> omap_attrs;
4868 Index index;
4869 bool spill_out = true;
4870
4871 int r = lfn_open(cid, oid, false, &fd);
4872 if (r < 0) {
4873 goto out;
4874 }
4875
4876 char buf[2];
4877 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4878 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4879 spill_out = false;
4880 }
4881
4882 r = _fgetattrs(**fd, aset);
4883 if (r >= 0) {
4884 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4885 char n[CHAIN_XATTR_MAX_NAME_LEN];
4886 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4887 r = chain_fremovexattr(**fd, n);
4888 if (r < 0) {
31f18b77 4889 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
7c673cae
FG
4890 goto out_close;
4891 }
4892 }
4893 }
4894
4895 if (!spill_out) {
31f18b77 4896 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4897 goto out_close;
4898 }
4899
4900 r = get_index(cid, &index);
4901 if (r < 0) {
31f18b77 4902 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4903 goto out_close;
4904 }
4905 {
4906 r = object_map->get_all_xattrs(oid, &omap_attrs);
4907 if (r < 0 && r != -ENOENT) {
31f18b77 4908 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
11fdf7f2 4909 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4910 goto out_close;
4911 }
4912 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4913 if (r < 0 && r != -ENOENT) {
31f18b77 4914 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
7c673cae
FG
4915 goto out_close;
4916 }
4917 if (r == -ENOENT)
4918 r = 0;
4919 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4920 sizeof(XATTR_NO_SPILL_OUT));
4921 }
4922
4923 out_close:
4924 lfn_close(fd);
4925 out:
31f18b77 4926 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4927 return r;
4928}
4929
4930
4931
4932
4933int FileStore::_collection_remove_recursive(const coll_t &cid,
4934 const SequencerPosition &spos)
4935{
4936 struct stat st;
4937 int r = collection_stat(cid, &st);
4938 if (r < 0) {
4939 if (r == -ENOENT)
4940 return 0;
4941 return r;
4942 }
4943
4944 vector<ghobject_t> objects;
4945 ghobject_t max;
4946 while (!max.is_max()) {
4947 r = collection_list(cid, max, ghobject_t::get_max(),
4948 300, &objects, &max);
4949 if (r < 0)
4950 return r;
4951 for (vector<ghobject_t>::iterator i = objects.begin();
4952 i != objects.end();
4953 ++i) {
11fdf7f2 4954 ceph_assert(_check_replay_guard(cid, *i, spos));
7c673cae
FG
4955 r = _remove(cid, *i, spos);
4956 if (r < 0)
4957 return r;
4958 }
4959 objects.clear();
4960 }
4961 return _destroy_collection(cid);
4962}
4963
4964// --------------------------
4965// collections
4966
4967int FileStore::list_collections(vector<coll_t>& ls)
4968{
4969 return list_collections(ls, false);
4970}
4971
4972int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4973{
4974 tracepoint(objectstore, list_collections_enter);
31f18b77 4975 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4976
4977 char fn[PATH_MAX];
4978 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4979
4980 int r = 0;
4981 DIR *dir = ::opendir(fn);
4982 if (!dir) {
4983 r = -errno;
4984 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
11fdf7f2 4985 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4986 return r;
4987 }
4988
4989 struct dirent *de = nullptr;
b3b6e05e
TL
4990 while (true) {
4991 errno = 0;
4992 de = ::readdir(dir);
4993 if (de == nullptr) {
4994 if (errno != 0) {
4995 r = -errno;
4996 derr << "readdir failed " << fn << ": " << cpp_strerror(-r) << dendl;
4997 if (r == -EIO && m_filestore_fail_eio) handle_eio();
4998 }
4999 break;
5000 }
7c673cae
FG
5001 if (de->d_type == DT_UNKNOWN) {
5002 // d_type not supported (non-ext[234], btrfs), must stat
5003 struct stat sb;
5004 char filename[PATH_MAX];
11fdf7f2
TL
5005 if (int n = snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
5006 n >= static_cast<int>(sizeof(filename))) {
5007 derr << __func__ << " path length overrun: " << n << dendl;
5008 ceph_abort();
5009 }
7c673cae
FG
5010
5011 r = ::stat(filename, &sb);
5012 if (r < 0) {
5013 r = -errno;
5014 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
11fdf7f2 5015 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5016 break;
5017 }
5018 if (!S_ISDIR(sb.st_mode)) {
5019 continue;
5020 }
5021 } else if (de->d_type != DT_DIR) {
5022 continue;
5023 }
5024 if (strcmp(de->d_name, "omap") == 0) {
5025 continue;
5026 }
5027 if (de->d_name[0] == '.' &&
5028 (de->d_name[1] == '\0' ||
5029 (de->d_name[1] == '.' &&
5030 de->d_name[2] == '\0')))
5031 continue;
5032 coll_t cid;
5033 if (!cid.parse(de->d_name)) {
5034 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
5035 continue;
5036 }
5037 if (!cid.is_temp() || include_temp)
5038 ls.push_back(cid);
5039 }
5040
5041 if (r > 0) {
5042 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
5043 r = -r;
5044 }
5045
5046 ::closedir(dir);
11fdf7f2 5047 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5048 tracepoint(objectstore, list_collections_exit, r);
5049 return r;
5050}
5051
5052int FileStore::collection_stat(const coll_t& c, struct stat *st)
5053{
5054 tracepoint(objectstore, collection_stat_enter, c.c_str());
5055 char fn[PATH_MAX];
5056 get_cdir(c, fn, sizeof(fn));
31f18b77 5057 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5058 int r = ::stat(fn, st);
5059 if (r < 0)
5060 r = -errno;
31f18b77 5061 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
11fdf7f2 5062 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5063 tracepoint(objectstore, collection_stat_exit, r);
5064 return r;
5065}
5066
5067bool FileStore::collection_exists(const coll_t& c)
5068{
5069 tracepoint(objectstore, collection_exists_enter, c.c_str());
5070 struct stat st;
5071 bool ret = collection_stat(c, &st) == 0;
5072 tracepoint(objectstore, collection_exists_exit, ret);
5073 return ret;
5074}
5075
11fdf7f2 5076int FileStore::collection_empty(const coll_t& cid, bool *empty)
7c673cae 5077{
11fdf7f2
TL
5078 tracepoint(objectstore, collection_empty_enter, cid.c_str());
5079 dout(15) << __FUNC__ << ": " << cid << dendl;
7c673cae 5080 Index index;
11fdf7f2 5081 int r = get_index(cid, &index);
7c673cae 5082 if (r < 0) {
31f18b77 5083 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
7c673cae
FG
5084 << dendl;
5085 return r;
5086 }
5087
11fdf7f2 5088 ceph_assert(index.index);
9f95a23c 5089 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5090
5091 vector<ghobject_t> ls;
5092 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
11fdf7f2 5093 1, &ls, nullptr);
7c673cae 5094 if (r < 0) {
31f18b77 5095 derr << __FUNC__ << ": collection_list_partial returned: "
7c673cae 5096 << cpp_strerror(r) << dendl;
11fdf7f2 5097 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5098 return r;
5099 }
5100 *empty = ls.empty();
5101 tracepoint(objectstore, collection_empty_exit, *empty);
5102 return 0;
5103}
5104
5105int FileStore::_collection_set_bits(const coll_t& c, int bits)
5106{
5107 char fn[PATH_MAX];
5108 get_cdir(c, fn, sizeof(fn));
31f18b77 5109 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
7c673cae
FG
5110 char n[PATH_MAX];
5111 int r;
5112 int32_t v = bits;
91327a77 5113 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
5114 if (fd < 0) {
5115 r = -errno;
5116 goto out;
5117 }
5118 get_attrname("bits", n, PATH_MAX);
5119 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
5120 VOID_TEMP_FAILURE_RETRY(::close(fd));
5121 out:
31f18b77 5122 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
7c673cae
FG
5123 return r;
5124}
5125
11fdf7f2 5126int FileStore::collection_bits(CollectionHandle& ch)
7c673cae
FG
5127{
5128 char fn[PATH_MAX];
11fdf7f2 5129 get_cdir(ch->cid, fn, sizeof(fn));
31f18b77 5130 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5131 int r;
5132 char n[PATH_MAX];
5133 int32_t bits;
91327a77 5134 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
5135 if (fd < 0) {
5136 bits = r = -errno;
5137 goto out;
5138 }
5139 get_attrname("bits", n, PATH_MAX);
5140 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
5141 VOID_TEMP_FAILURE_RETRY(::close(fd));
5142 if (r < 0) {
5143 bits = r;
5144 goto out;
5145 }
5146 out:
31f18b77 5147 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
7c673cae
FG
5148 return bits;
5149}
5150
5151int FileStore::collection_list(const coll_t& c,
5152 const ghobject_t& orig_start,
5153 const ghobject_t& end,
5154 int max,
5155 vector<ghobject_t> *ls, ghobject_t *next)
5156{
5157 ghobject_t start = orig_start;
5158 if (start.is_max())
5159 return 0;
5160
5161 ghobject_t temp_next;
5162 if (!next)
5163 next = &temp_next;
5164 // figure out the pool id. we need this in order to generate a
5165 // meaningful 'next' value.
5166 int64_t pool = -1;
5167 shard_id_t shard;
5168 {
5169 spg_t pgid;
5170 if (c.is_temp(&pgid)) {
5171 pool = -2 - pgid.pool();
5172 shard = pgid.shard;
5173 } else if (c.is_pg(&pgid)) {
5174 pool = pgid.pool();
5175 shard = pgid.shard;
5176 } else if (c.is_meta()) {
5177 pool = -1;
5178 shard = shard_id_t::NO_SHARD;
5179 } else {
5180 // hrm, the caller is test code! we should get kill it off. for now,
5181 // tolerate it.
5182 pool = 0;
5183 shard = shard_id_t::NO_SHARD;
5184 }
31f18b77 5185 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
7c673cae
FG
5186 << " pgid " << pgid << dendl;
5187 }
5188 ghobject_t sep;
5189 sep.hobj.pool = -1;
5190 sep.set_shard(shard);
5191 if (!c.is_temp() && !c.is_meta()) {
5192 if (start < sep) {
31f18b77 5193 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
7c673cae
FG
5194 coll_t temp = c.get_temp();
5195 int r = collection_list(temp, start, end, max, ls, next);
5196 if (r < 0)
5197 return r;
5198 if (*next != ghobject_t::get_max())
5199 return r;
5200 start = sep;
31f18b77 5201 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
7c673cae
FG
5202 << start << dendl;
5203 } else {
31f18b77 5204 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
7c673cae
FG
5205 }
5206 }
5207
5208 Index index;
5209 int r = get_index(c, &index);
5210 if (r < 0)
5211 return r;
5212
11fdf7f2 5213 ceph_assert(index.index);
9f95a23c 5214 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5215
5216 r = index->collection_list_partial(start, end, max, ls, next);
5217
5218 if (r < 0) {
11fdf7f2 5219 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5220 return r;
5221 }
5222 dout(20) << "objects: " << *ls << dendl;
5223
5224 // HashIndex doesn't know the pool when constructing a 'next' value
11fdf7f2 5225 if (!next->is_max()) {
7c673cae
FG
5226 next->hobj.pool = pool;
5227 next->set_shard(shard);
5228 dout(20) << " next " << *next << dendl;
5229 }
5230
5231 return 0;
5232}
5233
11fdf7f2 5234int FileStore::omap_get(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5235 bufferlist *header,
5236 map<string, bufferlist> *out)
5237{
11fdf7f2
TL
5238 tracepoint(objectstore, omap_get_enter, ch->cid.c_str());
5239 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5240 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5241
5242 auto osr = static_cast<OpSequencer*>(ch.get());
5243 osr->wait_for_apply(hoid);
5244
7c673cae
FG
5245 Index index;
5246 int r = get_index(c, &index);
5247 if (r < 0)
5248 return r;
5249 {
11fdf7f2 5250 ceph_assert(index.index);
9f95a23c 5251 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5252 r = lfn_find(hoid, index);
5253 if (r < 0)
5254 return r;
5255 }
5256 r = object_map->get(hoid, header, out);
5257 if (r < 0 && r != -ENOENT) {
11fdf7f2 5258 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5259 return r;
5260 }
5261 tracepoint(objectstore, omap_get_exit, 0);
5262 return 0;
5263}
5264
5265int FileStore::omap_get_header(
11fdf7f2 5266 CollectionHandle& ch,
7c673cae
FG
5267 const ghobject_t &hoid,
5268 bufferlist *bl,
5269 bool allow_eio)
5270{
11fdf7f2
TL
5271 tracepoint(objectstore, omap_get_header_enter, ch->cid.c_str());
5272 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5273 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5274
5275 auto osr = static_cast<OpSequencer*>(ch.get());
5276 osr->wait_for_apply(hoid);
5277
7c673cae
FG
5278 Index index;
5279 int r = get_index(c, &index);
5280 if (r < 0)
5281 return r;
5282 {
11fdf7f2 5283 ceph_assert(index.index);
9f95a23c 5284 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5285 r = lfn_find(hoid, index);
5286 if (r < 0)
5287 return r;
5288 }
5289 r = object_map->get_header(hoid, bl);
5290 if (r < 0 && r != -ENOENT) {
11fdf7f2 5291 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
7c673cae
FG
5292 return r;
5293 }
5294 tracepoint(objectstore, omap_get_header_exit, 0);
5295 return 0;
5296}
5297
11fdf7f2 5298int FileStore::omap_get_keys(CollectionHandle& ch, const ghobject_t &hoid, set<string> *keys)
7c673cae 5299{
11fdf7f2
TL
5300 tracepoint(objectstore, omap_get_keys_enter, ch->cid.c_str());
5301 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5302 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5303
5304 auto osr = static_cast<OpSequencer*>(ch.get());
5305 osr->wait_for_apply(hoid);
5306
7c673cae
FG
5307 Index index;
5308 int r = get_index(c, &index);
5309 if (r < 0)
5310 return r;
5311 {
11fdf7f2 5312 ceph_assert(index.index);
9f95a23c 5313 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5314 r = lfn_find(hoid, index);
5315 if (r < 0)
5316 return r;
5317 }
5318 r = object_map->get_keys(hoid, keys);
5319 if (r < 0 && r != -ENOENT) {
11fdf7f2 5320 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5321 return r;
5322 }
5323 tracepoint(objectstore, omap_get_keys_exit, 0);
5324 return 0;
5325}
5326
11fdf7f2 5327int FileStore::omap_get_values(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5328 const set<string> &keys,
5329 map<string, bufferlist> *out)
5330{
11fdf7f2
TL
5331 tracepoint(objectstore, omap_get_values_enter, ch->cid.c_str());
5332 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5333 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5334
5335 auto osr = static_cast<OpSequencer*>(ch.get());
5336 osr->wait_for_apply(hoid);
5337
7c673cae
FG
5338 Index index;
5339 const char *where = "()";
5340 int r = get_index(c, &index);
5341 if (r < 0) {
5342 where = " (get_index)";
5343 goto out;
5344 }
5345 {
11fdf7f2 5346 ceph_assert(index.index);
9f95a23c 5347 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5348 r = lfn_find(hoid, index);
5349 if (r < 0) {
5350 where = " (lfn_find)";
5351 goto out;
5352 }
5353 }
5354 r = object_map->get_values(hoid, keys, out);
5355 if (r < 0 && r != -ENOENT) {
11fdf7f2 5356 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5357 where = " (get_values)";
5358 goto out;
5359 }
5360 r = 0;
5361 out:
5362 tracepoint(objectstore, omap_get_values_exit, r);
31f18b77 5363 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
7c673cae
FG
5364 << where << dendl;
5365 return r;
5366}
5367
11fdf7f2 5368int FileStore::omap_check_keys(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5369 const set<string> &keys,
5370 set<string> *out)
5371{
11fdf7f2
TL
5372 tracepoint(objectstore, omap_check_keys_enter, ch->cid.c_str());
5373 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5374 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae 5375
11fdf7f2
TL
5376 auto osr = static_cast<OpSequencer*>(ch.get());
5377 osr->wait_for_apply(hoid);
5378
7c673cae
FG
5379 Index index;
5380 int r = get_index(c, &index);
5381 if (r < 0)
5382 return r;
5383 {
11fdf7f2 5384 ceph_assert(index.index);
9f95a23c 5385 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5386 r = lfn_find(hoid, index);
5387 if (r < 0)
5388 return r;
5389 }
5390 r = object_map->check_keys(hoid, keys, out);
5391 if (r < 0 && r != -ENOENT) {
11fdf7f2 5392 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5393 return r;
5394 }
5395 tracepoint(objectstore, omap_check_keys_exit, 0);
5396 return 0;
5397}
5398
11fdf7f2
TL
5399ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(
5400 CollectionHandle& ch,
5401 const ghobject_t &oid)
5402{
5403 auto osr = static_cast<OpSequencer*>(ch.get());
5404 osr->wait_for_apply(oid);
5405 return get_omap_iterator(ch->cid, oid);
5406}
5407
7c673cae
FG
5408ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5409 const ghobject_t &hoid)
5410{
5411 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5412 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5413 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5414 Index index;
5415 int r = get_index(c, &index);
5416 if (r < 0) {
31f18b77 5417 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5418 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5419 return ObjectMap::ObjectMapIterator();
5420 }
5421 {
11fdf7f2 5422 ceph_assert(index.index);
9f95a23c 5423 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5424 r = lfn_find(hoid, index);
5425 if (r < 0) {
31f18b77 5426 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5427 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5428 return ObjectMap::ObjectMapIterator();
5429 }
5430 }
5431 return object_map->get_iterator(hoid);
5432}
5433
5434int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5435 uint64_t expected_num_objs,
5436 const SequencerPosition &spos)
5437{
31f18b77 5438 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
7c673cae
FG
5439 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5440
5441 bool empty;
5442 int ret = collection_empty(c, &empty);
5443 if (ret < 0)
5444 return ret;
5445 if (!empty && !replaying) {
5446 dout(0) << "Failed to give an expected number of objects hint to collection : "
5447 << c << ", only empty collection can take such type of hint. " << dendl;
5448 return 0;
5449 }
5450
5451 Index index;
5452 ret = get_index(c, &index);
5453 if (ret < 0)
5454 return ret;
5455 // Pre-hash the collection
5456 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5457 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5458 if (ret < 0)
5459 return ret;
5460 _set_replay_guard(c, spos);
5461
5462 return 0;
5463}
5464
5465int FileStore::_create_collection(
5466 const coll_t& c,
5467 int bits,
5468 const SequencerPosition &spos)
5469{
5470 char fn[PATH_MAX];
5471 get_cdir(c, fn, sizeof(fn));
31f18b77 5472 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5473 int r = ::mkdir(fn, 0755);
5474 if (r < 0)
5475 r = -errno;
5476 if (r == -EEXIST && replaying)
5477 r = 0;
31f18b77 5478 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5479
5480 if (r < 0)
5481 return r;
5482 r = init_index(c);
5483 if (r < 0)
5484 return r;
5485 r = _collection_set_bits(c, bits);
5486 if (r < 0)
5487 return r;
5488 // create parallel temp collection, too
5489 if (!c.is_meta() && !c.is_temp()) {
5490 coll_t temp = c.get_temp();
5491 r = _create_collection(temp, 0, spos);
5492 if (r < 0)
5493 return r;
5494 }
5495
5496 _set_replay_guard(c, spos);
5497 return 0;
5498}
5499
5500int FileStore::_destroy_collection(const coll_t& c)
5501{
5502 int r = 0;
5503 char fn[PATH_MAX];
5504 get_cdir(c, fn, sizeof(fn));
31f18b77 5505 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5506 {
5507 Index from;
5508 r = get_index(c, &from);
5509 if (r < 0)
5510 goto out;
11fdf7f2 5511 ceph_assert(from.index);
9f95a23c 5512 std::unique_lock l{(from.index)->access_lock};
7c673cae
FG
5513
5514 r = from->prep_delete();
5515 if (r < 0)
5516 goto out;
5517 }
5518 r = ::rmdir(fn);
5519 if (r < 0) {
5520 r = -errno;
5521 goto out;
5522 }
5523
5524 out:
5525 // destroy parallel temp collection, too
5526 if (!c.is_meta() && !c.is_temp()) {
5527 coll_t temp = c.get_temp();
5528 int r2 = _destroy_collection(temp);
5529 if (r2 < 0) {
5530 r = r2;
5531 goto out_final;
5532 }
5533 }
5534
5535 out_final:
31f18b77 5536 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5537 return r;
5538}
5539
5540
5541int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5542 const SequencerPosition& spos)
5543{
31f18b77 5544 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
7c673cae
FG
5545
5546 int dstcmp = _check_replay_guard(c, o, spos);
5547 if (dstcmp < 0)
5548 return 0;
5549
5550 // check the src name too; it might have a newer guard, and we don't
5551 // want to clobber it
5552 int srccmp = _check_replay_guard(oldcid, o, spos);
5553 if (srccmp < 0)
5554 return 0;
5555
5556 // open guard on object so we don't any previous operations on the
5557 // new name that will modify the source inode.
5558 FDRef fd;
5559 int r = lfn_open(oldcid, o, 0, &fd);
5560 if (r < 0) {
5561 // the source collection/object does not exist. If we are replaying, we
5562 // should be safe, so just return 0 and move on.
11fdf7f2 5563 ceph_assert(replaying);
31f18b77 5564 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5565 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5566 return 0;
5567 }
5568 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5569 _set_replay_guard(**fd, spos, &o, true);
5570 }
5571
5572 r = lfn_link(oldcid, c, o, o);
5573 if (replaying && !backend->can_checkpoint() &&
5574 r == -EEXIST) // crashed between link() and set_replay_guard()
5575 r = 0;
5576
5577 _inject_failure();
5578
5579 // close guard on object so we don't do this again
5580 if (r == 0) {
5581 _close_replay_guard(**fd, spos);
5582 }
5583 lfn_close(fd);
5584
31f18b77 5585 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
7c673cae
FG
5586 return r;
5587}
5588
5589int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5590 coll_t c, const ghobject_t& o,
5591 const SequencerPosition& spos,
5592 bool allow_enoent)
5593{
31f18b77 5594 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
7c673cae
FG
5595 int r = 0;
5596 int dstcmp, srccmp;
5597
5598 if (replaying) {
5599 /* If the destination collection doesn't exist during replay,
5600 * we need to delete the src object and continue on
5601 */
5602 if (!collection_exists(c))
5603 goto out_rm_src;
5604 }
5605
5606 dstcmp = _check_replay_guard(c, o, spos);
5607 if (dstcmp < 0)
5608 goto out_rm_src;
5609
5610 // check the src name too; it might have a newer guard, and we don't
5611 // want to clobber it
5612 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5613 if (srccmp < 0)
5614 return 0;
5615
5616 {
5617 // open guard on object so we don't any previous operations on the
5618 // new name that will modify the source inode.
5619 FDRef fd;
5620 r = lfn_open(oldcid, oldoid, 0, &fd);
5621 if (r < 0) {
5622 // the source collection/object does not exist. If we are replaying, we
5623 // should be safe, so just return 0 and move on.
5624 if (replaying) {
31f18b77 5625 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5626 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5627 } else if (allow_enoent) {
31f18b77 5628 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5629 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5630 << dendl;
5631 } else {
11fdf7f2 5632 ceph_abort_msg("ERROR: source must exist");
7c673cae
FG
5633 }
5634
5635 if (!replaying) {
5636 return 0;
5637 }
5638 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5639 return 0;
5640 }
5641
5642 r = 0; // don't know if object_map was cloned
5643 } else {
5644 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5645 _set_replay_guard(**fd, spos, &o, true);
5646 }
5647
5648 r = lfn_link(oldcid, c, oldoid, o);
5649 if (replaying && !backend->can_checkpoint() &&
5650 r == -EEXIST) // crashed between link() and set_replay_guard()
5651 r = 0;
5652
5653 lfn_close(fd);
5654 fd = FDRef();
5655
5656 _inject_failure();
5657 }
5658
5659 if (r == 0) {
5660 // the name changed; link the omap content
5661 r = object_map->rename(oldoid, o, &spos);
5662 if (r == -ENOENT)
5663 r = 0;
5664 }
5665
5666 _inject_failure();
5667
5668 if (r == 0)
5669 r = lfn_unlink(oldcid, oldoid, spos, true);
5670
5671 if (r == 0)
5672 r = lfn_open(c, o, 0, &fd);
5673
5674 // close guard on object so we don't do this again
5675 if (r == 0) {
5676 _close_replay_guard(**fd, spos, &o);
5677 lfn_close(fd);
5678 }
5679 }
5680
31f18b77 5681 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5682 << " = " << r << dendl;
5683 return r;
5684
5685 out_rm_src:
5686 // remove source
5687 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5688 r = lfn_unlink(oldcid, oldoid, spos, true);
5689 }
5690
31f18b77 5691 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5692 << " = " << r << dendl;
5693 return r;
5694}
5695
5696void FileStore::_inject_failure()
5697{
31f18b77
FG
5698 if (m_filestore_kill_at) {
5699 int final = --m_filestore_kill_at;
5700 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
7c673cae 5701 if (final == 0) {
31f18b77 5702 derr << __FUNC__ << ": KILLING" << dendl;
7c673cae
FG
5703 cct->_log->flush();
5704 _exit(1);
5705 }
5706 }
5707}
5708
5709int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5710 const SequencerPosition &spos) {
31f18b77 5711 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5712 Index index;
5713 int r = get_index(cid, &index);
5714 if (r < 0)
5715 return r;
5716 {
11fdf7f2 5717 ceph_assert(index.index);
9f95a23c 5718 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5719 r = lfn_find(hoid, index);
5720 if (r < 0)
5721 return r;
5722 }
5723 r = object_map->clear_keys_header(hoid, &spos);
5724 if (r < 0 && r != -ENOENT)
5725 return r;
5726 return 0;
5727}
5728
5729int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5730 const map<string, bufferlist> &aset,
5731 const SequencerPosition &spos) {
31f18b77 5732 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5733 Index index;
5734 int r;
5735 //treat pgmeta as a logical object, skip to check exist
5736 if (hoid.is_pgmeta())
5737 goto skip;
5738
5739 r = get_index(cid, &index);
5740 if (r < 0) {
31f18b77 5741 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
7c673cae
FG
5742 return r;
5743 }
5744 {
11fdf7f2 5745 ceph_assert(index.index);
9f95a23c 5746 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5747 r = lfn_find(hoid, index);
5748 if (r < 0) {
31f18b77 5749 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
7c673cae
FG
5750 return r;
5751 }
5752 }
5753skip:
11fdf7f2 5754 if (g_conf()->subsys.should_gather<ceph_subsys_filestore, 20>()) {
7c673cae 5755 for (auto& p : aset) {
31f18b77 5756 dout(20) << __FUNC__ << ": set " << p.first << dendl;
7c673cae
FG
5757 }
5758 }
5759 r = object_map->set_keys(hoid, aset, &spos);
31f18b77 5760 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
7c673cae
FG
5761 return r;
5762}
5763
5764int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5765 const set<string> &keys,
5766 const SequencerPosition &spos) {
31f18b77 5767 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5768 Index index;
5769 int r;
5770 //treat pgmeta as a logical object, skip to check exist
5771 if (hoid.is_pgmeta())
5772 goto skip;
5773
5774 r = get_index(cid, &index);
5775 if (r < 0)
5776 return r;
5777 {
11fdf7f2 5778 ceph_assert(index.index);
9f95a23c 5779 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5780 r = lfn_find(hoid, index);
5781 if (r < 0)
5782 return r;
5783 }
5784skip:
5785 r = object_map->rm_keys(hoid, keys, &spos);
5786 if (r < 0 && r != -ENOENT)
5787 return r;
5788 return 0;
5789}
5790
5791int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5792 const string& first, const string& last,
5793 const SequencerPosition &spos) {
31f18b77 5794 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
7c673cae
FG
5795 set<string> keys;
5796 {
5797 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5798 if (!iter)
5799 return -ENOENT;
5800 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5801 iter->next()) {
5802 keys.insert(iter->key());
5803 }
5804 }
5805 return _omap_rmkeys(cid, hoid, keys, spos);
5806}
5807
5808int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5809 const bufferlist &bl,
5810 const SequencerPosition &spos)
5811{
31f18b77 5812 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5813 Index index;
5814 int r = get_index(cid, &index);
5815 if (r < 0)
5816 return r;
5817 {
11fdf7f2 5818 ceph_assert(index.index);
9f95a23c 5819 std::shared_lock l{(index.index)->access_lock};
7c673cae
FG
5820 r = lfn_find(hoid, index);
5821 if (r < 0)
5822 return r;
5823 }
5824 return object_map->set_header(hoid, bl, &spos);
5825}
5826
11fdf7f2
TL
5827int FileStore::_merge_collection(const coll_t& cid,
5828 uint32_t bits,
5829 coll_t dest,
5830 const SequencerPosition &spos)
5831{
5832 dout(15) << __FUNC__ << ": " << cid << " " << dest
5833 << " bits " << bits << dendl;
5834 int r = 0;
5835
5836 if (!collection_exists(cid)) {
5837 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5838 ceph_assert(replaying);
5839 return 0;
5840 }
5841 if (!collection_exists(dest)) {
5842 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5843 ceph_assert(replaying);
5844 return 0;
5845 }
5846
5847 // set bits
5848 if (_check_replay_guard(cid, spos) > 0)
5849 _collection_set_bits(dest, bits);
5850
5851 spg_t pgid;
5852 bool is_pg = dest.is_pg(&pgid);
5853 ceph_assert(is_pg);
5854
5855 int dstcmp = _check_replay_guard(dest, spos);
5856 if (dstcmp < 0)
5857 return 0;
5858
5859 int srccmp = _check_replay_guard(cid, spos);
5860 if (srccmp < 0)
5861 return 0;
5862
5863 _set_global_replay_guard(cid, spos);
5864 _set_replay_guard(cid, spos, true);
5865 _set_replay_guard(dest, spos, true);
5866
5867 // main collection
5868 {
5869 Index from;
5870 r = get_index(cid, &from);
5871
5872 Index to;
5873 if (!r)
5874 r = get_index(dest, &to);
5875
5876 if (!r) {
5877 ceph_assert(from.index);
9f95a23c 5878 std::unique_lock l1{(from.index)->access_lock};
11fdf7f2
TL
5879
5880 ceph_assert(to.index);
9f95a23c 5881 std::unique_lock l2{(to.index)->access_lock};
11fdf7f2
TL
5882
5883 r = from->merge(bits, to.index);
5884 }
5885 }
5886
5887 // temp too
5888 {
5889 Index from;
5890 r = get_index(cid.get_temp(), &from);
5891
5892 Index to;
5893 if (!r)
5894 r = get_index(dest.get_temp(), &to);
5895
5896 if (!r) {
5897 ceph_assert(from.index);
9f95a23c 5898 std::unique_lock l1{(from.index)->access_lock};
11fdf7f2
TL
5899
5900 ceph_assert(to.index);
9f95a23c 5901 std::unique_lock l2{(to.index)->access_lock};
11fdf7f2
TL
5902
5903 r = from->merge(bits, to.index);
5904 }
5905 }
5906
5907 // remove source
5908 _destroy_collection(cid);
5909
5910 _close_replay_guard(dest, spos);
5911 _close_replay_guard(dest.get_temp(), spos);
5912 // no need to close guards on cid... it's removed.
5913
5914 if (!r && cct->_conf->filestore_debug_verify_split) {
5915 vector<ghobject_t> objects;
5916 ghobject_t next;
5917 while (1) {
5918 collection_list(
5919 dest,
5920 next, ghobject_t::get_max(),
5921 get_ideal_list_max(),
5922 &objects,
5923 &next);
5924 if (objects.empty())
5925 break;
5926 for (vector<ghobject_t>::iterator i = objects.begin();
5927 i != objects.end();
5928 ++i) {
5929 if (!i->match(bits, pgid.pgid.ps())) {
5930 dout(20) << __FUNC__ << ": " << *i << " does not belong in "
5931 << cid << dendl;
5932 ceph_assert(i->match(bits, pgid.pgid.ps()));
5933 }
5934 }
5935 objects.clear();
5936 }
5937 }
5938
5939 dout(15) << __FUNC__ << ": " << cid << " " << dest << " bits " << bits
5940 << " = " << r << dendl;
5941 return r;
5942}
5943
7c673cae
FG
5944int FileStore::_split_collection(const coll_t& cid,
5945 uint32_t bits,
5946 uint32_t rem,
5947 coll_t dest,
5948 const SequencerPosition &spos)
5949{
5950 int r;
5951 {
31f18b77 5952 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
7c673cae 5953 if (!collection_exists(cid)) {
31f18b77 5954 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
11fdf7f2 5955 ceph_assert(replaying);
7c673cae
FG
5956 return 0;
5957 }
5958 if (!collection_exists(dest)) {
31f18b77 5959 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
11fdf7f2 5960 ceph_assert(replaying);
7c673cae
FG
5961 return 0;
5962 }
5963
5964 int dstcmp = _check_replay_guard(dest, spos);
5965 if (dstcmp < 0)
5966 return 0;
5967
5968 int srccmp = _check_replay_guard(cid, spos);
5969 if (srccmp < 0)
5970 return 0;
5971
5972 _set_global_replay_guard(cid, spos);
5973 _set_replay_guard(cid, spos, true);
5974 _set_replay_guard(dest, spos, true);
5975
5976 Index from;
5977 r = get_index(cid, &from);
5978
5979 Index to;
5980 if (!r)
5981 r = get_index(dest, &to);
5982
5983 if (!r) {
11fdf7f2 5984 ceph_assert(from.index);
9f95a23c 5985 std::unique_lock l1{(from.index)->access_lock};
7c673cae 5986
11fdf7f2 5987 ceph_assert(to.index);
9f95a23c 5988 std::unique_lock l2{(to.index)->access_lock};
7c673cae
FG
5989
5990 r = from->split(rem, bits, to.index);
5991 }
5992
5993 _close_replay_guard(cid, spos);
5994 _close_replay_guard(dest, spos);
5995 }
5996 _collection_set_bits(cid, bits);
5997 if (!r && cct->_conf->filestore_debug_verify_split) {
5998 vector<ghobject_t> objects;
5999 ghobject_t next;
6000 while (1) {
6001 collection_list(
6002 cid,
6003 next, ghobject_t::get_max(),
6004 get_ideal_list_max(),
6005 &objects,
6006 &next);
6007 if (objects.empty())
6008 break;
6009 for (vector<ghobject_t>::iterator i = objects.begin();
6010 i != objects.end();
6011 ++i) {
31f18b77 6012 dout(20) << __FUNC__ << ": " << *i << " still in source "
7c673cae 6013 << cid << dendl;
11fdf7f2 6014 ceph_assert(!i->match(bits, rem));
7c673cae
FG
6015 }
6016 objects.clear();
6017 }
6018 next = ghobject_t();
6019 while (1) {
6020 collection_list(
6021 dest,
6022 next, ghobject_t::get_max(),
6023 get_ideal_list_max(),
6024 &objects,
6025 &next);
6026 if (objects.empty())
6027 break;
6028 for (vector<ghobject_t>::iterator i = objects.begin();
6029 i != objects.end();
6030 ++i) {
31f18b77 6031 dout(20) << __FUNC__ << ": " << *i << " now in dest "
7c673cae 6032 << *i << dendl;
11fdf7f2 6033 ceph_assert(i->match(bits, rem));
7c673cae
FG
6034 }
6035 objects.clear();
6036 }
6037 }
6038 return r;
6039}
6040
6041int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
6042 uint64_t expected_object_size,
6043 uint64_t expected_write_size)
6044{
31f18b77 6045 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
7c673cae
FG
6046
6047 FDRef fd;
6048 int ret = 0;
6049
6050 if (expected_object_size == 0 || expected_write_size == 0)
6051 goto out;
6052
6053 ret = lfn_open(cid, oid, false, &fd);
6054 if (ret < 0)
6055 goto out;
6056
6057 {
6058 // TODO: a more elaborate hint calculation
11fdf7f2 6059 uint64_t hint = std::min<uint64_t>(expected_write_size, m_filestore_max_alloc_hint_size);
7c673cae
FG
6060
6061 ret = backend->set_alloc_hint(**fd, hint);
31f18b77 6062 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
7c673cae
FG
6063 }
6064
6065 lfn_close(fd);
6066out:
31f18b77 6067 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
11fdf7f2 6068 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
6069 return ret;
6070}
6071
6072const char** FileStore::get_tracked_conf_keys() const
6073{
6074 static const char* KEYS[] = {
6075 "filestore_max_inline_xattr_size",
6076 "filestore_max_inline_xattr_size_xfs",
6077 "filestore_max_inline_xattr_size_btrfs",
6078 "filestore_max_inline_xattr_size_other",
6079 "filestore_max_inline_xattrs",
6080 "filestore_max_inline_xattrs_xfs",
6081 "filestore_max_inline_xattrs_btrfs",
6082 "filestore_max_inline_xattrs_other",
6083 "filestore_max_xattr_value_size",
6084 "filestore_max_xattr_value_size_xfs",
6085 "filestore_max_xattr_value_size_btrfs",
6086 "filestore_max_xattr_value_size_other",
6087 "filestore_min_sync_interval",
6088 "filestore_max_sync_interval",
6089 "filestore_queue_max_ops",
6090 "filestore_queue_max_bytes",
6091 "filestore_expected_throughput_bytes",
6092 "filestore_expected_throughput_ops",
6093 "filestore_queue_low_threshhold",
6094 "filestore_queue_high_threshhold",
6095 "filestore_queue_high_delay_multiple",
6096 "filestore_queue_max_delay_multiple",
6097 "filestore_commit_timeout",
6098 "filestore_dump_file",
6099 "filestore_kill_at",
6100 "filestore_fail_eio",
6101 "filestore_fadvise",
6102 "filestore_sloppy_crc",
6103 "filestore_sloppy_crc_block_size",
6104 "filestore_max_alloc_hint_size",
6105 NULL
6106 };
6107 return KEYS;
6108}
6109
11fdf7f2 6110void FileStore::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
6111 const std::set <std::string> &changed)
6112{
6113 if (changed.count("filestore_max_inline_xattr_size") ||
6114 changed.count("filestore_max_inline_xattr_size_xfs") ||
6115 changed.count("filestore_max_inline_xattr_size_btrfs") ||
6116 changed.count("filestore_max_inline_xattr_size_other") ||
6117 changed.count("filestore_max_inline_xattrs") ||
6118 changed.count("filestore_max_inline_xattrs_xfs") ||
6119 changed.count("filestore_max_inline_xattrs_btrfs") ||
6120 changed.count("filestore_max_inline_xattrs_other") ||
6121 changed.count("filestore_max_xattr_value_size") ||
6122 changed.count("filestore_max_xattr_value_size_xfs") ||
6123 changed.count("filestore_max_xattr_value_size_btrfs") ||
6124 changed.count("filestore_max_xattr_value_size_other")) {
6125 if (backend) {
9f95a23c 6126 std::lock_guard l(lock);
7c673cae
FG
6127 set_xattr_limits_via_conf();
6128 }
6129 }
6130
6131 if (changed.count("filestore_queue_max_bytes") ||
6132 changed.count("filestore_queue_max_ops") ||
6133 changed.count("filestore_expected_throughput_bytes") ||
6134 changed.count("filestore_expected_throughput_ops") ||
6135 changed.count("filestore_queue_low_threshhold") ||
6136 changed.count("filestore_queue_high_threshhold") ||
6137 changed.count("filestore_queue_high_delay_multiple") ||
6138 changed.count("filestore_queue_max_delay_multiple")) {
9f95a23c 6139 std::lock_guard l(lock);
7c673cae
FG
6140 set_throttle_params();
6141 }
6142
6143 if (changed.count("filestore_min_sync_interval") ||
6144 changed.count("filestore_max_sync_interval") ||
6145 changed.count("filestore_kill_at") ||
6146 changed.count("filestore_fail_eio") ||
6147 changed.count("filestore_sloppy_crc") ||
6148 changed.count("filestore_sloppy_crc_block_size") ||
6149 changed.count("filestore_max_alloc_hint_size") ||
6150 changed.count("filestore_fadvise")) {
9f95a23c 6151 std::lock_guard l(lock);
7c673cae
FG
6152 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
6153 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
31f18b77 6154 m_filestore_kill_at = conf->filestore_kill_at;
7c673cae
FG
6155 m_filestore_fail_eio = conf->filestore_fail_eio;
6156 m_filestore_fadvise = conf->filestore_fadvise;
6157 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
6158 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
6159 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
6160 }
6161 if (changed.count("filestore_commit_timeout")) {
9f95a23c 6162 std::lock_guard l(sync_entry_timeo_lock);
7c673cae
FG
6163 m_filestore_commit_timeout = conf->filestore_commit_timeout;
6164 }
6165 if (changed.count("filestore_dump_file")) {
6166 if (conf->filestore_dump_file.length() &&
6167 conf->filestore_dump_file != "-") {
6168 dump_start(conf->filestore_dump_file);
6169 } else {
6170 dump_stop();
6171 }
6172 }
6173}
6174
6175int FileStore::set_throttle_params()
6176{
6177 stringstream ss;
6178 bool valid = throttle_bytes.set_params(
6179 cct->_conf->filestore_queue_low_threshhold,
6180 cct->_conf->filestore_queue_high_threshhold,
6181 cct->_conf->filestore_expected_throughput_bytes,
11fdf7f2
TL
6182 cct->_conf->filestore_queue_high_delay_multiple?
6183 cct->_conf->filestore_queue_high_delay_multiple:
6184 cct->_conf->filestore_queue_high_delay_multiple_bytes,
6185 cct->_conf->filestore_queue_max_delay_multiple?
6186 cct->_conf->filestore_queue_max_delay_multiple:
6187 cct->_conf->filestore_queue_max_delay_multiple_bytes,
7c673cae
FG
6188 cct->_conf->filestore_queue_max_bytes,
6189 &ss);
6190
6191 valid &= throttle_ops.set_params(
6192 cct->_conf->filestore_queue_low_threshhold,
6193 cct->_conf->filestore_queue_high_threshhold,
6194 cct->_conf->filestore_expected_throughput_ops,
11fdf7f2
TL
6195 cct->_conf->filestore_queue_high_delay_multiple?
6196 cct->_conf->filestore_queue_high_delay_multiple:
6197 cct->_conf->filestore_queue_high_delay_multiple_ops,
6198 cct->_conf->filestore_queue_max_delay_multiple?
6199 cct->_conf->filestore_queue_max_delay_multiple:
6200 cct->_conf->filestore_queue_max_delay_multiple_ops,
7c673cae
FG
6201 cct->_conf->filestore_queue_max_ops,
6202 &ss);
6203
6204 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
6205 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
6206
6207 if (!valid) {
6208 derr << "tried to set invalid params: "
6209 << ss.str()
6210 << dendl;
6211 }
6212 return valid ? 0 : -EINVAL;
6213}
6214
6215void FileStore::dump_start(const std::string& file)
6216{
31f18b77 6217 dout(10) << __FUNC__ << ": " << file << dendl;
7c673cae
FG
6218 if (m_filestore_do_dump) {
6219 dump_stop();
6220 }
6221 m_filestore_dump_fmt.reset();
6222 m_filestore_dump_fmt.open_array_section("dump");
6223 m_filestore_dump.open(file.c_str());
6224 m_filestore_do_dump = true;
6225}
6226
6227void FileStore::dump_stop()
6228{
31f18b77 6229 dout(10) << __FUNC__ << dendl;
7c673cae
FG
6230 m_filestore_do_dump = false;
6231 if (m_filestore_dump.is_open()) {
6232 m_filestore_dump_fmt.close_section();
6233 m_filestore_dump_fmt.flush(m_filestore_dump);
6234 m_filestore_dump.flush();
6235 m_filestore_dump.close();
6236 }
6237}
6238
6239void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
6240{
6241 m_filestore_dump_fmt.open_array_section("transactions");
6242 unsigned trans_num = 0;
6243 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
6244 m_filestore_dump_fmt.open_object_section("transaction");
11fdf7f2 6245 m_filestore_dump_fmt.dump_stream("osr") << osr->cid;
7c673cae
FG
6246 m_filestore_dump_fmt.dump_unsigned("seq", seq);
6247 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
6248 (*i).dump(&m_filestore_dump_fmt);
6249 m_filestore_dump_fmt.close_section();
6250 }
6251 m_filestore_dump_fmt.close_section();
6252 m_filestore_dump_fmt.flush(m_filestore_dump);
6253 m_filestore_dump.flush();
6254}
6255
11fdf7f2
TL
6256void FileStore::get_db_statistics(Formatter* f)
6257{
6258 object_map->db->get_statistics(f);
6259}
6260
7c673cae
FG
6261void FileStore::set_xattr_limits_via_conf()
6262{
6263 uint32_t fs_xattr_size;
6264 uint32_t fs_xattrs;
6265 uint32_t fs_xattr_max_value_size;
6266
6267 switch (m_fs_type) {
6268#if defined(__linux__)
6269 case XFS_SUPER_MAGIC:
6270 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
6271 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
6272 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
6273 break;
6274 case BTRFS_SUPER_MAGIC:
6275 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
6276 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
6277 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
6278 break;
6279#endif
6280 default:
6281 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
6282 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
6283 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
6284 break;
6285 }
6286
6287 // Use override value if set
6288 if (cct->_conf->filestore_max_inline_xattr_size)
6289 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
6290 else
6291 m_filestore_max_inline_xattr_size = fs_xattr_size;
6292
6293 // Use override value if set
6294 if (cct->_conf->filestore_max_inline_xattrs)
6295 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
6296 else
6297 m_filestore_max_inline_xattrs = fs_xattrs;
6298
6299 // Use override value if set
6300 if (cct->_conf->filestore_max_xattr_value_size)
6301 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
6302 else
6303 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
6304
6305 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
6306 derr << "WARNING: max attr value size ("
6307 << m_filestore_max_xattr_value_size
6308 << ") is smaller than osd_max_object_name_len ("
6309 << cct->_conf->osd_max_object_name_len
6310 << "). Your backend filesystem appears to not support attrs large "
6311 << "enough to handle the configured max rados name size. You may get "
6312 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
6313 << "behavior"
6314 << dendl;
6315 }
6316}
6317
6318uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
6319{
6320 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
6321 return res;
6322}
6323
1adf2230 6324int FileStore::apply_layout_settings(const coll_t &cid, int target_level)
7c673cae 6325{
1adf2230
AA
6326 dout(20) << __FUNC__ << ": " << cid << " target level: "
6327 << target_level << dendl;
7c673cae
FG
6328 Index index;
6329 int r = get_index(cid, &index);
6330 if (r < 0) {
6331 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
6332 << dendl;
6333 return r;
6334 }
6335
1adf2230 6336 return index->apply_layout_settings(target_level);
7c673cae
FG
6337}
6338
6339
6340// -- FSSuperblock --
6341
// Serialize the superblock into bl: the compat feature set first, then the
// omap backend name. The field order is the on-disk format and has to stay
// in step with FSSuperblock::decode().
6342void FSSuperblock::encode(bufferlist &bl) const
6343{
// NOTE(review): the arguments (2, 1) are presumably the struct version and
// the oldest compatible version -- confirm against the ENCODE_START macro.
6344 ENCODE_START(2, 1, bl);
6345 compat_features.encode(bl);
11fdf7f2 6346 encode(omap_backend, bl);
7c673cae
FG
6347 ENCODE_FINISH(bl);
6348}
6349
11fdf7f2 6350void FSSuperblock::decode(bufferlist::const_iterator &bl)
7c673cae
FG
6351{
6352 DECODE_START(2, bl);
6353 compat_features.decode(bl);
6354 if (struct_v >= 2)
11fdf7f2 6355 decode(omap_backend, bl);
7c673cae
FG
6356 else
6357 omap_backend = "leveldb";
6358 DECODE_FINISH(bl);
6359}
6360
// Write the superblock to formatter f as an object section named "compat"
// that holds the dumped compat feature set followed by an "omap_backend"
// string.
6361void FSSuperblock::dump(Formatter *f) const
6362{
6363 f->open_object_section("compat");
6364 compat_features.dump(f);
6365 f->dump_string("omap_backend", omap_backend);
6366 f->close_section();
6367}
6368
6369void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6370{
6371 FSSuperblock z;
6372 o.push_back(new FSSuperblock(z));
6373 CompatSet::FeatureSet feature_compat;
6374 CompatSet::FeatureSet feature_ro_compat;
6375 CompatSet::FeatureSet feature_incompat;
6376 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6377 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6378 feature_incompat);
6379 o.push_back(new FSSuperblock(z));
6380 z.omap_backend = "rocksdb";
6381 o.push_back(new FSSuperblock(z));
6382}
11fdf7f2
TL
6383
// Replace the debug-output prefix: from here on each message starts with
// "filestore.osr(<this>) ", where `this` is the object whose member
// function emits the message.
6384#undef dout_prefix
6385#define dout_prefix *_dout << "filestore.osr(" << this << ") "
6386
6387void FileStore::OpSequencer::_register_apply(Op *o)
6388{
6389 if (o->registered_apply) {
6390 dout(20) << __func__ << " " << o << " already registered" << dendl;
6391 return;
6392 }
6393 o->registered_apply = true;
6394 for (auto& t : o->tls) {
6395 for (auto& i : t.get_object_index()) {
6396 uint32_t key = i.first.hobj.get_hash();
6397 applying.emplace(make_pair(key, &i.first));
6398 dout(20) << __func__ << " " << o << " " << i.first << " ("
6399 << &i.first << ")" << dendl;
6400 }
6401 }
6402}
6403
6404void FileStore::OpSequencer::_unregister_apply(Op *o)
6405{
6406 ceph_assert(o->registered_apply);
6407 for (auto& t : o->tls) {
6408 for (auto& i : t.get_object_index()) {
6409 uint32_t key = i.first.hobj.get_hash();
6410 auto p = applying.find(key);
6411 bool removed = false;
6412 while (p != applying.end() &&
6413 p->first == key) {
6414 if (p->second == &i.first) {
6415 dout(20) << __func__ << " " << o << " " << i.first << " ("
6416 << &i.first << ")" << dendl;
6417 applying.erase(p);
6418 removed = true;
6419 break;
6420 }
6421 ++p;
6422 }
6423 ceph_assert(removed);
6424 }
6425 }
6426}
6427
6428void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
6429{
9f95a23c 6430 std::unique_lock l{qlock};
11fdf7f2
TL
6431 uint32_t key = oid.hobj.get_hash();
6432retry:
6433 while (true) {
6434 // search all items in hash slot for a matching object
6435 auto p = applying.find(key);
6436 while (p != applying.end() &&
6437 p->first == key) {
6438 if (*p->second == oid) {
6439 dout(20) << __func__ << " " << oid << " waiting on " << p->second
6440 << dendl;
9f95a23c 6441 cond.wait(l);
11fdf7f2
TL
6442 goto retry;
6443 }
6444 ++p;
6445 }
6446 break;
6447 }
6448 dout(20) << __func__ << " " << oid << " done" << dendl;
6449}