]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
update sources to 12.2.7
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// have spilled over into DBObjectMap. If XATTR_SPILL_OUT_NAME exists in the
// file's xattrs and its value is XATTR_NO_SPILL_OUT ("0"), there are no xattrs
// for that object in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137 {
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147 {
148 os_commit_latency.consume_next(
149 logger.get_tavg_ms(
150 l_filestore_journal_latency));
151 os_apply_latency.consume_next(
152 logger.get_tavg_ms(
153 l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159 return out << *s.parent;
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 int FileStore::get_index(const coll_t& cid, Index *index)
169 {
170 int r = index_manager.get_index(cid, basedir, index);
171 assert(!m_filestore_fail_eio || r != -EIO);
172 return r;
173 }
174
175 int FileStore::init_index(const coll_t& cid)
176 {
177 char path[PATH_MAX];
178 get_cdir(cid, path, sizeof(path));
179 int r = index_manager.init_index(cid, path, target_version);
180 assert(!m_filestore_fail_eio || r != -EIO);
181 return r;
182 }
183
184 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185 {
186 IndexedPath path2;
187 if (!path)
188 path = &path2;
189 int r, exist;
190 assert(NULL != index.index);
191 r = (index.index)->lookup(oid, path, &exist);
192 if (r < 0) {
193 assert(!m_filestore_fail_eio || r != -EIO);
194 return r;
195 }
196 if (!exist)
197 return -ENOENT;
198 return 0;
199 }
200
201 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202 {
203 FDRef fd;
204 int r = lfn_open(cid, oid, false, &fd);
205 if (r < 0)
206 return r;
207 r = ::ftruncate(**fd, length);
208 if (r < 0)
209 r = -errno;
210 if (r >= 0 && m_filestore_sloppy_crc) {
211 int rc = backend->_crc_update_truncate(**fd, length);
212 assert(rc >= 0);
213 }
214 lfn_close(fd);
215 assert(!m_filestore_fail_eio || r != -EIO);
216 return r;
217 }
218
219 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220 {
221 IndexedPath path;
222 Index index;
223 int r = get_index(cid, &index);
224 if (r < 0)
225 return r;
226
227 assert(NULL != index.index);
228 RWLock::RLocker l((index.index)->access_lock);
229
230 r = lfn_find(oid, index, &path);
231 if (r < 0)
232 return r;
233 r = ::stat(path->path(), buf);
234 if (r < 0)
235 r = -errno;
236 return r;
237 }
238
// Open (and optionally create) the file backing object oid in collection cid.
//
//   cid    - collection containing the object
//   oid    - object to open
//   create - add O_CREAT; if the object did not exist, it is also registered
//            in the index and tagged with the "no spill out" xattr
//   outfd  - receives the resulting FDRef (must be non-NULL)
//   index  - optional; if NULL, or if it holds no index yet, the index is
//            fetched here and its access_lock is taken for write for the
//            duration of the call. If the caller passes an already-populated
//            Index, no lock is taken here (presumably the caller holds it --
//            verify against callers).
//
// Returns 0 on success or a negative error code.
int FileStore::lfn_open(const coll_t& cid,
                        const ghobject_t& oid,
                        bool create,
                        FDRef *outfd,
                        Index *index)
{
  assert(outfd);
  int r = 0;
  bool need_lock = true;
  int flags = O_RDWR;

  if (create)
    flags |= O_CREAT;
  if (cct->_conf->filestore_odsync_write) {
    flags |= O_DSYNC;
  }

  // Fall back to a local Index when the caller did not supply one.
  Index index2;
  if (!index) {
    index = &index2;
  }
  if (!((*index).index)) {
    r = get_index(cid, index);
    if (r < 0) {
      dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
      return r;
    }
  } else {
    // Caller handed us a populated index; do not lock it again here.
    need_lock = false;
  }

  int fd, exist;
  assert(NULL != (*index).index);
  if (need_lock) {
    ((*index).index)->access_lock.get_write();
  }
  // The fd cache is only consulted (and populated, below) when not replaying.
  if (!replaying) {
    *outfd = fdcache.lookup(oid);
    if (*outfd) {
      if (need_lock) {
        ((*index).index)->access_lock.put_write();
      }
      return 0;
    }
  }


  IndexedPath path2;
  IndexedPath *path = &path2;

  r = (*index)->lookup(oid, path, &exist);
  if (r < 0) {
    derr << "could not find " << oid << " in index: "
         << cpp_strerror(-r) << dendl;
    goto fail;
  }

  r = ::open((*path)->path(), flags, 0644);
  if (r < 0) {
    r = -errno;
    dout(10) << "error opening file " << (*path)->path() << " with flags="
             << flags << ": " << cpp_strerror(-r) << dendl;
    goto fail;
  }
  fd = r;
  if (create && (!exist)) {
    // Newly created object: record it in the index, then mark it as having
    // no xattrs spilled out. On either failure the fd is closed before
    // bailing out.
    r = (*index)->created(oid, (*path)->path());
    if (r < 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd));
      derr << "error creating " << oid << " (" << (*path)->path()
           << ") in index: " << cpp_strerror(-r) << dendl;
      goto fail;
    }
    r = chain_fsetxattr<true, true>(
      fd, XATTR_SPILL_OUT_NAME,
      XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
    if (r < 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd));
      derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
           << "):" << cpp_strerror(-r) << dendl;
      goto fail;
    }
  }

  if (!replaying) {
    bool existed;
    *outfd = fdcache.add(oid, fd, &existed);
    if (existed) {
      // The cache already had an entry for oid, so our freshly opened fd is
      // closed here.
      // NOTE(review): this is the only close in this function that does not
      // use the VOID_ variant of the macro -- confirm that is intentional.
      TEMP_FAILURE_RETRY(::close(fd));
    }
  } else {
    *outfd = std::make_shared<FDCache::FD>(fd);
  }

  if (need_lock) {
    ((*index).index)->access_lock.put_write();
  }

  return 0;

 fail:

  if (need_lock) {
    ((*index).index)->access_lock.put_write();
  }

  assert(!m_filestore_fail_eio || r != -EIO);
  return r;
}
348
// Counterpart to lfn_open(). The body is intentionally empty: nothing is
// done with fd here beyond letting the by-value FDRef go out of scope.
void FileStore::lfn_close(FDRef fd)
{
}
352
// Hard-link object o in collection c to newoid in collection newcid and
// register the new name in the destination index.
//
// Returns 0 on success, -ENOENT if the source object does not exist,
// -EEXIST if the destination already exists, -errno if ::link() fails, or
// the negative error from an index operation.
int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
{
  Index index_new, index_old;
  IndexedPath path_new, path_old;
  int exist;
  int r;
  bool index_same = false;
  // The order in which the two indexes are fetched depends on how c compares
  // to newcid; when both collections are the same a single index is shared.
  if (c < newcid) {
    r = get_index(newcid, &index_new);
    if (r < 0)
      return r;
    r = get_index(c, &index_old);
    if (r < 0)
      return r;
  } else if (c == newcid) {
    r = get_index(c, &index_old);
    if (r < 0)
      return r;
    index_new = index_old;
    index_same = true;
  } else {
    r = get_index(c, &index_old);
    if (r < 0)
      return r;
    r = get_index(newcid, &index_new);
    if (r < 0)
      return r;
  }

  assert(NULL != index_old.index);
  assert(NULL != index_new.index);

  if (!index_same) {

    // Different collections: read-lock the source index, then write-lock
    // the destination index.
    RWLock::RLocker l1((index_old.index)->access_lock);

    r = index_old->lookup(o, &path_old, &exist);
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
    if (!exist)
      return -ENOENT;

    RWLock::WLocker l2((index_new.index)->access_lock);

    r = index_new->lookup(newoid, &path_new, &exist);
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
    if (exist)
      return -EEXIST;

    dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
    dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
    r = ::link(path_old->path(), path_new->path());
    if (r < 0)
      return -errno;

    r = index_new->created(newoid, path_new->path());
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
  } else {
    // Same collection: one write lock covers both lookups and the link.
    RWLock::WLocker l1((index_old.index)->access_lock);

    r = index_old->lookup(o, &path_old, &exist);
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
    if (!exist)
      return -ENOENT;

    r = index_new->lookup(newoid, &path_new, &exist);
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
    if (exist)
      return -EEXIST;

    dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
    dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
    r = ::link(path_old->path(), path_new->path());
    if (r < 0)
      return -errno;

    // make sure old fd for unlinked/overwritten file is gone
    // NOTE(review): only this same-collection branch clears the fd cache for
    // newoid; the cross-collection branch above does not -- confirm intent.
    fdcache.clear(newoid);

    r = index_new->created(newoid, path_new->path());
    if (r < 0) {
      assert(!m_filestore_fail_eio || r != -EIO);
      return r;
    }
  }
  return 0;
}
454
455 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456 const SequencerPosition &spos,
457 bool force_clear_omap)
458 {
459 Index index;
460 int r = get_index(cid, &index);
461 if (r < 0) {
462 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
463 return r;
464 }
465
466 assert(NULL != index.index);
467 RWLock::WLocker l((index.index)->access_lock);
468
469 {
470 IndexedPath path;
471 int hardlink;
472 r = index->lookup(o, &path, &hardlink);
473 if (r < 0) {
474 assert(!m_filestore_fail_eio || r != -EIO);
475 return r;
476 }
477
478 if (!force_clear_omap) {
479 if (hardlink == 0 || hardlink == 1) {
480 force_clear_omap = true;
481 }
482 }
483 if (force_clear_omap) {
484 dout(20) << __FUNC__ << ": clearing omap on " << o
485 << " in cid " << cid << dendl;
486 r = object_map->clear(o, &spos);
487 if (r < 0 && r != -ENOENT) {
488 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
489 assert(!m_filestore_fail_eio || r != -EIO);
490 return r;
491 }
492 if (cct->_conf->filestore_debug_inject_read_err) {
493 debug_obj_on_delete(o);
494 }
495 if (!m_disable_wbthrottle) {
496 wbthrottle.clear_object(o); // should be only non-cache ref
497 }
498 fdcache.clear(o);
499 } else {
500 /* Ensure that replay of this op doesn't result in the object_map
501 * going away.
502 */
503 if (!backend->can_checkpoint())
504 object_map->sync(&o, &spos);
505 }
506 if (hardlink == 0) {
507 if (!m_disable_wbthrottle) {
508 wbthrottle.clear_object(o); // should be only non-cache ref
509 }
510 return 0;
511 }
512 }
513 r = index->unlink(o);
514 if (r < 0) {
515 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
516 return r;
517 }
518 return 0;
519 }
520
521 FileStore::FileStore(CephContext* cct, const std::string &base,
522 const std::string &jdev, osflagbits_t flags,
523 const char *name, bool do_update) :
524 JournalingObjectStore(cct, base),
525 internal_name(name),
526 basedir(base), journalpath(jdev),
527 generic_flags(flags),
528 blk_size(0),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
531 backend(NULL),
532 index_manager(cct, do_update),
533 lock("FileStore::lock"),
534 force_sync(false),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct, sync_entry_timeo_lock),
537 stop(false), sync_thread(this),
538 fdcache(cct),
539 wbthrottle(cct),
540 next_osr_id(0),
541 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542 !cct->_conf->filestore_wbthrottle_enable),
543 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548 op_wq(this, cct->_conf->filestore_op_thread_timeout,
549 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550 logger(NULL),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561 m_filestore_fadvise(cct->_conf->filestore_fadvise),
562 do_update(do_update),
563 m_journal_dio(cct->_conf->journal_dio),
564 m_journal_aio(cct->_conf->journal_aio),
565 m_journal_force_aio(cct->_conf->journal_force_aio),
566 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573 m_fs_type(0),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
577 {
578 m_filestore_kill_at = cct->_conf->filestore_kill_at;
579 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580 ostringstream oss;
581 oss << "filestore-ondisk-" << i;
582 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583 ondisk_finishers.push_back(f);
584 }
585 for (int i = 0; i < m_apply_finisher_num; ++i) {
586 ostringstream oss;
587 oss << "filestore-apply-" << i;
588 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589 apply_finishers.push_back(f);
590 }
591
592 ostringstream oss;
593 oss << basedir << "/current";
594 current_fn = oss.str();
595
596 ostringstream sss;
597 sss << basedir << "/current/commit_op_seq";
598 current_op_seq_fn = sss.str();
599
600 ostringstream omss;
601 if (cct->_conf->filestore_omap_backend_path != "") {
602 omap_dir = cct->_conf->filestore_omap_backend_path;
603 } else {
604 omss << basedir << "/current/omap";
605 omap_dir = omss.str();
606 }
607
608 // initialize logger
609 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency",
616 NULL, PerfCountersBuilder::PRIO_USEFUL);
617 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
618 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
619 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
620 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
621 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
622 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
623 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
624 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
625 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
626 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
627
628 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
629 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
630 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
631 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
632 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg",
633 "Store operation queue latency", NULL, PerfCountersBuilder::PRIO_USEFUL);
634 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
635
636 logger = plb.create_perf_counters();
637
638 cct->get_perfcounters_collection()->add(logger);
639 cct->_conf->add_observer(this);
640
641 superblock.compat_features = get_fs_initial_compat_set();
642 }
643
644 FileStore::~FileStore()
645 {
646 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
647 delete *it;
648 *it = NULL;
649 }
650 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
651 delete *it;
652 *it = NULL;
653 }
654 cct->_conf->remove_observer(this);
655 cct->get_perfcounters_collection()->remove(logger);
656
657 if (journal)
658 journal->logger = NULL;
659 delete logger;
660
661 if (m_filestore_do_dump) {
662 dump_stop();
663 }
664 }
665
// Write the on-disk xattr name for name -- name prefixed with "user.ceph."
// -- into buf, using at most len bytes (snprintf semantics).
static void get_attrname(const char *name, char *buf, int len)
{
  const size_t capacity = len;
  snprintf(buf, capacity, "user.ceph.%s", name);
}
670
// If *name starts with "user.ceph.", advance *name past that prefix and
// return true. Otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // 10, excluding the NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
679
680 void FileStore::collect_metadata(map<string,string> *pm)
681 {
682 char partition_path[PATH_MAX];
683 char dev_node[PATH_MAX];
684 int rc = 0;
685
686 (*pm)["filestore_backend"] = backend->get_name();
687 ostringstream ss;
688 ss << "0x" << std::hex << m_fs_type << std::dec;
689 (*pm)["filestore_f_type"] = ss.str();
690
691 if (cct->_conf->filestore_collect_device_partition_information) {
692 rc = get_device_by_fd(fsid_fd, partition_path, dev_node, PATH_MAX);
693 } else {
694 rc = -EINVAL;
695 }
696
697 switch (rc) {
698 case -EOPNOTSUPP:
699 case -EINVAL:
700 (*pm)["backend_filestore_partition_path"] = "unknown";
701 (*pm)["backend_filestore_dev_node"] = "unknown";
702 break;
703 case -ENODEV:
704 (*pm)["backend_filestore_partition_path"] = string(partition_path);
705 (*pm)["backend_filestore_dev_node"] = "unknown";
706 break;
707 default:
708 (*pm)["backend_filestore_partition_path"] = string(partition_path);
709 (*pm)["backend_filestore_dev_node"] = string(dev_node);
710 }
711 }
712
713 int FileStore::statfs(struct store_statfs_t *buf0)
714 {
715 struct statfs buf;
716 buf0->reset();
717 if (::statfs(basedir.c_str(), &buf) < 0) {
718 int r = -errno;
719 assert(!m_filestore_fail_eio || r != -EIO);
720 assert(r != -ENOENT);
721 return r;
722 }
723 buf0->total = buf.f_blocks * buf.f_bsize;
724 buf0->available = buf.f_bavail * buf.f_bsize;
725 // Adjust for writes pending in the journal
726 if (journal) {
727 uint64_t estimate = journal->get_journal_size_estimate();
728 if (buf0->available > estimate)
729 buf0->available -= estimate;
730 else
731 buf0->available = 0;
732 }
733 return 0;
734 }
735
736
737 void FileStore::new_journal()
738 {
739 if (journalpath.length()) {
740 dout(10) << "open_journal at " << journalpath << dendl;
741 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
742 journalpath.c_str(),
743 m_journal_dio, m_journal_aio,
744 m_journal_force_aio);
745 if (journal)
746 journal->logger = logger;
747 }
748 return;
749 }
750
751 int FileStore::dump_journal(ostream& out)
752 {
753 int r;
754
755 if (!journalpath.length())
756 return -EINVAL;
757
758 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
759 r = journal->dump(out);
760 delete journal;
761 return r;
762 }
763
// Factory: allocate the FileStoreBackend matching filesystem magic f_type.
//
// Which cases exist depends on the build: the btrfs case is only compiled
// on Linux, the XFS case additionally requires HAVE_LIBXFS, and the ZFS case
// requires HAVE_LIBZFS. Any other magic (or a case compiled out) falls back
// to GenericFileStoreBackend. The returned object is allocated with new.
FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
{
  switch (f_type) {
#if defined(__linux__)
  case BTRFS_SUPER_MAGIC:
    return new BtrfsFileStoreBackend(fs);
# ifdef HAVE_LIBXFS
  case XFS_SUPER_MAGIC:
    return new XfsFileStoreBackend(fs);
# endif
#endif
#ifdef HAVE_LIBZFS
  case ZFS_SUPER_MAGIC:
    return new ZFSFileStoreBackend(fs);
#endif
  default:
    return new GenericFileStoreBackend(fs);
  }
}
783
784 void FileStore::create_backend(long f_type)
785 {
786 m_fs_type = f_type;
787
788 assert(backend == NULL);
789 backend = FileStoreBackend::create(f_type, this);
790
791 dout(0) << "backend " << backend->get_name()
792 << " (magic 0x" << std::hex << f_type << std::dec << ")"
793 << dendl;
794
795 switch (f_type) {
796 #if defined(__linux__)
797 case BTRFS_SUPER_MAGIC:
798 if (!m_disable_wbthrottle){
799 wbthrottle.set_fs(WBThrottle::BTRFS);
800 }
801 break;
802
803 case XFS_SUPER_MAGIC:
804 // wbthrottle is constructed with fs(WBThrottle::XFS)
805 break;
806 #endif
807 }
808
809 set_xattr_limits_via_conf();
810 }
811
812 int FileStore::mkfs()
813 {
814 int ret = 0;
815 char fsid_fn[PATH_MAX];
816 char fsid_str[40];
817 uuid_d old_fsid;
818 uuid_d old_omap_fsid;
819
820 dout(1) << "mkfs in " << basedir << dendl;
821 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
822 if (basedir_fd < 0) {
823 ret = -errno;
824 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
825 return ret;
826 }
827
828 // open+lock fsid
829 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
830 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
831 if (fsid_fd < 0) {
832 ret = -errno;
833 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
834 goto close_basedir_fd;
835 }
836
837 if (lock_fsid() < 0) {
838 ret = -EBUSY;
839 goto close_fsid_fd;
840 }
841
842 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
843 if (fsid.is_zero()) {
844 fsid.generate_random();
845 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
846 } else {
847 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
848 }
849
850 fsid.print(fsid_str);
851 strcat(fsid_str, "\n");
852 ret = ::ftruncate(fsid_fd, 0);
853 if (ret < 0) {
854 ret = -errno;
855 derr << __FUNC__ << ": failed to truncate fsid: "
856 << cpp_strerror(ret) << dendl;
857 goto close_fsid_fd;
858 }
859 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
860 if (ret < 0) {
861 derr << __FUNC__ << ": failed to write fsid: "
862 << cpp_strerror(ret) << dendl;
863 goto close_fsid_fd;
864 }
865 if (::fsync(fsid_fd) < 0) {
866 ret = -errno;
867 derr << __FUNC__ << ": close failed: can't write fsid: "
868 << cpp_strerror(ret) << dendl;
869 goto close_fsid_fd;
870 }
871 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
872 } else {
873 if (!fsid.is_zero() && fsid != old_fsid) {
874 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
875 ret = -EINVAL;
876 goto close_fsid_fd;
877 }
878 fsid = old_fsid;
879 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
880 }
881
882 // version stamp
883 ret = write_version_stamp();
884 if (ret < 0) {
885 derr << __FUNC__ << ": write_version_stamp() failed: "
886 << cpp_strerror(ret) << dendl;
887 goto close_fsid_fd;
888 }
889
890 // superblock
891 superblock.omap_backend = cct->_conf->filestore_omap_backend;
892 ret = write_superblock();
893 if (ret < 0) {
894 derr << __FUNC__ << ": write_superblock() failed: "
895 << cpp_strerror(ret) << dendl;
896 goto close_fsid_fd;
897 }
898
899 struct statfs basefs;
900 ret = ::fstatfs(basedir_fd, &basefs);
901 if (ret < 0) {
902 ret = -errno;
903 derr << __FUNC__ << ": cannot fstatfs basedir "
904 << cpp_strerror(ret) << dendl;
905 goto close_fsid_fd;
906 }
907
908 #if defined(__linux__)
909 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
910 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
911 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
912 goto close_fsid_fd;
913 }
914 #endif
915
916 create_backend(basefs.f_type);
917
918 ret = backend->create_current();
919 if (ret < 0) {
920 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
921 goto close_fsid_fd;
922 }
923
924 // write initial op_seq
925 {
926 uint64_t initial_seq = 0;
927 int fd = read_op_seq(&initial_seq);
928 if (fd < 0) {
929 ret = fd;
930 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
931 << cpp_strerror(ret) << dendl;
932 goto close_fsid_fd;
933 }
934 if (initial_seq == 0) {
935 ret = write_op_seq(fd, 1);
936 if (ret < 0) {
937 VOID_TEMP_FAILURE_RETRY(::close(fd));
938 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
939 << cpp_strerror(ret) << dendl;
940 goto close_fsid_fd;
941 }
942
943 if (backend->can_checkpoint()) {
944 // create snap_1 too
945 current_fd = ::open(current_fn.c_str(), O_RDONLY);
946 assert(current_fd >= 0);
947 char s[NAME_MAX];
948 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
949 ret = backend->create_checkpoint(s, NULL);
950 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
951 if (ret < 0 && ret != -EEXIST) {
952 VOID_TEMP_FAILURE_RETRY(::close(fd));
953 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
954 goto close_fsid_fd;
955 }
956 }
957 }
958 VOID_TEMP_FAILURE_RETRY(::close(fd));
959 }
960 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
961 if (ret < 0) {
962 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
963 goto close_fsid_fd;
964 }
965 // create fsid under omap
966 // open+lock fsid
967 int omap_fsid_fd;
968 char omap_fsid_fn[PATH_MAX];
969 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
970 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
971 if (omap_fsid_fd < 0) {
972 ret = -errno;
973 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
974 goto close_fsid_fd;
975 }
976
977 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
978 assert(!fsid.is_zero());
979 fsid.print(fsid_str);
980 strcat(fsid_str, "\n");
981 ret = ::ftruncate(omap_fsid_fd, 0);
982 if (ret < 0) {
983 ret = -errno;
984 derr << __FUNC__ << ": failed to truncate fsid: "
985 << cpp_strerror(ret) << dendl;
986 goto close_omap_fsid_fd;
987 }
988 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
989 if (ret < 0) {
990 derr << __FUNC__ << ": failed to write fsid: "
991 << cpp_strerror(ret) << dendl;
992 goto close_omap_fsid_fd;
993 }
994 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
995 if (::fsync(omap_fsid_fd) < 0) {
996 ret = -errno;
997 derr << __FUNC__ << ": close failed: can't write fsid: "
998 << cpp_strerror(ret) << dendl;
999 goto close_omap_fsid_fd;
1000 }
1001 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1002 } else {
1003 if (fsid != old_omap_fsid) {
1004 derr << __FUNC__ << ": " << omap_fsid_fn
1005 << " has existed omap fsid " << old_omap_fsid
1006 << " != expected osd fsid " << fsid
1007 << dendl;
1008 ret = -EINVAL;
1009 goto close_omap_fsid_fd;
1010 }
1011 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1012 }
1013
1014 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1015
1016 // journal?
1017 ret = mkjournal();
1018 if (ret)
1019 goto close_omap_fsid_fd;
1020
1021 ret = write_meta("type", "filestore");
1022 if (ret)
1023 goto close_omap_fsid_fd;
1024
1025 dout(1) << "mkfs done in " << basedir << dendl;
1026 ret = 0;
1027
1028 close_omap_fsid_fd:
1029 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1030 close_fsid_fd:
1031 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1032 fsid_fd = -1;
1033 close_basedir_fd:
1034 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1035 delete backend;
1036 backend = NULL;
1037 return ret;
1038 }
1039
1040 int FileStore::mkjournal()
1041 {
1042 // read fsid
1043 int ret;
1044 char fn[PATH_MAX];
1045 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1046 int fd = ::open(fn, O_RDONLY, 0644);
1047 if (fd < 0) {
1048 int err = errno;
1049 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1050 return -err;
1051 }
1052 ret = read_fsid(fd, &fsid);
1053 if (ret < 0) {
1054 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1055 VOID_TEMP_FAILURE_RETRY(::close(fd));
1056 return ret;
1057 }
1058 VOID_TEMP_FAILURE_RETRY(::close(fd));
1059
1060 ret = 0;
1061
1062 new_journal();
1063 if (journal) {
1064 ret = journal->check();
1065 if (ret < 0) {
1066 ret = journal->create();
1067 if (ret)
1068 derr << __FUNC__ << ": error creating journal on " << journalpath
1069 << ": " << cpp_strerror(ret) << dendl;
1070 else
1071 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1072 }
1073 delete journal;
1074 journal = 0;
1075 }
1076 return ret;
1077 }
1078
1079 int FileStore::read_fsid(int fd, uuid_d *uuid)
1080 {
1081 char fsid_str[40];
1082 memset(fsid_str, 0, sizeof(fsid_str));
1083 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1084 if (ret < 0)
1085 return ret;
1086 if (ret == 8) {
1087 // old 64-bit fsid... mirror it.
1088 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1089 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1090 return 0;
1091 }
1092
1093 if (ret > 36)
1094 fsid_str[36] = 0;
1095 else
1096 fsid_str[ret] = 0;
1097 if (!uuid->parse(fsid_str))
1098 return -EINVAL;
1099 return 0;
1100 }
1101
1102 int FileStore::lock_fsid()
1103 {
1104 struct flock l;
1105 memset(&l, 0, sizeof(l));
1106 l.l_type = F_WRLCK;
1107 l.l_whence = SEEK_SET;
1108 l.l_start = 0;
1109 l.l_len = 0;
1110 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1111 if (r < 0) {
1112 int err = errno;
1113 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1114 << cpp_strerror(err) << dendl;
1115 return -err;
1116 }
1117 return 0;
1118 }
1119
1120 bool FileStore::test_mount_in_use()
1121 {
1122 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1123 char fn[PATH_MAX];
1124 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1125
1126 // verify fs isn't in use
1127
1128 fsid_fd = ::open(fn, O_RDWR, 0644);
1129 if (fsid_fd < 0)
1130 return 0; // no fsid, ok.
1131 bool inuse = lock_fsid() < 0;
1132 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1133 fsid_fd = -1;
1134 return inuse;
1135 }
1136
1137 bool FileStore::is_rotational()
1138 {
1139 bool rotational;
1140 if (backend) {
1141 rotational = backend->is_rotational();
1142 } else {
1143 int fd = ::open(basedir.c_str(), O_RDONLY);
1144 if (fd < 0)
1145 return true;
1146 struct statfs st;
1147 int r = ::fstatfs(fd, &st);
1148 ::close(fd);
1149 if (r < 0) {
1150 return true;
1151 }
1152 create_backend(st.f_type);
1153 rotational = backend->is_rotational();
1154 delete backend;
1155 backend = NULL;
1156 }
1157 dout(10) << __func__ << " " << (int)rotational << dendl;
1158 return rotational;
1159 }
1160
1161 bool FileStore::is_journal_rotational()
1162 {
1163 bool journal_rotational;
1164 if (backend) {
1165 journal_rotational = backend->is_journal_rotational();
1166 } else {
1167 int fd = ::open(journalpath.c_str(), O_RDONLY);
1168 if (fd < 0)
1169 return true;
1170 struct statfs st;
1171 int r = ::fstatfs(fd, &st);
1172 ::close(fd);
1173 if (r < 0) {
1174 return true;
1175 }
1176 create_backend(st.f_type);
1177 journal_rotational = backend->is_journal_rotational();
1178 delete backend;
1179 backend = NULL;
1180 }
1181 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1182 return journal_rotational;
1183 }
1184
1185 int FileStore::_detect_fs()
1186 {
1187 struct statfs st;
1188 int r = ::fstatfs(basedir_fd, &st);
1189 if (r < 0)
1190 return -errno;
1191
1192 blk_size = st.f_bsize;
1193
1194 #if defined(__linux__)
1195 if (st.f_type == BTRFS_SUPER_MAGIC &&
1196 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1197 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1198 return -EPERM;
1199 }
1200 #endif
1201
1202 create_backend(st.f_type);
1203
1204 r = backend->detect_features();
1205 if (r < 0) {
1206 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1207 return r;
1208 }
1209
1210 // test xattrs
1211 char fn[PATH_MAX];
1212 int x = rand();
1213 int y = x+1;
1214 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1215 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1216 if (tmpfd < 0) {
1217 int ret = -errno;
1218 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1219 return ret;
1220 }
1221
1222 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1223 if (ret >= 0)
1224 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1225 if ((ret < 0) || (x != y)) {
1226 derr << "Extended attributes don't appear to work. ";
1227 if (ret)
1228 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1229 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1230 << "file system with the 'user_xattr' option." << dendl;
1231 ::unlink(fn);
1232 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1233 return -ENOTSUP;
1234 }
1235
1236 char buf[1000];
1237 memset(buf, 0, sizeof(buf)); // shut up valgrind
1238 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1239 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1240 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1241 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1242 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1243 if (ret == -ENOSPC) {
1244 dout(0) << "limited size xattrs" << dendl;
1245 }
1246 chain_fremovexattr(tmpfd, "user.test");
1247 chain_fremovexattr(tmpfd, "user.test2");
1248 chain_fremovexattr(tmpfd, "user.test3");
1249 chain_fremovexattr(tmpfd, "user.test4");
1250 chain_fremovexattr(tmpfd, "user.test5");
1251
1252 ::unlink(fn);
1253 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1254
1255 return 0;
1256 }
1257
1258 int FileStore::_sanity_check_fs()
1259 {
1260 // sanity check(s)
1261
1262 if (((int)m_filestore_journal_writeahead +
1263 (int)m_filestore_journal_parallel +
1264 (int)m_filestore_journal_trailing) > 1) {
1265 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1266 cerr << TEXT_RED
1267 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1268 << " is enabled in ceph.conf. You must choose a single journal mode."
1269 << TEXT_NORMAL << std::endl;
1270 return -EINVAL;
1271 }
1272
1273 if (!backend->can_checkpoint()) {
1274 if (!journal || !m_filestore_journal_writeahead) {
1275 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1276 cerr << TEXT_RED
1277 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1278 << " For non-btrfs volumes, a writeahead journal is required to\n"
1279 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1280 << " should include something like:\n"
1281 << " osd journal = /path/to/journal_device_or_file\n"
1282 << " filestore journal writeahead = true\n"
1283 << TEXT_NORMAL;
1284 }
1285 }
1286
1287 if (!journal) {
1288 dout(0) << "mount WARNING: no journal" << dendl;
1289 cerr << TEXT_YELLOW
1290 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1291 << " If you will not be using an osd journal, write latency may be\n"
1292 << " relatively high. It can be reduced somewhat by lowering\n"
1293 << " filestore_max_sync_interval, but lower values mean lower write\n"
1294 << " throughput, especially with spinning disks.\n"
1295 << TEXT_NORMAL;
1296 }
1297
1298 return 0;
1299 }
1300
1301 int FileStore::write_superblock()
1302 {
1303 bufferlist bl;
1304 ::encode(superblock, bl);
1305 return safe_write_file(basedir.c_str(), "superblock",
1306 bl.c_str(), bl.length());
1307 }
1308
1309 int FileStore::read_superblock()
1310 {
1311 bufferptr bp(PATH_MAX);
1312 int ret = safe_read_file(basedir.c_str(), "superblock",
1313 bp.c_str(), bp.length());
1314 if (ret < 0) {
1315 if (ret == -ENOENT) {
1316 // If the file doesn't exist write initial CompatSet
1317 return write_superblock();
1318 }
1319 return ret;
1320 }
1321
1322 bufferlist bl;
1323 bl.push_back(std::move(bp));
1324 bufferlist::iterator i = bl.begin();
1325 ::decode(superblock, i);
1326 return 0;
1327 }
1328
1329 int FileStore::update_version_stamp()
1330 {
1331 return write_version_stamp();
1332 }
1333
1334 int FileStore::version_stamp_is_valid(uint32_t *version)
1335 {
1336 bufferptr bp(PATH_MAX);
1337 int ret = safe_read_file(basedir.c_str(), "store_version",
1338 bp.c_str(), bp.length());
1339 if (ret < 0) {
1340 return ret;
1341 }
1342 bufferlist bl;
1343 bl.push_back(std::move(bp));
1344 bufferlist::iterator i = bl.begin();
1345 ::decode(*version, i);
1346 dout(10) << __FUNC__ << ": was " << *version << " vs target "
1347 << target_version << dendl;
1348 if (*version == target_version)
1349 return 1;
1350 else
1351 return 0;
1352 }
1353
1354 int FileStore::write_version_stamp()
1355 {
1356 dout(1) << __FUNC__ << ": " << target_version << dendl;
1357 bufferlist bl;
1358 ::encode(target_version, bl);
1359
1360 return safe_write_file(basedir.c_str(), "store_version",
1361 bl.c_str(), bl.length());
1362 }
1363
1364 int FileStore::upgrade()
1365 {
1366 dout(1) << __FUNC__ << dendl;
1367 uint32_t version;
1368 int r = version_stamp_is_valid(&version);
1369
1370 if (r == -ENOENT) {
1371 derr << "The store_version file doesn't exist." << dendl;
1372 return -EINVAL;
1373 }
1374 if (r < 0)
1375 return r;
1376 if (r == 1)
1377 return 0;
1378
1379 if (version < 3) {
1380 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1381 return -EINVAL;
1382 }
1383
1384 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1385 // open up DBObjectMap with the do_upgrade flag, which we already did.
1386 update_version_stamp();
1387 return 0;
1388 }
1389
1390 int FileStore::read_op_seq(uint64_t *seq)
1391 {
1392 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1393 if (op_fd < 0) {
1394 int r = -errno;
1395 assert(!m_filestore_fail_eio || r != -EIO);
1396 return r;
1397 }
1398 char s[40];
1399 memset(s, 0, sizeof(s));
1400 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1401 if (ret < 0) {
1402 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1403 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1404 assert(!m_filestore_fail_eio || ret != -EIO);
1405 return ret;
1406 }
1407 *seq = atoll(s);
1408 return op_fd;
1409 }
1410
1411 int FileStore::write_op_seq(int fd, uint64_t seq)
1412 {
1413 char s[30];
1414 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1415 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1416 if (ret < 0) {
1417 ret = -errno;
1418 assert(!m_filestore_fail_eio || ret != -EIO);
1419 }
1420 return ret;
1421 }
1422
1423 int FileStore::mount()
1424 {
1425 int ret;
1426 char buf[PATH_MAX];
1427 uint64_t initial_op_seq;
1428 uuid_d omap_fsid;
1429 set<string> cluster_snaps;
1430 CompatSet supported_compat_set = get_fs_supported_compat_set();
1431
1432 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1433
1434 ret = set_throttle_params();
1435 if (ret != 0)
1436 goto done;
1437
1438 // make sure global base dir exists
1439 if (::access(basedir.c_str(), R_OK | W_OK)) {
1440 ret = -errno;
1441 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1442 << cpp_strerror(ret) << dendl;
1443 goto done;
1444 }
1445
1446 // get fsid
1447 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1448 fsid_fd = ::open(buf, O_RDWR, 0644);
1449 if (fsid_fd < 0) {
1450 ret = -errno;
1451 derr << __FUNC__ << ": error opening '" << buf << "': "
1452 << cpp_strerror(ret) << dendl;
1453 goto done;
1454 }
1455
1456 ret = read_fsid(fsid_fd, &fsid);
1457 if (ret < 0) {
1458 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1459 << dendl;
1460 goto close_fsid_fd;
1461 }
1462
1463 if (lock_fsid() < 0) {
1464 derr << __FUNC__ << ": lock_fsid failed" << dendl;
1465 ret = -EBUSY;
1466 goto close_fsid_fd;
1467 }
1468
1469 dout(10) << "mount fsid is " << fsid << dendl;
1470
1471
1472 uint32_t version_stamp;
1473 ret = version_stamp_is_valid(&version_stamp);
1474 if (ret < 0) {
1475 derr << __FUNC__ << ": error in version_stamp_is_valid: "
1476 << cpp_strerror(ret) << dendl;
1477 goto close_fsid_fd;
1478 } else if (ret == 0) {
1479 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1480 derr << __FUNC__ << ": stale version stamp detected: "
1481 << version_stamp
1482 << ". Proceeding, do_update "
1483 << "is set, performing disk format upgrade."
1484 << dendl;
1485 do_update = true;
1486 } else {
1487 ret = -EINVAL;
1488 derr << __FUNC__ << ": stale version stamp " << version_stamp
1489 << ". Please run the FileStore update script before starting the "
1490 << "OSD, or set filestore_update_to to " << target_version
1491 << " (currently " << cct->_conf->filestore_update_to << ")"
1492 << dendl;
1493 goto close_fsid_fd;
1494 }
1495 }
1496
1497 ret = read_superblock();
1498 if (ret < 0) {
1499 goto close_fsid_fd;
1500 }
1501
1502 // Check if this FileStore supports all the necessary features to mount
1503 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1504 derr << __FUNC__ << ": Incompatible features set "
1505 << superblock.compat_features << dendl;
1506 ret = -EINVAL;
1507 goto close_fsid_fd;
1508 }
1509
1510 // open some dir handles
1511 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1512 if (basedir_fd < 0) {
1513 ret = -errno;
1514 derr << __FUNC__ << ": failed to open " << basedir << ": "
1515 << cpp_strerror(ret) << dendl;
1516 basedir_fd = -1;
1517 goto close_fsid_fd;
1518 }
1519
1520 // test for btrfs, xattrs, etc.
1521 ret = _detect_fs();
1522 if (ret < 0) {
1523 derr << __FUNC__ << ": error in _detect_fs: "
1524 << cpp_strerror(ret) << dendl;
1525 goto close_basedir_fd;
1526 }
1527
1528 {
1529 list<string> ls;
1530 ret = backend->list_checkpoints(ls);
1531 if (ret < 0) {
1532 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1533 goto close_basedir_fd;
1534 }
1535
1536 long long unsigned c, prev = 0;
1537 char clustersnap[NAME_MAX];
1538 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1539 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1540 assert(c > prev);
1541 prev = c;
1542 snaps.push_back(c);
1543 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1544 cluster_snaps.insert(*it);
1545 }
1546 }
1547
1548 if (m_osd_rollback_to_cluster_snap.length() &&
1549 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1550 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1551 ret = -ENOENT;
1552 goto close_basedir_fd;
1553 }
1554
1555 char nosnapfn[200];
1556 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1557
1558 if (backend->can_checkpoint()) {
1559 if (snaps.empty()) {
1560 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1561 } else {
1562 char s[NAME_MAX];
1563 uint64_t curr_seq = 0;
1564
1565 if (m_osd_rollback_to_cluster_snap.length()) {
1566 derr << TEXT_RED
1567 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1568 << TEXT_NORMAL
1569 << dendl;
1570 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1571 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1572 } else {
1573 {
1574 int fd = read_op_seq(&curr_seq);
1575 if (fd >= 0) {
1576 VOID_TEMP_FAILURE_RETRY(::close(fd));
1577 }
1578 }
1579 if (curr_seq)
1580 dout(10) << " current/ seq was " << curr_seq << dendl;
1581 else
1582 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1583
1584 uint64_t cp = snaps.back();
1585 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1586
1587 // if current/ is marked as non-snapshotted, refuse to roll
1588 // back (without clear direction) to avoid throwing out new
1589 // data.
1590 struct stat st;
1591 if (::stat(nosnapfn, &st) == 0) {
1592 if (!m_osd_use_stale_snap) {
1593 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1594 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1595 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1596 ret = -ENOTSUP;
1597 goto close_basedir_fd;
1598 }
1599 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1600 << ", newest snap is " << cp << dendl;
1601 cerr << TEXT_YELLOW
1602 << " ** WARNING: forcing the use of stale snapshot data **"
1603 << TEXT_NORMAL << std::endl;
1604 }
1605
1606 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1607 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1608 }
1609
1610 // drop current?
1611 ret = backend->rollback_to(s);
1612 if (ret) {
1613 derr << __FUNC__ << ": error rolling back to " << s << ": "
1614 << cpp_strerror(ret) << dendl;
1615 goto close_basedir_fd;
1616 }
1617 }
1618 }
1619 initial_op_seq = 0;
1620
1621 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1622 if (current_fd < 0) {
1623 ret = -errno;
1624 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1625 goto close_basedir_fd;
1626 }
1627
1628 assert(current_fd >= 0);
1629
1630 op_fd = read_op_seq(&initial_op_seq);
1631 if (op_fd < 0) {
1632 ret = op_fd;
1633 derr << __FUNC__ << ": read_op_seq failed" << dendl;
1634 goto close_current_fd;
1635 }
1636
1637 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1638 if (initial_op_seq == 0) {
1639 derr << "mount initial op seq is 0; something is wrong" << dendl;
1640 ret = -EINVAL;
1641 goto close_current_fd;
1642 }
1643
1644 if (!backend->can_checkpoint()) {
1645 // mark current/ as non-snapshotted so that we don't rollback away
1646 // from it.
1647 int r = ::creat(nosnapfn, 0644);
1648 if (r < 0) {
1649 ret = -errno;
1650 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1651 goto close_current_fd;
1652 }
1653 VOID_TEMP_FAILURE_RETRY(::close(r));
1654 } else {
1655 // clear nosnap marker, if present.
1656 ::unlink(nosnapfn);
1657 }
1658
1659 // check fsid with omap
1660 // get omap fsid
1661 int omap_fsid_fd;
1662 char omap_fsid_buf[PATH_MAX];
1663 struct ::stat omap_fsid_stat;
1664 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1665 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1666 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1667 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1668 << "assume as matched."
1669 << dendl;
1670 }else{
1671 // if osd_uuid exists, compares osd_uuid with fsid
1672 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1673 if (omap_fsid_fd < 0) {
1674 ret = -errno;
1675 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1676 << cpp_strerror(ret)
1677 << dendl;
1678 goto close_current_fd;
1679 }
1680 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1681 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1682 omap_fsid_fd = -1; // defensive
1683 if (ret < 0) {
1684 derr << __FUNC__ << ": error reading omap_fsid_fd"
1685 << ", omap_fsid = " << omap_fsid
1686 << cpp_strerror(ret)
1687 << dendl;
1688 goto close_current_fd;
1689 }
1690 if (fsid != omap_fsid) {
1691 derr << __FUNC__ << ": " << omap_fsid_buf
1692 << " has existed omap fsid " << omap_fsid
1693 << " != expected osd fsid " << fsid
1694 << dendl;
1695 ret = -EINVAL;
1696 goto close_current_fd;
1697 }
1698 }
1699
1700 dout(0) << "start omap initiation" << dendl;
1701 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1702 KeyValueDB * omap_store = KeyValueDB::create(cct,
1703 superblock.omap_backend,
1704 omap_dir);
1705 if (omap_store == NULL)
1706 {
1707 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1708 ret = -1;
1709 goto close_current_fd;
1710 }
1711
1712 if (superblock.omap_backend == "rocksdb")
1713 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1714 else
1715 ret = omap_store->init();
1716
1717 if (ret < 0) {
1718 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1719 goto close_current_fd;
1720 }
1721
1722 stringstream err;
1723 if (omap_store->create_and_open(err)) {
1724 delete omap_store;
1725 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1726 << " : " << err.str() << dendl;
1727 ret = -1;
1728 goto close_current_fd;
1729 }
1730
1731 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1732 ret = dbomap->init(do_update);
1733 if (ret < 0) {
1734 delete dbomap;
1735 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1736 goto close_current_fd;
1737 }
1738 stringstream err2;
1739
1740 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1741 derr << err2.str() << dendl;
1742 delete dbomap;
1743 ret = -EINVAL;
1744 goto close_current_fd;
1745 }
1746 object_map.reset(dbomap);
1747 }
1748
1749 // journal
1750 new_journal();
1751
1752 // select journal mode?
1753 if (journal) {
1754 if (!m_filestore_journal_writeahead &&
1755 !m_filestore_journal_parallel &&
1756 !m_filestore_journal_trailing) {
1757 if (!backend->can_checkpoint()) {
1758 m_filestore_journal_writeahead = true;
1759 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1760 } else {
1761 m_filestore_journal_parallel = true;
1762 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1763 }
1764 } else {
1765 if (m_filestore_journal_writeahead)
1766 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1767 if (m_filestore_journal_parallel)
1768 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1769 if (m_filestore_journal_trailing)
1770 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1771 }
1772 if (m_filestore_journal_writeahead)
1773 journal->set_wait_on_full(true);
1774 } else {
1775 dout(0) << __FUNC__ << ": no journal" << dendl;
1776 }
1777
1778 ret = _sanity_check_fs();
1779 if (ret) {
1780 derr << __FUNC__ << ": _sanity_check_fs failed with error "
1781 << ret << dendl;
1782 goto close_current_fd;
1783 }
1784
1785 // Cleanup possibly invalid collections
1786 {
1787 vector<coll_t> collections;
1788 ret = list_collections(collections, true);
1789 if (ret < 0) {
1790 derr << "Error " << ret << " while listing collections" << dendl;
1791 goto close_current_fd;
1792 }
1793 for (vector<coll_t>::iterator i = collections.begin();
1794 i != collections.end();
1795 ++i) {
1796 Index index;
1797 ret = get_index(*i, &index);
1798 if (ret < 0) {
1799 derr << "Unable to mount index " << *i
1800 << " with error: " << ret << dendl;
1801 goto close_current_fd;
1802 }
1803 assert(NULL != index.index);
1804 RWLock::WLocker l((index.index)->access_lock);
1805
1806 index->cleanup();
1807 }
1808 }
1809 if (!m_disable_wbthrottle) {
1810 wbthrottle.start();
1811 } else {
1812 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1813 if (cct->_conf->filestore_odsync_write) {
1814 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1815 }
1816 }
1817 sync_thread.create("filestore_sync");
1818
1819 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1820 ret = journal_replay(initial_op_seq);
1821 if (ret < 0) {
1822 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1823 if (ret == -ENOTTY) {
1824 derr << "maybe journal is not pointing to a block device and its size "
1825 << "wasn't configured?" << dendl;
1826 }
1827
1828 goto stop_sync;
1829 }
1830 }
1831
1832 {
1833 stringstream err2;
1834 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1835 derr << err2.str() << dendl;
1836 ret = -EINVAL;
1837 goto stop_sync;
1838 }
1839 }
1840
1841 init_temp_collections();
1842
1843 journal_start();
1844
1845 op_tp.start();
1846 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1847 (*it)->start();
1848 }
1849 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1850 (*it)->start();
1851 }
1852
1853 timer.init();
1854
1855 // upgrade?
1856 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1857 int err = upgrade();
1858 if (err < 0) {
1859 derr << "error converting store" << dendl;
1860 umount();
1861 return err;
1862 }
1863 }
1864
1865 // all okay.
1866 return 0;
1867
1868 stop_sync:
1869 // stop sync thread
1870 lock.Lock();
1871 stop = true;
1872 sync_cond.Signal();
1873 lock.Unlock();
1874 sync_thread.join();
1875 if (!m_disable_wbthrottle) {
1876 wbthrottle.stop();
1877 }
1878 close_current_fd:
1879 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1880 current_fd = -1;
1881 close_basedir_fd:
1882 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1883 basedir_fd = -1;
1884 close_fsid_fd:
1885 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1886 fsid_fd = -1;
1887 done:
1888 assert(!m_filestore_fail_eio || ret != -EIO);
1889 delete backend;
1890 backend = NULL;
1891 object_map.reset();
1892 return ret;
1893 }
1894
1895 void FileStore::init_temp_collections()
1896 {
1897 dout(10) << __FUNC__ << dendl;
1898 vector<coll_t> ls;
1899 int r = list_collections(ls, true);
1900 assert(r >= 0);
1901
1902 dout(20) << " ls " << ls << dendl;
1903
1904 SequencerPosition spos;
1905
1906 set<coll_t> temps;
1907 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1908 if (p->is_temp())
1909 temps.insert(*p);
1910 dout(20) << " temps " << temps << dendl;
1911
1912 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1913 if (p->is_temp())
1914 continue;
1915 if (p->is_meta())
1916 continue;
1917 coll_t temp = p->get_temp();
1918 if (temps.count(temp)) {
1919 temps.erase(temp);
1920 } else {
1921 dout(10) << __FUNC__ << ": creating " << temp << dendl;
1922 r = _create_collection(temp, 0, spos);
1923 assert(r == 0);
1924 }
1925 }
1926
1927 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1928 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
1929 r = _collection_remove_recursive(*p, spos);
1930 assert(r == 0);
1931 }
1932 }
1933
1934 int FileStore::umount()
1935 {
1936 dout(5) << __FUNC__ << ": " << basedir << dendl;
1937
1938 flush();
1939 sync();
1940 do_force_sync();
1941
1942 lock.Lock();
1943 stop = true;
1944 sync_cond.Signal();
1945 lock.Unlock();
1946 sync_thread.join();
1947 if (!m_disable_wbthrottle){
1948 wbthrottle.stop();
1949 }
1950 op_tp.stop();
1951
1952 journal_stop();
1953 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1954 journal_write_close();
1955
1956 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1957 (*it)->stop();
1958 }
1959 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1960 (*it)->stop();
1961 }
1962
1963 if (fsid_fd >= 0) {
1964 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1965 fsid_fd = -1;
1966 }
1967 if (op_fd >= 0) {
1968 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1969 op_fd = -1;
1970 }
1971 if (current_fd >= 0) {
1972 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1973 current_fd = -1;
1974 }
1975 if (basedir_fd >= 0) {
1976 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1977 basedir_fd = -1;
1978 }
1979
1980 force_sync = false;
1981
1982 delete backend;
1983 backend = NULL;
1984
1985 object_map.reset();
1986
1987 {
1988 Mutex::Locker l(sync_entry_timeo_lock);
1989 timer.shutdown();
1990 }
1991
1992 // nothing
1993 return 0;
1994 }
1995
1996
1997
1998
1999 /// -----------------------------
2000
2001 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2002 Context *onreadable,
2003 Context *onreadable_sync,
2004 TrackedOpRef osd_op)
2005 {
2006 uint64_t bytes = 0, ops = 0;
2007 for (vector<Transaction>::iterator p = tls.begin();
2008 p != tls.end();
2009 ++p) {
2010 bytes += (*p).get_num_bytes();
2011 ops += (*p).get_num_ops();
2012 }
2013
2014 Op *o = new Op;
2015 o->start = ceph_clock_now();
2016 o->tls = std::move(tls);
2017 o->onreadable = onreadable;
2018 o->onreadable_sync = onreadable_sync;
2019 o->ops = ops;
2020 o->bytes = bytes;
2021 o->osd_op = osd_op;
2022 return o;
2023 }
2024
2025
2026
2027 void FileStore::queue_op(OpSequencer *osr, Op *o)
2028 {
2029 // queue op on sequencer, then queue sequencer for the threadpool,
2030 // so that regardless of which order the threads pick up the
2031 // sequencer, the op order will be preserved.
2032
2033 osr->queue(o);
2034 o->trace.event("queued");
2035
2036 logger->inc(l_filestore_ops);
2037 logger->inc(l_filestore_bytes, o->bytes);
2038
2039 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
2040 << " " << *osr
2041 << " " << o->bytes << " bytes"
2042 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2043 << dendl;
2044 op_wq.queue(osr);
2045 }
2046
2047 void FileStore::op_queue_reserve_throttle(Op *o)
2048 {
2049 throttle_ops.get();
2050 throttle_bytes.get(o->bytes);
2051
2052 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2053 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2054 }
2055
2056 void FileStore::op_queue_release_throttle(Op *o)
2057 {
2058 throttle_ops.put();
2059 throttle_bytes.put(o->bytes);
2060 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2061 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2062 }
2063
2064 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2065 {
2066 if (!m_disable_wbthrottle) {
2067 wbthrottle.throttle();
2068 }
2069 // inject a stall?
2070 if (cct->_conf->filestore_inject_stall) {
2071 int orig = cct->_conf->filestore_inject_stall;
2072 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2073 sleep(orig);
2074 cct->_conf->set_val("filestore_inject_stall", "0");
2075 dout(5) << __FUNC__ << ": done stalling" << dendl;
2076 }
2077
2078 osr->apply_lock.Lock();
2079 Op *o = osr->peek_queue();
2080 o->trace.event("op_apply_start");
2081 apply_manager.op_apply_start(o->op);
2082 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2083 o->trace.event("_do_transactions start");
2084 int r = _do_transactions(o->tls, o->op, &handle);
2085 o->trace.event("op_apply_finish");
2086 apply_manager.op_apply_finish(o->op);
2087 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2088 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2089
2090 o->tls.clear();
2091
2092 }
2093
2094 void FileStore::_finish_op(OpSequencer *osr)
2095 {
2096 list<Context*> to_queue;
2097 Op *o = osr->dequeue(&to_queue);
2098
2099 utime_t lat = ceph_clock_now();
2100 lat -= o->start;
2101
2102 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2103 osr->apply_lock.Unlock(); // locked in _do_op
2104 o->trace.event("_finish_op");
2105
2106 // called with tp lock held
2107 op_queue_release_throttle(o);
2108
2109 logger->tinc(l_filestore_apply_latency, lat);
2110
2111 if (o->onreadable_sync) {
2112 o->onreadable_sync->complete(0);
2113 }
2114 if (o->onreadable) {
2115 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2116 }
2117 if (!to_queue.empty()) {
2118 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2119 }
2120 delete o;
2121 }
2122
2123
2124 struct C_JournaledAhead : public Context {
2125 FileStore *fs;
2126 FileStore::OpSequencer *osr;
2127 FileStore::Op *o;
2128 Context *ondisk;
2129
2130 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2131 fs(f), osr(os), o(o), ondisk(ondisk) { }
2132 void finish(int r) override {
2133 fs->_journaled_ahead(osr, o, ondisk);
2134 }
2135 };
2136
2137 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2138 TrackedOpRef osd_op,
2139 ThreadPool::TPHandle *handle)
2140 {
2141 Context *onreadable;
2142 Context *ondisk;
2143 Context *onreadable_sync;
2144 ObjectStore::Transaction::collect_contexts(
2145 tls, &onreadable, &ondisk, &onreadable_sync);
2146
2147 if (cct->_conf->objectstore_blackhole) {
2148 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2149 << dendl;
2150 delete ondisk;
2151 delete onreadable;
2152 delete onreadable_sync;
2153 return 0;
2154 }
2155
2156 utime_t start = ceph_clock_now();
2157 // set up the sequencer
2158 OpSequencer *osr;
2159 assert(posr);
2160 if (posr->p) {
2161 osr = static_cast<OpSequencer *>(posr->p.get());
2162 dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
2163 } else {
2164 osr = new OpSequencer(cct, ++next_osr_id);
2165 osr->set_cct(cct);
2166 osr->parent = posr;
2167 posr->p = osr;
2168 dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
2169 }
2170
2171 // used to include osr information in tracepoints during transaction apply
2172 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2173 (*i).set_osr(osr);
2174 }
2175
2176 ZTracer::Trace trace;
2177 if (osd_op && osd_op->pg_trace) {
2178 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2179 trace = osd_op->store_trace;
2180 }
2181
2182 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2183 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2184
2185 //prepare and encode transactions data out of lock
2186 bufferlist tbl;
2187 int orig_len = journal->prepare_entry(o->tls, &tbl);
2188
2189 if (handle)
2190 handle->suspend_tp_timeout();
2191
2192 op_queue_reserve_throttle(o);
2193 journal->reserve_throttle_and_backoff(tbl.length());
2194
2195 if (handle)
2196 handle->reset_tp_timeout();
2197
2198 uint64_t op_num = submit_manager.op_submit_start();
2199 o->op = op_num;
2200 trace.keyval("opnum", op_num);
2201
2202 if (m_filestore_do_dump)
2203 dump_transactions(o->tls, o->op, osr);
2204
2205 if (m_filestore_journal_parallel) {
2206 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2207
2208 trace.keyval("journal mode", "parallel");
2209 trace.event("journal started");
2210 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2211
2212 // queue inside submit_manager op submission lock
2213 queue_op(osr, o);
2214 trace.event("op queued");
2215 } else if (m_filestore_journal_writeahead) {
2216 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2217
2218 osr->queue_journal(o->op);
2219
2220 trace.keyval("journal mode", "writeahead");
2221 trace.event("journal started");
2222 _op_journal_transactions(tbl, orig_len, o->op,
2223 new C_JournaledAhead(this, osr, o, ondisk),
2224 osd_op);
2225 } else {
2226 ceph_abort();
2227 }
2228 submit_manager.op_submit_finish(op_num);
2229 utime_t end = ceph_clock_now();
2230 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2231 return 0;
2232 }
2233
2234 if (!journal) {
2235 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2236 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2237
2238 if (handle)
2239 handle->suspend_tp_timeout();
2240
2241 op_queue_reserve_throttle(o);
2242
2243 if (handle)
2244 handle->reset_tp_timeout();
2245
2246 uint64_t op_num = submit_manager.op_submit_start();
2247 o->op = op_num;
2248
2249 if (m_filestore_do_dump)
2250 dump_transactions(o->tls, o->op, osr);
2251
2252 queue_op(osr, o);
2253 trace.keyval("opnum", op_num);
2254 trace.keyval("journal mode", "none");
2255 trace.event("op queued");
2256
2257 if (ondisk)
2258 apply_manager.add_waiter(op_num, ondisk);
2259 submit_manager.op_submit_finish(op_num);
2260 utime_t end = ceph_clock_now();
2261 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2262 return 0;
2263 }
2264
2265 assert(journal);
2266 //prepare and encode transactions data out of lock
2267 bufferlist tbl;
2268 int orig_len = -1;
2269 if (journal->is_writeable()) {
2270 orig_len = journal->prepare_entry(tls, &tbl);
2271 }
2272 uint64_t op = submit_manager.op_submit_start();
2273 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2274
2275 if (m_filestore_do_dump)
2276 dump_transactions(tls, op, osr);
2277
2278 trace.event("op_apply_start");
2279 trace.keyval("opnum", op);
2280 trace.keyval("journal mode", "trailing");
2281 apply_manager.op_apply_start(op);
2282 trace.event("do_transactions");
2283 int r = do_transactions(tls, op);
2284
2285 if (r >= 0) {
2286 trace.event("journal started");
2287 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2288 } else {
2289 delete ondisk;
2290 }
2291
2292 // start on_readable finisher after we queue journal item, as on_readable callback
2293 // is allowed to delete the Transaction
2294 if (onreadable_sync) {
2295 onreadable_sync->complete(r);
2296 }
2297 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2298
2299 submit_manager.op_submit_finish(op);
2300 trace.event("op_apply_finish");
2301 apply_manager.op_apply_finish(op);
2302
2303 utime_t end = ceph_clock_now();
2304 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2305 return r;
2306 }
2307
2308 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2309 {
2310 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2311
2312 o->trace.event("writeahead journal finished");
2313
2314 // this should queue in order because the journal does it's completions in order.
2315 queue_op(osr, o);
2316
2317 list<Context*> to_queue;
2318 osr->dequeue_journal(&to_queue);
2319
2320 // do ondisk completions async, to prevent any onreadable_sync completions
2321 // getting blocked behind an ondisk completion.
2322 if (ondisk) {
2323 dout(10) << " queueing ondisk " << ondisk << dendl;
2324 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2325 }
2326 if (!to_queue.empty()) {
2327 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2328 }
2329 }
2330
2331 int FileStore::_do_transactions(
2332 vector<Transaction> &tls,
2333 uint64_t op_seq,
2334 ThreadPool::TPHandle *handle)
2335 {
2336 int trans_num = 0;
2337
2338 for (vector<Transaction>::iterator p = tls.begin();
2339 p != tls.end();
2340 ++p, trans_num++) {
2341 _do_transaction(*p, op_seq, trans_num, handle);
2342 if (handle)
2343 handle->reset_tp_timeout();
2344 }
2345
2346 return 0;
2347 }
2348
2349 void FileStore::_set_global_replay_guard(const coll_t& cid,
2350 const SequencerPosition &spos)
2351 {
2352 if (backend->can_checkpoint())
2353 return;
2354
2355 // sync all previous operations on this sequencer
2356 int ret = object_map->sync();
2357 if (ret < 0) {
2358 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2359 assert(0 == "_set_global_replay_guard failed");
2360 }
2361 ret = sync_filesystem(basedir_fd);
2362 if (ret < 0) {
2363 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2364 assert(0 == "_set_global_replay_guard failed");
2365 }
2366
2367 char fn[PATH_MAX];
2368 get_cdir(cid, fn, sizeof(fn));
2369 int fd = ::open(fn, O_RDONLY);
2370 if (fd < 0) {
2371 int err = errno;
2372 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2373 assert(0 == "_set_global_replay_guard failed");
2374 }
2375
2376 _inject_failure();
2377
2378 // then record that we did it
2379 bufferlist v;
2380 ::encode(spos, v);
2381 int r = chain_fsetxattr<true, true>(
2382 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2383 if (r < 0) {
2384 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2385 << " got " << cpp_strerror(r) << dendl;
2386 assert(0 == "fsetxattr failed");
2387 }
2388
2389 // and make sure our xattr is durable.
2390 ::fsync(fd);
2391
2392 _inject_failure();
2393
2394 VOID_TEMP_FAILURE_RETRY(::close(fd));
2395 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2396 }
2397
2398 int FileStore::_check_global_replay_guard(const coll_t& cid,
2399 const SequencerPosition& spos)
2400 {
2401 char fn[PATH_MAX];
2402 get_cdir(cid, fn, sizeof(fn));
2403 int fd = ::open(fn, O_RDONLY);
2404 if (fd < 0) {
2405 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2406 return 1; // if collection does not exist, there is no guard, and we can replay.
2407 }
2408
2409 char buf[100];
2410 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2411 if (r < 0) {
2412 dout(20) << __FUNC__ << ": no xattr" << dendl;
2413 assert(!m_filestore_fail_eio || r != -EIO);
2414 VOID_TEMP_FAILURE_RETRY(::close(fd));
2415 return 1; // no xattr
2416 }
2417 bufferlist bl;
2418 bl.append(buf, r);
2419
2420 SequencerPosition opos;
2421 bufferlist::iterator p = bl.begin();
2422 ::decode(opos, p);
2423
2424 VOID_TEMP_FAILURE_RETRY(::close(fd));
2425 return spos >= opos ? 1 : -1;
2426 }
2427
2428
2429 void FileStore::_set_replay_guard(const coll_t& cid,
2430 const SequencerPosition &spos,
2431 bool in_progress=false)
2432 {
2433 char fn[PATH_MAX];
2434 get_cdir(cid, fn, sizeof(fn));
2435 int fd = ::open(fn, O_RDONLY);
2436 if (fd < 0) {
2437 int err = errno;
2438 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2439 assert(0 == "_set_replay_guard failed");
2440 }
2441 _set_replay_guard(fd, spos, 0, in_progress);
2442 VOID_TEMP_FAILURE_RETRY(::close(fd));
2443 }
2444
2445
2446 void FileStore::_set_replay_guard(int fd,
2447 const SequencerPosition& spos,
2448 const ghobject_t *hoid,
2449 bool in_progress)
2450 {
2451 if (backend->can_checkpoint())
2452 return;
2453
2454 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2455
2456 _inject_failure();
2457
2458 // first make sure the previous operation commits
2459 ::fsync(fd);
2460
2461 if (!in_progress) {
2462 // sync object_map too. even if this object has a header or keys,
2463 // it have had them in the past and then removed them, so always
2464 // sync.
2465 object_map->sync(hoid, &spos);
2466 }
2467
2468 _inject_failure();
2469
2470 // then record that we did it
2471 bufferlist v(40);
2472 ::encode(spos, v);
2473 ::encode(in_progress, v);
2474 int r = chain_fsetxattr<true, true>(
2475 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2476 if (r < 0) {
2477 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2478 assert(0 == "fsetxattr failed");
2479 }
2480
2481 // and make sure our xattr is durable.
2482 ::fsync(fd);
2483
2484 _inject_failure();
2485
2486 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2487 }
2488
2489 void FileStore::_close_replay_guard(const coll_t& cid,
2490 const SequencerPosition &spos)
2491 {
2492 char fn[PATH_MAX];
2493 get_cdir(cid, fn, sizeof(fn));
2494 int fd = ::open(fn, O_RDONLY);
2495 if (fd < 0) {
2496 int err = errno;
2497 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2498 assert(0 == "_close_replay_guard failed");
2499 }
2500 _close_replay_guard(fd, spos);
2501 VOID_TEMP_FAILURE_RETRY(::close(fd));
2502 }
2503
2504 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2505 const ghobject_t *hoid)
2506 {
2507 if (backend->can_checkpoint())
2508 return;
2509
2510 dout(10) << __FUNC__ << ": " << spos << dendl;
2511
2512 _inject_failure();
2513
2514 // sync object_map too. even if this object has a header or keys,
2515 // it have had them in the past and then removed them, so always
2516 // sync.
2517 object_map->sync(hoid, &spos);
2518
2519 // then record that we are done with this operation
2520 bufferlist v(40);
2521 ::encode(spos, v);
2522 bool in_progress = false;
2523 ::encode(in_progress, v);
2524 int r = chain_fsetxattr<true, true>(
2525 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2526 if (r < 0) {
2527 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2528 assert(0 == "fsetxattr failed");
2529 }
2530
2531 // and make sure our xattr is durable.
2532 ::fsync(fd);
2533
2534 _inject_failure();
2535
2536 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2537 }
2538
2539 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2540 const SequencerPosition& spos)
2541 {
2542 if (!replaying || backend->can_checkpoint())
2543 return 1;
2544
2545 int r = _check_global_replay_guard(cid, spos);
2546 if (r < 0)
2547 return r;
2548
2549 FDRef fd;
2550 r = lfn_open(cid, oid, false, &fd);
2551 if (r < 0) {
2552 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2553 return 1; // if file does not exist, there is no guard, and we can replay.
2554 }
2555 int ret = _check_replay_guard(**fd, spos);
2556 lfn_close(fd);
2557 return ret;
2558 }
2559
2560 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2561 {
2562 if (!replaying || backend->can_checkpoint())
2563 return 1;
2564
2565 char fn[PATH_MAX];
2566 get_cdir(cid, fn, sizeof(fn));
2567 int fd = ::open(fn, O_RDONLY);
2568 if (fd < 0) {
2569 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2570 return 1; // if collection does not exist, there is no guard, and we can replay.
2571 }
2572 int ret = _check_replay_guard(fd, spos);
2573 VOID_TEMP_FAILURE_RETRY(::close(fd));
2574 return ret;
2575 }
2576
2577 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2578 {
2579 if (!replaying || backend->can_checkpoint())
2580 return 1;
2581
2582 char buf[100];
2583 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2584 if (r < 0) {
2585 dout(20) << __FUNC__ << ": no xattr" << dendl;
2586 assert(!m_filestore_fail_eio || r != -EIO);
2587 return 1; // no xattr
2588 }
2589 bufferlist bl;
2590 bl.append(buf, r);
2591
2592 SequencerPosition opos;
2593 bufferlist::iterator p = bl.begin();
2594 ::decode(opos, p);
2595 bool in_progress = false;
2596 if (!p.end()) // older journals don't have this
2597 ::decode(in_progress, p);
2598 if (opos > spos) {
2599 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2600 << ", now or in future, SKIPPING REPLAY" << dendl;
2601 return -1;
2602 } else if (opos == spos) {
2603 if (in_progress) {
2604 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2605 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2606 return 0;
2607 } else {
2608 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2609 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2610 return -1;
2611 }
2612 } else {
2613 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2614 << ", in past, will replay" << dendl;
2615 return 1;
2616 }
2617 }
2618
2619 void FileStore::_do_transaction(
2620 Transaction& t, uint64_t op_seq, int trans_num,
2621 ThreadPool::TPHandle *handle)
2622 {
2623 dout(10) << __FUNC__ << ": on " << &t << dendl;
2624
2625 #ifdef WITH_LTTNG
2626 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2627 #endif
2628
2629 Transaction::iterator i = t.begin();
2630
2631 SequencerPosition spos(op_seq, trans_num, 0);
2632 while (i.have_op()) {
2633 if (handle)
2634 handle->reset_tp_timeout();
2635
2636 Transaction::Op *op = i.decode_op();
2637 int r = 0;
2638
2639 _inject_failure();
2640
2641 switch (op->op) {
2642 case Transaction::OP_NOP:
2643 break;
2644 case Transaction::OP_TOUCH:
2645 {
2646 const coll_t &_cid = i.get_cid(op->cid);
2647 const ghobject_t &oid = i.get_oid(op->oid);
2648 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2649 _cid : _cid.get_temp();
2650 tracepoint(objectstore, touch_enter, osr_name);
2651 if (_check_replay_guard(cid, oid, spos) > 0)
2652 r = _touch(cid, oid);
2653 tracepoint(objectstore, touch_exit, r);
2654 }
2655 break;
2656
2657 case Transaction::OP_WRITE:
2658 {
2659 const coll_t &_cid = i.get_cid(op->cid);
2660 const ghobject_t &oid = i.get_oid(op->oid);
2661 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2662 _cid : _cid.get_temp();
2663 uint64_t off = op->off;
2664 uint64_t len = op->len;
2665 uint32_t fadvise_flags = i.get_fadvise_flags();
2666 bufferlist bl;
2667 i.decode_bl(bl);
2668 tracepoint(objectstore, write_enter, osr_name, off, len);
2669 if (_check_replay_guard(cid, oid, spos) > 0)
2670 r = _write(cid, oid, off, len, bl, fadvise_flags);
2671 tracepoint(objectstore, write_exit, r);
2672 }
2673 break;
2674
2675 case Transaction::OP_ZERO:
2676 {
2677 const coll_t &_cid = i.get_cid(op->cid);
2678 const ghobject_t &oid = i.get_oid(op->oid);
2679 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2680 _cid : _cid.get_temp();
2681 uint64_t off = op->off;
2682 uint64_t len = op->len;
2683 tracepoint(objectstore, zero_enter, osr_name, off, len);
2684 if (_check_replay_guard(cid, oid, spos) > 0)
2685 r = _zero(cid, oid, off, len);
2686 tracepoint(objectstore, zero_exit, r);
2687 }
2688 break;
2689
2690 case Transaction::OP_TRIMCACHE:
2691 {
2692 // deprecated, no-op
2693 }
2694 break;
2695
2696 case Transaction::OP_TRUNCATE:
2697 {
2698 const coll_t &_cid = i.get_cid(op->cid);
2699 const ghobject_t &oid = i.get_oid(op->oid);
2700 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2701 _cid : _cid.get_temp();
2702 uint64_t off = op->off;
2703 tracepoint(objectstore, truncate_enter, osr_name, off);
2704 if (_check_replay_guard(cid, oid, spos) > 0)
2705 r = _truncate(cid, oid, off);
2706 tracepoint(objectstore, truncate_exit, r);
2707 }
2708 break;
2709
2710 case Transaction::OP_REMOVE:
2711 {
2712 const coll_t &_cid = i.get_cid(op->cid);
2713 const ghobject_t &oid = i.get_oid(op->oid);
2714 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2715 _cid : _cid.get_temp();
2716 tracepoint(objectstore, remove_enter, osr_name);
2717 if (_check_replay_guard(cid, oid, spos) > 0)
2718 r = _remove(cid, oid, spos);
2719 tracepoint(objectstore, remove_exit, r);
2720 }
2721 break;
2722
2723 case Transaction::OP_SETATTR:
2724 {
2725 const coll_t &_cid = i.get_cid(op->cid);
2726 const ghobject_t &oid = i.get_oid(op->oid);
2727 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2728 _cid : _cid.get_temp();
2729 string name = i.decode_string();
2730 bufferlist bl;
2731 i.decode_bl(bl);
2732 tracepoint(objectstore, setattr_enter, osr_name);
2733 if (_check_replay_guard(cid, oid, spos) > 0) {
2734 map<string, bufferptr> to_set;
2735 to_set[name] = bufferptr(bl.c_str(), bl.length());
2736 r = _setattrs(cid, oid, to_set, spos);
2737 if (r == -ENOSPC)
2738 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2739 << " name " << name << " size " << bl.length() << dendl;
2740 }
2741 tracepoint(objectstore, setattr_exit, r);
2742 }
2743 break;
2744
2745 case Transaction::OP_SETATTRS:
2746 {
2747 const coll_t &_cid = i.get_cid(op->cid);
2748 const ghobject_t &oid = i.get_oid(op->oid);
2749 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2750 _cid : _cid.get_temp();
2751 map<string, bufferptr> aset;
2752 i.decode_attrset(aset);
2753 tracepoint(objectstore, setattrs_enter, osr_name);
2754 if (_check_replay_guard(cid, oid, spos) > 0)
2755 r = _setattrs(cid, oid, aset, spos);
2756 tracepoint(objectstore, setattrs_exit, r);
2757 if (r == -ENOSPC)
2758 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2759 }
2760 break;
2761
2762 case Transaction::OP_RMATTR:
2763 {
2764 const coll_t &_cid = i.get_cid(op->cid);
2765 const ghobject_t &oid = i.get_oid(op->oid);
2766 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2767 _cid : _cid.get_temp();
2768 string name = i.decode_string();
2769 tracepoint(objectstore, rmattr_enter, osr_name);
2770 if (_check_replay_guard(cid, oid, spos) > 0)
2771 r = _rmattr(cid, oid, name.c_str(), spos);
2772 tracepoint(objectstore, rmattr_exit, r);
2773 }
2774 break;
2775
2776 case Transaction::OP_RMATTRS:
2777 {
2778 const coll_t &_cid = i.get_cid(op->cid);
2779 const ghobject_t &oid = i.get_oid(op->oid);
2780 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2781 _cid : _cid.get_temp();
2782 tracepoint(objectstore, rmattrs_enter, osr_name);
2783 if (_check_replay_guard(cid, oid, spos) > 0)
2784 r = _rmattrs(cid, oid, spos);
2785 tracepoint(objectstore, rmattrs_exit, r);
2786 }
2787 break;
2788
2789 case Transaction::OP_CLONE:
2790 {
2791 const coll_t &_cid = i.get_cid(op->cid);
2792 const ghobject_t &oid = i.get_oid(op->oid);
2793 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2794 _cid : _cid.get_temp();
2795 const ghobject_t &noid = i.get_oid(op->dest_oid);
2796 tracepoint(objectstore, clone_enter, osr_name);
2797 r = _clone(cid, oid, noid, spos);
2798 tracepoint(objectstore, clone_exit, r);
2799 }
2800 break;
2801
2802 case Transaction::OP_CLONERANGE:
2803 {
2804 const coll_t &_cid = i.get_cid(op->cid);
2805 const ghobject_t &oid = i.get_oid(op->oid);
2806 const ghobject_t &noid = i.get_oid(op->dest_oid);
2807 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2808 _cid : _cid.get_temp();
2809 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2810 _cid : _cid.get_temp();
2811 uint64_t off = op->off;
2812 uint64_t len = op->len;
2813 tracepoint(objectstore, clone_range_enter, osr_name, len);
2814 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2815 tracepoint(objectstore, clone_range_exit, r);
2816 }
2817 break;
2818
2819 case Transaction::OP_CLONERANGE2:
2820 {
2821 const coll_t &_cid = i.get_cid(op->cid);
2822 const ghobject_t &oid = i.get_oid(op->oid);
2823 const ghobject_t &noid = i.get_oid(op->dest_oid);
2824 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2825 _cid : _cid.get_temp();
2826 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2827 _cid : _cid.get_temp();
2828 uint64_t srcoff = op->off;
2829 uint64_t len = op->len;
2830 uint64_t dstoff = op->dest_off;
2831 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2832 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2833 tracepoint(objectstore, clone_range2_exit, r);
2834 }
2835 break;
2836
2837 case Transaction::OP_MKCOLL:
2838 {
2839 const coll_t &cid = i.get_cid(op->cid);
2840 tracepoint(objectstore, mkcoll_enter, osr_name);
2841 if (_check_replay_guard(cid, spos) > 0)
2842 r = _create_collection(cid, op->split_bits, spos);
2843 tracepoint(objectstore, mkcoll_exit, r);
2844 }
2845 break;
2846
2847 case Transaction::OP_COLL_SET_BITS:
2848 {
2849 const coll_t &cid = i.get_cid(op->cid);
2850 int bits = op->split_bits;
2851 r = _collection_set_bits(cid, bits);
2852 }
2853 break;
2854
2855 case Transaction::OP_COLL_HINT:
2856 {
2857 const coll_t &cid = i.get_cid(op->cid);
2858 uint32_t type = op->hint_type;
2859 bufferlist hint;
2860 i.decode_bl(hint);
2861 bufferlist::iterator hiter = hint.begin();
2862 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2863 uint32_t pg_num;
2864 uint64_t num_objs;
2865 ::decode(pg_num, hiter);
2866 ::decode(num_objs, hiter);
2867 if (_check_replay_guard(cid, spos) > 0) {
2868 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2869 }
2870 } else {
2871 // Ignore the hint
2872 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2873 }
2874 }
2875 break;
2876
2877 case Transaction::OP_RMCOLL:
2878 {
2879 const coll_t &cid = i.get_cid(op->cid);
2880 tracepoint(objectstore, rmcoll_enter, osr_name);
2881 if (_check_replay_guard(cid, spos) > 0)
2882 r = _destroy_collection(cid);
2883 tracepoint(objectstore, rmcoll_exit, r);
2884 }
2885 break;
2886
2887 case Transaction::OP_COLL_ADD:
2888 {
2889 const coll_t &ocid = i.get_cid(op->cid);
2890 const coll_t &ncid = i.get_cid(op->dest_cid);
2891 const ghobject_t &oid = i.get_oid(op->oid);
2892
2893 assert(oid.hobj.pool >= -1);
2894
2895 // always followed by OP_COLL_REMOVE
2896 Transaction::Op *op2 = i.decode_op();
2897 const coll_t &ocid2 = i.get_cid(op2->cid);
2898 const ghobject_t &oid2 = i.get_oid(op2->oid);
2899 assert(op2->op == Transaction::OP_COLL_REMOVE);
2900 assert(ocid2 == ocid);
2901 assert(oid2 == oid);
2902
2903 tracepoint(objectstore, coll_add_enter);
2904 r = _collection_add(ncid, ocid, oid, spos);
2905 tracepoint(objectstore, coll_add_exit, r);
2906 spos.op++;
2907 if (r < 0)
2908 break;
2909 tracepoint(objectstore, coll_remove_enter, osr_name);
2910 if (_check_replay_guard(ocid, oid, spos) > 0)
2911 r = _remove(ocid, oid, spos);
2912 tracepoint(objectstore, coll_remove_exit, r);
2913 }
2914 break;
2915
2916 case Transaction::OP_COLL_MOVE:
2917 {
2918 // WARNING: this is deprecated and buggy; only here to replay old journals.
2919 const coll_t &ocid = i.get_cid(op->cid);
2920 const coll_t &ncid = i.get_cid(op->dest_cid);
2921 const ghobject_t &oid = i.get_oid(op->oid);
2922 tracepoint(objectstore, coll_move_enter);
2923 r = _collection_add(ocid, ncid, oid, spos);
2924 if (r == 0 &&
2925 (_check_replay_guard(ocid, oid, spos) > 0))
2926 r = _remove(ocid, oid, spos);
2927 tracepoint(objectstore, coll_move_exit, r);
2928 }
2929 break;
2930
2931 case Transaction::OP_COLL_MOVE_RENAME:
2932 {
2933 const coll_t &_oldcid = i.get_cid(op->cid);
2934 const ghobject_t &oldoid = i.get_oid(op->oid);
2935 const coll_t &_newcid = i.get_cid(op->dest_cid);
2936 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2937 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2938 _oldcid : _oldcid.get_temp();
2939 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2940 _oldcid : _newcid.get_temp();
2941 tracepoint(objectstore, coll_move_rename_enter);
2942 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2943 tracepoint(objectstore, coll_move_rename_exit, r);
2944 }
2945 break;
2946
2947 case Transaction::OP_TRY_RENAME:
2948 {
2949 const coll_t &_cid = i.get_cid(op->cid);
2950 const ghobject_t &oldoid = i.get_oid(op->oid);
2951 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2952 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2953 _cid : _cid.get_temp();
2954 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2955 _cid : _cid.get_temp();
2956 tracepoint(objectstore, coll_try_rename_enter);
2957 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2958 tracepoint(objectstore, coll_try_rename_exit, r);
2959 }
2960 break;
2961
2962 case Transaction::OP_COLL_SETATTR:
2963 case Transaction::OP_COLL_RMATTR:
2964 assert(0 == "collection attr methods no longer implemented");
2965 break;
2966
2967 case Transaction::OP_STARTSYNC:
2968 tracepoint(objectstore, startsync_enter, osr_name);
2969 _start_sync();
2970 tracepoint(objectstore, startsync_exit);
2971 break;
2972
2973 case Transaction::OP_COLL_RENAME:
2974 {
2975 r = -EOPNOTSUPP;
2976 }
2977 break;
2978
2979 case Transaction::OP_OMAP_CLEAR:
2980 {
2981 const coll_t &_cid = i.get_cid(op->cid);
2982 const ghobject_t &oid = i.get_oid(op->oid);
2983 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2984 _cid : _cid.get_temp();
2985 tracepoint(objectstore, omap_clear_enter, osr_name);
2986 if (_check_replay_guard(cid, oid, spos) > 0)
2987 r = _omap_clear(cid, oid, spos);
2988 tracepoint(objectstore, omap_clear_exit, r);
2989 }
2990 break;
2991 case Transaction::OP_OMAP_SETKEYS:
2992 {
2993 const coll_t &_cid = i.get_cid(op->cid);
2994 const ghobject_t &oid = i.get_oid(op->oid);
2995 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2996 _cid : _cid.get_temp();
2997 map<string, bufferlist> aset;
2998 i.decode_attrset(aset);
2999 tracepoint(objectstore, omap_setkeys_enter, osr_name);
3000 if (_check_replay_guard(cid, oid, spos) > 0)
3001 r = _omap_setkeys(cid, oid, aset, spos);
3002 tracepoint(objectstore, omap_setkeys_exit, r);
3003 }
3004 break;
3005 case Transaction::OP_OMAP_RMKEYS:
3006 {
3007 const coll_t &_cid = i.get_cid(op->cid);
3008 const ghobject_t &oid = i.get_oid(op->oid);
3009 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3010 _cid : _cid.get_temp();
3011 set<string> keys;
3012 i.decode_keyset(keys);
3013 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
3014 if (_check_replay_guard(cid, oid, spos) > 0)
3015 r = _omap_rmkeys(cid, oid, keys, spos);
3016 tracepoint(objectstore, omap_rmkeys_exit, r);
3017 }
3018 break;
3019 case Transaction::OP_OMAP_RMKEYRANGE:
3020 {
3021 const coll_t &_cid = i.get_cid(op->cid);
3022 const ghobject_t &oid = i.get_oid(op->oid);
3023 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3024 _cid : _cid.get_temp();
3025 string first, last;
3026 first = i.decode_string();
3027 last = i.decode_string();
3028 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3029 if (_check_replay_guard(cid, oid, spos) > 0)
3030 r = _omap_rmkeyrange(cid, oid, first, last, spos);
3031 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3032 }
3033 break;
3034 case Transaction::OP_OMAP_SETHEADER:
3035 {
3036 const coll_t &_cid = i.get_cid(op->cid);
3037 const ghobject_t &oid = i.get_oid(op->oid);
3038 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3039 _cid : _cid.get_temp();
3040 bufferlist bl;
3041 i.decode_bl(bl);
3042 tracepoint(objectstore, omap_setheader_enter, osr_name);
3043 if (_check_replay_guard(cid, oid, spos) > 0)
3044 r = _omap_setheader(cid, oid, bl, spos);
3045 tracepoint(objectstore, omap_setheader_exit, r);
3046 }
3047 break;
3048 case Transaction::OP_SPLIT_COLLECTION:
3049 {
3050 assert(0 == "not legacy journal; upgrade to firefly first");
3051 }
3052 break;
3053 case Transaction::OP_SPLIT_COLLECTION2:
3054 {
3055 coll_t cid = i.get_cid(op->cid);
3056 uint32_t bits = op->split_bits;
3057 uint32_t rem = op->split_rem;
3058 coll_t dest = i.get_cid(op->dest_cid);
3059 tracepoint(objectstore, split_coll2_enter, osr_name);
3060 r = _split_collection(cid, bits, rem, dest, spos);
3061 tracepoint(objectstore, split_coll2_exit, r);
3062 }
3063 break;
3064
3065 case Transaction::OP_SETALLOCHINT:
3066 {
3067 const coll_t &_cid = i.get_cid(op->cid);
3068 const ghobject_t &oid = i.get_oid(op->oid);
3069 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3070 _cid : _cid.get_temp();
3071 uint64_t expected_object_size = op->expected_object_size;
3072 uint64_t expected_write_size = op->expected_write_size;
3073 tracepoint(objectstore, setallochint_enter, osr_name);
3074 if (_check_replay_guard(cid, oid, spos) > 0)
3075 r = _set_alloc_hint(cid, oid, expected_object_size,
3076 expected_write_size);
3077 tracepoint(objectstore, setallochint_exit, r);
3078 }
3079 break;
3080
3081 default:
3082 derr << "bad op " << op->op << dendl;
3083 ceph_abort();
3084 }
3085
3086 if (r < 0) {
3087 bool ok = false;
3088
3089 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3090 op->op == Transaction::OP_CLONE ||
3091 op->op == Transaction::OP_CLONERANGE2 ||
3092 op->op == Transaction::OP_COLL_ADD ||
3093 op->op == Transaction::OP_SETATTR ||
3094 op->op == Transaction::OP_SETATTRS ||
3095 op->op == Transaction::OP_RMATTR ||
3096 op->op == Transaction::OP_OMAP_SETKEYS ||
3097 op->op == Transaction::OP_OMAP_RMKEYS ||
3098 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3099 op->op == Transaction::OP_OMAP_SETHEADER))
3100 // -ENOENT is normally okay
3101 // ...including on a replayed OP_RMCOLL with checkpoint mode
3102 ok = true;
3103 if (r == -ENODATA)
3104 ok = true;
3105
3106 if (op->op == Transaction::OP_SETALLOCHINT)
3107 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3108 // cases means invalid hint size (e.g. too big, not a multiple
3109 // of block size, etc) or, at least on xfs, an attempt to set
3110 // or change it when the file is not empty. However,
3111 // OP_SETALLOCHINT is advisory, so ignore all errors.
3112 ok = true;
3113
3114 if (replaying && !backend->can_checkpoint()) {
3115 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3116 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3117 ok = true;
3118 }
3119 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3120 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3121 ok = true;
3122 }
3123 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3124 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3125 ok = true;
3126 }
3127 if (r == -ERANGE) {
3128 dout(10) << "tolerating ERANGE on replay" << dendl;
3129 ok = true;
3130 }
3131 if (r == -ENOENT) {
3132 dout(10) << "tolerating ENOENT on replay" << dendl;
3133 ok = true;
3134 }
3135 }
3136
3137 if (!ok) {
3138 const char *msg = "unexpected error code";
3139
3140 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3141 op->op == Transaction::OP_CLONE ||
3142 op->op == Transaction::OP_CLONERANGE2)) {
3143 msg = "ENOENT on clone suggests osd bug";
3144 } else if (r == -ENOSPC) {
3145 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3146 // by partially applying transactions.
3147 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3148 } else if (r == -ENOTEMPTY) {
3149 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3150 } else if (r == -EPERM) {
3151 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3152 }
3153
3154 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3155 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3156 dout(0) << msg << dendl;
3157 dout(0) << " transaction dump:\n";
3158 JSONFormatter f(true);
3159 f.open_object_section("transaction");
3160 t.dump(&f);
3161 f.close_section();
3162 f.flush(*_dout);
3163 *_dout << dendl;
3164
3165 if (r == -EMFILE) {
3166 dump_open_fds(cct);
3167 }
3168
3169 assert(0 == "unexpected error");
3170 }
3171 }
3172
3173 spos.op++;
3174 }
3175
3176 _inject_failure();
3177 }
3178
3179 /*********************************************/
3180
3181
3182
3183 // --------------------
3184 // objects
3185
3186 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3187 {
3188 tracepoint(objectstore, exists_enter, _cid.c_str());
3189 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3190 struct stat st;
3191 bool retval = stat(cid, oid, &st) == 0;
3192 tracepoint(objectstore, exists_exit, retval);
3193 return retval;
3194 }
3195
3196 int FileStore::stat(
3197 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3198 {
3199 tracepoint(objectstore, stat_enter, _cid.c_str());
3200 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3201 int r = lfn_stat(cid, oid, st);
3202 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3203 if (r < 0) {
3204 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3205 << " = " << r << dendl;
3206 } else {
3207 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3208 << " = " << r
3209 << " (size " << st->st_size << ")" << dendl;
3210 }
3211 if (cct->_conf->filestore_debug_inject_read_err &&
3212 debug_mdata_eio(oid)) {
3213 return -EIO;
3214 } else {
3215 tracepoint(objectstore, stat_exit, r);
3216 return r;
3217 }
3218 }
3219
3220 int FileStore::set_collection_opts(
3221 const coll_t& cid,
3222 const pool_opts_t& opts)
3223 {
3224 return -EOPNOTSUPP;
3225 }
3226
3227 int FileStore::read(
3228 const coll_t& _cid,
3229 const ghobject_t& oid,
3230 uint64_t offset,
3231 size_t len,
3232 bufferlist& bl,
3233 uint32_t op_flags)
3234 {
3235 int got;
3236 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3237 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3238
3239 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3240
3241 FDRef fd;
3242 int r = lfn_open(cid, oid, false, &fd);
3243 if (r < 0) {
3244 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3245 << cpp_strerror(r) << dendl;
3246 return r;
3247 }
3248
3249 if (offset == 0 && len == 0) {
3250 struct stat st;
3251 memset(&st, 0, sizeof(struct stat));
3252 int r = ::fstat(**fd, &st);
3253 assert(r == 0);
3254 len = st.st_size;
3255 }
3256
3257 #ifdef HAVE_POSIX_FADVISE
3258 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3259 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3260 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3261 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3262 #endif
3263
3264 bufferptr bptr(len); // prealloc space for entire read
3265 got = safe_pread(**fd, bptr.c_str(), len, offset);
3266 if (got < 0) {
3267 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3268 lfn_close(fd);
3269 return got;
3270 }
3271 bptr.set_length(got); // properly size the buffer
3272 bl.clear();
3273 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3274
3275 #ifdef HAVE_POSIX_FADVISE
3276 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3277 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3278 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3279 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3280 #endif
3281
3282 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3283 ostringstream ss;
3284 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3285 if (errors != 0) {
3286 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3287 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3288 assert(0 == "bad crc on read");
3289 }
3290 }
3291
3292 lfn_close(fd);
3293
3294 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3295 << got << "/" << len << dendl;
3296 if (cct->_conf->filestore_debug_inject_read_err &&
3297 debug_data_eio(oid)) {
3298 return -EIO;
3299 } else if (cct->_conf->filestore_debug_random_read_err &&
3300 (rand() % (int)(cct->_conf->filestore_debug_random_read_err * 100.0)) == 0) {
3301 dout(0) << __func__ << ": inject random EIO" << dendl;
3302 return -EIO;
3303 } else {
3304 tracepoint(objectstore, read_exit, got);
3305 return got;
3306 }
3307 }
3308
3309 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3310 map<uint64_t, uint64_t> *m)
3311 {
3312 uint64_t i;
3313 struct fiemap_extent *extent = NULL;
3314 struct fiemap *fiemap = NULL;
3315 int r = 0;
3316
3317 more:
3318 r = backend->do_fiemap(fd, offset, len, &fiemap);
3319 if (r < 0)
3320 return r;
3321
3322 if (fiemap->fm_mapped_extents == 0) {
3323 free(fiemap);
3324 return r;
3325 }
3326
3327 extent = &fiemap->fm_extents[0];
3328
3329 /* start where we were asked to start */
3330 if (extent->fe_logical < offset) {
3331 extent->fe_length -= offset - extent->fe_logical;
3332 extent->fe_logical = offset;
3333 }
3334
3335 i = 0;
3336
3337 struct fiemap_extent *last = nullptr;
3338 while (i < fiemap->fm_mapped_extents) {
3339 struct fiemap_extent *next = extent + 1;
3340
3341 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
3342 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3343
3344 /* try to merge extents */
3345 while ((i < fiemap->fm_mapped_extents - 1) &&
3346 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3347 next->fe_length += extent->fe_length;
3348 next->fe_logical = extent->fe_logical;
3349 extent = next;
3350 next = extent + 1;
3351 i++;
3352 }
3353
3354 if (extent->fe_logical + extent->fe_length > offset + len)
3355 extent->fe_length = offset + len - extent->fe_logical;
3356 (*m)[extent->fe_logical] = extent->fe_length;
3357 i++;
3358 last = extent++;
3359 }
3360 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3361 offset = last->fe_logical + last->fe_length;
3362 len -= xoffset;
3363 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3364 free(fiemap);
3365 if (!is_last) {
3366 goto more;
3367 }
3368
3369 return r;
3370 }
3371
3372 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3373 map<uint64_t, uint64_t> *m)
3374 {
3375 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3376 off_t hole_pos, data_pos;
3377 int r = 0;
3378
3379 // If lseek fails with errno setting to be ENXIO, this means the current
3380 // file offset is beyond the end of the file.
3381 off_t start = offset;
3382 while(start < (off_t)(offset + len)) {
3383 data_pos = lseek(fd, start, SEEK_DATA);
3384 if (data_pos < 0) {
3385 if (errno == ENXIO)
3386 break;
3387 else {
3388 r = -errno;
3389 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3390 return r;
3391 }
3392 } else if (data_pos > (off_t)(offset + len)) {
3393 break;
3394 }
3395
3396 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3397 if (hole_pos < 0) {
3398 if (errno == ENXIO) {
3399 break;
3400 } else {
3401 r = -errno;
3402 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3403 return r;
3404 }
3405 }
3406
3407 if (hole_pos >= (off_t)(offset + len)) {
3408 (*m)[data_pos] = offset + len - data_pos;
3409 break;
3410 }
3411 (*m)[data_pos] = hole_pos - data_pos;
3412 start = hole_pos;
3413 }
3414
3415 return r;
3416 #else
3417 (*m)[offset] = len;
3418 return 0;
3419 #endif
3420 }
3421
3422 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3423 uint64_t offset, size_t len,
3424 bufferlist& bl)
3425 {
3426 map<uint64_t, uint64_t> exomap;
3427 int r = fiemap(_cid, oid, offset, len, exomap);
3428 if (r >= 0) {
3429 ::encode(exomap, bl);
3430 }
3431 return r;
3432 }
3433
3434 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3435 uint64_t offset, size_t len,
3436 map<uint64_t, uint64_t>& destmap)
3437 {
3438 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3439 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3440 destmap.clear();
3441
3442 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3443 len <= (size_t)m_filestore_fiemap_threshold) {
3444 destmap[offset] = len;
3445 return 0;
3446 }
3447
3448 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3449
3450 FDRef fd;
3451
3452 int r = lfn_open(cid, oid, false, &fd);
3453 if (r < 0) {
3454 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3455 goto done;
3456 }
3457
3458 if (backend->has_seek_data_hole()) {
3459 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3460 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3461 } else if (backend->has_fiemap()) {
3462 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3463 r = _do_fiemap(**fd, offset, len, &destmap);
3464 }
3465
3466 lfn_close(fd);
3467
3468 done:
3469
3470 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3471 assert(!m_filestore_fail_eio || r != -EIO);
3472 tracepoint(objectstore, fiemap_exit, r);
3473 return r;
3474 }
3475
3476 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3477 const SequencerPosition &spos)
3478 {
3479 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3480 int r = lfn_unlink(cid, oid, spos);
3481 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3482 return r;
3483 }
3484
3485 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3486 {
3487 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3488 int r = lfn_truncate(cid, oid, size);
3489 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3490 return r;
3491 }
3492
3493
3494 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3495 {
3496 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3497
3498 FDRef fd;
3499 int r = lfn_open(cid, oid, true, &fd);
3500 if (r < 0) {
3501 return r;
3502 } else {
3503 lfn_close(fd);
3504 }
3505 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3506 return r;
3507 }
3508
3509 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3510 uint64_t offset, size_t len,
3511 const bufferlist& bl, uint32_t fadvise_flags)
3512 {
3513 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3514 int r;
3515
3516 FDRef fd;
3517 r = lfn_open(cid, oid, true, &fd);
3518 if (r < 0) {
3519 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3520 << oid << ": "
3521 << cpp_strerror(r) << dendl;
3522 goto out;
3523 }
3524
3525 // write
3526 r = bl.write_fd(**fd, offset);
3527 if (r < 0) {
3528 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3529 << " error: " << cpp_strerror(r) << dendl;
3530 lfn_close(fd);
3531 goto out;
3532 }
3533 r = bl.length();
3534
3535 if (r >= 0 && m_filestore_sloppy_crc) {
3536 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3537 assert(rc >= 0);
3538 }
3539
3540 if (replaying || m_disable_wbthrottle) {
3541 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3542 #ifdef HAVE_POSIX_FADVISE
3543 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3544 #endif
3545 }
3546 } else {
3547 wbthrottle.queue_wb(fd, oid, offset, len,
3548 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3549 }
3550
3551 lfn_close(fd);
3552
3553 out:
3554 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3555 return r;
3556 }
3557
3558 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3559 {
3560 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3561 int ret = 0;
3562
3563 if (cct->_conf->filestore_punch_hole) {
3564 #ifdef CEPH_HAVE_FALLOCATE
3565 # if !defined(DARWIN) && !defined(__FreeBSD__)
3566 # ifdef FALLOC_FL_KEEP_SIZE
3567 // first try to punch a hole.
3568 FDRef fd;
3569 ret = lfn_open(cid, oid, false, &fd);
3570 if (ret < 0) {
3571 goto out;
3572 }
3573
3574 struct stat st;
3575 ret = ::fstat(**fd, &st);
3576 if (ret < 0) {
3577 ret = -errno;
3578 lfn_close(fd);
3579 goto out;
3580 }
3581
3582 // first try fallocate
3583 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3584 offset, len);
3585 if (ret < 0) {
3586 ret = -errno;
3587 } else {
3588 // ensure we extend file size, if needed
3589 if (len > 0 && offset + len > (uint64_t)st.st_size) {
3590 ret = ::ftruncate(**fd, offset + len);
3591 if (ret < 0) {
3592 ret = -errno;
3593 lfn_close(fd);
3594 goto out;
3595 }
3596 }
3597 }
3598 lfn_close(fd);
3599
3600 if (ret >= 0 && m_filestore_sloppy_crc) {
3601 int rc = backend->_crc_update_zero(**fd, offset, len);
3602 assert(rc >= 0);
3603 }
3604
3605 if (ret == 0)
3606 goto out; // yay!
3607 if (ret != -EOPNOTSUPP)
3608 goto out; // some other error
3609 # endif
3610 # endif
3611 #endif
3612 }
3613
3614 // lame, kernel is old and doesn't support it.
3615 // write zeros.. yuck!
3616 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3617 {
3618 bufferlist bl;
3619 bl.append_zero(len);
3620 ret = _write(cid, oid, offset, len, bl);
3621 }
3622
3623 #ifdef CEPH_HAVE_FALLOCATE
3624 # if !defined(DARWIN) && !defined(__FreeBSD__)
3625 # ifdef FALLOC_FL_KEEP_SIZE
3626 out:
3627 # endif
3628 # endif
3629 #endif
3630 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3631 return ret;
3632 }
3633
3634 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3635 const SequencerPosition& spos)
3636 {
3637 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3638
3639 if (_check_replay_guard(cid, newoid, spos) < 0)
3640 return 0;
3641
3642 int r;
3643 FDRef o, n;
3644 {
3645 Index index;
3646 r = lfn_open(cid, oldoid, false, &o, &index);
3647 if (r < 0) {
3648 goto out2;
3649 }
3650 assert(NULL != (index.index));
3651 RWLock::WLocker l((index.index)->access_lock);
3652
3653 r = lfn_open(cid, newoid, true, &n, &index);
3654 if (r < 0) {
3655 goto out;
3656 }
3657 r = ::ftruncate(**n, 0);
3658 if (r < 0) {
3659 r = -errno;
3660 goto out3;
3661 }
3662 struct stat st;
3663 r = ::fstat(**o, &st);
3664 if (r < 0) {
3665 r = -errno;
3666 goto out3;
3667 }
3668
3669 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3670 if (r < 0) {
3671 goto out3;
3672 }
3673
3674 dout(20) << "objectmap clone" << dendl;
3675 r = object_map->clone(oldoid, newoid, &spos);
3676 if (r < 0 && r != -ENOENT)
3677 goto out3;
3678 }
3679
3680 {
3681 char buf[2];
3682 map<string, bufferptr> aset;
3683 r = _fgetattrs(**o, aset);
3684 if (r < 0)
3685 goto out3;
3686
3687 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3688 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3689 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3690 sizeof(XATTR_NO_SPILL_OUT));
3691 } else {
3692 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3693 sizeof(XATTR_SPILL_OUT));
3694 }
3695 if (r < 0)
3696 goto out3;
3697
3698 r = _fsetattrs(**n, aset);
3699 if (r < 0)
3700 goto out3;
3701 }
3702
3703 // clone is non-idempotent; record our work.
3704 _set_replay_guard(**n, spos, &newoid);
3705
3706 out3:
3707 lfn_close(n);
3708 out:
3709 lfn_close(o);
3710 out2:
3711 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3712 assert(!m_filestore_fail_eio || r != -EIO);
3713 return r;
3714 }
3715
3716 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3717 {
3718 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3719 return backend->clone_range(from, to, srcoff, len, dstoff);
3720 }
3721
3722 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3723 {
3724 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3725 int r = 0;
3726 map<uint64_t, uint64_t> exomap;
3727 // fiemap doesn't allow zero length
3728 if (len == 0)
3729 return 0;
3730
3731 if (backend->has_seek_data_hole()) {
3732 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3733 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3734 } else if (backend->has_fiemap()) {
3735 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3736 r = _do_fiemap(from, srcoff, len, &exomap);
3737 }
3738
3739
3740 int64_t written = 0;
3741 if (r < 0)
3742 goto out;
3743
3744 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3745 uint64_t it_off = miter->first - srcoff + dstoff;
3746 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3747 if (r < 0) {
3748 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3749 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3750 break;
3751 }
3752 written += miter->second;
3753 }
3754
3755 if (r >= 0) {
3756 if (m_filestore_sloppy_crc) {
3757 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3758 assert(rc >= 0);
3759 }
3760 struct stat st;
3761 r = ::fstat(to, &st);
3762 if (r < 0) {
3763 r = -errno;
3764 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3765 goto out;
3766 }
3767 if (st.st_size < (int)(dstoff + len)) {
3768 r = ::ftruncate(to, dstoff + len);
3769 if (r < 0) {
3770 r = -errno;
3771 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3772 goto out;
3773 }
3774 }
3775 r = written;
3776 }
3777
3778 out:
3779 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3780 return r;
3781 }
3782
3783 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3784 {
3785 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3786 int r = 0;
3787 loff_t pos = srcoff;
3788 loff_t end = srcoff + len;
3789 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3790
3791 #ifdef CEPH_HAVE_SPLICE
3792 if (backend->has_splice()) {
3793 int pipefd[2];
3794 if (pipe(pipefd) < 0) {
3795 r = -errno;
3796 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3797 return r;
3798 }
3799
3800 loff_t dstpos = dstoff;
3801 while (pos < end) {
3802 int l = MIN(end-pos, buflen);
3803 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3804 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3805 if (r < 0) {
3806 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3807 << ", " << cpp_strerror(r) << dendl;
3808 break;
3809 }
3810 if (r == 0) {
3811 // hrm, bad source range, wtf.
3812 r = -ERANGE;
3813 derr << __FUNC__ << ": got short read result at " << pos
3814 << " of fd " << from << " len " << len << dendl;
3815 break;
3816 }
3817
3818 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3819 dout(10) << " safe_splice write to " << to << " len " << r
3820 << " got " << r << dendl;
3821 if (r < 0) {
3822 derr << __FUNC__ << ": write error at " << pos << "~"
3823 << r << ", " << cpp_strerror(r) << dendl;
3824 break;
3825 }
3826 }
3827 close(pipefd[0]);
3828 close(pipefd[1]);
3829 } else
3830 #endif
3831 {
3832 int64_t actual;
3833
3834 actual = ::lseek64(from, srcoff, SEEK_SET);
3835 if (actual != (int64_t)srcoff) {
3836 if (actual < 0)
3837 r = -errno;
3838 else
3839 r = -EINVAL;
3840 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3841 return r;
3842 }
3843 actual = ::lseek64(to, dstoff, SEEK_SET);
3844 if (actual != (int64_t)dstoff) {
3845 if (actual < 0)
3846 r = -errno;
3847 else
3848 r = -EINVAL;
3849 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3850 return r;
3851 }
3852
3853 char buf[buflen];
3854 while (pos < end) {
3855 int l = MIN(end-pos, buflen);
3856 r = ::read(from, buf, l);
3857 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3858 if (r < 0) {
3859 if (errno == EINTR) {
3860 continue;
3861 } else {
3862 r = -errno;
3863 derr << __FUNC__ << ": read error at " << pos << "~" << len
3864 << ", " << cpp_strerror(r) << dendl;
3865 break;
3866 }
3867 }
3868 if (r == 0) {
3869 // hrm, bad source range, wtf.
3870 r = -ERANGE;
3871 derr << __FUNC__ << ": got short read result at " << pos
3872 << " of fd " << from << " len " << len << dendl;
3873 break;
3874 }
3875 int op = 0;
3876 while (op < r) {
3877 int r2 = safe_write(to, buf+op, r-op);
3878 dout(25) << " write to " << to << " len " << (r-op)
3879 << " got " << r2 << dendl;
3880 if (r2 < 0) {
3881 r = r2;
3882 derr << __FUNC__ << ": write error at " << pos << "~"
3883 << r-op << ", " << cpp_strerror(r) << dendl;
3884
3885 break;
3886 }
3887 op += (r-op);
3888 }
3889 if (r < 0)
3890 break;
3891 pos += r;
3892 }
3893 }
3894
3895 if (r < 0 && replaying) {
3896 assert(r == -ERANGE);
3897 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
3898 r = len;
3899 }
3900 assert(replaying || pos == end);
3901 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3902 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3903 assert(rc >= 0);
3904 }
3905 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3906 return r;
3907 }
3908
3909 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3910 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3911 const SequencerPosition& spos)
3912 {
3913 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3914
3915 if (_check_replay_guard(newcid, newoid, spos) < 0)
3916 return 0;
3917
3918 int r;
3919 FDRef o, n;
3920 r = lfn_open(oldcid, oldoid, false, &o);
3921 if (r < 0) {
3922 goto out2;
3923 }
3924 r = lfn_open(newcid, newoid, true, &n);
3925 if (r < 0) {
3926 goto out;
3927 }
3928 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3929 if (r < 0) {
3930 goto out3;
3931 }
3932
3933 // clone is non-idempotent; record our work.
3934 _set_replay_guard(**n, spos, &newoid);
3935
3936 out3:
3937 lfn_close(n);
3938 out:
3939 lfn_close(o);
3940 out2:
3941 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3942 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3943 return r;
3944 }
3945
3946 class SyncEntryTimeout : public Context {
3947 public:
3948 CephContext* cct;
3949 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3950 : cct(cct), m_commit_timeo(commit_timeo)
3951 {
3952 }
3953
3954 void finish(int r) override {
3955 BackTrace *bt = new BackTrace(1);
3956 generic_dout(-1) << "FileStore: sync_entry timed out after "
3957 << m_commit_timeo << " seconds.\n";
3958 bt->print(*_dout);
3959 *_dout << dendl;
3960 delete bt;
3961 ceph_abort();
3962 }
3963 private:
3964 int m_commit_timeo;
3965 };
3966
3967 void FileStore::sync_entry()
3968 {
3969 lock.Lock();
3970 while (!stop) {
3971 utime_t max_interval;
3972 max_interval.set_from_double(m_filestore_max_sync_interval);
3973 utime_t min_interval;
3974 min_interval.set_from_double(m_filestore_min_sync_interval);
3975
3976 utime_t startwait = ceph_clock_now();
3977 if (!force_sync) {
3978 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
3979 sync_cond.WaitInterval(lock, max_interval);
3980 } else {
3981 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
3982 }
3983
3984 if (force_sync) {
3985 dout(20) << __FUNC__ << ": force_sync set" << dendl;
3986 force_sync = false;
3987 } else if (stop) {
3988 dout(20) << __FUNC__ << ": stop set" << dendl;
3989 break;
3990 } else {
3991 // wait for at least the min interval
3992 utime_t woke = ceph_clock_now();
3993 woke -= startwait;
3994 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
3995 if (woke < min_interval) {
3996 utime_t t = min_interval;
3997 t -= woke;
3998 dout(20) << __FUNC__ << ": waiting for another " << t
3999 << " to reach min interval " << min_interval << dendl;
4000 sync_cond.WaitInterval(lock, t);
4001 }
4002 }
4003
4004 list<Context*> fin;
4005 again:
4006 fin.swap(sync_waiters);
4007 lock.Unlock();
4008
4009 op_tp.pause();
4010 if (apply_manager.commit_start()) {
4011 utime_t start = ceph_clock_now();
4012 uint64_t cp = apply_manager.get_committing_seq();
4013
4014 sync_entry_timeo_lock.Lock();
4015 SyncEntryTimeout *sync_entry_timeo =
4016 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
4017 if (!timer.add_event_after(m_filestore_commit_timeout,
4018 sync_entry_timeo)) {
4019 sync_entry_timeo = nullptr;
4020 }
4021 sync_entry_timeo_lock.Unlock();
4022
4023 logger->set(l_filestore_committing, 1);
4024
4025 dout(15) << __FUNC__ << ": committing " << cp << dendl;
4026 stringstream errstream;
4027 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4028 derr << errstream.str() << dendl;
4029 ceph_abort();
4030 }
4031
4032 if (backend->can_checkpoint()) {
4033 int err = write_op_seq(op_fd, cp);
4034 if (err < 0) {
4035 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4036 assert(0 == "error during write_op_seq");
4037 }
4038
4039 char s[NAME_MAX];
4040 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4041 uint64_t cid = 0;
4042 err = backend->create_checkpoint(s, &cid);
4043 if (err < 0) {
4044 int err = errno;
4045 derr << "snap create '" << s << "' got error " << err << dendl;
4046 assert(err == 0);
4047 }
4048
4049 snaps.push_back(cp);
4050 apply_manager.commit_started();
4051 op_tp.unpause();
4052
4053 if (cid > 0) {
4054 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4055 err = backend->sync_checkpoint(cid);
4056 if (err < 0) {
4057 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4058 assert(0 == "wait_sync got error");
4059 }
4060 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4061 }
4062 } else {
4063 apply_manager.commit_started();
4064 op_tp.unpause();
4065
4066 int err = object_map->sync();
4067 if (err < 0) {
4068 derr << "object_map sync got " << cpp_strerror(err) << dendl;
4069 assert(0 == "object_map sync returned error");
4070 }
4071
4072 err = backend->syncfs();
4073 if (err < 0) {
4074 derr << "syncfs got " << cpp_strerror(err) << dendl;
4075 assert(0 == "syncfs returned error");
4076 }
4077
4078 err = write_op_seq(op_fd, cp);
4079 if (err < 0) {
4080 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4081 assert(0 == "error during write_op_seq");
4082 }
4083 err = ::fsync(op_fd);
4084 if (err < 0) {
4085 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4086 assert(0 == "error during fsync of op_seq");
4087 }
4088 }
4089
4090 utime_t done = ceph_clock_now();
4091 utime_t lat = done - start;
4092 utime_t dur = done - startwait;
4093 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
4094 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4095 if (max_pause_lat < dur - lat) {
4096 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4097 }
4098
4099 logger->inc(l_filestore_commitcycle);
4100 logger->tinc(l_filestore_commitcycle_latency, lat);
4101 logger->tinc(l_filestore_commitcycle_interval, dur);
4102
4103 apply_manager.commit_finish();
4104 if (!m_disable_wbthrottle) {
4105 wbthrottle.clear();
4106 }
4107
4108 logger->set(l_filestore_committing, 0);
4109
4110 // remove old snaps?
4111 if (backend->can_checkpoint()) {
4112 char s[NAME_MAX];
4113 while (snaps.size() > 2) {
4114 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4115 snaps.pop_front();
4116 dout(10) << "removing snap '" << s << "'" << dendl;
4117 int r = backend->destroy_checkpoint(s);
4118 if (r) {
4119 int err = errno;
4120 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4121 }
4122 }
4123 }
4124
4125 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
4126
4127 if (sync_entry_timeo) {
4128 Mutex::Locker lock(sync_entry_timeo_lock);
4129 timer.cancel_event(sync_entry_timeo);
4130 }
4131 } else {
4132 op_tp.unpause();
4133 }
4134
4135 lock.Lock();
4136 finish_contexts(cct, fin, 0);
4137 fin.clear();
4138 if (!sync_waiters.empty()) {
4139 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
4140 goto again;
4141 }
4142 if (!stop && journal && journal->should_commit_now()) {
4143 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
4144 goto again;
4145 }
4146 }
4147 stop = false;
4148 lock.Unlock();
4149 }
4150
4151 void FileStore::_start_sync()
4152 {
4153 if (!journal) { // don't do a big sync if the journal is on
4154 dout(10) << __FUNC__ << dendl;
4155 sync_cond.Signal();
4156 } else {
4157 dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
4158 }
4159 }
4160
4161 void FileStore::do_force_sync()
4162 {
4163 dout(10) << __FUNC__ << dendl;
4164 Mutex::Locker l(lock);
4165 force_sync = true;
4166 sync_cond.Signal();
4167 }
4168
4169 void FileStore::start_sync(Context *onsafe)
4170 {
4171 Mutex::Locker l(lock);
4172 sync_waiters.push_back(onsafe);
4173 sync_cond.Signal();
4174 force_sync = true;
4175 dout(10) << __FUNC__ << dendl;
4176 }
4177
4178 void FileStore::sync()
4179 {
4180 Mutex l("FileStore::sync");
4181 Cond c;
4182 bool done;
4183 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4184
4185 start_sync(fin);
4186
4187 l.Lock();
4188 while (!done) {
4189 dout(10) << "sync waiting" << dendl;
4190 c.Wait(l);
4191 }
4192 l.Unlock();
4193 dout(10) << "sync done" << dendl;
4194 }
4195
4196 void FileStore::_flush_op_queue()
4197 {
4198 dout(10) << __FUNC__ << ": draining op tp" << dendl;
4199 op_wq.drain();
4200 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4201 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4202 (*it)->wait_for_empty();
4203 }
4204 }
4205
4206 /*
4207 * flush - make every queued write readable
4208 */
4209 void FileStore::flush()
4210 {
4211 dout(10) << __FUNC__ << dendl;
4212
4213 if (cct->_conf->filestore_blackhole) {
4214 // wait forever
4215 Mutex lock("FileStore::flush::lock");
4216 Cond cond;
4217 lock.Lock();
4218 while (true)
4219 cond.Wait(lock);
4220 ceph_abort();
4221 }
4222
4223 if (m_filestore_journal_writeahead) {
4224 if (journal)
4225 journal->flush();
4226 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4227 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4228 (*it)->wait_for_empty();
4229 }
4230 }
4231
4232 _flush_op_queue();
4233 dout(10) << __FUNC__ << ": complete" << dendl;
4234 }
4235
4236 /*
4237 * sync_and_flush - make every queued write readable AND committed to disk
4238 */
4239 void FileStore::sync_and_flush()
4240 {
4241 dout(10) << __FUNC__ << dendl;
4242
4243 if (m_filestore_journal_writeahead) {
4244 if (journal)
4245 journal->flush();
4246 _flush_op_queue();
4247 } else {
4248 // includes m_filestore_journal_parallel
4249 _flush_op_queue();
4250 sync();
4251 }
4252 dout(10) << __FUNC__ << ": done" << dendl;
4253 }
4254
4255 int FileStore::flush_journal()
4256 {
4257 dout(10) << __FUNC__ << dendl;
4258 sync_and_flush();
4259 sync();
4260 return 0;
4261 }
4262
4263 int FileStore::snapshot(const string& name)
4264 {
4265 dout(10) << __FUNC__ << ": " << name << dendl;
4266 sync_and_flush();
4267
4268 if (!backend->can_checkpoint()) {
4269 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4270 return -EOPNOTSUPP;
4271 }
4272
4273 char s[NAME_MAX];
4274 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4275
4276 int r = backend->create_checkpoint(s, NULL);
4277 if (r) {
4278 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4279 }
4280
4281 return r;
4282 }
4283
4284 // -------------------------------
4285 // attributes
4286
4287 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4288 {
4289 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4290 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4291 if (l >= 0) {
4292 bp = buffer::create(l);
4293 memcpy(bp.c_str(), val, l);
4294 } else if (l == -ERANGE) {
4295 l = chain_fgetxattr(fd, name, 0, 0);
4296 if (l > 0) {
4297 bp = buffer::create(l);
4298 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4299 }
4300 }
4301 assert(!m_filestore_fail_eio || l != -EIO);
4302 return l;
4303 }
4304
4305 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4306 {
4307 // get attr list
4308 char names1[100];
4309 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4310 char *names2 = 0;
4311 char *name = 0;
4312 if (len == -ERANGE) {
4313 len = chain_flistxattr(fd, 0, 0);
4314 if (len < 0) {
4315 assert(!m_filestore_fail_eio || len != -EIO);
4316 return len;
4317 }
4318 dout(10) << " -ERANGE, len is " << len << dendl;
4319 names2 = new char[len+1];
4320 len = chain_flistxattr(fd, names2, len);
4321 dout(10) << " -ERANGE, got " << len << dendl;
4322 if (len < 0) {
4323 assert(!m_filestore_fail_eio || len != -EIO);
4324 delete[] names2;
4325 return len;
4326 }
4327 name = names2;
4328 } else if (len < 0) {
4329 assert(!m_filestore_fail_eio || len != -EIO);
4330 return len;
4331 } else {
4332 name = names1;
4333 }
4334 name[len] = 0;
4335
4336 char *end = name + len;
4337 while (name < end) {
4338 char *attrname = name;
4339 if (parse_attrname(&name)) {
4340 if (*name) {
4341 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4342 int r = _fgetattr(fd, attrname, aset[name]);
4343 if (r < 0) {
4344 delete[] names2;
4345 return r;
4346 }
4347 }
4348 }
4349 name += strlen(name) + 1;
4350 }
4351
4352 delete[] names2;
4353 return 0;
4354 }
4355
4356 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4357 {
4358 for (map<string, bufferptr>::iterator p = aset.begin();
4359 p != aset.end();
4360 ++p) {
4361 char n[CHAIN_XATTR_MAX_NAME_LEN];
4362 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4363 const char *val;
4364 if (p->second.length())
4365 val = p->second.c_str();
4366 else
4367 val = "";
4368 // ??? Why do we skip setting all the other attrs if one fails?
4369 int r = chain_fsetxattr(fd, n, val, p->second.length());
4370 if (r < 0) {
4371 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4372 return r;
4373 }
4374 }
4375 return 0;
4376 }
4377
4378 // debug EIO injection
4379 void FileStore::inject_data_error(const ghobject_t &oid) {
4380 Mutex::Locker l(read_error_lock);
4381 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4382 data_error_set.insert(oid);
4383 }
4384 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4385 Mutex::Locker l(read_error_lock);
4386 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4387 mdata_error_set.insert(oid);
4388 }
4389
4390 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4391 Mutex::Locker l(read_error_lock);
4392 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4393 data_error_set.erase(oid);
4394 mdata_error_set.erase(oid);
4395 }
4396 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4397 Mutex::Locker l(read_error_lock);
4398 if (data_error_set.count(oid)) {
4399 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4400 return true;
4401 } else {
4402 return false;
4403 }
4404 }
4405 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4406 Mutex::Locker l(read_error_lock);
4407 if (mdata_error_set.count(oid)) {
4408 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4409 return true;
4410 } else {
4411 return false;
4412 }
4413 }
4414
4415
4416 // objects
4417
4418 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4419 {
4420 tracepoint(objectstore, getattr_enter, _cid.c_str());
4421 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4422 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4423 FDRef fd;
4424 int r = lfn_open(cid, oid, false, &fd);
4425 if (r < 0) {
4426 goto out;
4427 }
4428 char n[CHAIN_XATTR_MAX_NAME_LEN];
4429 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4430 r = _fgetattr(**fd, n, bp);
4431 lfn_close(fd);
4432 if (r == -ENODATA) {
4433 map<string, bufferlist> got;
4434 set<string> to_get;
4435 to_get.insert(string(name));
4436 Index index;
4437 r = get_index(cid, &index);
4438 if (r < 0) {
4439 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4440 goto out;
4441 }
4442 r = object_map->get_xattrs(oid, to_get, &got);
4443 if (r < 0 && r != -ENOENT) {
4444 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4445 goto out;
4446 }
4447 if (got.empty()) {
4448 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4449 return -ENODATA;
4450 }
4451 bp = bufferptr(got.begin()->second.c_str(),
4452 got.begin()->second.length());
4453 r = bp.length();
4454 }
4455 out:
4456 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4457 assert(!m_filestore_fail_eio || r != -EIO);
4458 if (cct->_conf->filestore_debug_inject_read_err &&
4459 debug_mdata_eio(oid)) {
4460 return -EIO;
4461 } else {
4462 tracepoint(objectstore, getattr_exit, r);
4463 return r < 0 ? r : 0;
4464 }
4465 }
4466
4467 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4468 {
4469 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4470 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4471 set<string> omap_attrs;
4472 map<string, bufferlist> omap_aset;
4473 Index index;
4474 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4475 FDRef fd;
4476 bool spill_out = true;
4477 char buf[2];
4478
4479 int r = lfn_open(cid, oid, false, &fd);
4480 if (r < 0) {
4481 goto out;
4482 }
4483
4484 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4485 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4486 spill_out = false;
4487
4488 r = _fgetattrs(**fd, aset);
4489 lfn_close(fd);
4490 fd = FDRef(); // defensive
4491 if (r < 0) {
4492 goto out;
4493 }
4494
4495 if (!spill_out) {
4496 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4497 goto out;
4498 }
4499
4500 r = get_index(cid, &index);
4501 if (r < 0) {
4502 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4503 goto out;
4504 }
4505 {
4506 r = object_map->get_all_xattrs(oid, &omap_attrs);
4507 if (r < 0 && r != -ENOENT) {
4508 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4509 goto out;
4510 }
4511
4512 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4513 if (r < 0 && r != -ENOENT) {
4514 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4515 goto out;
4516 }
4517 if (r == -ENOENT)
4518 r = 0;
4519 }
4520 assert(omap_attrs.size() == omap_aset.size());
4521 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4522 i != omap_aset.end();
4523 ++i) {
4524 string key(i->first);
4525 aset.insert(make_pair(key,
4526 bufferptr(i->second.c_str(), i->second.length())));
4527 }
4528 out:
4529 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4530 assert(!m_filestore_fail_eio || r != -EIO);
4531
4532 if (cct->_conf->filestore_debug_inject_read_err &&
4533 debug_mdata_eio(oid)) {
4534 return -EIO;
4535 } else {
4536 tracepoint(objectstore, getattrs_exit, r);
4537 return r;
4538 }
4539 }
4540
4541 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4542 const SequencerPosition &spos)
4543 {
4544 map<string, bufferlist> omap_set;
4545 set<string> omap_remove;
4546 map<string, bufferptr> inline_set;
4547 map<string, bufferptr> inline_to_set;
4548 FDRef fd;
4549 int spill_out = -1;
4550 bool incomplete_inline = false;
4551
4552 int r = lfn_open(cid, oid, false, &fd);
4553 if (r < 0) {
4554 goto out;
4555 }
4556
4557 char buf[2];
4558 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4559 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4560 spill_out = 0;
4561 else
4562 spill_out = 1;
4563
4564 r = _fgetattrs(**fd, inline_set);
4565 incomplete_inline = (r == -E2BIG);
4566 assert(!m_filestore_fail_eio || r != -EIO);
4567 dout(15) << __FUNC__ << ": " << cid << "/" << oid
4568 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4569 << dendl;
4570
4571 for (map<string,bufferptr>::iterator p = aset.begin();
4572 p != aset.end();
4573 ++p) {
4574 char n[CHAIN_XATTR_MAX_NAME_LEN];
4575 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4576
4577 if (incomplete_inline) {
4578 chain_fremovexattr(**fd, n); // ignore any error
4579 omap_set[p->first].push_back(p->second);
4580 continue;
4581 }
4582
4583 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4584 if (inline_set.count(p->first)) {
4585 inline_set.erase(p->first);
4586 r = chain_fremovexattr(**fd, n);
4587 if (r < 0)
4588 goto out_close;
4589 }
4590 omap_set[p->first].push_back(p->second);
4591 continue;
4592 }
4593
4594 if (!inline_set.count(p->first) &&
4595 inline_set.size() >= m_filestore_max_inline_xattrs) {
4596 omap_set[p->first].push_back(p->second);
4597 continue;
4598 }
4599 omap_remove.insert(p->first);
4600 inline_set.insert(*p);
4601
4602 inline_to_set.insert(*p);
4603 }
4604
4605 if (spill_out != 1 && !omap_set.empty()) {
4606 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4607 sizeof(XATTR_SPILL_OUT));
4608 }
4609
4610 r = _fsetattrs(**fd, inline_to_set);
4611 if (r < 0)
4612 goto out_close;
4613
4614 if (spill_out && !omap_remove.empty()) {
4615 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4616 if (r < 0 && r != -ENOENT) {
4617 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4618 assert(!m_filestore_fail_eio || r != -EIO);
4619 goto out_close;
4620 } else {
4621 r = 0; // don't confuse the debug output
4622 }
4623 }
4624
4625 if (!omap_set.empty()) {
4626 r = object_map->set_xattrs(oid, omap_set, &spos);
4627 if (r < 0) {
4628 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4629 assert(!m_filestore_fail_eio || r != -EIO);
4630 goto out_close;
4631 }
4632 }
4633 out_close:
4634 lfn_close(fd);
4635 out:
4636 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4637 return r;
4638 }
4639
4640
4641 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4642 const SequencerPosition &spos)
4643 {
4644 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4645 FDRef fd;
4646 bool spill_out = true;
4647
4648 int r = lfn_open(cid, oid, false, &fd);
4649 if (r < 0) {
4650 goto out;
4651 }
4652
4653 char buf[2];
4654 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4655 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4656 spill_out = false;
4657 }
4658
4659 char n[CHAIN_XATTR_MAX_NAME_LEN];
4660 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4661 r = chain_fremovexattr(**fd, n);
4662 if (r == -ENODATA && spill_out) {
4663 Index index;
4664 r = get_index(cid, &index);
4665 if (r < 0) {
4666 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4667 goto out_close;
4668 }
4669 set<string> to_remove;
4670 to_remove.insert(string(name));
4671 r = object_map->remove_xattrs(oid, to_remove, &spos);
4672 if (r < 0 && r != -ENOENT) {
4673 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4674 assert(!m_filestore_fail_eio || r != -EIO);
4675 goto out_close;
4676 }
4677 }
4678 out_close:
4679 lfn_close(fd);
4680 out:
4681 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4682 return r;
4683 }
4684
4685 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4686 const SequencerPosition &spos)
4687 {
4688 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4689
4690 map<string,bufferptr> aset;
4691 FDRef fd;
4692 set<string> omap_attrs;
4693 Index index;
4694 bool spill_out = true;
4695
4696 int r = lfn_open(cid, oid, false, &fd);
4697 if (r < 0) {
4698 goto out;
4699 }
4700
4701 char buf[2];
4702 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4703 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4704 spill_out = false;
4705 }
4706
4707 r = _fgetattrs(**fd, aset);
4708 if (r >= 0) {
4709 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4710 char n[CHAIN_XATTR_MAX_NAME_LEN];
4711 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4712 r = chain_fremovexattr(**fd, n);
4713 if (r < 0) {
4714 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4715 goto out_close;
4716 }
4717 }
4718 }
4719
4720 if (!spill_out) {
4721 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4722 goto out_close;
4723 }
4724
4725 r = get_index(cid, &index);
4726 if (r < 0) {
4727 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4728 goto out_close;
4729 }
4730 {
4731 r = object_map->get_all_xattrs(oid, &omap_attrs);
4732 if (r < 0 && r != -ENOENT) {
4733 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4734 assert(!m_filestore_fail_eio || r != -EIO);
4735 goto out_close;
4736 }
4737 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4738 if (r < 0 && r != -ENOENT) {
4739 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4740 goto out_close;
4741 }
4742 if (r == -ENOENT)
4743 r = 0;
4744 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4745 sizeof(XATTR_NO_SPILL_OUT));
4746 }
4747
4748 out_close:
4749 lfn_close(fd);
4750 out:
4751 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4752 return r;
4753 }
4754
4755
4756
4757
4758 int FileStore::_collection_remove_recursive(const coll_t &cid,
4759 const SequencerPosition &spos)
4760 {
4761 struct stat st;
4762 int r = collection_stat(cid, &st);
4763 if (r < 0) {
4764 if (r == -ENOENT)
4765 return 0;
4766 return r;
4767 }
4768
4769 vector<ghobject_t> objects;
4770 ghobject_t max;
4771 while (!max.is_max()) {
4772 r = collection_list(cid, max, ghobject_t::get_max(),
4773 300, &objects, &max);
4774 if (r < 0)
4775 return r;
4776 for (vector<ghobject_t>::iterator i = objects.begin();
4777 i != objects.end();
4778 ++i) {
4779 assert(_check_replay_guard(cid, *i, spos));
4780 r = _remove(cid, *i, spos);
4781 if (r < 0)
4782 return r;
4783 }
4784 objects.clear();
4785 }
4786 return _destroy_collection(cid);
4787 }
4788
4789 // --------------------------
4790 // collections
4791
4792 int FileStore::list_collections(vector<coll_t>& ls)
4793 {
4794 return list_collections(ls, false);
4795 }
4796
4797 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4798 {
4799 tracepoint(objectstore, list_collections_enter);
4800 dout(10) << __FUNC__ << dendl;
4801
4802 char fn[PATH_MAX];
4803 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4804
4805 int r = 0;
4806 DIR *dir = ::opendir(fn);
4807 if (!dir) {
4808 r = -errno;
4809 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4810 assert(!m_filestore_fail_eio || r != -EIO);
4811 return r;
4812 }
4813
4814 struct dirent *de = nullptr;
4815 while ((de = ::readdir(dir))) {
4816 if (de->d_type == DT_UNKNOWN) {
4817 // d_type not supported (non-ext[234], btrfs), must stat
4818 struct stat sb;
4819 char filename[PATH_MAX];
4820 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4821
4822 r = ::stat(filename, &sb);
4823 if (r < 0) {
4824 r = -errno;
4825 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4826 assert(!m_filestore_fail_eio || r != -EIO);
4827 break;
4828 }
4829 if (!S_ISDIR(sb.st_mode)) {
4830 continue;
4831 }
4832 } else if (de->d_type != DT_DIR) {
4833 continue;
4834 }
4835 if (strcmp(de->d_name, "omap") == 0) {
4836 continue;
4837 }
4838 if (de->d_name[0] == '.' &&
4839 (de->d_name[1] == '\0' ||
4840 (de->d_name[1] == '.' &&
4841 de->d_name[2] == '\0')))
4842 continue;
4843 coll_t cid;
4844 if (!cid.parse(de->d_name)) {
4845 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4846 continue;
4847 }
4848 if (!cid.is_temp() || include_temp)
4849 ls.push_back(cid);
4850 }
4851
4852 if (r > 0) {
4853 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4854 r = -r;
4855 }
4856
4857 ::closedir(dir);
4858 assert(!m_filestore_fail_eio || r != -EIO);
4859 tracepoint(objectstore, list_collections_exit, r);
4860 return r;
4861 }
4862
4863 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4864 {
4865 tracepoint(objectstore, collection_stat_enter, c.c_str());
4866 char fn[PATH_MAX];
4867 get_cdir(c, fn, sizeof(fn));
4868 dout(15) << __FUNC__ << ": " << fn << dendl;
4869 int r = ::stat(fn, st);
4870 if (r < 0)
4871 r = -errno;
4872 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
4873 assert(!m_filestore_fail_eio || r != -EIO);
4874 tracepoint(objectstore, collection_stat_exit, r);
4875 return r;
4876 }
4877
4878 bool FileStore::collection_exists(const coll_t& c)
4879 {
4880 tracepoint(objectstore, collection_exists_enter, c.c_str());
4881 struct stat st;
4882 bool ret = collection_stat(c, &st) == 0;
4883 tracepoint(objectstore, collection_exists_exit, ret);
4884 return ret;
4885 }
4886
4887 int FileStore::collection_empty(const coll_t& c, bool *empty)
4888 {
4889 tracepoint(objectstore, collection_empty_enter, c.c_str());
4890 dout(15) << __FUNC__ << ": " << c << dendl;
4891 Index index;
4892 int r = get_index(c, &index);
4893 if (r < 0) {
4894 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
4895 << dendl;
4896 return r;
4897 }
4898
4899 assert(NULL != index.index);
4900 RWLock::RLocker l((index.index)->access_lock);
4901
4902 vector<ghobject_t> ls;
4903 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4904 1, &ls, NULL);
4905 if (r < 0) {
4906 derr << __FUNC__ << ": collection_list_partial returned: "
4907 << cpp_strerror(r) << dendl;
4908 assert(!m_filestore_fail_eio || r != -EIO);
4909 return r;
4910 }
4911 *empty = ls.empty();
4912 tracepoint(objectstore, collection_empty_exit, *empty);
4913 return 0;
4914 }
4915
4916 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4917 {
4918 char fn[PATH_MAX];
4919 get_cdir(c, fn, sizeof(fn));
4920 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
4921 char n[PATH_MAX];
4922 int r;
4923 int32_t v = bits;
4924 int fd = ::open(fn, O_RDONLY);
4925 if (fd < 0) {
4926 r = -errno;
4927 goto out;
4928 }
4929 get_attrname("bits", n, PATH_MAX);
4930 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4931 VOID_TEMP_FAILURE_RETRY(::close(fd));
4932 out:
4933 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
4934 return r;
4935 }
4936
4937 int FileStore::collection_bits(const coll_t& c)
4938 {
4939 char fn[PATH_MAX];
4940 get_cdir(c, fn, sizeof(fn));
4941 dout(15) << __FUNC__ << ": " << fn << dendl;
4942 int r;
4943 char n[PATH_MAX];
4944 int32_t bits;
4945 int fd = ::open(fn, O_RDONLY);
4946 if (fd < 0) {
4947 bits = r = -errno;
4948 goto out;
4949 }
4950 get_attrname("bits", n, PATH_MAX);
4951 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4952 VOID_TEMP_FAILURE_RETRY(::close(fd));
4953 if (r < 0) {
4954 bits = r;
4955 goto out;
4956 }
4957 out:
4958 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
4959 return bits;
4960 }
4961
4962 int FileStore::collection_list(const coll_t& c,
4963 const ghobject_t& orig_start,
4964 const ghobject_t& end,
4965 int max,
4966 vector<ghobject_t> *ls, ghobject_t *next)
4967 {
4968 ghobject_t start = orig_start;
4969 if (start.is_max())
4970 return 0;
4971
4972 ghobject_t temp_next;
4973 if (!next)
4974 next = &temp_next;
4975 // figure out the pool id. we need this in order to generate a
4976 // meaningful 'next' value.
4977 int64_t pool = -1;
4978 shard_id_t shard;
4979 {
4980 spg_t pgid;
4981 if (c.is_temp(&pgid)) {
4982 pool = -2 - pgid.pool();
4983 shard = pgid.shard;
4984 } else if (c.is_pg(&pgid)) {
4985 pool = pgid.pool();
4986 shard = pgid.shard;
4987 } else if (c.is_meta()) {
4988 pool = -1;
4989 shard = shard_id_t::NO_SHARD;
4990 } else {
4991 // hrm, the caller is test code! we should get kill it off. for now,
4992 // tolerate it.
4993 pool = 0;
4994 shard = shard_id_t::NO_SHARD;
4995 }
4996 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
4997 << " pgid " << pgid << dendl;
4998 }
4999 ghobject_t sep;
5000 sep.hobj.pool = -1;
5001 sep.set_shard(shard);
5002 if (!c.is_temp() && !c.is_meta()) {
5003 if (start < sep) {
5004 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
5005 coll_t temp = c.get_temp();
5006 int r = collection_list(temp, start, end, max, ls, next);
5007 if (r < 0)
5008 return r;
5009 if (*next != ghobject_t::get_max())
5010 return r;
5011 start = sep;
5012 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
5013 << start << dendl;
5014 } else {
5015 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
5016 }
5017 }
5018
5019 Index index;
5020 int r = get_index(c, &index);
5021 if (r < 0)
5022 return r;
5023
5024 assert(NULL != index.index);
5025 RWLock::RLocker l((index.index)->access_lock);
5026
5027 r = index->collection_list_partial(start, end, max, ls, next);
5028
5029 if (r < 0) {
5030 assert(!m_filestore_fail_eio || r != -EIO);
5031 return r;
5032 }
5033 dout(20) << "objects: " << *ls << dendl;
5034
5035 // HashIndex doesn't know the pool when constructing a 'next' value
5036 if (next && !next->is_max()) {
5037 next->hobj.pool = pool;
5038 next->set_shard(shard);
5039 dout(20) << " next " << *next << dendl;
5040 }
5041
5042 return 0;
5043 }
5044
5045 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
5046 bufferlist *header,
5047 map<string, bufferlist> *out)
5048 {
5049 tracepoint(objectstore, omap_get_enter, _c.c_str());
5050 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5051 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5052 Index index;
5053 int r = get_index(c, &index);
5054 if (r < 0)
5055 return r;
5056 {
5057 assert(NULL != index.index);
5058 RWLock::RLocker l((index.index)->access_lock);
5059 r = lfn_find(hoid, index);
5060 if (r < 0)
5061 return r;
5062 }
5063 r = object_map->get(hoid, header, out);
5064 if (r < 0 && r != -ENOENT) {
5065 assert(!m_filestore_fail_eio || r != -EIO);
5066 return r;
5067 }
5068 tracepoint(objectstore, omap_get_exit, 0);
5069 return 0;
5070 }
5071
5072 int FileStore::omap_get_header(
5073 const coll_t& _c,
5074 const ghobject_t &hoid,
5075 bufferlist *bl,
5076 bool allow_eio)
5077 {
5078 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5079 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5080 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5081 Index index;
5082 int r = get_index(c, &index);
5083 if (r < 0)
5084 return r;
5085 {
5086 assert(NULL != index.index);
5087 RWLock::RLocker l((index.index)->access_lock);
5088 r = lfn_find(hoid, index);
5089 if (r < 0)
5090 return r;
5091 }
5092 r = object_map->get_header(hoid, bl);
5093 if (r < 0 && r != -ENOENT) {
5094 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5095 return r;
5096 }
5097 tracepoint(objectstore, omap_get_header_exit, 0);
5098 return 0;
5099 }
5100
5101 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5102 {
5103 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5104 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5105 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5106 Index index;
5107 int r = get_index(c, &index);
5108 if (r < 0)
5109 return r;
5110 {
5111 assert(NULL != index.index);
5112 RWLock::RLocker l((index.index)->access_lock);
5113 r = lfn_find(hoid, index);
5114 if (r < 0)
5115 return r;
5116 }
5117 r = object_map->get_keys(hoid, keys);
5118 if (r < 0 && r != -ENOENT) {
5119 assert(!m_filestore_fail_eio || r != -EIO);
5120 return r;
5121 }
5122 tracepoint(objectstore, omap_get_keys_exit, 0);
5123 return 0;
5124 }
5125
5126 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5127 const set<string> &keys,
5128 map<string, bufferlist> *out)
5129 {
5130 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5131 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5132 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5133 Index index;
5134 const char *where = "()";
5135 int r = get_index(c, &index);
5136 if (r < 0) {
5137 where = " (get_index)";
5138 goto out;
5139 }
5140 {
5141 assert(NULL != index.index);
5142 RWLock::RLocker l((index.index)->access_lock);
5143 r = lfn_find(hoid, index);
5144 if (r < 0) {
5145 where = " (lfn_find)";
5146 goto out;
5147 }
5148 }
5149 r = object_map->get_values(hoid, keys, out);
5150 if (r < 0 && r != -ENOENT) {
5151 assert(!m_filestore_fail_eio || r != -EIO);
5152 where = " (get_values)";
5153 goto out;
5154 }
5155 r = 0;
5156 out:
5157 tracepoint(objectstore, omap_get_values_exit, r);
5158 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5159 << where << dendl;
5160 return r;
5161 }
5162
5163 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5164 const set<string> &keys,
5165 set<string> *out)
5166 {
5167 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5168 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5169 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5170
5171 Index index;
5172 int r = get_index(c, &index);
5173 if (r < 0)
5174 return r;
5175 {
5176 assert(NULL != index.index);
5177 RWLock::RLocker l((index.index)->access_lock);
5178 r = lfn_find(hoid, index);
5179 if (r < 0)
5180 return r;
5181 }
5182 r = object_map->check_keys(hoid, keys, out);
5183 if (r < 0 && r != -ENOENT) {
5184 assert(!m_filestore_fail_eio || r != -EIO);
5185 return r;
5186 }
5187 tracepoint(objectstore, omap_check_keys_exit, 0);
5188 return 0;
5189 }
5190
5191 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5192 const ghobject_t &hoid)
5193 {
5194 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5195 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5196 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5197 Index index;
5198 int r = get_index(c, &index);
5199 if (r < 0) {
5200 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5201 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5202 return ObjectMap::ObjectMapIterator();
5203 }
5204 {
5205 assert(NULL != index.index);
5206 RWLock::RLocker l((index.index)->access_lock);
5207 r = lfn_find(hoid, index);
5208 if (r < 0) {
5209 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5210 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5211 return ObjectMap::ObjectMapIterator();
5212 }
5213 }
5214 return object_map->get_iterator(hoid);
5215 }
5216
5217 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5218 uint64_t expected_num_objs,
5219 const SequencerPosition &spos)
5220 {
5221 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5222 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5223
5224 bool empty;
5225 int ret = collection_empty(c, &empty);
5226 if (ret < 0)
5227 return ret;
5228 if (!empty && !replaying) {
5229 dout(0) << "Failed to give an expected number of objects hint to collection : "
5230 << c << ", only empty collection can take such type of hint. " << dendl;
5231 return 0;
5232 }
5233
5234 Index index;
5235 ret = get_index(c, &index);
5236 if (ret < 0)
5237 return ret;
5238 // Pre-hash the collection
5239 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5240 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5241 if (ret < 0)
5242 return ret;
5243 _set_replay_guard(c, spos);
5244
5245 return 0;
5246 }
5247
5248 int FileStore::_create_collection(
5249 const coll_t& c,
5250 int bits,
5251 const SequencerPosition &spos)
5252 {
5253 char fn[PATH_MAX];
5254 get_cdir(c, fn, sizeof(fn));
5255 dout(15) << __FUNC__ << ": " << fn << dendl;
5256 int r = ::mkdir(fn, 0755);
5257 if (r < 0)
5258 r = -errno;
5259 if (r == -EEXIST && replaying)
5260 r = 0;
5261 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5262
5263 if (r < 0)
5264 return r;
5265 r = init_index(c);
5266 if (r < 0)
5267 return r;
5268 r = _collection_set_bits(c, bits);
5269 if (r < 0)
5270 return r;
5271 // create parallel temp collection, too
5272 if (!c.is_meta() && !c.is_temp()) {
5273 coll_t temp = c.get_temp();
5274 r = _create_collection(temp, 0, spos);
5275 if (r < 0)
5276 return r;
5277 }
5278
5279 _set_replay_guard(c, spos);
5280 return 0;
5281 }
5282
5283 int FileStore::_destroy_collection(const coll_t& c)
5284 {
5285 int r = 0;
5286 char fn[PATH_MAX];
5287 get_cdir(c, fn, sizeof(fn));
5288 dout(15) << __FUNC__ << ": " << fn << dendl;
5289 {
5290 Index from;
5291 r = get_index(c, &from);
5292 if (r < 0)
5293 goto out;
5294 assert(NULL != from.index);
5295 RWLock::WLocker l((from.index)->access_lock);
5296
5297 r = from->prep_delete();
5298 if (r < 0)
5299 goto out;
5300 }
5301 r = ::rmdir(fn);
5302 if (r < 0) {
5303 r = -errno;
5304 goto out;
5305 }
5306
5307 out:
5308 // destroy parallel temp collection, too
5309 if (!c.is_meta() && !c.is_temp()) {
5310 coll_t temp = c.get_temp();
5311 int r2 = _destroy_collection(temp);
5312 if (r2 < 0) {
5313 r = r2;
5314 goto out_final;
5315 }
5316 }
5317
5318 out_final:
5319 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5320 return r;
5321 }
5322
5323
5324 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5325 const SequencerPosition& spos)
5326 {
5327 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5328
5329 int dstcmp = _check_replay_guard(c, o, spos);
5330 if (dstcmp < 0)
5331 return 0;
5332
5333 // check the src name too; it might have a newer guard, and we don't
5334 // want to clobber it
5335 int srccmp = _check_replay_guard(oldcid, o, spos);
5336 if (srccmp < 0)
5337 return 0;
5338
5339 // open guard on object so we don't any previous operations on the
5340 // new name that will modify the source inode.
5341 FDRef fd;
5342 int r = lfn_open(oldcid, o, 0, &fd);
5343 if (r < 0) {
5344 // the source collection/object does not exist. If we are replaying, we
5345 // should be safe, so just return 0 and move on.
5346 assert(replaying);
5347 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5348 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5349 return 0;
5350 }
5351 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5352 _set_replay_guard(**fd, spos, &o, true);
5353 }
5354
5355 r = lfn_link(oldcid, c, o, o);
5356 if (replaying && !backend->can_checkpoint() &&
5357 r == -EEXIST) // crashed between link() and set_replay_guard()
5358 r = 0;
5359
5360 _inject_failure();
5361
5362 // close guard on object so we don't do this again
5363 if (r == 0) {
5364 _close_replay_guard(**fd, spos);
5365 }
5366 lfn_close(fd);
5367
5368 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5369 return r;
5370 }
5371
/**
 * Move object oldcid/oldoid to c/o.
 *
 * The normal path links the source file under the new name, renames the omap
 * entry via object_map->rename(), unlinks the old name and finally closes the
 * replay guard on the new object.  If, while replaying, the destination
 * collection does not exist, or if the destination guard check returns a
 * negative value, only the source object is removed (out_rm_src).
 *
 * @param oldcid        source collection
 * @param oldoid        source object name
 * @param c             destination collection
 * @param o             destination object name
 * @param spos          sequencer position used for the replay guards
 * @param allow_enoent  when true, a missing source outside of replay is
 *                      logged and ignored instead of tripping the assert
 * @return 0 when the operation is skipped, otherwise the result of the last
 *         step that was executed
 */
int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
                                       coll_t c, const ghobject_t& o,
                                       const SequencerPosition& spos,
                                       bool allow_enoent)
{
  dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
  int r = 0;
  int dstcmp, srccmp;

  if (replaying) {
    /* If the destination collection doesn't exist during replay,
     * we need to delete the src object and continue on
     */
    if (!collection_exists(c))
      goto out_rm_src;
  }

  // a negative destination guard result skips the move; only the source
  // object is cleaned up
  dstcmp = _check_replay_guard(c, o, spos);
  if (dstcmp < 0)
    goto out_rm_src;

  // check the src name too; it might have a newer guard, and we don't
  // want to clobber it
  srccmp = _check_replay_guard(oldcid, oldoid, spos);
  if (srccmp < 0)
    return 0;

  {
    // open guard on object so we don't replay any previous operations on the
    // new name that will modify the source inode.
    FDRef fd;
    r = lfn_open(oldcid, oldoid, 0, &fd);
    if (r < 0) {
      // the source collection/object does not exist.  If we are replaying, we
      // should be safe, so just return 0 and move on.
      if (replaying) {
        dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
                 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
      } else if (allow_enoent) {
        dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
                 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
                 << dendl;
      } else {
        assert(0 == "ERROR: source must exist");
      }

      // outside of replay there is nothing further to do for a missing source
      if (!replaying) {
        return 0;
      }
      if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
        return 0;
      }

      // fall through to the omap rename / unlink steps below with r == 0
      r = 0; // don't know if object_map was cloned
    } else {
      if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
        _set_replay_guard(**fd, spos, &o, true);
      }

      r = lfn_link(oldcid, c, oldoid, o);
      if (replaying && !backend->can_checkpoint() &&
          r == -EEXIST) // crashed between link() and set_replay_guard()
        r = 0;

      // drop the source descriptor; fd is reused below for the new name
      lfn_close(fd);
      fd = FDRef();

      _inject_failure();
    }

    if (r == 0) {
      // the name changed; link the omap content
      r = object_map->rename(oldoid, o, &spos);
      // a missing omap entry (-ENOENT) is not treated as an error
      if (r == -ENOENT)
        r = 0;
    }

    _inject_failure();

    // remove the old name
    if (r == 0)
      r = lfn_unlink(oldcid, oldoid, spos, true);

    // reopen under the new name so the guard can be closed on it
    if (r == 0)
      r = lfn_open(c, o, 0, &fd);

    // close guard on object so we don't do this again
    if (r == 0) {
      _close_replay_guard(**fd, spos, &o);
      lfn_close(fd);
    }
  }

  dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
           << " = " << r << dendl;
  return r;

 out_rm_src:
  // remove source, but only if its replay guard check returns a positive value
  if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
    r = lfn_unlink(oldcid, oldoid, spos, true);
  }

  dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
           << " = " << r << dendl;
  return r;
}
5478
5479 void FileStore::_inject_failure()
5480 {
5481 if (m_filestore_kill_at) {
5482 int final = --m_filestore_kill_at;
5483 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5484 if (final == 0) {
5485 derr << __FUNC__ << ": KILLING" << dendl;
5486 cct->_log->flush();
5487 _exit(1);
5488 }
5489 }
5490 }
5491
5492 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5493 const SequencerPosition &spos) {
5494 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5495 Index index;
5496 int r = get_index(cid, &index);
5497 if (r < 0)
5498 return r;
5499 {
5500 assert(NULL != index.index);
5501 RWLock::RLocker l((index.index)->access_lock);
5502 r = lfn_find(hoid, index);
5503 if (r < 0)
5504 return r;
5505 }
5506 r = object_map->clear_keys_header(hoid, &spos);
5507 if (r < 0 && r != -ENOENT)
5508 return r;
5509 return 0;
5510 }
5511
5512 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5513 const map<string, bufferlist> &aset,
5514 const SequencerPosition &spos) {
5515 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5516 Index index;
5517 int r;
5518 //treat pgmeta as a logical object, skip to check exist
5519 if (hoid.is_pgmeta())
5520 goto skip;
5521
5522 r = get_index(cid, &index);
5523 if (r < 0) {
5524 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5525 return r;
5526 }
5527 {
5528 assert(NULL != index.index);
5529 RWLock::RLocker l((index.index)->access_lock);
5530 r = lfn_find(hoid, index);
5531 if (r < 0) {
5532 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5533 return r;
5534 }
5535 }
5536 skip:
5537 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5538 for (auto& p : aset) {
5539 dout(20) << __FUNC__ << ": set " << p.first << dendl;
5540 }
5541 }
5542 r = object_map->set_keys(hoid, aset, &spos);
5543 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5544 return r;
5545 }
5546
5547 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5548 const set<string> &keys,
5549 const SequencerPosition &spos) {
5550 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5551 Index index;
5552 int r;
5553 //treat pgmeta as a logical object, skip to check exist
5554 if (hoid.is_pgmeta())
5555 goto skip;
5556
5557 r = get_index(cid, &index);
5558 if (r < 0)
5559 return r;
5560 {
5561 assert(NULL != index.index);
5562 RWLock::RLocker l((index.index)->access_lock);
5563 r = lfn_find(hoid, index);
5564 if (r < 0)
5565 return r;
5566 }
5567 skip:
5568 r = object_map->rm_keys(hoid, keys, &spos);
5569 if (r < 0 && r != -ENOENT)
5570 return r;
5571 return 0;
5572 }
5573
5574 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5575 const string& first, const string& last,
5576 const SequencerPosition &spos) {
5577 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5578 set<string> keys;
5579 {
5580 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5581 if (!iter)
5582 return -ENOENT;
5583 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5584 iter->next()) {
5585 keys.insert(iter->key());
5586 }
5587 }
5588 return _omap_rmkeys(cid, hoid, keys, spos);
5589 }
5590
5591 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5592 const bufferlist &bl,
5593 const SequencerPosition &spos)
5594 {
5595 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5596 Index index;
5597 int r = get_index(cid, &index);
5598 if (r < 0)
5599 return r;
5600 {
5601 assert(NULL != index.index);
5602 RWLock::RLocker l((index.index)->access_lock);
5603 r = lfn_find(hoid, index);
5604 if (r < 0)
5605 return r;
5606 }
5607 return object_map->set_header(hoid, bl, &spos);
5608 }
5609
5610 int FileStore::_split_collection(const coll_t& cid,
5611 uint32_t bits,
5612 uint32_t rem,
5613 coll_t dest,
5614 const SequencerPosition &spos)
5615 {
5616 int r;
5617 {
5618 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5619 if (!collection_exists(cid)) {
5620 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5621 assert(replaying);
5622 return 0;
5623 }
5624 if (!collection_exists(dest)) {
5625 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5626 assert(replaying);
5627 return 0;
5628 }
5629
5630 int dstcmp = _check_replay_guard(dest, spos);
5631 if (dstcmp < 0)
5632 return 0;
5633
5634 int srccmp = _check_replay_guard(cid, spos);
5635 if (srccmp < 0)
5636 return 0;
5637
5638 _set_global_replay_guard(cid, spos);
5639 _set_replay_guard(cid, spos, true);
5640 _set_replay_guard(dest, spos, true);
5641
5642 Index from;
5643 r = get_index(cid, &from);
5644
5645 Index to;
5646 if (!r)
5647 r = get_index(dest, &to);
5648
5649 if (!r) {
5650 assert(NULL != from.index);
5651 RWLock::WLocker l1((from.index)->access_lock);
5652
5653 assert(NULL != to.index);
5654 RWLock::WLocker l2((to.index)->access_lock);
5655
5656 r = from->split(rem, bits, to.index);
5657 }
5658
5659 _close_replay_guard(cid, spos);
5660 _close_replay_guard(dest, spos);
5661 }
5662 _collection_set_bits(cid, bits);
5663 if (!r && cct->_conf->filestore_debug_verify_split) {
5664 vector<ghobject_t> objects;
5665 ghobject_t next;
5666 while (1) {
5667 collection_list(
5668 cid,
5669 next, ghobject_t::get_max(),
5670 get_ideal_list_max(),
5671 &objects,
5672 &next);
5673 if (objects.empty())
5674 break;
5675 for (vector<ghobject_t>::iterator i = objects.begin();
5676 i != objects.end();
5677 ++i) {
5678 dout(20) << __FUNC__ << ": " << *i << " still in source "
5679 << cid << dendl;
5680 assert(!i->match(bits, rem));
5681 }
5682 objects.clear();
5683 }
5684 next = ghobject_t();
5685 while (1) {
5686 collection_list(
5687 dest,
5688 next, ghobject_t::get_max(),
5689 get_ideal_list_max(),
5690 &objects,
5691 &next);
5692 if (objects.empty())
5693 break;
5694 for (vector<ghobject_t>::iterator i = objects.begin();
5695 i != objects.end();
5696 ++i) {
5697 dout(20) << __FUNC__ << ": " << *i << " now in dest "
5698 << *i << dendl;
5699 assert(i->match(bits, rem));
5700 }
5701 objects.clear();
5702 }
5703 }
5704 return r;
5705 }
5706
5707 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5708 uint64_t expected_object_size,
5709 uint64_t expected_write_size)
5710 {
5711 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5712
5713 FDRef fd;
5714 int ret = 0;
5715
5716 if (expected_object_size == 0 || expected_write_size == 0)
5717 goto out;
5718
5719 ret = lfn_open(cid, oid, false, &fd);
5720 if (ret < 0)
5721 goto out;
5722
5723 {
5724 // TODO: a more elaborate hint calculation
5725 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5726
5727 ret = backend->set_alloc_hint(**fd, hint);
5728 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
5729 }
5730
5731 lfn_close(fd);
5732 out:
5733 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5734 assert(!m_filestore_fail_eio || ret != -EIO);
5735 return ret;
5736 }
5737
5738 const char** FileStore::get_tracked_conf_keys() const
5739 {
5740 static const char* KEYS[] = {
5741 "filestore_max_inline_xattr_size",
5742 "filestore_max_inline_xattr_size_xfs",
5743 "filestore_max_inline_xattr_size_btrfs",
5744 "filestore_max_inline_xattr_size_other",
5745 "filestore_max_inline_xattrs",
5746 "filestore_max_inline_xattrs_xfs",
5747 "filestore_max_inline_xattrs_btrfs",
5748 "filestore_max_inline_xattrs_other",
5749 "filestore_max_xattr_value_size",
5750 "filestore_max_xattr_value_size_xfs",
5751 "filestore_max_xattr_value_size_btrfs",
5752 "filestore_max_xattr_value_size_other",
5753 "filestore_min_sync_interval",
5754 "filestore_max_sync_interval",
5755 "filestore_queue_max_ops",
5756 "filestore_queue_max_bytes",
5757 "filestore_expected_throughput_bytes",
5758 "filestore_expected_throughput_ops",
5759 "filestore_queue_low_threshhold",
5760 "filestore_queue_high_threshhold",
5761 "filestore_queue_high_delay_multiple",
5762 "filestore_queue_max_delay_multiple",
5763 "filestore_commit_timeout",
5764 "filestore_dump_file",
5765 "filestore_kill_at",
5766 "filestore_fail_eio",
5767 "filestore_fadvise",
5768 "filestore_sloppy_crc",
5769 "filestore_sloppy_crc_block_size",
5770 "filestore_max_alloc_hint_size",
5771 NULL
5772 };
5773 return KEYS;
5774 }
5775
5776 void FileStore::handle_conf_change(const struct md_config_t *conf,
5777 const std::set <std::string> &changed)
5778 {
5779 if (changed.count("filestore_max_inline_xattr_size") ||
5780 changed.count("filestore_max_inline_xattr_size_xfs") ||
5781 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5782 changed.count("filestore_max_inline_xattr_size_other") ||
5783 changed.count("filestore_max_inline_xattrs") ||
5784 changed.count("filestore_max_inline_xattrs_xfs") ||
5785 changed.count("filestore_max_inline_xattrs_btrfs") ||
5786 changed.count("filestore_max_inline_xattrs_other") ||
5787 changed.count("filestore_max_xattr_value_size") ||
5788 changed.count("filestore_max_xattr_value_size_xfs") ||
5789 changed.count("filestore_max_xattr_value_size_btrfs") ||
5790 changed.count("filestore_max_xattr_value_size_other")) {
5791 if (backend) {
5792 Mutex::Locker l(lock);
5793 set_xattr_limits_via_conf();
5794 }
5795 }
5796
5797 if (changed.count("filestore_queue_max_bytes") ||
5798 changed.count("filestore_queue_max_ops") ||
5799 changed.count("filestore_expected_throughput_bytes") ||
5800 changed.count("filestore_expected_throughput_ops") ||
5801 changed.count("filestore_queue_low_threshhold") ||
5802 changed.count("filestore_queue_high_threshhold") ||
5803 changed.count("filestore_queue_high_delay_multiple") ||
5804 changed.count("filestore_queue_max_delay_multiple")) {
5805 Mutex::Locker l(lock);
5806 set_throttle_params();
5807 }
5808
5809 if (changed.count("filestore_min_sync_interval") ||
5810 changed.count("filestore_max_sync_interval") ||
5811 changed.count("filestore_kill_at") ||
5812 changed.count("filestore_fail_eio") ||
5813 changed.count("filestore_sloppy_crc") ||
5814 changed.count("filestore_sloppy_crc_block_size") ||
5815 changed.count("filestore_max_alloc_hint_size") ||
5816 changed.count("filestore_fadvise")) {
5817 Mutex::Locker l(lock);
5818 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5819 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5820 m_filestore_kill_at = conf->filestore_kill_at;
5821 m_filestore_fail_eio = conf->filestore_fail_eio;
5822 m_filestore_fadvise = conf->filestore_fadvise;
5823 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5824 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5825 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5826 }
5827 if (changed.count("filestore_commit_timeout")) {
5828 Mutex::Locker l(sync_entry_timeo_lock);
5829 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5830 }
5831 if (changed.count("filestore_dump_file")) {
5832 if (conf->filestore_dump_file.length() &&
5833 conf->filestore_dump_file != "-") {
5834 dump_start(conf->filestore_dump_file);
5835 } else {
5836 dump_stop();
5837 }
5838 }
5839 }
5840
5841 int FileStore::set_throttle_params()
5842 {
5843 stringstream ss;
5844 bool valid = throttle_bytes.set_params(
5845 cct->_conf->filestore_queue_low_threshhold,
5846 cct->_conf->filestore_queue_high_threshhold,
5847 cct->_conf->filestore_expected_throughput_bytes,
5848 cct->_conf->filestore_queue_high_delay_multiple,
5849 cct->_conf->filestore_queue_max_delay_multiple,
5850 cct->_conf->filestore_queue_max_bytes,
5851 &ss);
5852
5853 valid &= throttle_ops.set_params(
5854 cct->_conf->filestore_queue_low_threshhold,
5855 cct->_conf->filestore_queue_high_threshhold,
5856 cct->_conf->filestore_expected_throughput_ops,
5857 cct->_conf->filestore_queue_high_delay_multiple,
5858 cct->_conf->filestore_queue_max_delay_multiple,
5859 cct->_conf->filestore_queue_max_ops,
5860 &ss);
5861
5862 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5863 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5864
5865 if (!valid) {
5866 derr << "tried to set invalid params: "
5867 << ss.str()
5868 << dendl;
5869 }
5870 return valid ? 0 : -EINVAL;
5871 }
5872
5873 void FileStore::dump_start(const std::string& file)
5874 {
5875 dout(10) << __FUNC__ << ": " << file << dendl;
5876 if (m_filestore_do_dump) {
5877 dump_stop();
5878 }
5879 m_filestore_dump_fmt.reset();
5880 m_filestore_dump_fmt.open_array_section("dump");
5881 m_filestore_dump.open(file.c_str());
5882 m_filestore_do_dump = true;
5883 }
5884
5885 void FileStore::dump_stop()
5886 {
5887 dout(10) << __FUNC__ << dendl;
5888 m_filestore_do_dump = false;
5889 if (m_filestore_dump.is_open()) {
5890 m_filestore_dump_fmt.close_section();
5891 m_filestore_dump_fmt.flush(m_filestore_dump);
5892 m_filestore_dump.flush();
5893 m_filestore_dump.close();
5894 }
5895 }
5896
5897 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5898 {
5899 m_filestore_dump_fmt.open_array_section("transactions");
5900 unsigned trans_num = 0;
5901 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5902 m_filestore_dump_fmt.open_object_section("transaction");
5903 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5904 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5905 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5906 (*i).dump(&m_filestore_dump_fmt);
5907 m_filestore_dump_fmt.close_section();
5908 }
5909 m_filestore_dump_fmt.close_section();
5910 m_filestore_dump_fmt.flush(m_filestore_dump);
5911 m_filestore_dump.flush();
5912 }
5913
5914 void FileStore::set_xattr_limits_via_conf()
5915 {
5916 uint32_t fs_xattr_size;
5917 uint32_t fs_xattrs;
5918 uint32_t fs_xattr_max_value_size;
5919
5920 switch (m_fs_type) {
5921 #if defined(__linux__)
5922 case XFS_SUPER_MAGIC:
5923 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5924 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5925 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5926 break;
5927 case BTRFS_SUPER_MAGIC:
5928 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5929 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5930 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5931 break;
5932 #endif
5933 default:
5934 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5935 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5936 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5937 break;
5938 }
5939
5940 // Use override value if set
5941 if (cct->_conf->filestore_max_inline_xattr_size)
5942 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5943 else
5944 m_filestore_max_inline_xattr_size = fs_xattr_size;
5945
5946 // Use override value if set
5947 if (cct->_conf->filestore_max_inline_xattrs)
5948 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5949 else
5950 m_filestore_max_inline_xattrs = fs_xattrs;
5951
5952 // Use override value if set
5953 if (cct->_conf->filestore_max_xattr_value_size)
5954 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5955 else
5956 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5957
5958 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5959 derr << "WARNING: max attr value size ("
5960 << m_filestore_max_xattr_value_size
5961 << ") is smaller than osd_max_object_name_len ("
5962 << cct->_conf->osd_max_object_name_len
5963 << "). Your backend filesystem appears to not support attrs large "
5964 << "enough to handle the configured max rados name size. You may get "
5965 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5966 << "behavior"
5967 << dendl;
5968 }
5969 }
5970
5971 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5972 {
5973 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5974 return res;
5975 }
5976
5977 int FileStore::apply_layout_settings(const coll_t &cid)
5978 {
5979 dout(20) << __FUNC__ << ": " << cid << dendl;
5980 Index index;
5981 int r = get_index(cid, &index);
5982 if (r < 0) {
5983 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5984 << dendl;
5985 return r;
5986 }
5987
5988 return index->apply_layout_settings();
5989 }
5990
5991
5992 // -- FSSuperblock --
5993
/**
 * Serialize the superblock into @p bl: the compat feature set followed by
 * the omap backend name, framed by ENCODE_START(2, 1, ...) / ENCODE_FINISH.
 * decode() only reads omap_backend when struct_v >= 2.
 */
void FSSuperblock::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);
  compat_features.encode(bl);
  ::encode(omap_backend, bl);
  ENCODE_FINISH(bl);
}
6001
6002 void FSSuperblock::decode(bufferlist::iterator &bl)
6003 {
6004 DECODE_START(2, bl);
6005 compat_features.decode(bl);
6006 if (struct_v >= 2)
6007 ::decode(omap_backend, bl);
6008 else
6009 omap_backend = "leveldb";
6010 DECODE_FINISH(bl);
6011 }
6012
/**
 * Dump the superblock to formatter @p f as a "compat" object section
 * containing the compat feature set and the "omap_backend" string.
 */
void FSSuperblock::dump(Formatter *f) const
{
  f->open_object_section("compat");
  compat_features.dump(f);
  f->dump_string("omap_backend", omap_backend);
  f->close_section();
}
6020
6021 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6022 {
6023 FSSuperblock z;
6024 o.push_back(new FSSuperblock(z));
6025 CompatSet::FeatureSet feature_compat;
6026 CompatSet::FeatureSet feature_ro_compat;
6027 CompatSet::FeatureSet feature_incompat;
6028 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6029 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6030 feature_incompat);
6031 o.push_back(new FSSuperblock(z));
6032 z.omap_backend = "rocksdb";
6033 o.push_back(new FSSuperblock(z));
6034 }