]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/FileStore.cc
import ceph 14.2.5
[ceph.git] / ceph / src / os / filestore / FileStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15#include "include/compat.h"
16#include "include/int_types.h"
17#include "boost/tuple/tuple.hpp"
18
19#include <unistd.h>
20#include <stdlib.h>
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <fcntl.h>
24#include <sys/file.h>
25#include <errno.h>
26#include <dirent.h>
27#include <sys/ioctl.h>
28
29#if defined(__linux__)
30#include <linux/fs.h>
11fdf7f2 31#include <linux/falloc.h>
7c673cae
FG
32#endif
33
34#include <iostream>
35#include <map>
36
37#include "include/linux_fiemap.h"
38
39#include "common/xattr.h"
40#include "chain_xattr.h"
41
11fdf7f2 42#if defined(__APPLE__) || defined(__FreeBSD__)
7c673cae
FG
43#include <sys/param.h>
44#include <sys/mount.h>
11fdf7f2 45#endif
7c673cae
FG
46
47
48#include <fstream>
49#include <sstream>
50
51#include "FileStore.h"
52#include "GenericFileStoreBackend.h"
53#include "BtrfsFileStoreBackend.h"
54#include "XfsFileStoreBackend.h"
55#include "ZFSFileStoreBackend.h"
56#include "common/BackTrace.h"
57#include "include/types.h"
58#include "FileJournal.h"
59
60#include "osd/osd_types.h"
61#include "include/color.h"
62#include "include/buffer.h"
63
64#include "common/Timer.h"
65#include "common/debug.h"
66#include "common/errno.h"
67#include "common/run_cmd.h"
68#include "common/safe_io.h"
69#include "common/perf_counters.h"
70#include "common/sync_filesystem.h"
71#include "common/fd.h"
72#include "HashIndex.h"
73#include "DBObjectMap.h"
74#include "kv/KeyValueDB.h"
75
76#include "common/ceph_crypto.h"
77using ceph::crypto::SHA1;
78
11fdf7f2 79#include "include/ceph_assert.h"
7c673cae
FG
80
81#include "common/config.h"
82#include "common/blkdev.h"
83
84#ifdef WITH_LTTNG
85#define TRACEPOINT_DEFINE
86#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
87#include "tracing/objectstore.h"
88#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
89#undef TRACEPOINT_DEFINE
90#else
91#define tracepoint(...)
92#endif
93
94#define dout_context cct
95#define dout_subsys ceph_subsys_filestore
96#undef dout_prefix
97#define dout_prefix *_dout << "filestore(" << basedir << ") "
98
99#define COMMIT_SNAP_ITEM "snap_%llu"
100#define CLUSTER_SNAP_ITEM "clustersnap_%s"
101
102#define REPLAY_GUARD_XATTR "user.cephos.seq"
103#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
104
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// spill over into DBObjectMap. If XATTR_SPILL_OUT_NAME exists in the file's
// xattrs and its value is XATTR_NO_SPILL_OUT, no xattrs are in DBObjectMap.
108#define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
109#define XATTR_NO_SPILL_OUT "0"
110#define XATTR_SPILL_OUT "1"
31f18b77 111#define __FUNC__ __func__ << "(" << __LINE__ << ")"
7c673cae
FG
112
113//Initial features in new superblock.
114static CompatSet get_fs_initial_compat_set() {
115 CompatSet::FeatureSet ceph_osd_feature_compat;
116 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
117 CompatSet::FeatureSet ceph_osd_feature_incompat;
118 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
119 ceph_osd_feature_incompat);
120}
121
122//Features are added here that this FileStore supports.
123static CompatSet get_fs_supported_compat_set() {
124 CompatSet compat = get_fs_initial_compat_set();
125 //Any features here can be set in code, but not in initial superblock
126 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
127 return compat;
128}
129
130int FileStore::validate_hobject_key(const hobject_t &obj) const
131{
132 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
133 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
134}
135
136int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
137 uuid_d *fsid)
138{
139 // make sure we don't try to use aio or direct_io (and get annoying
140 // error messages from failing to do so); performance implications
141 // should be irrelevant for this use
142 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
143 return j.peek_fsid(*fsid);
144}
145
146void FileStore::FSPerfTracker::update_from_perfcounters(
147 PerfCounters &logger)
148{
11fdf7f2
TL
149 os_commit_latency_ns.consume_next(
150 logger.get_tavg_ns(
7c673cae 151 l_filestore_journal_latency));
11fdf7f2
TL
152 os_apply_latency_ns.consume_next(
153 logger.get_tavg_ns(
7c673cae
FG
154 l_filestore_apply_latency));
155}
156
157
158ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
159{
11fdf7f2 160 return out << "osr(" << s.cid << ")";
7c673cae
FG
161}
162
163int FileStore::get_cdir(const coll_t& cid, char *s, int len)
164{
165 const string &cid_str(cid.to_str());
166 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
167}
168
11fdf7f2
TL
169void FileStore::handle_eio()
170{
171 // don't try to map this back to an offset; too hard since there is
172 // a file system in between. we also don't really know whether this
173 // was a read or a write, since we have so many layers beneath us.
174 // don't even try.
175 note_io_error_event(devname.c_str(), basedir.c_str(), -EIO, 0, 0, 0);
176 ceph_abort_msg("unexpected eio error");
177}
178
7c673cae
FG
179int FileStore::get_index(const coll_t& cid, Index *index)
180{
181 int r = index_manager.get_index(cid, basedir, index);
11fdf7f2 182 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
183 return r;
184}
185
186int FileStore::init_index(const coll_t& cid)
187{
188 char path[PATH_MAX];
189 get_cdir(cid, path, sizeof(path));
190 int r = index_manager.init_index(cid, path, target_version);
11fdf7f2 191 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
192 return r;
193}
194
195int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
196{
197 IndexedPath path2;
198 if (!path)
199 path = &path2;
200 int r, exist;
11fdf7f2 201 ceph_assert(index.index);
7c673cae
FG
202 r = (index.index)->lookup(oid, path, &exist);
203 if (r < 0) {
11fdf7f2 204 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
205 return r;
206 }
207 if (!exist)
208 return -ENOENT;
209 return 0;
210}
211
212int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
213{
214 FDRef fd;
215 int r = lfn_open(cid, oid, false, &fd);
216 if (r < 0)
217 return r;
218 r = ::ftruncate(**fd, length);
219 if (r < 0)
220 r = -errno;
221 if (r >= 0 && m_filestore_sloppy_crc) {
222 int rc = backend->_crc_update_truncate(**fd, length);
11fdf7f2 223 ceph_assert(rc >= 0);
7c673cae
FG
224 }
225 lfn_close(fd);
11fdf7f2 226 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
227 return r;
228}
229
230int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
231{
232 IndexedPath path;
233 Index index;
234 int r = get_index(cid, &index);
235 if (r < 0)
236 return r;
237
11fdf7f2 238 ceph_assert(index.index);
7c673cae
FG
239 RWLock::RLocker l((index.index)->access_lock);
240
241 r = lfn_find(oid, index, &path);
242 if (r < 0)
243 return r;
244 r = ::stat(path->path(), buf);
245 if (r < 0)
246 r = -errno;
247 return r;
248}
249
250int FileStore::lfn_open(const coll_t& cid,
251 const ghobject_t& oid,
252 bool create,
253 FDRef *outfd,
254 Index *index)
255{
11fdf7f2 256 ceph_assert(outfd);
7c673cae
FG
257 int r = 0;
258 bool need_lock = true;
259 int flags = O_RDWR;
260
261 if (create)
262 flags |= O_CREAT;
263 if (cct->_conf->filestore_odsync_write) {
264 flags |= O_DSYNC;
265 }
266
267 Index index2;
268 if (!index) {
269 index = &index2;
270 }
271 if (!((*index).index)) {
272 r = get_index(cid, index);
273 if (r < 0) {
31f18b77 274 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
275 return r;
276 }
277 } else {
278 need_lock = false;
279 }
280
281 int fd, exist;
11fdf7f2 282 ceph_assert((*index).index);
7c673cae
FG
283 if (need_lock) {
284 ((*index).index)->access_lock.get_write();
285 }
286 if (!replaying) {
287 *outfd = fdcache.lookup(oid);
288 if (*outfd) {
289 if (need_lock) {
290 ((*index).index)->access_lock.put_write();
291 }
292 return 0;
293 }
294 }
295
296
297 IndexedPath path2;
298 IndexedPath *path = &path2;
299
300 r = (*index)->lookup(oid, path, &exist);
301 if (r < 0) {
302 derr << "could not find " << oid << " in index: "
303 << cpp_strerror(-r) << dendl;
304 goto fail;
305 }
306
91327a77 307 r = ::open((*path)->path(), flags|O_CLOEXEC, 0644);
7c673cae
FG
308 if (r < 0) {
309 r = -errno;
310 dout(10) << "error opening file " << (*path)->path() << " with flags="
311 << flags << ": " << cpp_strerror(-r) << dendl;
312 goto fail;
313 }
314 fd = r;
315 if (create && (!exist)) {
316 r = (*index)->created(oid, (*path)->path());
317 if (r < 0) {
318 VOID_TEMP_FAILURE_RETRY(::close(fd));
319 derr << "error creating " << oid << " (" << (*path)->path()
320 << ") in index: " << cpp_strerror(-r) << dendl;
321 goto fail;
322 }
323 r = chain_fsetxattr<true, true>(
324 fd, XATTR_SPILL_OUT_NAME,
325 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
326 if (r < 0) {
327 VOID_TEMP_FAILURE_RETRY(::close(fd));
328 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
329 << "):" << cpp_strerror(-r) << dendl;
330 goto fail;
331 }
332 }
333
334 if (!replaying) {
335 bool existed;
336 *outfd = fdcache.add(oid, fd, &existed);
337 if (existed) {
338 TEMP_FAILURE_RETRY(::close(fd));
339 }
340 } else {
341 *outfd = std::make_shared<FDCache::FD>(fd);
342 }
343
344 if (need_lock) {
345 ((*index).index)->access_lock.put_write();
346 }
347
348 return 0;
349
350 fail:
351
352 if (need_lock) {
353 ((*index).index)->access_lock.put_write();
354 }
355
11fdf7f2 356 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
357 return r;
358}
359
360void FileStore::lfn_close(FDRef fd)
361{
362}
363
364int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
365{
366 Index index_new, index_old;
367 IndexedPath path_new, path_old;
368 int exist;
369 int r;
370 bool index_same = false;
371 if (c < newcid) {
372 r = get_index(newcid, &index_new);
373 if (r < 0)
374 return r;
375 r = get_index(c, &index_old);
376 if (r < 0)
377 return r;
378 } else if (c == newcid) {
379 r = get_index(c, &index_old);
380 if (r < 0)
381 return r;
382 index_new = index_old;
383 index_same = true;
384 } else {
385 r = get_index(c, &index_old);
386 if (r < 0)
387 return r;
388 r = get_index(newcid, &index_new);
389 if (r < 0)
390 return r;
391 }
392
11fdf7f2
TL
393 ceph_assert(index_old.index);
394 ceph_assert(index_new.index);
7c673cae
FG
395
396 if (!index_same) {
397
398 RWLock::RLocker l1((index_old.index)->access_lock);
399
400 r = index_old->lookup(o, &path_old, &exist);
401 if (r < 0) {
11fdf7f2 402 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
403 return r;
404 }
405 if (!exist)
406 return -ENOENT;
407
408 RWLock::WLocker l2((index_new.index)->access_lock);
409
410 r = index_new->lookup(newoid, &path_new, &exist);
411 if (r < 0) {
11fdf7f2 412 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
413 return r;
414 }
415 if (exist)
416 return -EEXIST;
417
31f18b77
FG
418 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
419 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
420 r = ::link(path_old->path(), path_new->path());
421 if (r < 0)
422 return -errno;
423
424 r = index_new->created(newoid, path_new->path());
425 if (r < 0) {
11fdf7f2 426 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
427 return r;
428 }
429 } else {
430 RWLock::WLocker l1((index_old.index)->access_lock);
431
432 r = index_old->lookup(o, &path_old, &exist);
433 if (r < 0) {
11fdf7f2 434 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
435 return r;
436 }
437 if (!exist)
438 return -ENOENT;
439
440 r = index_new->lookup(newoid, &path_new, &exist);
441 if (r < 0) {
11fdf7f2 442 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
443 return r;
444 }
445 if (exist)
446 return -EEXIST;
447
31f18b77
FG
448 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
449 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
450 r = ::link(path_old->path(), path_new->path());
451 if (r < 0)
452 return -errno;
453
454 // make sure old fd for unlinked/overwritten file is gone
455 fdcache.clear(newoid);
456
457 r = index_new->created(newoid, path_new->path());
458 if (r < 0) {
11fdf7f2 459 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
460 return r;
461 }
462 }
463 return 0;
464}
465
466int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
467 const SequencerPosition &spos,
468 bool force_clear_omap)
469{
470 Index index;
471 int r = get_index(cid, &index);
472 if (r < 0) {
31f18b77 473 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
7c673cae
FG
474 return r;
475 }
476
11fdf7f2 477 ceph_assert(index.index);
7c673cae
FG
478 RWLock::WLocker l((index.index)->access_lock);
479
480 {
481 IndexedPath path;
482 int hardlink;
483 r = index->lookup(o, &path, &hardlink);
484 if (r < 0) {
11fdf7f2 485 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
486 return r;
487 }
488
489 if (!force_clear_omap) {
490 if (hardlink == 0 || hardlink == 1) {
491 force_clear_omap = true;
492 }
493 }
494 if (force_clear_omap) {
31f18b77 495 dout(20) << __FUNC__ << ": clearing omap on " << o
7c673cae
FG
496 << " in cid " << cid << dendl;
497 r = object_map->clear(o, &spos);
498 if (r < 0 && r != -ENOENT) {
31f18b77 499 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
11fdf7f2 500 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
501 return r;
502 }
503 if (cct->_conf->filestore_debug_inject_read_err) {
504 debug_obj_on_delete(o);
505 }
506 if (!m_disable_wbthrottle) {
507 wbthrottle.clear_object(o); // should be only non-cache ref
508 }
509 fdcache.clear(o);
510 } else {
511 /* Ensure that replay of this op doesn't result in the object_map
512 * going away.
513 */
514 if (!backend->can_checkpoint())
515 object_map->sync(&o, &spos);
516 }
517 if (hardlink == 0) {
518 if (!m_disable_wbthrottle) {
519 wbthrottle.clear_object(o); // should be only non-cache ref
520 }
521 return 0;
522 }
523 }
524 r = index->unlink(o);
525 if (r < 0) {
31f18b77 526 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
7c673cae
FG
527 return r;
528 }
529 return 0;
530}
531
532FileStore::FileStore(CephContext* cct, const std::string &base,
533 const std::string &jdev, osflagbits_t flags,
534 const char *name, bool do_update) :
535 JournalingObjectStore(cct, base),
536 internal_name(name),
537 basedir(base), journalpath(jdev),
538 generic_flags(flags),
539 blk_size(0),
540 fsid_fd(-1), op_fd(-1),
541 basedir_fd(-1), current_fd(-1),
11fdf7f2 542 backend(nullptr),
7c673cae
FG
543 index_manager(cct, do_update),
544 lock("FileStore::lock"),
545 force_sync(false),
546 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
547 timer(cct, sync_entry_timeo_lock),
548 stop(false), sync_thread(this),
11fdf7f2 549 coll_lock("FileStore::coll_lock"),
7c673cae
FG
550 fdcache(cct),
551 wbthrottle(cct),
552 next_osr_id(0),
553 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
554 !cct->_conf->filestore_wbthrottle_enable),
555 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
556 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
557 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
558 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
559 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
560 op_wq(this, cct->_conf->filestore_op_thread_timeout,
561 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
11fdf7f2 562 logger(nullptr),
7c673cae
FG
563 trace_endpoint("0.0.0.0", 0, "FileStore"),
564 read_error_lock("FileStore::read_error_lock"),
565 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
566 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
567 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
568 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
569 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
570 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
571 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
572 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
573 m_filestore_fadvise(cct->_conf->filestore_fadvise),
574 do_update(do_update),
575 m_journal_dio(cct->_conf->journal_dio),
576 m_journal_aio(cct->_conf->journal_aio),
577 m_journal_force_aio(cct->_conf->journal_force_aio),
578 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
579 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
580 m_filestore_do_dump(false),
581 m_filestore_dump_fmt(true),
582 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
583 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
584 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
585 m_fs_type(0),
586 m_filestore_max_inline_xattr_size(0),
587 m_filestore_max_inline_xattrs(0),
588 m_filestore_max_xattr_value_size(0)
589{
31f18b77 590 m_filestore_kill_at = cct->_conf->filestore_kill_at;
7c673cae
FG
591 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
592 ostringstream oss;
593 oss << "filestore-ondisk-" << i;
594 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
595 ondisk_finishers.push_back(f);
596 }
597 for (int i = 0; i < m_apply_finisher_num; ++i) {
598 ostringstream oss;
599 oss << "filestore-apply-" << i;
600 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
601 apply_finishers.push_back(f);
602 }
603
604 ostringstream oss;
605 oss << basedir << "/current";
606 current_fn = oss.str();
607
608 ostringstream sss;
609 sss << basedir << "/current/commit_op_seq";
610 current_op_seq_fn = sss.str();
611
612 ostringstream omss;
613 if (cct->_conf->filestore_omap_backend_path != "") {
614 omap_dir = cct->_conf->filestore_omap_backend_path;
615 } else {
616 omss << basedir << "/current/omap";
617 omap_dir = omss.str();
618 }
619
620 // initialize logger
621 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
622
623 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
624 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
625 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
626 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
28e407b8
AA
627 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency",
628 NULL, PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
629 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
630 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
631 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
632 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
633 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
634 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
635 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
636 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
637 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
638 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
639
640 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
641 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
642 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
643 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
28e407b8
AA
644 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg",
645 "Store operation queue latency", NULL, PerfCountersBuilder::PRIO_USEFUL);
224ce89b 646 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
7c673cae
FG
647
648 logger = plb.create_perf_counters();
649
650 cct->get_perfcounters_collection()->add(logger);
11fdf7f2 651 cct->_conf.add_observer(this);
7c673cae
FG
652
653 superblock.compat_features = get_fs_initial_compat_set();
654}
655
656FileStore::~FileStore()
657{
658 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
659 delete *it;
11fdf7f2 660 *it = nullptr;
7c673cae
FG
661 }
662 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
663 delete *it;
11fdf7f2 664 *it = nullptr;
7c673cae 665 }
11fdf7f2 666 cct->_conf.remove_observer(this);
7c673cae
FG
667 cct->get_perfcounters_collection()->remove(logger);
668
669 if (journal)
11fdf7f2 670 journal->logger = nullptr;
7c673cae 671 delete logger;
11fdf7f2 672 logger = nullptr;
7c673cae
FG
673
674 if (m_filestore_do_dump) {
675 dump_stop();
676 }
677}
678
// Write the on-disk xattr name for `name` ("user.ceph." followed by name)
// into buf, which holds len bytes. Output is truncated by snprintf() if
// it does not fit.
static void get_attrname(const char *name, char *buf, int len)
{
  static const char attr_prefix[] = "user.ceph.";
  snprintf(buf, len, "%s%s", attr_prefix, name);
}
683
// If *name starts with "user.ceph.", advance *name past that prefix and
// return true; otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char attr_prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(attr_prefix) - 1;  // 10, excludes the NUL

  if (strncmp(*name, attr_prefix, prefix_len) != 0) {
    return false;
  }
  *name += prefix_len;
  return true;
}
692
693void FileStore::collect_metadata(map<string,string> *pm)
694{
695 char partition_path[PATH_MAX];
696 char dev_node[PATH_MAX];
7c673cae
FG
697
698 (*pm)["filestore_backend"] = backend->get_name();
699 ostringstream ss;
700 ss << "0x" << std::hex << m_fs_type << std::dec;
701 (*pm)["filestore_f_type"] = ss.str();
702
703 if (cct->_conf->filestore_collect_device_partition_information) {
11fdf7f2
TL
704 int rc = 0;
705 BlkDev blkdev(fsid_fd);
706 if (rc = blkdev.partition(partition_path, PATH_MAX); rc) {
7c673cae 707 (*pm)["backend_filestore_partition_path"] = "unknown";
11fdf7f2 708 } else {
7c673cae 709 (*pm)["backend_filestore_partition_path"] = string(partition_path);
11fdf7f2
TL
710 }
711 if (rc = blkdev.wholedisk(dev_node, PATH_MAX); rc) {
7c673cae 712 (*pm)["backend_filestore_dev_node"] = "unknown";
11fdf7f2 713 } else {
7c673cae 714 (*pm)["backend_filestore_dev_node"] = string(dev_node);
11fdf7f2
TL
715 devname = dev_node;
716 }
717 if (rc == 0 && vdo_fd >= 0) {
718 (*pm)["vdo"] = "true";
719 (*pm)["vdo_physical_size"] =
720 stringify(4096 * get_vdo_stat(vdo_fd, "physical_blocks"));
721 }
722 if (journal) {
723 journal->collect_metadata(pm);
724 }
7c673cae
FG
725 }
726}
727
11fdf7f2
TL
728int FileStore::get_devices(set<string> *ls)
729{
730 string dev_node;
731 BlkDev blkdev(fsid_fd);
732 if (int rc = blkdev.wholedisk(&dev_node); rc) {
733 return rc;
734 }
735 get_raw_devices(dev_node, ls);
736 if (journal) {
737 journal->get_devices(ls);
738 }
739 return 0;
740}
741
742int FileStore::statfs(struct store_statfs_t *buf0, osd_alert_list_t* alerts)
7c673cae
FG
743{
744 struct statfs buf;
745 buf0->reset();
11fdf7f2
TL
746 if (alerts) {
747 alerts->clear(); // returns nothing for now
748 }
7c673cae
FG
749 if (::statfs(basedir.c_str(), &buf) < 0) {
750 int r = -errno;
11fdf7f2
TL
751 if (r == -EIO && m_filestore_fail_eio) handle_eio();
752 ceph_assert(r != -ENOENT);
7c673cae
FG
753 return r;
754 }
11fdf7f2
TL
755
756 uint64_t bfree = buf.f_bavail * buf.f_bsize;
757
758 // assume all of leveldb/rocksdb is omap.
759 {
760 map<string,uint64_t> kv_usage;
761 buf0->omap_allocated += object_map->get_db()->get_estimated_size(kv_usage);
762 }
763
764 uint64_t thin_total, thin_avail;
765 if (get_vdo_utilization(vdo_fd, &thin_total, &thin_avail)) {
766 buf0->total = thin_total;
767 bfree = std::min(bfree, thin_avail);
768 buf0->allocated = thin_total - thin_avail;
769 buf0->data_stored = bfree;
770 } else {
771 buf0->total = buf.f_blocks * buf.f_bsize;
772 buf0->allocated = bfree;
773 buf0->data_stored = bfree;
774 }
775 buf0->available = bfree;
776
777 // FIXME: we don't know how to populate buf->internal_metadata; XFS doesn't
778 // tell us what its internal overhead is.
779
7c673cae
FG
780 // Adjust for writes pending in the journal
781 if (journal) {
782 uint64_t estimate = journal->get_journal_size_estimate();
11fdf7f2 783 buf0->internally_reserved = estimate;
7c673cae
FG
784 if (buf0->available > estimate)
785 buf0->available -= estimate;
786 else
787 buf0->available = 0;
788 }
11fdf7f2 789
7c673cae
FG
790 return 0;
791}
792
11fdf7f2
TL
793int FileStore::pool_statfs(uint64_t pool_id, struct store_statfs_t *buf)
794{
795 return -ENOTSUP;
796}
7c673cae
FG
797
798void FileStore::new_journal()
799{
800 if (journalpath.length()) {
801 dout(10) << "open_journal at " << journalpath << dendl;
802 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
803 journalpath.c_str(),
804 m_journal_dio, m_journal_aio,
805 m_journal_force_aio);
806 if (journal)
807 journal->logger = logger;
808 }
809 return;
810}
811
812int FileStore::dump_journal(ostream& out)
813{
814 int r;
815
816 if (!journalpath.length())
817 return -EINVAL;
818
819 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
820 r = journal->dump(out);
821 delete journal;
11fdf7f2 822 journal = nullptr;
7c673cae
FG
823 return r;
824}
825
11fdf7f2 826FileStoreBackend *FileStoreBackend::create(unsigned long f_type, FileStore *fs)
7c673cae
FG
827{
828 switch (f_type) {
829#if defined(__linux__)
830 case BTRFS_SUPER_MAGIC:
831 return new BtrfsFileStoreBackend(fs);
832# ifdef HAVE_LIBXFS
833 case XFS_SUPER_MAGIC:
834 return new XfsFileStoreBackend(fs);
835# endif
836#endif
837#ifdef HAVE_LIBZFS
838 case ZFS_SUPER_MAGIC:
839 return new ZFSFileStoreBackend(fs);
840#endif
841 default:
842 return new GenericFileStoreBackend(fs);
843 }
844}
845
11fdf7f2 846void FileStore::create_backend(unsigned long f_type)
7c673cae
FG
847{
848 m_fs_type = f_type;
849
11fdf7f2 850 ceph_assert(!backend);
7c673cae
FG
851 backend = FileStoreBackend::create(f_type, this);
852
853 dout(0) << "backend " << backend->get_name()
854 << " (magic 0x" << std::hex << f_type << std::dec << ")"
855 << dendl;
856
857 switch (f_type) {
858#if defined(__linux__)
859 case BTRFS_SUPER_MAGIC:
860 if (!m_disable_wbthrottle){
861 wbthrottle.set_fs(WBThrottle::BTRFS);
862 }
863 break;
864
865 case XFS_SUPER_MAGIC:
866 // wbthrottle is constructed with fs(WBThrottle::XFS)
867 break;
868#endif
869 }
870
871 set_xattr_limits_via_conf();
872}
873
874int FileStore::mkfs()
875{
876 int ret = 0;
877 char fsid_fn[PATH_MAX];
878 char fsid_str[40];
879 uuid_d old_fsid;
880 uuid_d old_omap_fsid;
881
882 dout(1) << "mkfs in " << basedir << dendl;
91327a77 883 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
884 if (basedir_fd < 0) {
885 ret = -errno;
224ce89b 886 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
887 return ret;
888 }
889
890 // open+lock fsid
891 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
91327a77 892 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
7c673cae
FG
893 if (fsid_fd < 0) {
894 ret = -errno;
224ce89b 895 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
896 goto close_basedir_fd;
897 }
898
899 if (lock_fsid() < 0) {
900 ret = -EBUSY;
901 goto close_fsid_fd;
902 }
903
904 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
905 if (fsid.is_zero()) {
906 fsid.generate_random();
224ce89b 907 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
7c673cae 908 } else {
224ce89b 909 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
7c673cae
FG
910 }
911
912 fsid.print(fsid_str);
913 strcat(fsid_str, "\n");
914 ret = ::ftruncate(fsid_fd, 0);
915 if (ret < 0) {
916 ret = -errno;
31f18b77 917 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
918 << cpp_strerror(ret) << dendl;
919 goto close_fsid_fd;
920 }
921 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
922 if (ret < 0) {
31f18b77 923 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
924 << cpp_strerror(ret) << dendl;
925 goto close_fsid_fd;
926 }
927 if (::fsync(fsid_fd) < 0) {
928 ret = -errno;
31f18b77 929 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
930 << cpp_strerror(ret) << dendl;
931 goto close_fsid_fd;
932 }
224ce89b 933 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
7c673cae
FG
934 } else {
935 if (!fsid.is_zero() && fsid != old_fsid) {
31f18b77 936 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
7c673cae
FG
937 ret = -EINVAL;
938 goto close_fsid_fd;
939 }
940 fsid = old_fsid;
31f18b77 941 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
7c673cae
FG
942 }
943
944 // version stamp
945 ret = write_version_stamp();
946 if (ret < 0) {
31f18b77 947 derr << __FUNC__ << ": write_version_stamp() failed: "
7c673cae
FG
948 << cpp_strerror(ret) << dendl;
949 goto close_fsid_fd;
950 }
951
952 // superblock
953 superblock.omap_backend = cct->_conf->filestore_omap_backend;
954 ret = write_superblock();
955 if (ret < 0) {
31f18b77 956 derr << __FUNC__ << ": write_superblock() failed: "
7c673cae
FG
957 << cpp_strerror(ret) << dendl;
958 goto close_fsid_fd;
959 }
960
961 struct statfs basefs;
962 ret = ::fstatfs(basedir_fd, &basefs);
963 if (ret < 0) {
964 ret = -errno;
31f18b77 965 derr << __FUNC__ << ": cannot fstatfs basedir "
7c673cae
FG
966 << cpp_strerror(ret) << dendl;
967 goto close_fsid_fd;
968 }
969
224ce89b
WB
970#if defined(__linux__)
971 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
972 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
973 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
974 goto close_fsid_fd;
975 }
976#endif
977
7c673cae
FG
978 create_backend(basefs.f_type);
979
980 ret = backend->create_current();
981 if (ret < 0) {
31f18b77 982 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
7c673cae
FG
983 goto close_fsid_fd;
984 }
985
986 // write initial op_seq
987 {
988 uint64_t initial_seq = 0;
989 int fd = read_op_seq(&initial_seq);
990 if (fd < 0) {
991 ret = fd;
31f18b77 992 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
7c673cae
FG
993 << cpp_strerror(ret) << dendl;
994 goto close_fsid_fd;
995 }
996 if (initial_seq == 0) {
997 ret = write_op_seq(fd, 1);
998 if (ret < 0) {
999 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 1000 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
7c673cae
FG
1001 << cpp_strerror(ret) << dendl;
1002 goto close_fsid_fd;
1003 }
1004
1005 if (backend->can_checkpoint()) {
1006 // create snap_1 too
91327a77 1007 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
11fdf7f2 1008 ceph_assert(current_fd >= 0);
7c673cae
FG
1009 char s[NAME_MAX];
1010 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
11fdf7f2 1011 ret = backend->create_checkpoint(s, nullptr);
7c673cae
FG
1012 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1013 if (ret < 0 && ret != -EEXIST) {
1014 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 1015 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1016 goto close_fsid_fd;
1017 }
1018 }
1019 }
1020 VOID_TEMP_FAILURE_RETRY(::close(fd));
1021 }
1022 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
1023 if (ret < 0) {
31f18b77 1024 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
7c673cae
FG
1025 goto close_fsid_fd;
1026 }
1027 // create fsid under omap
1028 // open+lock fsid
1029 int omap_fsid_fd;
1030 char omap_fsid_fn[PATH_MAX];
1031 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
91327a77 1032 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT|O_CLOEXEC, 0644);
7c673cae
FG
1033 if (omap_fsid_fd < 0) {
1034 ret = -errno;
31f18b77 1035 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1036 goto close_fsid_fd;
1037 }
1038
1039 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
11fdf7f2 1040 ceph_assert(!fsid.is_zero());
7c673cae
FG
1041 fsid.print(fsid_str);
1042 strcat(fsid_str, "\n");
1043 ret = ::ftruncate(omap_fsid_fd, 0);
1044 if (ret < 0) {
1045 ret = -errno;
31f18b77 1046 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
1047 << cpp_strerror(ret) << dendl;
1048 goto close_omap_fsid_fd;
1049 }
1050 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
1051 if (ret < 0) {
31f18b77 1052 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
1053 << cpp_strerror(ret) << dendl;
1054 goto close_omap_fsid_fd;
1055 }
31f18b77 1056 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
7c673cae
FG
1057 if (::fsync(omap_fsid_fd) < 0) {
1058 ret = -errno;
31f18b77 1059 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
1060 << cpp_strerror(ret) << dendl;
1061 goto close_omap_fsid_fd;
1062 }
1063 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1064 } else {
1065 if (fsid != old_omap_fsid) {
31f18b77 1066 derr << __FUNC__ << ": " << omap_fsid_fn
7c673cae
FG
1067 << " has existed omap fsid " << old_omap_fsid
1068 << " != expected osd fsid " << fsid
1069 << dendl;
1070 ret = -EINVAL;
1071 goto close_omap_fsid_fd;
1072 }
31f18b77 1073 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
7c673cae
FG
1074 }
1075
1076 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1077
1078 // journal?
1079 ret = mkjournal();
1080 if (ret)
1081 goto close_omap_fsid_fd;
1082
1083 ret = write_meta("type", "filestore");
1084 if (ret)
1085 goto close_omap_fsid_fd;
1086
1087 dout(1) << "mkfs done in " << basedir << dendl;
1088 ret = 0;
1089
1090 close_omap_fsid_fd:
1091 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1092 close_fsid_fd:
1093 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1094 fsid_fd = -1;
1095 close_basedir_fd:
1096 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1097 delete backend;
11fdf7f2 1098 backend = nullptr;
7c673cae
FG
1099 return ret;
1100}
1101
1102int FileStore::mkjournal()
1103{
1104 // read fsid
1105 int ret;
1106 char fn[PATH_MAX];
1107 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
91327a77 1108 int fd = ::open(fn, O_RDONLY|O_CLOEXEC, 0644);
7c673cae
FG
1109 if (fd < 0) {
1110 int err = errno;
31f18b77 1111 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
7c673cae
FG
1112 return -err;
1113 }
1114 ret = read_fsid(fd, &fsid);
1115 if (ret < 0) {
31f18b77 1116 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1117 VOID_TEMP_FAILURE_RETRY(::close(fd));
1118 return ret;
1119 }
1120 VOID_TEMP_FAILURE_RETRY(::close(fd));
1121
1122 ret = 0;
1123
1124 new_journal();
1125 if (journal) {
1126 ret = journal->check();
1127 if (ret < 0) {
1128 ret = journal->create();
1129 if (ret)
31f18b77 1130 derr << __FUNC__ << ": error creating journal on " << journalpath
7c673cae
FG
1131 << ": " << cpp_strerror(ret) << dendl;
1132 else
31f18b77 1133 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
7c673cae
FG
1134 }
1135 delete journal;
11fdf7f2 1136 journal = nullptr;
7c673cae
FG
1137 }
1138 return ret;
1139}
1140
1141int FileStore::read_fsid(int fd, uuid_d *uuid)
1142{
1143 char fsid_str[40];
1144 memset(fsid_str, 0, sizeof(fsid_str));
1145 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1146 if (ret < 0)
1147 return ret;
1148 if (ret == 8) {
1149 // old 64-bit fsid... mirror it.
1150 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1151 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1152 return 0;
1153 }
1154
1155 if (ret > 36)
1156 fsid_str[36] = 0;
1157 else
1158 fsid_str[ret] = 0;
1159 if (!uuid->parse(fsid_str))
1160 return -EINVAL;
1161 return 0;
1162}
1163
1164int FileStore::lock_fsid()
1165{
1166 struct flock l;
1167 memset(&l, 0, sizeof(l));
1168 l.l_type = F_WRLCK;
1169 l.l_whence = SEEK_SET;
1170 l.l_start = 0;
1171 l.l_len = 0;
1172 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1173 if (r < 0) {
1174 int err = errno;
31f18b77 1175 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
7c673cae
FG
1176 << cpp_strerror(err) << dendl;
1177 return -err;
1178 }
1179 return 0;
1180}
1181
1182bool FileStore::test_mount_in_use()
1183{
31f18b77 1184 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
7c673cae
FG
1185 char fn[PATH_MAX];
1186 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1187
1188 // verify fs isn't in use
1189
91327a77 1190 fsid_fd = ::open(fn, O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1191 if (fsid_fd < 0)
1192 return 0; // no fsid, ok.
1193 bool inuse = lock_fsid() < 0;
1194 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1195 fsid_fd = -1;
1196 return inuse;
1197}
1198
31f18b77
FG
1199bool FileStore::is_rotational()
1200{
1201 bool rotational;
1202 if (backend) {
1203 rotational = backend->is_rotational();
1204 } else {
91327a77 1205 int fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
31f18b77
FG
1206 if (fd < 0)
1207 return true;
1208 struct statfs st;
1209 int r = ::fstatfs(fd, &st);
1210 ::close(fd);
1211 if (r < 0) {
1212 return true;
1213 }
1214 create_backend(st.f_type);
1215 rotational = backend->is_rotational();
1216 delete backend;
11fdf7f2 1217 backend = nullptr;
31f18b77
FG
1218 }
1219 dout(10) << __func__ << " " << (int)rotational << dendl;
1220 return rotational;
1221}
1222
d2e6a577
FG
1223bool FileStore::is_journal_rotational()
1224{
1225 bool journal_rotational;
1226 if (backend) {
1227 journal_rotational = backend->is_journal_rotational();
1228 } else {
91327a77 1229 int fd = ::open(journalpath.c_str(), O_RDONLY|O_CLOEXEC);
d2e6a577
FG
1230 if (fd < 0)
1231 return true;
1232 struct statfs st;
1233 int r = ::fstatfs(fd, &st);
1234 ::close(fd);
1235 if (r < 0) {
1236 return true;
1237 }
1238 create_backend(st.f_type);
1239 journal_rotational = backend->is_journal_rotational();
1240 delete backend;
11fdf7f2 1241 backend = nullptr;
d2e6a577
FG
1242 }
1243 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1244 return journal_rotational;
1245}
1246
7c673cae
FG
1247int FileStore::_detect_fs()
1248{
1249 struct statfs st;
1250 int r = ::fstatfs(basedir_fd, &st);
1251 if (r < 0)
1252 return -errno;
1253
1254 blk_size = st.f_bsize;
1255
224ce89b
WB
1256#if defined(__linux__)
1257 if (st.f_type == BTRFS_SUPER_MAGIC &&
1258 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1259 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1260 return -EPERM;
1261 }
1262#endif
1263
7c673cae
FG
1264 create_backend(st.f_type);
1265
1266 r = backend->detect_features();
1267 if (r < 0) {
31f18b77 1268 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
7c673cae
FG
1269 return r;
1270 }
1271
11fdf7f2
TL
1272 // vdo
1273 {
1274 char dev_node[PATH_MAX];
1275 if (int rc = BlkDev{fsid_fd}.wholedisk(dev_node, PATH_MAX); rc == 0) {
1276 vdo_fd = get_vdo_stats_handle(dev_node, &vdo_name);
1277 if (vdo_fd >= 0) {
1278 dout(0) << __func__ << " VDO volume " << vdo_name << " for " << dev_node
1279 << dendl;
1280 }
1281 }
1282 }
1283
7c673cae
FG
1284 // test xattrs
1285 char fn[PATH_MAX];
1286 int x = rand();
1287 int y = x+1;
1288 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
91327a77 1289 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC|O_CLOEXEC, 0700);
7c673cae
FG
1290 if (tmpfd < 0) {
1291 int ret = -errno;
31f18b77 1292 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1293 return ret;
1294 }
1295
1296 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1297 if (ret >= 0)
1298 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1299 if ((ret < 0) || (x != y)) {
1300 derr << "Extended attributes don't appear to work. ";
1301 if (ret)
1302 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1303 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1304 << "file system with the 'user_xattr' option." << dendl;
1305 ::unlink(fn);
1306 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1307 return -ENOTSUP;
1308 }
1309
1310 char buf[1000];
1311 memset(buf, 0, sizeof(buf)); // shut up valgrind
1312 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1313 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1314 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1315 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1316 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1317 if (ret == -ENOSPC) {
1318 dout(0) << "limited size xattrs" << dendl;
1319 }
1320 chain_fremovexattr(tmpfd, "user.test");
1321 chain_fremovexattr(tmpfd, "user.test2");
1322 chain_fremovexattr(tmpfd, "user.test3");
1323 chain_fremovexattr(tmpfd, "user.test4");
1324 chain_fremovexattr(tmpfd, "user.test5");
1325
1326 ::unlink(fn);
1327 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1328
1329 return 0;
1330}
1331
1332int FileStore::_sanity_check_fs()
1333{
1334 // sanity check(s)
1335
1336 if (((int)m_filestore_journal_writeahead +
1337 (int)m_filestore_journal_parallel +
1338 (int)m_filestore_journal_trailing) > 1) {
1339 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1340 cerr << TEXT_RED
1341 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1342 << " is enabled in ceph.conf. You must choose a single journal mode."
1343 << TEXT_NORMAL << std::endl;
1344 return -EINVAL;
1345 }
1346
1347 if (!backend->can_checkpoint()) {
1348 if (!journal || !m_filestore_journal_writeahead) {
1349 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1350 cerr << TEXT_RED
1351 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1352 << " For non-btrfs volumes, a writeahead journal is required to\n"
1353 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1354 << " should include something like:\n"
1355 << " osd journal = /path/to/journal_device_or_file\n"
1356 << " filestore journal writeahead = true\n"
1357 << TEXT_NORMAL;
1358 }
1359 }
1360
1361 if (!journal) {
1362 dout(0) << "mount WARNING: no journal" << dendl;
1363 cerr << TEXT_YELLOW
1364 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1365 << " If you will not be using an osd journal, write latency may be\n"
1366 << " relatively high. It can be reduced somewhat by lowering\n"
1367 << " filestore_max_sync_interval, but lower values mean lower write\n"
1368 << " throughput, especially with spinning disks.\n"
1369 << TEXT_NORMAL;
1370 }
1371
1372 return 0;
1373}
1374
1375int FileStore::write_superblock()
1376{
1377 bufferlist bl;
11fdf7f2 1378 encode(superblock, bl);
7c673cae 1379 return safe_write_file(basedir.c_str(), "superblock",
eafe8130 1380 bl.c_str(), bl.length(), 0600);
7c673cae
FG
1381}
1382
1383int FileStore::read_superblock()
1384{
1385 bufferptr bp(PATH_MAX);
1386 int ret = safe_read_file(basedir.c_str(), "superblock",
1387 bp.c_str(), bp.length());
1388 if (ret < 0) {
1389 if (ret == -ENOENT) {
1390 // If the file doesn't exist write initial CompatSet
1391 return write_superblock();
1392 }
1393 return ret;
1394 }
1395
1396 bufferlist bl;
1397 bl.push_back(std::move(bp));
11fdf7f2
TL
1398 auto i = bl.cbegin();
1399 decode(superblock, i);
7c673cae
FG
1400 return 0;
1401}
1402
1403int FileStore::update_version_stamp()
1404{
1405 return write_version_stamp();
1406}
1407
1408int FileStore::version_stamp_is_valid(uint32_t *version)
1409{
1410 bufferptr bp(PATH_MAX);
1411 int ret = safe_read_file(basedir.c_str(), "store_version",
1412 bp.c_str(), bp.length());
1413 if (ret < 0) {
1414 return ret;
1415 }
1416 bufferlist bl;
1417 bl.push_back(std::move(bp));
11fdf7f2
TL
1418 auto i = bl.cbegin();
1419 decode(*version, i);
31f18b77 1420 dout(10) << __FUNC__ << ": was " << *version << " vs target "
7c673cae
FG
1421 << target_version << dendl;
1422 if (*version == target_version)
1423 return 1;
1424 else
1425 return 0;
1426}
1427
11fdf7f2
TL
1428int FileStore::flush_cache(ostream *os)
1429{
1430 string drop_caches_file = "/proc/sys/vm/drop_caches";
1431 int drop_caches_fd = ::open(drop_caches_file.c_str(), O_WRONLY|O_CLOEXEC), ret = 0;
1432 char buf[2] = "3";
1433 size_t len = strlen(buf);
1434
1435 if (drop_caches_fd < 0) {
1436 ret = -errno;
1437 derr << __FUNC__ << ": failed to open " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1438 if (os) {
1439 *os << "FileStore flush_cache: failed to open " << drop_caches_file << ": " << cpp_strerror(ret);
1440 }
1441 return ret;
1442 }
1443
1444 if (::write(drop_caches_fd, buf, len) < 0) {
1445 ret = -errno;
1446 derr << __FUNC__ << ": failed to write to " << drop_caches_file << ": " << cpp_strerror(ret) << dendl;
1447 if (os) {
1448 *os << "FileStore flush_cache: failed to write to " << drop_caches_file << ": " << cpp_strerror(ret);
1449 }
1450 goto out;
1451 }
1452
1453out:
1454 ::close(drop_caches_fd);
1455 return ret;
1456}
1457
7c673cae
FG
1458int FileStore::write_version_stamp()
1459{
31f18b77 1460 dout(1) << __FUNC__ << ": " << target_version << dendl;
7c673cae 1461 bufferlist bl;
11fdf7f2 1462 encode(target_version, bl);
7c673cae
FG
1463
1464 return safe_write_file(basedir.c_str(), "store_version",
eafe8130 1465 bl.c_str(), bl.length(), 0600);
7c673cae
FG
1466}
1467
1468int FileStore::upgrade()
1469{
31f18b77 1470 dout(1) << __FUNC__ << dendl;
7c673cae
FG
1471 uint32_t version;
1472 int r = version_stamp_is_valid(&version);
1473
1474 if (r == -ENOENT) {
1475 derr << "The store_version file doesn't exist." << dendl;
1476 return -EINVAL;
1477 }
1478 if (r < 0)
1479 return r;
1480 if (r == 1)
1481 return 0;
1482
1483 if (version < 3) {
1484 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1485 return -EINVAL;
1486 }
1487
1488 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1489 // open up DBObjectMap with the do_upgrade flag, which we already did.
1490 update_version_stamp();
1491 return 0;
1492}
1493
1494int FileStore::read_op_seq(uint64_t *seq)
1495{
91327a77 1496 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1497 if (op_fd < 0) {
1498 int r = -errno;
11fdf7f2 1499 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
1500 return r;
1501 }
1502 char s[40];
1503 memset(s, 0, sizeof(s));
1504 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1505 if (ret < 0) {
31f18b77 1506 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae 1507 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
11fdf7f2 1508 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
1509 return ret;
1510 }
1511 *seq = atoll(s);
1512 return op_fd;
1513}
1514
1515int FileStore::write_op_seq(int fd, uint64_t seq)
1516{
1517 char s[30];
1518 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1519 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1520 if (ret < 0) {
1521 ret = -errno;
11fdf7f2 1522 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
1523 }
1524 return ret;
1525}
1526
1527int FileStore::mount()
1528{
1529 int ret;
1530 char buf[PATH_MAX];
1531 uint64_t initial_op_seq;
1532 uuid_d omap_fsid;
1533 set<string> cluster_snaps;
1534 CompatSet supported_compat_set = get_fs_supported_compat_set();
1535
1536 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1537
1538 ret = set_throttle_params();
1539 if (ret != 0)
1540 goto done;
1541
1542 // make sure global base dir exists
1543 if (::access(basedir.c_str(), R_OK | W_OK)) {
1544 ret = -errno;
31f18b77 1545 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
7c673cae
FG
1546 << cpp_strerror(ret) << dendl;
1547 goto done;
1548 }
1549
1550 // get fsid
1551 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
91327a77 1552 fsid_fd = ::open(buf, O_RDWR|O_CLOEXEC, 0644);
7c673cae
FG
1553 if (fsid_fd < 0) {
1554 ret = -errno;
31f18b77 1555 derr << __FUNC__ << ": error opening '" << buf << "': "
7c673cae
FG
1556 << cpp_strerror(ret) << dendl;
1557 goto done;
1558 }
1559
1560 ret = read_fsid(fsid_fd, &fsid);
1561 if (ret < 0) {
31f18b77 1562 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
7c673cae
FG
1563 << dendl;
1564 goto close_fsid_fd;
1565 }
1566
1567 if (lock_fsid() < 0) {
31f18b77 1568 derr << __FUNC__ << ": lock_fsid failed" << dendl;
7c673cae
FG
1569 ret = -EBUSY;
1570 goto close_fsid_fd;
1571 }
1572
1573 dout(10) << "mount fsid is " << fsid << dendl;
1574
1575
1576 uint32_t version_stamp;
1577 ret = version_stamp_is_valid(&version_stamp);
1578 if (ret < 0) {
31f18b77 1579 derr << __FUNC__ << ": error in version_stamp_is_valid: "
7c673cae
FG
1580 << cpp_strerror(ret) << dendl;
1581 goto close_fsid_fd;
1582 } else if (ret == 0) {
1583 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
31f18b77 1584 derr << __FUNC__ << ": stale version stamp detected: "
7c673cae
FG
1585 << version_stamp
1586 << ". Proceeding, do_update "
1587 << "is set, performing disk format upgrade."
1588 << dendl;
1589 do_update = true;
1590 } else {
1591 ret = -EINVAL;
31f18b77 1592 derr << __FUNC__ << ": stale version stamp " << version_stamp
7c673cae
FG
1593 << ". Please run the FileStore update script before starting the "
1594 << "OSD, or set filestore_update_to to " << target_version
1595 << " (currently " << cct->_conf->filestore_update_to << ")"
1596 << dendl;
1597 goto close_fsid_fd;
1598 }
1599 }
1600
1601 ret = read_superblock();
1602 if (ret < 0) {
1603 goto close_fsid_fd;
1604 }
1605
1606 // Check if this FileStore supports all the necessary features to mount
1607 if (supported_compat_set.compare(superblock.compat_features) == -1) {
31f18b77 1608 derr << __FUNC__ << ": Incompatible features set "
7c673cae
FG
1609 << superblock.compat_features << dendl;
1610 ret = -EINVAL;
1611 goto close_fsid_fd;
1612 }
1613
1614 // open some dir handles
91327a77 1615 basedir_fd = ::open(basedir.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
1616 if (basedir_fd < 0) {
1617 ret = -errno;
31f18b77 1618 derr << __FUNC__ << ": failed to open " << basedir << ": "
7c673cae
FG
1619 << cpp_strerror(ret) << dendl;
1620 basedir_fd = -1;
1621 goto close_fsid_fd;
1622 }
1623
1624 // test for btrfs, xattrs, etc.
1625 ret = _detect_fs();
1626 if (ret < 0) {
31f18b77 1627 derr << __FUNC__ << ": error in _detect_fs: "
7c673cae
FG
1628 << cpp_strerror(ret) << dendl;
1629 goto close_basedir_fd;
1630 }
1631
1632 {
1633 list<string> ls;
1634 ret = backend->list_checkpoints(ls);
1635 if (ret < 0) {
31f18b77 1636 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
7c673cae
FG
1637 goto close_basedir_fd;
1638 }
1639
1640 long long unsigned c, prev = 0;
1641 char clustersnap[NAME_MAX];
1642 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1643 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
11fdf7f2 1644 ceph_assert(c > prev);
7c673cae
FG
1645 prev = c;
1646 snaps.push_back(c);
1647 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1648 cluster_snaps.insert(*it);
1649 }
1650 }
1651
1652 if (m_osd_rollback_to_cluster_snap.length() &&
1653 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1654 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1655 ret = -ENOENT;
1656 goto close_basedir_fd;
1657 }
1658
1659 char nosnapfn[200];
1660 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1661
1662 if (backend->can_checkpoint()) {
1663 if (snaps.empty()) {
31f18b77 1664 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
7c673cae
FG
1665 } else {
1666 char s[NAME_MAX];
1667 uint64_t curr_seq = 0;
1668
1669 if (m_osd_rollback_to_cluster_snap.length()) {
1670 derr << TEXT_RED
1671 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1672 << TEXT_NORMAL
1673 << dendl;
11fdf7f2 1674 ceph_assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
7c673cae
FG
1675 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1676 } else {
1677 {
1678 int fd = read_op_seq(&curr_seq);
1679 if (fd >= 0) {
1680 VOID_TEMP_FAILURE_RETRY(::close(fd));
1681 }
1682 }
1683 if (curr_seq)
1684 dout(10) << " current/ seq was " << curr_seq << dendl;
1685 else
1686 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1687
1688 uint64_t cp = snaps.back();
1689 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1690
1691 // if current/ is marked as non-snapshotted, refuse to roll
1692 // back (without clear direction) to avoid throwing out new
1693 // data.
1694 struct stat st;
1695 if (::stat(nosnapfn, &st) == 0) {
1696 if (!m_osd_use_stale_snap) {
1697 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1698 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1699 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1700 ret = -ENOTSUP;
1701 goto close_basedir_fd;
1702 }
1703 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1704 << ", newest snap is " << cp << dendl;
1705 cerr << TEXT_YELLOW
1706 << " ** WARNING: forcing the use of stale snapshot data **"
1707 << TEXT_NORMAL << std::endl;
1708 }
1709
31f18b77 1710 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
7c673cae
FG
1711 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1712 }
1713
1714 // drop current?
1715 ret = backend->rollback_to(s);
1716 if (ret) {
31f18b77 1717 derr << __FUNC__ << ": error rolling back to " << s << ": "
7c673cae
FG
1718 << cpp_strerror(ret) << dendl;
1719 goto close_basedir_fd;
1720 }
1721 }
1722 }
1723 initial_op_seq = 0;
1724
91327a77 1725 current_fd = ::open(current_fn.c_str(), O_RDONLY|O_CLOEXEC);
7c673cae
FG
1726 if (current_fd < 0) {
1727 ret = -errno;
31f18b77 1728 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1729 goto close_basedir_fd;
1730 }
1731
11fdf7f2 1732 ceph_assert(current_fd >= 0);
7c673cae
FG
1733
1734 op_fd = read_op_seq(&initial_op_seq);
1735 if (op_fd < 0) {
1736 ret = op_fd;
31f18b77 1737 derr << __FUNC__ << ": read_op_seq failed" << dendl;
7c673cae
FG
1738 goto close_current_fd;
1739 }
1740
1741 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1742 if (initial_op_seq == 0) {
1743 derr << "mount initial op seq is 0; something is wrong" << dendl;
1744 ret = -EINVAL;
1745 goto close_current_fd;
1746 }
1747
1748 if (!backend->can_checkpoint()) {
1749 // mark current/ as non-snapshotted so that we don't rollback away
1750 // from it.
1751 int r = ::creat(nosnapfn, 0644);
1752 if (r < 0) {
1753 ret = -errno;
31f18b77 1754 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
7c673cae
FG
1755 goto close_current_fd;
1756 }
1757 VOID_TEMP_FAILURE_RETRY(::close(r));
1758 } else {
1759 // clear nosnap marker, if present.
1760 ::unlink(nosnapfn);
1761 }
1762
1763 // check fsid with omap
1764 // get omap fsid
7c673cae
FG
1765 char omap_fsid_buf[PATH_MAX];
1766 struct ::stat omap_fsid_stat;
1767 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1768 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1769 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
31f18b77 1770 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
7c673cae
FG
1771 << "assume as matched."
1772 << dendl;
11fdf7f2
TL
1773 } else {
1774 int omap_fsid_fd;
7c673cae 1775 // if osd_uuid exists, compares osd_uuid with fsid
91327a77 1776 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY|O_CLOEXEC, 0644);
7c673cae
FG
1777 if (omap_fsid_fd < 0) {
1778 ret = -errno;
31f18b77 1779 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
7c673cae
FG
1780 << cpp_strerror(ret)
1781 << dendl;
1782 goto close_current_fd;
1783 }
1784 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1785 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
7c673cae 1786 if (ret < 0) {
31f18b77 1787 derr << __FUNC__ << ": error reading omap_fsid_fd"
7c673cae
FG
1788 << ", omap_fsid = " << omap_fsid
1789 << cpp_strerror(ret)
1790 << dendl;
1791 goto close_current_fd;
1792 }
1793 if (fsid != omap_fsid) {
31f18b77 1794 derr << __FUNC__ << ": " << omap_fsid_buf
7c673cae
FG
1795 << " has existed omap fsid " << omap_fsid
1796 << " != expected osd fsid " << fsid
1797 << dendl;
1798 ret = -EINVAL;
1799 goto close_current_fd;
1800 }
1801 }
1802
1803 dout(0) << "start omap initiation" << dendl;
1804 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1805 KeyValueDB * omap_store = KeyValueDB::create(cct,
1806 superblock.omap_backend,
1807 omap_dir);
11fdf7f2 1808 if (!omap_store)
7c673cae 1809 {
31f18b77 1810 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
7c673cae
FG
1811 ret = -1;
1812 goto close_current_fd;
1813 }
1814
1815 if (superblock.omap_backend == "rocksdb")
1816 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1817 else
1818 ret = omap_store->init();
1819
1820 if (ret < 0) {
31f18b77 1821 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1822 goto close_current_fd;
1823 }
1824
1825 stringstream err;
1826 if (omap_store->create_and_open(err)) {
1827 delete omap_store;
11fdf7f2 1828 omap_store = nullptr;
31f18b77 1829 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
7c673cae
FG
1830 << " : " << err.str() << dendl;
1831 ret = -1;
1832 goto close_current_fd;
1833 }
1834
1835 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1836 ret = dbomap->init(do_update);
1837 if (ret < 0) {
1838 delete dbomap;
11fdf7f2 1839 dbomap = nullptr;
31f18b77 1840 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
7c673cae
FG
1841 goto close_current_fd;
1842 }
1843 stringstream err2;
1844
1845 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1846 derr << err2.str() << dendl;
1847 delete dbomap;
11fdf7f2 1848 dbomap = nullptr;
7c673cae
FG
1849 ret = -EINVAL;
1850 goto close_current_fd;
1851 }
1852 object_map.reset(dbomap);
1853 }
1854
1855 // journal
1856 new_journal();
1857
1858 // select journal mode?
1859 if (journal) {
1860 if (!m_filestore_journal_writeahead &&
1861 !m_filestore_journal_parallel &&
1862 !m_filestore_journal_trailing) {
1863 if (!backend->can_checkpoint()) {
1864 m_filestore_journal_writeahead = true;
31f18b77 1865 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
7c673cae
FG
1866 } else {
1867 m_filestore_journal_parallel = true;
31f18b77 1868 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
7c673cae
FG
1869 }
1870 } else {
1871 if (m_filestore_journal_writeahead)
31f18b77 1872 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
7c673cae 1873 if (m_filestore_journal_parallel)
31f18b77 1874 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
7c673cae 1875 if (m_filestore_journal_trailing)
31f18b77 1876 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
7c673cae
FG
1877 }
1878 if (m_filestore_journal_writeahead)
1879 journal->set_wait_on_full(true);
1880 } else {
31f18b77 1881 dout(0) << __FUNC__ << ": no journal" << dendl;
7c673cae
FG
1882 }
1883
1884 ret = _sanity_check_fs();
1885 if (ret) {
31f18b77 1886 derr << __FUNC__ << ": _sanity_check_fs failed with error "
7c673cae
FG
1887 << ret << dendl;
1888 goto close_current_fd;
1889 }
1890
1891 // Cleanup possibly invalid collections
1892 {
1893 vector<coll_t> collections;
1894 ret = list_collections(collections, true);
1895 if (ret < 0) {
1896 derr << "Error " << ret << " while listing collections" << dendl;
1897 goto close_current_fd;
1898 }
1899 for (vector<coll_t>::iterator i = collections.begin();
1900 i != collections.end();
1901 ++i) {
1902 Index index;
1903 ret = get_index(*i, &index);
1904 if (ret < 0) {
1905 derr << "Unable to mount index " << *i
1906 << " with error: " << ret << dendl;
1907 goto close_current_fd;
1908 }
11fdf7f2 1909 ceph_assert(index.index);
7c673cae
FG
1910 RWLock::WLocker l((index.index)->access_lock);
1911
1912 index->cleanup();
1913 }
1914 }
1915 if (!m_disable_wbthrottle) {
1916 wbthrottle.start();
1917 } else {
31f18b77 1918 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
7c673cae 1919 if (cct->_conf->filestore_odsync_write) {
31f18b77 1920 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
7c673cae
FG
1921 }
1922 }
1923 sync_thread.create("filestore_sync");
1924
1925 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1926 ret = journal_replay(initial_op_seq);
1927 if (ret < 0) {
31f18b77 1928 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1929 if (ret == -ENOTTY) {
1930 derr << "maybe journal is not pointing to a block device and its size "
1931 << "wasn't configured?" << dendl;
1932 }
1933
1934 goto stop_sync;
1935 }
1936 }
1937
1938 {
1939 stringstream err2;
1940 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1941 derr << err2.str() << dendl;
1942 ret = -EINVAL;
1943 goto stop_sync;
1944 }
1945 }
1946
1947 init_temp_collections();
1948
1949 journal_start();
1950
1951 op_tp.start();
1952 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1953 (*it)->start();
1954 }
1955 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1956 (*it)->start();
1957 }
1958
1959 timer.init();
1960
1961 // upgrade?
1962 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1963 int err = upgrade();
1964 if (err < 0) {
1965 derr << "error converting store" << dendl;
1966 umount();
1967 return err;
1968 }
1969 }
1970
1971 // all okay.
1972 return 0;
1973
1974stop_sync:
1975 // stop sync thread
1976 lock.Lock();
1977 stop = true;
1978 sync_cond.Signal();
1979 lock.Unlock();
1980 sync_thread.join();
1981 if (!m_disable_wbthrottle) {
1982 wbthrottle.stop();
1983 }
1984close_current_fd:
1985 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1986 current_fd = -1;
1987close_basedir_fd:
1988 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1989 basedir_fd = -1;
1990close_fsid_fd:
1991 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1992 fsid_fd = -1;
1993done:
11fdf7f2 1994 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae 1995 delete backend;
11fdf7f2 1996 backend = nullptr;
7c673cae
FG
1997 object_map.reset();
1998 return ret;
1999}
2000
2001void FileStore::init_temp_collections()
2002{
31f18b77 2003 dout(10) << __FUNC__ << dendl;
7c673cae
FG
2004 vector<coll_t> ls;
2005 int r = list_collections(ls, true);
11fdf7f2 2006 ceph_assert(r >= 0);
7c673cae
FG
2007
2008 dout(20) << " ls " << ls << dendl;
2009
2010 SequencerPosition spos;
2011
2012 set<coll_t> temps;
2013 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
2014 if (p->is_temp())
2015 temps.insert(*p);
2016 dout(20) << " temps " << temps << dendl;
2017
2018 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
2019 if (p->is_temp())
2020 continue;
11fdf7f2 2021 coll_map[*p] = new OpSequencer(cct, ++next_osr_id, *p);
7c673cae
FG
2022 if (p->is_meta())
2023 continue;
2024 coll_t temp = p->get_temp();
2025 if (temps.count(temp)) {
2026 temps.erase(temp);
2027 } else {
31f18b77 2028 dout(10) << __FUNC__ << ": creating " << temp << dendl;
7c673cae 2029 r = _create_collection(temp, 0, spos);
11fdf7f2 2030 ceph_assert(r == 0);
7c673cae
FG
2031 }
2032 }
2033
2034 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
31f18b77 2035 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
7c673cae 2036 r = _collection_remove_recursive(*p, spos);
11fdf7f2 2037 ceph_assert(r == 0);
7c673cae
FG
2038 }
2039}
2040
2041int FileStore::umount()
2042{
31f18b77 2043 dout(5) << __FUNC__ << ": " << basedir << dendl;
7c673cae
FG
2044
2045 flush();
2046 sync();
2047 do_force_sync();
2048
11fdf7f2
TL
2049 {
2050 Mutex::Locker l(coll_lock);
2051 coll_map.clear();
2052 }
2053
7c673cae
FG
2054 lock.Lock();
2055 stop = true;
2056 sync_cond.Signal();
2057 lock.Unlock();
2058 sync_thread.join();
2059 if (!m_disable_wbthrottle){
2060 wbthrottle.stop();
2061 }
2062 op_tp.stop();
2063
2064 journal_stop();
2065 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
2066 journal_write_close();
2067
2068 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
2069 (*it)->stop();
2070 }
2071 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
2072 (*it)->stop();
2073 }
2074
11fdf7f2
TL
2075 if (vdo_fd >= 0) {
2076 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
2077 vdo_fd = -1;
2078 }
7c673cae
FG
2079 if (fsid_fd >= 0) {
2080 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
2081 fsid_fd = -1;
2082 }
2083 if (op_fd >= 0) {
2084 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
2085 op_fd = -1;
2086 }
2087 if (current_fd >= 0) {
2088 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
2089 current_fd = -1;
2090 }
2091 if (basedir_fd >= 0) {
2092 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
2093 basedir_fd = -1;
2094 }
2095
2096 force_sync = false;
2097
2098 delete backend;
11fdf7f2 2099 backend = nullptr;
7c673cae
FG
2100
2101 object_map.reset();
2102
2103 {
2104 Mutex::Locker l(sync_entry_timeo_lock);
2105 timer.shutdown();
2106 }
2107
2108 // nothing
2109 return 0;
2110}
2111
2112
11fdf7f2
TL
2113/// -----------------------------
2114
2115// keep OpSequencer handles alive for all time so that a sequence
2116// that removes a collection and creates a new one will not allow
2117// two sequencers for the same collection to be alive at once.
2118
2119ObjectStore::CollectionHandle FileStore::open_collection(const coll_t& c)
2120{
2121 Mutex::Locker l(coll_lock);
2122 auto p = coll_map.find(c);
2123 if (p == coll_map.end()) {
2124 return CollectionHandle();
2125 }
2126 return p->second;
2127}
2128
2129ObjectStore::CollectionHandle FileStore::create_new_collection(const coll_t& c)
2130{
2131 Mutex::Locker l(coll_lock);
2132 auto p = coll_map.find(c);
2133 if (p == coll_map.end()) {
2134 auto *r = new OpSequencer(cct, ++next_osr_id, c);
2135 coll_map[c] = r;
2136 return r;
2137 } else {
2138 return p->second;
2139 }
2140}
7c673cae
FG
2141
2142
2143/// -----------------------------
2144
2145FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2146 Context *onreadable,
2147 Context *onreadable_sync,
2148 TrackedOpRef osd_op)
2149{
2150 uint64_t bytes = 0, ops = 0;
2151 for (vector<Transaction>::iterator p = tls.begin();
2152 p != tls.end();
2153 ++p) {
2154 bytes += (*p).get_num_bytes();
2155 ops += (*p).get_num_ops();
2156 }
2157
2158 Op *o = new Op;
2159 o->start = ceph_clock_now();
2160 o->tls = std::move(tls);
2161 o->onreadable = onreadable;
2162 o->onreadable_sync = onreadable_sync;
2163 o->ops = ops;
2164 o->bytes = bytes;
2165 o->osd_op = osd_op;
2166 return o;
2167}
2168
2169
2170
2171void FileStore::queue_op(OpSequencer *osr, Op *o)
2172{
2173 // queue op on sequencer, then queue sequencer for the threadpool,
2174 // so that regardless of which order the threads pick up the
2175 // sequencer, the op order will be preserved.
2176
2177 osr->queue(o);
2178 o->trace.event("queued");
2179
2180 logger->inc(l_filestore_ops);
2181 logger->inc(l_filestore_bytes, o->bytes);
2182
31f18b77 2183 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
7c673cae
FG
2184 << " " << *osr
2185 << " " << o->bytes << " bytes"
2186 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2187 << dendl;
2188 op_wq.queue(osr);
2189}
2190
2191void FileStore::op_queue_reserve_throttle(Op *o)
2192{
2193 throttle_ops.get();
2194 throttle_bytes.get(o->bytes);
2195
2196 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2197 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2198}
2199
2200void FileStore::op_queue_release_throttle(Op *o)
2201{
2202 throttle_ops.put();
2203 throttle_bytes.put(o->bytes);
2204 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2205 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2206}
2207
2208void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2209{
2210 if (!m_disable_wbthrottle) {
2211 wbthrottle.throttle();
2212 }
2213 // inject a stall?
2214 if (cct->_conf->filestore_inject_stall) {
2215 int orig = cct->_conf->filestore_inject_stall;
31f18b77 2216 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
7c673cae 2217 sleep(orig);
11fdf7f2 2218 cct->_conf.set_val("filestore_inject_stall", "0");
31f18b77 2219 dout(5) << __FUNC__ << ": done stalling" << dendl;
7c673cae
FG
2220 }
2221
2222 osr->apply_lock.Lock();
2223 Op *o = osr->peek_queue();
2224 o->trace.event("op_apply_start");
2225 apply_manager.op_apply_start(o->op);
11fdf7f2 2226 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " start" << dendl;
7c673cae 2227 o->trace.event("_do_transactions start");
11fdf7f2 2228 int r = _do_transactions(o->tls, o->op, &handle, osr->osr_name);
7c673cae
FG
2229 o->trace.event("op_apply_finish");
2230 apply_manager.op_apply_finish(o->op);
31f18b77 2231 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
7c673cae 2232 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
7c673cae
FG
2233}
2234
2235void FileStore::_finish_op(OpSequencer *osr)
2236{
2237 list<Context*> to_queue;
2238 Op *o = osr->dequeue(&to_queue);
2239
11fdf7f2
TL
2240 o->tls.clear();
2241
7c673cae
FG
2242 utime_t lat = ceph_clock_now();
2243 lat -= o->start;
2244
11fdf7f2 2245 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " lat " << lat << dendl;
7c673cae
FG
2246 osr->apply_lock.Unlock(); // locked in _do_op
2247 o->trace.event("_finish_op");
2248
2249 // called with tp lock held
2250 op_queue_release_throttle(o);
2251
2252 logger->tinc(l_filestore_apply_latency, lat);
2253
2254 if (o->onreadable_sync) {
2255 o->onreadable_sync->complete(0);
2256 }
2257 if (o->onreadable) {
2258 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2259 }
2260 if (!to_queue.empty()) {
2261 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2262 }
2263 delete o;
11fdf7f2 2264 o = nullptr;
7c673cae
FG
2265}
2266
7c673cae
FG
2267struct C_JournaledAhead : public Context {
2268 FileStore *fs;
2269 FileStore::OpSequencer *osr;
2270 FileStore::Op *o;
2271 Context *ondisk;
2272
2273 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2274 fs(f), osr(os), o(o), ondisk(ondisk) { }
2275 void finish(int r) override {
2276 fs->_journaled_ahead(osr, o, ondisk);
2277 }
2278};
2279
11fdf7f2 2280int FileStore::queue_transactions(CollectionHandle& ch, vector<Transaction>& tls,
7c673cae
FG
2281 TrackedOpRef osd_op,
2282 ThreadPool::TPHandle *handle)
2283{
2284 Context *onreadable;
2285 Context *ondisk;
2286 Context *onreadable_sync;
2287 ObjectStore::Transaction::collect_contexts(
2288 tls, &onreadable, &ondisk, &onreadable_sync);
2289
2290 if (cct->_conf->objectstore_blackhole) {
31f18b77 2291 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
7c673cae
FG
2292 << dendl;
2293 delete ondisk;
11fdf7f2 2294 ondisk = nullptr;
7c673cae 2295 delete onreadable;
11fdf7f2 2296 onreadable = nullptr;
7c673cae 2297 delete onreadable_sync;
11fdf7f2 2298 onreadable_sync = nullptr;
7c673cae
FG
2299 return 0;
2300 }
2301
2302 utime_t start = ceph_clock_now();
7c673cae 2303
11fdf7f2
TL
2304 OpSequencer *osr = static_cast<OpSequencer*>(ch.get());
2305 dout(5) << __FUNC__ << ": osr " << osr << " " << *osr << dendl;
7c673cae
FG
2306
2307 ZTracer::Trace trace;
2308 if (osd_op && osd_op->pg_trace) {
2309 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2310 trace = osd_op->store_trace;
2311 }
2312
2313 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2314 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2315
2316 //prepare and encode transactions data out of lock
2317 bufferlist tbl;
2318 int orig_len = journal->prepare_entry(o->tls, &tbl);
2319
2320 if (handle)
2321 handle->suspend_tp_timeout();
2322
2323 op_queue_reserve_throttle(o);
2324 journal->reserve_throttle_and_backoff(tbl.length());
2325
2326 if (handle)
2327 handle->reset_tp_timeout();
2328
2329 uint64_t op_num = submit_manager.op_submit_start();
2330 o->op = op_num;
2331 trace.keyval("opnum", op_num);
2332
2333 if (m_filestore_do_dump)
2334 dump_transactions(o->tls, o->op, osr);
2335
2336 if (m_filestore_journal_parallel) {
31f18b77 2337 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
7c673cae
FG
2338
2339 trace.keyval("journal mode", "parallel");
2340 trace.event("journal started");
2341 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2342
2343 // queue inside submit_manager op submission lock
2344 queue_op(osr, o);
2345 trace.event("op queued");
2346 } else if (m_filestore_journal_writeahead) {
31f18b77 2347 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
7c673cae 2348
11fdf7f2 2349 osr->queue_journal(o);
7c673cae
FG
2350
2351 trace.keyval("journal mode", "writeahead");
2352 trace.event("journal started");
2353 _op_journal_transactions(tbl, orig_len, o->op,
2354 new C_JournaledAhead(this, osr, o, ondisk),
2355 osd_op);
2356 } else {
2357 ceph_abort();
2358 }
2359 submit_manager.op_submit_finish(op_num);
2360 utime_t end = ceph_clock_now();
2361 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2362 return 0;
2363 }
2364
2365 if (!journal) {
2366 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
31f18b77 2367 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
7c673cae
FG
2368
2369 if (handle)
2370 handle->suspend_tp_timeout();
2371
2372 op_queue_reserve_throttle(o);
2373
2374 if (handle)
2375 handle->reset_tp_timeout();
2376
2377 uint64_t op_num = submit_manager.op_submit_start();
2378 o->op = op_num;
2379
2380 if (m_filestore_do_dump)
2381 dump_transactions(o->tls, o->op, osr);
2382
2383 queue_op(osr, o);
2384 trace.keyval("opnum", op_num);
2385 trace.keyval("journal mode", "none");
2386 trace.event("op queued");
2387
2388 if (ondisk)
2389 apply_manager.add_waiter(op_num, ondisk);
2390 submit_manager.op_submit_finish(op_num);
2391 utime_t end = ceph_clock_now();
2392 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2393 return 0;
2394 }
2395
11fdf7f2 2396 ceph_assert(journal);
7c673cae
FG
2397 //prepare and encode transactions data out of lock
2398 bufferlist tbl;
2399 int orig_len = -1;
2400 if (journal->is_writeable()) {
2401 orig_len = journal->prepare_entry(tls, &tbl);
2402 }
2403 uint64_t op = submit_manager.op_submit_start();
31f18b77 2404 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
7c673cae
FG
2405
2406 if (m_filestore_do_dump)
2407 dump_transactions(tls, op, osr);
2408
2409 trace.event("op_apply_start");
2410 trace.keyval("opnum", op);
2411 trace.keyval("journal mode", "trailing");
2412 apply_manager.op_apply_start(op);
2413 trace.event("do_transactions");
2414 int r = do_transactions(tls, op);
2415
2416 if (r >= 0) {
2417 trace.event("journal started");
2418 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2419 } else {
2420 delete ondisk;
11fdf7f2 2421 ondisk = nullptr;
7c673cae
FG
2422 }
2423
2424 // start on_readable finisher after we queue journal item, as on_readable callback
2425 // is allowed to delete the Transaction
2426 if (onreadable_sync) {
2427 onreadable_sync->complete(r);
2428 }
2429 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2430
2431 submit_manager.op_submit_finish(op);
2432 trace.event("op_apply_finish");
2433 apply_manager.op_apply_finish(op);
2434
2435 utime_t end = ceph_clock_now();
2436 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2437 return r;
2438}
2439
2440void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2441{
31f18b77 2442 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
7c673cae
FG
2443
2444 o->trace.event("writeahead journal finished");
2445
2446 // this should queue in order because the journal does it's completions in order.
2447 queue_op(osr, o);
2448
2449 list<Context*> to_queue;
2450 osr->dequeue_journal(&to_queue);
2451
2452 // do ondisk completions async, to prevent any onreadable_sync completions
2453 // getting blocked behind an ondisk completion.
2454 if (ondisk) {
2455 dout(10) << " queueing ondisk " << ondisk << dendl;
2456 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2457 }
2458 if (!to_queue.empty()) {
2459 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2460 }
2461}
2462
2463int FileStore::_do_transactions(
2464 vector<Transaction> &tls,
2465 uint64_t op_seq,
11fdf7f2
TL
2466 ThreadPool::TPHandle *handle,
2467 const char *osr_name)
7c673cae
FG
2468{
2469 int trans_num = 0;
2470
2471 for (vector<Transaction>::iterator p = tls.begin();
2472 p != tls.end();
2473 ++p, trans_num++) {
11fdf7f2 2474 _do_transaction(*p, op_seq, trans_num, handle, osr_name);
7c673cae
FG
2475 if (handle)
2476 handle->reset_tp_timeout();
2477 }
2478
2479 return 0;
2480}
2481
2482void FileStore::_set_global_replay_guard(const coll_t& cid,
2483 const SequencerPosition &spos)
2484{
2485 if (backend->can_checkpoint())
2486 return;
2487
2488 // sync all previous operations on this sequencer
2489 int ret = object_map->sync();
2490 if (ret < 0) {
31f18b77 2491 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
11fdf7f2 2492 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2493 }
2494 ret = sync_filesystem(basedir_fd);
2495 if (ret < 0) {
31f18b77 2496 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
11fdf7f2 2497 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2498 }
2499
2500 char fn[PATH_MAX];
2501 get_cdir(cid, fn, sizeof(fn));
91327a77 2502 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2503 if (fd < 0) {
2504 int err = errno;
31f18b77 2505 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2506 ceph_abort_msg("_set_global_replay_guard failed");
7c673cae
FG
2507 }
2508
2509 _inject_failure();
2510
2511 // then record that we did it
2512 bufferlist v;
11fdf7f2 2513 encode(spos, v);
7c673cae
FG
2514 int r = chain_fsetxattr<true, true>(
2515 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2516 if (r < 0) {
31f18b77 2517 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
7c673cae 2518 << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2519 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2520 }
2521
2522 // and make sure our xattr is durable.
a8e16298
TL
2523 r = ::fsync(fd);
2524 if (r < 0) {
2525 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2526 ceph_abort();
2527 }
7c673cae
FG
2528
2529 _inject_failure();
2530
2531 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 2532 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2533}
2534
2535int FileStore::_check_global_replay_guard(const coll_t& cid,
2536 const SequencerPosition& spos)
2537{
2538 char fn[PATH_MAX];
2539 get_cdir(cid, fn, sizeof(fn));
91327a77 2540 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae 2541 if (fd < 0) {
31f18b77 2542 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2543 return 1; // if collection does not exist, there is no guard, and we can replay.
2544 }
2545
2546 char buf[100];
2547 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2548 if (r < 0) {
31f18b77 2549 dout(20) << __FUNC__ << ": no xattr" << dendl;
11fdf7f2 2550 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
2551 VOID_TEMP_FAILURE_RETRY(::close(fd));
2552 return 1; // no xattr
2553 }
2554 bufferlist bl;
2555 bl.append(buf, r);
2556
2557 SequencerPosition opos;
11fdf7f2
TL
2558 auto p = bl.cbegin();
2559 decode(opos, p);
7c673cae
FG
2560
2561 VOID_TEMP_FAILURE_RETRY(::close(fd));
2562 return spos >= opos ? 1 : -1;
2563}
2564
2565
2566void FileStore::_set_replay_guard(const coll_t& cid,
2567 const SequencerPosition &spos,
2568 bool in_progress=false)
2569{
2570 char fn[PATH_MAX];
2571 get_cdir(cid, fn, sizeof(fn));
91327a77 2572 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2573 if (fd < 0) {
2574 int err = errno;
31f18b77 2575 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2576 ceph_abort_msg("_set_replay_guard failed");
7c673cae
FG
2577 }
2578 _set_replay_guard(fd, spos, 0, in_progress);
2579 VOID_TEMP_FAILURE_RETRY(::close(fd));
2580}
2581
2582
2583void FileStore::_set_replay_guard(int fd,
2584 const SequencerPosition& spos,
2585 const ghobject_t *hoid,
2586 bool in_progress)
2587{
2588 if (backend->can_checkpoint())
2589 return;
2590
31f18b77 2591 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
7c673cae
FG
2592
2593 _inject_failure();
2594
2595 // first make sure the previous operation commits
a8e16298
TL
2596 int r = ::fsync(fd);
2597 if (r < 0) {
2598 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2599 ceph_abort();
2600 }
7c673cae
FG
2601
2602 if (!in_progress) {
2603 // sync object_map too. even if this object has a header or keys,
2604 // it have had them in the past and then removed them, so always
2605 // sync.
2606 object_map->sync(hoid, &spos);
2607 }
2608
2609 _inject_failure();
2610
2611 // then record that we did it
2612 bufferlist v(40);
11fdf7f2
TL
2613 encode(spos, v);
2614 encode(in_progress, v);
a8e16298 2615 r = chain_fsetxattr<true, true>(
7c673cae
FG
2616 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2617 if (r < 0) {
2618 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2619 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2620 }
2621
2622 // and make sure our xattr is durable.
a8e16298
TL
2623 r = ::fsync(fd);
2624 if (r < 0) {
2625 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2626 ceph_abort();
2627 }
7c673cae
FG
2628
2629 _inject_failure();
2630
31f18b77 2631 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2632}
2633
2634void FileStore::_close_replay_guard(const coll_t& cid,
2635 const SequencerPosition &spos)
2636{
2637 char fn[PATH_MAX];
2638 get_cdir(cid, fn, sizeof(fn));
91327a77 2639 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
2640 if (fd < 0) {
2641 int err = errno;
31f18b77 2642 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
11fdf7f2 2643 ceph_abort_msg("_close_replay_guard failed");
7c673cae
FG
2644 }
2645 _close_replay_guard(fd, spos);
2646 VOID_TEMP_FAILURE_RETRY(::close(fd));
2647}
2648
2649void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2650 const ghobject_t *hoid)
2651{
2652 if (backend->can_checkpoint())
2653 return;
2654
31f18b77 2655 dout(10) << __FUNC__ << ": " << spos << dendl;
7c673cae
FG
2656
2657 _inject_failure();
2658
2659 // sync object_map too. even if this object has a header or keys,
2660 // it have had them in the past and then removed them, so always
2661 // sync.
2662 object_map->sync(hoid, &spos);
2663
2664 // then record that we are done with this operation
2665 bufferlist v(40);
11fdf7f2 2666 encode(spos, v);
7c673cae 2667 bool in_progress = false;
11fdf7f2 2668 encode(in_progress, v);
7c673cae
FG
2669 int r = chain_fsetxattr<true, true>(
2670 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2671 if (r < 0) {
2672 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
11fdf7f2 2673 ceph_abort_msg("fsetxattr failed");
7c673cae
FG
2674 }
2675
2676 // and make sure our xattr is durable.
a8e16298
TL
2677 r = ::fsync(fd);
2678 if (r < 0) {
2679 derr << __func__ << " fsync failed: " << cpp_strerror(errno) << dendl;
2680 ceph_abort();
2681 }
7c673cae
FG
2682
2683 _inject_failure();
2684
31f18b77 2685 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2686}
2687
2688int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2689 const SequencerPosition& spos)
2690{
2691 if (!replaying || backend->can_checkpoint())
2692 return 1;
2693
2694 int r = _check_global_replay_guard(cid, spos);
2695 if (r < 0)
2696 return r;
2697
2698 FDRef fd;
2699 r = lfn_open(cid, oid, false, &fd);
2700 if (r < 0) {
31f18b77 2701 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
7c673cae
FG
2702 return 1; // if file does not exist, there is no guard, and we can replay.
2703 }
2704 int ret = _check_replay_guard(**fd, spos);
2705 lfn_close(fd);
2706 return ret;
2707}
2708
2709int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2710{
2711 if (!replaying || backend->can_checkpoint())
2712 return 1;
2713
2714 char fn[PATH_MAX];
2715 get_cdir(cid, fn, sizeof(fn));
91327a77 2716 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae 2717 if (fd < 0) {
31f18b77 2718 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2719 return 1; // if collection does not exist, there is no guard, and we can replay.
2720 }
2721 int ret = _check_replay_guard(fd, spos);
2722 VOID_TEMP_FAILURE_RETRY(::close(fd));
2723 return ret;
2724}
2725
2726int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2727{
2728 if (!replaying || backend->can_checkpoint())
2729 return 1;
2730
2731 char buf[100];
2732 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2733 if (r < 0) {
31f18b77 2734 dout(20) << __FUNC__ << ": no xattr" << dendl;
11fdf7f2 2735 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
2736 return 1; // no xattr
2737 }
2738 bufferlist bl;
2739 bl.append(buf, r);
2740
2741 SequencerPosition opos;
11fdf7f2
TL
2742 auto p = bl.cbegin();
2743 decode(opos, p);
7c673cae
FG
2744 bool in_progress = false;
2745 if (!p.end()) // older journals don't have this
11fdf7f2 2746 decode(in_progress, p);
7c673cae 2747 if (opos > spos) {
31f18b77 2748 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
7c673cae
FG
2749 << ", now or in future, SKIPPING REPLAY" << dendl;
2750 return -1;
2751 } else if (opos == spos) {
2752 if (in_progress) {
31f18b77 2753 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2754 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2755 return 0;
2756 } else {
31f18b77 2757 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2758 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2759 return -1;
2760 }
2761 } else {
31f18b77 2762 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
7c673cae
FG
2763 << ", in past, will replay" << dendl;
2764 return 1;
2765 }
2766}
2767
2768void FileStore::_do_transaction(
2769 Transaction& t, uint64_t op_seq, int trans_num,
11fdf7f2
TL
2770 ThreadPool::TPHandle *handle,
2771 const char *osr_name)
7c673cae 2772{
31f18b77 2773 dout(10) << __FUNC__ << ": on " << &t << dendl;
7c673cae 2774
7c673cae
FG
2775 Transaction::iterator i = t.begin();
2776
2777 SequencerPosition spos(op_seq, trans_num, 0);
2778 while (i.have_op()) {
2779 if (handle)
2780 handle->reset_tp_timeout();
2781
2782 Transaction::Op *op = i.decode_op();
2783 int r = 0;
2784
2785 _inject_failure();
2786
2787 switch (op->op) {
2788 case Transaction::OP_NOP:
2789 break;
2790 case Transaction::OP_TOUCH:
2791 {
2792 const coll_t &_cid = i.get_cid(op->cid);
2793 const ghobject_t &oid = i.get_oid(op->oid);
2794 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2795 _cid : _cid.get_temp();
2796 tracepoint(objectstore, touch_enter, osr_name);
2797 if (_check_replay_guard(cid, oid, spos) > 0)
2798 r = _touch(cid, oid);
2799 tracepoint(objectstore, touch_exit, r);
2800 }
2801 break;
2802
2803 case Transaction::OP_WRITE:
2804 {
2805 const coll_t &_cid = i.get_cid(op->cid);
2806 const ghobject_t &oid = i.get_oid(op->oid);
2807 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2808 _cid : _cid.get_temp();
2809 uint64_t off = op->off;
2810 uint64_t len = op->len;
2811 uint32_t fadvise_flags = i.get_fadvise_flags();
2812 bufferlist bl;
2813 i.decode_bl(bl);
2814 tracepoint(objectstore, write_enter, osr_name, off, len);
2815 if (_check_replay_guard(cid, oid, spos) > 0)
2816 r = _write(cid, oid, off, len, bl, fadvise_flags);
2817 tracepoint(objectstore, write_exit, r);
2818 }
2819 break;
2820
2821 case Transaction::OP_ZERO:
2822 {
2823 const coll_t &_cid = i.get_cid(op->cid);
2824 const ghobject_t &oid = i.get_oid(op->oid);
2825 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2826 _cid : _cid.get_temp();
2827 uint64_t off = op->off;
2828 uint64_t len = op->len;
2829 tracepoint(objectstore, zero_enter, osr_name, off, len);
2830 if (_check_replay_guard(cid, oid, spos) > 0)
2831 r = _zero(cid, oid, off, len);
2832 tracepoint(objectstore, zero_exit, r);
2833 }
2834 break;
2835
2836 case Transaction::OP_TRIMCACHE:
2837 {
2838 // deprecated, no-op
2839 }
2840 break;
2841
2842 case Transaction::OP_TRUNCATE:
2843 {
2844 const coll_t &_cid = i.get_cid(op->cid);
2845 const ghobject_t &oid = i.get_oid(op->oid);
2846 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2847 _cid : _cid.get_temp();
2848 uint64_t off = op->off;
2849 tracepoint(objectstore, truncate_enter, osr_name, off);
2850 if (_check_replay_guard(cid, oid, spos) > 0)
2851 r = _truncate(cid, oid, off);
2852 tracepoint(objectstore, truncate_exit, r);
2853 }
2854 break;
2855
2856 case Transaction::OP_REMOVE:
2857 {
2858 const coll_t &_cid = i.get_cid(op->cid);
2859 const ghobject_t &oid = i.get_oid(op->oid);
2860 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2861 _cid : _cid.get_temp();
2862 tracepoint(objectstore, remove_enter, osr_name);
2863 if (_check_replay_guard(cid, oid, spos) > 0)
2864 r = _remove(cid, oid, spos);
2865 tracepoint(objectstore, remove_exit, r);
2866 }
2867 break;
2868
2869 case Transaction::OP_SETATTR:
2870 {
2871 const coll_t &_cid = i.get_cid(op->cid);
2872 const ghobject_t &oid = i.get_oid(op->oid);
2873 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2874 _cid : _cid.get_temp();
2875 string name = i.decode_string();
2876 bufferlist bl;
2877 i.decode_bl(bl);
2878 tracepoint(objectstore, setattr_enter, osr_name);
2879 if (_check_replay_guard(cid, oid, spos) > 0) {
2880 map<string, bufferptr> to_set;
2881 to_set[name] = bufferptr(bl.c_str(), bl.length());
2882 r = _setattrs(cid, oid, to_set, spos);
2883 if (r == -ENOSPC)
2884 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2885 << " name " << name << " size " << bl.length() << dendl;
2886 }
2887 tracepoint(objectstore, setattr_exit, r);
2888 }
2889 break;
2890
2891 case Transaction::OP_SETATTRS:
2892 {
2893 const coll_t &_cid = i.get_cid(op->cid);
2894 const ghobject_t &oid = i.get_oid(op->oid);
2895 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2896 _cid : _cid.get_temp();
2897 map<string, bufferptr> aset;
2898 i.decode_attrset(aset);
2899 tracepoint(objectstore, setattrs_enter, osr_name);
2900 if (_check_replay_guard(cid, oid, spos) > 0)
2901 r = _setattrs(cid, oid, aset, spos);
2902 tracepoint(objectstore, setattrs_exit, r);
2903 if (r == -ENOSPC)
2904 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2905 }
2906 break;
2907
2908 case Transaction::OP_RMATTR:
2909 {
2910 const coll_t &_cid = i.get_cid(op->cid);
2911 const ghobject_t &oid = i.get_oid(op->oid);
2912 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2913 _cid : _cid.get_temp();
2914 string name = i.decode_string();
2915 tracepoint(objectstore, rmattr_enter, osr_name);
2916 if (_check_replay_guard(cid, oid, spos) > 0)
2917 r = _rmattr(cid, oid, name.c_str(), spos);
2918 tracepoint(objectstore, rmattr_exit, r);
2919 }
2920 break;
2921
2922 case Transaction::OP_RMATTRS:
2923 {
2924 const coll_t &_cid = i.get_cid(op->cid);
2925 const ghobject_t &oid = i.get_oid(op->oid);
2926 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2927 _cid : _cid.get_temp();
2928 tracepoint(objectstore, rmattrs_enter, osr_name);
2929 if (_check_replay_guard(cid, oid, spos) > 0)
2930 r = _rmattrs(cid, oid, spos);
2931 tracepoint(objectstore, rmattrs_exit, r);
2932 }
2933 break;
2934
2935 case Transaction::OP_CLONE:
2936 {
2937 const coll_t &_cid = i.get_cid(op->cid);
2938 const ghobject_t &oid = i.get_oid(op->oid);
2939 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2940 _cid : _cid.get_temp();
2941 const ghobject_t &noid = i.get_oid(op->dest_oid);
2942 tracepoint(objectstore, clone_enter, osr_name);
2943 r = _clone(cid, oid, noid, spos);
2944 tracepoint(objectstore, clone_exit, r);
2945 }
2946 break;
2947
2948 case Transaction::OP_CLONERANGE:
2949 {
2950 const coll_t &_cid = i.get_cid(op->cid);
2951 const ghobject_t &oid = i.get_oid(op->oid);
2952 const ghobject_t &noid = i.get_oid(op->dest_oid);
2953 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2954 _cid : _cid.get_temp();
2955 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2956 _cid : _cid.get_temp();
2957 uint64_t off = op->off;
2958 uint64_t len = op->len;
2959 tracepoint(objectstore, clone_range_enter, osr_name, len);
2960 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2961 tracepoint(objectstore, clone_range_exit, r);
2962 }
2963 break;
2964
2965 case Transaction::OP_CLONERANGE2:
2966 {
2967 const coll_t &_cid = i.get_cid(op->cid);
2968 const ghobject_t &oid = i.get_oid(op->oid);
2969 const ghobject_t &noid = i.get_oid(op->dest_oid);
2970 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2971 _cid : _cid.get_temp();
2972 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2973 _cid : _cid.get_temp();
2974 uint64_t srcoff = op->off;
2975 uint64_t len = op->len;
2976 uint64_t dstoff = op->dest_off;
2977 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2978 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2979 tracepoint(objectstore, clone_range2_exit, r);
2980 }
2981 break;
2982
2983 case Transaction::OP_MKCOLL:
2984 {
2985 const coll_t &cid = i.get_cid(op->cid);
2986 tracepoint(objectstore, mkcoll_enter, osr_name);
2987 if (_check_replay_guard(cid, spos) > 0)
2988 r = _create_collection(cid, op->split_bits, spos);
2989 tracepoint(objectstore, mkcoll_exit, r);
2990 }
2991 break;
2992
2993 case Transaction::OP_COLL_SET_BITS:
2994 {
2995 const coll_t &cid = i.get_cid(op->cid);
2996 int bits = op->split_bits;
2997 r = _collection_set_bits(cid, bits);
2998 }
2999 break;
3000
3001 case Transaction::OP_COLL_HINT:
3002 {
3003 const coll_t &cid = i.get_cid(op->cid);
3004 uint32_t type = op->hint_type;
3005 bufferlist hint;
3006 i.decode_bl(hint);
11fdf7f2 3007 auto hiter = hint.cbegin();
7c673cae
FG
3008 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
3009 uint32_t pg_num;
3010 uint64_t num_objs;
11fdf7f2
TL
3011 decode(pg_num, hiter);
3012 decode(num_objs, hiter);
7c673cae
FG
3013 if (_check_replay_guard(cid, spos) > 0) {
3014 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
3015 }
3016 } else {
3017 // Ignore the hint
3018 dout(10) << "Unrecognized collection hint type: " << type << dendl;
3019 }
3020 }
3021 break;
3022
3023 case Transaction::OP_RMCOLL:
3024 {
3025 const coll_t &cid = i.get_cid(op->cid);
3026 tracepoint(objectstore, rmcoll_enter, osr_name);
3027 if (_check_replay_guard(cid, spos) > 0)
3028 r = _destroy_collection(cid);
3029 tracepoint(objectstore, rmcoll_exit, r);
3030 }
3031 break;
3032
3033 case Transaction::OP_COLL_ADD:
3034 {
3035 const coll_t &ocid = i.get_cid(op->cid);
3036 const coll_t &ncid = i.get_cid(op->dest_cid);
3037 const ghobject_t &oid = i.get_oid(op->oid);
3038
11fdf7f2 3039 ceph_assert(oid.hobj.pool >= -1);
7c673cae
FG
3040
3041 // always followed by OP_COLL_REMOVE
3042 Transaction::Op *op2 = i.decode_op();
3043 const coll_t &ocid2 = i.get_cid(op2->cid);
3044 const ghobject_t &oid2 = i.get_oid(op2->oid);
11fdf7f2
TL
3045 ceph_assert(op2->op == Transaction::OP_COLL_REMOVE);
3046 ceph_assert(ocid2 == ocid);
3047 ceph_assert(oid2 == oid);
7c673cae
FG
3048
3049 tracepoint(objectstore, coll_add_enter);
3050 r = _collection_add(ncid, ocid, oid, spos);
3051 tracepoint(objectstore, coll_add_exit, r);
3052 spos.op++;
3053 if (r < 0)
3054 break;
3055 tracepoint(objectstore, coll_remove_enter, osr_name);
3056 if (_check_replay_guard(ocid, oid, spos) > 0)
3057 r = _remove(ocid, oid, spos);
3058 tracepoint(objectstore, coll_remove_exit, r);
3059 }
3060 break;
3061
3062 case Transaction::OP_COLL_MOVE:
3063 {
3064 // WARNING: this is deprecated and buggy; only here to replay old journals.
3065 const coll_t &ocid = i.get_cid(op->cid);
3066 const coll_t &ncid = i.get_cid(op->dest_cid);
3067 const ghobject_t &oid = i.get_oid(op->oid);
3068 tracepoint(objectstore, coll_move_enter);
3069 r = _collection_add(ocid, ncid, oid, spos);
3070 if (r == 0 &&
3071 (_check_replay_guard(ocid, oid, spos) > 0))
3072 r = _remove(ocid, oid, spos);
3073 tracepoint(objectstore, coll_move_exit, r);
3074 }
3075 break;
3076
3077 case Transaction::OP_COLL_MOVE_RENAME:
3078 {
3079 const coll_t &_oldcid = i.get_cid(op->cid);
3080 const ghobject_t &oldoid = i.get_oid(op->oid);
3081 const coll_t &_newcid = i.get_cid(op->dest_cid);
3082 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3083 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
3084 _oldcid : _oldcid.get_temp();
3085 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
3086 _oldcid : _newcid.get_temp();
3087 tracepoint(objectstore, coll_move_rename_enter);
3088 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
3089 tracepoint(objectstore, coll_move_rename_exit, r);
3090 }
3091 break;
3092
3093 case Transaction::OP_TRY_RENAME:
3094 {
3095 const coll_t &_cid = i.get_cid(op->cid);
3096 const ghobject_t &oldoid = i.get_oid(op->oid);
3097 const ghobject_t &newoid = i.get_oid(op->dest_oid);
3098 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
3099 _cid : _cid.get_temp();
3100 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
3101 _cid : _cid.get_temp();
3102 tracepoint(objectstore, coll_try_rename_enter);
3103 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
3104 tracepoint(objectstore, coll_try_rename_exit, r);
3105 }
3106 break;
3107
3108 case Transaction::OP_COLL_SETATTR:
3109 case Transaction::OP_COLL_RMATTR:
11fdf7f2 3110 ceph_abort_msg("collection attr methods no longer implemented");
7c673cae
FG
3111 break;
3112
3113 case Transaction::OP_COLL_RENAME:
3114 {
3115 r = -EOPNOTSUPP;
3116 }
3117 break;
3118
3119 case Transaction::OP_OMAP_CLEAR:
3120 {
3121 const coll_t &_cid = i.get_cid(op->cid);
3122 const ghobject_t &oid = i.get_oid(op->oid);
3123 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3124 _cid : _cid.get_temp();
3125 tracepoint(objectstore, omap_clear_enter, osr_name);
28e407b8
AA
3126 if (_check_replay_guard(cid, oid, spos) > 0)
3127 r = _omap_clear(cid, oid, spos);
7c673cae
FG
3128 tracepoint(objectstore, omap_clear_exit, r);
3129 }
3130 break;
3131 case Transaction::OP_OMAP_SETKEYS:
3132 {
3133 const coll_t &_cid = i.get_cid(op->cid);
3134 const ghobject_t &oid = i.get_oid(op->oid);
3135 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3136 _cid : _cid.get_temp();
3137 map<string, bufferlist> aset;
3138 i.decode_attrset(aset);
3139 tracepoint(objectstore, omap_setkeys_enter, osr_name);
28e407b8
AA
3140 if (_check_replay_guard(cid, oid, spos) > 0)
3141 r = _omap_setkeys(cid, oid, aset, spos);
7c673cae
FG
3142 tracepoint(objectstore, omap_setkeys_exit, r);
3143 }
3144 break;
3145 case Transaction::OP_OMAP_RMKEYS:
3146 {
3147 const coll_t &_cid = i.get_cid(op->cid);
3148 const ghobject_t &oid = i.get_oid(op->oid);
3149 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3150 _cid : _cid.get_temp();
3151 set<string> keys;
3152 i.decode_keyset(keys);
3153 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
28e407b8
AA
3154 if (_check_replay_guard(cid, oid, spos) > 0)
3155 r = _omap_rmkeys(cid, oid, keys, spos);
7c673cae
FG
3156 tracepoint(objectstore, omap_rmkeys_exit, r);
3157 }
3158 break;
3159 case Transaction::OP_OMAP_RMKEYRANGE:
3160 {
3161 const coll_t &_cid = i.get_cid(op->cid);
3162 const ghobject_t &oid = i.get_oid(op->oid);
3163 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3164 _cid : _cid.get_temp();
3165 string first, last;
3166 first = i.decode_string();
3167 last = i.decode_string();
3168 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
28e407b8
AA
3169 if (_check_replay_guard(cid, oid, spos) > 0)
3170 r = _omap_rmkeyrange(cid, oid, first, last, spos);
7c673cae
FG
3171 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3172 }
3173 break;
3174 case Transaction::OP_OMAP_SETHEADER:
3175 {
3176 const coll_t &_cid = i.get_cid(op->cid);
3177 const ghobject_t &oid = i.get_oid(op->oid);
3178 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3179 _cid : _cid.get_temp();
3180 bufferlist bl;
3181 i.decode_bl(bl);
3182 tracepoint(objectstore, omap_setheader_enter, osr_name);
28e407b8
AA
3183 if (_check_replay_guard(cid, oid, spos) > 0)
3184 r = _omap_setheader(cid, oid, bl, spos);
7c673cae
FG
3185 tracepoint(objectstore, omap_setheader_exit, r);
3186 }
3187 break;
3188 case Transaction::OP_SPLIT_COLLECTION:
3189 {
11fdf7f2 3190 ceph_abort_msg("not legacy journal; upgrade to firefly first");
7c673cae
FG
3191 }
3192 break;
3193 case Transaction::OP_SPLIT_COLLECTION2:
3194 {
3195 coll_t cid = i.get_cid(op->cid);
3196 uint32_t bits = op->split_bits;
3197 uint32_t rem = op->split_rem;
3198 coll_t dest = i.get_cid(op->dest_cid);
3199 tracepoint(objectstore, split_coll2_enter, osr_name);
3200 r = _split_collection(cid, bits, rem, dest, spos);
3201 tracepoint(objectstore, split_coll2_exit, r);
3202 }
3203 break;
3204
11fdf7f2
TL
3205 case Transaction::OP_MERGE_COLLECTION:
3206 {
3207 coll_t cid = i.get_cid(op->cid);
3208 uint32_t bits = op->split_bits;
3209 coll_t dest = i.get_cid(op->dest_cid);
3210 tracepoint(objectstore, merge_coll_enter, osr_name);
3211 r = _merge_collection(cid, bits, dest, spos);
3212 tracepoint(objectstore, merge_coll_exit, r);
3213 }
3214 break;
3215
7c673cae
FG
3216 case Transaction::OP_SETALLOCHINT:
3217 {
3218 const coll_t &_cid = i.get_cid(op->cid);
3219 const ghobject_t &oid = i.get_oid(op->oid);
3220 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3221 _cid : _cid.get_temp();
3222 uint64_t expected_object_size = op->expected_object_size;
3223 uint64_t expected_write_size = op->expected_write_size;
3224 tracepoint(objectstore, setallochint_enter, osr_name);
3225 if (_check_replay_guard(cid, oid, spos) > 0)
3226 r = _set_alloc_hint(cid, oid, expected_object_size,
3227 expected_write_size);
3228 tracepoint(objectstore, setallochint_exit, r);
3229 }
3230 break;
3231
3232 default:
3233 derr << "bad op " << op->op << dendl;
3234 ceph_abort();
3235 }
3236
3237 if (r < 0) {
3238 bool ok = false;
3239
3240 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3241 op->op == Transaction::OP_CLONE ||
3242 op->op == Transaction::OP_CLONERANGE2 ||
3243 op->op == Transaction::OP_COLL_ADD ||
3244 op->op == Transaction::OP_SETATTR ||
3245 op->op == Transaction::OP_SETATTRS ||
3246 op->op == Transaction::OP_RMATTR ||
3247 op->op == Transaction::OP_OMAP_SETKEYS ||
3248 op->op == Transaction::OP_OMAP_RMKEYS ||
3249 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3250 op->op == Transaction::OP_OMAP_SETHEADER))
3251 // -ENOENT is normally okay
3252 // ...including on a replayed OP_RMCOLL with checkpoint mode
3253 ok = true;
3254 if (r == -ENODATA)
3255 ok = true;
3256
3257 if (op->op == Transaction::OP_SETALLOCHINT)
3258 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3259 // cases means invalid hint size (e.g. too big, not a multiple
3260 // of block size, etc) or, at least on xfs, an attempt to set
3261 // or change it when the file is not empty. However,
3262 // OP_SETALLOCHINT is advisory, so ignore all errors.
3263 ok = true;
3264
3265 if (replaying && !backend->can_checkpoint()) {
3266 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3267 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3268 ok = true;
3269 }
3270 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3271 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3272 ok = true;
3273 }
3274 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3275 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3276 ok = true;
3277 }
3278 if (r == -ERANGE) {
3279 dout(10) << "tolerating ERANGE on replay" << dendl;
3280 ok = true;
3281 }
3282 if (r == -ENOENT) {
3283 dout(10) << "tolerating ENOENT on replay" << dendl;
3284 ok = true;
3285 }
3286 }
3287
3288 if (!ok) {
3289 const char *msg = "unexpected error code";
3290
3291 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3292 op->op == Transaction::OP_CLONE ||
3293 op->op == Transaction::OP_CLONERANGE2)) {
3294 msg = "ENOENT on clone suggests osd bug";
3295 } else if (r == -ENOSPC) {
3296 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3297 // by partially applying transactions.
3298 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3299 } else if (r == -ENOTEMPTY) {
3300 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3301 } else if (r == -EPERM) {
3302 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3303 }
3304
3305 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3306 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3307 dout(0) << msg << dendl;
3308 dout(0) << " transaction dump:\n";
3309 JSONFormatter f(true);
3310 f.open_object_section("transaction");
3311 t.dump(&f);
3312 f.close_section();
3313 f.flush(*_dout);
3314 *_dout << dendl;
3315
3316 if (r == -EMFILE) {
3317 dump_open_fds(cct);
3318 }
3319
11fdf7f2 3320 ceph_abort_msg("unexpected error");
7c673cae
FG
3321 }
3322 }
3323
3324 spos.op++;
3325 }
3326
3327 _inject_failure();
3328}
3329
3330 /*********************************************/
3331
3332
3333
3334// --------------------
3335// objects
3336
11fdf7f2 3337bool FileStore::exists(CollectionHandle& ch, const ghobject_t& oid)
7c673cae 3338{
11fdf7f2
TL
3339 tracepoint(objectstore, exists_enter, ch->cid.c_str());
3340 auto osr = static_cast<OpSequencer*>(ch.get());
3341 osr->wait_for_apply(oid);
7c673cae 3342 struct stat st;
11fdf7f2 3343 bool retval = stat(ch, oid, &st) == 0;
7c673cae
FG
3344 tracepoint(objectstore, exists_exit, retval);
3345 return retval;
3346}
3347
3348int FileStore::stat(
11fdf7f2 3349 CollectionHandle& ch, const ghobject_t& oid, struct stat *st, bool allow_eio)
7c673cae 3350{
11fdf7f2
TL
3351 tracepoint(objectstore, stat_enter, ch->cid.c_str());
3352 auto osr = static_cast<OpSequencer*>(ch.get());
3353 osr->wait_for_apply(oid);
3354 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae 3355 int r = lfn_stat(cid, oid, st);
11fdf7f2 3356 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
7c673cae 3357 if (r < 0) {
11fdf7f2 3358 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
7c673cae
FG
3359 << " = " << r << dendl;
3360 } else {
11fdf7f2 3361 dout(10) << __FUNC__ << ": " << ch->cid << "/" << oid
7c673cae
FG
3362 << " = " << r
3363 << " (size " << st->st_size << ")" << dendl;
3364 }
3365 if (cct->_conf->filestore_debug_inject_read_err &&
3366 debug_mdata_eio(oid)) {
3367 return -EIO;
3368 } else {
3369 tracepoint(objectstore, stat_exit, r);
3370 return r;
3371 }
3372}
3373
3374int FileStore::set_collection_opts(
11fdf7f2 3375 CollectionHandle& ch,
7c673cae
FG
3376 const pool_opts_t& opts)
3377{
3378 return -EOPNOTSUPP;
3379}
3380
3381int FileStore::read(
11fdf7f2 3382 CollectionHandle& ch,
7c673cae
FG
3383 const ghobject_t& oid,
3384 uint64_t offset,
3385 size_t len,
3386 bufferlist& bl,
224ce89b 3387 uint32_t op_flags)
7c673cae
FG
3388{
3389 int got;
11fdf7f2
TL
3390 tracepoint(objectstore, read_enter, ch->cid.c_str(), offset, len);
3391 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae 3392
31f18b77 3393 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae 3394
11fdf7f2
TL
3395 auto osr = static_cast<OpSequencer*>(ch.get());
3396 osr->wait_for_apply(oid);
3397
7c673cae
FG
3398 FDRef fd;
3399 int r = lfn_open(cid, oid, false, &fd);
3400 if (r < 0) {
31f18b77 3401 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
7c673cae
FG
3402 << cpp_strerror(r) << dendl;
3403 return r;
3404 }
3405
3406 if (offset == 0 && len == 0) {
3407 struct stat st;
3408 memset(&st, 0, sizeof(struct stat));
3409 int r = ::fstat(**fd, &st);
11fdf7f2 3410 ceph_assert(r == 0);
7c673cae
FG
3411 len = st.st_size;
3412 }
3413
3414#ifdef HAVE_POSIX_FADVISE
3415 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3416 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3417 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3418 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3419#endif
3420
3421 bufferptr bptr(len); // prealloc space for entire read
3422 got = safe_pread(**fd, bptr.c_str(), len, offset);
3423 if (got < 0) {
31f18b77 3424 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
7c673cae 3425 lfn_close(fd);
7c673cae
FG
3426 return got;
3427 }
3428 bptr.set_length(got); // properly size the buffer
3429 bl.clear();
3430 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3431
3432#ifdef HAVE_POSIX_FADVISE
3433 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3434 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3435 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3436 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3437#endif
3438
3439 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3440 ostringstream ss;
3441 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3442 if (errors != 0) {
31f18b77 3443 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae 3444 << got << " ... BAD CRC:\n" << ss.str() << dendl;
11fdf7f2 3445 ceph_abort_msg("bad crc on read");
7c673cae
FG
3446 }
3447 }
3448
3449 lfn_close(fd);
3450
31f18b77 3451 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae
FG
3452 << got << "/" << len << dendl;
3453 if (cct->_conf->filestore_debug_inject_read_err &&
3454 debug_data_eio(oid)) {
3455 return -EIO;
11fdf7f2
TL
3456 } else if (oid.hobj.pool > 0 && /* FIXME, see #23029 */
3457 cct->_conf->filestore_debug_random_read_err &&
3458 (rand() % (int)(cct->_conf->filestore_debug_random_read_err *
3459 100.0)) == 0) {
224ce89b
WB
3460 dout(0) << __func__ << ": inject random EIO" << dendl;
3461 return -EIO;
7c673cae
FG
3462 } else {
3463 tracepoint(objectstore, read_exit, got);
3464 return got;
3465 }
3466}
3467
3468int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3469 map<uint64_t, uint64_t> *m)
3470{
3471 uint64_t i;
11fdf7f2
TL
3472 struct fiemap_extent *extent = nullptr;
3473 struct fiemap *fiemap = nullptr;
7c673cae
FG
3474 int r = 0;
3475
3476more:
3477 r = backend->do_fiemap(fd, offset, len, &fiemap);
3478 if (r < 0)
3479 return r;
3480
3481 if (fiemap->fm_mapped_extents == 0) {
3482 free(fiemap);
3483 return r;
3484 }
3485
3486 extent = &fiemap->fm_extents[0];
3487
3488 /* start where we were asked to start */
3489 if (extent->fe_logical < offset) {
3490 extent->fe_length -= offset - extent->fe_logical;
3491 extent->fe_logical = offset;
3492 }
3493
3494 i = 0;
3495
3496 struct fiemap_extent *last = nullptr;
3497 while (i < fiemap->fm_mapped_extents) {
3498 struct fiemap_extent *next = extent + 1;
3499
31f18b77 3500 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
7c673cae
FG
3501 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3502
3503 /* try to merge extents */
3504 while ((i < fiemap->fm_mapped_extents - 1) &&
3505 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3506 next->fe_length += extent->fe_length;
3507 next->fe_logical = extent->fe_logical;
3508 extent = next;
3509 next = extent + 1;
3510 i++;
3511 }
3512
3513 if (extent->fe_logical + extent->fe_length > offset + len)
3514 extent->fe_length = offset + len - extent->fe_logical;
3515 (*m)[extent->fe_logical] = extent->fe_length;
3516 i++;
3517 last = extent++;
3518 }
3519 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3520 offset = last->fe_logical + last->fe_length;
3521 len -= xoffset;
3522 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3523 free(fiemap);
3524 if (!is_last) {
3525 goto more;
3526 }
3527
3528 return r;
3529}
3530
3531int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3532 map<uint64_t, uint64_t> *m)
3533{
3534#if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3535 off_t hole_pos, data_pos;
3536 int r = 0;
3537
3538 // If lseek fails with errno setting to be ENXIO, this means the current
3539 // file offset is beyond the end of the file.
3540 off_t start = offset;
3541 while(start < (off_t)(offset + len)) {
3542 data_pos = lseek(fd, start, SEEK_DATA);
3543 if (data_pos < 0) {
3544 if (errno == ENXIO)
3545 break;
3546 else {
3547 r = -errno;
3548 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3549 return r;
3550 }
3551 } else if (data_pos > (off_t)(offset + len)) {
3552 break;
3553 }
3554
3555 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3556 if (hole_pos < 0) {
3557 if (errno == ENXIO) {
3558 break;
3559 } else {
3560 r = -errno;
3561 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3562 return r;
3563 }
3564 }
3565
3566 if (hole_pos >= (off_t)(offset + len)) {
3567 (*m)[data_pos] = offset + len - data_pos;
3568 break;
3569 }
3570 (*m)[data_pos] = hole_pos - data_pos;
3571 start = hole_pos;
3572 }
3573
3574 return r;
3575#else
3576 (*m)[offset] = len;
3577 return 0;
3578#endif
3579}
3580
11fdf7f2 3581int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
3582 uint64_t offset, size_t len,
3583 bufferlist& bl)
3584{
3585 map<uint64_t, uint64_t> exomap;
11fdf7f2 3586 int r = fiemap(ch, oid, offset, len, exomap);
7c673cae 3587 if (r >= 0) {
11fdf7f2 3588 encode(exomap, bl);
7c673cae
FG
3589 }
3590 return r;
3591}
3592
11fdf7f2 3593int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
7c673cae
FG
3594 uint64_t offset, size_t len,
3595 map<uint64_t, uint64_t>& destmap)
3596{
11fdf7f2
TL
3597 tracepoint(objectstore, fiemap_enter, ch->cid.c_str(), offset, len);
3598 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae
FG
3599 destmap.clear();
3600
3601 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3602 len <= (size_t)m_filestore_fiemap_threshold) {
3603 destmap[offset] = len;
3604 return 0;
3605 }
3606
31f18b77 3607 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae 3608
11fdf7f2
TL
3609 auto osr = static_cast<OpSequencer*>(ch.get());
3610 osr->wait_for_apply(oid);
3611
7c673cae
FG
3612 FDRef fd;
3613
3614 int r = lfn_open(cid, oid, false, &fd);
3615 if (r < 0) {
3616 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3617 goto done;
3618 }
3619
3620 if (backend->has_seek_data_hole()) {
3621 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3622 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3623 } else if (backend->has_fiemap()) {
3624 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3625 r = _do_fiemap(**fd, offset, len, &destmap);
3626 }
3627
3628 lfn_close(fd);
3629
3630done:
3631
31f18b77 3632 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
11fdf7f2 3633 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
3634 tracepoint(objectstore, fiemap_exit, r);
3635 return r;
3636}
3637
3638int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3639 const SequencerPosition &spos)
3640{
31f18b77 3641 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae 3642 int r = lfn_unlink(cid, oid, spos);
31f18b77 3643 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3644 return r;
3645}
3646
3647int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3648{
31f18b77 3649 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
7c673cae 3650 int r = lfn_truncate(cid, oid, size);
31f18b77 3651 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
7c673cae
FG
3652 return r;
3653}
3654
3655
3656int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3657{
31f18b77 3658 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
3659
3660 FDRef fd;
3661 int r = lfn_open(cid, oid, true, &fd);
3662 if (r < 0) {
3663 return r;
3664 } else {
3665 lfn_close(fd);
3666 }
31f18b77 3667 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3668 return r;
3669}
3670
3671int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3672 uint64_t offset, size_t len,
3673 const bufferlist& bl, uint32_t fadvise_flags)
3674{
31f18b77 3675 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3676 int r;
3677
3678 FDRef fd;
3679 r = lfn_open(cid, oid, true, &fd);
3680 if (r < 0) {
31f18b77 3681 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
7c673cae
FG
3682 << oid << ": "
3683 << cpp_strerror(r) << dendl;
3684 goto out;
3685 }
3686
3687 // write
3688 r = bl.write_fd(**fd, offset);
3689 if (r < 0) {
31f18b77 3690 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
7c673cae
FG
3691 << " error: " << cpp_strerror(r) << dendl;
3692 lfn_close(fd);
3693 goto out;
3694 }
3695 r = bl.length();
3696
3697 if (r >= 0 && m_filestore_sloppy_crc) {
3698 int rc = backend->_crc_update_write(**fd, offset, len, bl);
11fdf7f2 3699 ceph_assert(rc >= 0);
7c673cae
FG
3700 }
3701
3702 if (replaying || m_disable_wbthrottle) {
3703 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3704#ifdef HAVE_POSIX_FADVISE
3705 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3706#endif
3707 }
3708 } else {
3709 wbthrottle.queue_wb(fd, oid, offset, len,
3710 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3711 }
3712
3713 lfn_close(fd);
3714
3715 out:
31f18b77 3716 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
7c673cae
FG
3717 return r;
3718}
3719
3720int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3721{
31f18b77 3722 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3723 int ret = 0;
3724
3725 if (cct->_conf->filestore_punch_hole) {
3726#ifdef CEPH_HAVE_FALLOCATE
11fdf7f2 3727# if !defined(__APPLE__) && !defined(__FreeBSD__)
7c673cae
FG
3728# ifdef FALLOC_FL_KEEP_SIZE
3729 // first try to punch a hole.
3730 FDRef fd;
3731 ret = lfn_open(cid, oid, false, &fd);
3732 if (ret < 0) {
3733 goto out;
3734 }
3735
3736 struct stat st;
3737 ret = ::fstat(**fd, &st);
3738 if (ret < 0) {
3739 ret = -errno;
3740 lfn_close(fd);
3741 goto out;
3742 }
3743
3744 // first try fallocate
3745 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3746 offset, len);
3747 if (ret < 0) {
3748 ret = -errno;
3749 } else {
b32b8144
FG
3750 // ensure we extend file size, if needed
3751 if (len > 0 && offset + len > (uint64_t)st.st_size) {
7c673cae
FG
3752 ret = ::ftruncate(**fd, offset + len);
3753 if (ret < 0) {
3754 ret = -errno;
3755 lfn_close(fd);
3756 goto out;
3757 }
3758 }
3759 }
3760 lfn_close(fd);
3761
3762 if (ret >= 0 && m_filestore_sloppy_crc) {
3763 int rc = backend->_crc_update_zero(**fd, offset, len);
11fdf7f2 3764 ceph_assert(rc >= 0);
7c673cae
FG
3765 }
3766
3767 if (ret == 0)
3768 goto out; // yay!
3769 if (ret != -EOPNOTSUPP)
3770 goto out; // some other error
3771# endif
3772# endif
3773#endif
3774 }
3775
3776 // lame, kernel is old and doesn't support it.
3777 // write zeros.. yuck!
31f18b77 3778 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
7c673cae
FG
3779 {
3780 bufferlist bl;
3781 bl.append_zero(len);
3782 ret = _write(cid, oid, offset, len, bl);
3783 }
3784
3785#ifdef CEPH_HAVE_FALLOCATE
11fdf7f2 3786# if !defined(__APPLE__) && !defined(__FreeBSD__)
7c673cae
FG
3787# ifdef FALLOC_FL_KEEP_SIZE
3788 out:
3789# endif
3790# endif
3791#endif
31f18b77 3792 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
7c673cae
FG
3793 return ret;
3794}
3795
3796int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3797 const SequencerPosition& spos)
3798{
31f18b77 3799 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
7c673cae
FG
3800
3801 if (_check_replay_guard(cid, newoid, spos) < 0)
3802 return 0;
3803
3804 int r;
3805 FDRef o, n;
3806 {
3807 Index index;
3808 r = lfn_open(cid, oldoid, false, &o, &index);
3809 if (r < 0) {
3810 goto out2;
3811 }
11fdf7f2 3812 ceph_assert(index.index);
7c673cae
FG
3813 RWLock::WLocker l((index.index)->access_lock);
3814
3815 r = lfn_open(cid, newoid, true, &n, &index);
3816 if (r < 0) {
3817 goto out;
3818 }
3819 r = ::ftruncate(**n, 0);
3820 if (r < 0) {
3821 r = -errno;
3822 goto out3;
3823 }
3824 struct stat st;
3825 r = ::fstat(**o, &st);
3826 if (r < 0) {
3827 r = -errno;
3828 goto out3;
3829 }
3830
3831 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3832 if (r < 0) {
3833 goto out3;
3834 }
3835
3836 dout(20) << "objectmap clone" << dendl;
3837 r = object_map->clone(oldoid, newoid, &spos);
3838 if (r < 0 && r != -ENOENT)
3839 goto out3;
3840 }
3841
3842 {
3843 char buf[2];
3844 map<string, bufferptr> aset;
3845 r = _fgetattrs(**o, aset);
3846 if (r < 0)
3847 goto out3;
3848
3849 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3850 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3851 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3852 sizeof(XATTR_NO_SPILL_OUT));
3853 } else {
3854 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3855 sizeof(XATTR_SPILL_OUT));
3856 }
3857 if (r < 0)
3858 goto out3;
3859
3860 r = _fsetattrs(**n, aset);
3861 if (r < 0)
3862 goto out3;
3863 }
3864
3865 // clone is non-idempotent; record our work.
3866 _set_replay_guard(**n, spos, &newoid);
3867
3868 out3:
3869 lfn_close(n);
3870 out:
3871 lfn_close(o);
3872 out2:
31f18b77 3873 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
11fdf7f2 3874 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
3875 return r;
3876}
3877
3878int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3879{
31f18b77 3880 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3881 return backend->clone_range(from, to, srcoff, len, dstoff);
3882}
3883
3884int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3885{
31f18b77 3886 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3887 int r = 0;
3888 map<uint64_t, uint64_t> exomap;
3889 // fiemap doesn't allow zero length
3890 if (len == 0)
3891 return 0;
3892
3893 if (backend->has_seek_data_hole()) {
3894 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3895 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3896 } else if (backend->has_fiemap()) {
3897 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3898 r = _do_fiemap(from, srcoff, len, &exomap);
3899 }
3900
3901
3902 int64_t written = 0;
3903 if (r < 0)
3904 goto out;
3905
3906 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3907 uint64_t it_off = miter->first - srcoff + dstoff;
3908 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3909 if (r < 0) {
31f18b77 3910 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
7c673cae
FG
3911 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3912 break;
3913 }
3914 written += miter->second;
3915 }
3916
3917 if (r >= 0) {
3918 if (m_filestore_sloppy_crc) {
3919 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
11fdf7f2 3920 ceph_assert(rc >= 0);
7c673cae
FG
3921 }
3922 struct stat st;
3923 r = ::fstat(to, &st);
3924 if (r < 0) {
3925 r = -errno;
31f18b77 3926 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3927 goto out;
3928 }
3929 if (st.st_size < (int)(dstoff + len)) {
3930 r = ::ftruncate(to, dstoff + len);
3931 if (r < 0) {
3932 r = -errno;
31f18b77 3933 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3934 goto out;
3935 }
3936 }
3937 r = written;
3938 }
3939
3940 out:
31f18b77 3941 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
3942 return r;
3943}
3944
3945int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3946{
31f18b77 3947 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3948 int r = 0;
3949 loff_t pos = srcoff;
3950 loff_t end = srcoff + len;
3951 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3952
3953#ifdef CEPH_HAVE_SPLICE
3954 if (backend->has_splice()) {
3955 int pipefd[2];
91327a77
AA
3956 if (pipe_cloexec(pipefd) < 0) {
3957 int e = errno;
3958 derr << " pipe " << " got " << cpp_strerror(e) << dendl;
3959 return -e;
7c673cae
FG
3960 }
3961
3962 loff_t dstpos = dstoff;
3963 while (pos < end) {
11fdf7f2
TL
3964 int l = std::min<int>(end-pos, buflen);
3965 r = safe_splice(from, &pos, pipefd[1], nullptr, l, SPLICE_F_NONBLOCK);
7c673cae
FG
3966 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3967 if (r < 0) {
31f18b77 3968 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
7c673cae
FG
3969 << ", " << cpp_strerror(r) << dendl;
3970 break;
3971 }
3972 if (r == 0) {
3973 // hrm, bad source range, wtf.
3974 r = -ERANGE;
31f18b77 3975 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
3976 << " of fd " << from << " len " << len << dendl;
3977 break;
3978 }
3979
11fdf7f2 3980 r = safe_splice(pipefd[0], nullptr, to, &dstpos, r, 0);
7c673cae
FG
3981 dout(10) << " safe_splice write to " << to << " len " << r
3982 << " got " << r << dendl;
3983 if (r < 0) {
31f18b77 3984 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
3985 << r << ", " << cpp_strerror(r) << dendl;
3986 break;
3987 }
3988 }
3989 close(pipefd[0]);
3990 close(pipefd[1]);
3991 } else
3992#endif
3993 {
3994 int64_t actual;
3995
3996 actual = ::lseek64(from, srcoff, SEEK_SET);
3997 if (actual != (int64_t)srcoff) {
3998 if (actual < 0)
3999 r = -errno;
4000 else
4001 r = -EINVAL;
4002 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
4003 return r;
4004 }
4005 actual = ::lseek64(to, dstoff, SEEK_SET);
4006 if (actual != (int64_t)dstoff) {
4007 if (actual < 0)
4008 r = -errno;
4009 else
4010 r = -EINVAL;
4011 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
4012 return r;
4013 }
4014
4015 char buf[buflen];
4016 while (pos < end) {
11fdf7f2 4017 int l = std::min<int>(end-pos, buflen);
7c673cae
FG
4018 r = ::read(from, buf, l);
4019 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
4020 if (r < 0) {
4021 if (errno == EINTR) {
4022 continue;
4023 } else {
4024 r = -errno;
31f18b77 4025 derr << __FUNC__ << ": read error at " << pos << "~" << len
7c673cae
FG
4026 << ", " << cpp_strerror(r) << dendl;
4027 break;
4028 }
4029 }
4030 if (r == 0) {
4031 // hrm, bad source range, wtf.
4032 r = -ERANGE;
31f18b77 4033 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
4034 << " of fd " << from << " len " << len << dendl;
4035 break;
4036 }
4037 int op = 0;
4038 while (op < r) {
4039 int r2 = safe_write(to, buf+op, r-op);
4040 dout(25) << " write to " << to << " len " << (r-op)
4041 << " got " << r2 << dendl;
4042 if (r2 < 0) {
4043 r = r2;
31f18b77 4044 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
4045 << r-op << ", " << cpp_strerror(r) << dendl;
4046
4047 break;
4048 }
4049 op += (r-op);
4050 }
4051 if (r < 0)
4052 break;
4053 pos += r;
4054 }
4055 }
4056
4057 if (r < 0 && replaying) {
11fdf7f2 4058 ceph_assert(r == -ERANGE);
31f18b77 4059 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
94b18763 4060 r = len;
7c673cae 4061 }
11fdf7f2 4062 ceph_assert(replaying || pos == end);
7c673cae
FG
4063 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
4064 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
11fdf7f2 4065 ceph_assert(rc >= 0);
7c673cae 4066 }
31f18b77 4067 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
4068 return r;
4069}
4070
4071int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
4072 uint64_t srcoff, uint64_t len, uint64_t dstoff,
4073 const SequencerPosition& spos)
4074{
31f18b77 4075 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
4076
4077 if (_check_replay_guard(newcid, newoid, spos) < 0)
4078 return 0;
4079
4080 int r;
4081 FDRef o, n;
4082 r = lfn_open(oldcid, oldoid, false, &o);
4083 if (r < 0) {
4084 goto out2;
4085 }
4086 r = lfn_open(newcid, newoid, true, &n);
4087 if (r < 0) {
4088 goto out;
4089 }
4090 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
4091 if (r < 0) {
4092 goto out3;
4093 }
4094
4095 // clone is non-idempotent; record our work.
4096 _set_replay_guard(**n, spos, &newoid);
4097
4098 out3:
4099 lfn_close(n);
4100 out:
4101 lfn_close(o);
4102 out2:
31f18b77 4103 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
7c673cae
FG
4104 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
4105 return r;
4106}
4107
4108class SyncEntryTimeout : public Context {
4109public:
4110 CephContext* cct;
4111 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
4112 : cct(cct), m_commit_timeo(commit_timeo)
4113 {
4114 }
4115
4116 void finish(int r) override {
4117 BackTrace *bt = new BackTrace(1);
4118 generic_dout(-1) << "FileStore: sync_entry timed out after "
4119 << m_commit_timeo << " seconds.\n";
4120 bt->print(*_dout);
4121 *_dout << dendl;
4122 delete bt;
11fdf7f2 4123 bt = nullptr;
7c673cae
FG
4124 ceph_abort();
4125 }
4126private:
4127 int m_commit_timeo;
4128};
4129
4130void FileStore::sync_entry()
4131{
4132 lock.Lock();
4133 while (!stop) {
4134 utime_t max_interval;
4135 max_interval.set_from_double(m_filestore_max_sync_interval);
4136 utime_t min_interval;
4137 min_interval.set_from_double(m_filestore_min_sync_interval);
4138
4139 utime_t startwait = ceph_clock_now();
4140 if (!force_sync) {
31f18b77 4141 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
7c673cae
FG
4142 sync_cond.WaitInterval(lock, max_interval);
4143 } else {
31f18b77 4144 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
7c673cae
FG
4145 }
4146
4147 if (force_sync) {
31f18b77 4148 dout(20) << __FUNC__ << ": force_sync set" << dendl;
7c673cae
FG
4149 force_sync = false;
4150 } else if (stop) {
31f18b77 4151 dout(20) << __FUNC__ << ": stop set" << dendl;
7c673cae
FG
4152 break;
4153 } else {
4154 // wait for at least the min interval
4155 utime_t woke = ceph_clock_now();
4156 woke -= startwait;
31f18b77 4157 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
7c673cae
FG
4158 if (woke < min_interval) {
4159 utime_t t = min_interval;
4160 t -= woke;
31f18b77 4161 dout(20) << __FUNC__ << ": waiting for another " << t
7c673cae
FG
4162 << " to reach min interval " << min_interval << dendl;
4163 sync_cond.WaitInterval(lock, t);
4164 }
4165 }
4166
4167 list<Context*> fin;
4168 again:
4169 fin.swap(sync_waiters);
4170 lock.Unlock();
4171
4172 op_tp.pause();
4173 if (apply_manager.commit_start()) {
4174 utime_t start = ceph_clock_now();
4175 uint64_t cp = apply_manager.get_committing_seq();
4176
4177 sync_entry_timeo_lock.Lock();
4178 SyncEntryTimeout *sync_entry_timeo =
4179 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
224ce89b
WB
4180 if (!timer.add_event_after(m_filestore_commit_timeout,
4181 sync_entry_timeo)) {
4182 sync_entry_timeo = nullptr;
4183 }
7c673cae
FG
4184 sync_entry_timeo_lock.Unlock();
4185
4186 logger->set(l_filestore_committing, 1);
4187
31f18b77 4188 dout(15) << __FUNC__ << ": committing " << cp << dendl;
7c673cae
FG
4189 stringstream errstream;
4190 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4191 derr << errstream.str() << dendl;
4192 ceph_abort();
4193 }
4194
4195 if (backend->can_checkpoint()) {
4196 int err = write_op_seq(op_fd, cp);
4197 if (err < 0) {
4198 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4199 ceph_abort_msg("error during write_op_seq");
7c673cae
FG
4200 }
4201
4202 char s[NAME_MAX];
4203 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4204 uint64_t cid = 0;
4205 err = backend->create_checkpoint(s, &cid);
4206 if (err < 0) {
4207 int err = errno;
4208 derr << "snap create '" << s << "' got error " << err << dendl;
11fdf7f2 4209 ceph_assert(err == 0);
7c673cae
FG
4210 }
4211
4212 snaps.push_back(cp);
4213 apply_manager.commit_started();
4214 op_tp.unpause();
4215
4216 if (cid > 0) {
4217 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4218 err = backend->sync_checkpoint(cid);
4219 if (err < 0) {
4220 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
11fdf7f2 4221 ceph_abort_msg("wait_sync got error");
7c673cae
FG
4222 }
4223 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4224 }
224ce89b 4225 } else {
7c673cae
FG
4226 apply_manager.commit_started();
4227 op_tp.unpause();
4228
4229 int err = object_map->sync();
4230 if (err < 0) {
4231 derr << "object_map sync got " << cpp_strerror(err) << dendl;
11fdf7f2 4232 ceph_abort_msg("object_map sync returned error");
7c673cae
FG
4233 }
4234
4235 err = backend->syncfs();
4236 if (err < 0) {
4237 derr << "syncfs got " << cpp_strerror(err) << dendl;
11fdf7f2 4238 ceph_abort_msg("syncfs returned error");
7c673cae
FG
4239 }
4240
4241 err = write_op_seq(op_fd, cp);
4242 if (err < 0) {
4243 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4244 ceph_abort_msg("error during write_op_seq");
7c673cae
FG
4245 }
4246 err = ::fsync(op_fd);
4247 if (err < 0) {
4248 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
11fdf7f2 4249 ceph_abort_msg("error during fsync of op_seq");
7c673cae
FG
4250 }
4251 }
4252
4253 utime_t done = ceph_clock_now();
4254 utime_t lat = done - start;
4255 utime_t dur = done - startwait;
31f18b77 4256 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
224ce89b
WB
4257 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4258 if (max_pause_lat < dur - lat) {
4259 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4260 }
7c673cae
FG
4261
4262 logger->inc(l_filestore_commitcycle);
4263 logger->tinc(l_filestore_commitcycle_latency, lat);
4264 logger->tinc(l_filestore_commitcycle_interval, dur);
4265
4266 apply_manager.commit_finish();
4267 if (!m_disable_wbthrottle) {
4268 wbthrottle.clear();
4269 }
4270
4271 logger->set(l_filestore_committing, 0);
4272
4273 // remove old snaps?
4274 if (backend->can_checkpoint()) {
4275 char s[NAME_MAX];
4276 while (snaps.size() > 2) {
4277 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4278 snaps.pop_front();
4279 dout(10) << "removing snap '" << s << "'" << dendl;
4280 int r = backend->destroy_checkpoint(s);
4281 if (r) {
4282 int err = errno;
4283 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4284 }
4285 }
4286 }
4287
31f18b77 4288 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
7c673cae 4289
224ce89b
WB
4290 if (sync_entry_timeo) {
4291 Mutex::Locker lock(sync_entry_timeo_lock);
4292 timer.cancel_event(sync_entry_timeo);
4293 }
7c673cae
FG
4294 } else {
4295 op_tp.unpause();
4296 }
4297
4298 lock.Lock();
4299 finish_contexts(cct, fin, 0);
4300 fin.clear();
4301 if (!sync_waiters.empty()) {
31f18b77 4302 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
7c673cae
FG
4303 goto again;
4304 }
4305 if (!stop && journal && journal->should_commit_now()) {
31f18b77 4306 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
7c673cae
FG
4307 goto again;
4308 }
4309 }
4310 stop = false;
4311 lock.Unlock();
4312}
4313
7c673cae
FG
4314void FileStore::do_force_sync()
4315{
31f18b77 4316 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4317 Mutex::Locker l(lock);
4318 force_sync = true;
4319 sync_cond.Signal();
4320}
4321
4322void FileStore::start_sync(Context *onsafe)
4323{
4324 Mutex::Locker l(lock);
4325 sync_waiters.push_back(onsafe);
4326 sync_cond.Signal();
4327 force_sync = true;
31f18b77 4328 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4329}
4330
4331void FileStore::sync()
4332{
4333 Mutex l("FileStore::sync");
4334 Cond c;
4335 bool done;
4336 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4337
4338 start_sync(fin);
4339
4340 l.Lock();
4341 while (!done) {
4342 dout(10) << "sync waiting" << dendl;
4343 c.Wait(l);
4344 }
4345 l.Unlock();
4346 dout(10) << "sync done" << dendl;
4347}
4348
4349void FileStore::_flush_op_queue()
4350{
31f18b77 4351 dout(10) << __FUNC__ << ": draining op tp" << dendl;
7c673cae 4352 op_wq.drain();
31f18b77 4353 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
7c673cae
FG
4354 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4355 (*it)->wait_for_empty();
4356 }
4357}
4358
4359/*
4360 * flush - make every queued write readable
4361 */
4362void FileStore::flush()
4363{
31f18b77 4364 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4365
4366 if (cct->_conf->filestore_blackhole) {
4367 // wait forever
4368 Mutex lock("FileStore::flush::lock");
4369 Cond cond;
4370 lock.Lock();
4371 while (true)
4372 cond.Wait(lock);
4373 ceph_abort();
4374 }
4375
4376 if (m_filestore_journal_writeahead) {
4377 if (journal)
4378 journal->flush();
31f18b77 4379 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
7c673cae
FG
4380 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4381 (*it)->wait_for_empty();
4382 }
4383 }
4384
4385 _flush_op_queue();
31f18b77 4386 dout(10) << __FUNC__ << ": complete" << dendl;
7c673cae
FG
4387}
4388
4389/*
4390 * sync_and_flush - make every queued write readable AND committed to disk
4391 */
4392void FileStore::sync_and_flush()
4393{
31f18b77 4394 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4395
4396 if (m_filestore_journal_writeahead) {
4397 if (journal)
4398 journal->flush();
4399 _flush_op_queue();
4400 } else {
4401 // includes m_filestore_journal_parallel
4402 _flush_op_queue();
4403 sync();
4404 }
31f18b77 4405 dout(10) << __FUNC__ << ": done" << dendl;
7c673cae
FG
4406}
4407
4408int FileStore::flush_journal()
4409{
31f18b77 4410 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4411 sync_and_flush();
4412 sync();
4413 return 0;
4414}
4415
4416int FileStore::snapshot(const string& name)
4417{
31f18b77 4418 dout(10) << __FUNC__ << ": " << name << dendl;
7c673cae
FG
4419 sync_and_flush();
4420
4421 if (!backend->can_checkpoint()) {
31f18b77 4422 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
7c673cae
FG
4423 return -EOPNOTSUPP;
4424 }
4425
4426 char s[NAME_MAX];
4427 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4428
11fdf7f2 4429 int r = backend->create_checkpoint(s, nullptr);
7c673cae 4430 if (r) {
31f18b77 4431 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
7c673cae
FG
4432 }
4433
4434 return r;
4435}
4436
4437// -------------------------------
4438// attributes
4439
4440int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4441{
4442 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4443 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4444 if (l >= 0) {
4445 bp = buffer::create(l);
4446 memcpy(bp.c_str(), val, l);
4447 } else if (l == -ERANGE) {
4448 l = chain_fgetxattr(fd, name, 0, 0);
4449 if (l > 0) {
4450 bp = buffer::create(l);
4451 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4452 }
4453 }
11fdf7f2 4454 ceph_assert(!m_filestore_fail_eio || l != -EIO);
7c673cae
FG
4455 return l;
4456}
4457
4458int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4459{
4460 // get attr list
4461 char names1[100];
4462 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4463 char *names2 = 0;
4464 char *name = 0;
4465 if (len == -ERANGE) {
4466 len = chain_flistxattr(fd, 0, 0);
4467 if (len < 0) {
11fdf7f2 4468 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4469 return len;
4470 }
4471 dout(10) << " -ERANGE, len is " << len << dendl;
4472 names2 = new char[len+1];
4473 len = chain_flistxattr(fd, names2, len);
4474 dout(10) << " -ERANGE, got " << len << dendl;
4475 if (len < 0) {
11fdf7f2 4476 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4477 delete[] names2;
4478 return len;
4479 }
4480 name = names2;
4481 } else if (len < 0) {
11fdf7f2 4482 ceph_assert(!m_filestore_fail_eio || len != -EIO);
7c673cae
FG
4483 return len;
4484 } else {
4485 name = names1;
4486 }
4487 name[len] = 0;
4488
4489 char *end = name + len;
4490 while (name < end) {
4491 char *attrname = name;
4492 if (parse_attrname(&name)) {
4493 if (*name) {
31f18b77 4494 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
7c673cae
FG
4495 int r = _fgetattr(fd, attrname, aset[name]);
4496 if (r < 0) {
4497 delete[] names2;
4498 return r;
4499 }
4500 }
4501 }
4502 name += strlen(name) + 1;
4503 }
4504
4505 delete[] names2;
4506 return 0;
4507}
4508
4509int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4510{
4511 for (map<string, bufferptr>::iterator p = aset.begin();
4512 p != aset.end();
4513 ++p) {
4514 char n[CHAIN_XATTR_MAX_NAME_LEN];
4515 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4516 const char *val;
4517 if (p->second.length())
4518 val = p->second.c_str();
4519 else
4520 val = "";
4521 // ??? Why do we skip setting all the other attrs if one fails?
4522 int r = chain_fsetxattr(fd, n, val, p->second.length());
4523 if (r < 0) {
31f18b77 4524 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
7c673cae
FG
4525 return r;
4526 }
4527 }
4528 return 0;
4529}
4530
4531// debug EIO injection
4532void FileStore::inject_data_error(const ghobject_t &oid) {
4533 Mutex::Locker l(read_error_lock);
31f18b77 4534 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4535 data_error_set.insert(oid);
4536}
4537void FileStore::inject_mdata_error(const ghobject_t &oid) {
4538 Mutex::Locker l(read_error_lock);
31f18b77 4539 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4540 mdata_error_set.insert(oid);
4541}
224ce89b 4542
7c673cae
FG
4543void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4544 Mutex::Locker l(read_error_lock);
31f18b77 4545 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
7c673cae
FG
4546 data_error_set.erase(oid);
4547 mdata_error_set.erase(oid);
4548}
4549bool FileStore::debug_data_eio(const ghobject_t &oid) {
4550 Mutex::Locker l(read_error_lock);
4551 if (data_error_set.count(oid)) {
31f18b77 4552 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4553 return true;
4554 } else {
4555 return false;
4556 }
4557}
4558bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4559 Mutex::Locker l(read_error_lock);
4560 if (mdata_error_set.count(oid)) {
31f18b77 4561 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4562 return true;
4563 } else {
4564 return false;
4565 }
4566}
4567
4568
4569// objects
4570
11fdf7f2 4571int FileStore::getattr(CollectionHandle& ch, const ghobject_t& oid, const char *name, bufferptr &bp)
7c673cae 4572{
11fdf7f2
TL
4573 tracepoint(objectstore, getattr_enter, ch->cid.c_str());
4574 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
31f18b77 4575 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
11fdf7f2
TL
4576
4577 auto osr = static_cast<OpSequencer*>(ch.get());
4578 osr->wait_for_apply(oid);
4579
7c673cae
FG
4580 FDRef fd;
4581 int r = lfn_open(cid, oid, false, &fd);
4582 if (r < 0) {
4583 goto out;
4584 }
4585 char n[CHAIN_XATTR_MAX_NAME_LEN];
4586 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4587 r = _fgetattr(**fd, n, bp);
4588 lfn_close(fd);
4589 if (r == -ENODATA) {
4590 map<string, bufferlist> got;
4591 set<string> to_get;
4592 to_get.insert(string(name));
4593 Index index;
4594 r = get_index(cid, &index);
4595 if (r < 0) {
31f18b77 4596 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4597 goto out;
4598 }
4599 r = object_map->get_xattrs(oid, to_get, &got);
4600 if (r < 0 && r != -ENOENT) {
31f18b77 4601 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
7c673cae
FG
4602 goto out;
4603 }
4604 if (got.empty()) {
31f18b77 4605 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
7c673cae
FG
4606 return -ENODATA;
4607 }
4608 bp = bufferptr(got.begin()->second.c_str(),
4609 got.begin()->second.length());
4610 r = bp.length();
4611 }
4612 out:
31f18b77 4613 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
11fdf7f2 4614 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4615 if (cct->_conf->filestore_debug_inject_read_err &&
4616 debug_mdata_eio(oid)) {
4617 return -EIO;
4618 } else {
4619 tracepoint(objectstore, getattr_exit, r);
4620 return r < 0 ? r : 0;
4621 }
4622}
4623
11fdf7f2 4624int FileStore::getattrs(CollectionHandle& ch, const ghobject_t& oid, map<string,bufferptr>& aset)
7c673cae 4625{
11fdf7f2
TL
4626 tracepoint(objectstore, getattrs_enter, ch->cid.c_str());
4627 const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
7c673cae
FG
4628 set<string> omap_attrs;
4629 map<string, bufferlist> omap_aset;
4630 Index index;
31f18b77 4631 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
11fdf7f2
TL
4632
4633 auto osr = static_cast<OpSequencer*>(ch.get());
4634 osr->wait_for_apply(oid);
4635
7c673cae
FG
4636 FDRef fd;
4637 bool spill_out = true;
4638 char buf[2];
4639
4640 int r = lfn_open(cid, oid, false, &fd);
4641 if (r < 0) {
4642 goto out;
4643 }
4644
4645 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4646 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4647 spill_out = false;
4648
4649 r = _fgetattrs(**fd, aset);
4650 lfn_close(fd);
4651 fd = FDRef(); // defensive
4652 if (r < 0) {
4653 goto out;
4654 }
4655
4656 if (!spill_out) {
31f18b77 4657 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4658 goto out;
4659 }
4660
4661 r = get_index(cid, &index);
4662 if (r < 0) {
31f18b77 4663 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4664 goto out;
4665 }
4666 {
4667 r = object_map->get_all_xattrs(oid, &omap_attrs);
4668 if (r < 0 && r != -ENOENT) {
31f18b77 4669 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4670 goto out;
4671 }
4672
4673 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4674 if (r < 0 && r != -ENOENT) {
31f18b77 4675 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4676 goto out;
4677 }
4678 if (r == -ENOENT)
4679 r = 0;
4680 }
11fdf7f2 4681 ceph_assert(omap_attrs.size() == omap_aset.size());
7c673cae
FG
4682 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4683 i != omap_aset.end();
4684 ++i) {
4685 string key(i->first);
4686 aset.insert(make_pair(key,
4687 bufferptr(i->second.c_str(), i->second.length())));
4688 }
4689 out:
31f18b77 4690 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
11fdf7f2 4691 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4692
4693 if (cct->_conf->filestore_debug_inject_read_err &&
4694 debug_mdata_eio(oid)) {
4695 return -EIO;
4696 } else {
4697 tracepoint(objectstore, getattrs_exit, r);
4698 return r;
4699 }
4700}
4701
4702int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4703 const SequencerPosition &spos)
4704{
4705 map<string, bufferlist> omap_set;
4706 set<string> omap_remove;
4707 map<string, bufferptr> inline_set;
4708 map<string, bufferptr> inline_to_set;
4709 FDRef fd;
4710 int spill_out = -1;
4711 bool incomplete_inline = false;
4712
4713 int r = lfn_open(cid, oid, false, &fd);
4714 if (r < 0) {
4715 goto out;
4716 }
4717
4718 char buf[2];
4719 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4720 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4721 spill_out = 0;
4722 else
4723 spill_out = 1;
4724
4725 r = _fgetattrs(**fd, inline_set);
4726 incomplete_inline = (r == -E2BIG);
11fdf7f2 4727 if (r == -EIO && m_filestore_fail_eio) handle_eio();
31f18b77 4728 dout(15) << __FUNC__ << ": " << cid << "/" << oid
7c673cae
FG
4729 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4730 << dendl;
4731
4732 for (map<string,bufferptr>::iterator p = aset.begin();
4733 p != aset.end();
4734 ++p) {
4735 char n[CHAIN_XATTR_MAX_NAME_LEN];
4736 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4737
4738 if (incomplete_inline) {
4739 chain_fremovexattr(**fd, n); // ignore any error
4740 omap_set[p->first].push_back(p->second);
4741 continue;
4742 }
4743
4744 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4745 if (inline_set.count(p->first)) {
4746 inline_set.erase(p->first);
4747 r = chain_fremovexattr(**fd, n);
4748 if (r < 0)
4749 goto out_close;
4750 }
4751 omap_set[p->first].push_back(p->second);
4752 continue;
4753 }
4754
4755 if (!inline_set.count(p->first) &&
4756 inline_set.size() >= m_filestore_max_inline_xattrs) {
4757 omap_set[p->first].push_back(p->second);
4758 continue;
4759 }
4760 omap_remove.insert(p->first);
4761 inline_set.insert(*p);
4762
4763 inline_to_set.insert(*p);
4764 }
4765
4766 if (spill_out != 1 && !omap_set.empty()) {
4767 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4768 sizeof(XATTR_SPILL_OUT));
4769 }
4770
4771 r = _fsetattrs(**fd, inline_to_set);
4772 if (r < 0)
4773 goto out_close;
4774
4775 if (spill_out && !omap_remove.empty()) {
4776 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4777 if (r < 0 && r != -ENOENT) {
31f18b77 4778 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
11fdf7f2 4779 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4780 goto out_close;
4781 } else {
4782 r = 0; // don't confuse the debug output
4783 }
4784 }
4785
4786 if (!omap_set.empty()) {
4787 r = object_map->set_xattrs(oid, omap_set, &spos);
4788 if (r < 0) {
31f18b77 4789 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
11fdf7f2 4790 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4791 goto out_close;
4792 }
4793 }
4794 out_close:
4795 lfn_close(fd);
4796 out:
31f18b77 4797 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4798 return r;
4799}
4800
4801
4802int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4803 const SequencerPosition &spos)
4804{
31f18b77 4805 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
7c673cae
FG
4806 FDRef fd;
4807 bool spill_out = true;
4808
4809 int r = lfn_open(cid, oid, false, &fd);
4810 if (r < 0) {
4811 goto out;
4812 }
4813
4814 char buf[2];
4815 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4816 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4817 spill_out = false;
4818 }
4819
4820 char n[CHAIN_XATTR_MAX_NAME_LEN];
4821 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4822 r = chain_fremovexattr(**fd, n);
4823 if (r == -ENODATA && spill_out) {
4824 Index index;
4825 r = get_index(cid, &index);
4826 if (r < 0) {
31f18b77 4827 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4828 goto out_close;
4829 }
4830 set<string> to_remove;
4831 to_remove.insert(string(name));
4832 r = object_map->remove_xattrs(oid, to_remove, &spos);
4833 if (r < 0 && r != -ENOENT) {
31f18b77 4834 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
11fdf7f2 4835 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4836 goto out_close;
4837 }
4838 }
4839 out_close:
4840 lfn_close(fd);
4841 out:
31f18b77 4842 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
7c673cae
FG
4843 return r;
4844}
4845
4846int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4847 const SequencerPosition &spos)
4848{
31f18b77 4849 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
4850
4851 map<string,bufferptr> aset;
4852 FDRef fd;
4853 set<string> omap_attrs;
4854 Index index;
4855 bool spill_out = true;
4856
4857 int r = lfn_open(cid, oid, false, &fd);
4858 if (r < 0) {
4859 goto out;
4860 }
4861
4862 char buf[2];
4863 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4864 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4865 spill_out = false;
4866 }
4867
4868 r = _fgetattrs(**fd, aset);
4869 if (r >= 0) {
4870 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4871 char n[CHAIN_XATTR_MAX_NAME_LEN];
4872 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4873 r = chain_fremovexattr(**fd, n);
4874 if (r < 0) {
31f18b77 4875 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
7c673cae
FG
4876 goto out_close;
4877 }
4878 }
4879 }
4880
4881 if (!spill_out) {
31f18b77 4882 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4883 goto out_close;
4884 }
4885
4886 r = get_index(cid, &index);
4887 if (r < 0) {
31f18b77 4888 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4889 goto out_close;
4890 }
4891 {
4892 r = object_map->get_all_xattrs(oid, &omap_attrs);
4893 if (r < 0 && r != -ENOENT) {
31f18b77 4894 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
11fdf7f2 4895 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4896 goto out_close;
4897 }
4898 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4899 if (r < 0 && r != -ENOENT) {
31f18b77 4900 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
7c673cae
FG
4901 goto out_close;
4902 }
4903 if (r == -ENOENT)
4904 r = 0;
4905 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4906 sizeof(XATTR_NO_SPILL_OUT));
4907 }
4908
4909 out_close:
4910 lfn_close(fd);
4911 out:
31f18b77 4912 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4913 return r;
4914}
4915
4916
4917
4918
4919int FileStore::_collection_remove_recursive(const coll_t &cid,
4920 const SequencerPosition &spos)
4921{
4922 struct stat st;
4923 int r = collection_stat(cid, &st);
4924 if (r < 0) {
4925 if (r == -ENOENT)
4926 return 0;
4927 return r;
4928 }
4929
4930 vector<ghobject_t> objects;
4931 ghobject_t max;
4932 while (!max.is_max()) {
4933 r = collection_list(cid, max, ghobject_t::get_max(),
4934 300, &objects, &max);
4935 if (r < 0)
4936 return r;
4937 for (vector<ghobject_t>::iterator i = objects.begin();
4938 i != objects.end();
4939 ++i) {
11fdf7f2 4940 ceph_assert(_check_replay_guard(cid, *i, spos));
7c673cae
FG
4941 r = _remove(cid, *i, spos);
4942 if (r < 0)
4943 return r;
4944 }
4945 objects.clear();
4946 }
4947 return _destroy_collection(cid);
4948}
4949
4950// --------------------------
4951// collections
4952
4953int FileStore::list_collections(vector<coll_t>& ls)
4954{
4955 return list_collections(ls, false);
4956}
4957
4958int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4959{
4960 tracepoint(objectstore, list_collections_enter);
31f18b77 4961 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4962
4963 char fn[PATH_MAX];
4964 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4965
4966 int r = 0;
4967 DIR *dir = ::opendir(fn);
4968 if (!dir) {
4969 r = -errno;
4970 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
11fdf7f2 4971 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4972 return r;
4973 }
4974
4975 struct dirent *de = nullptr;
4976 while ((de = ::readdir(dir))) {
4977 if (de->d_type == DT_UNKNOWN) {
4978 // d_type not supported (non-ext[234], btrfs), must stat
4979 struct stat sb;
4980 char filename[PATH_MAX];
11fdf7f2
TL
4981 if (int n = snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4982 n >= static_cast<int>(sizeof(filename))) {
4983 derr << __func__ << " path length overrun: " << n << dendl;
4984 ceph_abort();
4985 }
7c673cae
FG
4986
4987 r = ::stat(filename, &sb);
4988 if (r < 0) {
4989 r = -errno;
4990 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
11fdf7f2 4991 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
4992 break;
4993 }
4994 if (!S_ISDIR(sb.st_mode)) {
4995 continue;
4996 }
4997 } else if (de->d_type != DT_DIR) {
4998 continue;
4999 }
5000 if (strcmp(de->d_name, "omap") == 0) {
5001 continue;
5002 }
5003 if (de->d_name[0] == '.' &&
5004 (de->d_name[1] == '\0' ||
5005 (de->d_name[1] == '.' &&
5006 de->d_name[2] == '\0')))
5007 continue;
5008 coll_t cid;
5009 if (!cid.parse(de->d_name)) {
5010 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
5011 continue;
5012 }
5013 if (!cid.is_temp() || include_temp)
5014 ls.push_back(cid);
5015 }
5016
5017 if (r > 0) {
5018 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
5019 r = -r;
5020 }
5021
5022 ::closedir(dir);
11fdf7f2 5023 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5024 tracepoint(objectstore, list_collections_exit, r);
5025 return r;
5026}
5027
5028int FileStore::collection_stat(const coll_t& c, struct stat *st)
5029{
5030 tracepoint(objectstore, collection_stat_enter, c.c_str());
5031 char fn[PATH_MAX];
5032 get_cdir(c, fn, sizeof(fn));
31f18b77 5033 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5034 int r = ::stat(fn, st);
5035 if (r < 0)
5036 r = -errno;
31f18b77 5037 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
11fdf7f2 5038 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5039 tracepoint(objectstore, collection_stat_exit, r);
5040 return r;
5041}
5042
5043bool FileStore::collection_exists(const coll_t& c)
5044{
5045 tracepoint(objectstore, collection_exists_enter, c.c_str());
5046 struct stat st;
5047 bool ret = collection_stat(c, &st) == 0;
5048 tracepoint(objectstore, collection_exists_exit, ret);
5049 return ret;
5050}
5051
11fdf7f2 5052int FileStore::collection_empty(const coll_t& cid, bool *empty)
7c673cae 5053{
11fdf7f2
TL
5054 tracepoint(objectstore, collection_empty_enter, cid.c_str());
5055 dout(15) << __FUNC__ << ": " << cid << dendl;
7c673cae 5056 Index index;
11fdf7f2 5057 int r = get_index(cid, &index);
7c673cae 5058 if (r < 0) {
31f18b77 5059 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
7c673cae
FG
5060 << dendl;
5061 return r;
5062 }
5063
11fdf7f2 5064 ceph_assert(index.index);
7c673cae
FG
5065 RWLock::RLocker l((index.index)->access_lock);
5066
5067 vector<ghobject_t> ls;
5068 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
11fdf7f2 5069 1, &ls, nullptr);
7c673cae 5070 if (r < 0) {
31f18b77 5071 derr << __FUNC__ << ": collection_list_partial returned: "
7c673cae 5072 << cpp_strerror(r) << dendl;
11fdf7f2 5073 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5074 return r;
5075 }
5076 *empty = ls.empty();
5077 tracepoint(objectstore, collection_empty_exit, *empty);
5078 return 0;
5079}
5080
5081int FileStore::_collection_set_bits(const coll_t& c, int bits)
5082{
5083 char fn[PATH_MAX];
5084 get_cdir(c, fn, sizeof(fn));
31f18b77 5085 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
7c673cae
FG
5086 char n[PATH_MAX];
5087 int r;
5088 int32_t v = bits;
91327a77 5089 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
5090 if (fd < 0) {
5091 r = -errno;
5092 goto out;
5093 }
5094 get_attrname("bits", n, PATH_MAX);
5095 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
5096 VOID_TEMP_FAILURE_RETRY(::close(fd));
5097 out:
31f18b77 5098 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
7c673cae
FG
5099 return r;
5100}
5101
11fdf7f2 5102int FileStore::collection_bits(CollectionHandle& ch)
7c673cae
FG
5103{
5104 char fn[PATH_MAX];
11fdf7f2 5105 get_cdir(ch->cid, fn, sizeof(fn));
31f18b77 5106 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5107 int r;
5108 char n[PATH_MAX];
5109 int32_t bits;
91327a77 5110 int fd = ::open(fn, O_RDONLY|O_CLOEXEC);
7c673cae
FG
5111 if (fd < 0) {
5112 bits = r = -errno;
5113 goto out;
5114 }
5115 get_attrname("bits", n, PATH_MAX);
5116 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
5117 VOID_TEMP_FAILURE_RETRY(::close(fd));
5118 if (r < 0) {
5119 bits = r;
5120 goto out;
5121 }
5122 out:
31f18b77 5123 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
7c673cae
FG
5124 return bits;
5125}
5126
5127int FileStore::collection_list(const coll_t& c,
5128 const ghobject_t& orig_start,
5129 const ghobject_t& end,
5130 int max,
5131 vector<ghobject_t> *ls, ghobject_t *next)
5132{
5133 ghobject_t start = orig_start;
5134 if (start.is_max())
5135 return 0;
5136
5137 ghobject_t temp_next;
5138 if (!next)
5139 next = &temp_next;
5140 // figure out the pool id. we need this in order to generate a
5141 // meaningful 'next' value.
5142 int64_t pool = -1;
5143 shard_id_t shard;
5144 {
5145 spg_t pgid;
5146 if (c.is_temp(&pgid)) {
5147 pool = -2 - pgid.pool();
5148 shard = pgid.shard;
5149 } else if (c.is_pg(&pgid)) {
5150 pool = pgid.pool();
5151 shard = pgid.shard;
5152 } else if (c.is_meta()) {
5153 pool = -1;
5154 shard = shard_id_t::NO_SHARD;
5155 } else {
5156 // hrm, the caller is test code! we should get kill it off. for now,
5157 // tolerate it.
5158 pool = 0;
5159 shard = shard_id_t::NO_SHARD;
5160 }
31f18b77 5161 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
7c673cae
FG
5162 << " pgid " << pgid << dendl;
5163 }
5164 ghobject_t sep;
5165 sep.hobj.pool = -1;
5166 sep.set_shard(shard);
5167 if (!c.is_temp() && !c.is_meta()) {
5168 if (start < sep) {
31f18b77 5169 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
7c673cae
FG
5170 coll_t temp = c.get_temp();
5171 int r = collection_list(temp, start, end, max, ls, next);
5172 if (r < 0)
5173 return r;
5174 if (*next != ghobject_t::get_max())
5175 return r;
5176 start = sep;
31f18b77 5177 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
7c673cae
FG
5178 << start << dendl;
5179 } else {
31f18b77 5180 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
7c673cae
FG
5181 }
5182 }
5183
5184 Index index;
5185 int r = get_index(c, &index);
5186 if (r < 0)
5187 return r;
5188
11fdf7f2 5189 ceph_assert(index.index);
7c673cae
FG
5190 RWLock::RLocker l((index.index)->access_lock);
5191
5192 r = index->collection_list_partial(start, end, max, ls, next);
5193
5194 if (r < 0) {
11fdf7f2 5195 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5196 return r;
5197 }
5198 dout(20) << "objects: " << *ls << dendl;
5199
5200 // HashIndex doesn't know the pool when constructing a 'next' value
11fdf7f2 5201 if (!next->is_max()) {
7c673cae
FG
5202 next->hobj.pool = pool;
5203 next->set_shard(shard);
5204 dout(20) << " next " << *next << dendl;
5205 }
5206
5207 return 0;
5208}
5209
11fdf7f2 5210int FileStore::omap_get(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5211 bufferlist *header,
5212 map<string, bufferlist> *out)
5213{
11fdf7f2
TL
5214 tracepoint(objectstore, omap_get_enter, ch->cid.c_str());
5215 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5216 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5217
5218 auto osr = static_cast<OpSequencer*>(ch.get());
5219 osr->wait_for_apply(hoid);
5220
7c673cae
FG
5221 Index index;
5222 int r = get_index(c, &index);
5223 if (r < 0)
5224 return r;
5225 {
11fdf7f2 5226 ceph_assert(index.index);
7c673cae
FG
5227 RWLock::RLocker l((index.index)->access_lock);
5228 r = lfn_find(hoid, index);
5229 if (r < 0)
5230 return r;
5231 }
5232 r = object_map->get(hoid, header, out);
5233 if (r < 0 && r != -ENOENT) {
11fdf7f2 5234 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5235 return r;
5236 }
5237 tracepoint(objectstore, omap_get_exit, 0);
5238 return 0;
5239}
5240
5241int FileStore::omap_get_header(
11fdf7f2 5242 CollectionHandle& ch,
7c673cae
FG
5243 const ghobject_t &hoid,
5244 bufferlist *bl,
5245 bool allow_eio)
5246{
11fdf7f2
TL
5247 tracepoint(objectstore, omap_get_header_enter, ch->cid.c_str());
5248 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5249 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5250
5251 auto osr = static_cast<OpSequencer*>(ch.get());
5252 osr->wait_for_apply(hoid);
5253
7c673cae
FG
5254 Index index;
5255 int r = get_index(c, &index);
5256 if (r < 0)
5257 return r;
5258 {
11fdf7f2 5259 ceph_assert(index.index);
7c673cae
FG
5260 RWLock::RLocker l((index.index)->access_lock);
5261 r = lfn_find(hoid, index);
5262 if (r < 0)
5263 return r;
5264 }
5265 r = object_map->get_header(hoid, bl);
5266 if (r < 0 && r != -ENOENT) {
11fdf7f2 5267 ceph_assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
7c673cae
FG
5268 return r;
5269 }
5270 tracepoint(objectstore, omap_get_header_exit, 0);
5271 return 0;
5272}
5273
11fdf7f2 5274int FileStore::omap_get_keys(CollectionHandle& ch, const ghobject_t &hoid, set<string> *keys)
7c673cae 5275{
11fdf7f2
TL
5276 tracepoint(objectstore, omap_get_keys_enter, ch->cid.c_str());
5277 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5278 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5279
5280 auto osr = static_cast<OpSequencer*>(ch.get());
5281 osr->wait_for_apply(hoid);
5282
7c673cae
FG
5283 Index index;
5284 int r = get_index(c, &index);
5285 if (r < 0)
5286 return r;
5287 {
11fdf7f2 5288 ceph_assert(index.index);
7c673cae
FG
5289 RWLock::RLocker l((index.index)->access_lock);
5290 r = lfn_find(hoid, index);
5291 if (r < 0)
5292 return r;
5293 }
5294 r = object_map->get_keys(hoid, keys);
5295 if (r < 0 && r != -ENOENT) {
11fdf7f2 5296 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5297 return r;
5298 }
5299 tracepoint(objectstore, omap_get_keys_exit, 0);
5300 return 0;
5301}
5302
11fdf7f2 5303int FileStore::omap_get_values(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5304 const set<string> &keys,
5305 map<string, bufferlist> *out)
5306{
11fdf7f2
TL
5307 tracepoint(objectstore, omap_get_values_enter, ch->cid.c_str());
5308 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5309 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
11fdf7f2
TL
5310
5311 auto osr = static_cast<OpSequencer*>(ch.get());
5312 osr->wait_for_apply(hoid);
5313
7c673cae
FG
5314 Index index;
5315 const char *where = "()";
5316 int r = get_index(c, &index);
5317 if (r < 0) {
5318 where = " (get_index)";
5319 goto out;
5320 }
5321 {
11fdf7f2 5322 ceph_assert(index.index);
7c673cae
FG
5323 RWLock::RLocker l((index.index)->access_lock);
5324 r = lfn_find(hoid, index);
5325 if (r < 0) {
5326 where = " (lfn_find)";
5327 goto out;
5328 }
5329 }
5330 r = object_map->get_values(hoid, keys, out);
5331 if (r < 0 && r != -ENOENT) {
11fdf7f2 5332 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5333 where = " (get_values)";
5334 goto out;
5335 }
5336 r = 0;
5337 out:
5338 tracepoint(objectstore, omap_get_values_exit, r);
31f18b77 5339 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
7c673cae
FG
5340 << where << dendl;
5341 return r;
5342}
5343
11fdf7f2 5344int FileStore::omap_check_keys(CollectionHandle& ch, const ghobject_t &hoid,
7c673cae
FG
5345 const set<string> &keys,
5346 set<string> *out)
5347{
11fdf7f2
TL
5348 tracepoint(objectstore, omap_check_keys_enter, ch->cid.c_str());
5349 const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
31f18b77 5350 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae 5351
11fdf7f2
TL
5352 auto osr = static_cast<OpSequencer*>(ch.get());
5353 osr->wait_for_apply(hoid);
5354
7c673cae
FG
5355 Index index;
5356 int r = get_index(c, &index);
5357 if (r < 0)
5358 return r;
5359 {
11fdf7f2 5360 ceph_assert(index.index);
7c673cae
FG
5361 RWLock::RLocker l((index.index)->access_lock);
5362 r = lfn_find(hoid, index);
5363 if (r < 0)
5364 return r;
5365 }
5366 r = object_map->check_keys(hoid, keys, out);
5367 if (r < 0 && r != -ENOENT) {
11fdf7f2 5368 if (r == -EIO && m_filestore_fail_eio) handle_eio();
7c673cae
FG
5369 return r;
5370 }
5371 tracepoint(objectstore, omap_check_keys_exit, 0);
5372 return 0;
5373}
5374
11fdf7f2
TL
5375ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(
5376 CollectionHandle& ch,
5377 const ghobject_t &oid)
5378{
5379 auto osr = static_cast<OpSequencer*>(ch.get());
5380 osr->wait_for_apply(oid);
5381 return get_omap_iterator(ch->cid, oid);
5382}
5383
7c673cae
FG
5384ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5385 const ghobject_t &hoid)
5386{
5387 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5388 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5389 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5390 Index index;
5391 int r = get_index(c, &index);
5392 if (r < 0) {
31f18b77 5393 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5394 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5395 return ObjectMap::ObjectMapIterator();
5396 }
5397 {
11fdf7f2 5398 ceph_assert(index.index);
7c673cae
FG
5399 RWLock::RLocker l((index.index)->access_lock);
5400 r = lfn_find(hoid, index);
5401 if (r < 0) {
31f18b77 5402 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5403 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5404 return ObjectMap::ObjectMapIterator();
5405 }
5406 }
5407 return object_map->get_iterator(hoid);
5408}
5409
5410int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5411 uint64_t expected_num_objs,
5412 const SequencerPosition &spos)
5413{
31f18b77 5414 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
7c673cae
FG
5415 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5416
5417 bool empty;
5418 int ret = collection_empty(c, &empty);
5419 if (ret < 0)
5420 return ret;
5421 if (!empty && !replaying) {
5422 dout(0) << "Failed to give an expected number of objects hint to collection : "
5423 << c << ", only empty collection can take such type of hint. " << dendl;
5424 return 0;
5425 }
5426
5427 Index index;
5428 ret = get_index(c, &index);
5429 if (ret < 0)
5430 return ret;
5431 // Pre-hash the collection
5432 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5433 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5434 if (ret < 0)
5435 return ret;
5436 _set_replay_guard(c, spos);
5437
5438 return 0;
5439}
5440
5441int FileStore::_create_collection(
5442 const coll_t& c,
5443 int bits,
5444 const SequencerPosition &spos)
5445{
5446 char fn[PATH_MAX];
5447 get_cdir(c, fn, sizeof(fn));
31f18b77 5448 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5449 int r = ::mkdir(fn, 0755);
5450 if (r < 0)
5451 r = -errno;
5452 if (r == -EEXIST && replaying)
5453 r = 0;
31f18b77 5454 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5455
5456 if (r < 0)
5457 return r;
5458 r = init_index(c);
5459 if (r < 0)
5460 return r;
5461 r = _collection_set_bits(c, bits);
5462 if (r < 0)
5463 return r;
5464 // create parallel temp collection, too
5465 if (!c.is_meta() && !c.is_temp()) {
5466 coll_t temp = c.get_temp();
5467 r = _create_collection(temp, 0, spos);
5468 if (r < 0)
5469 return r;
5470 }
5471
5472 _set_replay_guard(c, spos);
5473 return 0;
5474}
5475
5476int FileStore::_destroy_collection(const coll_t& c)
5477{
5478 int r = 0;
5479 char fn[PATH_MAX];
5480 get_cdir(c, fn, sizeof(fn));
31f18b77 5481 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5482 {
5483 Index from;
5484 r = get_index(c, &from);
5485 if (r < 0)
5486 goto out;
11fdf7f2 5487 ceph_assert(from.index);
7c673cae
FG
5488 RWLock::WLocker l((from.index)->access_lock);
5489
5490 r = from->prep_delete();
5491 if (r < 0)
5492 goto out;
5493 }
5494 r = ::rmdir(fn);
5495 if (r < 0) {
5496 r = -errno;
5497 goto out;
5498 }
5499
5500 out:
5501 // destroy parallel temp collection, too
5502 if (!c.is_meta() && !c.is_temp()) {
5503 coll_t temp = c.get_temp();
5504 int r2 = _destroy_collection(temp);
5505 if (r2 < 0) {
5506 r = r2;
5507 goto out_final;
5508 }
5509 }
5510
5511 out_final:
31f18b77 5512 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5513 return r;
5514}
5515
5516
5517int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5518 const SequencerPosition& spos)
5519{
31f18b77 5520 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
7c673cae
FG
5521
5522 int dstcmp = _check_replay_guard(c, o, spos);
5523 if (dstcmp < 0)
5524 return 0;
5525
5526 // check the src name too; it might have a newer guard, and we don't
5527 // want to clobber it
5528 int srccmp = _check_replay_guard(oldcid, o, spos);
5529 if (srccmp < 0)
5530 return 0;
5531
5532 // open guard on object so we don't any previous operations on the
5533 // new name that will modify the source inode.
5534 FDRef fd;
5535 int r = lfn_open(oldcid, o, 0, &fd);
5536 if (r < 0) {
5537 // the source collection/object does not exist. If we are replaying, we
5538 // should be safe, so just return 0 and move on.
11fdf7f2 5539 ceph_assert(replaying);
31f18b77 5540 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5541 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5542 return 0;
5543 }
5544 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5545 _set_replay_guard(**fd, spos, &o, true);
5546 }
5547
5548 r = lfn_link(oldcid, c, o, o);
5549 if (replaying && !backend->can_checkpoint() &&
5550 r == -EEXIST) // crashed between link() and set_replay_guard()
5551 r = 0;
5552
5553 _inject_failure();
5554
5555 // close guard on object so we don't do this again
5556 if (r == 0) {
5557 _close_replay_guard(**fd, spos);
5558 }
5559 lfn_close(fd);
5560
31f18b77 5561 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
7c673cae
FG
5562 return r;
5563}
5564
5565int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5566 coll_t c, const ghobject_t& o,
5567 const SequencerPosition& spos,
5568 bool allow_enoent)
5569{
31f18b77 5570 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
7c673cae
FG
5571 int r = 0;
5572 int dstcmp, srccmp;
5573
5574 if (replaying) {
5575 /* If the destination collection doesn't exist during replay,
5576 * we need to delete the src object and continue on
5577 */
5578 if (!collection_exists(c))
5579 goto out_rm_src;
5580 }
5581
5582 dstcmp = _check_replay_guard(c, o, spos);
5583 if (dstcmp < 0)
5584 goto out_rm_src;
5585
5586 // check the src name too; it might have a newer guard, and we don't
5587 // want to clobber it
5588 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5589 if (srccmp < 0)
5590 return 0;
5591
5592 {
5593 // open guard on object so we don't any previous operations on the
5594 // new name that will modify the source inode.
5595 FDRef fd;
5596 r = lfn_open(oldcid, oldoid, 0, &fd);
5597 if (r < 0) {
5598 // the source collection/object does not exist. If we are replaying, we
5599 // should be safe, so just return 0 and move on.
5600 if (replaying) {
31f18b77 5601 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5602 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5603 } else if (allow_enoent) {
31f18b77 5604 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5605 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5606 << dendl;
5607 } else {
11fdf7f2 5608 ceph_abort_msg("ERROR: source must exist");
7c673cae
FG
5609 }
5610
5611 if (!replaying) {
5612 return 0;
5613 }
5614 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5615 return 0;
5616 }
5617
5618 r = 0; // don't know if object_map was cloned
5619 } else {
5620 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5621 _set_replay_guard(**fd, spos, &o, true);
5622 }
5623
5624 r = lfn_link(oldcid, c, oldoid, o);
5625 if (replaying && !backend->can_checkpoint() &&
5626 r == -EEXIST) // crashed between link() and set_replay_guard()
5627 r = 0;
5628
5629 lfn_close(fd);
5630 fd = FDRef();
5631
5632 _inject_failure();
5633 }
5634
5635 if (r == 0) {
5636 // the name changed; link the omap content
5637 r = object_map->rename(oldoid, o, &spos);
5638 if (r == -ENOENT)
5639 r = 0;
5640 }
5641
5642 _inject_failure();
5643
5644 if (r == 0)
5645 r = lfn_unlink(oldcid, oldoid, spos, true);
5646
5647 if (r == 0)
5648 r = lfn_open(c, o, 0, &fd);
5649
5650 // close guard on object so we don't do this again
5651 if (r == 0) {
5652 _close_replay_guard(**fd, spos, &o);
5653 lfn_close(fd);
5654 }
5655 }
5656
31f18b77 5657 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5658 << " = " << r << dendl;
5659 return r;
5660
5661 out_rm_src:
5662 // remove source
5663 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5664 r = lfn_unlink(oldcid, oldoid, spos, true);
5665 }
5666
31f18b77 5667 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5668 << " = " << r << dendl;
5669 return r;
5670}
5671
5672void FileStore::_inject_failure()
5673{
31f18b77
FG
5674 if (m_filestore_kill_at) {
5675 int final = --m_filestore_kill_at;
5676 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
7c673cae 5677 if (final == 0) {
31f18b77 5678 derr << __FUNC__ << ": KILLING" << dendl;
7c673cae
FG
5679 cct->_log->flush();
5680 _exit(1);
5681 }
5682 }
5683}
5684
5685int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5686 const SequencerPosition &spos) {
31f18b77 5687 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5688 Index index;
5689 int r = get_index(cid, &index);
5690 if (r < 0)
5691 return r;
5692 {
11fdf7f2 5693 ceph_assert(index.index);
7c673cae
FG
5694 RWLock::RLocker l((index.index)->access_lock);
5695 r = lfn_find(hoid, index);
5696 if (r < 0)
5697 return r;
5698 }
5699 r = object_map->clear_keys_header(hoid, &spos);
5700 if (r < 0 && r != -ENOENT)
5701 return r;
5702 return 0;
5703}
5704
5705int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5706 const map<string, bufferlist> &aset,
5707 const SequencerPosition &spos) {
31f18b77 5708 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5709 Index index;
5710 int r;
5711 //treat pgmeta as a logical object, skip to check exist
5712 if (hoid.is_pgmeta())
5713 goto skip;
5714
5715 r = get_index(cid, &index);
5716 if (r < 0) {
31f18b77 5717 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
7c673cae
FG
5718 return r;
5719 }
5720 {
11fdf7f2 5721 ceph_assert(index.index);
7c673cae
FG
5722 RWLock::RLocker l((index.index)->access_lock);
5723 r = lfn_find(hoid, index);
5724 if (r < 0) {
31f18b77 5725 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
7c673cae
FG
5726 return r;
5727 }
5728 }
5729skip:
11fdf7f2 5730 if (g_conf()->subsys.should_gather<ceph_subsys_filestore, 20>()) {
7c673cae 5731 for (auto& p : aset) {
31f18b77 5732 dout(20) << __FUNC__ << ": set " << p.first << dendl;
7c673cae
FG
5733 }
5734 }
5735 r = object_map->set_keys(hoid, aset, &spos);
31f18b77 5736 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
7c673cae
FG
5737 return r;
5738}
5739
5740int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5741 const set<string> &keys,
5742 const SequencerPosition &spos) {
31f18b77 5743 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5744 Index index;
5745 int r;
5746 //treat pgmeta as a logical object, skip to check exist
5747 if (hoid.is_pgmeta())
5748 goto skip;
5749
5750 r = get_index(cid, &index);
5751 if (r < 0)
5752 return r;
5753 {
11fdf7f2 5754 ceph_assert(index.index);
7c673cae
FG
5755 RWLock::RLocker l((index.index)->access_lock);
5756 r = lfn_find(hoid, index);
5757 if (r < 0)
5758 return r;
5759 }
5760skip:
5761 r = object_map->rm_keys(hoid, keys, &spos);
5762 if (r < 0 && r != -ENOENT)
5763 return r;
5764 return 0;
5765}
5766
5767int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5768 const string& first, const string& last,
5769 const SequencerPosition &spos) {
31f18b77 5770 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
7c673cae
FG
5771 set<string> keys;
5772 {
5773 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5774 if (!iter)
5775 return -ENOENT;
5776 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5777 iter->next()) {
5778 keys.insert(iter->key());
5779 }
5780 }
5781 return _omap_rmkeys(cid, hoid, keys, spos);
5782}
5783
5784int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5785 const bufferlist &bl,
5786 const SequencerPosition &spos)
5787{
31f18b77 5788 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5789 Index index;
5790 int r = get_index(cid, &index);
5791 if (r < 0)
5792 return r;
5793 {
11fdf7f2 5794 ceph_assert(index.index);
7c673cae
FG
5795 RWLock::RLocker l((index.index)->access_lock);
5796 r = lfn_find(hoid, index);
5797 if (r < 0)
5798 return r;
5799 }
5800 return object_map->set_header(hoid, bl, &spos);
5801}
5802
11fdf7f2
TL
5803int FileStore::_merge_collection(const coll_t& cid,
5804 uint32_t bits,
5805 coll_t dest,
5806 const SequencerPosition &spos)
5807{
5808 dout(15) << __FUNC__ << ": " << cid << " " << dest
5809 << " bits " << bits << dendl;
5810 int r = 0;
5811
5812 if (!collection_exists(cid)) {
5813 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5814 ceph_assert(replaying);
5815 return 0;
5816 }
5817 if (!collection_exists(dest)) {
5818 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5819 ceph_assert(replaying);
5820 return 0;
5821 }
5822
5823 // set bits
5824 if (_check_replay_guard(cid, spos) > 0)
5825 _collection_set_bits(dest, bits);
5826
5827 spg_t pgid;
5828 bool is_pg = dest.is_pg(&pgid);
5829 ceph_assert(is_pg);
5830
5831 int dstcmp = _check_replay_guard(dest, spos);
5832 if (dstcmp < 0)
5833 return 0;
5834
5835 int srccmp = _check_replay_guard(cid, spos);
5836 if (srccmp < 0)
5837 return 0;
5838
5839 _set_global_replay_guard(cid, spos);
5840 _set_replay_guard(cid, spos, true);
5841 _set_replay_guard(dest, spos, true);
5842
5843 // main collection
5844 {
5845 Index from;
5846 r = get_index(cid, &from);
5847
5848 Index to;
5849 if (!r)
5850 r = get_index(dest, &to);
5851
5852 if (!r) {
5853 ceph_assert(from.index);
5854 RWLock::WLocker l1((from.index)->access_lock);
5855
5856 ceph_assert(to.index);
5857 RWLock::WLocker l2((to.index)->access_lock);
5858
5859 r = from->merge(bits, to.index);
5860 }
5861 }
5862
5863 // temp too
5864 {
5865 Index from;
5866 r = get_index(cid.get_temp(), &from);
5867
5868 Index to;
5869 if (!r)
5870 r = get_index(dest.get_temp(), &to);
5871
5872 if (!r) {
5873 ceph_assert(from.index);
5874 RWLock::WLocker l1((from.index)->access_lock);
5875
5876 ceph_assert(to.index);
5877 RWLock::WLocker l2((to.index)->access_lock);
5878
5879 r = from->merge(bits, to.index);
5880 }
5881 }
5882
5883 // remove source
5884 _destroy_collection(cid);
5885
5886 _close_replay_guard(dest, spos);
5887 _close_replay_guard(dest.get_temp(), spos);
5888 // no need to close guards on cid... it's removed.
5889
5890 if (!r && cct->_conf->filestore_debug_verify_split) {
5891 vector<ghobject_t> objects;
5892 ghobject_t next;
5893 while (1) {
5894 collection_list(
5895 dest,
5896 next, ghobject_t::get_max(),
5897 get_ideal_list_max(),
5898 &objects,
5899 &next);
5900 if (objects.empty())
5901 break;
5902 for (vector<ghobject_t>::iterator i = objects.begin();
5903 i != objects.end();
5904 ++i) {
5905 if (!i->match(bits, pgid.pgid.ps())) {
5906 dout(20) << __FUNC__ << ": " << *i << " does not belong in "
5907 << cid << dendl;
5908 ceph_assert(i->match(bits, pgid.pgid.ps()));
5909 }
5910 }
5911 objects.clear();
5912 }
5913 }
5914
5915 dout(15) << __FUNC__ << ": " << cid << " " << dest << " bits " << bits
5916 << " = " << r << dendl;
5917 return r;
5918}
5919
7c673cae
FG
5920int FileStore::_split_collection(const coll_t& cid,
5921 uint32_t bits,
5922 uint32_t rem,
5923 coll_t dest,
5924 const SequencerPosition &spos)
5925{
5926 int r;
5927 {
31f18b77 5928 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
7c673cae 5929 if (!collection_exists(cid)) {
31f18b77 5930 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
11fdf7f2 5931 ceph_assert(replaying);
7c673cae
FG
5932 return 0;
5933 }
5934 if (!collection_exists(dest)) {
31f18b77 5935 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
11fdf7f2 5936 ceph_assert(replaying);
7c673cae
FG
5937 return 0;
5938 }
5939
5940 int dstcmp = _check_replay_guard(dest, spos);
5941 if (dstcmp < 0)
5942 return 0;
5943
5944 int srccmp = _check_replay_guard(cid, spos);
5945 if (srccmp < 0)
5946 return 0;
5947
5948 _set_global_replay_guard(cid, spos);
5949 _set_replay_guard(cid, spos, true);
5950 _set_replay_guard(dest, spos, true);
5951
5952 Index from;
5953 r = get_index(cid, &from);
5954
5955 Index to;
5956 if (!r)
5957 r = get_index(dest, &to);
5958
5959 if (!r) {
11fdf7f2 5960 ceph_assert(from.index);
7c673cae
FG
5961 RWLock::WLocker l1((from.index)->access_lock);
5962
11fdf7f2 5963 ceph_assert(to.index);
7c673cae
FG
5964 RWLock::WLocker l2((to.index)->access_lock);
5965
5966 r = from->split(rem, bits, to.index);
5967 }
5968
5969 _close_replay_guard(cid, spos);
5970 _close_replay_guard(dest, spos);
5971 }
5972 _collection_set_bits(cid, bits);
5973 if (!r && cct->_conf->filestore_debug_verify_split) {
5974 vector<ghobject_t> objects;
5975 ghobject_t next;
5976 while (1) {
5977 collection_list(
5978 cid,
5979 next, ghobject_t::get_max(),
5980 get_ideal_list_max(),
5981 &objects,
5982 &next);
5983 if (objects.empty())
5984 break;
5985 for (vector<ghobject_t>::iterator i = objects.begin();
5986 i != objects.end();
5987 ++i) {
31f18b77 5988 dout(20) << __FUNC__ << ": " << *i << " still in source "
7c673cae 5989 << cid << dendl;
11fdf7f2 5990 ceph_assert(!i->match(bits, rem));
7c673cae
FG
5991 }
5992 objects.clear();
5993 }
5994 next = ghobject_t();
5995 while (1) {
5996 collection_list(
5997 dest,
5998 next, ghobject_t::get_max(),
5999 get_ideal_list_max(),
6000 &objects,
6001 &next);
6002 if (objects.empty())
6003 break;
6004 for (vector<ghobject_t>::iterator i = objects.begin();
6005 i != objects.end();
6006 ++i) {
31f18b77 6007 dout(20) << __FUNC__ << ": " << *i << " now in dest "
7c673cae 6008 << *i << dendl;
11fdf7f2 6009 ceph_assert(i->match(bits, rem));
7c673cae
FG
6010 }
6011 objects.clear();
6012 }
6013 }
6014 return r;
6015}
6016
6017int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
6018 uint64_t expected_object_size,
6019 uint64_t expected_write_size)
6020{
31f18b77 6021 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
7c673cae
FG
6022
6023 FDRef fd;
6024 int ret = 0;
6025
6026 if (expected_object_size == 0 || expected_write_size == 0)
6027 goto out;
6028
6029 ret = lfn_open(cid, oid, false, &fd);
6030 if (ret < 0)
6031 goto out;
6032
6033 {
6034 // TODO: a more elaborate hint calculation
11fdf7f2 6035 uint64_t hint = std::min<uint64_t>(expected_write_size, m_filestore_max_alloc_hint_size);
7c673cae
FG
6036
6037 ret = backend->set_alloc_hint(**fd, hint);
31f18b77 6038 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
7c673cae
FG
6039 }
6040
6041 lfn_close(fd);
6042out:
31f18b77 6043 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
11fdf7f2 6044 ceph_assert(!m_filestore_fail_eio || ret != -EIO);
7c673cae
FG
6045 return ret;
6046}
6047
6048const char** FileStore::get_tracked_conf_keys() const
6049{
6050 static const char* KEYS[] = {
6051 "filestore_max_inline_xattr_size",
6052 "filestore_max_inline_xattr_size_xfs",
6053 "filestore_max_inline_xattr_size_btrfs",
6054 "filestore_max_inline_xattr_size_other",
6055 "filestore_max_inline_xattrs",
6056 "filestore_max_inline_xattrs_xfs",
6057 "filestore_max_inline_xattrs_btrfs",
6058 "filestore_max_inline_xattrs_other",
6059 "filestore_max_xattr_value_size",
6060 "filestore_max_xattr_value_size_xfs",
6061 "filestore_max_xattr_value_size_btrfs",
6062 "filestore_max_xattr_value_size_other",
6063 "filestore_min_sync_interval",
6064 "filestore_max_sync_interval",
6065 "filestore_queue_max_ops",
6066 "filestore_queue_max_bytes",
6067 "filestore_expected_throughput_bytes",
6068 "filestore_expected_throughput_ops",
6069 "filestore_queue_low_threshhold",
6070 "filestore_queue_high_threshhold",
6071 "filestore_queue_high_delay_multiple",
6072 "filestore_queue_max_delay_multiple",
6073 "filestore_commit_timeout",
6074 "filestore_dump_file",
6075 "filestore_kill_at",
6076 "filestore_fail_eio",
6077 "filestore_fadvise",
6078 "filestore_sloppy_crc",
6079 "filestore_sloppy_crc_block_size",
6080 "filestore_max_alloc_hint_size",
6081 NULL
6082 };
6083 return KEYS;
6084}
6085
11fdf7f2 6086void FileStore::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
6087 const std::set <std::string> &changed)
6088{
6089 if (changed.count("filestore_max_inline_xattr_size") ||
6090 changed.count("filestore_max_inline_xattr_size_xfs") ||
6091 changed.count("filestore_max_inline_xattr_size_btrfs") ||
6092 changed.count("filestore_max_inline_xattr_size_other") ||
6093 changed.count("filestore_max_inline_xattrs") ||
6094 changed.count("filestore_max_inline_xattrs_xfs") ||
6095 changed.count("filestore_max_inline_xattrs_btrfs") ||
6096 changed.count("filestore_max_inline_xattrs_other") ||
6097 changed.count("filestore_max_xattr_value_size") ||
6098 changed.count("filestore_max_xattr_value_size_xfs") ||
6099 changed.count("filestore_max_xattr_value_size_btrfs") ||
6100 changed.count("filestore_max_xattr_value_size_other")) {
6101 if (backend) {
6102 Mutex::Locker l(lock);
6103 set_xattr_limits_via_conf();
6104 }
6105 }
6106
6107 if (changed.count("filestore_queue_max_bytes") ||
6108 changed.count("filestore_queue_max_ops") ||
6109 changed.count("filestore_expected_throughput_bytes") ||
6110 changed.count("filestore_expected_throughput_ops") ||
6111 changed.count("filestore_queue_low_threshhold") ||
6112 changed.count("filestore_queue_high_threshhold") ||
6113 changed.count("filestore_queue_high_delay_multiple") ||
6114 changed.count("filestore_queue_max_delay_multiple")) {
6115 Mutex::Locker l(lock);
6116 set_throttle_params();
6117 }
6118
6119 if (changed.count("filestore_min_sync_interval") ||
6120 changed.count("filestore_max_sync_interval") ||
6121 changed.count("filestore_kill_at") ||
6122 changed.count("filestore_fail_eio") ||
6123 changed.count("filestore_sloppy_crc") ||
6124 changed.count("filestore_sloppy_crc_block_size") ||
6125 changed.count("filestore_max_alloc_hint_size") ||
6126 changed.count("filestore_fadvise")) {
6127 Mutex::Locker l(lock);
6128 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
6129 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
31f18b77 6130 m_filestore_kill_at = conf->filestore_kill_at;
7c673cae
FG
6131 m_filestore_fail_eio = conf->filestore_fail_eio;
6132 m_filestore_fadvise = conf->filestore_fadvise;
6133 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
6134 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
6135 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
6136 }
6137 if (changed.count("filestore_commit_timeout")) {
6138 Mutex::Locker l(sync_entry_timeo_lock);
6139 m_filestore_commit_timeout = conf->filestore_commit_timeout;
6140 }
6141 if (changed.count("filestore_dump_file")) {
6142 if (conf->filestore_dump_file.length() &&
6143 conf->filestore_dump_file != "-") {
6144 dump_start(conf->filestore_dump_file);
6145 } else {
6146 dump_stop();
6147 }
6148 }
6149}
6150
6151int FileStore::set_throttle_params()
6152{
6153 stringstream ss;
6154 bool valid = throttle_bytes.set_params(
6155 cct->_conf->filestore_queue_low_threshhold,
6156 cct->_conf->filestore_queue_high_threshhold,
6157 cct->_conf->filestore_expected_throughput_bytes,
11fdf7f2
TL
6158 cct->_conf->filestore_queue_high_delay_multiple?
6159 cct->_conf->filestore_queue_high_delay_multiple:
6160 cct->_conf->filestore_queue_high_delay_multiple_bytes,
6161 cct->_conf->filestore_queue_max_delay_multiple?
6162 cct->_conf->filestore_queue_max_delay_multiple:
6163 cct->_conf->filestore_queue_max_delay_multiple_bytes,
7c673cae
FG
6164 cct->_conf->filestore_queue_max_bytes,
6165 &ss);
6166
6167 valid &= throttle_ops.set_params(
6168 cct->_conf->filestore_queue_low_threshhold,
6169 cct->_conf->filestore_queue_high_threshhold,
6170 cct->_conf->filestore_expected_throughput_ops,
11fdf7f2
TL
6171 cct->_conf->filestore_queue_high_delay_multiple?
6172 cct->_conf->filestore_queue_high_delay_multiple:
6173 cct->_conf->filestore_queue_high_delay_multiple_ops,
6174 cct->_conf->filestore_queue_max_delay_multiple?
6175 cct->_conf->filestore_queue_max_delay_multiple:
6176 cct->_conf->filestore_queue_max_delay_multiple_ops,
7c673cae
FG
6177 cct->_conf->filestore_queue_max_ops,
6178 &ss);
6179
6180 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
6181 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
6182
6183 if (!valid) {
6184 derr << "tried to set invalid params: "
6185 << ss.str()
6186 << dendl;
6187 }
6188 return valid ? 0 : -EINVAL;
6189}
6190
6191void FileStore::dump_start(const std::string& file)
6192{
31f18b77 6193 dout(10) << __FUNC__ << ": " << file << dendl;
7c673cae
FG
6194 if (m_filestore_do_dump) {
6195 dump_stop();
6196 }
6197 m_filestore_dump_fmt.reset();
6198 m_filestore_dump_fmt.open_array_section("dump");
6199 m_filestore_dump.open(file.c_str());
6200 m_filestore_do_dump = true;
6201}
6202
6203void FileStore::dump_stop()
6204{
31f18b77 6205 dout(10) << __FUNC__ << dendl;
7c673cae
FG
6206 m_filestore_do_dump = false;
6207 if (m_filestore_dump.is_open()) {
6208 m_filestore_dump_fmt.close_section();
6209 m_filestore_dump_fmt.flush(m_filestore_dump);
6210 m_filestore_dump.flush();
6211 m_filestore_dump.close();
6212 }
6213}
6214
6215void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
6216{
6217 m_filestore_dump_fmt.open_array_section("transactions");
6218 unsigned trans_num = 0;
6219 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
6220 m_filestore_dump_fmt.open_object_section("transaction");
11fdf7f2 6221 m_filestore_dump_fmt.dump_stream("osr") << osr->cid;
7c673cae
FG
6222 m_filestore_dump_fmt.dump_unsigned("seq", seq);
6223 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
6224 (*i).dump(&m_filestore_dump_fmt);
6225 m_filestore_dump_fmt.close_section();
6226 }
6227 m_filestore_dump_fmt.close_section();
6228 m_filestore_dump_fmt.flush(m_filestore_dump);
6229 m_filestore_dump.flush();
6230}
6231
11fdf7f2
TL
6232void FileStore::get_db_statistics(Formatter* f)
6233{
6234 object_map->db->get_statistics(f);
6235}
6236
7c673cae
FG
6237void FileStore::set_xattr_limits_via_conf()
6238{
6239 uint32_t fs_xattr_size;
6240 uint32_t fs_xattrs;
6241 uint32_t fs_xattr_max_value_size;
6242
6243 switch (m_fs_type) {
6244#if defined(__linux__)
6245 case XFS_SUPER_MAGIC:
6246 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
6247 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
6248 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
6249 break;
6250 case BTRFS_SUPER_MAGIC:
6251 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
6252 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
6253 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
6254 break;
6255#endif
6256 default:
6257 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
6258 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
6259 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
6260 break;
6261 }
6262
6263 // Use override value if set
6264 if (cct->_conf->filestore_max_inline_xattr_size)
6265 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
6266 else
6267 m_filestore_max_inline_xattr_size = fs_xattr_size;
6268
6269 // Use override value if set
6270 if (cct->_conf->filestore_max_inline_xattrs)
6271 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
6272 else
6273 m_filestore_max_inline_xattrs = fs_xattrs;
6274
6275 // Use override value if set
6276 if (cct->_conf->filestore_max_xattr_value_size)
6277 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
6278 else
6279 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
6280
6281 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
6282 derr << "WARNING: max attr value size ("
6283 << m_filestore_max_xattr_value_size
6284 << ") is smaller than osd_max_object_name_len ("
6285 << cct->_conf->osd_max_object_name_len
6286 << "). Your backend filesystem appears to not support attrs large "
6287 << "enough to handle the configured max rados name size. You may get "
6288 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
6289 << "behavior"
6290 << dendl;
6291 }
6292}
6293
6294uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
6295{
6296 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
6297 return res;
6298}
6299
1adf2230 6300int FileStore::apply_layout_settings(const coll_t &cid, int target_level)
7c673cae 6301{
1adf2230
AA
6302 dout(20) << __FUNC__ << ": " << cid << " target level: "
6303 << target_level << dendl;
7c673cae
FG
6304 Index index;
6305 int r = get_index(cid, &index);
6306 if (r < 0) {
6307 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
6308 << dendl;
6309 return r;
6310 }
6311
1adf2230 6312 return index->apply_layout_settings(target_level);
7c673cae
FG
6313}
6314
6315
6316// -- FSSuperblock --
6317
6318void FSSuperblock::encode(bufferlist &bl) const
6319{
6320 ENCODE_START(2, 1, bl);
6321 compat_features.encode(bl);
11fdf7f2 6322 encode(omap_backend, bl);
7c673cae
FG
6323 ENCODE_FINISH(bl);
6324}
6325
11fdf7f2 6326void FSSuperblock::decode(bufferlist::const_iterator &bl)
7c673cae
FG
6327{
6328 DECODE_START(2, bl);
6329 compat_features.decode(bl);
6330 if (struct_v >= 2)
11fdf7f2 6331 decode(omap_backend, bl);
7c673cae
FG
6332 else
6333 omap_backend = "leveldb";
6334 DECODE_FINISH(bl);
6335}
6336
6337void FSSuperblock::dump(Formatter *f) const
6338{
6339 f->open_object_section("compat");
6340 compat_features.dump(f);
6341 f->dump_string("omap_backend", omap_backend);
6342 f->close_section();
6343}
6344
6345void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6346{
6347 FSSuperblock z;
6348 o.push_back(new FSSuperblock(z));
6349 CompatSet::FeatureSet feature_compat;
6350 CompatSet::FeatureSet feature_ro_compat;
6351 CompatSet::FeatureSet feature_incompat;
6352 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6353 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6354 feature_incompat);
6355 o.push_back(new FSSuperblock(z));
6356 z.omap_backend = "rocksdb";
6357 o.push_back(new FSSuperblock(z));
6358}
11fdf7f2
TL
6359
6360#undef dout_prefix
6361#define dout_prefix *_dout << "filestore.osr(" << this << ") "
6362
6363void FileStore::OpSequencer::_register_apply(Op *o)
6364{
6365 if (o->registered_apply) {
6366 dout(20) << __func__ << " " << o << " already registered" << dendl;
6367 return;
6368 }
6369 o->registered_apply = true;
6370 for (auto& t : o->tls) {
6371 for (auto& i : t.get_object_index()) {
6372 uint32_t key = i.first.hobj.get_hash();
6373 applying.emplace(make_pair(key, &i.first));
6374 dout(20) << __func__ << " " << o << " " << i.first << " ("
6375 << &i.first << ")" << dendl;
6376 }
6377 }
6378}
6379
6380void FileStore::OpSequencer::_unregister_apply(Op *o)
6381{
6382 ceph_assert(o->registered_apply);
6383 for (auto& t : o->tls) {
6384 for (auto& i : t.get_object_index()) {
6385 uint32_t key = i.first.hobj.get_hash();
6386 auto p = applying.find(key);
6387 bool removed = false;
6388 while (p != applying.end() &&
6389 p->first == key) {
6390 if (p->second == &i.first) {
6391 dout(20) << __func__ << " " << o << " " << i.first << " ("
6392 << &i.first << ")" << dendl;
6393 applying.erase(p);
6394 removed = true;
6395 break;
6396 }
6397 ++p;
6398 }
6399 ceph_assert(removed);
6400 }
6401 }
6402}
6403
6404void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
6405{
6406 Mutex::Locker l(qlock);
6407 uint32_t key = oid.hobj.get_hash();
6408retry:
6409 while (true) {
6410 // search all items in hash slot for a matching object
6411 auto p = applying.find(key);
6412 while (p != applying.end() &&
6413 p->first == key) {
6414 if (*p->second == oid) {
6415 dout(20) << __func__ << " " << oid << " waiting on " << p->second
6416 << dendl;
6417 cond.Wait(qlock);
6418 goto retry;
6419 }
6420 ++p;
6421 }
6422 break;
6423 }
6424 dout(20) << __func__ << " " << oid << " done" << dendl;
6425}