]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/FileStore.cc
update sources to v12.1.1
[ceph.git] / ceph / src / os / filestore / FileStore.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15#include "include/compat.h"
16#include "include/int_types.h"
17#include "boost/tuple/tuple.hpp"
18
19#include <unistd.h>
20#include <stdlib.h>
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <fcntl.h>
24#include <sys/file.h>
25#include <errno.h>
26#include <dirent.h>
27#include <sys/ioctl.h>
28
29#if defined(__linux__)
30#include <linux/fs.h>
31#endif
32
33#include <iostream>
34#include <map>
35
36#include "include/linux_fiemap.h"
37
38#include "common/xattr.h"
39#include "chain_xattr.h"
40
41#if defined(DARWIN) || defined(__FreeBSD__)
42#include <sys/param.h>
43#include <sys/mount.h>
44#endif // DARWIN
45
46
47#include <fstream>
48#include <sstream>
49
50#include "FileStore.h"
51#include "GenericFileStoreBackend.h"
52#include "BtrfsFileStoreBackend.h"
53#include "XfsFileStoreBackend.h"
54#include "ZFSFileStoreBackend.h"
55#include "common/BackTrace.h"
56#include "include/types.h"
57#include "FileJournal.h"
58
59#include "osd/osd_types.h"
60#include "include/color.h"
61#include "include/buffer.h"
62
63#include "common/Timer.h"
64#include "common/debug.h"
65#include "common/errno.h"
66#include "common/run_cmd.h"
67#include "common/safe_io.h"
68#include "common/perf_counters.h"
69#include "common/sync_filesystem.h"
70#include "common/fd.h"
71#include "HashIndex.h"
72#include "DBObjectMap.h"
73#include "kv/KeyValueDB.h"
74
75#include "common/ceph_crypto.h"
76using ceph::crypto::SHA1;
77
78#include "include/assert.h"
79
80#include "common/config.h"
81#include "common/blkdev.h"
82
83#ifdef WITH_LTTNG
84#define TRACEPOINT_DEFINE
85#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86#include "tracing/objectstore.h"
87#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88#undef TRACEPOINT_DEFINE
89#else
90#define tracepoint(...)
91#endif
92
93#define dout_context cct
94#define dout_subsys ceph_subsys_filestore
95#undef dout_prefix
96#define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98#define COMMIT_SNAP_ITEM "snap_%llu"
99#define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101#define REPLAY_GUARD_XATTR "user.cephos.seq"
102#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// The XATTR_SPILL_OUT_NAME xattr records whether an object's xattrs have
// spilled over into DBObjectMap. If it is present in the file's xattrs and
// its value is XATTR_NO_SPILL_OUT ("0"), the object has no xattrs in DBObjectMap.
107#define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108#define XATTR_NO_SPILL_OUT "0"
109#define XATTR_SPILL_OUT "1"
31f18b77 110#define __FUNC__ __func__ << "(" << __LINE__ << ")"
7c673cae
FG
111
112//Initial features in new superblock.
113static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119}
120
121//Features are added here that this FileStore supports.
122static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127}
128
129int FileStore::validate_hobject_key(const hobject_t &obj) const
130{
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133}
134
135int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137{
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143}
144
145void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147{
148 os_commit_latency.consume_next(
149 logger.get_tavg_ms(
150 l_filestore_journal_latency));
151 os_apply_latency.consume_next(
152 logger.get_tavg_ms(
153 l_filestore_apply_latency));
154}
155
156
157ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158{
159 return out << *s.parent;
160}
161
162int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163{
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166}
167
168int FileStore::get_index(const coll_t& cid, Index *index)
169{
170 int r = index_manager.get_index(cid, basedir, index);
171 assert(!m_filestore_fail_eio || r != -EIO);
172 return r;
173}
174
175int FileStore::init_index(const coll_t& cid)
176{
177 char path[PATH_MAX];
178 get_cdir(cid, path, sizeof(path));
179 int r = index_manager.init_index(cid, path, target_version);
180 assert(!m_filestore_fail_eio || r != -EIO);
181 return r;
182}
183
184int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185{
186 IndexedPath path2;
187 if (!path)
188 path = &path2;
189 int r, exist;
190 assert(NULL != index.index);
191 r = (index.index)->lookup(oid, path, &exist);
192 if (r < 0) {
193 assert(!m_filestore_fail_eio || r != -EIO);
194 return r;
195 }
196 if (!exist)
197 return -ENOENT;
198 return 0;
199}
200
201int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202{
203 FDRef fd;
204 int r = lfn_open(cid, oid, false, &fd);
205 if (r < 0)
206 return r;
207 r = ::ftruncate(**fd, length);
208 if (r < 0)
209 r = -errno;
210 if (r >= 0 && m_filestore_sloppy_crc) {
211 int rc = backend->_crc_update_truncate(**fd, length);
212 assert(rc >= 0);
213 }
214 lfn_close(fd);
215 assert(!m_filestore_fail_eio || r != -EIO);
216 return r;
217}
218
219int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220{
221 IndexedPath path;
222 Index index;
223 int r = get_index(cid, &index);
224 if (r < 0)
225 return r;
226
227 assert(NULL != index.index);
228 RWLock::RLocker l((index.index)->access_lock);
229
230 r = lfn_find(oid, index, &path);
231 if (r < 0)
232 return r;
233 r = ::stat(path->path(), buf);
234 if (r < 0)
235 r = -errno;
236 return r;
237}
238
239int FileStore::lfn_open(const coll_t& cid,
240 const ghobject_t& oid,
241 bool create,
242 FDRef *outfd,
243 Index *index)
244{
245 assert(outfd);
246 int r = 0;
247 bool need_lock = true;
248 int flags = O_RDWR;
249
250 if (create)
251 flags |= O_CREAT;
252 if (cct->_conf->filestore_odsync_write) {
253 flags |= O_DSYNC;
254 }
255
256 Index index2;
257 if (!index) {
258 index = &index2;
259 }
260 if (!((*index).index)) {
261 r = get_index(cid, index);
262 if (r < 0) {
31f18b77 263 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
264 return r;
265 }
266 } else {
267 need_lock = false;
268 }
269
270 int fd, exist;
271 assert(NULL != (*index).index);
272 if (need_lock) {
273 ((*index).index)->access_lock.get_write();
274 }
275 if (!replaying) {
276 *outfd = fdcache.lookup(oid);
277 if (*outfd) {
278 if (need_lock) {
279 ((*index).index)->access_lock.put_write();
280 }
281 return 0;
282 }
283 }
284
285
286 IndexedPath path2;
287 IndexedPath *path = &path2;
288
289 r = (*index)->lookup(oid, path, &exist);
290 if (r < 0) {
291 derr << "could not find " << oid << " in index: "
292 << cpp_strerror(-r) << dendl;
293 goto fail;
294 }
295
296 r = ::open((*path)->path(), flags, 0644);
297 if (r < 0) {
298 r = -errno;
299 dout(10) << "error opening file " << (*path)->path() << " with flags="
300 << flags << ": " << cpp_strerror(-r) << dendl;
301 goto fail;
302 }
303 fd = r;
304 if (create && (!exist)) {
305 r = (*index)->created(oid, (*path)->path());
306 if (r < 0) {
307 VOID_TEMP_FAILURE_RETRY(::close(fd));
308 derr << "error creating " << oid << " (" << (*path)->path()
309 << ") in index: " << cpp_strerror(-r) << dendl;
310 goto fail;
311 }
312 r = chain_fsetxattr<true, true>(
313 fd, XATTR_SPILL_OUT_NAME,
314 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
315 if (r < 0) {
316 VOID_TEMP_FAILURE_RETRY(::close(fd));
317 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
318 << "):" << cpp_strerror(-r) << dendl;
319 goto fail;
320 }
321 }
322
323 if (!replaying) {
324 bool existed;
325 *outfd = fdcache.add(oid, fd, &existed);
326 if (existed) {
327 TEMP_FAILURE_RETRY(::close(fd));
328 }
329 } else {
330 *outfd = std::make_shared<FDCache::FD>(fd);
331 }
332
333 if (need_lock) {
334 ((*index).index)->access_lock.put_write();
335 }
336
337 return 0;
338
339 fail:
340
341 if (need_lock) {
342 ((*index).index)->access_lock.put_write();
343 }
344
345 assert(!m_filestore_fail_eio || r != -EIO);
346 return r;
347}
348
349void FileStore::lfn_close(FDRef fd)
350{
351}
352
353int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
354{
355 Index index_new, index_old;
356 IndexedPath path_new, path_old;
357 int exist;
358 int r;
359 bool index_same = false;
360 if (c < newcid) {
361 r = get_index(newcid, &index_new);
362 if (r < 0)
363 return r;
364 r = get_index(c, &index_old);
365 if (r < 0)
366 return r;
367 } else if (c == newcid) {
368 r = get_index(c, &index_old);
369 if (r < 0)
370 return r;
371 index_new = index_old;
372 index_same = true;
373 } else {
374 r = get_index(c, &index_old);
375 if (r < 0)
376 return r;
377 r = get_index(newcid, &index_new);
378 if (r < 0)
379 return r;
380 }
381
382 assert(NULL != index_old.index);
383 assert(NULL != index_new.index);
384
385 if (!index_same) {
386
387 RWLock::RLocker l1((index_old.index)->access_lock);
388
389 r = index_old->lookup(o, &path_old, &exist);
390 if (r < 0) {
391 assert(!m_filestore_fail_eio || r != -EIO);
392 return r;
393 }
394 if (!exist)
395 return -ENOENT;
396
397 RWLock::WLocker l2((index_new.index)->access_lock);
398
399 r = index_new->lookup(newoid, &path_new, &exist);
400 if (r < 0) {
401 assert(!m_filestore_fail_eio || r != -EIO);
402 return r;
403 }
404 if (exist)
405 return -EEXIST;
406
31f18b77
FG
407 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
408 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
409 r = ::link(path_old->path(), path_new->path());
410 if (r < 0)
411 return -errno;
412
413 r = index_new->created(newoid, path_new->path());
414 if (r < 0) {
415 assert(!m_filestore_fail_eio || r != -EIO);
416 return r;
417 }
418 } else {
419 RWLock::WLocker l1((index_old.index)->access_lock);
420
421 r = index_old->lookup(o, &path_old, &exist);
422 if (r < 0) {
423 assert(!m_filestore_fail_eio || r != -EIO);
424 return r;
425 }
426 if (!exist)
427 return -ENOENT;
428
429 r = index_new->lookup(newoid, &path_new, &exist);
430 if (r < 0) {
431 assert(!m_filestore_fail_eio || r != -EIO);
432 return r;
433 }
434 if (exist)
435 return -EEXIST;
436
31f18b77
FG
437 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
438 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
7c673cae
FG
439 r = ::link(path_old->path(), path_new->path());
440 if (r < 0)
441 return -errno;
442
443 // make sure old fd for unlinked/overwritten file is gone
444 fdcache.clear(newoid);
445
446 r = index_new->created(newoid, path_new->path());
447 if (r < 0) {
448 assert(!m_filestore_fail_eio || r != -EIO);
449 return r;
450 }
451 }
452 return 0;
453}
454
455int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456 const SequencerPosition &spos,
457 bool force_clear_omap)
458{
459 Index index;
460 int r = get_index(cid, &index);
461 if (r < 0) {
31f18b77 462 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
7c673cae
FG
463 return r;
464 }
465
466 assert(NULL != index.index);
467 RWLock::WLocker l((index.index)->access_lock);
468
469 {
470 IndexedPath path;
471 int hardlink;
472 r = index->lookup(o, &path, &hardlink);
473 if (r < 0) {
474 assert(!m_filestore_fail_eio || r != -EIO);
475 return r;
476 }
477
478 if (!force_clear_omap) {
479 if (hardlink == 0 || hardlink == 1) {
480 force_clear_omap = true;
481 }
482 }
483 if (force_clear_omap) {
31f18b77 484 dout(20) << __FUNC__ << ": clearing omap on " << o
7c673cae
FG
485 << " in cid " << cid << dendl;
486 r = object_map->clear(o, &spos);
487 if (r < 0 && r != -ENOENT) {
31f18b77 488 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
7c673cae
FG
489 assert(!m_filestore_fail_eio || r != -EIO);
490 return r;
491 }
492 if (cct->_conf->filestore_debug_inject_read_err) {
493 debug_obj_on_delete(o);
494 }
495 if (!m_disable_wbthrottle) {
496 wbthrottle.clear_object(o); // should be only non-cache ref
497 }
498 fdcache.clear(o);
499 } else {
500 /* Ensure that replay of this op doesn't result in the object_map
501 * going away.
502 */
503 if (!backend->can_checkpoint())
504 object_map->sync(&o, &spos);
505 }
506 if (hardlink == 0) {
507 if (!m_disable_wbthrottle) {
508 wbthrottle.clear_object(o); // should be only non-cache ref
509 }
510 return 0;
511 }
512 }
513 r = index->unlink(o);
514 if (r < 0) {
31f18b77 515 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
7c673cae
FG
516 return r;
517 }
518 return 0;
519}
520
521FileStore::FileStore(CephContext* cct, const std::string &base,
522 const std::string &jdev, osflagbits_t flags,
523 const char *name, bool do_update) :
524 JournalingObjectStore(cct, base),
525 internal_name(name),
526 basedir(base), journalpath(jdev),
527 generic_flags(flags),
528 blk_size(0),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
531 backend(NULL),
532 index_manager(cct, do_update),
533 lock("FileStore::lock"),
534 force_sync(false),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct, sync_entry_timeo_lock),
537 stop(false), sync_thread(this),
538 fdcache(cct),
539 wbthrottle(cct),
540 next_osr_id(0),
541 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542 !cct->_conf->filestore_wbthrottle_enable),
543 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548 op_wq(this, cct->_conf->filestore_op_thread_timeout,
549 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550 logger(NULL),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561 m_filestore_fadvise(cct->_conf->filestore_fadvise),
562 do_update(do_update),
563 m_journal_dio(cct->_conf->journal_dio),
564 m_journal_aio(cct->_conf->journal_aio),
565 m_journal_force_aio(cct->_conf->journal_force_aio),
566 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573 m_fs_type(0),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
577{
31f18b77 578 m_filestore_kill_at = cct->_conf->filestore_kill_at;
7c673cae
FG
579 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580 ostringstream oss;
581 oss << "filestore-ondisk-" << i;
582 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583 ondisk_finishers.push_back(f);
584 }
585 for (int i = 0; i < m_apply_finisher_num; ++i) {
586 ostringstream oss;
587 oss << "filestore-apply-" << i;
588 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589 apply_finishers.push_back(f);
590 }
591
592 ostringstream oss;
593 oss << basedir << "/current";
594 current_fn = oss.str();
595
596 ostringstream sss;
597 sss << basedir << "/current/commit_op_seq";
598 current_op_seq_fn = sss.str();
599
600 ostringstream omss;
601 if (cct->_conf->filestore_omap_backend_path != "") {
602 omap_dir = cct->_conf->filestore_omap_backend_path;
603 } else {
604 omss << basedir << "/current/omap";
605 omap_dir = omss.str();
606 }
607
608 // initialize logger
609 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
616 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
617 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
618 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
619 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
620 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
621 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
622 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
623 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
624 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
625 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
626
627 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
628 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
629 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
630 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
631 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
224ce89b 632 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
7c673cae
FG
633
634 logger = plb.create_perf_counters();
635
636 cct->get_perfcounters_collection()->add(logger);
637 cct->_conf->add_observer(this);
638
639 superblock.compat_features = get_fs_initial_compat_set();
640}
641
642FileStore::~FileStore()
643{
644 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
645 delete *it;
646 *it = NULL;
647 }
648 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
649 delete *it;
650 *it = NULL;
651 }
652 cct->_conf->remove_observer(this);
653 cct->get_perfcounters_collection()->remove(logger);
654
655 if (journal)
656 journal->logger = NULL;
657 delete logger;
658
659 if (m_filestore_do_dump) {
660 dump_stop();
661 }
662}
663
// Write the on-disk xattr name for attribute `name` ("user.ceph.<name>")
// into `buf`, which holds `len` bytes. The result is silently truncated by
// snprintf() if it does not fit.
static void get_attrname(const char *name, char *buf, int len)
{
  (void)snprintf(buf, len, "user.ceph.%s", name);
}
668
// If *name starts with the "user.ceph." prefix, advance *name past the
// prefix and return true. Otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // 10, excluding the NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
677
678void FileStore::collect_metadata(map<string,string> *pm)
679{
680 char partition_path[PATH_MAX];
681 char dev_node[PATH_MAX];
682 int rc = 0;
683
684 (*pm)["filestore_backend"] = backend->get_name();
685 ostringstream ss;
686 ss << "0x" << std::hex << m_fs_type << std::dec;
687 (*pm)["filestore_f_type"] = ss.str();
688
689 if (cct->_conf->filestore_collect_device_partition_information) {
690 rc = get_device_by_uuid(get_fsid(), "PARTUUID", partition_path,
691 dev_node);
692 } else {
693 rc = -EINVAL;
694 }
695
696 switch (rc) {
697 case -EOPNOTSUPP:
698 case -EINVAL:
699 (*pm)["backend_filestore_partition_path"] = "unknown";
700 (*pm)["backend_filestore_dev_node"] = "unknown";
701 break;
702 case -ENODEV:
703 (*pm)["backend_filestore_partition_path"] = string(partition_path);
704 (*pm)["backend_filestore_dev_node"] = "unknown";
705 break;
706 default:
707 (*pm)["backend_filestore_partition_path"] = string(partition_path);
708 (*pm)["backend_filestore_dev_node"] = string(dev_node);
709 }
710}
711
712int FileStore::statfs(struct store_statfs_t *buf0)
713{
714 struct statfs buf;
715 buf0->reset();
716 if (::statfs(basedir.c_str(), &buf) < 0) {
717 int r = -errno;
718 assert(!m_filestore_fail_eio || r != -EIO);
719 assert(r != -ENOENT);
720 return r;
721 }
722 buf0->total = buf.f_blocks * buf.f_bsize;
723 buf0->available = buf.f_bavail * buf.f_bsize;
724 // Adjust for writes pending in the journal
725 if (journal) {
726 uint64_t estimate = journal->get_journal_size_estimate();
727 if (buf0->available > estimate)
728 buf0->available -= estimate;
729 else
730 buf0->available = 0;
731 }
732 return 0;
733}
734
735
736void FileStore::new_journal()
737{
738 if (journalpath.length()) {
739 dout(10) << "open_journal at " << journalpath << dendl;
740 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
741 journalpath.c_str(),
742 m_journal_dio, m_journal_aio,
743 m_journal_force_aio);
744 if (journal)
745 journal->logger = logger;
746 }
747 return;
748}
749
750int FileStore::dump_journal(ostream& out)
751{
752 int r;
753
754 if (!journalpath.length())
755 return -EINVAL;
756
757 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
758 r = journal->dump(out);
759 delete journal;
760 return r;
761}
762
763FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
764{
765 switch (f_type) {
766#if defined(__linux__)
767 case BTRFS_SUPER_MAGIC:
768 return new BtrfsFileStoreBackend(fs);
769# ifdef HAVE_LIBXFS
770 case XFS_SUPER_MAGIC:
771 return new XfsFileStoreBackend(fs);
772# endif
773#endif
774#ifdef HAVE_LIBZFS
775 case ZFS_SUPER_MAGIC:
776 return new ZFSFileStoreBackend(fs);
777#endif
778 default:
779 return new GenericFileStoreBackend(fs);
780 }
781}
782
783void FileStore::create_backend(long f_type)
784{
785 m_fs_type = f_type;
786
787 assert(backend == NULL);
788 backend = FileStoreBackend::create(f_type, this);
789
790 dout(0) << "backend " << backend->get_name()
791 << " (magic 0x" << std::hex << f_type << std::dec << ")"
792 << dendl;
793
794 switch (f_type) {
795#if defined(__linux__)
796 case BTRFS_SUPER_MAGIC:
797 if (!m_disable_wbthrottle){
798 wbthrottle.set_fs(WBThrottle::BTRFS);
799 }
800 break;
801
802 case XFS_SUPER_MAGIC:
803 // wbthrottle is constructed with fs(WBThrottle::XFS)
804 break;
805#endif
806 }
807
808 set_xattr_limits_via_conf();
809}
810
811int FileStore::mkfs()
812{
813 int ret = 0;
814 char fsid_fn[PATH_MAX];
815 char fsid_str[40];
816 uuid_d old_fsid;
817 uuid_d old_omap_fsid;
818
819 dout(1) << "mkfs in " << basedir << dendl;
820 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
821 if (basedir_fd < 0) {
822 ret = -errno;
224ce89b 823 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
824 return ret;
825 }
826
827 // open+lock fsid
828 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
829 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
830 if (fsid_fd < 0) {
831 ret = -errno;
224ce89b 832 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
833 goto close_basedir_fd;
834 }
835
836 if (lock_fsid() < 0) {
837 ret = -EBUSY;
838 goto close_fsid_fd;
839 }
840
841 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
842 if (fsid.is_zero()) {
843 fsid.generate_random();
224ce89b 844 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
7c673cae 845 } else {
224ce89b 846 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
7c673cae
FG
847 }
848
849 fsid.print(fsid_str);
850 strcat(fsid_str, "\n");
851 ret = ::ftruncate(fsid_fd, 0);
852 if (ret < 0) {
853 ret = -errno;
31f18b77 854 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
855 << cpp_strerror(ret) << dendl;
856 goto close_fsid_fd;
857 }
858 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
859 if (ret < 0) {
31f18b77 860 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
861 << cpp_strerror(ret) << dendl;
862 goto close_fsid_fd;
863 }
864 if (::fsync(fsid_fd) < 0) {
865 ret = -errno;
31f18b77 866 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
867 << cpp_strerror(ret) << dendl;
868 goto close_fsid_fd;
869 }
224ce89b 870 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
7c673cae
FG
871 } else {
872 if (!fsid.is_zero() && fsid != old_fsid) {
31f18b77 873 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
7c673cae
FG
874 ret = -EINVAL;
875 goto close_fsid_fd;
876 }
877 fsid = old_fsid;
31f18b77 878 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
7c673cae
FG
879 }
880
881 // version stamp
882 ret = write_version_stamp();
883 if (ret < 0) {
31f18b77 884 derr << __FUNC__ << ": write_version_stamp() failed: "
7c673cae
FG
885 << cpp_strerror(ret) << dendl;
886 goto close_fsid_fd;
887 }
888
889 // superblock
890 superblock.omap_backend = cct->_conf->filestore_omap_backend;
891 ret = write_superblock();
892 if (ret < 0) {
31f18b77 893 derr << __FUNC__ << ": write_superblock() failed: "
7c673cae
FG
894 << cpp_strerror(ret) << dendl;
895 goto close_fsid_fd;
896 }
897
898 struct statfs basefs;
899 ret = ::fstatfs(basedir_fd, &basefs);
900 if (ret < 0) {
901 ret = -errno;
31f18b77 902 derr << __FUNC__ << ": cannot fstatfs basedir "
7c673cae
FG
903 << cpp_strerror(ret) << dendl;
904 goto close_fsid_fd;
905 }
906
224ce89b
WB
907#if defined(__linux__)
908 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
909 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
910 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
911 goto close_fsid_fd;
912 }
913#endif
914
7c673cae
FG
915 create_backend(basefs.f_type);
916
917 ret = backend->create_current();
918 if (ret < 0) {
31f18b77 919 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
7c673cae
FG
920 goto close_fsid_fd;
921 }
922
923 // write initial op_seq
924 {
925 uint64_t initial_seq = 0;
926 int fd = read_op_seq(&initial_seq);
927 if (fd < 0) {
928 ret = fd;
31f18b77 929 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
7c673cae
FG
930 << cpp_strerror(ret) << dendl;
931 goto close_fsid_fd;
932 }
933 if (initial_seq == 0) {
934 ret = write_op_seq(fd, 1);
935 if (ret < 0) {
936 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 937 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
7c673cae
FG
938 << cpp_strerror(ret) << dendl;
939 goto close_fsid_fd;
940 }
941
942 if (backend->can_checkpoint()) {
943 // create snap_1 too
944 current_fd = ::open(current_fn.c_str(), O_RDONLY);
945 assert(current_fd >= 0);
946 char s[NAME_MAX];
947 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
948 ret = backend->create_checkpoint(s, NULL);
949 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
950 if (ret < 0 && ret != -EEXIST) {
951 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 952 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
7c673cae
FG
953 goto close_fsid_fd;
954 }
955 }
956 }
957 VOID_TEMP_FAILURE_RETRY(::close(fd));
958 }
959 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
960 if (ret < 0) {
31f18b77 961 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
7c673cae
FG
962 goto close_fsid_fd;
963 }
964 // create fsid under omap
965 // open+lock fsid
966 int omap_fsid_fd;
967 char omap_fsid_fn[PATH_MAX];
968 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
969 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
970 if (omap_fsid_fd < 0) {
971 ret = -errno;
31f18b77 972 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
973 goto close_fsid_fd;
974 }
975
976 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
977 assert(!fsid.is_zero());
978 fsid.print(fsid_str);
979 strcat(fsid_str, "\n");
980 ret = ::ftruncate(omap_fsid_fd, 0);
981 if (ret < 0) {
982 ret = -errno;
31f18b77 983 derr << __FUNC__ << ": failed to truncate fsid: "
7c673cae
FG
984 << cpp_strerror(ret) << dendl;
985 goto close_omap_fsid_fd;
986 }
987 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
988 if (ret < 0) {
31f18b77 989 derr << __FUNC__ << ": failed to write fsid: "
7c673cae
FG
990 << cpp_strerror(ret) << dendl;
991 goto close_omap_fsid_fd;
992 }
31f18b77 993 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
7c673cae
FG
994 if (::fsync(omap_fsid_fd) < 0) {
995 ret = -errno;
31f18b77 996 derr << __FUNC__ << ": close failed: can't write fsid: "
7c673cae
FG
997 << cpp_strerror(ret) << dendl;
998 goto close_omap_fsid_fd;
999 }
1000 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1001 } else {
1002 if (fsid != old_omap_fsid) {
31f18b77 1003 derr << __FUNC__ << ": " << omap_fsid_fn
7c673cae
FG
1004 << " has existed omap fsid " << old_omap_fsid
1005 << " != expected osd fsid " << fsid
1006 << dendl;
1007 ret = -EINVAL;
1008 goto close_omap_fsid_fd;
1009 }
31f18b77 1010 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
7c673cae
FG
1011 }
1012
1013 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1014
1015 // journal?
1016 ret = mkjournal();
1017 if (ret)
1018 goto close_omap_fsid_fd;
1019
1020 ret = write_meta("type", "filestore");
1021 if (ret)
1022 goto close_omap_fsid_fd;
1023
1024 dout(1) << "mkfs done in " << basedir << dendl;
1025 ret = 0;
1026
1027 close_omap_fsid_fd:
1028 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1029 close_fsid_fd:
1030 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1031 fsid_fd = -1;
1032 close_basedir_fd:
1033 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1034 delete backend;
1035 backend = NULL;
1036 return ret;
1037}
1038
1039int FileStore::mkjournal()
1040{
1041 // read fsid
1042 int ret;
1043 char fn[PATH_MAX];
1044 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1045 int fd = ::open(fn, O_RDONLY, 0644);
1046 if (fd < 0) {
1047 int err = errno;
31f18b77 1048 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
7c673cae
FG
1049 return -err;
1050 }
1051 ret = read_fsid(fd, &fsid);
1052 if (ret < 0) {
31f18b77 1053 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1054 VOID_TEMP_FAILURE_RETRY(::close(fd));
1055 return ret;
1056 }
1057 VOID_TEMP_FAILURE_RETRY(::close(fd));
1058
1059 ret = 0;
1060
1061 new_journal();
1062 if (journal) {
1063 ret = journal->check();
1064 if (ret < 0) {
1065 ret = journal->create();
1066 if (ret)
31f18b77 1067 derr << __FUNC__ << ": error creating journal on " << journalpath
7c673cae
FG
1068 << ": " << cpp_strerror(ret) << dendl;
1069 else
31f18b77 1070 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
7c673cae
FG
1071 }
1072 delete journal;
1073 journal = 0;
1074 }
1075 return ret;
1076}
1077
1078int FileStore::read_fsid(int fd, uuid_d *uuid)
1079{
1080 char fsid_str[40];
1081 memset(fsid_str, 0, sizeof(fsid_str));
1082 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1083 if (ret < 0)
1084 return ret;
1085 if (ret == 8) {
1086 // old 64-bit fsid... mirror it.
1087 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1088 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1089 return 0;
1090 }
1091
1092 if (ret > 36)
1093 fsid_str[36] = 0;
1094 else
1095 fsid_str[ret] = 0;
1096 if (!uuid->parse(fsid_str))
1097 return -EINVAL;
1098 return 0;
1099}
1100
1101int FileStore::lock_fsid()
1102{
1103 struct flock l;
1104 memset(&l, 0, sizeof(l));
1105 l.l_type = F_WRLCK;
1106 l.l_whence = SEEK_SET;
1107 l.l_start = 0;
1108 l.l_len = 0;
1109 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1110 if (r < 0) {
1111 int err = errno;
31f18b77 1112 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
7c673cae
FG
1113 << cpp_strerror(err) << dendl;
1114 return -err;
1115 }
1116 return 0;
1117}
1118
1119bool FileStore::test_mount_in_use()
1120{
31f18b77 1121 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
7c673cae
FG
1122 char fn[PATH_MAX];
1123 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1124
1125 // verify fs isn't in use
1126
1127 fsid_fd = ::open(fn, O_RDWR, 0644);
1128 if (fsid_fd < 0)
1129 return 0; // no fsid, ok.
1130 bool inuse = lock_fsid() < 0;
1131 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1132 fsid_fd = -1;
1133 return inuse;
1134}
1135
31f18b77
FG
1136bool FileStore::is_rotational()
1137{
1138 bool rotational;
1139 if (backend) {
1140 rotational = backend->is_rotational();
1141 } else {
1142 int fd = ::open(basedir.c_str(), O_RDONLY);
1143 if (fd < 0)
1144 return true;
1145 struct statfs st;
1146 int r = ::fstatfs(fd, &st);
1147 ::close(fd);
1148 if (r < 0) {
1149 return true;
1150 }
1151 create_backend(st.f_type);
1152 rotational = backend->is_rotational();
1153 delete backend;
1154 backend = NULL;
1155 }
1156 dout(10) << __func__ << " " << (int)rotational << dendl;
1157 return rotational;
1158}
1159
7c673cae
FG
1160int FileStore::_detect_fs()
1161{
1162 struct statfs st;
1163 int r = ::fstatfs(basedir_fd, &st);
1164 if (r < 0)
1165 return -errno;
1166
1167 blk_size = st.f_bsize;
1168
224ce89b
WB
1169#if defined(__linux__)
1170 if (st.f_type == BTRFS_SUPER_MAGIC &&
1171 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1172 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1173 return -EPERM;
1174 }
1175#endif
1176
7c673cae
FG
1177 create_backend(st.f_type);
1178
1179 r = backend->detect_features();
1180 if (r < 0) {
31f18b77 1181 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
7c673cae
FG
1182 return r;
1183 }
1184
1185 // test xattrs
1186 char fn[PATH_MAX];
1187 int x = rand();
1188 int y = x+1;
1189 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1190 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1191 if (tmpfd < 0) {
1192 int ret = -errno;
31f18b77 1193 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1194 return ret;
1195 }
1196
1197 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1198 if (ret >= 0)
1199 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1200 if ((ret < 0) || (x != y)) {
1201 derr << "Extended attributes don't appear to work. ";
1202 if (ret)
1203 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1204 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1205 << "file system with the 'user_xattr' option." << dendl;
1206 ::unlink(fn);
1207 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1208 return -ENOTSUP;
1209 }
1210
1211 char buf[1000];
1212 memset(buf, 0, sizeof(buf)); // shut up valgrind
1213 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1214 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1215 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1216 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1217 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1218 if (ret == -ENOSPC) {
1219 dout(0) << "limited size xattrs" << dendl;
1220 }
1221 chain_fremovexattr(tmpfd, "user.test");
1222 chain_fremovexattr(tmpfd, "user.test2");
1223 chain_fremovexattr(tmpfd, "user.test3");
1224 chain_fremovexattr(tmpfd, "user.test4");
1225 chain_fremovexattr(tmpfd, "user.test5");
1226
1227 ::unlink(fn);
1228 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1229
1230 return 0;
1231}
1232
1233int FileStore::_sanity_check_fs()
1234{
1235 // sanity check(s)
1236
1237 if (((int)m_filestore_journal_writeahead +
1238 (int)m_filestore_journal_parallel +
1239 (int)m_filestore_journal_trailing) > 1) {
1240 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1241 cerr << TEXT_RED
1242 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1243 << " is enabled in ceph.conf. You must choose a single journal mode."
1244 << TEXT_NORMAL << std::endl;
1245 return -EINVAL;
1246 }
1247
1248 if (!backend->can_checkpoint()) {
1249 if (!journal || !m_filestore_journal_writeahead) {
1250 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1251 cerr << TEXT_RED
1252 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1253 << " For non-btrfs volumes, a writeahead journal is required to\n"
1254 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1255 << " should include something like:\n"
1256 << " osd journal = /path/to/journal_device_or_file\n"
1257 << " filestore journal writeahead = true\n"
1258 << TEXT_NORMAL;
1259 }
1260 }
1261
1262 if (!journal) {
1263 dout(0) << "mount WARNING: no journal" << dendl;
1264 cerr << TEXT_YELLOW
1265 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1266 << " If you will not be using an osd journal, write latency may be\n"
1267 << " relatively high. It can be reduced somewhat by lowering\n"
1268 << " filestore_max_sync_interval, but lower values mean lower write\n"
1269 << " throughput, especially with spinning disks.\n"
1270 << TEXT_NORMAL;
1271 }
1272
1273 return 0;
1274}
1275
1276int FileStore::write_superblock()
1277{
1278 bufferlist bl;
1279 ::encode(superblock, bl);
1280 return safe_write_file(basedir.c_str(), "superblock",
1281 bl.c_str(), bl.length());
1282}
1283
1284int FileStore::read_superblock()
1285{
1286 bufferptr bp(PATH_MAX);
1287 int ret = safe_read_file(basedir.c_str(), "superblock",
1288 bp.c_str(), bp.length());
1289 if (ret < 0) {
1290 if (ret == -ENOENT) {
1291 // If the file doesn't exist write initial CompatSet
1292 return write_superblock();
1293 }
1294 return ret;
1295 }
1296
1297 bufferlist bl;
1298 bl.push_back(std::move(bp));
1299 bufferlist::iterator i = bl.begin();
1300 ::decode(superblock, i);
1301 return 0;
1302}
1303
1304int FileStore::update_version_stamp()
1305{
1306 return write_version_stamp();
1307}
1308
1309int FileStore::version_stamp_is_valid(uint32_t *version)
1310{
1311 bufferptr bp(PATH_MAX);
1312 int ret = safe_read_file(basedir.c_str(), "store_version",
1313 bp.c_str(), bp.length());
1314 if (ret < 0) {
1315 return ret;
1316 }
1317 bufferlist bl;
1318 bl.push_back(std::move(bp));
1319 bufferlist::iterator i = bl.begin();
1320 ::decode(*version, i);
31f18b77 1321 dout(10) << __FUNC__ << ": was " << *version << " vs target "
7c673cae
FG
1322 << target_version << dendl;
1323 if (*version == target_version)
1324 return 1;
1325 else
1326 return 0;
1327}
1328
1329int FileStore::write_version_stamp()
1330{
31f18b77 1331 dout(1) << __FUNC__ << ": " << target_version << dendl;
7c673cae
FG
1332 bufferlist bl;
1333 ::encode(target_version, bl);
1334
1335 return safe_write_file(basedir.c_str(), "store_version",
1336 bl.c_str(), bl.length());
1337}
1338
1339int FileStore::upgrade()
1340{
31f18b77 1341 dout(1) << __FUNC__ << dendl;
7c673cae
FG
1342 uint32_t version;
1343 int r = version_stamp_is_valid(&version);
1344
1345 if (r == -ENOENT) {
1346 derr << "The store_version file doesn't exist." << dendl;
1347 return -EINVAL;
1348 }
1349 if (r < 0)
1350 return r;
1351 if (r == 1)
1352 return 0;
1353
1354 if (version < 3) {
1355 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1356 return -EINVAL;
1357 }
1358
1359 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1360 // open up DBObjectMap with the do_upgrade flag, which we already did.
1361 update_version_stamp();
1362 return 0;
1363}
1364
1365int FileStore::read_op_seq(uint64_t *seq)
1366{
1367 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1368 if (op_fd < 0) {
1369 int r = -errno;
1370 assert(!m_filestore_fail_eio || r != -EIO);
1371 return r;
1372 }
1373 char s[40];
1374 memset(s, 0, sizeof(s));
1375 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1376 if (ret < 0) {
31f18b77 1377 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1378 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1379 assert(!m_filestore_fail_eio || ret != -EIO);
1380 return ret;
1381 }
1382 *seq = atoll(s);
1383 return op_fd;
1384}
1385
1386int FileStore::write_op_seq(int fd, uint64_t seq)
1387{
1388 char s[30];
1389 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1390 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1391 if (ret < 0) {
1392 ret = -errno;
1393 assert(!m_filestore_fail_eio || ret != -EIO);
1394 }
1395 return ret;
1396}
1397
1398int FileStore::mount()
1399{
1400 int ret;
1401 char buf[PATH_MAX];
1402 uint64_t initial_op_seq;
1403 uuid_d omap_fsid;
1404 set<string> cluster_snaps;
1405 CompatSet supported_compat_set = get_fs_supported_compat_set();
1406
1407 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1408
1409 ret = set_throttle_params();
1410 if (ret != 0)
1411 goto done;
1412
1413 // make sure global base dir exists
1414 if (::access(basedir.c_str(), R_OK | W_OK)) {
1415 ret = -errno;
31f18b77 1416 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
7c673cae
FG
1417 << cpp_strerror(ret) << dendl;
1418 goto done;
1419 }
1420
1421 // get fsid
1422 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1423 fsid_fd = ::open(buf, O_RDWR, 0644);
1424 if (fsid_fd < 0) {
1425 ret = -errno;
31f18b77 1426 derr << __FUNC__ << ": error opening '" << buf << "': "
7c673cae
FG
1427 << cpp_strerror(ret) << dendl;
1428 goto done;
1429 }
1430
1431 ret = read_fsid(fsid_fd, &fsid);
1432 if (ret < 0) {
31f18b77 1433 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
7c673cae
FG
1434 << dendl;
1435 goto close_fsid_fd;
1436 }
1437
1438 if (lock_fsid() < 0) {
31f18b77 1439 derr << __FUNC__ << ": lock_fsid failed" << dendl;
7c673cae
FG
1440 ret = -EBUSY;
1441 goto close_fsid_fd;
1442 }
1443
1444 dout(10) << "mount fsid is " << fsid << dendl;
1445
1446
1447 uint32_t version_stamp;
1448 ret = version_stamp_is_valid(&version_stamp);
1449 if (ret < 0) {
31f18b77 1450 derr << __FUNC__ << ": error in version_stamp_is_valid: "
7c673cae
FG
1451 << cpp_strerror(ret) << dendl;
1452 goto close_fsid_fd;
1453 } else if (ret == 0) {
1454 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
31f18b77 1455 derr << __FUNC__ << ": stale version stamp detected: "
7c673cae
FG
1456 << version_stamp
1457 << ". Proceeding, do_update "
1458 << "is set, performing disk format upgrade."
1459 << dendl;
1460 do_update = true;
1461 } else {
1462 ret = -EINVAL;
31f18b77 1463 derr << __FUNC__ << ": stale version stamp " << version_stamp
7c673cae
FG
1464 << ". Please run the FileStore update script before starting the "
1465 << "OSD, or set filestore_update_to to " << target_version
1466 << " (currently " << cct->_conf->filestore_update_to << ")"
1467 << dendl;
1468 goto close_fsid_fd;
1469 }
1470 }
1471
1472 ret = read_superblock();
1473 if (ret < 0) {
1474 goto close_fsid_fd;
1475 }
1476
1477 // Check if this FileStore supports all the necessary features to mount
1478 if (supported_compat_set.compare(superblock.compat_features) == -1) {
31f18b77 1479 derr << __FUNC__ << ": Incompatible features set "
7c673cae
FG
1480 << superblock.compat_features << dendl;
1481 ret = -EINVAL;
1482 goto close_fsid_fd;
1483 }
1484
1485 // open some dir handles
1486 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1487 if (basedir_fd < 0) {
1488 ret = -errno;
31f18b77 1489 derr << __FUNC__ << ": failed to open " << basedir << ": "
7c673cae
FG
1490 << cpp_strerror(ret) << dendl;
1491 basedir_fd = -1;
1492 goto close_fsid_fd;
1493 }
1494
1495 // test for btrfs, xattrs, etc.
1496 ret = _detect_fs();
1497 if (ret < 0) {
31f18b77 1498 derr << __FUNC__ << ": error in _detect_fs: "
7c673cae
FG
1499 << cpp_strerror(ret) << dendl;
1500 goto close_basedir_fd;
1501 }
1502
1503 {
1504 list<string> ls;
1505 ret = backend->list_checkpoints(ls);
1506 if (ret < 0) {
31f18b77 1507 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
7c673cae
FG
1508 goto close_basedir_fd;
1509 }
1510
1511 long long unsigned c, prev = 0;
1512 char clustersnap[NAME_MAX];
1513 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1514 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1515 assert(c > prev);
1516 prev = c;
1517 snaps.push_back(c);
1518 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1519 cluster_snaps.insert(*it);
1520 }
1521 }
1522
1523 if (m_osd_rollback_to_cluster_snap.length() &&
1524 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1525 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1526 ret = -ENOENT;
1527 goto close_basedir_fd;
1528 }
1529
1530 char nosnapfn[200];
1531 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1532
1533 if (backend->can_checkpoint()) {
1534 if (snaps.empty()) {
31f18b77 1535 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
7c673cae
FG
1536 } else {
1537 char s[NAME_MAX];
1538 uint64_t curr_seq = 0;
1539
1540 if (m_osd_rollback_to_cluster_snap.length()) {
1541 derr << TEXT_RED
1542 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1543 << TEXT_NORMAL
1544 << dendl;
1545 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1546 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1547 } else {
1548 {
1549 int fd = read_op_seq(&curr_seq);
1550 if (fd >= 0) {
1551 VOID_TEMP_FAILURE_RETRY(::close(fd));
1552 }
1553 }
1554 if (curr_seq)
1555 dout(10) << " current/ seq was " << curr_seq << dendl;
1556 else
1557 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1558
1559 uint64_t cp = snaps.back();
1560 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1561
1562 // if current/ is marked as non-snapshotted, refuse to roll
1563 // back (without clear direction) to avoid throwing out new
1564 // data.
1565 struct stat st;
1566 if (::stat(nosnapfn, &st) == 0) {
1567 if (!m_osd_use_stale_snap) {
1568 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1569 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1570 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1571 ret = -ENOTSUP;
1572 goto close_basedir_fd;
1573 }
1574 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1575 << ", newest snap is " << cp << dendl;
1576 cerr << TEXT_YELLOW
1577 << " ** WARNING: forcing the use of stale snapshot data **"
1578 << TEXT_NORMAL << std::endl;
1579 }
1580
31f18b77 1581 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
7c673cae
FG
1582 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1583 }
1584
1585 // drop current?
1586 ret = backend->rollback_to(s);
1587 if (ret) {
31f18b77 1588 derr << __FUNC__ << ": error rolling back to " << s << ": "
7c673cae
FG
1589 << cpp_strerror(ret) << dendl;
1590 goto close_basedir_fd;
1591 }
1592 }
1593 }
1594 initial_op_seq = 0;
1595
1596 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1597 if (current_fd < 0) {
1598 ret = -errno;
31f18b77 1599 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1600 goto close_basedir_fd;
1601 }
1602
1603 assert(current_fd >= 0);
1604
1605 op_fd = read_op_seq(&initial_op_seq);
1606 if (op_fd < 0) {
1607 ret = op_fd;
31f18b77 1608 derr << __FUNC__ << ": read_op_seq failed" << dendl;
7c673cae
FG
1609 goto close_current_fd;
1610 }
1611
1612 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1613 if (initial_op_seq == 0) {
1614 derr << "mount initial op seq is 0; something is wrong" << dendl;
1615 ret = -EINVAL;
1616 goto close_current_fd;
1617 }
1618
1619 if (!backend->can_checkpoint()) {
1620 // mark current/ as non-snapshotted so that we don't rollback away
1621 // from it.
1622 int r = ::creat(nosnapfn, 0644);
1623 if (r < 0) {
1624 ret = -errno;
31f18b77 1625 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
7c673cae
FG
1626 goto close_current_fd;
1627 }
1628 VOID_TEMP_FAILURE_RETRY(::close(r));
1629 } else {
1630 // clear nosnap marker, if present.
1631 ::unlink(nosnapfn);
1632 }
1633
1634 // check fsid with omap
1635 // get omap fsid
1636 int omap_fsid_fd;
1637 char omap_fsid_buf[PATH_MAX];
1638 struct ::stat omap_fsid_stat;
1639 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1640 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1641 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
31f18b77 1642 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
7c673cae
FG
1643 << "assume as matched."
1644 << dendl;
1645 }else{
1646 // if osd_uuid exists, compares osd_uuid with fsid
1647 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1648 if (omap_fsid_fd < 0) {
1649 ret = -errno;
31f18b77 1650 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
7c673cae
FG
1651 << cpp_strerror(ret)
1652 << dendl;
1653 goto close_current_fd;
1654 }
1655 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1656 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1657 omap_fsid_fd = -1; // defensive
1658 if (ret < 0) {
31f18b77 1659 derr << __FUNC__ << ": error reading omap_fsid_fd"
7c673cae
FG
1660 << ", omap_fsid = " << omap_fsid
1661 << cpp_strerror(ret)
1662 << dendl;
1663 goto close_current_fd;
1664 }
1665 if (fsid != omap_fsid) {
31f18b77 1666 derr << __FUNC__ << ": " << omap_fsid_buf
7c673cae
FG
1667 << " has existed omap fsid " << omap_fsid
1668 << " != expected osd fsid " << fsid
1669 << dendl;
1670 ret = -EINVAL;
1671 goto close_current_fd;
1672 }
1673 }
1674
1675 dout(0) << "start omap initiation" << dendl;
1676 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1677 KeyValueDB * omap_store = KeyValueDB::create(cct,
1678 superblock.omap_backend,
1679 omap_dir);
1680 if (omap_store == NULL)
1681 {
31f18b77 1682 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
7c673cae
FG
1683 ret = -1;
1684 goto close_current_fd;
1685 }
1686
1687 if (superblock.omap_backend == "rocksdb")
1688 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1689 else
1690 ret = omap_store->init();
1691
1692 if (ret < 0) {
31f18b77 1693 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
7c673cae
FG
1694 goto close_current_fd;
1695 }
1696
1697 stringstream err;
1698 if (omap_store->create_and_open(err)) {
1699 delete omap_store;
31f18b77 1700 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
7c673cae
FG
1701 << " : " << err.str() << dendl;
1702 ret = -1;
1703 goto close_current_fd;
1704 }
1705
1706 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1707 ret = dbomap->init(do_update);
1708 if (ret < 0) {
1709 delete dbomap;
31f18b77 1710 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
7c673cae
FG
1711 goto close_current_fd;
1712 }
1713 stringstream err2;
1714
1715 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1716 derr << err2.str() << dendl;
1717 delete dbomap;
1718 ret = -EINVAL;
1719 goto close_current_fd;
1720 }
1721 object_map.reset(dbomap);
1722 }
1723
1724 // journal
1725 new_journal();
1726
1727 // select journal mode?
1728 if (journal) {
1729 if (!m_filestore_journal_writeahead &&
1730 !m_filestore_journal_parallel &&
1731 !m_filestore_journal_trailing) {
1732 if (!backend->can_checkpoint()) {
1733 m_filestore_journal_writeahead = true;
31f18b77 1734 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
7c673cae
FG
1735 } else {
1736 m_filestore_journal_parallel = true;
31f18b77 1737 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
7c673cae
FG
1738 }
1739 } else {
1740 if (m_filestore_journal_writeahead)
31f18b77 1741 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
7c673cae 1742 if (m_filestore_journal_parallel)
31f18b77 1743 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
7c673cae 1744 if (m_filestore_journal_trailing)
31f18b77 1745 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
7c673cae
FG
1746 }
1747 if (m_filestore_journal_writeahead)
1748 journal->set_wait_on_full(true);
1749 } else {
31f18b77 1750 dout(0) << __FUNC__ << ": no journal" << dendl;
7c673cae
FG
1751 }
1752
1753 ret = _sanity_check_fs();
1754 if (ret) {
31f18b77 1755 derr << __FUNC__ << ": _sanity_check_fs failed with error "
7c673cae
FG
1756 << ret << dendl;
1757 goto close_current_fd;
1758 }
1759
1760 // Cleanup possibly invalid collections
1761 {
1762 vector<coll_t> collections;
1763 ret = list_collections(collections, true);
1764 if (ret < 0) {
1765 derr << "Error " << ret << " while listing collections" << dendl;
1766 goto close_current_fd;
1767 }
1768 for (vector<coll_t>::iterator i = collections.begin();
1769 i != collections.end();
1770 ++i) {
1771 Index index;
1772 ret = get_index(*i, &index);
1773 if (ret < 0) {
1774 derr << "Unable to mount index " << *i
1775 << " with error: " << ret << dendl;
1776 goto close_current_fd;
1777 }
1778 assert(NULL != index.index);
1779 RWLock::WLocker l((index.index)->access_lock);
1780
1781 index->cleanup();
1782 }
1783 }
1784 if (!m_disable_wbthrottle) {
1785 wbthrottle.start();
1786 } else {
31f18b77 1787 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
7c673cae 1788 if (cct->_conf->filestore_odsync_write) {
31f18b77 1789 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
7c673cae
FG
1790 }
1791 }
1792 sync_thread.create("filestore_sync");
1793
1794 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1795 ret = journal_replay(initial_op_seq);
1796 if (ret < 0) {
31f18b77 1797 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
7c673cae
FG
1798 if (ret == -ENOTTY) {
1799 derr << "maybe journal is not pointing to a block device and its size "
1800 << "wasn't configured?" << dendl;
1801 }
1802
1803 goto stop_sync;
1804 }
1805 }
1806
1807 {
1808 stringstream err2;
1809 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1810 derr << err2.str() << dendl;
1811 ret = -EINVAL;
1812 goto stop_sync;
1813 }
1814 }
1815
1816 init_temp_collections();
1817
1818 journal_start();
1819
1820 op_tp.start();
1821 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1822 (*it)->start();
1823 }
1824 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1825 (*it)->start();
1826 }
1827
1828 timer.init();
1829
1830 // upgrade?
1831 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1832 int err = upgrade();
1833 if (err < 0) {
1834 derr << "error converting store" << dendl;
1835 umount();
1836 return err;
1837 }
1838 }
1839
1840 // all okay.
1841 return 0;
1842
1843stop_sync:
1844 // stop sync thread
1845 lock.Lock();
1846 stop = true;
1847 sync_cond.Signal();
1848 lock.Unlock();
1849 sync_thread.join();
1850 if (!m_disable_wbthrottle) {
1851 wbthrottle.stop();
1852 }
1853close_current_fd:
1854 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1855 current_fd = -1;
1856close_basedir_fd:
1857 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1858 basedir_fd = -1;
1859close_fsid_fd:
1860 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1861 fsid_fd = -1;
1862done:
1863 assert(!m_filestore_fail_eio || ret != -EIO);
1864 delete backend;
1865 backend = NULL;
1866 object_map.reset();
1867 return ret;
1868}
1869
1870void FileStore::init_temp_collections()
1871{
31f18b77 1872 dout(10) << __FUNC__ << dendl;
7c673cae
FG
1873 vector<coll_t> ls;
1874 int r = list_collections(ls, true);
1875 assert(r >= 0);
1876
1877 dout(20) << " ls " << ls << dendl;
1878
1879 SequencerPosition spos;
1880
1881 set<coll_t> temps;
1882 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1883 if (p->is_temp())
1884 temps.insert(*p);
1885 dout(20) << " temps " << temps << dendl;
1886
1887 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1888 if (p->is_temp())
1889 continue;
1890 if (p->is_meta())
1891 continue;
1892 coll_t temp = p->get_temp();
1893 if (temps.count(temp)) {
1894 temps.erase(temp);
1895 } else {
31f18b77 1896 dout(10) << __FUNC__ << ": creating " << temp << dendl;
7c673cae
FG
1897 r = _create_collection(temp, 0, spos);
1898 assert(r == 0);
1899 }
1900 }
1901
1902 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
31f18b77 1903 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
7c673cae
FG
1904 r = _collection_remove_recursive(*p, spos);
1905 assert(r == 0);
1906 }
1907}
1908
1909int FileStore::umount()
1910{
31f18b77 1911 dout(5) << __FUNC__ << ": " << basedir << dendl;
7c673cae
FG
1912
1913 flush();
1914 sync();
1915 do_force_sync();
1916
1917 lock.Lock();
1918 stop = true;
1919 sync_cond.Signal();
1920 lock.Unlock();
1921 sync_thread.join();
1922 if (!m_disable_wbthrottle){
1923 wbthrottle.stop();
1924 }
1925 op_tp.stop();
1926
1927 journal_stop();
1928 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1929 journal_write_close();
1930
1931 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1932 (*it)->stop();
1933 }
1934 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1935 (*it)->stop();
1936 }
1937
1938 if (fsid_fd >= 0) {
1939 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1940 fsid_fd = -1;
1941 }
1942 if (op_fd >= 0) {
1943 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1944 op_fd = -1;
1945 }
1946 if (current_fd >= 0) {
1947 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1948 current_fd = -1;
1949 }
1950 if (basedir_fd >= 0) {
1951 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1952 basedir_fd = -1;
1953 }
1954
1955 force_sync = false;
1956
1957 delete backend;
1958 backend = NULL;
1959
1960 object_map.reset();
1961
1962 {
1963 Mutex::Locker l(sync_entry_timeo_lock);
1964 timer.shutdown();
1965 }
1966
1967 // nothing
1968 return 0;
1969}
1970
1971
1972
1973
1974/// -----------------------------
1975
1976FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
1977 Context *onreadable,
1978 Context *onreadable_sync,
1979 TrackedOpRef osd_op)
1980{
1981 uint64_t bytes = 0, ops = 0;
1982 for (vector<Transaction>::iterator p = tls.begin();
1983 p != tls.end();
1984 ++p) {
1985 bytes += (*p).get_num_bytes();
1986 ops += (*p).get_num_ops();
1987 }
1988
1989 Op *o = new Op;
1990 o->start = ceph_clock_now();
1991 o->tls = std::move(tls);
1992 o->onreadable = onreadable;
1993 o->onreadable_sync = onreadable_sync;
1994 o->ops = ops;
1995 o->bytes = bytes;
1996 o->osd_op = osd_op;
1997 return o;
1998}
1999
2000
2001
2002void FileStore::queue_op(OpSequencer *osr, Op *o)
2003{
2004 // queue op on sequencer, then queue sequencer for the threadpool,
2005 // so that regardless of which order the threads pick up the
2006 // sequencer, the op order will be preserved.
2007
2008 osr->queue(o);
2009 o->trace.event("queued");
2010
2011 logger->inc(l_filestore_ops);
2012 logger->inc(l_filestore_bytes, o->bytes);
2013
31f18b77 2014 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
7c673cae
FG
2015 << " " << *osr
2016 << " " << o->bytes << " bytes"
2017 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2018 << dendl;
2019 op_wq.queue(osr);
2020}
2021
2022void FileStore::op_queue_reserve_throttle(Op *o)
2023{
2024 throttle_ops.get();
2025 throttle_bytes.get(o->bytes);
2026
2027 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2028 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2029}
2030
2031void FileStore::op_queue_release_throttle(Op *o)
2032{
2033 throttle_ops.put();
2034 throttle_bytes.put(o->bytes);
2035 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2036 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2037}
2038
2039void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2040{
2041 if (!m_disable_wbthrottle) {
2042 wbthrottle.throttle();
2043 }
2044 // inject a stall?
2045 if (cct->_conf->filestore_inject_stall) {
2046 int orig = cct->_conf->filestore_inject_stall;
31f18b77 2047 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
7c673cae
FG
2048 sleep(orig);
2049 cct->_conf->set_val("filestore_inject_stall", "0");
31f18b77 2050 dout(5) << __FUNC__ << ": done stalling" << dendl;
7c673cae
FG
2051 }
2052
2053 osr->apply_lock.Lock();
2054 Op *o = osr->peek_queue();
2055 o->trace.event("op_apply_start");
2056 apply_manager.op_apply_start(o->op);
31f18b77 2057 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
7c673cae
FG
2058 o->trace.event("_do_transactions start");
2059 int r = _do_transactions(o->tls, o->op, &handle);
2060 o->trace.event("op_apply_finish");
2061 apply_manager.op_apply_finish(o->op);
31f18b77 2062 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
7c673cae
FG
2063 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2064
2065 o->tls.clear();
2066
2067}
2068
2069void FileStore::_finish_op(OpSequencer *osr)
2070{
2071 list<Context*> to_queue;
2072 Op *o = osr->dequeue(&to_queue);
2073
2074 utime_t lat = ceph_clock_now();
2075 lat -= o->start;
2076
31f18b77 2077 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
7c673cae
FG
2078 osr->apply_lock.Unlock(); // locked in _do_op
2079 o->trace.event("_finish_op");
2080
2081 // called with tp lock held
2082 op_queue_release_throttle(o);
2083
2084 logger->tinc(l_filestore_apply_latency, lat);
2085
2086 if (o->onreadable_sync) {
2087 o->onreadable_sync->complete(0);
2088 }
2089 if (o->onreadable) {
2090 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2091 }
2092 if (!to_queue.empty()) {
2093 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2094 }
2095 delete o;
2096}
2097
2098
2099struct C_JournaledAhead : public Context {
2100 FileStore *fs;
2101 FileStore::OpSequencer *osr;
2102 FileStore::Op *o;
2103 Context *ondisk;
2104
2105 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2106 fs(f), osr(os), o(o), ondisk(ondisk) { }
2107 void finish(int r) override {
2108 fs->_journaled_ahead(osr, o, ondisk);
2109 }
2110};
2111
2112int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2113 TrackedOpRef osd_op,
2114 ThreadPool::TPHandle *handle)
2115{
2116 Context *onreadable;
2117 Context *ondisk;
2118 Context *onreadable_sync;
2119 ObjectStore::Transaction::collect_contexts(
2120 tls, &onreadable, &ondisk, &onreadable_sync);
2121
2122 if (cct->_conf->objectstore_blackhole) {
31f18b77 2123 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
7c673cae
FG
2124 << dendl;
2125 delete ondisk;
2126 delete onreadable;
2127 delete onreadable_sync;
2128 return 0;
2129 }
2130
2131 utime_t start = ceph_clock_now();
2132 // set up the sequencer
2133 OpSequencer *osr;
2134 assert(posr);
2135 if (posr->p) {
2136 osr = static_cast<OpSequencer *>(posr->p.get());
31f18b77 2137 dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
7c673cae 2138 } else {
31f18b77 2139 osr = new OpSequencer(cct, ++next_osr_id);
7c673cae
FG
2140 osr->set_cct(cct);
2141 osr->parent = posr;
2142 posr->p = osr;
31f18b77 2143 dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
7c673cae
FG
2144 }
2145
2146 // used to include osr information in tracepoints during transaction apply
2147 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2148 (*i).set_osr(osr);
2149 }
2150
2151 ZTracer::Trace trace;
2152 if (osd_op && osd_op->pg_trace) {
2153 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2154 trace = osd_op->store_trace;
2155 }
2156
2157 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2158 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2159
2160 //prepare and encode transactions data out of lock
2161 bufferlist tbl;
2162 int orig_len = journal->prepare_entry(o->tls, &tbl);
2163
2164 if (handle)
2165 handle->suspend_tp_timeout();
2166
2167 op_queue_reserve_throttle(o);
2168 journal->reserve_throttle_and_backoff(tbl.length());
2169
2170 if (handle)
2171 handle->reset_tp_timeout();
2172
2173 uint64_t op_num = submit_manager.op_submit_start();
2174 o->op = op_num;
2175 trace.keyval("opnum", op_num);
2176
2177 if (m_filestore_do_dump)
2178 dump_transactions(o->tls, o->op, osr);
2179
2180 if (m_filestore_journal_parallel) {
31f18b77 2181 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
7c673cae
FG
2182
2183 trace.keyval("journal mode", "parallel");
2184 trace.event("journal started");
2185 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2186
2187 // queue inside submit_manager op submission lock
2188 queue_op(osr, o);
2189 trace.event("op queued");
2190 } else if (m_filestore_journal_writeahead) {
31f18b77 2191 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
7c673cae
FG
2192
2193 osr->queue_journal(o->op);
2194
2195 trace.keyval("journal mode", "writeahead");
2196 trace.event("journal started");
2197 _op_journal_transactions(tbl, orig_len, o->op,
2198 new C_JournaledAhead(this, osr, o, ondisk),
2199 osd_op);
2200 } else {
2201 ceph_abort();
2202 }
2203 submit_manager.op_submit_finish(op_num);
2204 utime_t end = ceph_clock_now();
2205 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2206 return 0;
2207 }
2208
2209 if (!journal) {
2210 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
31f18b77 2211 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
7c673cae
FG
2212
2213 if (handle)
2214 handle->suspend_tp_timeout();
2215
2216 op_queue_reserve_throttle(o);
2217
2218 if (handle)
2219 handle->reset_tp_timeout();
2220
2221 uint64_t op_num = submit_manager.op_submit_start();
2222 o->op = op_num;
2223
2224 if (m_filestore_do_dump)
2225 dump_transactions(o->tls, o->op, osr);
2226
2227 queue_op(osr, o);
2228 trace.keyval("opnum", op_num);
2229 trace.keyval("journal mode", "none");
2230 trace.event("op queued");
2231
2232 if (ondisk)
2233 apply_manager.add_waiter(op_num, ondisk);
2234 submit_manager.op_submit_finish(op_num);
2235 utime_t end = ceph_clock_now();
2236 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2237 return 0;
2238 }
2239
2240 assert(journal);
2241 //prepare and encode transactions data out of lock
2242 bufferlist tbl;
2243 int orig_len = -1;
2244 if (journal->is_writeable()) {
2245 orig_len = journal->prepare_entry(tls, &tbl);
2246 }
2247 uint64_t op = submit_manager.op_submit_start();
31f18b77 2248 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
7c673cae
FG
2249
2250 if (m_filestore_do_dump)
2251 dump_transactions(tls, op, osr);
2252
2253 trace.event("op_apply_start");
2254 trace.keyval("opnum", op);
2255 trace.keyval("journal mode", "trailing");
2256 apply_manager.op_apply_start(op);
2257 trace.event("do_transactions");
2258 int r = do_transactions(tls, op);
2259
2260 if (r >= 0) {
2261 trace.event("journal started");
2262 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2263 } else {
2264 delete ondisk;
2265 }
2266
2267 // start on_readable finisher after we queue journal item, as on_readable callback
2268 // is allowed to delete the Transaction
2269 if (onreadable_sync) {
2270 onreadable_sync->complete(r);
2271 }
2272 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2273
2274 submit_manager.op_submit_finish(op);
2275 trace.event("op_apply_finish");
2276 apply_manager.op_apply_finish(op);
2277
2278 utime_t end = ceph_clock_now();
2279 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2280 return r;
2281}
2282
2283void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2284{
31f18b77 2285 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
7c673cae
FG
2286
2287 o->trace.event("writeahead journal finished");
2288
2289 // this should queue in order because the journal does it's completions in order.
2290 queue_op(osr, o);
2291
2292 list<Context*> to_queue;
2293 osr->dequeue_journal(&to_queue);
2294
2295 // do ondisk completions async, to prevent any onreadable_sync completions
2296 // getting blocked behind an ondisk completion.
2297 if (ondisk) {
2298 dout(10) << " queueing ondisk " << ondisk << dendl;
2299 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2300 }
2301 if (!to_queue.empty()) {
2302 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2303 }
2304}
2305
2306int FileStore::_do_transactions(
2307 vector<Transaction> &tls,
2308 uint64_t op_seq,
2309 ThreadPool::TPHandle *handle)
2310{
2311 int trans_num = 0;
2312
2313 for (vector<Transaction>::iterator p = tls.begin();
2314 p != tls.end();
2315 ++p, trans_num++) {
2316 _do_transaction(*p, op_seq, trans_num, handle);
2317 if (handle)
2318 handle->reset_tp_timeout();
2319 }
2320
2321 return 0;
2322}
2323
2324void FileStore::_set_global_replay_guard(const coll_t& cid,
2325 const SequencerPosition &spos)
2326{
2327 if (backend->can_checkpoint())
2328 return;
2329
2330 // sync all previous operations on this sequencer
2331 int ret = object_map->sync();
2332 if (ret < 0) {
31f18b77 2333 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
7c673cae
FG
2334 assert(0 == "_set_global_replay_guard failed");
2335 }
2336 ret = sync_filesystem(basedir_fd);
2337 if (ret < 0) {
31f18b77 2338 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
7c673cae
FG
2339 assert(0 == "_set_global_replay_guard failed");
2340 }
2341
2342 char fn[PATH_MAX];
2343 get_cdir(cid, fn, sizeof(fn));
2344 int fd = ::open(fn, O_RDONLY);
2345 if (fd < 0) {
2346 int err = errno;
31f18b77 2347 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
7c673cae
FG
2348 assert(0 == "_set_global_replay_guard failed");
2349 }
2350
2351 _inject_failure();
2352
2353 // then record that we did it
2354 bufferlist v;
2355 ::encode(spos, v);
2356 int r = chain_fsetxattr<true, true>(
2357 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2358 if (r < 0) {
31f18b77 2359 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
7c673cae
FG
2360 << " got " << cpp_strerror(r) << dendl;
2361 assert(0 == "fsetxattr failed");
2362 }
2363
2364 // and make sure our xattr is durable.
2365 ::fsync(fd);
2366
2367 _inject_failure();
2368
2369 VOID_TEMP_FAILURE_RETRY(::close(fd));
31f18b77 2370 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2371}
2372
2373int FileStore::_check_global_replay_guard(const coll_t& cid,
2374 const SequencerPosition& spos)
2375{
2376 char fn[PATH_MAX];
2377 get_cdir(cid, fn, sizeof(fn));
2378 int fd = ::open(fn, O_RDONLY);
2379 if (fd < 0) {
31f18b77 2380 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2381 return 1; // if collection does not exist, there is no guard, and we can replay.
2382 }
2383
2384 char buf[100];
2385 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2386 if (r < 0) {
31f18b77 2387 dout(20) << __FUNC__ << ": no xattr" << dendl;
7c673cae
FG
2388 assert(!m_filestore_fail_eio || r != -EIO);
2389 VOID_TEMP_FAILURE_RETRY(::close(fd));
2390 return 1; // no xattr
2391 }
2392 bufferlist bl;
2393 bl.append(buf, r);
2394
2395 SequencerPosition opos;
2396 bufferlist::iterator p = bl.begin();
2397 ::decode(opos, p);
2398
2399 VOID_TEMP_FAILURE_RETRY(::close(fd));
2400 return spos >= opos ? 1 : -1;
2401}
2402
2403
2404void FileStore::_set_replay_guard(const coll_t& cid,
2405 const SequencerPosition &spos,
2406 bool in_progress=false)
2407{
2408 char fn[PATH_MAX];
2409 get_cdir(cid, fn, sizeof(fn));
2410 int fd = ::open(fn, O_RDONLY);
2411 if (fd < 0) {
2412 int err = errno;
31f18b77 2413 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
7c673cae
FG
2414 assert(0 == "_set_replay_guard failed");
2415 }
2416 _set_replay_guard(fd, spos, 0, in_progress);
2417 VOID_TEMP_FAILURE_RETRY(::close(fd));
2418}
2419
2420
2421void FileStore::_set_replay_guard(int fd,
2422 const SequencerPosition& spos,
2423 const ghobject_t *hoid,
2424 bool in_progress)
2425{
2426 if (backend->can_checkpoint())
2427 return;
2428
31f18b77 2429 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
7c673cae
FG
2430
2431 _inject_failure();
2432
2433 // first make sure the previous operation commits
2434 ::fsync(fd);
2435
2436 if (!in_progress) {
2437 // sync object_map too. even if this object has a header or keys,
2438 // it have had them in the past and then removed them, so always
2439 // sync.
2440 object_map->sync(hoid, &spos);
2441 }
2442
2443 _inject_failure();
2444
2445 // then record that we did it
2446 bufferlist v(40);
2447 ::encode(spos, v);
2448 ::encode(in_progress, v);
2449 int r = chain_fsetxattr<true, true>(
2450 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2451 if (r < 0) {
2452 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2453 assert(0 == "fsetxattr failed");
2454 }
2455
2456 // and make sure our xattr is durable.
2457 ::fsync(fd);
2458
2459 _inject_failure();
2460
31f18b77 2461 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2462}
2463
2464void FileStore::_close_replay_guard(const coll_t& cid,
2465 const SequencerPosition &spos)
2466{
2467 char fn[PATH_MAX];
2468 get_cdir(cid, fn, sizeof(fn));
2469 int fd = ::open(fn, O_RDONLY);
2470 if (fd < 0) {
2471 int err = errno;
31f18b77 2472 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
7c673cae
FG
2473 assert(0 == "_close_replay_guard failed");
2474 }
2475 _close_replay_guard(fd, spos);
2476 VOID_TEMP_FAILURE_RETRY(::close(fd));
2477}
2478
2479void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2480 const ghobject_t *hoid)
2481{
2482 if (backend->can_checkpoint())
2483 return;
2484
31f18b77 2485 dout(10) << __FUNC__ << ": " << spos << dendl;
7c673cae
FG
2486
2487 _inject_failure();
2488
2489 // sync object_map too. even if this object has a header or keys,
2490 // it have had them in the past and then removed them, so always
2491 // sync.
2492 object_map->sync(hoid, &spos);
2493
2494 // then record that we are done with this operation
2495 bufferlist v(40);
2496 ::encode(spos, v);
2497 bool in_progress = false;
2498 ::encode(in_progress, v);
2499 int r = chain_fsetxattr<true, true>(
2500 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2501 if (r < 0) {
2502 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2503 assert(0 == "fsetxattr failed");
2504 }
2505
2506 // and make sure our xattr is durable.
2507 ::fsync(fd);
2508
2509 _inject_failure();
2510
31f18b77 2511 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
7c673cae
FG
2512}
2513
2514int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2515 const SequencerPosition& spos)
2516{
2517 if (!replaying || backend->can_checkpoint())
2518 return 1;
2519
2520 int r = _check_global_replay_guard(cid, spos);
2521 if (r < 0)
2522 return r;
2523
2524 FDRef fd;
2525 r = lfn_open(cid, oid, false, &fd);
2526 if (r < 0) {
31f18b77 2527 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
7c673cae
FG
2528 return 1; // if file does not exist, there is no guard, and we can replay.
2529 }
2530 int ret = _check_replay_guard(**fd, spos);
2531 lfn_close(fd);
2532 return ret;
2533}
2534
2535int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2536{
2537 if (!replaying || backend->can_checkpoint())
2538 return 1;
2539
2540 char fn[PATH_MAX];
2541 get_cdir(cid, fn, sizeof(fn));
2542 int fd = ::open(fn, O_RDONLY);
2543 if (fd < 0) {
31f18b77 2544 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
7c673cae
FG
2545 return 1; // if collection does not exist, there is no guard, and we can replay.
2546 }
2547 int ret = _check_replay_guard(fd, spos);
2548 VOID_TEMP_FAILURE_RETRY(::close(fd));
2549 return ret;
2550}
2551
2552int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2553{
2554 if (!replaying || backend->can_checkpoint())
2555 return 1;
2556
2557 char buf[100];
2558 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2559 if (r < 0) {
31f18b77 2560 dout(20) << __FUNC__ << ": no xattr" << dendl;
7c673cae
FG
2561 assert(!m_filestore_fail_eio || r != -EIO);
2562 return 1; // no xattr
2563 }
2564 bufferlist bl;
2565 bl.append(buf, r);
2566
2567 SequencerPosition opos;
2568 bufferlist::iterator p = bl.begin();
2569 ::decode(opos, p);
2570 bool in_progress = false;
2571 if (!p.end()) // older journals don't have this
2572 ::decode(in_progress, p);
2573 if (opos > spos) {
31f18b77 2574 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
7c673cae
FG
2575 << ", now or in future, SKIPPING REPLAY" << dendl;
2576 return -1;
2577 } else if (opos == spos) {
2578 if (in_progress) {
31f18b77 2579 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2580 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2581 return 0;
2582 } else {
31f18b77 2583 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
7c673cae
FG
2584 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2585 return -1;
2586 }
2587 } else {
31f18b77 2588 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
7c673cae
FG
2589 << ", in past, will replay" << dendl;
2590 return 1;
2591 }
2592}
2593
2594void FileStore::_do_transaction(
2595 Transaction& t, uint64_t op_seq, int trans_num,
2596 ThreadPool::TPHandle *handle)
2597{
31f18b77 2598 dout(10) << __FUNC__ << ": on " << &t << dendl;
7c673cae
FG
2599
2600#ifdef WITH_LTTNG
2601 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2602#endif
2603
2604 Transaction::iterator i = t.begin();
2605
2606 SequencerPosition spos(op_seq, trans_num, 0);
2607 while (i.have_op()) {
2608 if (handle)
2609 handle->reset_tp_timeout();
2610
2611 Transaction::Op *op = i.decode_op();
2612 int r = 0;
2613
2614 _inject_failure();
2615
2616 switch (op->op) {
2617 case Transaction::OP_NOP:
2618 break;
2619 case Transaction::OP_TOUCH:
2620 {
2621 const coll_t &_cid = i.get_cid(op->cid);
2622 const ghobject_t &oid = i.get_oid(op->oid);
2623 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2624 _cid : _cid.get_temp();
2625 tracepoint(objectstore, touch_enter, osr_name);
2626 if (_check_replay_guard(cid, oid, spos) > 0)
2627 r = _touch(cid, oid);
2628 tracepoint(objectstore, touch_exit, r);
2629 }
2630 break;
2631
2632 case Transaction::OP_WRITE:
2633 {
2634 const coll_t &_cid = i.get_cid(op->cid);
2635 const ghobject_t &oid = i.get_oid(op->oid);
2636 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2637 _cid : _cid.get_temp();
2638 uint64_t off = op->off;
2639 uint64_t len = op->len;
2640 uint32_t fadvise_flags = i.get_fadvise_flags();
2641 bufferlist bl;
2642 i.decode_bl(bl);
2643 tracepoint(objectstore, write_enter, osr_name, off, len);
2644 if (_check_replay_guard(cid, oid, spos) > 0)
2645 r = _write(cid, oid, off, len, bl, fadvise_flags);
2646 tracepoint(objectstore, write_exit, r);
2647 }
2648 break;
2649
2650 case Transaction::OP_ZERO:
2651 {
2652 const coll_t &_cid = i.get_cid(op->cid);
2653 const ghobject_t &oid = i.get_oid(op->oid);
2654 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2655 _cid : _cid.get_temp();
2656 uint64_t off = op->off;
2657 uint64_t len = op->len;
2658 tracepoint(objectstore, zero_enter, osr_name, off, len);
2659 if (_check_replay_guard(cid, oid, spos) > 0)
2660 r = _zero(cid, oid, off, len);
2661 tracepoint(objectstore, zero_exit, r);
2662 }
2663 break;
2664
2665 case Transaction::OP_TRIMCACHE:
2666 {
2667 // deprecated, no-op
2668 }
2669 break;
2670
2671 case Transaction::OP_TRUNCATE:
2672 {
2673 const coll_t &_cid = i.get_cid(op->cid);
2674 const ghobject_t &oid = i.get_oid(op->oid);
2675 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2676 _cid : _cid.get_temp();
2677 uint64_t off = op->off;
2678 tracepoint(objectstore, truncate_enter, osr_name, off);
2679 if (_check_replay_guard(cid, oid, spos) > 0)
2680 r = _truncate(cid, oid, off);
2681 tracepoint(objectstore, truncate_exit, r);
2682 }
2683 break;
2684
2685 case Transaction::OP_REMOVE:
2686 {
2687 const coll_t &_cid = i.get_cid(op->cid);
2688 const ghobject_t &oid = i.get_oid(op->oid);
2689 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2690 _cid : _cid.get_temp();
2691 tracepoint(objectstore, remove_enter, osr_name);
2692 if (_check_replay_guard(cid, oid, spos) > 0)
2693 r = _remove(cid, oid, spos);
2694 tracepoint(objectstore, remove_exit, r);
2695 }
2696 break;
2697
2698 case Transaction::OP_SETATTR:
2699 {
2700 const coll_t &_cid = i.get_cid(op->cid);
2701 const ghobject_t &oid = i.get_oid(op->oid);
2702 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2703 _cid : _cid.get_temp();
2704 string name = i.decode_string();
2705 bufferlist bl;
2706 i.decode_bl(bl);
2707 tracepoint(objectstore, setattr_enter, osr_name);
2708 if (_check_replay_guard(cid, oid, spos) > 0) {
2709 map<string, bufferptr> to_set;
2710 to_set[name] = bufferptr(bl.c_str(), bl.length());
2711 r = _setattrs(cid, oid, to_set, spos);
2712 if (r == -ENOSPC)
2713 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2714 << " name " << name << " size " << bl.length() << dendl;
2715 }
2716 tracepoint(objectstore, setattr_exit, r);
2717 }
2718 break;
2719
2720 case Transaction::OP_SETATTRS:
2721 {
2722 const coll_t &_cid = i.get_cid(op->cid);
2723 const ghobject_t &oid = i.get_oid(op->oid);
2724 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2725 _cid : _cid.get_temp();
2726 map<string, bufferptr> aset;
2727 i.decode_attrset(aset);
2728 tracepoint(objectstore, setattrs_enter, osr_name);
2729 if (_check_replay_guard(cid, oid, spos) > 0)
2730 r = _setattrs(cid, oid, aset, spos);
2731 tracepoint(objectstore, setattrs_exit, r);
2732 if (r == -ENOSPC)
2733 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2734 }
2735 break;
2736
2737 case Transaction::OP_RMATTR:
2738 {
2739 const coll_t &_cid = i.get_cid(op->cid);
2740 const ghobject_t &oid = i.get_oid(op->oid);
2741 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2742 _cid : _cid.get_temp();
2743 string name = i.decode_string();
2744 tracepoint(objectstore, rmattr_enter, osr_name);
2745 if (_check_replay_guard(cid, oid, spos) > 0)
2746 r = _rmattr(cid, oid, name.c_str(), spos);
2747 tracepoint(objectstore, rmattr_exit, r);
2748 }
2749 break;
2750
2751 case Transaction::OP_RMATTRS:
2752 {
2753 const coll_t &_cid = i.get_cid(op->cid);
2754 const ghobject_t &oid = i.get_oid(op->oid);
2755 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2756 _cid : _cid.get_temp();
2757 tracepoint(objectstore, rmattrs_enter, osr_name);
2758 if (_check_replay_guard(cid, oid, spos) > 0)
2759 r = _rmattrs(cid, oid, spos);
2760 tracepoint(objectstore, rmattrs_exit, r);
2761 }
2762 break;
2763
2764 case Transaction::OP_CLONE:
2765 {
2766 const coll_t &_cid = i.get_cid(op->cid);
2767 const ghobject_t &oid = i.get_oid(op->oid);
2768 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2769 _cid : _cid.get_temp();
2770 const ghobject_t &noid = i.get_oid(op->dest_oid);
2771 tracepoint(objectstore, clone_enter, osr_name);
2772 r = _clone(cid, oid, noid, spos);
2773 tracepoint(objectstore, clone_exit, r);
2774 }
2775 break;
2776
2777 case Transaction::OP_CLONERANGE:
2778 {
2779 const coll_t &_cid = i.get_cid(op->cid);
2780 const ghobject_t &oid = i.get_oid(op->oid);
2781 const ghobject_t &noid = i.get_oid(op->dest_oid);
2782 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2783 _cid : _cid.get_temp();
2784 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2785 _cid : _cid.get_temp();
2786 uint64_t off = op->off;
2787 uint64_t len = op->len;
2788 tracepoint(objectstore, clone_range_enter, osr_name, len);
2789 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2790 tracepoint(objectstore, clone_range_exit, r);
2791 }
2792 break;
2793
2794 case Transaction::OP_CLONERANGE2:
2795 {
2796 const coll_t &_cid = i.get_cid(op->cid);
2797 const ghobject_t &oid = i.get_oid(op->oid);
2798 const ghobject_t &noid = i.get_oid(op->dest_oid);
2799 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2800 _cid : _cid.get_temp();
2801 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2802 _cid : _cid.get_temp();
2803 uint64_t srcoff = op->off;
2804 uint64_t len = op->len;
2805 uint64_t dstoff = op->dest_off;
2806 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2807 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2808 tracepoint(objectstore, clone_range2_exit, r);
2809 }
2810 break;
2811
2812 case Transaction::OP_MKCOLL:
2813 {
2814 const coll_t &cid = i.get_cid(op->cid);
2815 tracepoint(objectstore, mkcoll_enter, osr_name);
2816 if (_check_replay_guard(cid, spos) > 0)
2817 r = _create_collection(cid, op->split_bits, spos);
2818 tracepoint(objectstore, mkcoll_exit, r);
2819 }
2820 break;
2821
2822 case Transaction::OP_COLL_SET_BITS:
2823 {
2824 const coll_t &cid = i.get_cid(op->cid);
2825 int bits = op->split_bits;
2826 r = _collection_set_bits(cid, bits);
2827 }
2828 break;
2829
2830 case Transaction::OP_COLL_HINT:
2831 {
2832 const coll_t &cid = i.get_cid(op->cid);
2833 uint32_t type = op->hint_type;
2834 bufferlist hint;
2835 i.decode_bl(hint);
2836 bufferlist::iterator hiter = hint.begin();
2837 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2838 uint32_t pg_num;
2839 uint64_t num_objs;
2840 ::decode(pg_num, hiter);
2841 ::decode(num_objs, hiter);
2842 if (_check_replay_guard(cid, spos) > 0) {
2843 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2844 }
2845 } else {
2846 // Ignore the hint
2847 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2848 }
2849 }
2850 break;
2851
2852 case Transaction::OP_RMCOLL:
2853 {
2854 const coll_t &cid = i.get_cid(op->cid);
2855 tracepoint(objectstore, rmcoll_enter, osr_name);
2856 if (_check_replay_guard(cid, spos) > 0)
2857 r = _destroy_collection(cid);
2858 tracepoint(objectstore, rmcoll_exit, r);
2859 }
2860 break;
2861
2862 case Transaction::OP_COLL_ADD:
2863 {
2864 const coll_t &ocid = i.get_cid(op->cid);
2865 const coll_t &ncid = i.get_cid(op->dest_cid);
2866 const ghobject_t &oid = i.get_oid(op->oid);
2867
2868 assert(oid.hobj.pool >= -1);
2869
2870 // always followed by OP_COLL_REMOVE
2871 Transaction::Op *op2 = i.decode_op();
2872 const coll_t &ocid2 = i.get_cid(op2->cid);
2873 const ghobject_t &oid2 = i.get_oid(op2->oid);
2874 assert(op2->op == Transaction::OP_COLL_REMOVE);
2875 assert(ocid2 == ocid);
2876 assert(oid2 == oid);
2877
2878 tracepoint(objectstore, coll_add_enter);
2879 r = _collection_add(ncid, ocid, oid, spos);
2880 tracepoint(objectstore, coll_add_exit, r);
2881 spos.op++;
2882 if (r < 0)
2883 break;
2884 tracepoint(objectstore, coll_remove_enter, osr_name);
2885 if (_check_replay_guard(ocid, oid, spos) > 0)
2886 r = _remove(ocid, oid, spos);
2887 tracepoint(objectstore, coll_remove_exit, r);
2888 }
2889 break;
2890
2891 case Transaction::OP_COLL_MOVE:
2892 {
2893 // WARNING: this is deprecated and buggy; only here to replay old journals.
2894 const coll_t &ocid = i.get_cid(op->cid);
2895 const coll_t &ncid = i.get_cid(op->dest_cid);
2896 const ghobject_t &oid = i.get_oid(op->oid);
2897 tracepoint(objectstore, coll_move_enter);
2898 r = _collection_add(ocid, ncid, oid, spos);
2899 if (r == 0 &&
2900 (_check_replay_guard(ocid, oid, spos) > 0))
2901 r = _remove(ocid, oid, spos);
2902 tracepoint(objectstore, coll_move_exit, r);
2903 }
2904 break;
2905
2906 case Transaction::OP_COLL_MOVE_RENAME:
2907 {
2908 const coll_t &_oldcid = i.get_cid(op->cid);
2909 const ghobject_t &oldoid = i.get_oid(op->oid);
2910 const coll_t &_newcid = i.get_cid(op->dest_cid);
2911 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2912 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2913 _oldcid : _oldcid.get_temp();
2914 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2915 _oldcid : _newcid.get_temp();
2916 tracepoint(objectstore, coll_move_rename_enter);
2917 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2918 tracepoint(objectstore, coll_move_rename_exit, r);
2919 }
2920 break;
2921
2922 case Transaction::OP_TRY_RENAME:
2923 {
2924 const coll_t &_cid = i.get_cid(op->cid);
2925 const ghobject_t &oldoid = i.get_oid(op->oid);
2926 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2927 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2928 _cid : _cid.get_temp();
2929 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2930 _cid : _cid.get_temp();
2931 tracepoint(objectstore, coll_try_rename_enter);
2932 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2933 tracepoint(objectstore, coll_try_rename_exit, r);
2934 }
2935 break;
2936
2937 case Transaction::OP_COLL_SETATTR:
2938 case Transaction::OP_COLL_RMATTR:
2939 assert(0 == "collection attr methods no longer implemented");
2940 break;
2941
2942 case Transaction::OP_STARTSYNC:
2943 tracepoint(objectstore, startsync_enter, osr_name);
2944 _start_sync();
2945 tracepoint(objectstore, startsync_exit);
2946 break;
2947
2948 case Transaction::OP_COLL_RENAME:
2949 {
2950 r = -EOPNOTSUPP;
2951 }
2952 break;
2953
2954 case Transaction::OP_OMAP_CLEAR:
2955 {
2956 const coll_t &_cid = i.get_cid(op->cid);
2957 const ghobject_t &oid = i.get_oid(op->oid);
2958 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2959 _cid : _cid.get_temp();
2960 tracepoint(objectstore, omap_clear_enter, osr_name);
2961 r = _omap_clear(cid, oid, spos);
2962 tracepoint(objectstore, omap_clear_exit, r);
2963 }
2964 break;
2965 case Transaction::OP_OMAP_SETKEYS:
2966 {
2967 const coll_t &_cid = i.get_cid(op->cid);
2968 const ghobject_t &oid = i.get_oid(op->oid);
2969 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2970 _cid : _cid.get_temp();
2971 map<string, bufferlist> aset;
2972 i.decode_attrset(aset);
2973 tracepoint(objectstore, omap_setkeys_enter, osr_name);
2974 r = _omap_setkeys(cid, oid, aset, spos);
2975 tracepoint(objectstore, omap_setkeys_exit, r);
2976 }
2977 break;
2978 case Transaction::OP_OMAP_RMKEYS:
2979 {
2980 const coll_t &_cid = i.get_cid(op->cid);
2981 const ghobject_t &oid = i.get_oid(op->oid);
2982 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2983 _cid : _cid.get_temp();
2984 set<string> keys;
2985 i.decode_keyset(keys);
2986 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
2987 r = _omap_rmkeys(cid, oid, keys, spos);
2988 tracepoint(objectstore, omap_rmkeys_exit, r);
2989 }
2990 break;
2991 case Transaction::OP_OMAP_RMKEYRANGE:
2992 {
2993 const coll_t &_cid = i.get_cid(op->cid);
2994 const ghobject_t &oid = i.get_oid(op->oid);
2995 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2996 _cid : _cid.get_temp();
2997 string first, last;
2998 first = i.decode_string();
2999 last = i.decode_string();
3000 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3001 r = _omap_rmkeyrange(cid, oid, first, last, spos);
3002 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3003 }
3004 break;
3005 case Transaction::OP_OMAP_SETHEADER:
3006 {
3007 const coll_t &_cid = i.get_cid(op->cid);
3008 const ghobject_t &oid = i.get_oid(op->oid);
3009 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3010 _cid : _cid.get_temp();
3011 bufferlist bl;
3012 i.decode_bl(bl);
3013 tracepoint(objectstore, omap_setheader_enter, osr_name);
3014 r = _omap_setheader(cid, oid, bl, spos);
3015 tracepoint(objectstore, omap_setheader_exit, r);
3016 }
3017 break;
3018 case Transaction::OP_SPLIT_COLLECTION:
3019 {
3020 assert(0 == "not legacy journal; upgrade to firefly first");
3021 }
3022 break;
3023 case Transaction::OP_SPLIT_COLLECTION2:
3024 {
3025 coll_t cid = i.get_cid(op->cid);
3026 uint32_t bits = op->split_bits;
3027 uint32_t rem = op->split_rem;
3028 coll_t dest = i.get_cid(op->dest_cid);
3029 tracepoint(objectstore, split_coll2_enter, osr_name);
3030 r = _split_collection(cid, bits, rem, dest, spos);
3031 tracepoint(objectstore, split_coll2_exit, r);
3032 }
3033 break;
3034
3035 case Transaction::OP_SETALLOCHINT:
3036 {
3037 const coll_t &_cid = i.get_cid(op->cid);
3038 const ghobject_t &oid = i.get_oid(op->oid);
3039 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3040 _cid : _cid.get_temp();
3041 uint64_t expected_object_size = op->expected_object_size;
3042 uint64_t expected_write_size = op->expected_write_size;
3043 tracepoint(objectstore, setallochint_enter, osr_name);
3044 if (_check_replay_guard(cid, oid, spos) > 0)
3045 r = _set_alloc_hint(cid, oid, expected_object_size,
3046 expected_write_size);
3047 tracepoint(objectstore, setallochint_exit, r);
3048 }
3049 break;
3050
3051 default:
3052 derr << "bad op " << op->op << dendl;
3053 ceph_abort();
3054 }
3055
3056 if (r < 0) {
3057 bool ok = false;
3058
3059 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3060 op->op == Transaction::OP_CLONE ||
3061 op->op == Transaction::OP_CLONERANGE2 ||
3062 op->op == Transaction::OP_COLL_ADD ||
3063 op->op == Transaction::OP_SETATTR ||
3064 op->op == Transaction::OP_SETATTRS ||
3065 op->op == Transaction::OP_RMATTR ||
3066 op->op == Transaction::OP_OMAP_SETKEYS ||
3067 op->op == Transaction::OP_OMAP_RMKEYS ||
3068 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3069 op->op == Transaction::OP_OMAP_SETHEADER))
3070 // -ENOENT is normally okay
3071 // ...including on a replayed OP_RMCOLL with checkpoint mode
3072 ok = true;
3073 if (r == -ENODATA)
3074 ok = true;
3075
3076 if (op->op == Transaction::OP_SETALLOCHINT)
3077 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3078 // cases means invalid hint size (e.g. too big, not a multiple
3079 // of block size, etc) or, at least on xfs, an attempt to set
3080 // or change it when the file is not empty. However,
3081 // OP_SETALLOCHINT is advisory, so ignore all errors.
3082 ok = true;
3083
3084 if (replaying && !backend->can_checkpoint()) {
3085 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3086 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3087 ok = true;
3088 }
3089 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3090 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3091 ok = true;
3092 }
3093 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3094 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3095 ok = true;
3096 }
3097 if (r == -ERANGE) {
3098 dout(10) << "tolerating ERANGE on replay" << dendl;
3099 ok = true;
3100 }
3101 if (r == -ENOENT) {
3102 dout(10) << "tolerating ENOENT on replay" << dendl;
3103 ok = true;
3104 }
3105 }
3106
3107 if (!ok) {
3108 const char *msg = "unexpected error code";
3109
3110 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3111 op->op == Transaction::OP_CLONE ||
3112 op->op == Transaction::OP_CLONERANGE2)) {
3113 msg = "ENOENT on clone suggests osd bug";
3114 } else if (r == -ENOSPC) {
3115 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3116 // by partially applying transactions.
3117 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3118 } else if (r == -ENOTEMPTY) {
3119 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3120 } else if (r == -EPERM) {
3121 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3122 }
3123
3124 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3125 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3126 dout(0) << msg << dendl;
3127 dout(0) << " transaction dump:\n";
3128 JSONFormatter f(true);
3129 f.open_object_section("transaction");
3130 t.dump(&f);
3131 f.close_section();
3132 f.flush(*_dout);
3133 *_dout << dendl;
3134
3135 if (r == -EMFILE) {
3136 dump_open_fds(cct);
3137 }
3138
3139 assert(0 == "unexpected error");
3140 }
3141 }
3142
3143 spos.op++;
3144 }
3145
3146 _inject_failure();
3147}
3148
3149 /*********************************************/
3150
3151
3152
3153// --------------------
3154// objects
3155
3156bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3157{
3158 tracepoint(objectstore, exists_enter, _cid.c_str());
3159 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3160 struct stat st;
3161 bool retval = stat(cid, oid, &st) == 0;
3162 tracepoint(objectstore, exists_exit, retval);
3163 return retval;
3164}
3165
3166int FileStore::stat(
3167 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3168{
3169 tracepoint(objectstore, stat_enter, _cid.c_str());
3170 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3171 int r = lfn_stat(cid, oid, st);
3172 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3173 if (r < 0) {
31f18b77 3174 dout(10) << __FUNC__ << ": " << cid << "/" << oid
7c673cae
FG
3175 << " = " << r << dendl;
3176 } else {
31f18b77 3177 dout(10) << __FUNC__ << ": " << cid << "/" << oid
7c673cae
FG
3178 << " = " << r
3179 << " (size " << st->st_size << ")" << dendl;
3180 }
3181 if (cct->_conf->filestore_debug_inject_read_err &&
3182 debug_mdata_eio(oid)) {
3183 return -EIO;
3184 } else {
3185 tracepoint(objectstore, stat_exit, r);
3186 return r;
3187 }
3188}
3189
3190int FileStore::set_collection_opts(
3191 const coll_t& cid,
3192 const pool_opts_t& opts)
3193{
3194 return -EOPNOTSUPP;
3195}
3196
3197int FileStore::read(
3198 const coll_t& _cid,
3199 const ghobject_t& oid,
3200 uint64_t offset,
3201 size_t len,
3202 bufferlist& bl,
224ce89b 3203 uint32_t op_flags)
7c673cae
FG
3204{
3205 int got;
3206 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3207 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3208
31f18b77 3209 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3210
3211 FDRef fd;
3212 int r = lfn_open(cid, oid, false, &fd);
3213 if (r < 0) {
31f18b77 3214 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
7c673cae
FG
3215 << cpp_strerror(r) << dendl;
3216 return r;
3217 }
3218
3219 if (offset == 0 && len == 0) {
3220 struct stat st;
3221 memset(&st, 0, sizeof(struct stat));
3222 int r = ::fstat(**fd, &st);
3223 assert(r == 0);
3224 len = st.st_size;
3225 }
3226
3227#ifdef HAVE_POSIX_FADVISE
3228 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3229 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3230 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3231 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3232#endif
3233
3234 bufferptr bptr(len); // prealloc space for entire read
3235 got = safe_pread(**fd, bptr.c_str(), len, offset);
3236 if (got < 0) {
31f18b77 3237 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
7c673cae 3238 lfn_close(fd);
7c673cae
FG
3239 return got;
3240 }
3241 bptr.set_length(got); // properly size the buffer
3242 bl.clear();
3243 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3244
3245#ifdef HAVE_POSIX_FADVISE
3246 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3247 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3248 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3249 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3250#endif
3251
3252 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3253 ostringstream ss;
3254 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3255 if (errors != 0) {
31f18b77 3256 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae
FG
3257 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3258 assert(0 == "bad crc on read");
3259 }
3260 }
3261
3262 lfn_close(fd);
3263
31f18b77 3264 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
7c673cae
FG
3265 << got << "/" << len << dendl;
3266 if (cct->_conf->filestore_debug_inject_read_err &&
3267 debug_data_eio(oid)) {
3268 return -EIO;
224ce89b
WB
3269 } else if (cct->_conf->filestore_debug_random_read_err &&
3270 (rand() % (int)(cct->_conf->filestore_debug_random_read_err * 100.0)) == 0) {
3271 dout(0) << __func__ << ": inject random EIO" << dendl;
3272 return -EIO;
7c673cae
FG
3273 } else {
3274 tracepoint(objectstore, read_exit, got);
3275 return got;
3276 }
3277}
3278
3279int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3280 map<uint64_t, uint64_t> *m)
3281{
3282 uint64_t i;
3283 struct fiemap_extent *extent = NULL;
3284 struct fiemap *fiemap = NULL;
3285 int r = 0;
3286
3287more:
3288 r = backend->do_fiemap(fd, offset, len, &fiemap);
3289 if (r < 0)
3290 return r;
3291
3292 if (fiemap->fm_mapped_extents == 0) {
3293 free(fiemap);
3294 return r;
3295 }
3296
3297 extent = &fiemap->fm_extents[0];
3298
3299 /* start where we were asked to start */
3300 if (extent->fe_logical < offset) {
3301 extent->fe_length -= offset - extent->fe_logical;
3302 extent->fe_logical = offset;
3303 }
3304
3305 i = 0;
3306
3307 struct fiemap_extent *last = nullptr;
3308 while (i < fiemap->fm_mapped_extents) {
3309 struct fiemap_extent *next = extent + 1;
3310
31f18b77 3311 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
7c673cae
FG
3312 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3313
3314 /* try to merge extents */
3315 while ((i < fiemap->fm_mapped_extents - 1) &&
3316 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3317 next->fe_length += extent->fe_length;
3318 next->fe_logical = extent->fe_logical;
3319 extent = next;
3320 next = extent + 1;
3321 i++;
3322 }
3323
3324 if (extent->fe_logical + extent->fe_length > offset + len)
3325 extent->fe_length = offset + len - extent->fe_logical;
3326 (*m)[extent->fe_logical] = extent->fe_length;
3327 i++;
3328 last = extent++;
3329 }
3330 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3331 offset = last->fe_logical + last->fe_length;
3332 len -= xoffset;
3333 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3334 free(fiemap);
3335 if (!is_last) {
3336 goto more;
3337 }
3338
3339 return r;
3340}
3341
3342int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3343 map<uint64_t, uint64_t> *m)
3344{
3345#if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3346 off_t hole_pos, data_pos;
3347 int r = 0;
3348
3349 // If lseek fails with errno setting to be ENXIO, this means the current
3350 // file offset is beyond the end of the file.
3351 off_t start = offset;
3352 while(start < (off_t)(offset + len)) {
3353 data_pos = lseek(fd, start, SEEK_DATA);
3354 if (data_pos < 0) {
3355 if (errno == ENXIO)
3356 break;
3357 else {
3358 r = -errno;
3359 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3360 return r;
3361 }
3362 } else if (data_pos > (off_t)(offset + len)) {
3363 break;
3364 }
3365
3366 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3367 if (hole_pos < 0) {
3368 if (errno == ENXIO) {
3369 break;
3370 } else {
3371 r = -errno;
3372 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3373 return r;
3374 }
3375 }
3376
3377 if (hole_pos >= (off_t)(offset + len)) {
3378 (*m)[data_pos] = offset + len - data_pos;
3379 break;
3380 }
3381 (*m)[data_pos] = hole_pos - data_pos;
3382 start = hole_pos;
3383 }
3384
3385 return r;
3386#else
3387 (*m)[offset] = len;
3388 return 0;
3389#endif
3390}
3391
3392int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3393 uint64_t offset, size_t len,
3394 bufferlist& bl)
3395{
3396 map<uint64_t, uint64_t> exomap;
3397 int r = fiemap(_cid, oid, offset, len, exomap);
3398 if (r >= 0) {
3399 ::encode(exomap, bl);
3400 }
3401 return r;
3402}
3403
3404int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3405 uint64_t offset, size_t len,
3406 map<uint64_t, uint64_t>& destmap)
3407{
3408 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3409 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3410 destmap.clear();
3411
3412 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3413 len <= (size_t)m_filestore_fiemap_threshold) {
3414 destmap[offset] = len;
3415 return 0;
3416 }
3417
31f18b77 3418 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3419
3420 FDRef fd;
3421
3422 int r = lfn_open(cid, oid, false, &fd);
3423 if (r < 0) {
3424 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3425 goto done;
3426 }
3427
3428 if (backend->has_seek_data_hole()) {
3429 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3430 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3431 } else if (backend->has_fiemap()) {
3432 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3433 r = _do_fiemap(**fd, offset, len, &destmap);
3434 }
3435
3436 lfn_close(fd);
3437
3438done:
3439
31f18b77 3440 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
7c673cae
FG
3441 assert(!m_filestore_fail_eio || r != -EIO);
3442 tracepoint(objectstore, fiemap_exit, r);
3443 return r;
3444}
3445
3446int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3447 const SequencerPosition &spos)
3448{
31f18b77 3449 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae 3450 int r = lfn_unlink(cid, oid, spos);
31f18b77 3451 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3452 return r;
3453}
3454
3455int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3456{
31f18b77 3457 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
7c673cae 3458 int r = lfn_truncate(cid, oid, size);
31f18b77 3459 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
7c673cae
FG
3460 return r;
3461}
3462
3463
3464int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3465{
31f18b77 3466 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
3467
3468 FDRef fd;
3469 int r = lfn_open(cid, oid, true, &fd);
3470 if (r < 0) {
3471 return r;
3472 } else {
3473 lfn_close(fd);
3474 }
31f18b77 3475 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
3476 return r;
3477}
3478
3479int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3480 uint64_t offset, size_t len,
3481 const bufferlist& bl, uint32_t fadvise_flags)
3482{
31f18b77 3483 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3484 int r;
3485
3486 FDRef fd;
3487 r = lfn_open(cid, oid, true, &fd);
3488 if (r < 0) {
31f18b77 3489 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
7c673cae
FG
3490 << oid << ": "
3491 << cpp_strerror(r) << dendl;
3492 goto out;
3493 }
3494
3495 // write
3496 r = bl.write_fd(**fd, offset);
3497 if (r < 0) {
31f18b77 3498 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
7c673cae
FG
3499 << " error: " << cpp_strerror(r) << dendl;
3500 lfn_close(fd);
3501 goto out;
3502 }
3503 r = bl.length();
3504
3505 if (r >= 0 && m_filestore_sloppy_crc) {
3506 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3507 assert(rc >= 0);
3508 }
3509
3510 if (replaying || m_disable_wbthrottle) {
3511 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3512#ifdef HAVE_POSIX_FADVISE
3513 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3514#endif
3515 }
3516 } else {
3517 wbthrottle.queue_wb(fd, oid, offset, len,
3518 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3519 }
3520
3521 lfn_close(fd);
3522
3523 out:
31f18b77 3524 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
7c673cae
FG
3525 return r;
3526}
3527
3528int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3529{
31f18b77 3530 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
7c673cae
FG
3531 int ret = 0;
3532
3533 if (cct->_conf->filestore_punch_hole) {
3534#ifdef CEPH_HAVE_FALLOCATE
3535# if !defined(DARWIN) && !defined(__FreeBSD__)
3536# ifdef FALLOC_FL_KEEP_SIZE
3537 // first try to punch a hole.
3538 FDRef fd;
3539 ret = lfn_open(cid, oid, false, &fd);
3540 if (ret < 0) {
3541 goto out;
3542 }
3543
3544 struct stat st;
3545 ret = ::fstat(**fd, &st);
3546 if (ret < 0) {
3547 ret = -errno;
3548 lfn_close(fd);
3549 goto out;
3550 }
3551
3552 // first try fallocate
3553 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3554 offset, len);
3555 if (ret < 0) {
3556 ret = -errno;
3557 } else {
3558 // ensure we extent file size, if needed
3559 if (offset + len > (uint64_t)st.st_size) {
3560 ret = ::ftruncate(**fd, offset + len);
3561 if (ret < 0) {
3562 ret = -errno;
3563 lfn_close(fd);
3564 goto out;
3565 }
3566 }
3567 }
3568 lfn_close(fd);
3569
3570 if (ret >= 0 && m_filestore_sloppy_crc) {
3571 int rc = backend->_crc_update_zero(**fd, offset, len);
3572 assert(rc >= 0);
3573 }
3574
3575 if (ret == 0)
3576 goto out; // yay!
3577 if (ret != -EOPNOTSUPP)
3578 goto out; // some other error
3579# endif
3580# endif
3581#endif
3582 }
3583
3584 // lame, kernel is old and doesn't support it.
3585 // write zeros.. yuck!
31f18b77 3586 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
7c673cae
FG
3587 {
3588 bufferlist bl;
3589 bl.append_zero(len);
3590 ret = _write(cid, oid, offset, len, bl);
3591 }
3592
3593#ifdef CEPH_HAVE_FALLOCATE
3594# if !defined(DARWIN) && !defined(__FreeBSD__)
3595# ifdef FALLOC_FL_KEEP_SIZE
3596 out:
3597# endif
3598# endif
3599#endif
31f18b77 3600 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
7c673cae
FG
3601 return ret;
3602}
3603
3604int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3605 const SequencerPosition& spos)
3606{
31f18b77 3607 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
7c673cae
FG
3608
3609 if (_check_replay_guard(cid, newoid, spos) < 0)
3610 return 0;
3611
3612 int r;
3613 FDRef o, n;
3614 {
3615 Index index;
3616 r = lfn_open(cid, oldoid, false, &o, &index);
3617 if (r < 0) {
3618 goto out2;
3619 }
3620 assert(NULL != (index.index));
3621 RWLock::WLocker l((index.index)->access_lock);
3622
3623 r = lfn_open(cid, newoid, true, &n, &index);
3624 if (r < 0) {
3625 goto out;
3626 }
3627 r = ::ftruncate(**n, 0);
3628 if (r < 0) {
3629 r = -errno;
3630 goto out3;
3631 }
3632 struct stat st;
3633 r = ::fstat(**o, &st);
3634 if (r < 0) {
3635 r = -errno;
3636 goto out3;
3637 }
3638
3639 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3640 if (r < 0) {
3641 goto out3;
3642 }
3643
3644 dout(20) << "objectmap clone" << dendl;
3645 r = object_map->clone(oldoid, newoid, &spos);
3646 if (r < 0 && r != -ENOENT)
3647 goto out3;
3648 }
3649
3650 {
3651 char buf[2];
3652 map<string, bufferptr> aset;
3653 r = _fgetattrs(**o, aset);
3654 if (r < 0)
3655 goto out3;
3656
3657 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3658 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3659 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3660 sizeof(XATTR_NO_SPILL_OUT));
3661 } else {
3662 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3663 sizeof(XATTR_SPILL_OUT));
3664 }
3665 if (r < 0)
3666 goto out3;
3667
3668 r = _fsetattrs(**n, aset);
3669 if (r < 0)
3670 goto out3;
3671 }
3672
3673 // clone is non-idempotent; record our work.
3674 _set_replay_guard(**n, spos, &newoid);
3675
3676 out3:
3677 lfn_close(n);
3678 out:
3679 lfn_close(o);
3680 out2:
31f18b77 3681 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
7c673cae
FG
3682 assert(!m_filestore_fail_eio || r != -EIO);
3683 return r;
3684}
3685
3686int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3687{
31f18b77 3688 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3689 return backend->clone_range(from, to, srcoff, len, dstoff);
3690}
3691
3692int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3693{
31f18b77 3694 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3695 int r = 0;
3696 map<uint64_t, uint64_t> exomap;
3697 // fiemap doesn't allow zero length
3698 if (len == 0)
3699 return 0;
3700
3701 if (backend->has_seek_data_hole()) {
3702 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3703 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3704 } else if (backend->has_fiemap()) {
3705 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3706 r = _do_fiemap(from, srcoff, len, &exomap);
3707 }
3708
3709
3710 int64_t written = 0;
3711 if (r < 0)
3712 goto out;
3713
3714 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3715 uint64_t it_off = miter->first - srcoff + dstoff;
3716 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3717 if (r < 0) {
31f18b77 3718 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
7c673cae
FG
3719 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3720 break;
3721 }
3722 written += miter->second;
3723 }
3724
3725 if (r >= 0) {
3726 if (m_filestore_sloppy_crc) {
3727 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3728 assert(rc >= 0);
3729 }
3730 struct stat st;
3731 r = ::fstat(to, &st);
3732 if (r < 0) {
3733 r = -errno;
31f18b77 3734 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3735 goto out;
3736 }
3737 if (st.st_size < (int)(dstoff + len)) {
3738 r = ::ftruncate(to, dstoff + len);
3739 if (r < 0) {
3740 r = -errno;
31f18b77 3741 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
7c673cae
FG
3742 goto out;
3743 }
3744 }
3745 r = written;
3746 }
3747
3748 out:
31f18b77 3749 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
3750 return r;
3751}
3752
3753int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3754{
31f18b77 3755 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3756 int r = 0;
3757 loff_t pos = srcoff;
3758 loff_t end = srcoff + len;
3759 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3760
3761#ifdef CEPH_HAVE_SPLICE
3762 if (backend->has_splice()) {
3763 int pipefd[2];
3764 if (pipe(pipefd) < 0) {
3765 r = -errno;
3766 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3767 return r;
3768 }
3769
3770 loff_t dstpos = dstoff;
3771 while (pos < end) {
3772 int l = MIN(end-pos, buflen);
3773 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3774 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3775 if (r < 0) {
31f18b77 3776 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
7c673cae
FG
3777 << ", " << cpp_strerror(r) << dendl;
3778 break;
3779 }
3780 if (r == 0) {
3781 // hrm, bad source range, wtf.
3782 r = -ERANGE;
31f18b77 3783 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
3784 << " of fd " << from << " len " << len << dendl;
3785 break;
3786 }
3787
3788 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3789 dout(10) << " safe_splice write to " << to << " len " << r
3790 << " got " << r << dendl;
3791 if (r < 0) {
31f18b77 3792 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
3793 << r << ", " << cpp_strerror(r) << dendl;
3794 break;
3795 }
3796 }
3797 close(pipefd[0]);
3798 close(pipefd[1]);
3799 } else
3800#endif
3801 {
3802 int64_t actual;
3803
3804 actual = ::lseek64(from, srcoff, SEEK_SET);
3805 if (actual != (int64_t)srcoff) {
3806 if (actual < 0)
3807 r = -errno;
3808 else
3809 r = -EINVAL;
3810 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3811 return r;
3812 }
3813 actual = ::lseek64(to, dstoff, SEEK_SET);
3814 if (actual != (int64_t)dstoff) {
3815 if (actual < 0)
3816 r = -errno;
3817 else
3818 r = -EINVAL;
3819 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3820 return r;
3821 }
3822
3823 char buf[buflen];
3824 while (pos < end) {
3825 int l = MIN(end-pos, buflen);
3826 r = ::read(from, buf, l);
3827 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3828 if (r < 0) {
3829 if (errno == EINTR) {
3830 continue;
3831 } else {
3832 r = -errno;
31f18b77 3833 derr << __FUNC__ << ": read error at " << pos << "~" << len
7c673cae
FG
3834 << ", " << cpp_strerror(r) << dendl;
3835 break;
3836 }
3837 }
3838 if (r == 0) {
3839 // hrm, bad source range, wtf.
3840 r = -ERANGE;
31f18b77 3841 derr << __FUNC__ << ": got short read result at " << pos
7c673cae
FG
3842 << " of fd " << from << " len " << len << dendl;
3843 break;
3844 }
3845 int op = 0;
3846 while (op < r) {
3847 int r2 = safe_write(to, buf+op, r-op);
3848 dout(25) << " write to " << to << " len " << (r-op)
3849 << " got " << r2 << dendl;
3850 if (r2 < 0) {
3851 r = r2;
31f18b77 3852 derr << __FUNC__ << ": write error at " << pos << "~"
7c673cae
FG
3853 << r-op << ", " << cpp_strerror(r) << dendl;
3854
3855 break;
3856 }
3857 op += (r-op);
3858 }
3859 if (r < 0)
3860 break;
3861 pos += r;
3862 }
3863 }
3864
3865 if (r < 0 && replaying) {
3866 assert(r == -ERANGE);
31f18b77 3867 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
7c673cae
FG
3868 r = pos - from;;
3869 }
3870 assert(replaying || pos == end);
3871 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3872 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3873 assert(rc >= 0);
3874 }
31f18b77 3875 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
7c673cae
FG
3876 return r;
3877}
3878
3879int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3880 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3881 const SequencerPosition& spos)
3882{
31f18b77 3883 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
7c673cae
FG
3884
3885 if (_check_replay_guard(newcid, newoid, spos) < 0)
3886 return 0;
3887
3888 int r;
3889 FDRef o, n;
3890 r = lfn_open(oldcid, oldoid, false, &o);
3891 if (r < 0) {
3892 goto out2;
3893 }
3894 r = lfn_open(newcid, newoid, true, &n);
3895 if (r < 0) {
3896 goto out;
3897 }
3898 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3899 if (r < 0) {
3900 goto out3;
3901 }
3902
3903 // clone is non-idempotent; record our work.
3904 _set_replay_guard(**n, spos, &newoid);
3905
3906 out3:
3907 lfn_close(n);
3908 out:
3909 lfn_close(o);
3910 out2:
31f18b77 3911 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
7c673cae
FG
3912 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3913 return r;
3914}
3915
3916class SyncEntryTimeout : public Context {
3917public:
3918 CephContext* cct;
3919 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3920 : cct(cct), m_commit_timeo(commit_timeo)
3921 {
3922 }
3923
3924 void finish(int r) override {
3925 BackTrace *bt = new BackTrace(1);
3926 generic_dout(-1) << "FileStore: sync_entry timed out after "
3927 << m_commit_timeo << " seconds.\n";
3928 bt->print(*_dout);
3929 *_dout << dendl;
3930 delete bt;
3931 ceph_abort();
3932 }
3933private:
3934 int m_commit_timeo;
3935};
3936
3937void FileStore::sync_entry()
3938{
3939 lock.Lock();
3940 while (!stop) {
3941 utime_t max_interval;
3942 max_interval.set_from_double(m_filestore_max_sync_interval);
3943 utime_t min_interval;
3944 min_interval.set_from_double(m_filestore_min_sync_interval);
3945
3946 utime_t startwait = ceph_clock_now();
3947 if (!force_sync) {
31f18b77 3948 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
7c673cae
FG
3949 sync_cond.WaitInterval(lock, max_interval);
3950 } else {
31f18b77 3951 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
7c673cae
FG
3952 }
3953
3954 if (force_sync) {
31f18b77 3955 dout(20) << __FUNC__ << ": force_sync set" << dendl;
7c673cae
FG
3956 force_sync = false;
3957 } else if (stop) {
31f18b77 3958 dout(20) << __FUNC__ << ": stop set" << dendl;
7c673cae
FG
3959 break;
3960 } else {
3961 // wait for at least the min interval
3962 utime_t woke = ceph_clock_now();
3963 woke -= startwait;
31f18b77 3964 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
7c673cae
FG
3965 if (woke < min_interval) {
3966 utime_t t = min_interval;
3967 t -= woke;
31f18b77 3968 dout(20) << __FUNC__ << ": waiting for another " << t
7c673cae
FG
3969 << " to reach min interval " << min_interval << dendl;
3970 sync_cond.WaitInterval(lock, t);
3971 }
3972 }
3973
3974 list<Context*> fin;
3975 again:
3976 fin.swap(sync_waiters);
3977 lock.Unlock();
3978
3979 op_tp.pause();
3980 if (apply_manager.commit_start()) {
3981 utime_t start = ceph_clock_now();
3982 uint64_t cp = apply_manager.get_committing_seq();
3983
3984 sync_entry_timeo_lock.Lock();
3985 SyncEntryTimeout *sync_entry_timeo =
3986 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
224ce89b
WB
3987 if (!timer.add_event_after(m_filestore_commit_timeout,
3988 sync_entry_timeo)) {
3989 sync_entry_timeo = nullptr;
3990 }
7c673cae
FG
3991 sync_entry_timeo_lock.Unlock();
3992
3993 logger->set(l_filestore_committing, 1);
3994
31f18b77 3995 dout(15) << __FUNC__ << ": committing " << cp << dendl;
7c673cae
FG
3996 stringstream errstream;
3997 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
3998 derr << errstream.str() << dendl;
3999 ceph_abort();
4000 }
4001
4002 if (backend->can_checkpoint()) {
4003 int err = write_op_seq(op_fd, cp);
4004 if (err < 0) {
4005 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4006 assert(0 == "error during write_op_seq");
4007 }
4008
4009 char s[NAME_MAX];
4010 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4011 uint64_t cid = 0;
4012 err = backend->create_checkpoint(s, &cid);
4013 if (err < 0) {
4014 int err = errno;
4015 derr << "snap create '" << s << "' got error " << err << dendl;
4016 assert(err == 0);
4017 }
4018
4019 snaps.push_back(cp);
4020 apply_manager.commit_started();
4021 op_tp.unpause();
4022
4023 if (cid > 0) {
4024 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4025 err = backend->sync_checkpoint(cid);
4026 if (err < 0) {
4027 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4028 assert(0 == "wait_sync got error");
4029 }
4030 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4031 }
224ce89b 4032 } else {
7c673cae
FG
4033 apply_manager.commit_started();
4034 op_tp.unpause();
4035
4036 int err = object_map->sync();
4037 if (err < 0) {
4038 derr << "object_map sync got " << cpp_strerror(err) << dendl;
4039 assert(0 == "object_map sync returned error");
4040 }
4041
4042 err = backend->syncfs();
4043 if (err < 0) {
4044 derr << "syncfs got " << cpp_strerror(err) << dendl;
4045 assert(0 == "syncfs returned error");
4046 }
4047
4048 err = write_op_seq(op_fd, cp);
4049 if (err < 0) {
4050 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4051 assert(0 == "error during write_op_seq");
4052 }
4053 err = ::fsync(op_fd);
4054 if (err < 0) {
4055 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4056 assert(0 == "error during fsync of op_seq");
4057 }
4058 }
4059
4060 utime_t done = ceph_clock_now();
4061 utime_t lat = done - start;
4062 utime_t dur = done - startwait;
31f18b77 4063 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
224ce89b
WB
4064 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4065 if (max_pause_lat < dur - lat) {
4066 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4067 }
7c673cae
FG
4068
4069 logger->inc(l_filestore_commitcycle);
4070 logger->tinc(l_filestore_commitcycle_latency, lat);
4071 logger->tinc(l_filestore_commitcycle_interval, dur);
4072
4073 apply_manager.commit_finish();
4074 if (!m_disable_wbthrottle) {
4075 wbthrottle.clear();
4076 }
4077
4078 logger->set(l_filestore_committing, 0);
4079
4080 // remove old snaps?
4081 if (backend->can_checkpoint()) {
4082 char s[NAME_MAX];
4083 while (snaps.size() > 2) {
4084 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4085 snaps.pop_front();
4086 dout(10) << "removing snap '" << s << "'" << dendl;
4087 int r = backend->destroy_checkpoint(s);
4088 if (r) {
4089 int err = errno;
4090 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4091 }
4092 }
4093 }
4094
31f18b77 4095 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
7c673cae 4096
224ce89b
WB
4097 if (sync_entry_timeo) {
4098 Mutex::Locker lock(sync_entry_timeo_lock);
4099 timer.cancel_event(sync_entry_timeo);
4100 }
7c673cae
FG
4101 } else {
4102 op_tp.unpause();
4103 }
4104
4105 lock.Lock();
4106 finish_contexts(cct, fin, 0);
4107 fin.clear();
4108 if (!sync_waiters.empty()) {
31f18b77 4109 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
7c673cae
FG
4110 goto again;
4111 }
4112 if (!stop && journal && journal->should_commit_now()) {
31f18b77 4113 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
7c673cae
FG
4114 goto again;
4115 }
4116 }
4117 stop = false;
4118 lock.Unlock();
4119}
4120
4121void FileStore::_start_sync()
4122{
4123 if (!journal) { // don't do a big sync if the journal is on
31f18b77 4124 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4125 sync_cond.Signal();
4126 } else {
31f18b77 4127 dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
7c673cae
FG
4128 }
4129}
4130
4131void FileStore::do_force_sync()
4132{
31f18b77 4133 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4134 Mutex::Locker l(lock);
4135 force_sync = true;
4136 sync_cond.Signal();
4137}
4138
4139void FileStore::start_sync(Context *onsafe)
4140{
4141 Mutex::Locker l(lock);
4142 sync_waiters.push_back(onsafe);
4143 sync_cond.Signal();
4144 force_sync = true;
31f18b77 4145 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4146}
4147
4148void FileStore::sync()
4149{
4150 Mutex l("FileStore::sync");
4151 Cond c;
4152 bool done;
4153 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4154
4155 start_sync(fin);
4156
4157 l.Lock();
4158 while (!done) {
4159 dout(10) << "sync waiting" << dendl;
4160 c.Wait(l);
4161 }
4162 l.Unlock();
4163 dout(10) << "sync done" << dendl;
4164}
4165
4166void FileStore::_flush_op_queue()
4167{
31f18b77 4168 dout(10) << __FUNC__ << ": draining op tp" << dendl;
7c673cae 4169 op_wq.drain();
31f18b77 4170 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
7c673cae
FG
4171 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4172 (*it)->wait_for_empty();
4173 }
4174}
4175
4176/*
4177 * flush - make every queued write readable
4178 */
4179void FileStore::flush()
4180{
31f18b77 4181 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4182
4183 if (cct->_conf->filestore_blackhole) {
4184 // wait forever
4185 Mutex lock("FileStore::flush::lock");
4186 Cond cond;
4187 lock.Lock();
4188 while (true)
4189 cond.Wait(lock);
4190 ceph_abort();
4191 }
4192
4193 if (m_filestore_journal_writeahead) {
4194 if (journal)
4195 journal->flush();
31f18b77 4196 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
7c673cae
FG
4197 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4198 (*it)->wait_for_empty();
4199 }
4200 }
4201
4202 _flush_op_queue();
31f18b77 4203 dout(10) << __FUNC__ << ": complete" << dendl;
7c673cae
FG
4204}
4205
4206/*
4207 * sync_and_flush - make every queued write readable AND committed to disk
4208 */
4209void FileStore::sync_and_flush()
4210{
31f18b77 4211 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4212
4213 if (m_filestore_journal_writeahead) {
4214 if (journal)
4215 journal->flush();
4216 _flush_op_queue();
4217 } else {
4218 // includes m_filestore_journal_parallel
4219 _flush_op_queue();
4220 sync();
4221 }
31f18b77 4222 dout(10) << __FUNC__ << ": done" << dendl;
7c673cae
FG
4223}
4224
4225int FileStore::flush_journal()
4226{
31f18b77 4227 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4228 sync_and_flush();
4229 sync();
4230 return 0;
4231}
4232
4233int FileStore::snapshot(const string& name)
4234{
31f18b77 4235 dout(10) << __FUNC__ << ": " << name << dendl;
7c673cae
FG
4236 sync_and_flush();
4237
4238 if (!backend->can_checkpoint()) {
31f18b77 4239 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
7c673cae
FG
4240 return -EOPNOTSUPP;
4241 }
4242
4243 char s[NAME_MAX];
4244 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4245
4246 int r = backend->create_checkpoint(s, NULL);
4247 if (r) {
31f18b77 4248 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
7c673cae
FG
4249 }
4250
4251 return r;
4252}
4253
4254// -------------------------------
4255// attributes
4256
4257int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4258{
4259 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4260 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4261 if (l >= 0) {
4262 bp = buffer::create(l);
4263 memcpy(bp.c_str(), val, l);
4264 } else if (l == -ERANGE) {
4265 l = chain_fgetxattr(fd, name, 0, 0);
4266 if (l > 0) {
4267 bp = buffer::create(l);
4268 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4269 }
4270 }
4271 assert(!m_filestore_fail_eio || l != -EIO);
4272 return l;
4273}
4274
4275int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4276{
4277 // get attr list
4278 char names1[100];
4279 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4280 char *names2 = 0;
4281 char *name = 0;
4282 if (len == -ERANGE) {
4283 len = chain_flistxattr(fd, 0, 0);
4284 if (len < 0) {
4285 assert(!m_filestore_fail_eio || len != -EIO);
4286 return len;
4287 }
4288 dout(10) << " -ERANGE, len is " << len << dendl;
4289 names2 = new char[len+1];
4290 len = chain_flistxattr(fd, names2, len);
4291 dout(10) << " -ERANGE, got " << len << dendl;
4292 if (len < 0) {
4293 assert(!m_filestore_fail_eio || len != -EIO);
4294 delete[] names2;
4295 return len;
4296 }
4297 name = names2;
4298 } else if (len < 0) {
4299 assert(!m_filestore_fail_eio || len != -EIO);
4300 return len;
4301 } else {
4302 name = names1;
4303 }
4304 name[len] = 0;
4305
4306 char *end = name + len;
4307 while (name < end) {
4308 char *attrname = name;
4309 if (parse_attrname(&name)) {
4310 if (*name) {
31f18b77 4311 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
7c673cae
FG
4312 int r = _fgetattr(fd, attrname, aset[name]);
4313 if (r < 0) {
4314 delete[] names2;
4315 return r;
4316 }
4317 }
4318 }
4319 name += strlen(name) + 1;
4320 }
4321
4322 delete[] names2;
4323 return 0;
4324}
4325
4326int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4327{
4328 for (map<string, bufferptr>::iterator p = aset.begin();
4329 p != aset.end();
4330 ++p) {
4331 char n[CHAIN_XATTR_MAX_NAME_LEN];
4332 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4333 const char *val;
4334 if (p->second.length())
4335 val = p->second.c_str();
4336 else
4337 val = "";
4338 // ??? Why do we skip setting all the other attrs if one fails?
4339 int r = chain_fsetxattr(fd, n, val, p->second.length());
4340 if (r < 0) {
31f18b77 4341 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
7c673cae
FG
4342 return r;
4343 }
4344 }
4345 return 0;
4346}
4347
4348// debug EIO injection
4349void FileStore::inject_data_error(const ghobject_t &oid) {
4350 Mutex::Locker l(read_error_lock);
31f18b77 4351 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4352 data_error_set.insert(oid);
4353}
4354void FileStore::inject_mdata_error(const ghobject_t &oid) {
4355 Mutex::Locker l(read_error_lock);
31f18b77 4356 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
7c673cae
FG
4357 mdata_error_set.insert(oid);
4358}
224ce89b 4359
7c673cae
FG
4360void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4361 Mutex::Locker l(read_error_lock);
31f18b77 4362 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
7c673cae
FG
4363 data_error_set.erase(oid);
4364 mdata_error_set.erase(oid);
4365}
4366bool FileStore::debug_data_eio(const ghobject_t &oid) {
4367 Mutex::Locker l(read_error_lock);
4368 if (data_error_set.count(oid)) {
31f18b77 4369 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4370 return true;
4371 } else {
4372 return false;
4373 }
4374}
4375bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4376 Mutex::Locker l(read_error_lock);
4377 if (mdata_error_set.count(oid)) {
31f18b77 4378 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
7c673cae
FG
4379 return true;
4380 } else {
4381 return false;
4382 }
4383}
4384
4385
4386// objects
4387
4388int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4389{
4390 tracepoint(objectstore, getattr_enter, _cid.c_str());
4391 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
31f18b77 4392 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
7c673cae
FG
4393 FDRef fd;
4394 int r = lfn_open(cid, oid, false, &fd);
4395 if (r < 0) {
4396 goto out;
4397 }
4398 char n[CHAIN_XATTR_MAX_NAME_LEN];
4399 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4400 r = _fgetattr(**fd, n, bp);
4401 lfn_close(fd);
4402 if (r == -ENODATA) {
4403 map<string, bufferlist> got;
4404 set<string> to_get;
4405 to_get.insert(string(name));
4406 Index index;
4407 r = get_index(cid, &index);
4408 if (r < 0) {
31f18b77 4409 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4410 goto out;
4411 }
4412 r = object_map->get_xattrs(oid, to_get, &got);
4413 if (r < 0 && r != -ENOENT) {
31f18b77 4414 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
7c673cae
FG
4415 goto out;
4416 }
4417 if (got.empty()) {
31f18b77 4418 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
7c673cae
FG
4419 return -ENODATA;
4420 }
4421 bp = bufferptr(got.begin()->second.c_str(),
4422 got.begin()->second.length());
4423 r = bp.length();
4424 }
4425 out:
31f18b77 4426 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
7c673cae
FG
4427 assert(!m_filestore_fail_eio || r != -EIO);
4428 if (cct->_conf->filestore_debug_inject_read_err &&
4429 debug_mdata_eio(oid)) {
4430 return -EIO;
4431 } else {
4432 tracepoint(objectstore, getattr_exit, r);
4433 return r < 0 ? r : 0;
4434 }
4435}
4436
4437int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4438{
4439 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4440 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4441 set<string> omap_attrs;
4442 map<string, bufferlist> omap_aset;
4443 Index index;
31f18b77 4444 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
4445 FDRef fd;
4446 bool spill_out = true;
4447 char buf[2];
4448
4449 int r = lfn_open(cid, oid, false, &fd);
4450 if (r < 0) {
4451 goto out;
4452 }
4453
4454 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4455 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4456 spill_out = false;
4457
4458 r = _fgetattrs(**fd, aset);
4459 lfn_close(fd);
4460 fd = FDRef(); // defensive
4461 if (r < 0) {
4462 goto out;
4463 }
4464
4465 if (!spill_out) {
31f18b77 4466 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4467 goto out;
4468 }
4469
4470 r = get_index(cid, &index);
4471 if (r < 0) {
31f18b77 4472 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4473 goto out;
4474 }
4475 {
4476 r = object_map->get_all_xattrs(oid, &omap_attrs);
4477 if (r < 0 && r != -ENOENT) {
31f18b77 4478 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4479 goto out;
4480 }
4481
4482 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4483 if (r < 0 && r != -ENOENT) {
31f18b77 4484 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4485 goto out;
4486 }
4487 if (r == -ENOENT)
4488 r = 0;
4489 }
4490 assert(omap_attrs.size() == omap_aset.size());
4491 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4492 i != omap_aset.end();
4493 ++i) {
4494 string key(i->first);
4495 aset.insert(make_pair(key,
4496 bufferptr(i->second.c_str(), i->second.length())));
4497 }
4498 out:
31f18b77 4499 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4500 assert(!m_filestore_fail_eio || r != -EIO);
4501
4502 if (cct->_conf->filestore_debug_inject_read_err &&
4503 debug_mdata_eio(oid)) {
4504 return -EIO;
4505 } else {
4506 tracepoint(objectstore, getattrs_exit, r);
4507 return r;
4508 }
4509}
4510
4511int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4512 const SequencerPosition &spos)
4513{
4514 map<string, bufferlist> omap_set;
4515 set<string> omap_remove;
4516 map<string, bufferptr> inline_set;
4517 map<string, bufferptr> inline_to_set;
4518 FDRef fd;
4519 int spill_out = -1;
4520 bool incomplete_inline = false;
4521
4522 int r = lfn_open(cid, oid, false, &fd);
4523 if (r < 0) {
4524 goto out;
4525 }
4526
4527 char buf[2];
4528 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4529 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4530 spill_out = 0;
4531 else
4532 spill_out = 1;
4533
4534 r = _fgetattrs(**fd, inline_set);
4535 incomplete_inline = (r == -E2BIG);
4536 assert(!m_filestore_fail_eio || r != -EIO);
31f18b77 4537 dout(15) << __FUNC__ << ": " << cid << "/" << oid
7c673cae
FG
4538 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4539 << dendl;
4540
4541 for (map<string,bufferptr>::iterator p = aset.begin();
4542 p != aset.end();
4543 ++p) {
4544 char n[CHAIN_XATTR_MAX_NAME_LEN];
4545 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4546
4547 if (incomplete_inline) {
4548 chain_fremovexattr(**fd, n); // ignore any error
4549 omap_set[p->first].push_back(p->second);
4550 continue;
4551 }
4552
4553 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4554 if (inline_set.count(p->first)) {
4555 inline_set.erase(p->first);
4556 r = chain_fremovexattr(**fd, n);
4557 if (r < 0)
4558 goto out_close;
4559 }
4560 omap_set[p->first].push_back(p->second);
4561 continue;
4562 }
4563
4564 if (!inline_set.count(p->first) &&
4565 inline_set.size() >= m_filestore_max_inline_xattrs) {
4566 omap_set[p->first].push_back(p->second);
4567 continue;
4568 }
4569 omap_remove.insert(p->first);
4570 inline_set.insert(*p);
4571
4572 inline_to_set.insert(*p);
4573 }
4574
4575 if (spill_out != 1 && !omap_set.empty()) {
4576 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4577 sizeof(XATTR_SPILL_OUT));
4578 }
4579
4580 r = _fsetattrs(**fd, inline_to_set);
4581 if (r < 0)
4582 goto out_close;
4583
4584 if (spill_out && !omap_remove.empty()) {
4585 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4586 if (r < 0 && r != -ENOENT) {
31f18b77 4587 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
7c673cae
FG
4588 assert(!m_filestore_fail_eio || r != -EIO);
4589 goto out_close;
4590 } else {
4591 r = 0; // don't confuse the debug output
4592 }
4593 }
4594
4595 if (!omap_set.empty()) {
4596 r = object_map->set_xattrs(oid, omap_set, &spos);
4597 if (r < 0) {
31f18b77 4598 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
7c673cae
FG
4599 assert(!m_filestore_fail_eio || r != -EIO);
4600 goto out_close;
4601 }
4602 }
4603 out_close:
4604 lfn_close(fd);
4605 out:
31f18b77 4606 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4607 return r;
4608}
4609
4610
4611int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4612 const SequencerPosition &spos)
4613{
31f18b77 4614 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
7c673cae
FG
4615 FDRef fd;
4616 bool spill_out = true;
4617
4618 int r = lfn_open(cid, oid, false, &fd);
4619 if (r < 0) {
4620 goto out;
4621 }
4622
4623 char buf[2];
4624 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4625 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4626 spill_out = false;
4627 }
4628
4629 char n[CHAIN_XATTR_MAX_NAME_LEN];
4630 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4631 r = chain_fremovexattr(**fd, n);
4632 if (r == -ENODATA && spill_out) {
4633 Index index;
4634 r = get_index(cid, &index);
4635 if (r < 0) {
31f18b77 4636 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4637 goto out_close;
4638 }
4639 set<string> to_remove;
4640 to_remove.insert(string(name));
4641 r = object_map->remove_xattrs(oid, to_remove, &spos);
4642 if (r < 0 && r != -ENOENT) {
31f18b77 4643 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
7c673cae
FG
4644 assert(!m_filestore_fail_eio || r != -EIO);
4645 goto out_close;
4646 }
4647 }
4648 out_close:
4649 lfn_close(fd);
4650 out:
31f18b77 4651 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
7c673cae
FG
4652 return r;
4653}
4654
4655int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4656 const SequencerPosition &spos)
4657{
31f18b77 4658 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
7c673cae
FG
4659
4660 map<string,bufferptr> aset;
4661 FDRef fd;
4662 set<string> omap_attrs;
4663 Index index;
4664 bool spill_out = true;
4665
4666 int r = lfn_open(cid, oid, false, &fd);
4667 if (r < 0) {
4668 goto out;
4669 }
4670
4671 char buf[2];
4672 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4673 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4674 spill_out = false;
4675 }
4676
4677 r = _fgetattrs(**fd, aset);
4678 if (r >= 0) {
4679 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4680 char n[CHAIN_XATTR_MAX_NAME_LEN];
4681 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4682 r = chain_fremovexattr(**fd, n);
4683 if (r < 0) {
31f18b77 4684 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
7c673cae
FG
4685 goto out_close;
4686 }
4687 }
4688 }
4689
4690 if (!spill_out) {
31f18b77 4691 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
7c673cae
FG
4692 goto out_close;
4693 }
4694
4695 r = get_index(cid, &index);
4696 if (r < 0) {
31f18b77 4697 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
7c673cae
FG
4698 goto out_close;
4699 }
4700 {
4701 r = object_map->get_all_xattrs(oid, &omap_attrs);
4702 if (r < 0 && r != -ENOENT) {
31f18b77 4703 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
7c673cae
FG
4704 assert(!m_filestore_fail_eio || r != -EIO);
4705 goto out_close;
4706 }
4707 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4708 if (r < 0 && r != -ENOENT) {
31f18b77 4709 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
7c673cae
FG
4710 goto out_close;
4711 }
4712 if (r == -ENOENT)
4713 r = 0;
4714 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4715 sizeof(XATTR_NO_SPILL_OUT));
4716 }
4717
4718 out_close:
4719 lfn_close(fd);
4720 out:
31f18b77 4721 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
7c673cae
FG
4722 return r;
4723}
4724
4725
4726
4727
4728int FileStore::_collection_remove_recursive(const coll_t &cid,
4729 const SequencerPosition &spos)
4730{
4731 struct stat st;
4732 int r = collection_stat(cid, &st);
4733 if (r < 0) {
4734 if (r == -ENOENT)
4735 return 0;
4736 return r;
4737 }
4738
4739 vector<ghobject_t> objects;
4740 ghobject_t max;
4741 while (!max.is_max()) {
4742 r = collection_list(cid, max, ghobject_t::get_max(),
4743 300, &objects, &max);
4744 if (r < 0)
4745 return r;
4746 for (vector<ghobject_t>::iterator i = objects.begin();
4747 i != objects.end();
4748 ++i) {
4749 assert(_check_replay_guard(cid, *i, spos));
4750 r = _remove(cid, *i, spos);
4751 if (r < 0)
4752 return r;
4753 }
4754 objects.clear();
4755 }
4756 return _destroy_collection(cid);
4757}
4758
4759// --------------------------
4760// collections
4761
4762int FileStore::list_collections(vector<coll_t>& ls)
4763{
4764 return list_collections(ls, false);
4765}
4766
4767int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4768{
4769 tracepoint(objectstore, list_collections_enter);
31f18b77 4770 dout(10) << __FUNC__ << dendl;
7c673cae
FG
4771
4772 char fn[PATH_MAX];
4773 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4774
4775 int r = 0;
4776 DIR *dir = ::opendir(fn);
4777 if (!dir) {
4778 r = -errno;
4779 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4780 assert(!m_filestore_fail_eio || r != -EIO);
4781 return r;
4782 }
4783
4784 struct dirent *de = nullptr;
4785 while ((de = ::readdir(dir))) {
4786 if (de->d_type == DT_UNKNOWN) {
4787 // d_type not supported (non-ext[234], btrfs), must stat
4788 struct stat sb;
4789 char filename[PATH_MAX];
4790 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4791
4792 r = ::stat(filename, &sb);
4793 if (r < 0) {
4794 r = -errno;
4795 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4796 assert(!m_filestore_fail_eio || r != -EIO);
4797 break;
4798 }
4799 if (!S_ISDIR(sb.st_mode)) {
4800 continue;
4801 }
4802 } else if (de->d_type != DT_DIR) {
4803 continue;
4804 }
4805 if (strcmp(de->d_name, "omap") == 0) {
4806 continue;
4807 }
4808 if (de->d_name[0] == '.' &&
4809 (de->d_name[1] == '\0' ||
4810 (de->d_name[1] == '.' &&
4811 de->d_name[2] == '\0')))
4812 continue;
4813 coll_t cid;
4814 if (!cid.parse(de->d_name)) {
4815 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4816 continue;
4817 }
4818 if (!cid.is_temp() || include_temp)
4819 ls.push_back(cid);
4820 }
4821
4822 if (r > 0) {
4823 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4824 r = -r;
4825 }
4826
4827 ::closedir(dir);
4828 assert(!m_filestore_fail_eio || r != -EIO);
4829 tracepoint(objectstore, list_collections_exit, r);
4830 return r;
4831}
4832
4833int FileStore::collection_stat(const coll_t& c, struct stat *st)
4834{
4835 tracepoint(objectstore, collection_stat_enter, c.c_str());
4836 char fn[PATH_MAX];
4837 get_cdir(c, fn, sizeof(fn));
31f18b77 4838 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
4839 int r = ::stat(fn, st);
4840 if (r < 0)
4841 r = -errno;
31f18b77 4842 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
4843 assert(!m_filestore_fail_eio || r != -EIO);
4844 tracepoint(objectstore, collection_stat_exit, r);
4845 return r;
4846}
4847
4848bool FileStore::collection_exists(const coll_t& c)
4849{
4850 tracepoint(objectstore, collection_exists_enter, c.c_str());
4851 struct stat st;
4852 bool ret = collection_stat(c, &st) == 0;
4853 tracepoint(objectstore, collection_exists_exit, ret);
4854 return ret;
4855}
4856
4857int FileStore::collection_empty(const coll_t& c, bool *empty)
4858{
4859 tracepoint(objectstore, collection_empty_enter, c.c_str());
31f18b77 4860 dout(15) << __FUNC__ << ": " << c << dendl;
7c673cae
FG
4861 Index index;
4862 int r = get_index(c, &index);
4863 if (r < 0) {
31f18b77 4864 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
7c673cae
FG
4865 << dendl;
4866 return r;
4867 }
4868
4869 assert(NULL != index.index);
4870 RWLock::RLocker l((index.index)->access_lock);
4871
4872 vector<ghobject_t> ls;
4873 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4874 1, &ls, NULL);
4875 if (r < 0) {
31f18b77 4876 derr << __FUNC__ << ": collection_list_partial returned: "
7c673cae
FG
4877 << cpp_strerror(r) << dendl;
4878 assert(!m_filestore_fail_eio || r != -EIO);
4879 return r;
4880 }
4881 *empty = ls.empty();
4882 tracepoint(objectstore, collection_empty_exit, *empty);
4883 return 0;
4884}
4885
4886int FileStore::_collection_set_bits(const coll_t& c, int bits)
4887{
4888 char fn[PATH_MAX];
4889 get_cdir(c, fn, sizeof(fn));
31f18b77 4890 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
7c673cae
FG
4891 char n[PATH_MAX];
4892 int r;
4893 int32_t v = bits;
4894 int fd = ::open(fn, O_RDONLY);
4895 if (fd < 0) {
4896 r = -errno;
4897 goto out;
4898 }
4899 get_attrname("bits", n, PATH_MAX);
4900 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4901 VOID_TEMP_FAILURE_RETRY(::close(fd));
4902 out:
31f18b77 4903 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
7c673cae
FG
4904 return r;
4905}
4906
4907int FileStore::collection_bits(const coll_t& c)
4908{
4909 char fn[PATH_MAX];
4910 get_cdir(c, fn, sizeof(fn));
31f18b77 4911 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
4912 int r;
4913 char n[PATH_MAX];
4914 int32_t bits;
4915 int fd = ::open(fn, O_RDONLY);
4916 if (fd < 0) {
4917 bits = r = -errno;
4918 goto out;
4919 }
4920 get_attrname("bits", n, PATH_MAX);
4921 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4922 VOID_TEMP_FAILURE_RETRY(::close(fd));
4923 if (r < 0) {
4924 bits = r;
4925 goto out;
4926 }
4927 out:
31f18b77 4928 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
7c673cae
FG
4929 return bits;
4930}
4931
4932int FileStore::collection_list(const coll_t& c,
4933 const ghobject_t& orig_start,
4934 const ghobject_t& end,
4935 int max,
4936 vector<ghobject_t> *ls, ghobject_t *next)
4937{
4938 ghobject_t start = orig_start;
4939 if (start.is_max())
4940 return 0;
4941
4942 ghobject_t temp_next;
4943 if (!next)
4944 next = &temp_next;
4945 // figure out the pool id. we need this in order to generate a
4946 // meaningful 'next' value.
4947 int64_t pool = -1;
4948 shard_id_t shard;
4949 {
4950 spg_t pgid;
4951 if (c.is_temp(&pgid)) {
4952 pool = -2 - pgid.pool();
4953 shard = pgid.shard;
4954 } else if (c.is_pg(&pgid)) {
4955 pool = pgid.pool();
4956 shard = pgid.shard;
4957 } else if (c.is_meta()) {
4958 pool = -1;
4959 shard = shard_id_t::NO_SHARD;
4960 } else {
4961 // hrm, the caller is test code! we should get kill it off. for now,
4962 // tolerate it.
4963 pool = 0;
4964 shard = shard_id_t::NO_SHARD;
4965 }
31f18b77 4966 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
7c673cae
FG
4967 << " pgid " << pgid << dendl;
4968 }
4969 ghobject_t sep;
4970 sep.hobj.pool = -1;
4971 sep.set_shard(shard);
4972 if (!c.is_temp() && !c.is_meta()) {
4973 if (start < sep) {
31f18b77 4974 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
7c673cae
FG
4975 coll_t temp = c.get_temp();
4976 int r = collection_list(temp, start, end, max, ls, next);
4977 if (r < 0)
4978 return r;
4979 if (*next != ghobject_t::get_max())
4980 return r;
4981 start = sep;
31f18b77 4982 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
7c673cae
FG
4983 << start << dendl;
4984 } else {
31f18b77 4985 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
7c673cae
FG
4986 }
4987 }
4988
4989 Index index;
4990 int r = get_index(c, &index);
4991 if (r < 0)
4992 return r;
4993
4994 assert(NULL != index.index);
4995 RWLock::RLocker l((index.index)->access_lock);
4996
4997 r = index->collection_list_partial(start, end, max, ls, next);
4998
4999 if (r < 0) {
5000 assert(!m_filestore_fail_eio || r != -EIO);
5001 return r;
5002 }
5003 dout(20) << "objects: " << *ls << dendl;
5004
5005 // HashIndex doesn't know the pool when constructing a 'next' value
5006 if (next && !next->is_max()) {
5007 next->hobj.pool = pool;
5008 next->set_shard(shard);
5009 dout(20) << " next " << *next << dendl;
5010 }
5011
5012 return 0;
5013}
5014
5015int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
5016 bufferlist *header,
5017 map<string, bufferlist> *out)
5018{
5019 tracepoint(objectstore, omap_get_enter, _c.c_str());
5020 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5021 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5022 Index index;
5023 int r = get_index(c, &index);
5024 if (r < 0)
5025 return r;
5026 {
5027 assert(NULL != index.index);
5028 RWLock::RLocker l((index.index)->access_lock);
5029 r = lfn_find(hoid, index);
5030 if (r < 0)
5031 return r;
5032 }
5033 r = object_map->get(hoid, header, out);
5034 if (r < 0 && r != -ENOENT) {
5035 assert(!m_filestore_fail_eio || r != -EIO);
5036 return r;
5037 }
5038 tracepoint(objectstore, omap_get_exit, 0);
5039 return 0;
5040}
5041
5042int FileStore::omap_get_header(
5043 const coll_t& _c,
5044 const ghobject_t &hoid,
5045 bufferlist *bl,
5046 bool allow_eio)
5047{
5048 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5049 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5050 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5051 Index index;
5052 int r = get_index(c, &index);
5053 if (r < 0)
5054 return r;
5055 {
5056 assert(NULL != index.index);
5057 RWLock::RLocker l((index.index)->access_lock);
5058 r = lfn_find(hoid, index);
5059 if (r < 0)
5060 return r;
5061 }
5062 r = object_map->get_header(hoid, bl);
5063 if (r < 0 && r != -ENOENT) {
5064 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5065 return r;
5066 }
5067 tracepoint(objectstore, omap_get_header_exit, 0);
5068 return 0;
5069}
5070
5071int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5072{
5073 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5074 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5075 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5076 Index index;
5077 int r = get_index(c, &index);
5078 if (r < 0)
5079 return r;
5080 {
5081 assert(NULL != index.index);
5082 RWLock::RLocker l((index.index)->access_lock);
5083 r = lfn_find(hoid, index);
5084 if (r < 0)
5085 return r;
5086 }
5087 r = object_map->get_keys(hoid, keys);
5088 if (r < 0 && r != -ENOENT) {
5089 assert(!m_filestore_fail_eio || r != -EIO);
5090 return r;
5091 }
5092 tracepoint(objectstore, omap_get_keys_exit, 0);
5093 return 0;
5094}
5095
5096int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5097 const set<string> &keys,
5098 map<string, bufferlist> *out)
5099{
5100 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5101 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5102 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5103 Index index;
5104 const char *where = "()";
5105 int r = get_index(c, &index);
5106 if (r < 0) {
5107 where = " (get_index)";
5108 goto out;
5109 }
5110 {
5111 assert(NULL != index.index);
5112 RWLock::RLocker l((index.index)->access_lock);
5113 r = lfn_find(hoid, index);
5114 if (r < 0) {
5115 where = " (lfn_find)";
5116 goto out;
5117 }
5118 }
5119 r = object_map->get_values(hoid, keys, out);
5120 if (r < 0 && r != -ENOENT) {
5121 assert(!m_filestore_fail_eio || r != -EIO);
5122 where = " (get_values)";
5123 goto out;
5124 }
5125 r = 0;
5126 out:
5127 tracepoint(objectstore, omap_get_values_exit, r);
31f18b77 5128 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
7c673cae
FG
5129 << where << dendl;
5130 return r;
5131}
5132
5133int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5134 const set<string> &keys,
5135 set<string> *out)
5136{
5137 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5138 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5139 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5140
5141 Index index;
5142 int r = get_index(c, &index);
5143 if (r < 0)
5144 return r;
5145 {
5146 assert(NULL != index.index);
5147 RWLock::RLocker l((index.index)->access_lock);
5148 r = lfn_find(hoid, index);
5149 if (r < 0)
5150 return r;
5151 }
5152 r = object_map->check_keys(hoid, keys, out);
5153 if (r < 0 && r != -ENOENT) {
5154 assert(!m_filestore_fail_eio || r != -EIO);
5155 return r;
5156 }
5157 tracepoint(objectstore, omap_check_keys_exit, 0);
5158 return 0;
5159}
5160
5161ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5162 const ghobject_t &hoid)
5163{
5164 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5165 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
31f18b77 5166 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
7c673cae
FG
5167 Index index;
5168 int r = get_index(c, &index);
5169 if (r < 0) {
31f18b77 5170 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5171 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5172 return ObjectMap::ObjectMapIterator();
5173 }
5174 {
5175 assert(NULL != index.index);
5176 RWLock::RLocker l((index.index)->access_lock);
5177 r = lfn_find(hoid, index);
5178 if (r < 0) {
31f18b77 5179 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
7c673cae
FG
5180 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5181 return ObjectMap::ObjectMapIterator();
5182 }
5183 }
5184 return object_map->get_iterator(hoid);
5185}
5186
5187int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5188 uint64_t expected_num_objs,
5189 const SequencerPosition &spos)
5190{
31f18b77 5191 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
7c673cae
FG
5192 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5193
5194 bool empty;
5195 int ret = collection_empty(c, &empty);
5196 if (ret < 0)
5197 return ret;
5198 if (!empty && !replaying) {
5199 dout(0) << "Failed to give an expected number of objects hint to collection : "
5200 << c << ", only empty collection can take such type of hint. " << dendl;
5201 return 0;
5202 }
5203
5204 Index index;
5205 ret = get_index(c, &index);
5206 if (ret < 0)
5207 return ret;
5208 // Pre-hash the collection
5209 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5210 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5211 if (ret < 0)
5212 return ret;
5213 _set_replay_guard(c, spos);
5214
5215 return 0;
5216}
5217
5218int FileStore::_create_collection(
5219 const coll_t& c,
5220 int bits,
5221 const SequencerPosition &spos)
5222{
5223 char fn[PATH_MAX];
5224 get_cdir(c, fn, sizeof(fn));
31f18b77 5225 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5226 int r = ::mkdir(fn, 0755);
5227 if (r < 0)
5228 r = -errno;
5229 if (r == -EEXIST && replaying)
5230 r = 0;
31f18b77 5231 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5232
5233 if (r < 0)
5234 return r;
5235 r = init_index(c);
5236 if (r < 0)
5237 return r;
5238 r = _collection_set_bits(c, bits);
5239 if (r < 0)
5240 return r;
5241 // create parallel temp collection, too
5242 if (!c.is_meta() && !c.is_temp()) {
5243 coll_t temp = c.get_temp();
5244 r = _create_collection(temp, 0, spos);
5245 if (r < 0)
5246 return r;
5247 }
5248
5249 _set_replay_guard(c, spos);
5250 return 0;
5251}
5252
5253int FileStore::_destroy_collection(const coll_t& c)
5254{
5255 int r = 0;
5256 char fn[PATH_MAX];
5257 get_cdir(c, fn, sizeof(fn));
31f18b77 5258 dout(15) << __FUNC__ << ": " << fn << dendl;
7c673cae
FG
5259 {
5260 Index from;
5261 r = get_index(c, &from);
5262 if (r < 0)
5263 goto out;
5264 assert(NULL != from.index);
5265 RWLock::WLocker l((from.index)->access_lock);
5266
5267 r = from->prep_delete();
5268 if (r < 0)
5269 goto out;
5270 }
5271 r = ::rmdir(fn);
5272 if (r < 0) {
5273 r = -errno;
5274 goto out;
5275 }
5276
5277 out:
5278 // destroy parallel temp collection, too
5279 if (!c.is_meta() && !c.is_temp()) {
5280 coll_t temp = c.get_temp();
5281 int r2 = _destroy_collection(temp);
5282 if (r2 < 0) {
5283 r = r2;
5284 goto out_final;
5285 }
5286 }
5287
5288 out_final:
31f18b77 5289 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
7c673cae
FG
5290 return r;
5291}
5292
5293
5294int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5295 const SequencerPosition& spos)
5296{
31f18b77 5297 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
7c673cae
FG
5298
5299 int dstcmp = _check_replay_guard(c, o, spos);
5300 if (dstcmp < 0)
5301 return 0;
5302
5303 // check the src name too; it might have a newer guard, and we don't
5304 // want to clobber it
5305 int srccmp = _check_replay_guard(oldcid, o, spos);
5306 if (srccmp < 0)
5307 return 0;
5308
5309 // open guard on object so we don't any previous operations on the
5310 // new name that will modify the source inode.
5311 FDRef fd;
5312 int r = lfn_open(oldcid, o, 0, &fd);
5313 if (r < 0) {
5314 // the source collection/object does not exist. If we are replaying, we
5315 // should be safe, so just return 0 and move on.
5316 assert(replaying);
31f18b77 5317 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5318 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5319 return 0;
5320 }
5321 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5322 _set_replay_guard(**fd, spos, &o, true);
5323 }
5324
5325 r = lfn_link(oldcid, c, o, o);
5326 if (replaying && !backend->can_checkpoint() &&
5327 r == -EEXIST) // crashed between link() and set_replay_guard()
5328 r = 0;
5329
5330 _inject_failure();
5331
5332 // close guard on object so we don't do this again
5333 if (r == 0) {
5334 _close_replay_guard(**fd, spos);
5335 }
5336 lfn_close(fd);
5337
31f18b77 5338 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
7c673cae
FG
5339 return r;
5340}
5341
5342int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5343 coll_t c, const ghobject_t& o,
5344 const SequencerPosition& spos,
5345 bool allow_enoent)
5346{
31f18b77 5347 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
7c673cae
FG
5348 int r = 0;
5349 int dstcmp, srccmp;
5350
5351 if (replaying) {
5352 /* If the destination collection doesn't exist during replay,
5353 * we need to delete the src object and continue on
5354 */
5355 if (!collection_exists(c))
5356 goto out_rm_src;
5357 }
5358
5359 dstcmp = _check_replay_guard(c, o, spos);
5360 if (dstcmp < 0)
5361 goto out_rm_src;
5362
5363 // check the src name too; it might have a newer guard, and we don't
5364 // want to clobber it
5365 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5366 if (srccmp < 0)
5367 return 0;
5368
5369 {
5370 // open guard on object so we don't any previous operations on the
5371 // new name that will modify the source inode.
5372 FDRef fd;
5373 r = lfn_open(oldcid, oldoid, 0, &fd);
5374 if (r < 0) {
5375 // the source collection/object does not exist. If we are replaying, we
5376 // should be safe, so just return 0 and move on.
5377 if (replaying) {
31f18b77 5378 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5379 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5380 } else if (allow_enoent) {
31f18b77 5381 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
7c673cae
FG
5382 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5383 << dendl;
5384 } else {
5385 assert(0 == "ERROR: source must exist");
5386 }
5387
5388 if (!replaying) {
5389 return 0;
5390 }
5391 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5392 return 0;
5393 }
5394
5395 r = 0; // don't know if object_map was cloned
5396 } else {
5397 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5398 _set_replay_guard(**fd, spos, &o, true);
5399 }
5400
5401 r = lfn_link(oldcid, c, oldoid, o);
5402 if (replaying && !backend->can_checkpoint() &&
5403 r == -EEXIST) // crashed between link() and set_replay_guard()
5404 r = 0;
5405
5406 lfn_close(fd);
5407 fd = FDRef();
5408
5409 _inject_failure();
5410 }
5411
5412 if (r == 0) {
5413 // the name changed; link the omap content
5414 r = object_map->rename(oldoid, o, &spos);
5415 if (r == -ENOENT)
5416 r = 0;
5417 }
5418
5419 _inject_failure();
5420
5421 if (r == 0)
5422 r = lfn_unlink(oldcid, oldoid, spos, true);
5423
5424 if (r == 0)
5425 r = lfn_open(c, o, 0, &fd);
5426
5427 // close guard on object so we don't do this again
5428 if (r == 0) {
5429 _close_replay_guard(**fd, spos, &o);
5430 lfn_close(fd);
5431 }
5432 }
5433
31f18b77 5434 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5435 << " = " << r << dendl;
5436 return r;
5437
5438 out_rm_src:
5439 // remove source
5440 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5441 r = lfn_unlink(oldcid, oldoid, spos, true);
5442 }
5443
31f18b77 5444 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
7c673cae
FG
5445 << " = " << r << dendl;
5446 return r;
5447}
5448
5449void FileStore::_inject_failure()
5450{
31f18b77
FG
5451 if (m_filestore_kill_at) {
5452 int final = --m_filestore_kill_at;
5453 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
7c673cae 5454 if (final == 0) {
31f18b77 5455 derr << __FUNC__ << ": KILLING" << dendl;
7c673cae
FG
5456 cct->_log->flush();
5457 _exit(1);
5458 }
5459 }
5460}
5461
5462int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5463 const SequencerPosition &spos) {
31f18b77 5464 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5465 Index index;
5466 int r = get_index(cid, &index);
5467 if (r < 0)
5468 return r;
5469 {
5470 assert(NULL != index.index);
5471 RWLock::RLocker l((index.index)->access_lock);
5472 r = lfn_find(hoid, index);
5473 if (r < 0)
5474 return r;
5475 }
5476 r = object_map->clear_keys_header(hoid, &spos);
5477 if (r < 0 && r != -ENOENT)
5478 return r;
5479 return 0;
5480}
5481
5482int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5483 const map<string, bufferlist> &aset,
5484 const SequencerPosition &spos) {
31f18b77 5485 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5486 Index index;
5487 int r;
5488 //treat pgmeta as a logical object, skip to check exist
5489 if (hoid.is_pgmeta())
5490 goto skip;
5491
5492 r = get_index(cid, &index);
5493 if (r < 0) {
31f18b77 5494 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
7c673cae
FG
5495 return r;
5496 }
5497 {
5498 assert(NULL != index.index);
5499 RWLock::RLocker l((index.index)->access_lock);
5500 r = lfn_find(hoid, index);
5501 if (r < 0) {
31f18b77 5502 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
7c673cae
FG
5503 return r;
5504 }
5505 }
5506skip:
5507 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5508 for (auto& p : aset) {
31f18b77 5509 dout(20) << __FUNC__ << ": set " << p.first << dendl;
7c673cae
FG
5510 }
5511 }
5512 r = object_map->set_keys(hoid, aset, &spos);
31f18b77 5513 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
7c673cae
FG
5514 return r;
5515}
5516
5517int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5518 const set<string> &keys,
5519 const SequencerPosition &spos) {
31f18b77 5520 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5521 Index index;
5522 int r;
5523 //treat pgmeta as a logical object, skip to check exist
5524 if (hoid.is_pgmeta())
5525 goto skip;
5526
5527 r = get_index(cid, &index);
5528 if (r < 0)
5529 return r;
5530 {
5531 assert(NULL != index.index);
5532 RWLock::RLocker l((index.index)->access_lock);
5533 r = lfn_find(hoid, index);
5534 if (r < 0)
5535 return r;
5536 }
5537skip:
5538 r = object_map->rm_keys(hoid, keys, &spos);
5539 if (r < 0 && r != -ENOENT)
5540 return r;
5541 return 0;
5542}
5543
5544int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5545 const string& first, const string& last,
5546 const SequencerPosition &spos) {
31f18b77 5547 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
7c673cae
FG
5548 set<string> keys;
5549 {
5550 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5551 if (!iter)
5552 return -ENOENT;
5553 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5554 iter->next()) {
5555 keys.insert(iter->key());
5556 }
5557 }
5558 return _omap_rmkeys(cid, hoid, keys, spos);
5559}
5560
5561int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5562 const bufferlist &bl,
5563 const SequencerPosition &spos)
5564{
31f18b77 5565 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
7c673cae
FG
5566 Index index;
5567 int r = get_index(cid, &index);
5568 if (r < 0)
5569 return r;
5570 {
5571 assert(NULL != index.index);
5572 RWLock::RLocker l((index.index)->access_lock);
5573 r = lfn_find(hoid, index);
5574 if (r < 0)
5575 return r;
5576 }
5577 return object_map->set_header(hoid, bl, &spos);
5578}
5579
5580int FileStore::_split_collection(const coll_t& cid,
5581 uint32_t bits,
5582 uint32_t rem,
5583 coll_t dest,
5584 const SequencerPosition &spos)
5585{
5586 int r;
5587 {
31f18b77 5588 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
7c673cae 5589 if (!collection_exists(cid)) {
31f18b77 5590 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
7c673cae
FG
5591 assert(replaying);
5592 return 0;
5593 }
5594 if (!collection_exists(dest)) {
31f18b77 5595 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
7c673cae
FG
5596 assert(replaying);
5597 return 0;
5598 }
5599
5600 int dstcmp = _check_replay_guard(dest, spos);
5601 if (dstcmp < 0)
5602 return 0;
5603
5604 int srccmp = _check_replay_guard(cid, spos);
5605 if (srccmp < 0)
5606 return 0;
5607
5608 _set_global_replay_guard(cid, spos);
5609 _set_replay_guard(cid, spos, true);
5610 _set_replay_guard(dest, spos, true);
5611
5612 Index from;
5613 r = get_index(cid, &from);
5614
5615 Index to;
5616 if (!r)
5617 r = get_index(dest, &to);
5618
5619 if (!r) {
5620 assert(NULL != from.index);
5621 RWLock::WLocker l1((from.index)->access_lock);
5622
5623 assert(NULL != to.index);
5624 RWLock::WLocker l2((to.index)->access_lock);
5625
5626 r = from->split(rem, bits, to.index);
5627 }
5628
5629 _close_replay_guard(cid, spos);
5630 _close_replay_guard(dest, spos);
5631 }
5632 _collection_set_bits(cid, bits);
5633 if (!r && cct->_conf->filestore_debug_verify_split) {
5634 vector<ghobject_t> objects;
5635 ghobject_t next;
5636 while (1) {
5637 collection_list(
5638 cid,
5639 next, ghobject_t::get_max(),
5640 get_ideal_list_max(),
5641 &objects,
5642 &next);
5643 if (objects.empty())
5644 break;
5645 for (vector<ghobject_t>::iterator i = objects.begin();
5646 i != objects.end();
5647 ++i) {
31f18b77 5648 dout(20) << __FUNC__ << ": " << *i << " still in source "
7c673cae
FG
5649 << cid << dendl;
5650 assert(!i->match(bits, rem));
5651 }
5652 objects.clear();
5653 }
5654 next = ghobject_t();
5655 while (1) {
5656 collection_list(
5657 dest,
5658 next, ghobject_t::get_max(),
5659 get_ideal_list_max(),
5660 &objects,
5661 &next);
5662 if (objects.empty())
5663 break;
5664 for (vector<ghobject_t>::iterator i = objects.begin();
5665 i != objects.end();
5666 ++i) {
31f18b77 5667 dout(20) << __FUNC__ << ": " << *i << " now in dest "
7c673cae
FG
5668 << *i << dendl;
5669 assert(i->match(bits, rem));
5670 }
5671 objects.clear();
5672 }
5673 }
5674 return r;
5675}
5676
5677int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5678 uint64_t expected_object_size,
5679 uint64_t expected_write_size)
5680{
31f18b77 5681 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
7c673cae
FG
5682
5683 FDRef fd;
5684 int ret = 0;
5685
5686 if (expected_object_size == 0 || expected_write_size == 0)
5687 goto out;
5688
5689 ret = lfn_open(cid, oid, false, &fd);
5690 if (ret < 0)
5691 goto out;
5692
5693 {
5694 // TODO: a more elaborate hint calculation
5695 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5696
5697 ret = backend->set_alloc_hint(**fd, hint);
31f18b77 5698 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
7c673cae
FG
5699 }
5700
5701 lfn_close(fd);
5702out:
31f18b77 5703 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
7c673cae
FG
5704 assert(!m_filestore_fail_eio || ret != -EIO);
5705 return ret;
5706}
5707
5708const char** FileStore::get_tracked_conf_keys() const
5709{
5710 static const char* KEYS[] = {
5711 "filestore_max_inline_xattr_size",
5712 "filestore_max_inline_xattr_size_xfs",
5713 "filestore_max_inline_xattr_size_btrfs",
5714 "filestore_max_inline_xattr_size_other",
5715 "filestore_max_inline_xattrs",
5716 "filestore_max_inline_xattrs_xfs",
5717 "filestore_max_inline_xattrs_btrfs",
5718 "filestore_max_inline_xattrs_other",
5719 "filestore_max_xattr_value_size",
5720 "filestore_max_xattr_value_size_xfs",
5721 "filestore_max_xattr_value_size_btrfs",
5722 "filestore_max_xattr_value_size_other",
5723 "filestore_min_sync_interval",
5724 "filestore_max_sync_interval",
5725 "filestore_queue_max_ops",
5726 "filestore_queue_max_bytes",
5727 "filestore_expected_throughput_bytes",
5728 "filestore_expected_throughput_ops",
5729 "filestore_queue_low_threshhold",
5730 "filestore_queue_high_threshhold",
5731 "filestore_queue_high_delay_multiple",
5732 "filestore_queue_max_delay_multiple",
5733 "filestore_commit_timeout",
5734 "filestore_dump_file",
5735 "filestore_kill_at",
5736 "filestore_fail_eio",
5737 "filestore_fadvise",
5738 "filestore_sloppy_crc",
5739 "filestore_sloppy_crc_block_size",
5740 "filestore_max_alloc_hint_size",
5741 NULL
5742 };
5743 return KEYS;
5744}
5745
5746void FileStore::handle_conf_change(const struct md_config_t *conf,
5747 const std::set <std::string> &changed)
5748{
5749 if (changed.count("filestore_max_inline_xattr_size") ||
5750 changed.count("filestore_max_inline_xattr_size_xfs") ||
5751 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5752 changed.count("filestore_max_inline_xattr_size_other") ||
5753 changed.count("filestore_max_inline_xattrs") ||
5754 changed.count("filestore_max_inline_xattrs_xfs") ||
5755 changed.count("filestore_max_inline_xattrs_btrfs") ||
5756 changed.count("filestore_max_inline_xattrs_other") ||
5757 changed.count("filestore_max_xattr_value_size") ||
5758 changed.count("filestore_max_xattr_value_size_xfs") ||
5759 changed.count("filestore_max_xattr_value_size_btrfs") ||
5760 changed.count("filestore_max_xattr_value_size_other")) {
5761 if (backend) {
5762 Mutex::Locker l(lock);
5763 set_xattr_limits_via_conf();
5764 }
5765 }
5766
5767 if (changed.count("filestore_queue_max_bytes") ||
5768 changed.count("filestore_queue_max_ops") ||
5769 changed.count("filestore_expected_throughput_bytes") ||
5770 changed.count("filestore_expected_throughput_ops") ||
5771 changed.count("filestore_queue_low_threshhold") ||
5772 changed.count("filestore_queue_high_threshhold") ||
5773 changed.count("filestore_queue_high_delay_multiple") ||
5774 changed.count("filestore_queue_max_delay_multiple")) {
5775 Mutex::Locker l(lock);
5776 set_throttle_params();
5777 }
5778
5779 if (changed.count("filestore_min_sync_interval") ||
5780 changed.count("filestore_max_sync_interval") ||
5781 changed.count("filestore_kill_at") ||
5782 changed.count("filestore_fail_eio") ||
5783 changed.count("filestore_sloppy_crc") ||
5784 changed.count("filestore_sloppy_crc_block_size") ||
5785 changed.count("filestore_max_alloc_hint_size") ||
5786 changed.count("filestore_fadvise")) {
5787 Mutex::Locker l(lock);
5788 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5789 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
31f18b77 5790 m_filestore_kill_at = conf->filestore_kill_at;
7c673cae
FG
5791 m_filestore_fail_eio = conf->filestore_fail_eio;
5792 m_filestore_fadvise = conf->filestore_fadvise;
5793 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5794 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5795 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5796 }
5797 if (changed.count("filestore_commit_timeout")) {
5798 Mutex::Locker l(sync_entry_timeo_lock);
5799 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5800 }
5801 if (changed.count("filestore_dump_file")) {
5802 if (conf->filestore_dump_file.length() &&
5803 conf->filestore_dump_file != "-") {
5804 dump_start(conf->filestore_dump_file);
5805 } else {
5806 dump_stop();
5807 }
5808 }
5809}
5810
5811int FileStore::set_throttle_params()
5812{
5813 stringstream ss;
5814 bool valid = throttle_bytes.set_params(
5815 cct->_conf->filestore_queue_low_threshhold,
5816 cct->_conf->filestore_queue_high_threshhold,
5817 cct->_conf->filestore_expected_throughput_bytes,
5818 cct->_conf->filestore_queue_high_delay_multiple,
5819 cct->_conf->filestore_queue_max_delay_multiple,
5820 cct->_conf->filestore_queue_max_bytes,
5821 &ss);
5822
5823 valid &= throttle_ops.set_params(
5824 cct->_conf->filestore_queue_low_threshhold,
5825 cct->_conf->filestore_queue_high_threshhold,
5826 cct->_conf->filestore_expected_throughput_ops,
5827 cct->_conf->filestore_queue_high_delay_multiple,
5828 cct->_conf->filestore_queue_max_delay_multiple,
5829 cct->_conf->filestore_queue_max_ops,
5830 &ss);
5831
5832 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5833 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5834
5835 if (!valid) {
5836 derr << "tried to set invalid params: "
5837 << ss.str()
5838 << dendl;
5839 }
5840 return valid ? 0 : -EINVAL;
5841}
5842
5843void FileStore::dump_start(const std::string& file)
5844{
31f18b77 5845 dout(10) << __FUNC__ << ": " << file << dendl;
7c673cae
FG
5846 if (m_filestore_do_dump) {
5847 dump_stop();
5848 }
5849 m_filestore_dump_fmt.reset();
5850 m_filestore_dump_fmt.open_array_section("dump");
5851 m_filestore_dump.open(file.c_str());
5852 m_filestore_do_dump = true;
5853}
5854
5855void FileStore::dump_stop()
5856{
31f18b77 5857 dout(10) << __FUNC__ << dendl;
7c673cae
FG
5858 m_filestore_do_dump = false;
5859 if (m_filestore_dump.is_open()) {
5860 m_filestore_dump_fmt.close_section();
5861 m_filestore_dump_fmt.flush(m_filestore_dump);
5862 m_filestore_dump.flush();
5863 m_filestore_dump.close();
5864 }
5865}
5866
5867void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5868{
5869 m_filestore_dump_fmt.open_array_section("transactions");
5870 unsigned trans_num = 0;
5871 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5872 m_filestore_dump_fmt.open_object_section("transaction");
5873 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5874 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5875 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5876 (*i).dump(&m_filestore_dump_fmt);
5877 m_filestore_dump_fmt.close_section();
5878 }
5879 m_filestore_dump_fmt.close_section();
5880 m_filestore_dump_fmt.flush(m_filestore_dump);
5881 m_filestore_dump.flush();
5882}
5883
5884void FileStore::set_xattr_limits_via_conf()
5885{
5886 uint32_t fs_xattr_size;
5887 uint32_t fs_xattrs;
5888 uint32_t fs_xattr_max_value_size;
5889
5890 switch (m_fs_type) {
5891#if defined(__linux__)
5892 case XFS_SUPER_MAGIC:
5893 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5894 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5895 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5896 break;
5897 case BTRFS_SUPER_MAGIC:
5898 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5899 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5900 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5901 break;
5902#endif
5903 default:
5904 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5905 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5906 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5907 break;
5908 }
5909
5910 // Use override value if set
5911 if (cct->_conf->filestore_max_inline_xattr_size)
5912 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5913 else
5914 m_filestore_max_inline_xattr_size = fs_xattr_size;
5915
5916 // Use override value if set
5917 if (cct->_conf->filestore_max_inline_xattrs)
5918 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5919 else
5920 m_filestore_max_inline_xattrs = fs_xattrs;
5921
5922 // Use override value if set
5923 if (cct->_conf->filestore_max_xattr_value_size)
5924 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5925 else
5926 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5927
5928 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5929 derr << "WARNING: max attr value size ("
5930 << m_filestore_max_xattr_value_size
5931 << ") is smaller than osd_max_object_name_len ("
5932 << cct->_conf->osd_max_object_name_len
5933 << "). Your backend filesystem appears to not support attrs large "
5934 << "enough to handle the configured max rados name size. You may get "
5935 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5936 << "behavior"
5937 << dendl;
5938 }
5939}
5940
5941uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5942{
5943 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5944 return res;
5945}
5946
5947int FileStore::apply_layout_settings(const coll_t &cid)
5948{
31f18b77 5949 dout(20) << __FUNC__ << ": " << cid << dendl;
7c673cae
FG
5950 Index index;
5951 int r = get_index(cid, &index);
5952 if (r < 0) {
5953 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5954 << dendl;
5955 return r;
5956 }
5957
5958 return index->apply_layout_settings();
5959}
5960
5961
5962// -- FSSuperblock --
5963
5964void FSSuperblock::encode(bufferlist &bl) const
5965{
5966 ENCODE_START(2, 1, bl);
5967 compat_features.encode(bl);
5968 ::encode(omap_backend, bl);
5969 ENCODE_FINISH(bl);
5970}
5971
5972void FSSuperblock::decode(bufferlist::iterator &bl)
5973{
5974 DECODE_START(2, bl);
5975 compat_features.decode(bl);
5976 if (struct_v >= 2)
5977 ::decode(omap_backend, bl);
5978 else
5979 omap_backend = "leveldb";
5980 DECODE_FINISH(bl);
5981}
5982
5983void FSSuperblock::dump(Formatter *f) const
5984{
5985 f->open_object_section("compat");
5986 compat_features.dump(f);
5987 f->dump_string("omap_backend", omap_backend);
5988 f->close_section();
5989}
5990
5991void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
5992{
5993 FSSuperblock z;
5994 o.push_back(new FSSuperblock(z));
5995 CompatSet::FeatureSet feature_compat;
5996 CompatSet::FeatureSet feature_ro_compat;
5997 CompatSet::FeatureSet feature_incompat;
5998 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
5999 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6000 feature_incompat);
6001 o.push_back(new FSSuperblock(z));
6002 z.omap_backend = "rocksdb";
6003 o.push_back(new FSSuperblock(z));
6004}