]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
update sources to v12.1.3
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
104 // The XATTR_SPILL_OUT_NAME xattr records whether an object's xattrs have
105 // spilled over into DBObjectMap. If it is present in the file's xattrs with the
106 // value XATTR_NO_SPILL_OUT ("0"), the object has no xattrs in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137 {
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147 {
148 os_commit_latency.consume_next(
149 logger.get_tavg_ms(
150 l_filestore_journal_latency));
151 os_apply_latency.consume_next(
152 logger.get_tavg_ms(
153 l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159 return out << *s.parent;
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 int FileStore::get_index(const coll_t& cid, Index *index)
169 {
170 int r = index_manager.get_index(cid, basedir, index);
171 assert(!m_filestore_fail_eio || r != -EIO);
172 return r;
173 }
174
175 int FileStore::init_index(const coll_t& cid)
176 {
177 char path[PATH_MAX];
178 get_cdir(cid, path, sizeof(path));
179 int r = index_manager.init_index(cid, path, target_version);
180 assert(!m_filestore_fail_eio || r != -EIO);
181 return r;
182 }
183
184 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185 {
186 IndexedPath path2;
187 if (!path)
188 path = &path2;
189 int r, exist;
190 assert(NULL != index.index);
191 r = (index.index)->lookup(oid, path, &exist);
192 if (r < 0) {
193 assert(!m_filestore_fail_eio || r != -EIO);
194 return r;
195 }
196 if (!exist)
197 return -ENOENT;
198 return 0;
199 }
200
201 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202 {
203 FDRef fd;
204 int r = lfn_open(cid, oid, false, &fd);
205 if (r < 0)
206 return r;
207 r = ::ftruncate(**fd, length);
208 if (r < 0)
209 r = -errno;
210 if (r >= 0 && m_filestore_sloppy_crc) {
211 int rc = backend->_crc_update_truncate(**fd, length);
212 assert(rc >= 0);
213 }
214 lfn_close(fd);
215 assert(!m_filestore_fail_eio || r != -EIO);
216 return r;
217 }
218
219 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220 {
221 IndexedPath path;
222 Index index;
223 int r = get_index(cid, &index);
224 if (r < 0)
225 return r;
226
227 assert(NULL != index.index);
228 RWLock::RLocker l((index.index)->access_lock);
229
230 r = lfn_find(oid, index, &path);
231 if (r < 0)
232 return r;
233 r = ::stat(path->path(), buf);
234 if (r < 0)
235 r = -errno;
236 return r;
237 }
238
239 int FileStore::lfn_open(const coll_t& cid,
240 const ghobject_t& oid,
241 bool create,
242 FDRef *outfd,
243 Index *index)
244 {
245 assert(outfd);
246 int r = 0;
247 bool need_lock = true;
248 int flags = O_RDWR;
249
250 if (create)
251 flags |= O_CREAT;
252 if (cct->_conf->filestore_odsync_write) {
253 flags |= O_DSYNC;
254 }
255
256 Index index2;
257 if (!index) {
258 index = &index2;
259 }
260 if (!((*index).index)) {
261 r = get_index(cid, index);
262 if (r < 0) {
263 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
264 return r;
265 }
266 } else {
267 need_lock = false;
268 }
269
270 int fd, exist;
271 assert(NULL != (*index).index);
272 if (need_lock) {
273 ((*index).index)->access_lock.get_write();
274 }
275 if (!replaying) {
276 *outfd = fdcache.lookup(oid);
277 if (*outfd) {
278 if (need_lock) {
279 ((*index).index)->access_lock.put_write();
280 }
281 return 0;
282 }
283 }
284
285
286 IndexedPath path2;
287 IndexedPath *path = &path2;
288
289 r = (*index)->lookup(oid, path, &exist);
290 if (r < 0) {
291 derr << "could not find " << oid << " in index: "
292 << cpp_strerror(-r) << dendl;
293 goto fail;
294 }
295
296 r = ::open((*path)->path(), flags, 0644);
297 if (r < 0) {
298 r = -errno;
299 dout(10) << "error opening file " << (*path)->path() << " with flags="
300 << flags << ": " << cpp_strerror(-r) << dendl;
301 goto fail;
302 }
303 fd = r;
304 if (create && (!exist)) {
305 r = (*index)->created(oid, (*path)->path());
306 if (r < 0) {
307 VOID_TEMP_FAILURE_RETRY(::close(fd));
308 derr << "error creating " << oid << " (" << (*path)->path()
309 << ") in index: " << cpp_strerror(-r) << dendl;
310 goto fail;
311 }
312 r = chain_fsetxattr<true, true>(
313 fd, XATTR_SPILL_OUT_NAME,
314 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
315 if (r < 0) {
316 VOID_TEMP_FAILURE_RETRY(::close(fd));
317 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
318 << "):" << cpp_strerror(-r) << dendl;
319 goto fail;
320 }
321 }
322
323 if (!replaying) {
324 bool existed;
325 *outfd = fdcache.add(oid, fd, &existed);
326 if (existed) {
327 TEMP_FAILURE_RETRY(::close(fd));
328 }
329 } else {
330 *outfd = std::make_shared<FDCache::FD>(fd);
331 }
332
333 if (need_lock) {
334 ((*index).index)->access_lock.put_write();
335 }
336
337 return 0;
338
339 fail:
340
341 if (need_lock) {
342 ((*index).index)->access_lock.put_write();
343 }
344
345 assert(!m_filestore_fail_eio || r != -EIO);
346 return r;
347 }
348
349 void FileStore::lfn_close(FDRef fd)
350 {
351 }
352
353 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
354 {
355 Index index_new, index_old;
356 IndexedPath path_new, path_old;
357 int exist;
358 int r;
359 bool index_same = false;
360 if (c < newcid) {
361 r = get_index(newcid, &index_new);
362 if (r < 0)
363 return r;
364 r = get_index(c, &index_old);
365 if (r < 0)
366 return r;
367 } else if (c == newcid) {
368 r = get_index(c, &index_old);
369 if (r < 0)
370 return r;
371 index_new = index_old;
372 index_same = true;
373 } else {
374 r = get_index(c, &index_old);
375 if (r < 0)
376 return r;
377 r = get_index(newcid, &index_new);
378 if (r < 0)
379 return r;
380 }
381
382 assert(NULL != index_old.index);
383 assert(NULL != index_new.index);
384
385 if (!index_same) {
386
387 RWLock::RLocker l1((index_old.index)->access_lock);
388
389 r = index_old->lookup(o, &path_old, &exist);
390 if (r < 0) {
391 assert(!m_filestore_fail_eio || r != -EIO);
392 return r;
393 }
394 if (!exist)
395 return -ENOENT;
396
397 RWLock::WLocker l2((index_new.index)->access_lock);
398
399 r = index_new->lookup(newoid, &path_new, &exist);
400 if (r < 0) {
401 assert(!m_filestore_fail_eio || r != -EIO);
402 return r;
403 }
404 if (exist)
405 return -EEXIST;
406
407 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
408 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
409 r = ::link(path_old->path(), path_new->path());
410 if (r < 0)
411 return -errno;
412
413 r = index_new->created(newoid, path_new->path());
414 if (r < 0) {
415 assert(!m_filestore_fail_eio || r != -EIO);
416 return r;
417 }
418 } else {
419 RWLock::WLocker l1((index_old.index)->access_lock);
420
421 r = index_old->lookup(o, &path_old, &exist);
422 if (r < 0) {
423 assert(!m_filestore_fail_eio || r != -EIO);
424 return r;
425 }
426 if (!exist)
427 return -ENOENT;
428
429 r = index_new->lookup(newoid, &path_new, &exist);
430 if (r < 0) {
431 assert(!m_filestore_fail_eio || r != -EIO);
432 return r;
433 }
434 if (exist)
435 return -EEXIST;
436
437 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
438 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
439 r = ::link(path_old->path(), path_new->path());
440 if (r < 0)
441 return -errno;
442
443 // make sure old fd for unlinked/overwritten file is gone
444 fdcache.clear(newoid);
445
446 r = index_new->created(newoid, path_new->path());
447 if (r < 0) {
448 assert(!m_filestore_fail_eio || r != -EIO);
449 return r;
450 }
451 }
452 return 0;
453 }
454
455 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456 const SequencerPosition &spos,
457 bool force_clear_omap)
458 {
459 Index index;
460 int r = get_index(cid, &index);
461 if (r < 0) {
462 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
463 return r;
464 }
465
466 assert(NULL != index.index);
467 RWLock::WLocker l((index.index)->access_lock);
468
469 {
470 IndexedPath path;
471 int hardlink;
472 r = index->lookup(o, &path, &hardlink);
473 if (r < 0) {
474 assert(!m_filestore_fail_eio || r != -EIO);
475 return r;
476 }
477
478 if (!force_clear_omap) {
479 if (hardlink == 0 || hardlink == 1) {
480 force_clear_omap = true;
481 }
482 }
483 if (force_clear_omap) {
484 dout(20) << __FUNC__ << ": clearing omap on " << o
485 << " in cid " << cid << dendl;
486 r = object_map->clear(o, &spos);
487 if (r < 0 && r != -ENOENT) {
488 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
489 assert(!m_filestore_fail_eio || r != -EIO);
490 return r;
491 }
492 if (cct->_conf->filestore_debug_inject_read_err) {
493 debug_obj_on_delete(o);
494 }
495 if (!m_disable_wbthrottle) {
496 wbthrottle.clear_object(o); // should be only non-cache ref
497 }
498 fdcache.clear(o);
499 } else {
500 /* Ensure that replay of this op doesn't result in the object_map
501 * going away.
502 */
503 if (!backend->can_checkpoint())
504 object_map->sync(&o, &spos);
505 }
506 if (hardlink == 0) {
507 if (!m_disable_wbthrottle) {
508 wbthrottle.clear_object(o); // should be only non-cache ref
509 }
510 return 0;
511 }
512 }
513 r = index->unlink(o);
514 if (r < 0) {
515 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
516 return r;
517 }
518 return 0;
519 }
520
521 FileStore::FileStore(CephContext* cct, const std::string &base,
522 const std::string &jdev, osflagbits_t flags,
523 const char *name, bool do_update) :
524 JournalingObjectStore(cct, base),
525 internal_name(name),
526 basedir(base), journalpath(jdev),
527 generic_flags(flags),
528 blk_size(0),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
531 backend(NULL),
532 index_manager(cct, do_update),
533 lock("FileStore::lock"),
534 force_sync(false),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct, sync_entry_timeo_lock),
537 stop(false), sync_thread(this),
538 fdcache(cct),
539 wbthrottle(cct),
540 next_osr_id(0),
541 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542 !cct->_conf->filestore_wbthrottle_enable),
543 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548 op_wq(this, cct->_conf->filestore_op_thread_timeout,
549 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550 logger(NULL),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561 m_filestore_fadvise(cct->_conf->filestore_fadvise),
562 do_update(do_update),
563 m_journal_dio(cct->_conf->journal_dio),
564 m_journal_aio(cct->_conf->journal_aio),
565 m_journal_force_aio(cct->_conf->journal_force_aio),
566 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573 m_fs_type(0),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
577 {
578 m_filestore_kill_at = cct->_conf->filestore_kill_at;
579 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580 ostringstream oss;
581 oss << "filestore-ondisk-" << i;
582 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583 ondisk_finishers.push_back(f);
584 }
585 for (int i = 0; i < m_apply_finisher_num; ++i) {
586 ostringstream oss;
587 oss << "filestore-apply-" << i;
588 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589 apply_finishers.push_back(f);
590 }
591
592 ostringstream oss;
593 oss << basedir << "/current";
594 current_fn = oss.str();
595
596 ostringstream sss;
597 sss << basedir << "/current/commit_op_seq";
598 current_op_seq_fn = sss.str();
599
600 ostringstream omss;
601 if (cct->_conf->filestore_omap_backend_path != "") {
602 omap_dir = cct->_conf->filestore_omap_backend_path;
603 } else {
604 omss << basedir << "/current/omap";
605 omap_dir = omss.str();
606 }
607
608 // initialize logger
609 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
616 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
617 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
618 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
619 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
620 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
621 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
622 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
623 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
624 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
625 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
626
627 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
628 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
629 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
630 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
631 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
632 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
633
634 logger = plb.create_perf_counters();
635
636 cct->get_perfcounters_collection()->add(logger);
637 cct->_conf->add_observer(this);
638
639 superblock.compat_features = get_fs_initial_compat_set();
640 }
641
642 FileStore::~FileStore()
643 {
644 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
645 delete *it;
646 *it = NULL;
647 }
648 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
649 delete *it;
650 *it = NULL;
651 }
652 cct->_conf->remove_observer(this);
653 cct->get_perfcounters_collection()->remove(logger);
654
655 if (journal)
656 journal->logger = NULL;
657 delete logger;
658
659 if (m_filestore_do_dump) {
660 dump_stop();
661 }
662 }
663
// Writes the on-disk xattr name for name, i.e. "user.ceph.<name>", into
// buf (capacity len bytes). Output that does not fit is truncated by
// snprintf(); the result is always NUL-terminated when len > 0.
static void get_attrname(const char *name, char *buf, int len)
{
  (void)snprintf(buf, len, "user.ceph.%s", name);
}
668
// Strips the "user.ceph." prefix from *name.
// Returns true and advances *name past the prefix when it is present;
// returns false and leaves *name untouched otherwise.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // excludes the trailing NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
677
678 void FileStore::collect_metadata(map<string,string> *pm)
679 {
680 char partition_path[PATH_MAX];
681 char dev_node[PATH_MAX];
682 int rc = 0;
683
684 (*pm)["filestore_backend"] = backend->get_name();
685 ostringstream ss;
686 ss << "0x" << std::hex << m_fs_type << std::dec;
687 (*pm)["filestore_f_type"] = ss.str();
688
689 if (cct->_conf->filestore_collect_device_partition_information) {
690 rc = get_device_by_uuid(get_fsid(), "PARTUUID", partition_path,
691 dev_node);
692 } else {
693 rc = -EINVAL;
694 }
695
696 switch (rc) {
697 case -EOPNOTSUPP:
698 case -EINVAL:
699 (*pm)["backend_filestore_partition_path"] = "unknown";
700 (*pm)["backend_filestore_dev_node"] = "unknown";
701 break;
702 case -ENODEV:
703 (*pm)["backend_filestore_partition_path"] = string(partition_path);
704 (*pm)["backend_filestore_dev_node"] = "unknown";
705 break;
706 default:
707 (*pm)["backend_filestore_partition_path"] = string(partition_path);
708 (*pm)["backend_filestore_dev_node"] = string(dev_node);
709 }
710 }
711
712 int FileStore::statfs(struct store_statfs_t *buf0)
713 {
714 struct statfs buf;
715 buf0->reset();
716 if (::statfs(basedir.c_str(), &buf) < 0) {
717 int r = -errno;
718 assert(!m_filestore_fail_eio || r != -EIO);
719 assert(r != -ENOENT);
720 return r;
721 }
722 buf0->total = buf.f_blocks * buf.f_bsize;
723 buf0->available = buf.f_bavail * buf.f_bsize;
724 // Adjust for writes pending in the journal
725 if (journal) {
726 uint64_t estimate = journal->get_journal_size_estimate();
727 if (buf0->available > estimate)
728 buf0->available -= estimate;
729 else
730 buf0->available = 0;
731 }
732 return 0;
733 }
734
735
736 void FileStore::new_journal()
737 {
738 if (journalpath.length()) {
739 dout(10) << "open_journal at " << journalpath << dendl;
740 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
741 journalpath.c_str(),
742 m_journal_dio, m_journal_aio,
743 m_journal_force_aio);
744 if (journal)
745 journal->logger = logger;
746 }
747 return;
748 }
749
750 int FileStore::dump_journal(ostream& out)
751 {
752 int r;
753
754 if (!journalpath.length())
755 return -EINVAL;
756
757 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
758 r = journal->dump(out);
759 delete journal;
760 return r;
761 }
762
763 FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
764 {
765 switch (f_type) {
766 #if defined(__linux__)
767 case BTRFS_SUPER_MAGIC:
768 return new BtrfsFileStoreBackend(fs);
769 # ifdef HAVE_LIBXFS
770 case XFS_SUPER_MAGIC:
771 return new XfsFileStoreBackend(fs);
772 # endif
773 #endif
774 #ifdef HAVE_LIBZFS
775 case ZFS_SUPER_MAGIC:
776 return new ZFSFileStoreBackend(fs);
777 #endif
778 default:
779 return new GenericFileStoreBackend(fs);
780 }
781 }
782
783 void FileStore::create_backend(long f_type)
784 {
785 m_fs_type = f_type;
786
787 assert(backend == NULL);
788 backend = FileStoreBackend::create(f_type, this);
789
790 dout(0) << "backend " << backend->get_name()
791 << " (magic 0x" << std::hex << f_type << std::dec << ")"
792 << dendl;
793
794 switch (f_type) {
795 #if defined(__linux__)
796 case BTRFS_SUPER_MAGIC:
797 if (!m_disable_wbthrottle){
798 wbthrottle.set_fs(WBThrottle::BTRFS);
799 }
800 break;
801
802 case XFS_SUPER_MAGIC:
803 // wbthrottle is constructed with fs(WBThrottle::XFS)
804 break;
805 #endif
806 }
807
808 set_xattr_limits_via_conf();
809 }
810
811 int FileStore::mkfs()
812 {
813 int ret = 0;
814 char fsid_fn[PATH_MAX];
815 char fsid_str[40];
816 uuid_d old_fsid;
817 uuid_d old_omap_fsid;
818
819 dout(1) << "mkfs in " << basedir << dendl;
820 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
821 if (basedir_fd < 0) {
822 ret = -errno;
823 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
824 return ret;
825 }
826
827 // open+lock fsid
828 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
829 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
830 if (fsid_fd < 0) {
831 ret = -errno;
832 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
833 goto close_basedir_fd;
834 }
835
836 if (lock_fsid() < 0) {
837 ret = -EBUSY;
838 goto close_fsid_fd;
839 }
840
841 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
842 if (fsid.is_zero()) {
843 fsid.generate_random();
844 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
845 } else {
846 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
847 }
848
849 fsid.print(fsid_str);
850 strcat(fsid_str, "\n");
851 ret = ::ftruncate(fsid_fd, 0);
852 if (ret < 0) {
853 ret = -errno;
854 derr << __FUNC__ << ": failed to truncate fsid: "
855 << cpp_strerror(ret) << dendl;
856 goto close_fsid_fd;
857 }
858 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
859 if (ret < 0) {
860 derr << __FUNC__ << ": failed to write fsid: "
861 << cpp_strerror(ret) << dendl;
862 goto close_fsid_fd;
863 }
864 if (::fsync(fsid_fd) < 0) {
865 ret = -errno;
866 derr << __FUNC__ << ": close failed: can't write fsid: "
867 << cpp_strerror(ret) << dendl;
868 goto close_fsid_fd;
869 }
870 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
871 } else {
872 if (!fsid.is_zero() && fsid != old_fsid) {
873 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
874 ret = -EINVAL;
875 goto close_fsid_fd;
876 }
877 fsid = old_fsid;
878 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
879 }
880
881 // version stamp
882 ret = write_version_stamp();
883 if (ret < 0) {
884 derr << __FUNC__ << ": write_version_stamp() failed: "
885 << cpp_strerror(ret) << dendl;
886 goto close_fsid_fd;
887 }
888
889 // superblock
890 superblock.omap_backend = cct->_conf->filestore_omap_backend;
891 ret = write_superblock();
892 if (ret < 0) {
893 derr << __FUNC__ << ": write_superblock() failed: "
894 << cpp_strerror(ret) << dendl;
895 goto close_fsid_fd;
896 }
897
898 struct statfs basefs;
899 ret = ::fstatfs(basedir_fd, &basefs);
900 if (ret < 0) {
901 ret = -errno;
902 derr << __FUNC__ << ": cannot fstatfs basedir "
903 << cpp_strerror(ret) << dendl;
904 goto close_fsid_fd;
905 }
906
907 #if defined(__linux__)
908 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
909 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
910 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
911 goto close_fsid_fd;
912 }
913 #endif
914
915 create_backend(basefs.f_type);
916
917 ret = backend->create_current();
918 if (ret < 0) {
919 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
920 goto close_fsid_fd;
921 }
922
923 // write initial op_seq
924 {
925 uint64_t initial_seq = 0;
926 int fd = read_op_seq(&initial_seq);
927 if (fd < 0) {
928 ret = fd;
929 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
930 << cpp_strerror(ret) << dendl;
931 goto close_fsid_fd;
932 }
933 if (initial_seq == 0) {
934 ret = write_op_seq(fd, 1);
935 if (ret < 0) {
936 VOID_TEMP_FAILURE_RETRY(::close(fd));
937 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
938 << cpp_strerror(ret) << dendl;
939 goto close_fsid_fd;
940 }
941
942 if (backend->can_checkpoint()) {
943 // create snap_1 too
944 current_fd = ::open(current_fn.c_str(), O_RDONLY);
945 assert(current_fd >= 0);
946 char s[NAME_MAX];
947 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
948 ret = backend->create_checkpoint(s, NULL);
949 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
950 if (ret < 0 && ret != -EEXIST) {
951 VOID_TEMP_FAILURE_RETRY(::close(fd));
952 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
953 goto close_fsid_fd;
954 }
955 }
956 }
957 VOID_TEMP_FAILURE_RETRY(::close(fd));
958 }
959 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
960 if (ret < 0) {
961 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
962 goto close_fsid_fd;
963 }
964 // create fsid under omap
965 // open+lock fsid
966 int omap_fsid_fd;
967 char omap_fsid_fn[PATH_MAX];
968 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
969 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
970 if (omap_fsid_fd < 0) {
971 ret = -errno;
972 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
973 goto close_fsid_fd;
974 }
975
976 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
977 assert(!fsid.is_zero());
978 fsid.print(fsid_str);
979 strcat(fsid_str, "\n");
980 ret = ::ftruncate(omap_fsid_fd, 0);
981 if (ret < 0) {
982 ret = -errno;
983 derr << __FUNC__ << ": failed to truncate fsid: "
984 << cpp_strerror(ret) << dendl;
985 goto close_omap_fsid_fd;
986 }
987 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
988 if (ret < 0) {
989 derr << __FUNC__ << ": failed to write fsid: "
990 << cpp_strerror(ret) << dendl;
991 goto close_omap_fsid_fd;
992 }
993 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
994 if (::fsync(omap_fsid_fd) < 0) {
995 ret = -errno;
996 derr << __FUNC__ << ": close failed: can't write fsid: "
997 << cpp_strerror(ret) << dendl;
998 goto close_omap_fsid_fd;
999 }
1000 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1001 } else {
1002 if (fsid != old_omap_fsid) {
1003 derr << __FUNC__ << ": " << omap_fsid_fn
1004 << " has existed omap fsid " << old_omap_fsid
1005 << " != expected osd fsid " << fsid
1006 << dendl;
1007 ret = -EINVAL;
1008 goto close_omap_fsid_fd;
1009 }
1010 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1011 }
1012
1013 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1014
1015 // journal?
1016 ret = mkjournal();
1017 if (ret)
1018 goto close_omap_fsid_fd;
1019
1020 ret = write_meta("type", "filestore");
1021 if (ret)
1022 goto close_omap_fsid_fd;
1023
1024 dout(1) << "mkfs done in " << basedir << dendl;
1025 ret = 0;
1026
1027 close_omap_fsid_fd:
1028 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1029 close_fsid_fd:
1030 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1031 fsid_fd = -1;
1032 close_basedir_fd:
1033 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1034 delete backend;
1035 backend = NULL;
1036 return ret;
1037 }
1038
1039 int FileStore::mkjournal()
1040 {
1041 // read fsid
1042 int ret;
1043 char fn[PATH_MAX];
1044 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1045 int fd = ::open(fn, O_RDONLY, 0644);
1046 if (fd < 0) {
1047 int err = errno;
1048 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1049 return -err;
1050 }
1051 ret = read_fsid(fd, &fsid);
1052 if (ret < 0) {
1053 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1054 VOID_TEMP_FAILURE_RETRY(::close(fd));
1055 return ret;
1056 }
1057 VOID_TEMP_FAILURE_RETRY(::close(fd));
1058
1059 ret = 0;
1060
1061 new_journal();
1062 if (journal) {
1063 ret = journal->check();
1064 if (ret < 0) {
1065 ret = journal->create();
1066 if (ret)
1067 derr << __FUNC__ << ": error creating journal on " << journalpath
1068 << ": " << cpp_strerror(ret) << dendl;
1069 else
1070 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1071 }
1072 delete journal;
1073 journal = 0;
1074 }
1075 return ret;
1076 }
1077
1078 int FileStore::read_fsid(int fd, uuid_d *uuid)
1079 {
1080 char fsid_str[40];
1081 memset(fsid_str, 0, sizeof(fsid_str));
1082 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1083 if (ret < 0)
1084 return ret;
1085 if (ret == 8) {
1086 // old 64-bit fsid... mirror it.
1087 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1088 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1089 return 0;
1090 }
1091
1092 if (ret > 36)
1093 fsid_str[36] = 0;
1094 else
1095 fsid_str[ret] = 0;
1096 if (!uuid->parse(fsid_str))
1097 return -EINVAL;
1098 return 0;
1099 }
1100
1101 int FileStore::lock_fsid()
1102 {
1103 struct flock l;
1104 memset(&l, 0, sizeof(l));
1105 l.l_type = F_WRLCK;
1106 l.l_whence = SEEK_SET;
1107 l.l_start = 0;
1108 l.l_len = 0;
1109 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1110 if (r < 0) {
1111 int err = errno;
1112 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1113 << cpp_strerror(err) << dendl;
1114 return -err;
1115 }
1116 return 0;
1117 }
1118
1119 bool FileStore::test_mount_in_use()
1120 {
1121 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1122 char fn[PATH_MAX];
1123 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1124
1125 // verify fs isn't in use
1126
1127 fsid_fd = ::open(fn, O_RDWR, 0644);
1128 if (fsid_fd < 0)
1129 return 0; // no fsid, ok.
1130 bool inuse = lock_fsid() < 0;
1131 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1132 fsid_fd = -1;
1133 return inuse;
1134 }
1135
1136 bool FileStore::is_rotational()
1137 {
1138 bool rotational;
1139 if (backend) {
1140 rotational = backend->is_rotational();
1141 } else {
1142 int fd = ::open(basedir.c_str(), O_RDONLY);
1143 if (fd < 0)
1144 return true;
1145 struct statfs st;
1146 int r = ::fstatfs(fd, &st);
1147 ::close(fd);
1148 if (r < 0) {
1149 return true;
1150 }
1151 create_backend(st.f_type);
1152 rotational = backend->is_rotational();
1153 delete backend;
1154 backend = NULL;
1155 }
1156 dout(10) << __func__ << " " << (int)rotational << dendl;
1157 return rotational;
1158 }
1159
1160 bool FileStore::is_journal_rotational()
1161 {
1162 bool journal_rotational;
1163 if (backend) {
1164 journal_rotational = backend->is_journal_rotational();
1165 } else {
1166 int fd = ::open(journalpath.c_str(), O_RDONLY);
1167 if (fd < 0)
1168 return true;
1169 struct statfs st;
1170 int r = ::fstatfs(fd, &st);
1171 ::close(fd);
1172 if (r < 0) {
1173 return true;
1174 }
1175 create_backend(st.f_type);
1176 journal_rotational = backend->is_journal_rotational();
1177 delete backend;
1178 backend = NULL;
1179 }
1180 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1181 return journal_rotational;
1182 }
1183
1184 int FileStore::_detect_fs()
1185 {
1186 struct statfs st;
1187 int r = ::fstatfs(basedir_fd, &st);
1188 if (r < 0)
1189 return -errno;
1190
1191 blk_size = st.f_bsize;
1192
1193 #if defined(__linux__)
1194 if (st.f_type == BTRFS_SUPER_MAGIC &&
1195 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1196 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1197 return -EPERM;
1198 }
1199 #endif
1200
1201 create_backend(st.f_type);
1202
1203 r = backend->detect_features();
1204 if (r < 0) {
1205 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1206 return r;
1207 }
1208
1209 // test xattrs
1210 char fn[PATH_MAX];
1211 int x = rand();
1212 int y = x+1;
1213 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1214 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1215 if (tmpfd < 0) {
1216 int ret = -errno;
1217 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1218 return ret;
1219 }
1220
1221 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1222 if (ret >= 0)
1223 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1224 if ((ret < 0) || (x != y)) {
1225 derr << "Extended attributes don't appear to work. ";
1226 if (ret)
1227 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1228 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1229 << "file system with the 'user_xattr' option." << dendl;
1230 ::unlink(fn);
1231 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1232 return -ENOTSUP;
1233 }
1234
1235 char buf[1000];
1236 memset(buf, 0, sizeof(buf)); // shut up valgrind
1237 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1238 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1239 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1240 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1241 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1242 if (ret == -ENOSPC) {
1243 dout(0) << "limited size xattrs" << dendl;
1244 }
1245 chain_fremovexattr(tmpfd, "user.test");
1246 chain_fremovexattr(tmpfd, "user.test2");
1247 chain_fremovexattr(tmpfd, "user.test3");
1248 chain_fremovexattr(tmpfd, "user.test4");
1249 chain_fremovexattr(tmpfd, "user.test5");
1250
1251 ::unlink(fn);
1252 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1253
1254 return 0;
1255 }
1256
1257 int FileStore::_sanity_check_fs()
1258 {
1259 // sanity check(s)
1260
1261 if (((int)m_filestore_journal_writeahead +
1262 (int)m_filestore_journal_parallel +
1263 (int)m_filestore_journal_trailing) > 1) {
1264 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1265 cerr << TEXT_RED
1266 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1267 << " is enabled in ceph.conf. You must choose a single journal mode."
1268 << TEXT_NORMAL << std::endl;
1269 return -EINVAL;
1270 }
1271
1272 if (!backend->can_checkpoint()) {
1273 if (!journal || !m_filestore_journal_writeahead) {
1274 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1275 cerr << TEXT_RED
1276 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1277 << " For non-btrfs volumes, a writeahead journal is required to\n"
1278 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1279 << " should include something like:\n"
1280 << " osd journal = /path/to/journal_device_or_file\n"
1281 << " filestore journal writeahead = true\n"
1282 << TEXT_NORMAL;
1283 }
1284 }
1285
1286 if (!journal) {
1287 dout(0) << "mount WARNING: no journal" << dendl;
1288 cerr << TEXT_YELLOW
1289 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1290 << " If you will not be using an osd journal, write latency may be\n"
1291 << " relatively high. It can be reduced somewhat by lowering\n"
1292 << " filestore_max_sync_interval, but lower values mean lower write\n"
1293 << " throughput, especially with spinning disks.\n"
1294 << TEXT_NORMAL;
1295 }
1296
1297 return 0;
1298 }
1299
1300 int FileStore::write_superblock()
1301 {
1302 bufferlist bl;
1303 ::encode(superblock, bl);
1304 return safe_write_file(basedir.c_str(), "superblock",
1305 bl.c_str(), bl.length());
1306 }
1307
1308 int FileStore::read_superblock()
1309 {
1310 bufferptr bp(PATH_MAX);
1311 int ret = safe_read_file(basedir.c_str(), "superblock",
1312 bp.c_str(), bp.length());
1313 if (ret < 0) {
1314 if (ret == -ENOENT) {
1315 // If the file doesn't exist write initial CompatSet
1316 return write_superblock();
1317 }
1318 return ret;
1319 }
1320
1321 bufferlist bl;
1322 bl.push_back(std::move(bp));
1323 bufferlist::iterator i = bl.begin();
1324 ::decode(superblock, i);
1325 return 0;
1326 }
1327
1328 int FileStore::update_version_stamp()
1329 {
1330 return write_version_stamp();
1331 }
1332
1333 int FileStore::version_stamp_is_valid(uint32_t *version)
1334 {
1335 bufferptr bp(PATH_MAX);
1336 int ret = safe_read_file(basedir.c_str(), "store_version",
1337 bp.c_str(), bp.length());
1338 if (ret < 0) {
1339 return ret;
1340 }
1341 bufferlist bl;
1342 bl.push_back(std::move(bp));
1343 bufferlist::iterator i = bl.begin();
1344 ::decode(*version, i);
1345 dout(10) << __FUNC__ << ": was " << *version << " vs target "
1346 << target_version << dendl;
1347 if (*version == target_version)
1348 return 1;
1349 else
1350 return 0;
1351 }
1352
1353 int FileStore::write_version_stamp()
1354 {
1355 dout(1) << __FUNC__ << ": " << target_version << dendl;
1356 bufferlist bl;
1357 ::encode(target_version, bl);
1358
1359 return safe_write_file(basedir.c_str(), "store_version",
1360 bl.c_str(), bl.length());
1361 }
1362
1363 int FileStore::upgrade()
1364 {
1365 dout(1) << __FUNC__ << dendl;
1366 uint32_t version;
1367 int r = version_stamp_is_valid(&version);
1368
1369 if (r == -ENOENT) {
1370 derr << "The store_version file doesn't exist." << dendl;
1371 return -EINVAL;
1372 }
1373 if (r < 0)
1374 return r;
1375 if (r == 1)
1376 return 0;
1377
1378 if (version < 3) {
1379 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1380 return -EINVAL;
1381 }
1382
1383 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1384 // open up DBObjectMap with the do_upgrade flag, which we already did.
1385 update_version_stamp();
1386 return 0;
1387 }
1388
1389 int FileStore::read_op_seq(uint64_t *seq)
1390 {
1391 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1392 if (op_fd < 0) {
1393 int r = -errno;
1394 assert(!m_filestore_fail_eio || r != -EIO);
1395 return r;
1396 }
1397 char s[40];
1398 memset(s, 0, sizeof(s));
1399 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1400 if (ret < 0) {
1401 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1402 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1403 assert(!m_filestore_fail_eio || ret != -EIO);
1404 return ret;
1405 }
1406 *seq = atoll(s);
1407 return op_fd;
1408 }
1409
1410 int FileStore::write_op_seq(int fd, uint64_t seq)
1411 {
1412 char s[30];
1413 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1414 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1415 if (ret < 0) {
1416 ret = -errno;
1417 assert(!m_filestore_fail_eio || ret != -EIO);
1418 }
1419 return ret;
1420 }
1421
1422 int FileStore::mount()
1423 {
1424 int ret;
1425 char buf[PATH_MAX];
1426 uint64_t initial_op_seq;
1427 uuid_d omap_fsid;
1428 set<string> cluster_snaps;
1429 CompatSet supported_compat_set = get_fs_supported_compat_set();
1430
1431 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1432
1433 ret = set_throttle_params();
1434 if (ret != 0)
1435 goto done;
1436
1437 // make sure global base dir exists
1438 if (::access(basedir.c_str(), R_OK | W_OK)) {
1439 ret = -errno;
1440 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1441 << cpp_strerror(ret) << dendl;
1442 goto done;
1443 }
1444
1445 // get fsid
1446 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1447 fsid_fd = ::open(buf, O_RDWR, 0644);
1448 if (fsid_fd < 0) {
1449 ret = -errno;
1450 derr << __FUNC__ << ": error opening '" << buf << "': "
1451 << cpp_strerror(ret) << dendl;
1452 goto done;
1453 }
1454
1455 ret = read_fsid(fsid_fd, &fsid);
1456 if (ret < 0) {
1457 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1458 << dendl;
1459 goto close_fsid_fd;
1460 }
1461
1462 if (lock_fsid() < 0) {
1463 derr << __FUNC__ << ": lock_fsid failed" << dendl;
1464 ret = -EBUSY;
1465 goto close_fsid_fd;
1466 }
1467
1468 dout(10) << "mount fsid is " << fsid << dendl;
1469
1470
1471 uint32_t version_stamp;
1472 ret = version_stamp_is_valid(&version_stamp);
1473 if (ret < 0) {
1474 derr << __FUNC__ << ": error in version_stamp_is_valid: "
1475 << cpp_strerror(ret) << dendl;
1476 goto close_fsid_fd;
1477 } else if (ret == 0) {
1478 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1479 derr << __FUNC__ << ": stale version stamp detected: "
1480 << version_stamp
1481 << ". Proceeding, do_update "
1482 << "is set, performing disk format upgrade."
1483 << dendl;
1484 do_update = true;
1485 } else {
1486 ret = -EINVAL;
1487 derr << __FUNC__ << ": stale version stamp " << version_stamp
1488 << ". Please run the FileStore update script before starting the "
1489 << "OSD, or set filestore_update_to to " << target_version
1490 << " (currently " << cct->_conf->filestore_update_to << ")"
1491 << dendl;
1492 goto close_fsid_fd;
1493 }
1494 }
1495
1496 ret = read_superblock();
1497 if (ret < 0) {
1498 goto close_fsid_fd;
1499 }
1500
1501 // Check if this FileStore supports all the necessary features to mount
1502 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1503 derr << __FUNC__ << ": Incompatible features set "
1504 << superblock.compat_features << dendl;
1505 ret = -EINVAL;
1506 goto close_fsid_fd;
1507 }
1508
1509 // open some dir handles
1510 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1511 if (basedir_fd < 0) {
1512 ret = -errno;
1513 derr << __FUNC__ << ": failed to open " << basedir << ": "
1514 << cpp_strerror(ret) << dendl;
1515 basedir_fd = -1;
1516 goto close_fsid_fd;
1517 }
1518
1519 // test for btrfs, xattrs, etc.
1520 ret = _detect_fs();
1521 if (ret < 0) {
1522 derr << __FUNC__ << ": error in _detect_fs: "
1523 << cpp_strerror(ret) << dendl;
1524 goto close_basedir_fd;
1525 }
1526
1527 {
1528 list<string> ls;
1529 ret = backend->list_checkpoints(ls);
1530 if (ret < 0) {
1531 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1532 goto close_basedir_fd;
1533 }
1534
1535 long long unsigned c, prev = 0;
1536 char clustersnap[NAME_MAX];
1537 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1538 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1539 assert(c > prev);
1540 prev = c;
1541 snaps.push_back(c);
1542 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1543 cluster_snaps.insert(*it);
1544 }
1545 }
1546
1547 if (m_osd_rollback_to_cluster_snap.length() &&
1548 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1549 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1550 ret = -ENOENT;
1551 goto close_basedir_fd;
1552 }
1553
1554 char nosnapfn[200];
1555 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1556
1557 if (backend->can_checkpoint()) {
1558 if (snaps.empty()) {
1559 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1560 } else {
1561 char s[NAME_MAX];
1562 uint64_t curr_seq = 0;
1563
1564 if (m_osd_rollback_to_cluster_snap.length()) {
1565 derr << TEXT_RED
1566 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1567 << TEXT_NORMAL
1568 << dendl;
1569 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1570 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1571 } else {
1572 {
1573 int fd = read_op_seq(&curr_seq);
1574 if (fd >= 0) {
1575 VOID_TEMP_FAILURE_RETRY(::close(fd));
1576 }
1577 }
1578 if (curr_seq)
1579 dout(10) << " current/ seq was " << curr_seq << dendl;
1580 else
1581 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1582
1583 uint64_t cp = snaps.back();
1584 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1585
1586 // if current/ is marked as non-snapshotted, refuse to roll
1587 // back (without clear direction) to avoid throwing out new
1588 // data.
1589 struct stat st;
1590 if (::stat(nosnapfn, &st) == 0) {
1591 if (!m_osd_use_stale_snap) {
1592 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1593 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1594 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1595 ret = -ENOTSUP;
1596 goto close_basedir_fd;
1597 }
1598 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1599 << ", newest snap is " << cp << dendl;
1600 cerr << TEXT_YELLOW
1601 << " ** WARNING: forcing the use of stale snapshot data **"
1602 << TEXT_NORMAL << std::endl;
1603 }
1604
1605 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1606 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1607 }
1608
1609 // drop current?
1610 ret = backend->rollback_to(s);
1611 if (ret) {
1612 derr << __FUNC__ << ": error rolling back to " << s << ": "
1613 << cpp_strerror(ret) << dendl;
1614 goto close_basedir_fd;
1615 }
1616 }
1617 }
1618 initial_op_seq = 0;
1619
1620 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1621 if (current_fd < 0) {
1622 ret = -errno;
1623 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1624 goto close_basedir_fd;
1625 }
1626
1627 assert(current_fd >= 0);
1628
1629 op_fd = read_op_seq(&initial_op_seq);
1630 if (op_fd < 0) {
1631 ret = op_fd;
1632 derr << __FUNC__ << ": read_op_seq failed" << dendl;
1633 goto close_current_fd;
1634 }
1635
1636 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1637 if (initial_op_seq == 0) {
1638 derr << "mount initial op seq is 0; something is wrong" << dendl;
1639 ret = -EINVAL;
1640 goto close_current_fd;
1641 }
1642
1643 if (!backend->can_checkpoint()) {
1644 // mark current/ as non-snapshotted so that we don't rollback away
1645 // from it.
1646 int r = ::creat(nosnapfn, 0644);
1647 if (r < 0) {
1648 ret = -errno;
1649 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1650 goto close_current_fd;
1651 }
1652 VOID_TEMP_FAILURE_RETRY(::close(r));
1653 } else {
1654 // clear nosnap marker, if present.
1655 ::unlink(nosnapfn);
1656 }
1657
1658 // check fsid with omap
1659 // get omap fsid
1660 int omap_fsid_fd;
1661 char omap_fsid_buf[PATH_MAX];
1662 struct ::stat omap_fsid_stat;
1663 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1664 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1665 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1666 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1667 << "assume as matched."
1668 << dendl;
1669 }else{
1670 // if osd_uuid exists, compares osd_uuid with fsid
1671 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1672 if (omap_fsid_fd < 0) {
1673 ret = -errno;
1674 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1675 << cpp_strerror(ret)
1676 << dendl;
1677 goto close_current_fd;
1678 }
1679 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1680 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1681 omap_fsid_fd = -1; // defensive
1682 if (ret < 0) {
1683 derr << __FUNC__ << ": error reading omap_fsid_fd"
1684 << ", omap_fsid = " << omap_fsid
1685 << cpp_strerror(ret)
1686 << dendl;
1687 goto close_current_fd;
1688 }
1689 if (fsid != omap_fsid) {
1690 derr << __FUNC__ << ": " << omap_fsid_buf
1691 << " has existed omap fsid " << omap_fsid
1692 << " != expected osd fsid " << fsid
1693 << dendl;
1694 ret = -EINVAL;
1695 goto close_current_fd;
1696 }
1697 }
1698
1699 dout(0) << "start omap initiation" << dendl;
1700 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1701 KeyValueDB * omap_store = KeyValueDB::create(cct,
1702 superblock.omap_backend,
1703 omap_dir);
1704 if (omap_store == NULL)
1705 {
1706 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1707 ret = -1;
1708 goto close_current_fd;
1709 }
1710
1711 if (superblock.omap_backend == "rocksdb")
1712 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1713 else
1714 ret = omap_store->init();
1715
1716 if (ret < 0) {
1717 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1718 goto close_current_fd;
1719 }
1720
1721 stringstream err;
1722 if (omap_store->create_and_open(err)) {
1723 delete omap_store;
1724 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1725 << " : " << err.str() << dendl;
1726 ret = -1;
1727 goto close_current_fd;
1728 }
1729
1730 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1731 ret = dbomap->init(do_update);
1732 if (ret < 0) {
1733 delete dbomap;
1734 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1735 goto close_current_fd;
1736 }
1737 stringstream err2;
1738
1739 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1740 derr << err2.str() << dendl;
1741 delete dbomap;
1742 ret = -EINVAL;
1743 goto close_current_fd;
1744 }
1745 object_map.reset(dbomap);
1746 }
1747
1748 // journal
1749 new_journal();
1750
1751 // select journal mode?
1752 if (journal) {
1753 if (!m_filestore_journal_writeahead &&
1754 !m_filestore_journal_parallel &&
1755 !m_filestore_journal_trailing) {
1756 if (!backend->can_checkpoint()) {
1757 m_filestore_journal_writeahead = true;
1758 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1759 } else {
1760 m_filestore_journal_parallel = true;
1761 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1762 }
1763 } else {
1764 if (m_filestore_journal_writeahead)
1765 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1766 if (m_filestore_journal_parallel)
1767 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1768 if (m_filestore_journal_trailing)
1769 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1770 }
1771 if (m_filestore_journal_writeahead)
1772 journal->set_wait_on_full(true);
1773 } else {
1774 dout(0) << __FUNC__ << ": no journal" << dendl;
1775 }
1776
1777 ret = _sanity_check_fs();
1778 if (ret) {
1779 derr << __FUNC__ << ": _sanity_check_fs failed with error "
1780 << ret << dendl;
1781 goto close_current_fd;
1782 }
1783
1784 // Cleanup possibly invalid collections
1785 {
1786 vector<coll_t> collections;
1787 ret = list_collections(collections, true);
1788 if (ret < 0) {
1789 derr << "Error " << ret << " while listing collections" << dendl;
1790 goto close_current_fd;
1791 }
1792 for (vector<coll_t>::iterator i = collections.begin();
1793 i != collections.end();
1794 ++i) {
1795 Index index;
1796 ret = get_index(*i, &index);
1797 if (ret < 0) {
1798 derr << "Unable to mount index " << *i
1799 << " with error: " << ret << dendl;
1800 goto close_current_fd;
1801 }
1802 assert(NULL != index.index);
1803 RWLock::WLocker l((index.index)->access_lock);
1804
1805 index->cleanup();
1806 }
1807 }
1808 if (!m_disable_wbthrottle) {
1809 wbthrottle.start();
1810 } else {
1811 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1812 if (cct->_conf->filestore_odsync_write) {
1813 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1814 }
1815 }
1816 sync_thread.create("filestore_sync");
1817
1818 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1819 ret = journal_replay(initial_op_seq);
1820 if (ret < 0) {
1821 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1822 if (ret == -ENOTTY) {
1823 derr << "maybe journal is not pointing to a block device and its size "
1824 << "wasn't configured?" << dendl;
1825 }
1826
1827 goto stop_sync;
1828 }
1829 }
1830
1831 {
1832 stringstream err2;
1833 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1834 derr << err2.str() << dendl;
1835 ret = -EINVAL;
1836 goto stop_sync;
1837 }
1838 }
1839
1840 init_temp_collections();
1841
1842 journal_start();
1843
1844 op_tp.start();
1845 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1846 (*it)->start();
1847 }
1848 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1849 (*it)->start();
1850 }
1851
1852 timer.init();
1853
1854 // upgrade?
1855 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1856 int err = upgrade();
1857 if (err < 0) {
1858 derr << "error converting store" << dendl;
1859 umount();
1860 return err;
1861 }
1862 }
1863
1864 // all okay.
1865 return 0;
1866
1867 stop_sync:
1868 // stop sync thread
1869 lock.Lock();
1870 stop = true;
1871 sync_cond.Signal();
1872 lock.Unlock();
1873 sync_thread.join();
1874 if (!m_disable_wbthrottle) {
1875 wbthrottle.stop();
1876 }
1877 close_current_fd:
1878 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1879 current_fd = -1;
1880 close_basedir_fd:
1881 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1882 basedir_fd = -1;
1883 close_fsid_fd:
1884 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1885 fsid_fd = -1;
1886 done:
1887 assert(!m_filestore_fail_eio || ret != -EIO);
1888 delete backend;
1889 backend = NULL;
1890 object_map.reset();
1891 return ret;
1892 }
1893
1894 void FileStore::init_temp_collections()
1895 {
1896 dout(10) << __FUNC__ << dendl;
1897 vector<coll_t> ls;
1898 int r = list_collections(ls, true);
1899 assert(r >= 0);
1900
1901 dout(20) << " ls " << ls << dendl;
1902
1903 SequencerPosition spos;
1904
1905 set<coll_t> temps;
1906 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1907 if (p->is_temp())
1908 temps.insert(*p);
1909 dout(20) << " temps " << temps << dendl;
1910
1911 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1912 if (p->is_temp())
1913 continue;
1914 if (p->is_meta())
1915 continue;
1916 coll_t temp = p->get_temp();
1917 if (temps.count(temp)) {
1918 temps.erase(temp);
1919 } else {
1920 dout(10) << __FUNC__ << ": creating " << temp << dendl;
1921 r = _create_collection(temp, 0, spos);
1922 assert(r == 0);
1923 }
1924 }
1925
1926 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1927 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
1928 r = _collection_remove_recursive(*p, spos);
1929 assert(r == 0);
1930 }
1931 }
1932
1933 int FileStore::umount()
1934 {
1935 dout(5) << __FUNC__ << ": " << basedir << dendl;
1936
1937 flush();
1938 sync();
1939 do_force_sync();
1940
1941 lock.Lock();
1942 stop = true;
1943 sync_cond.Signal();
1944 lock.Unlock();
1945 sync_thread.join();
1946 if (!m_disable_wbthrottle){
1947 wbthrottle.stop();
1948 }
1949 op_tp.stop();
1950
1951 journal_stop();
1952 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1953 journal_write_close();
1954
1955 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1956 (*it)->stop();
1957 }
1958 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1959 (*it)->stop();
1960 }
1961
1962 if (fsid_fd >= 0) {
1963 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1964 fsid_fd = -1;
1965 }
1966 if (op_fd >= 0) {
1967 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1968 op_fd = -1;
1969 }
1970 if (current_fd >= 0) {
1971 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1972 current_fd = -1;
1973 }
1974 if (basedir_fd >= 0) {
1975 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1976 basedir_fd = -1;
1977 }
1978
1979 force_sync = false;
1980
1981 delete backend;
1982 backend = NULL;
1983
1984 object_map.reset();
1985
1986 {
1987 Mutex::Locker l(sync_entry_timeo_lock);
1988 timer.shutdown();
1989 }
1990
1991 // nothing
1992 return 0;
1993 }
1994
1995
1996
1997
1998 /// -----------------------------
1999
2000 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2001 Context *onreadable,
2002 Context *onreadable_sync,
2003 TrackedOpRef osd_op)
2004 {
2005 uint64_t bytes = 0, ops = 0;
2006 for (vector<Transaction>::iterator p = tls.begin();
2007 p != tls.end();
2008 ++p) {
2009 bytes += (*p).get_num_bytes();
2010 ops += (*p).get_num_ops();
2011 }
2012
2013 Op *o = new Op;
2014 o->start = ceph_clock_now();
2015 o->tls = std::move(tls);
2016 o->onreadable = onreadable;
2017 o->onreadable_sync = onreadable_sync;
2018 o->ops = ops;
2019 o->bytes = bytes;
2020 o->osd_op = osd_op;
2021 return o;
2022 }
2023
2024
2025
2026 void FileStore::queue_op(OpSequencer *osr, Op *o)
2027 {
2028 // queue op on sequencer, then queue sequencer for the threadpool,
2029 // so that regardless of which order the threads pick up the
2030 // sequencer, the op order will be preserved.
2031
2032 osr->queue(o);
2033 o->trace.event("queued");
2034
2035 logger->inc(l_filestore_ops);
2036 logger->inc(l_filestore_bytes, o->bytes);
2037
2038 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
2039 << " " << *osr
2040 << " " << o->bytes << " bytes"
2041 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2042 << dendl;
2043 op_wq.queue(osr);
2044 }
2045
2046 void FileStore::op_queue_reserve_throttle(Op *o)
2047 {
2048 throttle_ops.get();
2049 throttle_bytes.get(o->bytes);
2050
2051 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2052 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2053 }
2054
2055 void FileStore::op_queue_release_throttle(Op *o)
2056 {
2057 throttle_ops.put();
2058 throttle_bytes.put(o->bytes);
2059 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2060 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2061 }
2062
2063 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2064 {
2065 if (!m_disable_wbthrottle) {
2066 wbthrottle.throttle();
2067 }
2068 // inject a stall?
2069 if (cct->_conf->filestore_inject_stall) {
2070 int orig = cct->_conf->filestore_inject_stall;
2071 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2072 sleep(orig);
2073 cct->_conf->set_val("filestore_inject_stall", "0");
2074 dout(5) << __FUNC__ << ": done stalling" << dendl;
2075 }
2076
2077 osr->apply_lock.Lock();
2078 Op *o = osr->peek_queue();
2079 o->trace.event("op_apply_start");
2080 apply_manager.op_apply_start(o->op);
2081 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2082 o->trace.event("_do_transactions start");
2083 int r = _do_transactions(o->tls, o->op, &handle);
2084 o->trace.event("op_apply_finish");
2085 apply_manager.op_apply_finish(o->op);
2086 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2087 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2088
2089 o->tls.clear();
2090
2091 }
2092
2093 void FileStore::_finish_op(OpSequencer *osr)
2094 {
2095 list<Context*> to_queue;
2096 Op *o = osr->dequeue(&to_queue);
2097
2098 utime_t lat = ceph_clock_now();
2099 lat -= o->start;
2100
2101 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2102 osr->apply_lock.Unlock(); // locked in _do_op
2103 o->trace.event("_finish_op");
2104
2105 // called with tp lock held
2106 op_queue_release_throttle(o);
2107
2108 logger->tinc(l_filestore_apply_latency, lat);
2109
2110 if (o->onreadable_sync) {
2111 o->onreadable_sync->complete(0);
2112 }
2113 if (o->onreadable) {
2114 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2115 }
2116 if (!to_queue.empty()) {
2117 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2118 }
2119 delete o;
2120 }
2121
2122
2123 struct C_JournaledAhead : public Context {
2124 FileStore *fs;
2125 FileStore::OpSequencer *osr;
2126 FileStore::Op *o;
2127 Context *ondisk;
2128
2129 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2130 fs(f), osr(os), o(o), ondisk(ondisk) { }
2131 void finish(int r) override {
2132 fs->_journaled_ahead(osr, o, ondisk);
2133 }
2134 };
2135
2136 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2137 TrackedOpRef osd_op,
2138 ThreadPool::TPHandle *handle)
2139 {
2140 Context *onreadable;
2141 Context *ondisk;
2142 Context *onreadable_sync;
2143 ObjectStore::Transaction::collect_contexts(
2144 tls, &onreadable, &ondisk, &onreadable_sync);
2145
2146 if (cct->_conf->objectstore_blackhole) {
2147 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2148 << dendl;
2149 delete ondisk;
2150 delete onreadable;
2151 delete onreadable_sync;
2152 return 0;
2153 }
2154
2155 utime_t start = ceph_clock_now();
2156 // set up the sequencer
2157 OpSequencer *osr;
2158 assert(posr);
2159 if (posr->p) {
2160 osr = static_cast<OpSequencer *>(posr->p.get());
2161 dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
2162 } else {
2163 osr = new OpSequencer(cct, ++next_osr_id);
2164 osr->set_cct(cct);
2165 osr->parent = posr;
2166 posr->p = osr;
2167 dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
2168 }
2169
2170 // used to include osr information in tracepoints during transaction apply
2171 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2172 (*i).set_osr(osr);
2173 }
2174
2175 ZTracer::Trace trace;
2176 if (osd_op && osd_op->pg_trace) {
2177 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2178 trace = osd_op->store_trace;
2179 }
2180
2181 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2182 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2183
2184 //prepare and encode transactions data out of lock
2185 bufferlist tbl;
2186 int orig_len = journal->prepare_entry(o->tls, &tbl);
2187
2188 if (handle)
2189 handle->suspend_tp_timeout();
2190
2191 op_queue_reserve_throttle(o);
2192 journal->reserve_throttle_and_backoff(tbl.length());
2193
2194 if (handle)
2195 handle->reset_tp_timeout();
2196
2197 uint64_t op_num = submit_manager.op_submit_start();
2198 o->op = op_num;
2199 trace.keyval("opnum", op_num);
2200
2201 if (m_filestore_do_dump)
2202 dump_transactions(o->tls, o->op, osr);
2203
2204 if (m_filestore_journal_parallel) {
2205 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2206
2207 trace.keyval("journal mode", "parallel");
2208 trace.event("journal started");
2209 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2210
2211 // queue inside submit_manager op submission lock
2212 queue_op(osr, o);
2213 trace.event("op queued");
2214 } else if (m_filestore_journal_writeahead) {
2215 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2216
2217 osr->queue_journal(o->op);
2218
2219 trace.keyval("journal mode", "writeahead");
2220 trace.event("journal started");
2221 _op_journal_transactions(tbl, orig_len, o->op,
2222 new C_JournaledAhead(this, osr, o, ondisk),
2223 osd_op);
2224 } else {
2225 ceph_abort();
2226 }
2227 submit_manager.op_submit_finish(op_num);
2228 utime_t end = ceph_clock_now();
2229 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2230 return 0;
2231 }
2232
2233 if (!journal) {
2234 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2235 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2236
2237 if (handle)
2238 handle->suspend_tp_timeout();
2239
2240 op_queue_reserve_throttle(o);
2241
2242 if (handle)
2243 handle->reset_tp_timeout();
2244
2245 uint64_t op_num = submit_manager.op_submit_start();
2246 o->op = op_num;
2247
2248 if (m_filestore_do_dump)
2249 dump_transactions(o->tls, o->op, osr);
2250
2251 queue_op(osr, o);
2252 trace.keyval("opnum", op_num);
2253 trace.keyval("journal mode", "none");
2254 trace.event("op queued");
2255
2256 if (ondisk)
2257 apply_manager.add_waiter(op_num, ondisk);
2258 submit_manager.op_submit_finish(op_num);
2259 utime_t end = ceph_clock_now();
2260 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2261 return 0;
2262 }
2263
2264 assert(journal);
2265 //prepare and encode transactions data out of lock
2266 bufferlist tbl;
2267 int orig_len = -1;
2268 if (journal->is_writeable()) {
2269 orig_len = journal->prepare_entry(tls, &tbl);
2270 }
2271 uint64_t op = submit_manager.op_submit_start();
2272 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2273
2274 if (m_filestore_do_dump)
2275 dump_transactions(tls, op, osr);
2276
2277 trace.event("op_apply_start");
2278 trace.keyval("opnum", op);
2279 trace.keyval("journal mode", "trailing");
2280 apply_manager.op_apply_start(op);
2281 trace.event("do_transactions");
2282 int r = do_transactions(tls, op);
2283
2284 if (r >= 0) {
2285 trace.event("journal started");
2286 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2287 } else {
2288 delete ondisk;
2289 }
2290
2291 // start on_readable finisher after we queue journal item, as on_readable callback
2292 // is allowed to delete the Transaction
2293 if (onreadable_sync) {
2294 onreadable_sync->complete(r);
2295 }
2296 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2297
2298 submit_manager.op_submit_finish(op);
2299 trace.event("op_apply_finish");
2300 apply_manager.op_apply_finish(op);
2301
2302 utime_t end = ceph_clock_now();
2303 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2304 return r;
2305 }
2306
2307 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2308 {
2309 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2310
2311 o->trace.event("writeahead journal finished");
2312
2313 // this should queue in order because the journal does it's completions in order.
2314 queue_op(osr, o);
2315
2316 list<Context*> to_queue;
2317 osr->dequeue_journal(&to_queue);
2318
2319 // do ondisk completions async, to prevent any onreadable_sync completions
2320 // getting blocked behind an ondisk completion.
2321 if (ondisk) {
2322 dout(10) << " queueing ondisk " << ondisk << dendl;
2323 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2324 }
2325 if (!to_queue.empty()) {
2326 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2327 }
2328 }
2329
2330 int FileStore::_do_transactions(
2331 vector<Transaction> &tls,
2332 uint64_t op_seq,
2333 ThreadPool::TPHandle *handle)
2334 {
2335 int trans_num = 0;
2336
2337 for (vector<Transaction>::iterator p = tls.begin();
2338 p != tls.end();
2339 ++p, trans_num++) {
2340 _do_transaction(*p, op_seq, trans_num, handle);
2341 if (handle)
2342 handle->reset_tp_timeout();
2343 }
2344
2345 return 0;
2346 }
2347
2348 void FileStore::_set_global_replay_guard(const coll_t& cid,
2349 const SequencerPosition &spos)
2350 {
2351 if (backend->can_checkpoint())
2352 return;
2353
2354 // sync all previous operations on this sequencer
2355 int ret = object_map->sync();
2356 if (ret < 0) {
2357 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2358 assert(0 == "_set_global_replay_guard failed");
2359 }
2360 ret = sync_filesystem(basedir_fd);
2361 if (ret < 0) {
2362 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2363 assert(0 == "_set_global_replay_guard failed");
2364 }
2365
2366 char fn[PATH_MAX];
2367 get_cdir(cid, fn, sizeof(fn));
2368 int fd = ::open(fn, O_RDONLY);
2369 if (fd < 0) {
2370 int err = errno;
2371 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2372 assert(0 == "_set_global_replay_guard failed");
2373 }
2374
2375 _inject_failure();
2376
2377 // then record that we did it
2378 bufferlist v;
2379 ::encode(spos, v);
2380 int r = chain_fsetxattr<true, true>(
2381 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2382 if (r < 0) {
2383 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2384 << " got " << cpp_strerror(r) << dendl;
2385 assert(0 == "fsetxattr failed");
2386 }
2387
2388 // and make sure our xattr is durable.
2389 ::fsync(fd);
2390
2391 _inject_failure();
2392
2393 VOID_TEMP_FAILURE_RETRY(::close(fd));
2394 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2395 }
2396
2397 int FileStore::_check_global_replay_guard(const coll_t& cid,
2398 const SequencerPosition& spos)
2399 {
2400 char fn[PATH_MAX];
2401 get_cdir(cid, fn, sizeof(fn));
2402 int fd = ::open(fn, O_RDONLY);
2403 if (fd < 0) {
2404 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2405 return 1; // if collection does not exist, there is no guard, and we can replay.
2406 }
2407
2408 char buf[100];
2409 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2410 if (r < 0) {
2411 dout(20) << __FUNC__ << ": no xattr" << dendl;
2412 assert(!m_filestore_fail_eio || r != -EIO);
2413 VOID_TEMP_FAILURE_RETRY(::close(fd));
2414 return 1; // no xattr
2415 }
2416 bufferlist bl;
2417 bl.append(buf, r);
2418
2419 SequencerPosition opos;
2420 bufferlist::iterator p = bl.begin();
2421 ::decode(opos, p);
2422
2423 VOID_TEMP_FAILURE_RETRY(::close(fd));
2424 return spos >= opos ? 1 : -1;
2425 }
2426
2427
2428 void FileStore::_set_replay_guard(const coll_t& cid,
2429 const SequencerPosition &spos,
2430 bool in_progress=false)
2431 {
2432 char fn[PATH_MAX];
2433 get_cdir(cid, fn, sizeof(fn));
2434 int fd = ::open(fn, O_RDONLY);
2435 if (fd < 0) {
2436 int err = errno;
2437 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2438 assert(0 == "_set_replay_guard failed");
2439 }
2440 _set_replay_guard(fd, spos, 0, in_progress);
2441 VOID_TEMP_FAILURE_RETRY(::close(fd));
2442 }
2443
2444
2445 void FileStore::_set_replay_guard(int fd,
2446 const SequencerPosition& spos,
2447 const ghobject_t *hoid,
2448 bool in_progress)
2449 {
2450 if (backend->can_checkpoint())
2451 return;
2452
2453 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2454
2455 _inject_failure();
2456
2457 // first make sure the previous operation commits
2458 ::fsync(fd);
2459
2460 if (!in_progress) {
2461 // sync object_map too. even if this object has a header or keys,
2462 // it have had them in the past and then removed them, so always
2463 // sync.
2464 object_map->sync(hoid, &spos);
2465 }
2466
2467 _inject_failure();
2468
2469 // then record that we did it
2470 bufferlist v(40);
2471 ::encode(spos, v);
2472 ::encode(in_progress, v);
2473 int r = chain_fsetxattr<true, true>(
2474 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2475 if (r < 0) {
2476 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2477 assert(0 == "fsetxattr failed");
2478 }
2479
2480 // and make sure our xattr is durable.
2481 ::fsync(fd);
2482
2483 _inject_failure();
2484
2485 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2486 }
2487
2488 void FileStore::_close_replay_guard(const coll_t& cid,
2489 const SequencerPosition &spos)
2490 {
2491 char fn[PATH_MAX];
2492 get_cdir(cid, fn, sizeof(fn));
2493 int fd = ::open(fn, O_RDONLY);
2494 if (fd < 0) {
2495 int err = errno;
2496 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2497 assert(0 == "_close_replay_guard failed");
2498 }
2499 _close_replay_guard(fd, spos);
2500 VOID_TEMP_FAILURE_RETRY(::close(fd));
2501 }
2502
2503 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2504 const ghobject_t *hoid)
2505 {
2506 if (backend->can_checkpoint())
2507 return;
2508
2509 dout(10) << __FUNC__ << ": " << spos << dendl;
2510
2511 _inject_failure();
2512
2513 // sync object_map too. even if this object has a header or keys,
2514 // it have had them in the past and then removed them, so always
2515 // sync.
2516 object_map->sync(hoid, &spos);
2517
2518 // then record that we are done with this operation
2519 bufferlist v(40);
2520 ::encode(spos, v);
2521 bool in_progress = false;
2522 ::encode(in_progress, v);
2523 int r = chain_fsetxattr<true, true>(
2524 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2525 if (r < 0) {
2526 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2527 assert(0 == "fsetxattr failed");
2528 }
2529
2530 // and make sure our xattr is durable.
2531 ::fsync(fd);
2532
2533 _inject_failure();
2534
2535 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2536 }
2537
2538 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2539 const SequencerPosition& spos)
2540 {
2541 if (!replaying || backend->can_checkpoint())
2542 return 1;
2543
2544 int r = _check_global_replay_guard(cid, spos);
2545 if (r < 0)
2546 return r;
2547
2548 FDRef fd;
2549 r = lfn_open(cid, oid, false, &fd);
2550 if (r < 0) {
2551 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2552 return 1; // if file does not exist, there is no guard, and we can replay.
2553 }
2554 int ret = _check_replay_guard(**fd, spos);
2555 lfn_close(fd);
2556 return ret;
2557 }
2558
2559 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2560 {
2561 if (!replaying || backend->can_checkpoint())
2562 return 1;
2563
2564 char fn[PATH_MAX];
2565 get_cdir(cid, fn, sizeof(fn));
2566 int fd = ::open(fn, O_RDONLY);
2567 if (fd < 0) {
2568 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2569 return 1; // if collection does not exist, there is no guard, and we can replay.
2570 }
2571 int ret = _check_replay_guard(fd, spos);
2572 VOID_TEMP_FAILURE_RETRY(::close(fd));
2573 return ret;
2574 }
2575
2576 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2577 {
2578 if (!replaying || backend->can_checkpoint())
2579 return 1;
2580
2581 char buf[100];
2582 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2583 if (r < 0) {
2584 dout(20) << __FUNC__ << ": no xattr" << dendl;
2585 assert(!m_filestore_fail_eio || r != -EIO);
2586 return 1; // no xattr
2587 }
2588 bufferlist bl;
2589 bl.append(buf, r);
2590
2591 SequencerPosition opos;
2592 bufferlist::iterator p = bl.begin();
2593 ::decode(opos, p);
2594 bool in_progress = false;
2595 if (!p.end()) // older journals don't have this
2596 ::decode(in_progress, p);
2597 if (opos > spos) {
2598 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2599 << ", now or in future, SKIPPING REPLAY" << dendl;
2600 return -1;
2601 } else if (opos == spos) {
2602 if (in_progress) {
2603 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2604 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2605 return 0;
2606 } else {
2607 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2608 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2609 return -1;
2610 }
2611 } else {
2612 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2613 << ", in past, will replay" << dendl;
2614 return 1;
2615 }
2616 }
2617
2618 void FileStore::_do_transaction(
2619 Transaction& t, uint64_t op_seq, int trans_num,
2620 ThreadPool::TPHandle *handle)
2621 {
2622 dout(10) << __FUNC__ << ": on " << &t << dendl;
2623
2624 #ifdef WITH_LTTNG
2625 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2626 #endif
2627
2628 Transaction::iterator i = t.begin();
2629
2630 SequencerPosition spos(op_seq, trans_num, 0);
2631 while (i.have_op()) {
2632 if (handle)
2633 handle->reset_tp_timeout();
2634
2635 Transaction::Op *op = i.decode_op();
2636 int r = 0;
2637
2638 _inject_failure();
2639
2640 switch (op->op) {
2641 case Transaction::OP_NOP:
2642 break;
2643 case Transaction::OP_TOUCH:
2644 {
2645 const coll_t &_cid = i.get_cid(op->cid);
2646 const ghobject_t &oid = i.get_oid(op->oid);
2647 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2648 _cid : _cid.get_temp();
2649 tracepoint(objectstore, touch_enter, osr_name);
2650 if (_check_replay_guard(cid, oid, spos) > 0)
2651 r = _touch(cid, oid);
2652 tracepoint(objectstore, touch_exit, r);
2653 }
2654 break;
2655
2656 case Transaction::OP_WRITE:
2657 {
2658 const coll_t &_cid = i.get_cid(op->cid);
2659 const ghobject_t &oid = i.get_oid(op->oid);
2660 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2661 _cid : _cid.get_temp();
2662 uint64_t off = op->off;
2663 uint64_t len = op->len;
2664 uint32_t fadvise_flags = i.get_fadvise_flags();
2665 bufferlist bl;
2666 i.decode_bl(bl);
2667 tracepoint(objectstore, write_enter, osr_name, off, len);
2668 if (_check_replay_guard(cid, oid, spos) > 0)
2669 r = _write(cid, oid, off, len, bl, fadvise_flags);
2670 tracepoint(objectstore, write_exit, r);
2671 }
2672 break;
2673
2674 case Transaction::OP_ZERO:
2675 {
2676 const coll_t &_cid = i.get_cid(op->cid);
2677 const ghobject_t &oid = i.get_oid(op->oid);
2678 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2679 _cid : _cid.get_temp();
2680 uint64_t off = op->off;
2681 uint64_t len = op->len;
2682 tracepoint(objectstore, zero_enter, osr_name, off, len);
2683 if (_check_replay_guard(cid, oid, spos) > 0)
2684 r = _zero(cid, oid, off, len);
2685 tracepoint(objectstore, zero_exit, r);
2686 }
2687 break;
2688
2689 case Transaction::OP_TRIMCACHE:
2690 {
2691 // deprecated, no-op
2692 }
2693 break;
2694
2695 case Transaction::OP_TRUNCATE:
2696 {
2697 const coll_t &_cid = i.get_cid(op->cid);
2698 const ghobject_t &oid = i.get_oid(op->oid);
2699 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2700 _cid : _cid.get_temp();
2701 uint64_t off = op->off;
2702 tracepoint(objectstore, truncate_enter, osr_name, off);
2703 if (_check_replay_guard(cid, oid, spos) > 0)
2704 r = _truncate(cid, oid, off);
2705 tracepoint(objectstore, truncate_exit, r);
2706 }
2707 break;
2708
2709 case Transaction::OP_REMOVE:
2710 {
2711 const coll_t &_cid = i.get_cid(op->cid);
2712 const ghobject_t &oid = i.get_oid(op->oid);
2713 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2714 _cid : _cid.get_temp();
2715 tracepoint(objectstore, remove_enter, osr_name);
2716 if (_check_replay_guard(cid, oid, spos) > 0)
2717 r = _remove(cid, oid, spos);
2718 tracepoint(objectstore, remove_exit, r);
2719 }
2720 break;
2721
2722 case Transaction::OP_SETATTR:
2723 {
2724 const coll_t &_cid = i.get_cid(op->cid);
2725 const ghobject_t &oid = i.get_oid(op->oid);
2726 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2727 _cid : _cid.get_temp();
2728 string name = i.decode_string();
2729 bufferlist bl;
2730 i.decode_bl(bl);
2731 tracepoint(objectstore, setattr_enter, osr_name);
2732 if (_check_replay_guard(cid, oid, spos) > 0) {
2733 map<string, bufferptr> to_set;
2734 to_set[name] = bufferptr(bl.c_str(), bl.length());
2735 r = _setattrs(cid, oid, to_set, spos);
2736 if (r == -ENOSPC)
2737 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2738 << " name " << name << " size " << bl.length() << dendl;
2739 }
2740 tracepoint(objectstore, setattr_exit, r);
2741 }
2742 break;
2743
2744 case Transaction::OP_SETATTRS:
2745 {
2746 const coll_t &_cid = i.get_cid(op->cid);
2747 const ghobject_t &oid = i.get_oid(op->oid);
2748 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2749 _cid : _cid.get_temp();
2750 map<string, bufferptr> aset;
2751 i.decode_attrset(aset);
2752 tracepoint(objectstore, setattrs_enter, osr_name);
2753 if (_check_replay_guard(cid, oid, spos) > 0)
2754 r = _setattrs(cid, oid, aset, spos);
2755 tracepoint(objectstore, setattrs_exit, r);
2756 if (r == -ENOSPC)
2757 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2758 }
2759 break;
2760
2761 case Transaction::OP_RMATTR:
2762 {
2763 const coll_t &_cid = i.get_cid(op->cid);
2764 const ghobject_t &oid = i.get_oid(op->oid);
2765 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2766 _cid : _cid.get_temp();
2767 string name = i.decode_string();
2768 tracepoint(objectstore, rmattr_enter, osr_name);
2769 if (_check_replay_guard(cid, oid, spos) > 0)
2770 r = _rmattr(cid, oid, name.c_str(), spos);
2771 tracepoint(objectstore, rmattr_exit, r);
2772 }
2773 break;
2774
2775 case Transaction::OP_RMATTRS:
2776 {
2777 const coll_t &_cid = i.get_cid(op->cid);
2778 const ghobject_t &oid = i.get_oid(op->oid);
2779 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2780 _cid : _cid.get_temp();
2781 tracepoint(objectstore, rmattrs_enter, osr_name);
2782 if (_check_replay_guard(cid, oid, spos) > 0)
2783 r = _rmattrs(cid, oid, spos);
2784 tracepoint(objectstore, rmattrs_exit, r);
2785 }
2786 break;
2787
2788 case Transaction::OP_CLONE:
2789 {
2790 const coll_t &_cid = i.get_cid(op->cid);
2791 const ghobject_t &oid = i.get_oid(op->oid);
2792 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2793 _cid : _cid.get_temp();
2794 const ghobject_t &noid = i.get_oid(op->dest_oid);
2795 tracepoint(objectstore, clone_enter, osr_name);
2796 r = _clone(cid, oid, noid, spos);
2797 tracepoint(objectstore, clone_exit, r);
2798 }
2799 break;
2800
2801 case Transaction::OP_CLONERANGE:
2802 {
2803 const coll_t &_cid = i.get_cid(op->cid);
2804 const ghobject_t &oid = i.get_oid(op->oid);
2805 const ghobject_t &noid = i.get_oid(op->dest_oid);
2806 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2807 _cid : _cid.get_temp();
2808 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2809 _cid : _cid.get_temp();
2810 uint64_t off = op->off;
2811 uint64_t len = op->len;
2812 tracepoint(objectstore, clone_range_enter, osr_name, len);
2813 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2814 tracepoint(objectstore, clone_range_exit, r);
2815 }
2816 break;
2817
2818 case Transaction::OP_CLONERANGE2:
2819 {
2820 const coll_t &_cid = i.get_cid(op->cid);
2821 const ghobject_t &oid = i.get_oid(op->oid);
2822 const ghobject_t &noid = i.get_oid(op->dest_oid);
2823 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2824 _cid : _cid.get_temp();
2825 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2826 _cid : _cid.get_temp();
2827 uint64_t srcoff = op->off;
2828 uint64_t len = op->len;
2829 uint64_t dstoff = op->dest_off;
2830 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2831 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2832 tracepoint(objectstore, clone_range2_exit, r);
2833 }
2834 break;
2835
2836 case Transaction::OP_MKCOLL:
2837 {
2838 const coll_t &cid = i.get_cid(op->cid);
2839 tracepoint(objectstore, mkcoll_enter, osr_name);
2840 if (_check_replay_guard(cid, spos) > 0)
2841 r = _create_collection(cid, op->split_bits, spos);
2842 tracepoint(objectstore, mkcoll_exit, r);
2843 }
2844 break;
2845
2846 case Transaction::OP_COLL_SET_BITS:
2847 {
2848 const coll_t &cid = i.get_cid(op->cid);
2849 int bits = op->split_bits;
2850 r = _collection_set_bits(cid, bits);
2851 }
2852 break;
2853
2854 case Transaction::OP_COLL_HINT:
2855 {
2856 const coll_t &cid = i.get_cid(op->cid);
2857 uint32_t type = op->hint_type;
2858 bufferlist hint;
2859 i.decode_bl(hint);
2860 bufferlist::iterator hiter = hint.begin();
2861 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2862 uint32_t pg_num;
2863 uint64_t num_objs;
2864 ::decode(pg_num, hiter);
2865 ::decode(num_objs, hiter);
2866 if (_check_replay_guard(cid, spos) > 0) {
2867 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2868 }
2869 } else {
2870 // Ignore the hint
2871 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2872 }
2873 }
2874 break;
2875
2876 case Transaction::OP_RMCOLL:
2877 {
2878 const coll_t &cid = i.get_cid(op->cid);
2879 tracepoint(objectstore, rmcoll_enter, osr_name);
2880 if (_check_replay_guard(cid, spos) > 0)
2881 r = _destroy_collection(cid);
2882 tracepoint(objectstore, rmcoll_exit, r);
2883 }
2884 break;
2885
2886 case Transaction::OP_COLL_ADD:
2887 {
2888 const coll_t &ocid = i.get_cid(op->cid);
2889 const coll_t &ncid = i.get_cid(op->dest_cid);
2890 const ghobject_t &oid = i.get_oid(op->oid);
2891
2892 assert(oid.hobj.pool >= -1);
2893
2894 // always followed by OP_COLL_REMOVE
2895 Transaction::Op *op2 = i.decode_op();
2896 const coll_t &ocid2 = i.get_cid(op2->cid);
2897 const ghobject_t &oid2 = i.get_oid(op2->oid);
2898 assert(op2->op == Transaction::OP_COLL_REMOVE);
2899 assert(ocid2 == ocid);
2900 assert(oid2 == oid);
2901
2902 tracepoint(objectstore, coll_add_enter);
2903 r = _collection_add(ncid, ocid, oid, spos);
2904 tracepoint(objectstore, coll_add_exit, r);
2905 spos.op++;
2906 if (r < 0)
2907 break;
2908 tracepoint(objectstore, coll_remove_enter, osr_name);
2909 if (_check_replay_guard(ocid, oid, spos) > 0)
2910 r = _remove(ocid, oid, spos);
2911 tracepoint(objectstore, coll_remove_exit, r);
2912 }
2913 break;
2914
2915 case Transaction::OP_COLL_MOVE:
2916 {
2917 // WARNING: this is deprecated and buggy; only here to replay old journals.
2918 const coll_t &ocid = i.get_cid(op->cid);
2919 const coll_t &ncid = i.get_cid(op->dest_cid);
2920 const ghobject_t &oid = i.get_oid(op->oid);
2921 tracepoint(objectstore, coll_move_enter);
2922 r = _collection_add(ocid, ncid, oid, spos);
2923 if (r == 0 &&
2924 (_check_replay_guard(ocid, oid, spos) > 0))
2925 r = _remove(ocid, oid, spos);
2926 tracepoint(objectstore, coll_move_exit, r);
2927 }
2928 break;
2929
2930 case Transaction::OP_COLL_MOVE_RENAME:
2931 {
2932 const coll_t &_oldcid = i.get_cid(op->cid);
2933 const ghobject_t &oldoid = i.get_oid(op->oid);
2934 const coll_t &_newcid = i.get_cid(op->dest_cid);
2935 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2936 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2937 _oldcid : _oldcid.get_temp();
2938 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2939 _oldcid : _newcid.get_temp();
2940 tracepoint(objectstore, coll_move_rename_enter);
2941 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2942 tracepoint(objectstore, coll_move_rename_exit, r);
2943 }
2944 break;
2945
2946 case Transaction::OP_TRY_RENAME:
2947 {
2948 const coll_t &_cid = i.get_cid(op->cid);
2949 const ghobject_t &oldoid = i.get_oid(op->oid);
2950 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2951 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2952 _cid : _cid.get_temp();
2953 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2954 _cid : _cid.get_temp();
2955 tracepoint(objectstore, coll_try_rename_enter);
2956 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2957 tracepoint(objectstore, coll_try_rename_exit, r);
2958 }
2959 break;
2960
2961 case Transaction::OP_COLL_SETATTR:
2962 case Transaction::OP_COLL_RMATTR:
2963 assert(0 == "collection attr methods no longer implemented");
2964 break;
2965
2966 case Transaction::OP_STARTSYNC:
2967 tracepoint(objectstore, startsync_enter, osr_name);
2968 _start_sync();
2969 tracepoint(objectstore, startsync_exit);
2970 break;
2971
2972 case Transaction::OP_COLL_RENAME:
2973 {
2974 r = -EOPNOTSUPP;
2975 }
2976 break;
2977
2978 case Transaction::OP_OMAP_CLEAR:
2979 {
2980 const coll_t &_cid = i.get_cid(op->cid);
2981 const ghobject_t &oid = i.get_oid(op->oid);
2982 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2983 _cid : _cid.get_temp();
2984 tracepoint(objectstore, omap_clear_enter, osr_name);
2985 r = _omap_clear(cid, oid, spos);
2986 tracepoint(objectstore, omap_clear_exit, r);
2987 }
2988 break;
2989 case Transaction::OP_OMAP_SETKEYS:
2990 {
2991 const coll_t &_cid = i.get_cid(op->cid);
2992 const ghobject_t &oid = i.get_oid(op->oid);
2993 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2994 _cid : _cid.get_temp();
2995 map<string, bufferlist> aset;
2996 i.decode_attrset(aset);
2997 tracepoint(objectstore, omap_setkeys_enter, osr_name);
2998 r = _omap_setkeys(cid, oid, aset, spos);
2999 tracepoint(objectstore, omap_setkeys_exit, r);
3000 }
3001 break;
3002 case Transaction::OP_OMAP_RMKEYS:
3003 {
3004 const coll_t &_cid = i.get_cid(op->cid);
3005 const ghobject_t &oid = i.get_oid(op->oid);
3006 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3007 _cid : _cid.get_temp();
3008 set<string> keys;
3009 i.decode_keyset(keys);
3010 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
3011 r = _omap_rmkeys(cid, oid, keys, spos);
3012 tracepoint(objectstore, omap_rmkeys_exit, r);
3013 }
3014 break;
3015 case Transaction::OP_OMAP_RMKEYRANGE:
3016 {
3017 const coll_t &_cid = i.get_cid(op->cid);
3018 const ghobject_t &oid = i.get_oid(op->oid);
3019 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3020 _cid : _cid.get_temp();
3021 string first, last;
3022 first = i.decode_string();
3023 last = i.decode_string();
3024 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3025 r = _omap_rmkeyrange(cid, oid, first, last, spos);
3026 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3027 }
3028 break;
3029 case Transaction::OP_OMAP_SETHEADER:
3030 {
3031 const coll_t &_cid = i.get_cid(op->cid);
3032 const ghobject_t &oid = i.get_oid(op->oid);
3033 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3034 _cid : _cid.get_temp();
3035 bufferlist bl;
3036 i.decode_bl(bl);
3037 tracepoint(objectstore, omap_setheader_enter, osr_name);
3038 r = _omap_setheader(cid, oid, bl, spos);
3039 tracepoint(objectstore, omap_setheader_exit, r);
3040 }
3041 break;
3042 case Transaction::OP_SPLIT_COLLECTION:
3043 {
3044 assert(0 == "not legacy journal; upgrade to firefly first");
3045 }
3046 break;
3047 case Transaction::OP_SPLIT_COLLECTION2:
3048 {
3049 coll_t cid = i.get_cid(op->cid);
3050 uint32_t bits = op->split_bits;
3051 uint32_t rem = op->split_rem;
3052 coll_t dest = i.get_cid(op->dest_cid);
3053 tracepoint(objectstore, split_coll2_enter, osr_name);
3054 r = _split_collection(cid, bits, rem, dest, spos);
3055 tracepoint(objectstore, split_coll2_exit, r);
3056 }
3057 break;
3058
3059 case Transaction::OP_SETALLOCHINT:
3060 {
3061 const coll_t &_cid = i.get_cid(op->cid);
3062 const ghobject_t &oid = i.get_oid(op->oid);
3063 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3064 _cid : _cid.get_temp();
3065 uint64_t expected_object_size = op->expected_object_size;
3066 uint64_t expected_write_size = op->expected_write_size;
3067 tracepoint(objectstore, setallochint_enter, osr_name);
3068 if (_check_replay_guard(cid, oid, spos) > 0)
3069 r = _set_alloc_hint(cid, oid, expected_object_size,
3070 expected_write_size);
3071 tracepoint(objectstore, setallochint_exit, r);
3072 }
3073 break;
3074
3075 default:
3076 derr << "bad op " << op->op << dendl;
3077 ceph_abort();
3078 }
3079
3080 if (r < 0) {
3081 bool ok = false;
3082
3083 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3084 op->op == Transaction::OP_CLONE ||
3085 op->op == Transaction::OP_CLONERANGE2 ||
3086 op->op == Transaction::OP_COLL_ADD ||
3087 op->op == Transaction::OP_SETATTR ||
3088 op->op == Transaction::OP_SETATTRS ||
3089 op->op == Transaction::OP_RMATTR ||
3090 op->op == Transaction::OP_OMAP_SETKEYS ||
3091 op->op == Transaction::OP_OMAP_RMKEYS ||
3092 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3093 op->op == Transaction::OP_OMAP_SETHEADER))
3094 // -ENOENT is normally okay
3095 // ...including on a replayed OP_RMCOLL with checkpoint mode
3096 ok = true;
3097 if (r == -ENODATA)
3098 ok = true;
3099
3100 if (op->op == Transaction::OP_SETALLOCHINT)
3101 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3102 // cases means invalid hint size (e.g. too big, not a multiple
3103 // of block size, etc) or, at least on xfs, an attempt to set
3104 // or change it when the file is not empty. However,
3105 // OP_SETALLOCHINT is advisory, so ignore all errors.
3106 ok = true;
3107
3108 if (replaying && !backend->can_checkpoint()) {
3109 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3110 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3111 ok = true;
3112 }
3113 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3114 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3115 ok = true;
3116 }
3117 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3118 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3119 ok = true;
3120 }
3121 if (r == -ERANGE) {
3122 dout(10) << "tolerating ERANGE on replay" << dendl;
3123 ok = true;
3124 }
3125 if (r == -ENOENT) {
3126 dout(10) << "tolerating ENOENT on replay" << dendl;
3127 ok = true;
3128 }
3129 }
3130
3131 if (!ok) {
3132 const char *msg = "unexpected error code";
3133
3134 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3135 op->op == Transaction::OP_CLONE ||
3136 op->op == Transaction::OP_CLONERANGE2)) {
3137 msg = "ENOENT on clone suggests osd bug";
3138 } else if (r == -ENOSPC) {
3139 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3140 // by partially applying transactions.
3141 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3142 } else if (r == -ENOTEMPTY) {
3143 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3144 } else if (r == -EPERM) {
3145 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3146 }
3147
3148 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3149 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3150 dout(0) << msg << dendl;
3151 dout(0) << " transaction dump:\n";
3152 JSONFormatter f(true);
3153 f.open_object_section("transaction");
3154 t.dump(&f);
3155 f.close_section();
3156 f.flush(*_dout);
3157 *_dout << dendl;
3158
3159 if (r == -EMFILE) {
3160 dump_open_fds(cct);
3161 }
3162
3163 assert(0 == "unexpected error");
3164 }
3165 }
3166
3167 spos.op++;
3168 }
3169
3170 _inject_failure();
3171 }
3172
3173 /*********************************************/
3174
3175
3176
3177 // --------------------
3178 // objects
3179
3180 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3181 {
3182 tracepoint(objectstore, exists_enter, _cid.c_str());
3183 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3184 struct stat st;
3185 bool retval = stat(cid, oid, &st) == 0;
3186 tracepoint(objectstore, exists_exit, retval);
3187 return retval;
3188 }
3189
3190 int FileStore::stat(
3191 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3192 {
3193 tracepoint(objectstore, stat_enter, _cid.c_str());
3194 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3195 int r = lfn_stat(cid, oid, st);
3196 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3197 if (r < 0) {
3198 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3199 << " = " << r << dendl;
3200 } else {
3201 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3202 << " = " << r
3203 << " (size " << st->st_size << ")" << dendl;
3204 }
3205 if (cct->_conf->filestore_debug_inject_read_err &&
3206 debug_mdata_eio(oid)) {
3207 return -EIO;
3208 } else {
3209 tracepoint(objectstore, stat_exit, r);
3210 return r;
3211 }
3212 }
3213
3214 int FileStore::set_collection_opts(
3215 const coll_t& cid,
3216 const pool_opts_t& opts)
3217 {
3218 return -EOPNOTSUPP;
3219 }
3220
3221 int FileStore::read(
3222 const coll_t& _cid,
3223 const ghobject_t& oid,
3224 uint64_t offset,
3225 size_t len,
3226 bufferlist& bl,
3227 uint32_t op_flags)
3228 {
3229 int got;
3230 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3231 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3232
3233 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3234
3235 FDRef fd;
3236 int r = lfn_open(cid, oid, false, &fd);
3237 if (r < 0) {
3238 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3239 << cpp_strerror(r) << dendl;
3240 return r;
3241 }
3242
3243 if (offset == 0 && len == 0) {
3244 struct stat st;
3245 memset(&st, 0, sizeof(struct stat));
3246 int r = ::fstat(**fd, &st);
3247 assert(r == 0);
3248 len = st.st_size;
3249 }
3250
3251 #ifdef HAVE_POSIX_FADVISE
3252 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3253 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3254 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3255 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3256 #endif
3257
3258 bufferptr bptr(len); // prealloc space for entire read
3259 got = safe_pread(**fd, bptr.c_str(), len, offset);
3260 if (got < 0) {
3261 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3262 lfn_close(fd);
3263 return got;
3264 }
3265 bptr.set_length(got); // properly size the buffer
3266 bl.clear();
3267 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3268
3269 #ifdef HAVE_POSIX_FADVISE
3270 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3271 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3272 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3273 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3274 #endif
3275
3276 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3277 ostringstream ss;
3278 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3279 if (errors != 0) {
3280 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3281 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3282 assert(0 == "bad crc on read");
3283 }
3284 }
3285
3286 lfn_close(fd);
3287
3288 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3289 << got << "/" << len << dendl;
3290 if (cct->_conf->filestore_debug_inject_read_err &&
3291 debug_data_eio(oid)) {
3292 return -EIO;
3293 } else if (cct->_conf->filestore_debug_random_read_err &&
3294 (rand() % (int)(cct->_conf->filestore_debug_random_read_err * 100.0)) == 0) {
3295 dout(0) << __func__ << ": inject random EIO" << dendl;
3296 return -EIO;
3297 } else {
3298 tracepoint(objectstore, read_exit, got);
3299 return got;
3300 }
3301 }
3302
/*
 * Collect the mapped extents of fd within [offset, offset+len) into *m,
 * keyed by logical offset with the extent length as value.
 *
 * Each pass asks backend->do_fiemap() for a batch of extents, merges
 * logically contiguous ones, clamps them to the requested range and then
 * loops (via "goto more") from the end of the last extent until either the
 * last extent carries FIEMAP_EXTENT_LAST or the remaining length is zero.
 *
 * Returns the last backend->do_fiemap() result; a negative value is
 * returned immediately.  The fiemap buffer handed back by the backend is
 * released with free() here.
 */
int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
                          map<uint64_t, uint64_t> *m)
{
  uint64_t i;
  struct fiemap_extent *extent = NULL;
  struct fiemap *fiemap = NULL;
  int r = 0;

more:
  r = backend->do_fiemap(fd, offset, len, &fiemap);
  if (r < 0)
    return r;

  // nothing mapped in the (remaining) range: done
  if (fiemap->fm_mapped_extents == 0) {
    free(fiemap);
    return r;
  }

  extent = &fiemap->fm_extents[0];

  /* start where we were asked to start */
  if (extent->fe_logical < offset) {
    extent->fe_length -= offset - extent->fe_logical;
    extent->fe_logical = offset;
  }

  i = 0;

  // last extent recorded in *m during this pass; non-null after the loop
  // because fm_mapped_extents > 0 here
  struct fiemap_extent *last = nullptr;
  while (i < fiemap->fm_mapped_extents) {
    struct fiemap_extent *next = extent + 1;

    dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
             << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;

    /* try to merge extents */
    // The merged range is accumulated into 'next', which then becomes the
    // current extent.
    while ((i < fiemap->fm_mapped_extents - 1) &&
           (extent->fe_logical + extent->fe_length == next->fe_logical)) {
      next->fe_length += extent->fe_length;
      next->fe_logical = extent->fe_logical;
      extent = next;
      next = extent + 1;
      i++;
    }

    // clamp the tail of the extent to the end of the requested range
    if (extent->fe_logical + extent->fe_length > offset + len)
      extent->fe_length = offset + len - extent->fe_logical;
    (*m)[extent->fe_logical] = extent->fe_length;
    i++;
    last = extent++;
  }
  // advance the request window past everything covered so far
  uint64_t xoffset = last->fe_logical + last->fe_length - offset;
  offset = last->fe_logical + last->fe_length;
  len -= xoffset;
  // read the flag before the buffer holding 'last' is freed
  const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
  free(fiemap);
  if (!is_last) {
    goto more;
  }

  return r;
}
3365
3366 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3367 map<uint64_t, uint64_t> *m)
3368 {
3369 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3370 off_t hole_pos, data_pos;
3371 int r = 0;
3372
3373 // If lseek fails with errno setting to be ENXIO, this means the current
3374 // file offset is beyond the end of the file.
3375 off_t start = offset;
3376 while(start < (off_t)(offset + len)) {
3377 data_pos = lseek(fd, start, SEEK_DATA);
3378 if (data_pos < 0) {
3379 if (errno == ENXIO)
3380 break;
3381 else {
3382 r = -errno;
3383 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3384 return r;
3385 }
3386 } else if (data_pos > (off_t)(offset + len)) {
3387 break;
3388 }
3389
3390 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3391 if (hole_pos < 0) {
3392 if (errno == ENXIO) {
3393 break;
3394 } else {
3395 r = -errno;
3396 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3397 return r;
3398 }
3399 }
3400
3401 if (hole_pos >= (off_t)(offset + len)) {
3402 (*m)[data_pos] = offset + len - data_pos;
3403 break;
3404 }
3405 (*m)[data_pos] = hole_pos - data_pos;
3406 start = hole_pos;
3407 }
3408
3409 return r;
3410 #else
3411 (*m)[offset] = len;
3412 return 0;
3413 #endif
3414 }
3415
3416 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3417 uint64_t offset, size_t len,
3418 bufferlist& bl)
3419 {
3420 map<uint64_t, uint64_t> exomap;
3421 int r = fiemap(_cid, oid, offset, len, exomap);
3422 if (r >= 0) {
3423 ::encode(exomap, bl);
3424 }
3425 return r;
3426 }
3427
3428 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3429 uint64_t offset, size_t len,
3430 map<uint64_t, uint64_t>& destmap)
3431 {
3432 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3433 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3434 destmap.clear();
3435
3436 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3437 len <= (size_t)m_filestore_fiemap_threshold) {
3438 destmap[offset] = len;
3439 return 0;
3440 }
3441
3442 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3443
3444 FDRef fd;
3445
3446 int r = lfn_open(cid, oid, false, &fd);
3447 if (r < 0) {
3448 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3449 goto done;
3450 }
3451
3452 if (backend->has_seek_data_hole()) {
3453 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3454 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3455 } else if (backend->has_fiemap()) {
3456 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3457 r = _do_fiemap(**fd, offset, len, &destmap);
3458 }
3459
3460 lfn_close(fd);
3461
3462 done:
3463
3464 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3465 assert(!m_filestore_fail_eio || r != -EIO);
3466 tracepoint(objectstore, fiemap_exit, r);
3467 return r;
3468 }
3469
3470 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3471 const SequencerPosition &spos)
3472 {
3473 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3474 int r = lfn_unlink(cid, oid, spos);
3475 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3476 return r;
3477 }
3478
3479 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3480 {
3481 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3482 int r = lfn_truncate(cid, oid, size);
3483 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3484 return r;
3485 }
3486
3487
3488 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3489 {
3490 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3491
3492 FDRef fd;
3493 int r = lfn_open(cid, oid, true, &fd);
3494 if (r < 0) {
3495 return r;
3496 } else {
3497 lfn_close(fd);
3498 }
3499 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3500 return r;
3501 }
3502
3503 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3504 uint64_t offset, size_t len,
3505 const bufferlist& bl, uint32_t fadvise_flags)
3506 {
3507 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3508 int r;
3509
3510 FDRef fd;
3511 r = lfn_open(cid, oid, true, &fd);
3512 if (r < 0) {
3513 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3514 << oid << ": "
3515 << cpp_strerror(r) << dendl;
3516 goto out;
3517 }
3518
3519 // write
3520 r = bl.write_fd(**fd, offset);
3521 if (r < 0) {
3522 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3523 << " error: " << cpp_strerror(r) << dendl;
3524 lfn_close(fd);
3525 goto out;
3526 }
3527 r = bl.length();
3528
3529 if (r >= 0 && m_filestore_sloppy_crc) {
3530 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3531 assert(rc >= 0);
3532 }
3533
3534 if (replaying || m_disable_wbthrottle) {
3535 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3536 #ifdef HAVE_POSIX_FADVISE
3537 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3538 #endif
3539 }
3540 } else {
3541 wbthrottle.queue_wb(fd, oid, offset, len,
3542 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3543 }
3544
3545 lfn_close(fd);
3546
3547 out:
3548 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3549 return r;
3550 }
3551
3552 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3553 {
3554 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3555 int ret = 0;
3556
3557 if (cct->_conf->filestore_punch_hole) {
3558 #ifdef CEPH_HAVE_FALLOCATE
3559 # if !defined(DARWIN) && !defined(__FreeBSD__)
3560 # ifdef FALLOC_FL_KEEP_SIZE
3561 // first try to punch a hole.
3562 FDRef fd;
3563 ret = lfn_open(cid, oid, false, &fd);
3564 if (ret < 0) {
3565 goto out;
3566 }
3567
3568 struct stat st;
3569 ret = ::fstat(**fd, &st);
3570 if (ret < 0) {
3571 ret = -errno;
3572 lfn_close(fd);
3573 goto out;
3574 }
3575
3576 // first try fallocate
3577 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3578 offset, len);
3579 if (ret < 0) {
3580 ret = -errno;
3581 } else {
3582 // ensure we extent file size, if needed
3583 if (offset + len > (uint64_t)st.st_size) {
3584 ret = ::ftruncate(**fd, offset + len);
3585 if (ret < 0) {
3586 ret = -errno;
3587 lfn_close(fd);
3588 goto out;
3589 }
3590 }
3591 }
3592 lfn_close(fd);
3593
3594 if (ret >= 0 && m_filestore_sloppy_crc) {
3595 int rc = backend->_crc_update_zero(**fd, offset, len);
3596 assert(rc >= 0);
3597 }
3598
3599 if (ret == 0)
3600 goto out; // yay!
3601 if (ret != -EOPNOTSUPP)
3602 goto out; // some other error
3603 # endif
3604 # endif
3605 #endif
3606 }
3607
3608 // lame, kernel is old and doesn't support it.
3609 // write zeros.. yuck!
3610 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3611 {
3612 bufferlist bl;
3613 bl.append_zero(len);
3614 ret = _write(cid, oid, offset, len, bl);
3615 }
3616
3617 #ifdef CEPH_HAVE_FALLOCATE
3618 # if !defined(DARWIN) && !defined(__FreeBSD__)
3619 # ifdef FALLOC_FL_KEEP_SIZE
3620 out:
3621 # endif
3622 # endif
3623 #endif
3624 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3625 return ret;
3626 }
3627
/*
 * Clone oldoid into newoid within the same collection: file data,
 * object_map contents, the spill-out marker xattr and all other xattrs.
 *
 * Returns 0 immediately when _check_replay_guard() reports a negative
 * value for the destination.  Otherwise returns the result of the last
 * step performed (negative errno on failure); a final -EIO trips the
 * assertion when m_filestore_fail_eio is set.
 *
 * Cleanup labels: out3 closes both fds, out closes only the source,
 * out2 closes nothing.
 */
int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
                      const SequencerPosition& spos)
{
  dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;

  if (_check_replay_guard(cid, newoid, spos) < 0)
    return 0;

  int r;
  FDRef o, n;
  {
    // This scope bounds the lifetime of the index write lock taken below.
    Index index;
    r = lfn_open(cid, oldoid, false, &o, &index);
    if (r < 0) {
      goto out2;
    }
    assert(NULL != (index.index));
    RWLock::WLocker l((index.index)->access_lock);

    // open (creating if needed) the destination, then empty it
    r = lfn_open(cid, newoid, true, &n, &index);
    if (r < 0) {
      goto out;
    }
    r = ::ftruncate(**n, 0);
    if (r < 0) {
      r = -errno;
      goto out3;
    }
    struct stat st;
    r = ::fstat(**o, &st);
    if (r < 0) {
      r = -errno;
      goto out3;
    }

    // copy the entire source file
    r = _do_clone_range(**o, **n, 0, st.st_size, 0);
    if (r < 0) {
      goto out3;
    }

    dout(20) << "objectmap clone" << dendl;
    r = object_map->clone(oldoid, newoid, &spos);
    // -ENOENT from the object_map clone is tolerated
    if (r < 0 && r != -ENOENT)
      goto out3;
  }

  {
    char buf[2];
    map<string, bufferptr> aset;
    r = _fgetattrs(**o, aset);
    if (r < 0)
      goto out3;

    // Carry the spill-out marker over: "no spill out" only when the source
    // explicitly says so, otherwise mark the clone as spilled out.
    r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
    if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
      r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
                                      sizeof(XATTR_NO_SPILL_OUT));
    } else {
      r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
                                      sizeof(XATTR_SPILL_OUT));
    }
    if (r < 0)
      goto out3;

    r = _fsetattrs(**n, aset);
    if (r < 0)
      goto out3;
  }

  // clone is non-idempotent; record our work.
  _set_replay_guard(**n, spos, &newoid);

 out3:
  lfn_close(n);
 out:
  lfn_close(o);
 out2:
  dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
  assert(!m_filestore_fail_eio || r != -EIO);
  return r;
}
3709
3710 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3711 {
3712 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3713 return backend->clone_range(from, to, srcoff, len, dstoff);
3714 }
3715
3716 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3717 {
3718 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3719 int r = 0;
3720 map<uint64_t, uint64_t> exomap;
3721 // fiemap doesn't allow zero length
3722 if (len == 0)
3723 return 0;
3724
3725 if (backend->has_seek_data_hole()) {
3726 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3727 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3728 } else if (backend->has_fiemap()) {
3729 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3730 r = _do_fiemap(from, srcoff, len, &exomap);
3731 }
3732
3733
3734 int64_t written = 0;
3735 if (r < 0)
3736 goto out;
3737
3738 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3739 uint64_t it_off = miter->first - srcoff + dstoff;
3740 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3741 if (r < 0) {
3742 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3743 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3744 break;
3745 }
3746 written += miter->second;
3747 }
3748
3749 if (r >= 0) {
3750 if (m_filestore_sloppy_crc) {
3751 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3752 assert(rc >= 0);
3753 }
3754 struct stat st;
3755 r = ::fstat(to, &st);
3756 if (r < 0) {
3757 r = -errno;
3758 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3759 goto out;
3760 }
3761 if (st.st_size < (int)(dstoff + len)) {
3762 r = ::ftruncate(to, dstoff + len);
3763 if (r < 0) {
3764 r = -errno;
3765 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3766 goto out;
3767 }
3768 }
3769 r = written;
3770 }
3771
3772 out:
3773 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3774 return r;
3775 }
3776
3777 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3778 {
3779 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3780 int r = 0;
3781 loff_t pos = srcoff;
3782 loff_t end = srcoff + len;
3783 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3784
3785 #ifdef CEPH_HAVE_SPLICE
3786 if (backend->has_splice()) {
3787 int pipefd[2];
3788 if (pipe(pipefd) < 0) {
3789 r = -errno;
3790 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3791 return r;
3792 }
3793
3794 loff_t dstpos = dstoff;
3795 while (pos < end) {
3796 int l = MIN(end-pos, buflen);
3797 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3798 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3799 if (r < 0) {
3800 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3801 << ", " << cpp_strerror(r) << dendl;
3802 break;
3803 }
3804 if (r == 0) {
3805 // hrm, bad source range, wtf.
3806 r = -ERANGE;
3807 derr << __FUNC__ << ": got short read result at " << pos
3808 << " of fd " << from << " len " << len << dendl;
3809 break;
3810 }
3811
3812 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3813 dout(10) << " safe_splice write to " << to << " len " << r
3814 << " got " << r << dendl;
3815 if (r < 0) {
3816 derr << __FUNC__ << ": write error at " << pos << "~"
3817 << r << ", " << cpp_strerror(r) << dendl;
3818 break;
3819 }
3820 }
3821 close(pipefd[0]);
3822 close(pipefd[1]);
3823 } else
3824 #endif
3825 {
3826 int64_t actual;
3827
3828 actual = ::lseek64(from, srcoff, SEEK_SET);
3829 if (actual != (int64_t)srcoff) {
3830 if (actual < 0)
3831 r = -errno;
3832 else
3833 r = -EINVAL;
3834 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3835 return r;
3836 }
3837 actual = ::lseek64(to, dstoff, SEEK_SET);
3838 if (actual != (int64_t)dstoff) {
3839 if (actual < 0)
3840 r = -errno;
3841 else
3842 r = -EINVAL;
3843 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3844 return r;
3845 }
3846
3847 char buf[buflen];
3848 while (pos < end) {
3849 int l = MIN(end-pos, buflen);
3850 r = ::read(from, buf, l);
3851 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3852 if (r < 0) {
3853 if (errno == EINTR) {
3854 continue;
3855 } else {
3856 r = -errno;
3857 derr << __FUNC__ << ": read error at " << pos << "~" << len
3858 << ", " << cpp_strerror(r) << dendl;
3859 break;
3860 }
3861 }
3862 if (r == 0) {
3863 // hrm, bad source range, wtf.
3864 r = -ERANGE;
3865 derr << __FUNC__ << ": got short read result at " << pos
3866 << " of fd " << from << " len " << len << dendl;
3867 break;
3868 }
3869 int op = 0;
3870 while (op < r) {
3871 int r2 = safe_write(to, buf+op, r-op);
3872 dout(25) << " write to " << to << " len " << (r-op)
3873 << " got " << r2 << dendl;
3874 if (r2 < 0) {
3875 r = r2;
3876 derr << __FUNC__ << ": write error at " << pos << "~"
3877 << r-op << ", " << cpp_strerror(r) << dendl;
3878
3879 break;
3880 }
3881 op += (r-op);
3882 }
3883 if (r < 0)
3884 break;
3885 pos += r;
3886 }
3887 }
3888
3889 if (r < 0 && replaying) {
3890 assert(r == -ERANGE);
3891 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
3892 r = pos - from;;
3893 }
3894 assert(replaying || pos == end);
3895 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3896 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3897 assert(rc >= 0);
3898 }
3899 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3900 return r;
3901 }
3902
3903 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3904 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3905 const SequencerPosition& spos)
3906 {
3907 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3908
3909 if (_check_replay_guard(newcid, newoid, spos) < 0)
3910 return 0;
3911
3912 int r;
3913 FDRef o, n;
3914 r = lfn_open(oldcid, oldoid, false, &o);
3915 if (r < 0) {
3916 goto out2;
3917 }
3918 r = lfn_open(newcid, newoid, true, &n);
3919 if (r < 0) {
3920 goto out;
3921 }
3922 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3923 if (r < 0) {
3924 goto out3;
3925 }
3926
3927 // clone is non-idempotent; record our work.
3928 _set_replay_guard(**n, spos, &newoid);
3929
3930 out3:
3931 lfn_close(n);
3932 out:
3933 lfn_close(o);
3934 out2:
3935 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3936 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3937 return r;
3938 }
3939
3940 class SyncEntryTimeout : public Context {
3941 public:
3942 CephContext* cct;
3943 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3944 : cct(cct), m_commit_timeo(commit_timeo)
3945 {
3946 }
3947
3948 void finish(int r) override {
3949 BackTrace *bt = new BackTrace(1);
3950 generic_dout(-1) << "FileStore: sync_entry timed out after "
3951 << m_commit_timeo << " seconds.\n";
3952 bt->print(*_dout);
3953 *_dout << dendl;
3954 delete bt;
3955 ceph_abort();
3956 }
3957 private:
3958 int m_commit_timeo;
3959 };
3960
/*
 * Body of the commit loop; runs until 'stop' is set.
 *
 * Each iteration waits up to the max sync interval (or not at all when
 * force_sync is set), enforces the min sync interval, then performs a
 * commit cycle if apply_manager.commit_start() says there is work:
 *   - with a checkpoint-capable backend: write op_seq, create a
 *     checkpoint and wait for it;
 *   - otherwise: sync the object_map and the filesystem, then write and
 *     fsync op_seq.
 * A SyncEntryTimeout is scheduled on the timer for the duration of the
 * commit.  Contexts queued in sync_waiters are completed after each cycle,
 * and the cycle repeats (label 'again') while new waiters arrived or the
 * journal asks for another commit.
 *
 * 'lock' is held except while the commit itself is in progress.
 */
void FileStore::sync_entry()
{
  lock.Lock();
  while (!stop) {
    utime_t max_interval;
    max_interval.set_from_double(m_filestore_max_sync_interval);
    utime_t min_interval;
    min_interval.set_from_double(m_filestore_min_sync_interval);

    utime_t startwait = ceph_clock_now();
    if (!force_sync) {
      dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
      sync_cond.WaitInterval(lock, max_interval);
    } else {
      dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
    }

    if (force_sync) {
      dout(20) << __FUNC__ << ": force_sync set" << dendl;
      force_sync = false;
    } else if (stop) {
      dout(20) << __FUNC__ << ": stop set" << dendl;
      break;
    } else {
      // wait for at least the min interval
      utime_t woke = ceph_clock_now();
      woke -= startwait;
      dout(20) << __FUNC__ << ": woke after " << woke << dendl;
      if (woke < min_interval) {
        utime_t t = min_interval;
        t -= woke;
        dout(20) << __FUNC__ << ": waiting for another " << t
                 << " to reach min interval " << min_interval << dendl;
        sync_cond.WaitInterval(lock, t);
      }
    }

    list<Context*> fin;
  again:
    // take ownership of the current waiters; they are completed below
    fin.swap(sync_waiters);
    lock.Unlock();

    op_tp.pause();
    if (apply_manager.commit_start()) {
      utime_t start = ceph_clock_now();
      uint64_t cp = apply_manager.get_committing_seq();

      // Schedule the timeout context for this commit; keep the pointer
      // only if the timer accepted it so it can be cancelled afterwards.
      sync_entry_timeo_lock.Lock();
      SyncEntryTimeout *sync_entry_timeo =
        new SyncEntryTimeout(cct, m_filestore_commit_timeout);
      if (!timer.add_event_after(m_filestore_commit_timeout,
                                 sync_entry_timeo)) {
        sync_entry_timeo = nullptr;
      }
      sync_entry_timeo_lock.Unlock();

      logger->set(l_filestore_committing, 1);

      dout(15) << __FUNC__ << ": committing " << cp << dendl;
      stringstream errstream;
      if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
        derr << errstream.str() << dendl;
        ceph_abort();
      }

      if (backend->can_checkpoint()) {
        // checkpoint path: record op_seq first, then create the checkpoint
        int err = write_op_seq(op_fd, cp);
        if (err < 0) {
          derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
          assert(0 == "error during write_op_seq");
        }

        char s[NAME_MAX];
        snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
        uint64_t cid = 0;
        err = backend->create_checkpoint(s, &cid);
        if (err < 0) {
          // NOTE(review): this inner 'err' shadows the outer one and holds
          // errno rather than the create_checkpoint() return value.
          int err = errno;
          derr << "snap create '" << s << "' got error " << err << dendl;
          assert(err == 0);
        }

        snaps.push_back(cp);
        apply_manager.commit_started();
        op_tp.unpause();

        if (cid > 0) {
          dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
          err = backend->sync_checkpoint(cid);
          if (err < 0) {
            derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
            assert(0 == "wait_sync got error");
          }
          dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
        }
      } else {
        // no checkpoints: sync object_map and filesystem first, then
        // write and fsync op_seq
        apply_manager.commit_started();
        op_tp.unpause();

        int err = object_map->sync();
        if (err < 0) {
          derr << "object_map sync got " << cpp_strerror(err) << dendl;
          assert(0 == "object_map sync returned error");
        }

        err = backend->syncfs();
        if (err < 0) {
          derr << "syncfs got " << cpp_strerror(err) << dendl;
          assert(0 == "syncfs returned error");
        }

        err = write_op_seq(op_fd, cp);
        if (err < 0) {
          derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
          assert(0 == "error during write_op_seq");
        }
        err = ::fsync(op_fd);
        if (err < 0) {
          derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
          assert(0 == "error during fsync of op_seq");
        }
      }

      // lat: duration of the commit itself; dur: time since the wait began
      utime_t done = ceph_clock_now();
      utime_t lat = done - start;
      utime_t dur = done - startwait;
      dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
      utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
      if (max_pause_lat < dur - lat) {
        logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
      }

      logger->inc(l_filestore_commitcycle);
      logger->tinc(l_filestore_commitcycle_latency, lat);
      logger->tinc(l_filestore_commitcycle_interval, dur);

      apply_manager.commit_finish();
      if (!m_disable_wbthrottle) {
        wbthrottle.clear();
      }

      logger->set(l_filestore_committing, 0);

      // remove old snaps?
      if (backend->can_checkpoint()) {
        char s[NAME_MAX];
        // keep at most two commit snapshots
        while (snaps.size() > 2) {
          snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
          snaps.pop_front();
          dout(10) << "removing snap '" << s << "'" << dendl;
          int r = backend->destroy_checkpoint(s);
          if (r) {
            int err = errno;
            derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
          }
        }
      }

      dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;

      // commit finished in time: cancel the pending timeout
      if (sync_entry_timeo) {
        Mutex::Locker lock(sync_entry_timeo_lock);
        timer.cancel_event(sync_entry_timeo);
      }
    } else {
      op_tp.unpause();
    }

    lock.Lock();
    finish_contexts(cct, fin, 0);
    fin.clear();
    if (!sync_waiters.empty()) {
      dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
      goto again;
    }
    if (!stop && journal && journal->should_commit_now()) {
      dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
      goto again;
    }
  }
  stop = false;
  lock.Unlock();
}
4144
4145 void FileStore::_start_sync()
4146 {
4147 if (!journal) { // don't do a big sync if the journal is on
4148 dout(10) << __FUNC__ << dendl;
4149 sync_cond.Signal();
4150 } else {
4151 dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
4152 }
4153 }
4154
4155 void FileStore::do_force_sync()
4156 {
4157 dout(10) << __FUNC__ << dendl;
4158 Mutex::Locker l(lock);
4159 force_sync = true;
4160 sync_cond.Signal();
4161 }
4162
4163 void FileStore::start_sync(Context *onsafe)
4164 {
4165 Mutex::Locker l(lock);
4166 sync_waiters.push_back(onsafe);
4167 sync_cond.Signal();
4168 force_sync = true;
4169 dout(10) << __FUNC__ << dendl;
4170 }
4171
4172 void FileStore::sync()
4173 {
4174 Mutex l("FileStore::sync");
4175 Cond c;
4176 bool done;
4177 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4178
4179 start_sync(fin);
4180
4181 l.Lock();
4182 while (!done) {
4183 dout(10) << "sync waiting" << dendl;
4184 c.Wait(l);
4185 }
4186 l.Unlock();
4187 dout(10) << "sync done" << dendl;
4188 }
4189
4190 void FileStore::_flush_op_queue()
4191 {
4192 dout(10) << __FUNC__ << ": draining op tp" << dendl;
4193 op_wq.drain();
4194 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4195 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4196 (*it)->wait_for_empty();
4197 }
4198 }
4199
4200 /*
4201 * flush - make every queued write readable
4202 */
4203 void FileStore::flush()
4204 {
4205 dout(10) << __FUNC__ << dendl;
4206
4207 if (cct->_conf->filestore_blackhole) {
4208 // wait forever
4209 Mutex lock("FileStore::flush::lock");
4210 Cond cond;
4211 lock.Lock();
4212 while (true)
4213 cond.Wait(lock);
4214 ceph_abort();
4215 }
4216
4217 if (m_filestore_journal_writeahead) {
4218 if (journal)
4219 journal->flush();
4220 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4221 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4222 (*it)->wait_for_empty();
4223 }
4224 }
4225
4226 _flush_op_queue();
4227 dout(10) << __FUNC__ << ": complete" << dendl;
4228 }
4229
4230 /*
4231 * sync_and_flush - make every queued write readable AND committed to disk
4232 */
4233 void FileStore::sync_and_flush()
4234 {
4235 dout(10) << __FUNC__ << dendl;
4236
4237 if (m_filestore_journal_writeahead) {
4238 if (journal)
4239 journal->flush();
4240 _flush_op_queue();
4241 } else {
4242 // includes m_filestore_journal_parallel
4243 _flush_op_queue();
4244 sync();
4245 }
4246 dout(10) << __FUNC__ << ": done" << dendl;
4247 }
4248
4249 int FileStore::flush_journal()
4250 {
4251 dout(10) << __FUNC__ << dendl;
4252 sync_and_flush();
4253 sync();
4254 return 0;
4255 }
4256
4257 int FileStore::snapshot(const string& name)
4258 {
4259 dout(10) << __FUNC__ << ": " << name << dendl;
4260 sync_and_flush();
4261
4262 if (!backend->can_checkpoint()) {
4263 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4264 return -EOPNOTSUPP;
4265 }
4266
4267 char s[NAME_MAX];
4268 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4269
4270 int r = backend->create_checkpoint(s, NULL);
4271 if (r) {
4272 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4273 }
4274
4275 return r;
4276 }
4277
4278 // -------------------------------
4279 // attributes
4280
4281 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4282 {
4283 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4284 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4285 if (l >= 0) {
4286 bp = buffer::create(l);
4287 memcpy(bp.c_str(), val, l);
4288 } else if (l == -ERANGE) {
4289 l = chain_fgetxattr(fd, name, 0, 0);
4290 if (l > 0) {
4291 bp = buffer::create(l);
4292 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4293 }
4294 }
4295 assert(!m_filestore_fail_eio || l != -EIO);
4296 return l;
4297 }
4298
4299 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4300 {
4301 // get attr list
4302 char names1[100];
4303 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4304 char *names2 = 0;
4305 char *name = 0;
4306 if (len == -ERANGE) {
4307 len = chain_flistxattr(fd, 0, 0);
4308 if (len < 0) {
4309 assert(!m_filestore_fail_eio || len != -EIO);
4310 return len;
4311 }
4312 dout(10) << " -ERANGE, len is " << len << dendl;
4313 names2 = new char[len+1];
4314 len = chain_flistxattr(fd, names2, len);
4315 dout(10) << " -ERANGE, got " << len << dendl;
4316 if (len < 0) {
4317 assert(!m_filestore_fail_eio || len != -EIO);
4318 delete[] names2;
4319 return len;
4320 }
4321 name = names2;
4322 } else if (len < 0) {
4323 assert(!m_filestore_fail_eio || len != -EIO);
4324 return len;
4325 } else {
4326 name = names1;
4327 }
4328 name[len] = 0;
4329
4330 char *end = name + len;
4331 while (name < end) {
4332 char *attrname = name;
4333 if (parse_attrname(&name)) {
4334 if (*name) {
4335 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4336 int r = _fgetattr(fd, attrname, aset[name]);
4337 if (r < 0) {
4338 delete[] names2;
4339 return r;
4340 }
4341 }
4342 }
4343 name += strlen(name) + 1;
4344 }
4345
4346 delete[] names2;
4347 return 0;
4348 }
4349
4350 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4351 {
4352 for (map<string, bufferptr>::iterator p = aset.begin();
4353 p != aset.end();
4354 ++p) {
4355 char n[CHAIN_XATTR_MAX_NAME_LEN];
4356 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4357 const char *val;
4358 if (p->second.length())
4359 val = p->second.c_str();
4360 else
4361 val = "";
4362 // ??? Why do we skip setting all the other attrs if one fails?
4363 int r = chain_fsetxattr(fd, n, val, p->second.length());
4364 if (r < 0) {
4365 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4366 return r;
4367 }
4368 }
4369 return 0;
4370 }
4371
4372 // debug EIO injection
4373 void FileStore::inject_data_error(const ghobject_t &oid) {
4374 Mutex::Locker l(read_error_lock);
4375 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4376 data_error_set.insert(oid);
4377 }
4378 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4379 Mutex::Locker l(read_error_lock);
4380 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4381 mdata_error_set.insert(oid);
4382 }
4383
4384 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4385 Mutex::Locker l(read_error_lock);
4386 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4387 data_error_set.erase(oid);
4388 mdata_error_set.erase(oid);
4389 }
4390 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4391 Mutex::Locker l(read_error_lock);
4392 if (data_error_set.count(oid)) {
4393 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4394 return true;
4395 } else {
4396 return false;
4397 }
4398 }
4399 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4400 Mutex::Locker l(read_error_lock);
4401 if (mdata_error_set.count(oid)) {
4402 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4403 return true;
4404 } else {
4405 return false;
4406 }
4407 }
4408
4409
4410 // objects
4411
4412 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4413 {
4414 tracepoint(objectstore, getattr_enter, _cid.c_str());
4415 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4416 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4417 FDRef fd;
4418 int r = lfn_open(cid, oid, false, &fd);
4419 if (r < 0) {
4420 goto out;
4421 }
4422 char n[CHAIN_XATTR_MAX_NAME_LEN];
4423 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4424 r = _fgetattr(**fd, n, bp);
4425 lfn_close(fd);
4426 if (r == -ENODATA) {
4427 map<string, bufferlist> got;
4428 set<string> to_get;
4429 to_get.insert(string(name));
4430 Index index;
4431 r = get_index(cid, &index);
4432 if (r < 0) {
4433 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4434 goto out;
4435 }
4436 r = object_map->get_xattrs(oid, to_get, &got);
4437 if (r < 0 && r != -ENOENT) {
4438 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4439 goto out;
4440 }
4441 if (got.empty()) {
4442 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4443 return -ENODATA;
4444 }
4445 bp = bufferptr(got.begin()->second.c_str(),
4446 got.begin()->second.length());
4447 r = bp.length();
4448 }
4449 out:
4450 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4451 assert(!m_filestore_fail_eio || r != -EIO);
4452 if (cct->_conf->filestore_debug_inject_read_err &&
4453 debug_mdata_eio(oid)) {
4454 return -EIO;
4455 } else {
4456 tracepoint(objectstore, getattr_exit, r);
4457 return r < 0 ? r : 0;
4458 }
4459 }
4460
4461 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4462 {
4463 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4464 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4465 set<string> omap_attrs;
4466 map<string, bufferlist> omap_aset;
4467 Index index;
4468 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4469 FDRef fd;
4470 bool spill_out = true;
4471 char buf[2];
4472
4473 int r = lfn_open(cid, oid, false, &fd);
4474 if (r < 0) {
4475 goto out;
4476 }
4477
4478 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4479 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4480 spill_out = false;
4481
4482 r = _fgetattrs(**fd, aset);
4483 lfn_close(fd);
4484 fd = FDRef(); // defensive
4485 if (r < 0) {
4486 goto out;
4487 }
4488
4489 if (!spill_out) {
4490 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4491 goto out;
4492 }
4493
4494 r = get_index(cid, &index);
4495 if (r < 0) {
4496 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4497 goto out;
4498 }
4499 {
4500 r = object_map->get_all_xattrs(oid, &omap_attrs);
4501 if (r < 0 && r != -ENOENT) {
4502 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4503 goto out;
4504 }
4505
4506 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4507 if (r < 0 && r != -ENOENT) {
4508 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4509 goto out;
4510 }
4511 if (r == -ENOENT)
4512 r = 0;
4513 }
4514 assert(omap_attrs.size() == omap_aset.size());
4515 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4516 i != omap_aset.end();
4517 ++i) {
4518 string key(i->first);
4519 aset.insert(make_pair(key,
4520 bufferptr(i->second.c_str(), i->second.length())));
4521 }
4522 out:
4523 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4524 assert(!m_filestore_fail_eio || r != -EIO);
4525
4526 if (cct->_conf->filestore_debug_inject_read_err &&
4527 debug_mdata_eio(oid)) {
4528 return -EIO;
4529 } else {
4530 tracepoint(objectstore, getattrs_exit, r);
4531 return r;
4532 }
4533 }
4534
4535 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4536 const SequencerPosition &spos)
4537 {
4538 map<string, bufferlist> omap_set;
4539 set<string> omap_remove;
4540 map<string, bufferptr> inline_set;
4541 map<string, bufferptr> inline_to_set;
4542 FDRef fd;
4543 int spill_out = -1;
4544 bool incomplete_inline = false;
4545
4546 int r = lfn_open(cid, oid, false, &fd);
4547 if (r < 0) {
4548 goto out;
4549 }
4550
4551 char buf[2];
4552 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4553 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4554 spill_out = 0;
4555 else
4556 spill_out = 1;
4557
4558 r = _fgetattrs(**fd, inline_set);
4559 incomplete_inline = (r == -E2BIG);
4560 assert(!m_filestore_fail_eio || r != -EIO);
4561 dout(15) << __FUNC__ << ": " << cid << "/" << oid
4562 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4563 << dendl;
4564
4565 for (map<string,bufferptr>::iterator p = aset.begin();
4566 p != aset.end();
4567 ++p) {
4568 char n[CHAIN_XATTR_MAX_NAME_LEN];
4569 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4570
4571 if (incomplete_inline) {
4572 chain_fremovexattr(**fd, n); // ignore any error
4573 omap_set[p->first].push_back(p->second);
4574 continue;
4575 }
4576
4577 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4578 if (inline_set.count(p->first)) {
4579 inline_set.erase(p->first);
4580 r = chain_fremovexattr(**fd, n);
4581 if (r < 0)
4582 goto out_close;
4583 }
4584 omap_set[p->first].push_back(p->second);
4585 continue;
4586 }
4587
4588 if (!inline_set.count(p->first) &&
4589 inline_set.size() >= m_filestore_max_inline_xattrs) {
4590 omap_set[p->first].push_back(p->second);
4591 continue;
4592 }
4593 omap_remove.insert(p->first);
4594 inline_set.insert(*p);
4595
4596 inline_to_set.insert(*p);
4597 }
4598
4599 if (spill_out != 1 && !omap_set.empty()) {
4600 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4601 sizeof(XATTR_SPILL_OUT));
4602 }
4603
4604 r = _fsetattrs(**fd, inline_to_set);
4605 if (r < 0)
4606 goto out_close;
4607
4608 if (spill_out && !omap_remove.empty()) {
4609 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4610 if (r < 0 && r != -ENOENT) {
4611 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4612 assert(!m_filestore_fail_eio || r != -EIO);
4613 goto out_close;
4614 } else {
4615 r = 0; // don't confuse the debug output
4616 }
4617 }
4618
4619 if (!omap_set.empty()) {
4620 r = object_map->set_xattrs(oid, omap_set, &spos);
4621 if (r < 0) {
4622 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4623 assert(!m_filestore_fail_eio || r != -EIO);
4624 goto out_close;
4625 }
4626 }
4627 out_close:
4628 lfn_close(fd);
4629 out:
4630 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4631 return r;
4632 }
4633
4634
4635 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4636 const SequencerPosition &spos)
4637 {
4638 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4639 FDRef fd;
4640 bool spill_out = true;
4641
4642 int r = lfn_open(cid, oid, false, &fd);
4643 if (r < 0) {
4644 goto out;
4645 }
4646
4647 char buf[2];
4648 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4649 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4650 spill_out = false;
4651 }
4652
4653 char n[CHAIN_XATTR_MAX_NAME_LEN];
4654 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4655 r = chain_fremovexattr(**fd, n);
4656 if (r == -ENODATA && spill_out) {
4657 Index index;
4658 r = get_index(cid, &index);
4659 if (r < 0) {
4660 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4661 goto out_close;
4662 }
4663 set<string> to_remove;
4664 to_remove.insert(string(name));
4665 r = object_map->remove_xattrs(oid, to_remove, &spos);
4666 if (r < 0 && r != -ENOENT) {
4667 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4668 assert(!m_filestore_fail_eio || r != -EIO);
4669 goto out_close;
4670 }
4671 }
4672 out_close:
4673 lfn_close(fd);
4674 out:
4675 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4676 return r;
4677 }
4678
4679 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4680 const SequencerPosition &spos)
4681 {
4682 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4683
4684 map<string,bufferptr> aset;
4685 FDRef fd;
4686 set<string> omap_attrs;
4687 Index index;
4688 bool spill_out = true;
4689
4690 int r = lfn_open(cid, oid, false, &fd);
4691 if (r < 0) {
4692 goto out;
4693 }
4694
4695 char buf[2];
4696 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4697 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4698 spill_out = false;
4699 }
4700
4701 r = _fgetattrs(**fd, aset);
4702 if (r >= 0) {
4703 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4704 char n[CHAIN_XATTR_MAX_NAME_LEN];
4705 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4706 r = chain_fremovexattr(**fd, n);
4707 if (r < 0) {
4708 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4709 goto out_close;
4710 }
4711 }
4712 }
4713
4714 if (!spill_out) {
4715 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4716 goto out_close;
4717 }
4718
4719 r = get_index(cid, &index);
4720 if (r < 0) {
4721 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4722 goto out_close;
4723 }
4724 {
4725 r = object_map->get_all_xattrs(oid, &omap_attrs);
4726 if (r < 0 && r != -ENOENT) {
4727 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4728 assert(!m_filestore_fail_eio || r != -EIO);
4729 goto out_close;
4730 }
4731 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4732 if (r < 0 && r != -ENOENT) {
4733 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4734 goto out_close;
4735 }
4736 if (r == -ENOENT)
4737 r = 0;
4738 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4739 sizeof(XATTR_NO_SPILL_OUT));
4740 }
4741
4742 out_close:
4743 lfn_close(fd);
4744 out:
4745 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4746 return r;
4747 }
4748
4749
4750
4751
4752 int FileStore::_collection_remove_recursive(const coll_t &cid,
4753 const SequencerPosition &spos)
4754 {
4755 struct stat st;
4756 int r = collection_stat(cid, &st);
4757 if (r < 0) {
4758 if (r == -ENOENT)
4759 return 0;
4760 return r;
4761 }
4762
4763 vector<ghobject_t> objects;
4764 ghobject_t max;
4765 while (!max.is_max()) {
4766 r = collection_list(cid, max, ghobject_t::get_max(),
4767 300, &objects, &max);
4768 if (r < 0)
4769 return r;
4770 for (vector<ghobject_t>::iterator i = objects.begin();
4771 i != objects.end();
4772 ++i) {
4773 assert(_check_replay_guard(cid, *i, spos));
4774 r = _remove(cid, *i, spos);
4775 if (r < 0)
4776 return r;
4777 }
4778 objects.clear();
4779 }
4780 return _destroy_collection(cid);
4781 }
4782
4783 // --------------------------
4784 // collections
4785
4786 int FileStore::list_collections(vector<coll_t>& ls)
4787 {
4788 return list_collections(ls, false);
4789 }
4790
4791 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4792 {
4793 tracepoint(objectstore, list_collections_enter);
4794 dout(10) << __FUNC__ << dendl;
4795
4796 char fn[PATH_MAX];
4797 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4798
4799 int r = 0;
4800 DIR *dir = ::opendir(fn);
4801 if (!dir) {
4802 r = -errno;
4803 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4804 assert(!m_filestore_fail_eio || r != -EIO);
4805 return r;
4806 }
4807
4808 struct dirent *de = nullptr;
4809 while ((de = ::readdir(dir))) {
4810 if (de->d_type == DT_UNKNOWN) {
4811 // d_type not supported (non-ext[234], btrfs), must stat
4812 struct stat sb;
4813 char filename[PATH_MAX];
4814 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4815
4816 r = ::stat(filename, &sb);
4817 if (r < 0) {
4818 r = -errno;
4819 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4820 assert(!m_filestore_fail_eio || r != -EIO);
4821 break;
4822 }
4823 if (!S_ISDIR(sb.st_mode)) {
4824 continue;
4825 }
4826 } else if (de->d_type != DT_DIR) {
4827 continue;
4828 }
4829 if (strcmp(de->d_name, "omap") == 0) {
4830 continue;
4831 }
4832 if (de->d_name[0] == '.' &&
4833 (de->d_name[1] == '\0' ||
4834 (de->d_name[1] == '.' &&
4835 de->d_name[2] == '\0')))
4836 continue;
4837 coll_t cid;
4838 if (!cid.parse(de->d_name)) {
4839 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4840 continue;
4841 }
4842 if (!cid.is_temp() || include_temp)
4843 ls.push_back(cid);
4844 }
4845
4846 if (r > 0) {
4847 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4848 r = -r;
4849 }
4850
4851 ::closedir(dir);
4852 assert(!m_filestore_fail_eio || r != -EIO);
4853 tracepoint(objectstore, list_collections_exit, r);
4854 return r;
4855 }
4856
4857 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4858 {
4859 tracepoint(objectstore, collection_stat_enter, c.c_str());
4860 char fn[PATH_MAX];
4861 get_cdir(c, fn, sizeof(fn));
4862 dout(15) << __FUNC__ << ": " << fn << dendl;
4863 int r = ::stat(fn, st);
4864 if (r < 0)
4865 r = -errno;
4866 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
4867 assert(!m_filestore_fail_eio || r != -EIO);
4868 tracepoint(objectstore, collection_stat_exit, r);
4869 return r;
4870 }
4871
4872 bool FileStore::collection_exists(const coll_t& c)
4873 {
4874 tracepoint(objectstore, collection_exists_enter, c.c_str());
4875 struct stat st;
4876 bool ret = collection_stat(c, &st) == 0;
4877 tracepoint(objectstore, collection_exists_exit, ret);
4878 return ret;
4879 }
4880
4881 int FileStore::collection_empty(const coll_t& c, bool *empty)
4882 {
4883 tracepoint(objectstore, collection_empty_enter, c.c_str());
4884 dout(15) << __FUNC__ << ": " << c << dendl;
4885 Index index;
4886 int r = get_index(c, &index);
4887 if (r < 0) {
4888 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
4889 << dendl;
4890 return r;
4891 }
4892
4893 assert(NULL != index.index);
4894 RWLock::RLocker l((index.index)->access_lock);
4895
4896 vector<ghobject_t> ls;
4897 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4898 1, &ls, NULL);
4899 if (r < 0) {
4900 derr << __FUNC__ << ": collection_list_partial returned: "
4901 << cpp_strerror(r) << dendl;
4902 assert(!m_filestore_fail_eio || r != -EIO);
4903 return r;
4904 }
4905 *empty = ls.empty();
4906 tracepoint(objectstore, collection_empty_exit, *empty);
4907 return 0;
4908 }
4909
4910 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4911 {
4912 char fn[PATH_MAX];
4913 get_cdir(c, fn, sizeof(fn));
4914 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
4915 char n[PATH_MAX];
4916 int r;
4917 int32_t v = bits;
4918 int fd = ::open(fn, O_RDONLY);
4919 if (fd < 0) {
4920 r = -errno;
4921 goto out;
4922 }
4923 get_attrname("bits", n, PATH_MAX);
4924 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4925 VOID_TEMP_FAILURE_RETRY(::close(fd));
4926 out:
4927 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
4928 return r;
4929 }
4930
4931 int FileStore::collection_bits(const coll_t& c)
4932 {
4933 char fn[PATH_MAX];
4934 get_cdir(c, fn, sizeof(fn));
4935 dout(15) << __FUNC__ << ": " << fn << dendl;
4936 int r;
4937 char n[PATH_MAX];
4938 int32_t bits;
4939 int fd = ::open(fn, O_RDONLY);
4940 if (fd < 0) {
4941 bits = r = -errno;
4942 goto out;
4943 }
4944 get_attrname("bits", n, PATH_MAX);
4945 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4946 VOID_TEMP_FAILURE_RETRY(::close(fd));
4947 if (r < 0) {
4948 bits = r;
4949 goto out;
4950 }
4951 out:
4952 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
4953 return bits;
4954 }
4955
4956 int FileStore::collection_list(const coll_t& c,
4957 const ghobject_t& orig_start,
4958 const ghobject_t& end,
4959 int max,
4960 vector<ghobject_t> *ls, ghobject_t *next)
4961 {
4962 ghobject_t start = orig_start;
4963 if (start.is_max())
4964 return 0;
4965
4966 ghobject_t temp_next;
4967 if (!next)
4968 next = &temp_next;
4969 // figure out the pool id. we need this in order to generate a
4970 // meaningful 'next' value.
4971 int64_t pool = -1;
4972 shard_id_t shard;
4973 {
4974 spg_t pgid;
4975 if (c.is_temp(&pgid)) {
4976 pool = -2 - pgid.pool();
4977 shard = pgid.shard;
4978 } else if (c.is_pg(&pgid)) {
4979 pool = pgid.pool();
4980 shard = pgid.shard;
4981 } else if (c.is_meta()) {
4982 pool = -1;
4983 shard = shard_id_t::NO_SHARD;
4984 } else {
4985 // hrm, the caller is test code! we should get kill it off. for now,
4986 // tolerate it.
4987 pool = 0;
4988 shard = shard_id_t::NO_SHARD;
4989 }
4990 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
4991 << " pgid " << pgid << dendl;
4992 }
4993 ghobject_t sep;
4994 sep.hobj.pool = -1;
4995 sep.set_shard(shard);
4996 if (!c.is_temp() && !c.is_meta()) {
4997 if (start < sep) {
4998 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
4999 coll_t temp = c.get_temp();
5000 int r = collection_list(temp, start, end, max, ls, next);
5001 if (r < 0)
5002 return r;
5003 if (*next != ghobject_t::get_max())
5004 return r;
5005 start = sep;
5006 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
5007 << start << dendl;
5008 } else {
5009 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
5010 }
5011 }
5012
5013 Index index;
5014 int r = get_index(c, &index);
5015 if (r < 0)
5016 return r;
5017
5018 assert(NULL != index.index);
5019 RWLock::RLocker l((index.index)->access_lock);
5020
5021 r = index->collection_list_partial(start, end, max, ls, next);
5022
5023 if (r < 0) {
5024 assert(!m_filestore_fail_eio || r != -EIO);
5025 return r;
5026 }
5027 dout(20) << "objects: " << *ls << dendl;
5028
5029 // HashIndex doesn't know the pool when constructing a 'next' value
5030 if (next && !next->is_max()) {
5031 next->hobj.pool = pool;
5032 next->set_shard(shard);
5033 dout(20) << " next " << *next << dendl;
5034 }
5035
5036 return 0;
5037 }
5038
5039 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
5040 bufferlist *header,
5041 map<string, bufferlist> *out)
5042 {
5043 tracepoint(objectstore, omap_get_enter, _c.c_str());
5044 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5045 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5046 Index index;
5047 int r = get_index(c, &index);
5048 if (r < 0)
5049 return r;
5050 {
5051 assert(NULL != index.index);
5052 RWLock::RLocker l((index.index)->access_lock);
5053 r = lfn_find(hoid, index);
5054 if (r < 0)
5055 return r;
5056 }
5057 r = object_map->get(hoid, header, out);
5058 if (r < 0 && r != -ENOENT) {
5059 assert(!m_filestore_fail_eio || r != -EIO);
5060 return r;
5061 }
5062 tracepoint(objectstore, omap_get_exit, 0);
5063 return 0;
5064 }
5065
5066 int FileStore::omap_get_header(
5067 const coll_t& _c,
5068 const ghobject_t &hoid,
5069 bufferlist *bl,
5070 bool allow_eio)
5071 {
5072 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5073 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5074 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5075 Index index;
5076 int r = get_index(c, &index);
5077 if (r < 0)
5078 return r;
5079 {
5080 assert(NULL != index.index);
5081 RWLock::RLocker l((index.index)->access_lock);
5082 r = lfn_find(hoid, index);
5083 if (r < 0)
5084 return r;
5085 }
5086 r = object_map->get_header(hoid, bl);
5087 if (r < 0 && r != -ENOENT) {
5088 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5089 return r;
5090 }
5091 tracepoint(objectstore, omap_get_header_exit, 0);
5092 return 0;
5093 }
5094
5095 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5096 {
5097 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5098 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5099 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5100 Index index;
5101 int r = get_index(c, &index);
5102 if (r < 0)
5103 return r;
5104 {
5105 assert(NULL != index.index);
5106 RWLock::RLocker l((index.index)->access_lock);
5107 r = lfn_find(hoid, index);
5108 if (r < 0)
5109 return r;
5110 }
5111 r = object_map->get_keys(hoid, keys);
5112 if (r < 0 && r != -ENOENT) {
5113 assert(!m_filestore_fail_eio || r != -EIO);
5114 return r;
5115 }
5116 tracepoint(objectstore, omap_get_keys_exit, 0);
5117 return 0;
5118 }
5119
5120 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5121 const set<string> &keys,
5122 map<string, bufferlist> *out)
5123 {
5124 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5125 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5126 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5127 Index index;
5128 const char *where = "()";
5129 int r = get_index(c, &index);
5130 if (r < 0) {
5131 where = " (get_index)";
5132 goto out;
5133 }
5134 {
5135 assert(NULL != index.index);
5136 RWLock::RLocker l((index.index)->access_lock);
5137 r = lfn_find(hoid, index);
5138 if (r < 0) {
5139 where = " (lfn_find)";
5140 goto out;
5141 }
5142 }
5143 r = object_map->get_values(hoid, keys, out);
5144 if (r < 0 && r != -ENOENT) {
5145 assert(!m_filestore_fail_eio || r != -EIO);
5146 where = " (get_values)";
5147 goto out;
5148 }
5149 r = 0;
5150 out:
5151 tracepoint(objectstore, omap_get_values_exit, r);
5152 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5153 << where << dendl;
5154 return r;
5155 }
5156
5157 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5158 const set<string> &keys,
5159 set<string> *out)
5160 {
5161 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5162 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5163 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5164
5165 Index index;
5166 int r = get_index(c, &index);
5167 if (r < 0)
5168 return r;
5169 {
5170 assert(NULL != index.index);
5171 RWLock::RLocker l((index.index)->access_lock);
5172 r = lfn_find(hoid, index);
5173 if (r < 0)
5174 return r;
5175 }
5176 r = object_map->check_keys(hoid, keys, out);
5177 if (r < 0 && r != -ENOENT) {
5178 assert(!m_filestore_fail_eio || r != -EIO);
5179 return r;
5180 }
5181 tracepoint(objectstore, omap_check_keys_exit, 0);
5182 return 0;
5183 }
5184
5185 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5186 const ghobject_t &hoid)
5187 {
5188 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5189 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5190 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5191 Index index;
5192 int r = get_index(c, &index);
5193 if (r < 0) {
5194 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5195 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5196 return ObjectMap::ObjectMapIterator();
5197 }
5198 {
5199 assert(NULL != index.index);
5200 RWLock::RLocker l((index.index)->access_lock);
5201 r = lfn_find(hoid, index);
5202 if (r < 0) {
5203 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5204 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5205 return ObjectMap::ObjectMapIterator();
5206 }
5207 }
5208 return object_map->get_iterator(hoid);
5209 }
5210
5211 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5212 uint64_t expected_num_objs,
5213 const SequencerPosition &spos)
5214 {
5215 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5216 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5217
5218 bool empty;
5219 int ret = collection_empty(c, &empty);
5220 if (ret < 0)
5221 return ret;
5222 if (!empty && !replaying) {
5223 dout(0) << "Failed to give an expected number of objects hint to collection : "
5224 << c << ", only empty collection can take such type of hint. " << dendl;
5225 return 0;
5226 }
5227
5228 Index index;
5229 ret = get_index(c, &index);
5230 if (ret < 0)
5231 return ret;
5232 // Pre-hash the collection
5233 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5234 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5235 if (ret < 0)
5236 return ret;
5237 _set_replay_guard(c, spos);
5238
5239 return 0;
5240 }
5241
5242 int FileStore::_create_collection(
5243 const coll_t& c,
5244 int bits,
5245 const SequencerPosition &spos)
5246 {
5247 char fn[PATH_MAX];
5248 get_cdir(c, fn, sizeof(fn));
5249 dout(15) << __FUNC__ << ": " << fn << dendl;
5250 int r = ::mkdir(fn, 0755);
5251 if (r < 0)
5252 r = -errno;
5253 if (r == -EEXIST && replaying)
5254 r = 0;
5255 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5256
5257 if (r < 0)
5258 return r;
5259 r = init_index(c);
5260 if (r < 0)
5261 return r;
5262 r = _collection_set_bits(c, bits);
5263 if (r < 0)
5264 return r;
5265 // create parallel temp collection, too
5266 if (!c.is_meta() && !c.is_temp()) {
5267 coll_t temp = c.get_temp();
5268 r = _create_collection(temp, 0, spos);
5269 if (r < 0)
5270 return r;
5271 }
5272
5273 _set_replay_guard(c, spos);
5274 return 0;
5275 }
5276
5277 int FileStore::_destroy_collection(const coll_t& c)
5278 {
5279 int r = 0;
5280 char fn[PATH_MAX];
5281 get_cdir(c, fn, sizeof(fn));
5282 dout(15) << __FUNC__ << ": " << fn << dendl;
5283 {
5284 Index from;
5285 r = get_index(c, &from);
5286 if (r < 0)
5287 goto out;
5288 assert(NULL != from.index);
5289 RWLock::WLocker l((from.index)->access_lock);
5290
5291 r = from->prep_delete();
5292 if (r < 0)
5293 goto out;
5294 }
5295 r = ::rmdir(fn);
5296 if (r < 0) {
5297 r = -errno;
5298 goto out;
5299 }
5300
5301 out:
5302 // destroy parallel temp collection, too
5303 if (!c.is_meta() && !c.is_temp()) {
5304 coll_t temp = c.get_temp();
5305 int r2 = _destroy_collection(temp);
5306 if (r2 < 0) {
5307 r = r2;
5308 goto out_final;
5309 }
5310 }
5311
5312 out_final:
5313 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5314 return r;
5315 }
5316
5317
5318 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5319 const SequencerPosition& spos)
5320 {
5321 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5322
5323 int dstcmp = _check_replay_guard(c, o, spos);
5324 if (dstcmp < 0)
5325 return 0;
5326
5327 // check the src name too; it might have a newer guard, and we don't
5328 // want to clobber it
5329 int srccmp = _check_replay_guard(oldcid, o, spos);
5330 if (srccmp < 0)
5331 return 0;
5332
5333 // open guard on object so we don't any previous operations on the
5334 // new name that will modify the source inode.
5335 FDRef fd;
5336 int r = lfn_open(oldcid, o, 0, &fd);
5337 if (r < 0) {
5338 // the source collection/object does not exist. If we are replaying, we
5339 // should be safe, so just return 0 and move on.
5340 assert(replaying);
5341 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5342 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5343 return 0;
5344 }
5345 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5346 _set_replay_guard(**fd, spos, &o, true);
5347 }
5348
5349 r = lfn_link(oldcid, c, o, o);
5350 if (replaying && !backend->can_checkpoint() &&
5351 r == -EEXIST) // crashed between link() and set_replay_guard()
5352 r = 0;
5353
5354 _inject_failure();
5355
5356 // close guard on object so we don't do this again
5357 if (r == 0) {
5358 _close_replay_guard(**fd, spos);
5359 }
5360 lfn_close(fd);
5361
5362 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5363 return r;
5364 }
5365
5366 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5367 coll_t c, const ghobject_t& o,
5368 const SequencerPosition& spos,
5369 bool allow_enoent)
5370 {
5371 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5372 int r = 0;
5373 int dstcmp, srccmp;
5374
5375 if (replaying) {
5376 /* If the destination collection doesn't exist during replay,
5377 * we need to delete the src object and continue on
5378 */
5379 if (!collection_exists(c))
5380 goto out_rm_src;
5381 }
5382
5383 dstcmp = _check_replay_guard(c, o, spos);
5384 if (dstcmp < 0)
5385 goto out_rm_src;
5386
5387 // check the src name too; it might have a newer guard, and we don't
5388 // want to clobber it
5389 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5390 if (srccmp < 0)
5391 return 0;
5392
5393 {
5394 // open guard on object so we don't any previous operations on the
5395 // new name that will modify the source inode.
5396 FDRef fd;
5397 r = lfn_open(oldcid, oldoid, 0, &fd);
5398 if (r < 0) {
5399 // the source collection/object does not exist. If we are replaying, we
5400 // should be safe, so just return 0 and move on.
5401 if (replaying) {
5402 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5403 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5404 } else if (allow_enoent) {
5405 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5406 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5407 << dendl;
5408 } else {
5409 assert(0 == "ERROR: source must exist");
5410 }
5411
5412 if (!replaying) {
5413 return 0;
5414 }
5415 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5416 return 0;
5417 }
5418
5419 r = 0; // don't know if object_map was cloned
5420 } else {
5421 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5422 _set_replay_guard(**fd, spos, &o, true);
5423 }
5424
5425 r = lfn_link(oldcid, c, oldoid, o);
5426 if (replaying && !backend->can_checkpoint() &&
5427 r == -EEXIST) // crashed between link() and set_replay_guard()
5428 r = 0;
5429
5430 lfn_close(fd);
5431 fd = FDRef();
5432
5433 _inject_failure();
5434 }
5435
5436 if (r == 0) {
5437 // the name changed; link the omap content
5438 r = object_map->rename(oldoid, o, &spos);
5439 if (r == -ENOENT)
5440 r = 0;
5441 }
5442
5443 _inject_failure();
5444
5445 if (r == 0)
5446 r = lfn_unlink(oldcid, oldoid, spos, true);
5447
5448 if (r == 0)
5449 r = lfn_open(c, o, 0, &fd);
5450
5451 // close guard on object so we don't do this again
5452 if (r == 0) {
5453 _close_replay_guard(**fd, spos, &o);
5454 lfn_close(fd);
5455 }
5456 }
5457
5458 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5459 << " = " << r << dendl;
5460 return r;
5461
5462 out_rm_src:
5463 // remove source
5464 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5465 r = lfn_unlink(oldcid, oldoid, spos, true);
5466 }
5467
5468 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5469 << " = " << r << dendl;
5470 return r;
5471 }
5472
5473 void FileStore::_inject_failure()
5474 {
5475 if (m_filestore_kill_at) {
5476 int final = --m_filestore_kill_at;
5477 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5478 if (final == 0) {
5479 derr << __FUNC__ << ": KILLING" << dendl;
5480 cct->_log->flush();
5481 _exit(1);
5482 }
5483 }
5484 }
5485
5486 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5487 const SequencerPosition &spos) {
5488 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5489 Index index;
5490 int r = get_index(cid, &index);
5491 if (r < 0)
5492 return r;
5493 {
5494 assert(NULL != index.index);
5495 RWLock::RLocker l((index.index)->access_lock);
5496 r = lfn_find(hoid, index);
5497 if (r < 0)
5498 return r;
5499 }
5500 r = object_map->clear_keys_header(hoid, &spos);
5501 if (r < 0 && r != -ENOENT)
5502 return r;
5503 return 0;
5504 }
5505
5506 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5507 const map<string, bufferlist> &aset,
5508 const SequencerPosition &spos) {
5509 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5510 Index index;
5511 int r;
5512 //treat pgmeta as a logical object, skip to check exist
5513 if (hoid.is_pgmeta())
5514 goto skip;
5515
5516 r = get_index(cid, &index);
5517 if (r < 0) {
5518 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5519 return r;
5520 }
5521 {
5522 assert(NULL != index.index);
5523 RWLock::RLocker l((index.index)->access_lock);
5524 r = lfn_find(hoid, index);
5525 if (r < 0) {
5526 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5527 return r;
5528 }
5529 }
5530 skip:
5531 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5532 for (auto& p : aset) {
5533 dout(20) << __FUNC__ << ": set " << p.first << dendl;
5534 }
5535 }
5536 r = object_map->set_keys(hoid, aset, &spos);
5537 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5538 return r;
5539 }
5540
5541 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5542 const set<string> &keys,
5543 const SequencerPosition &spos) {
5544 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5545 Index index;
5546 int r;
5547 //treat pgmeta as a logical object, skip to check exist
5548 if (hoid.is_pgmeta())
5549 goto skip;
5550
5551 r = get_index(cid, &index);
5552 if (r < 0)
5553 return r;
5554 {
5555 assert(NULL != index.index);
5556 RWLock::RLocker l((index.index)->access_lock);
5557 r = lfn_find(hoid, index);
5558 if (r < 0)
5559 return r;
5560 }
5561 skip:
5562 r = object_map->rm_keys(hoid, keys, &spos);
5563 if (r < 0 && r != -ENOENT)
5564 return r;
5565 return 0;
5566 }
5567
5568 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5569 const string& first, const string& last,
5570 const SequencerPosition &spos) {
5571 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5572 set<string> keys;
5573 {
5574 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5575 if (!iter)
5576 return -ENOENT;
5577 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5578 iter->next()) {
5579 keys.insert(iter->key());
5580 }
5581 }
5582 return _omap_rmkeys(cid, hoid, keys, spos);
5583 }
5584
5585 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5586 const bufferlist &bl,
5587 const SequencerPosition &spos)
5588 {
5589 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5590 Index index;
5591 int r = get_index(cid, &index);
5592 if (r < 0)
5593 return r;
5594 {
5595 assert(NULL != index.index);
5596 RWLock::RLocker l((index.index)->access_lock);
5597 r = lfn_find(hoid, index);
5598 if (r < 0)
5599 return r;
5600 }
5601 return object_map->set_header(hoid, bl, &spos);
5602 }
5603
5604 int FileStore::_split_collection(const coll_t& cid,
5605 uint32_t bits,
5606 uint32_t rem,
5607 coll_t dest,
5608 const SequencerPosition &spos)
5609 {
5610 int r;
5611 {
5612 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5613 if (!collection_exists(cid)) {
5614 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5615 assert(replaying);
5616 return 0;
5617 }
5618 if (!collection_exists(dest)) {
5619 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5620 assert(replaying);
5621 return 0;
5622 }
5623
5624 int dstcmp = _check_replay_guard(dest, spos);
5625 if (dstcmp < 0)
5626 return 0;
5627
5628 int srccmp = _check_replay_guard(cid, spos);
5629 if (srccmp < 0)
5630 return 0;
5631
5632 _set_global_replay_guard(cid, spos);
5633 _set_replay_guard(cid, spos, true);
5634 _set_replay_guard(dest, spos, true);
5635
5636 Index from;
5637 r = get_index(cid, &from);
5638
5639 Index to;
5640 if (!r)
5641 r = get_index(dest, &to);
5642
5643 if (!r) {
5644 assert(NULL != from.index);
5645 RWLock::WLocker l1((from.index)->access_lock);
5646
5647 assert(NULL != to.index);
5648 RWLock::WLocker l2((to.index)->access_lock);
5649
5650 r = from->split(rem, bits, to.index);
5651 }
5652
5653 _close_replay_guard(cid, spos);
5654 _close_replay_guard(dest, spos);
5655 }
5656 _collection_set_bits(cid, bits);
5657 if (!r && cct->_conf->filestore_debug_verify_split) {
5658 vector<ghobject_t> objects;
5659 ghobject_t next;
5660 while (1) {
5661 collection_list(
5662 cid,
5663 next, ghobject_t::get_max(),
5664 get_ideal_list_max(),
5665 &objects,
5666 &next);
5667 if (objects.empty())
5668 break;
5669 for (vector<ghobject_t>::iterator i = objects.begin();
5670 i != objects.end();
5671 ++i) {
5672 dout(20) << __FUNC__ << ": " << *i << " still in source "
5673 << cid << dendl;
5674 assert(!i->match(bits, rem));
5675 }
5676 objects.clear();
5677 }
5678 next = ghobject_t();
5679 while (1) {
5680 collection_list(
5681 dest,
5682 next, ghobject_t::get_max(),
5683 get_ideal_list_max(),
5684 &objects,
5685 &next);
5686 if (objects.empty())
5687 break;
5688 for (vector<ghobject_t>::iterator i = objects.begin();
5689 i != objects.end();
5690 ++i) {
5691 dout(20) << __FUNC__ << ": " << *i << " now in dest "
5692 << *i << dendl;
5693 assert(i->match(bits, rem));
5694 }
5695 objects.clear();
5696 }
5697 }
5698 return r;
5699 }
5700
5701 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5702 uint64_t expected_object_size,
5703 uint64_t expected_write_size)
5704 {
5705 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5706
5707 FDRef fd;
5708 int ret = 0;
5709
5710 if (expected_object_size == 0 || expected_write_size == 0)
5711 goto out;
5712
5713 ret = lfn_open(cid, oid, false, &fd);
5714 if (ret < 0)
5715 goto out;
5716
5717 {
5718 // TODO: a more elaborate hint calculation
5719 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5720
5721 ret = backend->set_alloc_hint(**fd, hint);
5722 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
5723 }
5724
5725 lfn_close(fd);
5726 out:
5727 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5728 assert(!m_filestore_fail_eio || ret != -EIO);
5729 return ret;
5730 }
5731
5732 const char** FileStore::get_tracked_conf_keys() const
5733 {
5734 static const char* KEYS[] = {
5735 "filestore_max_inline_xattr_size",
5736 "filestore_max_inline_xattr_size_xfs",
5737 "filestore_max_inline_xattr_size_btrfs",
5738 "filestore_max_inline_xattr_size_other",
5739 "filestore_max_inline_xattrs",
5740 "filestore_max_inline_xattrs_xfs",
5741 "filestore_max_inline_xattrs_btrfs",
5742 "filestore_max_inline_xattrs_other",
5743 "filestore_max_xattr_value_size",
5744 "filestore_max_xattr_value_size_xfs",
5745 "filestore_max_xattr_value_size_btrfs",
5746 "filestore_max_xattr_value_size_other",
5747 "filestore_min_sync_interval",
5748 "filestore_max_sync_interval",
5749 "filestore_queue_max_ops",
5750 "filestore_queue_max_bytes",
5751 "filestore_expected_throughput_bytes",
5752 "filestore_expected_throughput_ops",
5753 "filestore_queue_low_threshhold",
5754 "filestore_queue_high_threshhold",
5755 "filestore_queue_high_delay_multiple",
5756 "filestore_queue_max_delay_multiple",
5757 "filestore_commit_timeout",
5758 "filestore_dump_file",
5759 "filestore_kill_at",
5760 "filestore_fail_eio",
5761 "filestore_fadvise",
5762 "filestore_sloppy_crc",
5763 "filestore_sloppy_crc_block_size",
5764 "filestore_max_alloc_hint_size",
5765 NULL
5766 };
5767 return KEYS;
5768 }
5769
5770 void FileStore::handle_conf_change(const struct md_config_t *conf,
5771 const std::set <std::string> &changed)
5772 {
5773 if (changed.count("filestore_max_inline_xattr_size") ||
5774 changed.count("filestore_max_inline_xattr_size_xfs") ||
5775 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5776 changed.count("filestore_max_inline_xattr_size_other") ||
5777 changed.count("filestore_max_inline_xattrs") ||
5778 changed.count("filestore_max_inline_xattrs_xfs") ||
5779 changed.count("filestore_max_inline_xattrs_btrfs") ||
5780 changed.count("filestore_max_inline_xattrs_other") ||
5781 changed.count("filestore_max_xattr_value_size") ||
5782 changed.count("filestore_max_xattr_value_size_xfs") ||
5783 changed.count("filestore_max_xattr_value_size_btrfs") ||
5784 changed.count("filestore_max_xattr_value_size_other")) {
5785 if (backend) {
5786 Mutex::Locker l(lock);
5787 set_xattr_limits_via_conf();
5788 }
5789 }
5790
5791 if (changed.count("filestore_queue_max_bytes") ||
5792 changed.count("filestore_queue_max_ops") ||
5793 changed.count("filestore_expected_throughput_bytes") ||
5794 changed.count("filestore_expected_throughput_ops") ||
5795 changed.count("filestore_queue_low_threshhold") ||
5796 changed.count("filestore_queue_high_threshhold") ||
5797 changed.count("filestore_queue_high_delay_multiple") ||
5798 changed.count("filestore_queue_max_delay_multiple")) {
5799 Mutex::Locker l(lock);
5800 set_throttle_params();
5801 }
5802
5803 if (changed.count("filestore_min_sync_interval") ||
5804 changed.count("filestore_max_sync_interval") ||
5805 changed.count("filestore_kill_at") ||
5806 changed.count("filestore_fail_eio") ||
5807 changed.count("filestore_sloppy_crc") ||
5808 changed.count("filestore_sloppy_crc_block_size") ||
5809 changed.count("filestore_max_alloc_hint_size") ||
5810 changed.count("filestore_fadvise")) {
5811 Mutex::Locker l(lock);
5812 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5813 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5814 m_filestore_kill_at = conf->filestore_kill_at;
5815 m_filestore_fail_eio = conf->filestore_fail_eio;
5816 m_filestore_fadvise = conf->filestore_fadvise;
5817 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5818 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5819 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5820 }
5821 if (changed.count("filestore_commit_timeout")) {
5822 Mutex::Locker l(sync_entry_timeo_lock);
5823 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5824 }
5825 if (changed.count("filestore_dump_file")) {
5826 if (conf->filestore_dump_file.length() &&
5827 conf->filestore_dump_file != "-") {
5828 dump_start(conf->filestore_dump_file);
5829 } else {
5830 dump_stop();
5831 }
5832 }
5833 }
5834
5835 int FileStore::set_throttle_params()
5836 {
5837 stringstream ss;
5838 bool valid = throttle_bytes.set_params(
5839 cct->_conf->filestore_queue_low_threshhold,
5840 cct->_conf->filestore_queue_high_threshhold,
5841 cct->_conf->filestore_expected_throughput_bytes,
5842 cct->_conf->filestore_queue_high_delay_multiple,
5843 cct->_conf->filestore_queue_max_delay_multiple,
5844 cct->_conf->filestore_queue_max_bytes,
5845 &ss);
5846
5847 valid &= throttle_ops.set_params(
5848 cct->_conf->filestore_queue_low_threshhold,
5849 cct->_conf->filestore_queue_high_threshhold,
5850 cct->_conf->filestore_expected_throughput_ops,
5851 cct->_conf->filestore_queue_high_delay_multiple,
5852 cct->_conf->filestore_queue_max_delay_multiple,
5853 cct->_conf->filestore_queue_max_ops,
5854 &ss);
5855
5856 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5857 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5858
5859 if (!valid) {
5860 derr << "tried to set invalid params: "
5861 << ss.str()
5862 << dendl;
5863 }
5864 return valid ? 0 : -EINVAL;
5865 }
5866
5867 void FileStore::dump_start(const std::string& file)
5868 {
5869 dout(10) << __FUNC__ << ": " << file << dendl;
5870 if (m_filestore_do_dump) {
5871 dump_stop();
5872 }
5873 m_filestore_dump_fmt.reset();
5874 m_filestore_dump_fmt.open_array_section("dump");
5875 m_filestore_dump.open(file.c_str());
5876 m_filestore_do_dump = true;
5877 }
5878
5879 void FileStore::dump_stop()
5880 {
5881 dout(10) << __FUNC__ << dendl;
5882 m_filestore_do_dump = false;
5883 if (m_filestore_dump.is_open()) {
5884 m_filestore_dump_fmt.close_section();
5885 m_filestore_dump_fmt.flush(m_filestore_dump);
5886 m_filestore_dump.flush();
5887 m_filestore_dump.close();
5888 }
5889 }
5890
5891 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5892 {
5893 m_filestore_dump_fmt.open_array_section("transactions");
5894 unsigned trans_num = 0;
5895 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5896 m_filestore_dump_fmt.open_object_section("transaction");
5897 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5898 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5899 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5900 (*i).dump(&m_filestore_dump_fmt);
5901 m_filestore_dump_fmt.close_section();
5902 }
5903 m_filestore_dump_fmt.close_section();
5904 m_filestore_dump_fmt.flush(m_filestore_dump);
5905 m_filestore_dump.flush();
5906 }
5907
5908 void FileStore::set_xattr_limits_via_conf()
5909 {
5910 uint32_t fs_xattr_size;
5911 uint32_t fs_xattrs;
5912 uint32_t fs_xattr_max_value_size;
5913
5914 switch (m_fs_type) {
5915 #if defined(__linux__)
5916 case XFS_SUPER_MAGIC:
5917 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5918 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5919 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5920 break;
5921 case BTRFS_SUPER_MAGIC:
5922 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5923 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5924 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5925 break;
5926 #endif
5927 default:
5928 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5929 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5930 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5931 break;
5932 }
5933
5934 // Use override value if set
5935 if (cct->_conf->filestore_max_inline_xattr_size)
5936 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5937 else
5938 m_filestore_max_inline_xattr_size = fs_xattr_size;
5939
5940 // Use override value if set
5941 if (cct->_conf->filestore_max_inline_xattrs)
5942 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5943 else
5944 m_filestore_max_inline_xattrs = fs_xattrs;
5945
5946 // Use override value if set
5947 if (cct->_conf->filestore_max_xattr_value_size)
5948 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5949 else
5950 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5951
5952 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5953 derr << "WARNING: max attr value size ("
5954 << m_filestore_max_xattr_value_size
5955 << ") is smaller than osd_max_object_name_len ("
5956 << cct->_conf->osd_max_object_name_len
5957 << "). Your backend filesystem appears to not support attrs large "
5958 << "enough to handle the configured max rados name size. You may get "
5959 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5960 << "behavior"
5961 << dendl;
5962 }
5963 }
5964
5965 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5966 {
5967 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5968 return res;
5969 }
5970
5971 int FileStore::apply_layout_settings(const coll_t &cid)
5972 {
5973 dout(20) << __FUNC__ << ": " << cid << dendl;
5974 Index index;
5975 int r = get_index(cid, &index);
5976 if (r < 0) {
5977 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5978 << dendl;
5979 return r;
5980 }
5981
5982 return index->apply_layout_settings();
5983 }
5984
5985
5986 // -- FSSuperblock --
5987
5988 void FSSuperblock::encode(bufferlist &bl) const
5989 {
5990 ENCODE_START(2, 1, bl);
5991 compat_features.encode(bl);
5992 ::encode(omap_backend, bl);
5993 ENCODE_FINISH(bl);
5994 }
5995
5996 void FSSuperblock::decode(bufferlist::iterator &bl)
5997 {
5998 DECODE_START(2, bl);
5999 compat_features.decode(bl);
6000 if (struct_v >= 2)
6001 ::decode(omap_backend, bl);
6002 else
6003 omap_backend = "leveldb";
6004 DECODE_FINISH(bl);
6005 }
6006
6007 void FSSuperblock::dump(Formatter *f) const
6008 {
6009 f->open_object_section("compat");
6010 compat_features.dump(f);
6011 f->dump_string("omap_backend", omap_backend);
6012 f->close_section();
6013 }
6014
6015 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6016 {
6017 FSSuperblock z;
6018 o.push_back(new FSSuperblock(z));
6019 CompatSet::FeatureSet feature_compat;
6020 CompatSet::FeatureSet feature_ro_compat;
6021 CompatSet::FeatureSet feature_incompat;
6022 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6023 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6024 feature_incompat);
6025 o.push_back(new FSSuperblock(z));
6026 z.omap_backend = "rocksdb";
6027 o.push_back(new FSSuperblock(z));
6028 }