]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
update sources to v12.2.3
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// have spilled over into DBObjectMap.  If XATTR_SPILL_OUT_NAME exists in the
// file's xattrs and its value is XATTR_NO_SPILL_OUT ("0"), the object has no
// xattrs stored in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114 CompatSet::FeatureSet ceph_osd_feature_compat;
115 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116 CompatSet::FeatureSet ceph_osd_feature_incompat;
117 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118 ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123 CompatSet compat = get_fs_initial_compat_set();
124 //Any features here can be set in code, but not in initial superblock
125 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126 return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136 uuid_d *fsid)
137 {
138 // make sure we don't try to use aio or direct_io (and get annoying
139 // error messages from failing to do so); performance implications
140 // should be irrelevant for this use
141 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142 return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146 PerfCounters &logger)
147 {
148 os_commit_latency.consume_next(
149 logger.get_tavg_ms(
150 l_filestore_journal_latency));
151 os_apply_latency.consume_next(
152 logger.get_tavg_ms(
153 l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159 return out << *s.parent;
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164 const string &cid_str(cid.to_str());
165 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 int FileStore::get_index(const coll_t& cid, Index *index)
169 {
170 int r = index_manager.get_index(cid, basedir, index);
171 assert(!m_filestore_fail_eio || r != -EIO);
172 return r;
173 }
174
175 int FileStore::init_index(const coll_t& cid)
176 {
177 char path[PATH_MAX];
178 get_cdir(cid, path, sizeof(path));
179 int r = index_manager.init_index(cid, path, target_version);
180 assert(!m_filestore_fail_eio || r != -EIO);
181 return r;
182 }
183
184 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185 {
186 IndexedPath path2;
187 if (!path)
188 path = &path2;
189 int r, exist;
190 assert(NULL != index.index);
191 r = (index.index)->lookup(oid, path, &exist);
192 if (r < 0) {
193 assert(!m_filestore_fail_eio || r != -EIO);
194 return r;
195 }
196 if (!exist)
197 return -ENOENT;
198 return 0;
199 }
200
201 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202 {
203 FDRef fd;
204 int r = lfn_open(cid, oid, false, &fd);
205 if (r < 0)
206 return r;
207 r = ::ftruncate(**fd, length);
208 if (r < 0)
209 r = -errno;
210 if (r >= 0 && m_filestore_sloppy_crc) {
211 int rc = backend->_crc_update_truncate(**fd, length);
212 assert(rc >= 0);
213 }
214 lfn_close(fd);
215 assert(!m_filestore_fail_eio || r != -EIO);
216 return r;
217 }
218
219 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220 {
221 IndexedPath path;
222 Index index;
223 int r = get_index(cid, &index);
224 if (r < 0)
225 return r;
226
227 assert(NULL != index.index);
228 RWLock::RLocker l((index.index)->access_lock);
229
230 r = lfn_find(oid, index, &path);
231 if (r < 0)
232 return r;
233 r = ::stat(path->path(), buf);
234 if (r < 0)
235 r = -errno;
236 return r;
237 }
238
239 int FileStore::lfn_open(const coll_t& cid,
240 const ghobject_t& oid,
241 bool create,
242 FDRef *outfd,
243 Index *index)
244 {
245 assert(outfd);
246 int r = 0;
247 bool need_lock = true;
248 int flags = O_RDWR;
249
250 if (create)
251 flags |= O_CREAT;
252 if (cct->_conf->filestore_odsync_write) {
253 flags |= O_DSYNC;
254 }
255
256 Index index2;
257 if (!index) {
258 index = &index2;
259 }
260 if (!((*index).index)) {
261 r = get_index(cid, index);
262 if (r < 0) {
263 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
264 return r;
265 }
266 } else {
267 need_lock = false;
268 }
269
270 int fd, exist;
271 assert(NULL != (*index).index);
272 if (need_lock) {
273 ((*index).index)->access_lock.get_write();
274 }
275 if (!replaying) {
276 *outfd = fdcache.lookup(oid);
277 if (*outfd) {
278 if (need_lock) {
279 ((*index).index)->access_lock.put_write();
280 }
281 return 0;
282 }
283 }
284
285
286 IndexedPath path2;
287 IndexedPath *path = &path2;
288
289 r = (*index)->lookup(oid, path, &exist);
290 if (r < 0) {
291 derr << "could not find " << oid << " in index: "
292 << cpp_strerror(-r) << dendl;
293 goto fail;
294 }
295
296 r = ::open((*path)->path(), flags, 0644);
297 if (r < 0) {
298 r = -errno;
299 dout(10) << "error opening file " << (*path)->path() << " with flags="
300 << flags << ": " << cpp_strerror(-r) << dendl;
301 goto fail;
302 }
303 fd = r;
304 if (create && (!exist)) {
305 r = (*index)->created(oid, (*path)->path());
306 if (r < 0) {
307 VOID_TEMP_FAILURE_RETRY(::close(fd));
308 derr << "error creating " << oid << " (" << (*path)->path()
309 << ") in index: " << cpp_strerror(-r) << dendl;
310 goto fail;
311 }
312 r = chain_fsetxattr<true, true>(
313 fd, XATTR_SPILL_OUT_NAME,
314 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
315 if (r < 0) {
316 VOID_TEMP_FAILURE_RETRY(::close(fd));
317 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
318 << "):" << cpp_strerror(-r) << dendl;
319 goto fail;
320 }
321 }
322
323 if (!replaying) {
324 bool existed;
325 *outfd = fdcache.add(oid, fd, &existed);
326 if (existed) {
327 TEMP_FAILURE_RETRY(::close(fd));
328 }
329 } else {
330 *outfd = std::make_shared<FDCache::FD>(fd);
331 }
332
333 if (need_lock) {
334 ((*index).index)->access_lock.put_write();
335 }
336
337 return 0;
338
339 fail:
340
341 if (need_lock) {
342 ((*index).index)->access_lock.put_write();
343 }
344
345 assert(!m_filestore_fail_eio || r != -EIO);
346 return r;
347 }
348
349 void FileStore::lfn_close(FDRef fd)
350 {
351 }
352
353 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
354 {
355 Index index_new, index_old;
356 IndexedPath path_new, path_old;
357 int exist;
358 int r;
359 bool index_same = false;
360 if (c < newcid) {
361 r = get_index(newcid, &index_new);
362 if (r < 0)
363 return r;
364 r = get_index(c, &index_old);
365 if (r < 0)
366 return r;
367 } else if (c == newcid) {
368 r = get_index(c, &index_old);
369 if (r < 0)
370 return r;
371 index_new = index_old;
372 index_same = true;
373 } else {
374 r = get_index(c, &index_old);
375 if (r < 0)
376 return r;
377 r = get_index(newcid, &index_new);
378 if (r < 0)
379 return r;
380 }
381
382 assert(NULL != index_old.index);
383 assert(NULL != index_new.index);
384
385 if (!index_same) {
386
387 RWLock::RLocker l1((index_old.index)->access_lock);
388
389 r = index_old->lookup(o, &path_old, &exist);
390 if (r < 0) {
391 assert(!m_filestore_fail_eio || r != -EIO);
392 return r;
393 }
394 if (!exist)
395 return -ENOENT;
396
397 RWLock::WLocker l2((index_new.index)->access_lock);
398
399 r = index_new->lookup(newoid, &path_new, &exist);
400 if (r < 0) {
401 assert(!m_filestore_fail_eio || r != -EIO);
402 return r;
403 }
404 if (exist)
405 return -EEXIST;
406
407 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
408 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
409 r = ::link(path_old->path(), path_new->path());
410 if (r < 0)
411 return -errno;
412
413 r = index_new->created(newoid, path_new->path());
414 if (r < 0) {
415 assert(!m_filestore_fail_eio || r != -EIO);
416 return r;
417 }
418 } else {
419 RWLock::WLocker l1((index_old.index)->access_lock);
420
421 r = index_old->lookup(o, &path_old, &exist);
422 if (r < 0) {
423 assert(!m_filestore_fail_eio || r != -EIO);
424 return r;
425 }
426 if (!exist)
427 return -ENOENT;
428
429 r = index_new->lookup(newoid, &path_new, &exist);
430 if (r < 0) {
431 assert(!m_filestore_fail_eio || r != -EIO);
432 return r;
433 }
434 if (exist)
435 return -EEXIST;
436
437 dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
438 dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
439 r = ::link(path_old->path(), path_new->path());
440 if (r < 0)
441 return -errno;
442
443 // make sure old fd for unlinked/overwritten file is gone
444 fdcache.clear(newoid);
445
446 r = index_new->created(newoid, path_new->path());
447 if (r < 0) {
448 assert(!m_filestore_fail_eio || r != -EIO);
449 return r;
450 }
451 }
452 return 0;
453 }
454
455 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456 const SequencerPosition &spos,
457 bool force_clear_omap)
458 {
459 Index index;
460 int r = get_index(cid, &index);
461 if (r < 0) {
462 dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
463 return r;
464 }
465
466 assert(NULL != index.index);
467 RWLock::WLocker l((index.index)->access_lock);
468
469 {
470 IndexedPath path;
471 int hardlink;
472 r = index->lookup(o, &path, &hardlink);
473 if (r < 0) {
474 assert(!m_filestore_fail_eio || r != -EIO);
475 return r;
476 }
477
478 if (!force_clear_omap) {
479 if (hardlink == 0 || hardlink == 1) {
480 force_clear_omap = true;
481 }
482 }
483 if (force_clear_omap) {
484 dout(20) << __FUNC__ << ": clearing omap on " << o
485 << " in cid " << cid << dendl;
486 r = object_map->clear(o, &spos);
487 if (r < 0 && r != -ENOENT) {
488 dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
489 assert(!m_filestore_fail_eio || r != -EIO);
490 return r;
491 }
492 if (cct->_conf->filestore_debug_inject_read_err) {
493 debug_obj_on_delete(o);
494 }
495 if (!m_disable_wbthrottle) {
496 wbthrottle.clear_object(o); // should be only non-cache ref
497 }
498 fdcache.clear(o);
499 } else {
500 /* Ensure that replay of this op doesn't result in the object_map
501 * going away.
502 */
503 if (!backend->can_checkpoint())
504 object_map->sync(&o, &spos);
505 }
506 if (hardlink == 0) {
507 if (!m_disable_wbthrottle) {
508 wbthrottle.clear_object(o); // should be only non-cache ref
509 }
510 return 0;
511 }
512 }
513 r = index->unlink(o);
514 if (r < 0) {
515 dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
516 return r;
517 }
518 return 0;
519 }
520
521 FileStore::FileStore(CephContext* cct, const std::string &base,
522 const std::string &jdev, osflagbits_t flags,
523 const char *name, bool do_update) :
524 JournalingObjectStore(cct, base),
525 internal_name(name),
526 basedir(base), journalpath(jdev),
527 generic_flags(flags),
528 blk_size(0),
529 fsid_fd(-1), op_fd(-1),
530 basedir_fd(-1), current_fd(-1),
531 backend(NULL),
532 index_manager(cct, do_update),
533 lock("FileStore::lock"),
534 force_sync(false),
535 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536 timer(cct, sync_entry_timeo_lock),
537 stop(false), sync_thread(this),
538 fdcache(cct),
539 wbthrottle(cct),
540 next_osr_id(0),
541 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542 !cct->_conf->filestore_wbthrottle_enable),
543 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548 op_wq(this, cct->_conf->filestore_op_thread_timeout,
549 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550 logger(NULL),
551 trace_endpoint("0.0.0.0", 0, "FileStore"),
552 read_error_lock("FileStore::read_error_lock"),
553 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561 m_filestore_fadvise(cct->_conf->filestore_fadvise),
562 do_update(do_update),
563 m_journal_dio(cct->_conf->journal_dio),
564 m_journal_aio(cct->_conf->journal_aio),
565 m_journal_force_aio(cct->_conf->journal_force_aio),
566 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568 m_filestore_do_dump(false),
569 m_filestore_dump_fmt(true),
570 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573 m_fs_type(0),
574 m_filestore_max_inline_xattr_size(0),
575 m_filestore_max_inline_xattrs(0),
576 m_filestore_max_xattr_value_size(0)
577 {
578 m_filestore_kill_at = cct->_conf->filestore_kill_at;
579 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580 ostringstream oss;
581 oss << "filestore-ondisk-" << i;
582 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583 ondisk_finishers.push_back(f);
584 }
585 for (int i = 0; i < m_apply_finisher_num; ++i) {
586 ostringstream oss;
587 oss << "filestore-apply-" << i;
588 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589 apply_finishers.push_back(f);
590 }
591
592 ostringstream oss;
593 oss << basedir << "/current";
594 current_fn = oss.str();
595
596 ostringstream sss;
597 sss << basedir << "/current/commit_op_seq";
598 current_op_seq_fn = sss.str();
599
600 ostringstream omss;
601 if (cct->_conf->filestore_omap_backend_path != "") {
602 omap_dir = cct->_conf->filestore_omap_backend_path;
603 } else {
604 omss << basedir << "/current/omap";
605 omap_dir = omss.str();
606 }
607
608 // initialize logger
609 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
616 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
617 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
618 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
619 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
620 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
621 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
622 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
623 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
624 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
625 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
626
627 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
628 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
629 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
630 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
631 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
632 plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
633
634 logger = plb.create_perf_counters();
635
636 cct->get_perfcounters_collection()->add(logger);
637 cct->_conf->add_observer(this);
638
639 superblock.compat_features = get_fs_initial_compat_set();
640 }
641
642 FileStore::~FileStore()
643 {
644 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
645 delete *it;
646 *it = NULL;
647 }
648 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
649 delete *it;
650 *it = NULL;
651 }
652 cct->_conf->remove_observer(this);
653 cct->get_perfcounters_collection()->remove(logger);
654
655 if (journal)
656 journal->logger = NULL;
657 delete logger;
658
659 if (m_filestore_do_dump) {
660 dump_stop();
661 }
662 }
663
// Build the on-disk xattr name for the user-visible attribute @name by
// prefixing it with "user.ceph.".  The result is written to @buf (capacity
// @len bytes) and truncated by snprintf() if it does not fit.
static void get_attrname(const char *name, char *buf, int len)
{
  const size_t capacity = len;
  snprintf(buf, capacity, "user.ceph.%s", name);
}
668
// If *name starts with the "user.ceph." prefix, advance *name past the
// prefix and return true; otherwise leave *name untouched and return false.
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // 10, without the NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
677
678 void FileStore::collect_metadata(map<string,string> *pm)
679 {
680 char partition_path[PATH_MAX];
681 char dev_node[PATH_MAX];
682 int rc = 0;
683
684 (*pm)["filestore_backend"] = backend->get_name();
685 ostringstream ss;
686 ss << "0x" << std::hex << m_fs_type << std::dec;
687 (*pm)["filestore_f_type"] = ss.str();
688
689 if (cct->_conf->filestore_collect_device_partition_information) {
690 rc = get_device_by_fd(fsid_fd, partition_path, dev_node, PATH_MAX);
691 } else {
692 rc = -EINVAL;
693 }
694
695 switch (rc) {
696 case -EOPNOTSUPP:
697 case -EINVAL:
698 (*pm)["backend_filestore_partition_path"] = "unknown";
699 (*pm)["backend_filestore_dev_node"] = "unknown";
700 break;
701 case -ENODEV:
702 (*pm)["backend_filestore_partition_path"] = string(partition_path);
703 (*pm)["backend_filestore_dev_node"] = "unknown";
704 break;
705 default:
706 (*pm)["backend_filestore_partition_path"] = string(partition_path);
707 (*pm)["backend_filestore_dev_node"] = string(dev_node);
708 }
709 }
710
711 int FileStore::statfs(struct store_statfs_t *buf0)
712 {
713 struct statfs buf;
714 buf0->reset();
715 if (::statfs(basedir.c_str(), &buf) < 0) {
716 int r = -errno;
717 assert(!m_filestore_fail_eio || r != -EIO);
718 assert(r != -ENOENT);
719 return r;
720 }
721 buf0->total = buf.f_blocks * buf.f_bsize;
722 buf0->available = buf.f_bavail * buf.f_bsize;
723 // Adjust for writes pending in the journal
724 if (journal) {
725 uint64_t estimate = journal->get_journal_size_estimate();
726 if (buf0->available > estimate)
727 buf0->available -= estimate;
728 else
729 buf0->available = 0;
730 }
731 return 0;
732 }
733
734
735 void FileStore::new_journal()
736 {
737 if (journalpath.length()) {
738 dout(10) << "open_journal at " << journalpath << dendl;
739 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
740 journalpath.c_str(),
741 m_journal_dio, m_journal_aio,
742 m_journal_force_aio);
743 if (journal)
744 journal->logger = logger;
745 }
746 return;
747 }
748
749 int FileStore::dump_journal(ostream& out)
750 {
751 int r;
752
753 if (!journalpath.length())
754 return -EINVAL;
755
756 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
757 r = journal->dump(out);
758 delete journal;
759 return r;
760 }
761
762 FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
763 {
764 switch (f_type) {
765 #if defined(__linux__)
766 case BTRFS_SUPER_MAGIC:
767 return new BtrfsFileStoreBackend(fs);
768 # ifdef HAVE_LIBXFS
769 case XFS_SUPER_MAGIC:
770 return new XfsFileStoreBackend(fs);
771 # endif
772 #endif
773 #ifdef HAVE_LIBZFS
774 case ZFS_SUPER_MAGIC:
775 return new ZFSFileStoreBackend(fs);
776 #endif
777 default:
778 return new GenericFileStoreBackend(fs);
779 }
780 }
781
782 void FileStore::create_backend(long f_type)
783 {
784 m_fs_type = f_type;
785
786 assert(backend == NULL);
787 backend = FileStoreBackend::create(f_type, this);
788
789 dout(0) << "backend " << backend->get_name()
790 << " (magic 0x" << std::hex << f_type << std::dec << ")"
791 << dendl;
792
793 switch (f_type) {
794 #if defined(__linux__)
795 case BTRFS_SUPER_MAGIC:
796 if (!m_disable_wbthrottle){
797 wbthrottle.set_fs(WBThrottle::BTRFS);
798 }
799 break;
800
801 case XFS_SUPER_MAGIC:
802 // wbthrottle is constructed with fs(WBThrottle::XFS)
803 break;
804 #endif
805 }
806
807 set_xattr_limits_via_conf();
808 }
809
810 int FileStore::mkfs()
811 {
812 int ret = 0;
813 char fsid_fn[PATH_MAX];
814 char fsid_str[40];
815 uuid_d old_fsid;
816 uuid_d old_omap_fsid;
817
818 dout(1) << "mkfs in " << basedir << dendl;
819 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
820 if (basedir_fd < 0) {
821 ret = -errno;
822 derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
823 return ret;
824 }
825
826 // open+lock fsid
827 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
828 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
829 if (fsid_fd < 0) {
830 ret = -errno;
831 derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
832 goto close_basedir_fd;
833 }
834
835 if (lock_fsid() < 0) {
836 ret = -EBUSY;
837 goto close_fsid_fd;
838 }
839
840 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
841 if (fsid.is_zero()) {
842 fsid.generate_random();
843 dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
844 } else {
845 dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
846 }
847
848 fsid.print(fsid_str);
849 strcat(fsid_str, "\n");
850 ret = ::ftruncate(fsid_fd, 0);
851 if (ret < 0) {
852 ret = -errno;
853 derr << __FUNC__ << ": failed to truncate fsid: "
854 << cpp_strerror(ret) << dendl;
855 goto close_fsid_fd;
856 }
857 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
858 if (ret < 0) {
859 derr << __FUNC__ << ": failed to write fsid: "
860 << cpp_strerror(ret) << dendl;
861 goto close_fsid_fd;
862 }
863 if (::fsync(fsid_fd) < 0) {
864 ret = -errno;
865 derr << __FUNC__ << ": close failed: can't write fsid: "
866 << cpp_strerror(ret) << dendl;
867 goto close_fsid_fd;
868 }
869 dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
870 } else {
871 if (!fsid.is_zero() && fsid != old_fsid) {
872 derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
873 ret = -EINVAL;
874 goto close_fsid_fd;
875 }
876 fsid = old_fsid;
877 dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
878 }
879
880 // version stamp
881 ret = write_version_stamp();
882 if (ret < 0) {
883 derr << __FUNC__ << ": write_version_stamp() failed: "
884 << cpp_strerror(ret) << dendl;
885 goto close_fsid_fd;
886 }
887
888 // superblock
889 superblock.omap_backend = cct->_conf->filestore_omap_backend;
890 ret = write_superblock();
891 if (ret < 0) {
892 derr << __FUNC__ << ": write_superblock() failed: "
893 << cpp_strerror(ret) << dendl;
894 goto close_fsid_fd;
895 }
896
897 struct statfs basefs;
898 ret = ::fstatfs(basedir_fd, &basefs);
899 if (ret < 0) {
900 ret = -errno;
901 derr << __FUNC__ << ": cannot fstatfs basedir "
902 << cpp_strerror(ret) << dendl;
903 goto close_fsid_fd;
904 }
905
906 #if defined(__linux__)
907 if (basefs.f_type == BTRFS_SUPER_MAGIC &&
908 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
909 derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
910 goto close_fsid_fd;
911 }
912 #endif
913
914 create_backend(basefs.f_type);
915
916 ret = backend->create_current();
917 if (ret < 0) {
918 derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
919 goto close_fsid_fd;
920 }
921
922 // write initial op_seq
923 {
924 uint64_t initial_seq = 0;
925 int fd = read_op_seq(&initial_seq);
926 if (fd < 0) {
927 ret = fd;
928 derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
929 << cpp_strerror(ret) << dendl;
930 goto close_fsid_fd;
931 }
932 if (initial_seq == 0) {
933 ret = write_op_seq(fd, 1);
934 if (ret < 0) {
935 VOID_TEMP_FAILURE_RETRY(::close(fd));
936 derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
937 << cpp_strerror(ret) << dendl;
938 goto close_fsid_fd;
939 }
940
941 if (backend->can_checkpoint()) {
942 // create snap_1 too
943 current_fd = ::open(current_fn.c_str(), O_RDONLY);
944 assert(current_fd >= 0);
945 char s[NAME_MAX];
946 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
947 ret = backend->create_checkpoint(s, NULL);
948 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
949 if (ret < 0 && ret != -EEXIST) {
950 VOID_TEMP_FAILURE_RETRY(::close(fd));
951 derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
952 goto close_fsid_fd;
953 }
954 }
955 }
956 VOID_TEMP_FAILURE_RETRY(::close(fd));
957 }
958 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
959 if (ret < 0) {
960 derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
961 goto close_fsid_fd;
962 }
963 // create fsid under omap
964 // open+lock fsid
965 int omap_fsid_fd;
966 char omap_fsid_fn[PATH_MAX];
967 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
968 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
969 if (omap_fsid_fd < 0) {
970 ret = -errno;
971 derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
972 goto close_fsid_fd;
973 }
974
975 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
976 assert(!fsid.is_zero());
977 fsid.print(fsid_str);
978 strcat(fsid_str, "\n");
979 ret = ::ftruncate(omap_fsid_fd, 0);
980 if (ret < 0) {
981 ret = -errno;
982 derr << __FUNC__ << ": failed to truncate fsid: "
983 << cpp_strerror(ret) << dendl;
984 goto close_omap_fsid_fd;
985 }
986 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
987 if (ret < 0) {
988 derr << __FUNC__ << ": failed to write fsid: "
989 << cpp_strerror(ret) << dendl;
990 goto close_omap_fsid_fd;
991 }
992 dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
993 if (::fsync(omap_fsid_fd) < 0) {
994 ret = -errno;
995 derr << __FUNC__ << ": close failed: can't write fsid: "
996 << cpp_strerror(ret) << dendl;
997 goto close_omap_fsid_fd;
998 }
999 dout(10) << "mkfs omap fsid is " << fsid << dendl;
1000 } else {
1001 if (fsid != old_omap_fsid) {
1002 derr << __FUNC__ << ": " << omap_fsid_fn
1003 << " has existed omap fsid " << old_omap_fsid
1004 << " != expected osd fsid " << fsid
1005 << dendl;
1006 ret = -EINVAL;
1007 goto close_omap_fsid_fd;
1008 }
1009 dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1010 }
1011
1012 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1013
1014 // journal?
1015 ret = mkjournal();
1016 if (ret)
1017 goto close_omap_fsid_fd;
1018
1019 ret = write_meta("type", "filestore");
1020 if (ret)
1021 goto close_omap_fsid_fd;
1022
1023 dout(1) << "mkfs done in " << basedir << dendl;
1024 ret = 0;
1025
1026 close_omap_fsid_fd:
1027 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1028 close_fsid_fd:
1029 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1030 fsid_fd = -1;
1031 close_basedir_fd:
1032 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1033 delete backend;
1034 backend = NULL;
1035 return ret;
1036 }
1037
1038 int FileStore::mkjournal()
1039 {
1040 // read fsid
1041 int ret;
1042 char fn[PATH_MAX];
1043 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1044 int fd = ::open(fn, O_RDONLY, 0644);
1045 if (fd < 0) {
1046 int err = errno;
1047 derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1048 return -err;
1049 }
1050 ret = read_fsid(fd, &fsid);
1051 if (ret < 0) {
1052 derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1053 VOID_TEMP_FAILURE_RETRY(::close(fd));
1054 return ret;
1055 }
1056 VOID_TEMP_FAILURE_RETRY(::close(fd));
1057
1058 ret = 0;
1059
1060 new_journal();
1061 if (journal) {
1062 ret = journal->check();
1063 if (ret < 0) {
1064 ret = journal->create();
1065 if (ret)
1066 derr << __FUNC__ << ": error creating journal on " << journalpath
1067 << ": " << cpp_strerror(ret) << dendl;
1068 else
1069 dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1070 }
1071 delete journal;
1072 journal = 0;
1073 }
1074 return ret;
1075 }
1076
1077 int FileStore::read_fsid(int fd, uuid_d *uuid)
1078 {
1079 char fsid_str[40];
1080 memset(fsid_str, 0, sizeof(fsid_str));
1081 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1082 if (ret < 0)
1083 return ret;
1084 if (ret == 8) {
1085 // old 64-bit fsid... mirror it.
1086 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1087 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1088 return 0;
1089 }
1090
1091 if (ret > 36)
1092 fsid_str[36] = 0;
1093 else
1094 fsid_str[ret] = 0;
1095 if (!uuid->parse(fsid_str))
1096 return -EINVAL;
1097 return 0;
1098 }
1099
1100 int FileStore::lock_fsid()
1101 {
1102 struct flock l;
1103 memset(&l, 0, sizeof(l));
1104 l.l_type = F_WRLCK;
1105 l.l_whence = SEEK_SET;
1106 l.l_start = 0;
1107 l.l_len = 0;
1108 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1109 if (r < 0) {
1110 int err = errno;
1111 dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1112 << cpp_strerror(err) << dendl;
1113 return -err;
1114 }
1115 return 0;
1116 }
1117
1118 bool FileStore::test_mount_in_use()
1119 {
1120 dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1121 char fn[PATH_MAX];
1122 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1123
1124 // verify fs isn't in use
1125
1126 fsid_fd = ::open(fn, O_RDWR, 0644);
1127 if (fsid_fd < 0)
1128 return 0; // no fsid, ok.
1129 bool inuse = lock_fsid() < 0;
1130 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1131 fsid_fd = -1;
1132 return inuse;
1133 }
1134
1135 bool FileStore::is_rotational()
1136 {
1137 bool rotational;
1138 if (backend) {
1139 rotational = backend->is_rotational();
1140 } else {
1141 int fd = ::open(basedir.c_str(), O_RDONLY);
1142 if (fd < 0)
1143 return true;
1144 struct statfs st;
1145 int r = ::fstatfs(fd, &st);
1146 ::close(fd);
1147 if (r < 0) {
1148 return true;
1149 }
1150 create_backend(st.f_type);
1151 rotational = backend->is_rotational();
1152 delete backend;
1153 backend = NULL;
1154 }
1155 dout(10) << __func__ << " " << (int)rotational << dendl;
1156 return rotational;
1157 }
1158
1159 bool FileStore::is_journal_rotational()
1160 {
1161 bool journal_rotational;
1162 if (backend) {
1163 journal_rotational = backend->is_journal_rotational();
1164 } else {
1165 int fd = ::open(journalpath.c_str(), O_RDONLY);
1166 if (fd < 0)
1167 return true;
1168 struct statfs st;
1169 int r = ::fstatfs(fd, &st);
1170 ::close(fd);
1171 if (r < 0) {
1172 return true;
1173 }
1174 create_backend(st.f_type);
1175 journal_rotational = backend->is_journal_rotational();
1176 delete backend;
1177 backend = NULL;
1178 }
1179 dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1180 return journal_rotational;
1181 }
1182
1183 int FileStore::_detect_fs()
1184 {
1185 struct statfs st;
1186 int r = ::fstatfs(basedir_fd, &st);
1187 if (r < 0)
1188 return -errno;
1189
1190 blk_size = st.f_bsize;
1191
1192 #if defined(__linux__)
1193 if (st.f_type == BTRFS_SUPER_MAGIC &&
1194 !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1195 derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1196 return -EPERM;
1197 }
1198 #endif
1199
1200 create_backend(st.f_type);
1201
1202 r = backend->detect_features();
1203 if (r < 0) {
1204 derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1205 return r;
1206 }
1207
1208 // test xattrs
1209 char fn[PATH_MAX];
1210 int x = rand();
1211 int y = x+1;
1212 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1213 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1214 if (tmpfd < 0) {
1215 int ret = -errno;
1216 derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1217 return ret;
1218 }
1219
1220 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1221 if (ret >= 0)
1222 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1223 if ((ret < 0) || (x != y)) {
1224 derr << "Extended attributes don't appear to work. ";
1225 if (ret)
1226 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1227 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1228 << "file system with the 'user_xattr' option." << dendl;
1229 ::unlink(fn);
1230 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1231 return -ENOTSUP;
1232 }
1233
1234 char buf[1000];
1235 memset(buf, 0, sizeof(buf)); // shut up valgrind
1236 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1237 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1238 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1239 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1240 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1241 if (ret == -ENOSPC) {
1242 dout(0) << "limited size xattrs" << dendl;
1243 }
1244 chain_fremovexattr(tmpfd, "user.test");
1245 chain_fremovexattr(tmpfd, "user.test2");
1246 chain_fremovexattr(tmpfd, "user.test3");
1247 chain_fremovexattr(tmpfd, "user.test4");
1248 chain_fremovexattr(tmpfd, "user.test5");
1249
1250 ::unlink(fn);
1251 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1252
1253 return 0;
1254 }
1255
1256 int FileStore::_sanity_check_fs()
1257 {
1258 // sanity check(s)
1259
1260 if (((int)m_filestore_journal_writeahead +
1261 (int)m_filestore_journal_parallel +
1262 (int)m_filestore_journal_trailing) > 1) {
1263 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1264 cerr << TEXT_RED
1265 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1266 << " is enabled in ceph.conf. You must choose a single journal mode."
1267 << TEXT_NORMAL << std::endl;
1268 return -EINVAL;
1269 }
1270
1271 if (!backend->can_checkpoint()) {
1272 if (!journal || !m_filestore_journal_writeahead) {
1273 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1274 cerr << TEXT_RED
1275 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1276 << " For non-btrfs volumes, a writeahead journal is required to\n"
1277 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1278 << " should include something like:\n"
1279 << " osd journal = /path/to/journal_device_or_file\n"
1280 << " filestore journal writeahead = true\n"
1281 << TEXT_NORMAL;
1282 }
1283 }
1284
1285 if (!journal) {
1286 dout(0) << "mount WARNING: no journal" << dendl;
1287 cerr << TEXT_YELLOW
1288 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1289 << " If you will not be using an osd journal, write latency may be\n"
1290 << " relatively high. It can be reduced somewhat by lowering\n"
1291 << " filestore_max_sync_interval, but lower values mean lower write\n"
1292 << " throughput, especially with spinning disks.\n"
1293 << TEXT_NORMAL;
1294 }
1295
1296 return 0;
1297 }
1298
1299 int FileStore::write_superblock()
1300 {
1301 bufferlist bl;
1302 ::encode(superblock, bl);
1303 return safe_write_file(basedir.c_str(), "superblock",
1304 bl.c_str(), bl.length());
1305 }
1306
1307 int FileStore::read_superblock()
1308 {
1309 bufferptr bp(PATH_MAX);
1310 int ret = safe_read_file(basedir.c_str(), "superblock",
1311 bp.c_str(), bp.length());
1312 if (ret < 0) {
1313 if (ret == -ENOENT) {
1314 // If the file doesn't exist write initial CompatSet
1315 return write_superblock();
1316 }
1317 return ret;
1318 }
1319
1320 bufferlist bl;
1321 bl.push_back(std::move(bp));
1322 bufferlist::iterator i = bl.begin();
1323 ::decode(superblock, i);
1324 return 0;
1325 }
1326
1327 int FileStore::update_version_stamp()
1328 {
1329 return write_version_stamp();
1330 }
1331
1332 int FileStore::version_stamp_is_valid(uint32_t *version)
1333 {
1334 bufferptr bp(PATH_MAX);
1335 int ret = safe_read_file(basedir.c_str(), "store_version",
1336 bp.c_str(), bp.length());
1337 if (ret < 0) {
1338 return ret;
1339 }
1340 bufferlist bl;
1341 bl.push_back(std::move(bp));
1342 bufferlist::iterator i = bl.begin();
1343 ::decode(*version, i);
1344 dout(10) << __FUNC__ << ": was " << *version << " vs target "
1345 << target_version << dendl;
1346 if (*version == target_version)
1347 return 1;
1348 else
1349 return 0;
1350 }
1351
1352 int FileStore::write_version_stamp()
1353 {
1354 dout(1) << __FUNC__ << ": " << target_version << dendl;
1355 bufferlist bl;
1356 ::encode(target_version, bl);
1357
1358 return safe_write_file(basedir.c_str(), "store_version",
1359 bl.c_str(), bl.length());
1360 }
1361
1362 int FileStore::upgrade()
1363 {
1364 dout(1) << __FUNC__ << dendl;
1365 uint32_t version;
1366 int r = version_stamp_is_valid(&version);
1367
1368 if (r == -ENOENT) {
1369 derr << "The store_version file doesn't exist." << dendl;
1370 return -EINVAL;
1371 }
1372 if (r < 0)
1373 return r;
1374 if (r == 1)
1375 return 0;
1376
1377 if (version < 3) {
1378 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1379 return -EINVAL;
1380 }
1381
1382 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1383 // open up DBObjectMap with the do_upgrade flag, which we already did.
1384 update_version_stamp();
1385 return 0;
1386 }
1387
1388 int FileStore::read_op_seq(uint64_t *seq)
1389 {
1390 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1391 if (op_fd < 0) {
1392 int r = -errno;
1393 assert(!m_filestore_fail_eio || r != -EIO);
1394 return r;
1395 }
1396 char s[40];
1397 memset(s, 0, sizeof(s));
1398 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1399 if (ret < 0) {
1400 derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1401 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1402 assert(!m_filestore_fail_eio || ret != -EIO);
1403 return ret;
1404 }
1405 *seq = atoll(s);
1406 return op_fd;
1407 }
1408
1409 int FileStore::write_op_seq(int fd, uint64_t seq)
1410 {
1411 char s[30];
1412 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1413 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1414 if (ret < 0) {
1415 ret = -errno;
1416 assert(!m_filestore_fail_eio || ret != -EIO);
1417 }
1418 return ret;
1419 }
1420
1421 int FileStore::mount()
1422 {
1423 int ret;
1424 char buf[PATH_MAX];
1425 uint64_t initial_op_seq;
1426 uuid_d omap_fsid;
1427 set<string> cluster_snaps;
1428 CompatSet supported_compat_set = get_fs_supported_compat_set();
1429
1430 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1431
1432 ret = set_throttle_params();
1433 if (ret != 0)
1434 goto done;
1435
1436 // make sure global base dir exists
1437 if (::access(basedir.c_str(), R_OK | W_OK)) {
1438 ret = -errno;
1439 derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1440 << cpp_strerror(ret) << dendl;
1441 goto done;
1442 }
1443
1444 // get fsid
1445 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1446 fsid_fd = ::open(buf, O_RDWR, 0644);
1447 if (fsid_fd < 0) {
1448 ret = -errno;
1449 derr << __FUNC__ << ": error opening '" << buf << "': "
1450 << cpp_strerror(ret) << dendl;
1451 goto done;
1452 }
1453
1454 ret = read_fsid(fsid_fd, &fsid);
1455 if (ret < 0) {
1456 derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1457 << dendl;
1458 goto close_fsid_fd;
1459 }
1460
1461 if (lock_fsid() < 0) {
1462 derr << __FUNC__ << ": lock_fsid failed" << dendl;
1463 ret = -EBUSY;
1464 goto close_fsid_fd;
1465 }
1466
1467 dout(10) << "mount fsid is " << fsid << dendl;
1468
1469
1470 uint32_t version_stamp;
1471 ret = version_stamp_is_valid(&version_stamp);
1472 if (ret < 0) {
1473 derr << __FUNC__ << ": error in version_stamp_is_valid: "
1474 << cpp_strerror(ret) << dendl;
1475 goto close_fsid_fd;
1476 } else if (ret == 0) {
1477 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1478 derr << __FUNC__ << ": stale version stamp detected: "
1479 << version_stamp
1480 << ". Proceeding, do_update "
1481 << "is set, performing disk format upgrade."
1482 << dendl;
1483 do_update = true;
1484 } else {
1485 ret = -EINVAL;
1486 derr << __FUNC__ << ": stale version stamp " << version_stamp
1487 << ". Please run the FileStore update script before starting the "
1488 << "OSD, or set filestore_update_to to " << target_version
1489 << " (currently " << cct->_conf->filestore_update_to << ")"
1490 << dendl;
1491 goto close_fsid_fd;
1492 }
1493 }
1494
1495 ret = read_superblock();
1496 if (ret < 0) {
1497 goto close_fsid_fd;
1498 }
1499
1500 // Check if this FileStore supports all the necessary features to mount
1501 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1502 derr << __FUNC__ << ": Incompatible features set "
1503 << superblock.compat_features << dendl;
1504 ret = -EINVAL;
1505 goto close_fsid_fd;
1506 }
1507
1508 // open some dir handles
1509 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1510 if (basedir_fd < 0) {
1511 ret = -errno;
1512 derr << __FUNC__ << ": failed to open " << basedir << ": "
1513 << cpp_strerror(ret) << dendl;
1514 basedir_fd = -1;
1515 goto close_fsid_fd;
1516 }
1517
1518 // test for btrfs, xattrs, etc.
1519 ret = _detect_fs();
1520 if (ret < 0) {
1521 derr << __FUNC__ << ": error in _detect_fs: "
1522 << cpp_strerror(ret) << dendl;
1523 goto close_basedir_fd;
1524 }
1525
1526 {
1527 list<string> ls;
1528 ret = backend->list_checkpoints(ls);
1529 if (ret < 0) {
1530 derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1531 goto close_basedir_fd;
1532 }
1533
1534 long long unsigned c, prev = 0;
1535 char clustersnap[NAME_MAX];
1536 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1537 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1538 assert(c > prev);
1539 prev = c;
1540 snaps.push_back(c);
1541 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1542 cluster_snaps.insert(*it);
1543 }
1544 }
1545
1546 if (m_osd_rollback_to_cluster_snap.length() &&
1547 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1548 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1549 ret = -ENOENT;
1550 goto close_basedir_fd;
1551 }
1552
1553 char nosnapfn[200];
1554 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1555
1556 if (backend->can_checkpoint()) {
1557 if (snaps.empty()) {
1558 dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1559 } else {
1560 char s[NAME_MAX];
1561 uint64_t curr_seq = 0;
1562
1563 if (m_osd_rollback_to_cluster_snap.length()) {
1564 derr << TEXT_RED
1565 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1566 << TEXT_NORMAL
1567 << dendl;
1568 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1569 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1570 } else {
1571 {
1572 int fd = read_op_seq(&curr_seq);
1573 if (fd >= 0) {
1574 VOID_TEMP_FAILURE_RETRY(::close(fd));
1575 }
1576 }
1577 if (curr_seq)
1578 dout(10) << " current/ seq was " << curr_seq << dendl;
1579 else
1580 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1581
1582 uint64_t cp = snaps.back();
1583 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1584
1585 // if current/ is marked as non-snapshotted, refuse to roll
1586 // back (without clear direction) to avoid throwing out new
1587 // data.
1588 struct stat st;
1589 if (::stat(nosnapfn, &st) == 0) {
1590 if (!m_osd_use_stale_snap) {
1591 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1592 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1593 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1594 ret = -ENOTSUP;
1595 goto close_basedir_fd;
1596 }
1597 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1598 << ", newest snap is " << cp << dendl;
1599 cerr << TEXT_YELLOW
1600 << " ** WARNING: forcing the use of stale snapshot data **"
1601 << TEXT_NORMAL << std::endl;
1602 }
1603
1604 dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1605 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1606 }
1607
1608 // drop current?
1609 ret = backend->rollback_to(s);
1610 if (ret) {
1611 derr << __FUNC__ << ": error rolling back to " << s << ": "
1612 << cpp_strerror(ret) << dendl;
1613 goto close_basedir_fd;
1614 }
1615 }
1616 }
1617 initial_op_seq = 0;
1618
1619 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1620 if (current_fd < 0) {
1621 ret = -errno;
1622 derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1623 goto close_basedir_fd;
1624 }
1625
1626 assert(current_fd >= 0);
1627
1628 op_fd = read_op_seq(&initial_op_seq);
1629 if (op_fd < 0) {
1630 ret = op_fd;
1631 derr << __FUNC__ << ": read_op_seq failed" << dendl;
1632 goto close_current_fd;
1633 }
1634
1635 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1636 if (initial_op_seq == 0) {
1637 derr << "mount initial op seq is 0; something is wrong" << dendl;
1638 ret = -EINVAL;
1639 goto close_current_fd;
1640 }
1641
1642 if (!backend->can_checkpoint()) {
1643 // mark current/ as non-snapshotted so that we don't rollback away
1644 // from it.
1645 int r = ::creat(nosnapfn, 0644);
1646 if (r < 0) {
1647 ret = -errno;
1648 derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1649 goto close_current_fd;
1650 }
1651 VOID_TEMP_FAILURE_RETRY(::close(r));
1652 } else {
1653 // clear nosnap marker, if present.
1654 ::unlink(nosnapfn);
1655 }
1656
1657 // check fsid with omap
1658 // get omap fsid
1659 int omap_fsid_fd;
1660 char omap_fsid_buf[PATH_MAX];
1661 struct ::stat omap_fsid_stat;
1662 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1663 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1664 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1665 dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1666 << "assume as matched."
1667 << dendl;
1668 }else{
1669 // if osd_uuid exists, compares osd_uuid with fsid
1670 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1671 if (omap_fsid_fd < 0) {
1672 ret = -errno;
1673 derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1674 << cpp_strerror(ret)
1675 << dendl;
1676 goto close_current_fd;
1677 }
1678 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1679 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1680 omap_fsid_fd = -1; // defensive
1681 if (ret < 0) {
1682 derr << __FUNC__ << ": error reading omap_fsid_fd"
1683 << ", omap_fsid = " << omap_fsid
1684 << cpp_strerror(ret)
1685 << dendl;
1686 goto close_current_fd;
1687 }
1688 if (fsid != omap_fsid) {
1689 derr << __FUNC__ << ": " << omap_fsid_buf
1690 << " has existed omap fsid " << omap_fsid
1691 << " != expected osd fsid " << fsid
1692 << dendl;
1693 ret = -EINVAL;
1694 goto close_current_fd;
1695 }
1696 }
1697
1698 dout(0) << "start omap initiation" << dendl;
1699 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1700 KeyValueDB * omap_store = KeyValueDB::create(cct,
1701 superblock.omap_backend,
1702 omap_dir);
1703 if (omap_store == NULL)
1704 {
1705 derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1706 ret = -1;
1707 goto close_current_fd;
1708 }
1709
1710 if (superblock.omap_backend == "rocksdb")
1711 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1712 else
1713 ret = omap_store->init();
1714
1715 if (ret < 0) {
1716 derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1717 goto close_current_fd;
1718 }
1719
1720 stringstream err;
1721 if (omap_store->create_and_open(err)) {
1722 delete omap_store;
1723 derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1724 << " : " << err.str() << dendl;
1725 ret = -1;
1726 goto close_current_fd;
1727 }
1728
1729 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1730 ret = dbomap->init(do_update);
1731 if (ret < 0) {
1732 delete dbomap;
1733 derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1734 goto close_current_fd;
1735 }
1736 stringstream err2;
1737
1738 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1739 derr << err2.str() << dendl;
1740 delete dbomap;
1741 ret = -EINVAL;
1742 goto close_current_fd;
1743 }
1744 object_map.reset(dbomap);
1745 }
1746
1747 // journal
1748 new_journal();
1749
1750 // select journal mode?
1751 if (journal) {
1752 if (!m_filestore_journal_writeahead &&
1753 !m_filestore_journal_parallel &&
1754 !m_filestore_journal_trailing) {
1755 if (!backend->can_checkpoint()) {
1756 m_filestore_journal_writeahead = true;
1757 dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1758 } else {
1759 m_filestore_journal_parallel = true;
1760 dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1761 }
1762 } else {
1763 if (m_filestore_journal_writeahead)
1764 dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1765 if (m_filestore_journal_parallel)
1766 dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1767 if (m_filestore_journal_trailing)
1768 dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1769 }
1770 if (m_filestore_journal_writeahead)
1771 journal->set_wait_on_full(true);
1772 } else {
1773 dout(0) << __FUNC__ << ": no journal" << dendl;
1774 }
1775
1776 ret = _sanity_check_fs();
1777 if (ret) {
1778 derr << __FUNC__ << ": _sanity_check_fs failed with error "
1779 << ret << dendl;
1780 goto close_current_fd;
1781 }
1782
1783 // Cleanup possibly invalid collections
1784 {
1785 vector<coll_t> collections;
1786 ret = list_collections(collections, true);
1787 if (ret < 0) {
1788 derr << "Error " << ret << " while listing collections" << dendl;
1789 goto close_current_fd;
1790 }
1791 for (vector<coll_t>::iterator i = collections.begin();
1792 i != collections.end();
1793 ++i) {
1794 Index index;
1795 ret = get_index(*i, &index);
1796 if (ret < 0) {
1797 derr << "Unable to mount index " << *i
1798 << " with error: " << ret << dendl;
1799 goto close_current_fd;
1800 }
1801 assert(NULL != index.index);
1802 RWLock::WLocker l((index.index)->access_lock);
1803
1804 index->cleanup();
1805 }
1806 }
1807 if (!m_disable_wbthrottle) {
1808 wbthrottle.start();
1809 } else {
1810 dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1811 if (cct->_conf->filestore_odsync_write) {
1812 dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1813 }
1814 }
1815 sync_thread.create("filestore_sync");
1816
1817 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1818 ret = journal_replay(initial_op_seq);
1819 if (ret < 0) {
1820 derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1821 if (ret == -ENOTTY) {
1822 derr << "maybe journal is not pointing to a block device and its size "
1823 << "wasn't configured?" << dendl;
1824 }
1825
1826 goto stop_sync;
1827 }
1828 }
1829
1830 {
1831 stringstream err2;
1832 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1833 derr << err2.str() << dendl;
1834 ret = -EINVAL;
1835 goto stop_sync;
1836 }
1837 }
1838
1839 init_temp_collections();
1840
1841 journal_start();
1842
1843 op_tp.start();
1844 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1845 (*it)->start();
1846 }
1847 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1848 (*it)->start();
1849 }
1850
1851 timer.init();
1852
1853 // upgrade?
1854 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1855 int err = upgrade();
1856 if (err < 0) {
1857 derr << "error converting store" << dendl;
1858 umount();
1859 return err;
1860 }
1861 }
1862
1863 // all okay.
1864 return 0;
1865
1866 stop_sync:
1867 // stop sync thread
1868 lock.Lock();
1869 stop = true;
1870 sync_cond.Signal();
1871 lock.Unlock();
1872 sync_thread.join();
1873 if (!m_disable_wbthrottle) {
1874 wbthrottle.stop();
1875 }
1876 close_current_fd:
1877 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1878 current_fd = -1;
1879 close_basedir_fd:
1880 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1881 basedir_fd = -1;
1882 close_fsid_fd:
1883 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1884 fsid_fd = -1;
1885 done:
1886 assert(!m_filestore_fail_eio || ret != -EIO);
1887 delete backend;
1888 backend = NULL;
1889 object_map.reset();
1890 return ret;
1891 }
1892
1893 void FileStore::init_temp_collections()
1894 {
1895 dout(10) << __FUNC__ << dendl;
1896 vector<coll_t> ls;
1897 int r = list_collections(ls, true);
1898 assert(r >= 0);
1899
1900 dout(20) << " ls " << ls << dendl;
1901
1902 SequencerPosition spos;
1903
1904 set<coll_t> temps;
1905 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1906 if (p->is_temp())
1907 temps.insert(*p);
1908 dout(20) << " temps " << temps << dendl;
1909
1910 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1911 if (p->is_temp())
1912 continue;
1913 if (p->is_meta())
1914 continue;
1915 coll_t temp = p->get_temp();
1916 if (temps.count(temp)) {
1917 temps.erase(temp);
1918 } else {
1919 dout(10) << __FUNC__ << ": creating " << temp << dendl;
1920 r = _create_collection(temp, 0, spos);
1921 assert(r == 0);
1922 }
1923 }
1924
1925 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1926 dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
1927 r = _collection_remove_recursive(*p, spos);
1928 assert(r == 0);
1929 }
1930 }
1931
1932 int FileStore::umount()
1933 {
1934 dout(5) << __FUNC__ << ": " << basedir << dendl;
1935
1936 flush();
1937 sync();
1938 do_force_sync();
1939
1940 lock.Lock();
1941 stop = true;
1942 sync_cond.Signal();
1943 lock.Unlock();
1944 sync_thread.join();
1945 if (!m_disable_wbthrottle){
1946 wbthrottle.stop();
1947 }
1948 op_tp.stop();
1949
1950 journal_stop();
1951 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1952 journal_write_close();
1953
1954 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1955 (*it)->stop();
1956 }
1957 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1958 (*it)->stop();
1959 }
1960
1961 if (fsid_fd >= 0) {
1962 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1963 fsid_fd = -1;
1964 }
1965 if (op_fd >= 0) {
1966 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1967 op_fd = -1;
1968 }
1969 if (current_fd >= 0) {
1970 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1971 current_fd = -1;
1972 }
1973 if (basedir_fd >= 0) {
1974 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1975 basedir_fd = -1;
1976 }
1977
1978 force_sync = false;
1979
1980 delete backend;
1981 backend = NULL;
1982
1983 object_map.reset();
1984
1985 {
1986 Mutex::Locker l(sync_entry_timeo_lock);
1987 timer.shutdown();
1988 }
1989
1990 // nothing
1991 return 0;
1992 }
1993
1994
1995
1996
1997 /// -----------------------------
1998
1999 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2000 Context *onreadable,
2001 Context *onreadable_sync,
2002 TrackedOpRef osd_op)
2003 {
2004 uint64_t bytes = 0, ops = 0;
2005 for (vector<Transaction>::iterator p = tls.begin();
2006 p != tls.end();
2007 ++p) {
2008 bytes += (*p).get_num_bytes();
2009 ops += (*p).get_num_ops();
2010 }
2011
2012 Op *o = new Op;
2013 o->start = ceph_clock_now();
2014 o->tls = std::move(tls);
2015 o->onreadable = onreadable;
2016 o->onreadable_sync = onreadable_sync;
2017 o->ops = ops;
2018 o->bytes = bytes;
2019 o->osd_op = osd_op;
2020 return o;
2021 }
2022
2023
2024
2025 void FileStore::queue_op(OpSequencer *osr, Op *o)
2026 {
2027 // queue op on sequencer, then queue sequencer for the threadpool,
2028 // so that regardless of which order the threads pick up the
2029 // sequencer, the op order will be preserved.
2030
2031 osr->queue(o);
2032 o->trace.event("queued");
2033
2034 logger->inc(l_filestore_ops);
2035 logger->inc(l_filestore_bytes, o->bytes);
2036
2037 dout(5) << __FUNC__ << ": " << o << " seq " << o->op
2038 << " " << *osr
2039 << " " << o->bytes << " bytes"
2040 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2041 << dendl;
2042 op_wq.queue(osr);
2043 }
2044
2045 void FileStore::op_queue_reserve_throttle(Op *o)
2046 {
2047 throttle_ops.get();
2048 throttle_bytes.get(o->bytes);
2049
2050 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2051 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2052 }
2053
2054 void FileStore::op_queue_release_throttle(Op *o)
2055 {
2056 throttle_ops.put();
2057 throttle_bytes.put(o->bytes);
2058 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2059 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2060 }
2061
2062 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2063 {
2064 if (!m_disable_wbthrottle) {
2065 wbthrottle.throttle();
2066 }
2067 // inject a stall?
2068 if (cct->_conf->filestore_inject_stall) {
2069 int orig = cct->_conf->filestore_inject_stall;
2070 dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2071 sleep(orig);
2072 cct->_conf->set_val("filestore_inject_stall", "0");
2073 dout(5) << __FUNC__ << ": done stalling" << dendl;
2074 }
2075
2076 osr->apply_lock.Lock();
2077 Op *o = osr->peek_queue();
2078 o->trace.event("op_apply_start");
2079 apply_manager.op_apply_start(o->op);
2080 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2081 o->trace.event("_do_transactions start");
2082 int r = _do_transactions(o->tls, o->op, &handle);
2083 o->trace.event("op_apply_finish");
2084 apply_manager.op_apply_finish(o->op);
2085 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2086 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2087
2088 o->tls.clear();
2089
2090 }
2091
2092 void FileStore::_finish_op(OpSequencer *osr)
2093 {
2094 list<Context*> to_queue;
2095 Op *o = osr->dequeue(&to_queue);
2096
2097 utime_t lat = ceph_clock_now();
2098 lat -= o->start;
2099
2100 dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2101 osr->apply_lock.Unlock(); // locked in _do_op
2102 o->trace.event("_finish_op");
2103
2104 // called with tp lock held
2105 op_queue_release_throttle(o);
2106
2107 logger->tinc(l_filestore_apply_latency, lat);
2108
2109 if (o->onreadable_sync) {
2110 o->onreadable_sync->complete(0);
2111 }
2112 if (o->onreadable) {
2113 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2114 }
2115 if (!to_queue.empty()) {
2116 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2117 }
2118 delete o;
2119 }
2120
2121
2122 struct C_JournaledAhead : public Context {
2123 FileStore *fs;
2124 FileStore::OpSequencer *osr;
2125 FileStore::Op *o;
2126 Context *ondisk;
2127
2128 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2129 fs(f), osr(os), o(o), ondisk(ondisk) { }
2130 void finish(int r) override {
2131 fs->_journaled_ahead(osr, o, ondisk);
2132 }
2133 };
2134
2135 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2136 TrackedOpRef osd_op,
2137 ThreadPool::TPHandle *handle)
2138 {
2139 Context *onreadable;
2140 Context *ondisk;
2141 Context *onreadable_sync;
2142 ObjectStore::Transaction::collect_contexts(
2143 tls, &onreadable, &ondisk, &onreadable_sync);
2144
2145 if (cct->_conf->objectstore_blackhole) {
2146 dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2147 << dendl;
2148 delete ondisk;
2149 delete onreadable;
2150 delete onreadable_sync;
2151 return 0;
2152 }
2153
2154 utime_t start = ceph_clock_now();
2155 // set up the sequencer
2156 OpSequencer *osr;
2157 assert(posr);
2158 if (posr->p) {
2159 osr = static_cast<OpSequencer *>(posr->p.get());
2160 dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
2161 } else {
2162 osr = new OpSequencer(cct, ++next_osr_id);
2163 osr->set_cct(cct);
2164 osr->parent = posr;
2165 posr->p = osr;
2166 dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
2167 }
2168
2169 // used to include osr information in tracepoints during transaction apply
2170 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2171 (*i).set_osr(osr);
2172 }
2173
2174 ZTracer::Trace trace;
2175 if (osd_op && osd_op->pg_trace) {
2176 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2177 trace = osd_op->store_trace;
2178 }
2179
2180 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2181 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2182
2183 //prepare and encode transactions data out of lock
2184 bufferlist tbl;
2185 int orig_len = journal->prepare_entry(o->tls, &tbl);
2186
2187 if (handle)
2188 handle->suspend_tp_timeout();
2189
2190 op_queue_reserve_throttle(o);
2191 journal->reserve_throttle_and_backoff(tbl.length());
2192
2193 if (handle)
2194 handle->reset_tp_timeout();
2195
2196 uint64_t op_num = submit_manager.op_submit_start();
2197 o->op = op_num;
2198 trace.keyval("opnum", op_num);
2199
2200 if (m_filestore_do_dump)
2201 dump_transactions(o->tls, o->op, osr);
2202
2203 if (m_filestore_journal_parallel) {
2204 dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2205
2206 trace.keyval("journal mode", "parallel");
2207 trace.event("journal started");
2208 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2209
2210 // queue inside submit_manager op submission lock
2211 queue_op(osr, o);
2212 trace.event("op queued");
2213 } else if (m_filestore_journal_writeahead) {
2214 dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2215
2216 osr->queue_journal(o->op);
2217
2218 trace.keyval("journal mode", "writeahead");
2219 trace.event("journal started");
2220 _op_journal_transactions(tbl, orig_len, o->op,
2221 new C_JournaledAhead(this, osr, o, ondisk),
2222 osd_op);
2223 } else {
2224 ceph_abort();
2225 }
2226 submit_manager.op_submit_finish(op_num);
2227 utime_t end = ceph_clock_now();
2228 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2229 return 0;
2230 }
2231
2232 if (!journal) {
2233 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2234 dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2235
2236 if (handle)
2237 handle->suspend_tp_timeout();
2238
2239 op_queue_reserve_throttle(o);
2240
2241 if (handle)
2242 handle->reset_tp_timeout();
2243
2244 uint64_t op_num = submit_manager.op_submit_start();
2245 o->op = op_num;
2246
2247 if (m_filestore_do_dump)
2248 dump_transactions(o->tls, o->op, osr);
2249
2250 queue_op(osr, o);
2251 trace.keyval("opnum", op_num);
2252 trace.keyval("journal mode", "none");
2253 trace.event("op queued");
2254
2255 if (ondisk)
2256 apply_manager.add_waiter(op_num, ondisk);
2257 submit_manager.op_submit_finish(op_num);
2258 utime_t end = ceph_clock_now();
2259 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2260 return 0;
2261 }
2262
2263 assert(journal);
2264 //prepare and encode transactions data out of lock
2265 bufferlist tbl;
2266 int orig_len = -1;
2267 if (journal->is_writeable()) {
2268 orig_len = journal->prepare_entry(tls, &tbl);
2269 }
2270 uint64_t op = submit_manager.op_submit_start();
2271 dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2272
2273 if (m_filestore_do_dump)
2274 dump_transactions(tls, op, osr);
2275
2276 trace.event("op_apply_start");
2277 trace.keyval("opnum", op);
2278 trace.keyval("journal mode", "trailing");
2279 apply_manager.op_apply_start(op);
2280 trace.event("do_transactions");
2281 int r = do_transactions(tls, op);
2282
2283 if (r >= 0) {
2284 trace.event("journal started");
2285 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2286 } else {
2287 delete ondisk;
2288 }
2289
2290 // start on_readable finisher after we queue journal item, as on_readable callback
2291 // is allowed to delete the Transaction
2292 if (onreadable_sync) {
2293 onreadable_sync->complete(r);
2294 }
2295 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2296
2297 submit_manager.op_submit_finish(op);
2298 trace.event("op_apply_finish");
2299 apply_manager.op_apply_finish(op);
2300
2301 utime_t end = ceph_clock_now();
2302 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2303 return r;
2304 }
2305
2306 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2307 {
2308 dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2309
2310 o->trace.event("writeahead journal finished");
2311
2312 // this should queue in order because the journal does it's completions in order.
2313 queue_op(osr, o);
2314
2315 list<Context*> to_queue;
2316 osr->dequeue_journal(&to_queue);
2317
2318 // do ondisk completions async, to prevent any onreadable_sync completions
2319 // getting blocked behind an ondisk completion.
2320 if (ondisk) {
2321 dout(10) << " queueing ondisk " << ondisk << dendl;
2322 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2323 }
2324 if (!to_queue.empty()) {
2325 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2326 }
2327 }
2328
2329 int FileStore::_do_transactions(
2330 vector<Transaction> &tls,
2331 uint64_t op_seq,
2332 ThreadPool::TPHandle *handle)
2333 {
2334 int trans_num = 0;
2335
2336 for (vector<Transaction>::iterator p = tls.begin();
2337 p != tls.end();
2338 ++p, trans_num++) {
2339 _do_transaction(*p, op_seq, trans_num, handle);
2340 if (handle)
2341 handle->reset_tp_timeout();
2342 }
2343
2344 return 0;
2345 }
2346
2347 void FileStore::_set_global_replay_guard(const coll_t& cid,
2348 const SequencerPosition &spos)
2349 {
2350 if (backend->can_checkpoint())
2351 return;
2352
2353 // sync all previous operations on this sequencer
2354 int ret = object_map->sync();
2355 if (ret < 0) {
2356 derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2357 assert(0 == "_set_global_replay_guard failed");
2358 }
2359 ret = sync_filesystem(basedir_fd);
2360 if (ret < 0) {
2361 derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2362 assert(0 == "_set_global_replay_guard failed");
2363 }
2364
2365 char fn[PATH_MAX];
2366 get_cdir(cid, fn, sizeof(fn));
2367 int fd = ::open(fn, O_RDONLY);
2368 if (fd < 0) {
2369 int err = errno;
2370 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2371 assert(0 == "_set_global_replay_guard failed");
2372 }
2373
2374 _inject_failure();
2375
2376 // then record that we did it
2377 bufferlist v;
2378 ::encode(spos, v);
2379 int r = chain_fsetxattr<true, true>(
2380 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2381 if (r < 0) {
2382 derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2383 << " got " << cpp_strerror(r) << dendl;
2384 assert(0 == "fsetxattr failed");
2385 }
2386
2387 // and make sure our xattr is durable.
2388 ::fsync(fd);
2389
2390 _inject_failure();
2391
2392 VOID_TEMP_FAILURE_RETRY(::close(fd));
2393 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2394 }
2395
2396 int FileStore::_check_global_replay_guard(const coll_t& cid,
2397 const SequencerPosition& spos)
2398 {
2399 char fn[PATH_MAX];
2400 get_cdir(cid, fn, sizeof(fn));
2401 int fd = ::open(fn, O_RDONLY);
2402 if (fd < 0) {
2403 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2404 return 1; // if collection does not exist, there is no guard, and we can replay.
2405 }
2406
2407 char buf[100];
2408 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2409 if (r < 0) {
2410 dout(20) << __FUNC__ << ": no xattr" << dendl;
2411 assert(!m_filestore_fail_eio || r != -EIO);
2412 VOID_TEMP_FAILURE_RETRY(::close(fd));
2413 return 1; // no xattr
2414 }
2415 bufferlist bl;
2416 bl.append(buf, r);
2417
2418 SequencerPosition opos;
2419 bufferlist::iterator p = bl.begin();
2420 ::decode(opos, p);
2421
2422 VOID_TEMP_FAILURE_RETRY(::close(fd));
2423 return spos >= opos ? 1 : -1;
2424 }
2425
2426
2427 void FileStore::_set_replay_guard(const coll_t& cid,
2428 const SequencerPosition &spos,
2429 bool in_progress=false)
2430 {
2431 char fn[PATH_MAX];
2432 get_cdir(cid, fn, sizeof(fn));
2433 int fd = ::open(fn, O_RDONLY);
2434 if (fd < 0) {
2435 int err = errno;
2436 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2437 assert(0 == "_set_replay_guard failed");
2438 }
2439 _set_replay_guard(fd, spos, 0, in_progress);
2440 VOID_TEMP_FAILURE_RETRY(::close(fd));
2441 }
2442
2443
2444 void FileStore::_set_replay_guard(int fd,
2445 const SequencerPosition& spos,
2446 const ghobject_t *hoid,
2447 bool in_progress)
2448 {
2449 if (backend->can_checkpoint())
2450 return;
2451
2452 dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2453
2454 _inject_failure();
2455
2456 // first make sure the previous operation commits
2457 ::fsync(fd);
2458
2459 if (!in_progress) {
2460 // sync object_map too. even if this object has a header or keys,
2461 // it have had them in the past and then removed them, so always
2462 // sync.
2463 object_map->sync(hoid, &spos);
2464 }
2465
2466 _inject_failure();
2467
2468 // then record that we did it
2469 bufferlist v(40);
2470 ::encode(spos, v);
2471 ::encode(in_progress, v);
2472 int r = chain_fsetxattr<true, true>(
2473 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2474 if (r < 0) {
2475 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2476 assert(0 == "fsetxattr failed");
2477 }
2478
2479 // and make sure our xattr is durable.
2480 ::fsync(fd);
2481
2482 _inject_failure();
2483
2484 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2485 }
2486
2487 void FileStore::_close_replay_guard(const coll_t& cid,
2488 const SequencerPosition &spos)
2489 {
2490 char fn[PATH_MAX];
2491 get_cdir(cid, fn, sizeof(fn));
2492 int fd = ::open(fn, O_RDONLY);
2493 if (fd < 0) {
2494 int err = errno;
2495 derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2496 assert(0 == "_close_replay_guard failed");
2497 }
2498 _close_replay_guard(fd, spos);
2499 VOID_TEMP_FAILURE_RETRY(::close(fd));
2500 }
2501
2502 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2503 const ghobject_t *hoid)
2504 {
2505 if (backend->can_checkpoint())
2506 return;
2507
2508 dout(10) << __FUNC__ << ": " << spos << dendl;
2509
2510 _inject_failure();
2511
2512 // sync object_map too. even if this object has a header or keys,
2513 // it have had them in the past and then removed them, so always
2514 // sync.
2515 object_map->sync(hoid, &spos);
2516
2517 // then record that we are done with this operation
2518 bufferlist v(40);
2519 ::encode(spos, v);
2520 bool in_progress = false;
2521 ::encode(in_progress, v);
2522 int r = chain_fsetxattr<true, true>(
2523 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2524 if (r < 0) {
2525 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2526 assert(0 == "fsetxattr failed");
2527 }
2528
2529 // and make sure our xattr is durable.
2530 ::fsync(fd);
2531
2532 _inject_failure();
2533
2534 dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2535 }
2536
2537 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2538 const SequencerPosition& spos)
2539 {
2540 if (!replaying || backend->can_checkpoint())
2541 return 1;
2542
2543 int r = _check_global_replay_guard(cid, spos);
2544 if (r < 0)
2545 return r;
2546
2547 FDRef fd;
2548 r = lfn_open(cid, oid, false, &fd);
2549 if (r < 0) {
2550 dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2551 return 1; // if file does not exist, there is no guard, and we can replay.
2552 }
2553 int ret = _check_replay_guard(**fd, spos);
2554 lfn_close(fd);
2555 return ret;
2556 }
2557
2558 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2559 {
2560 if (!replaying || backend->can_checkpoint())
2561 return 1;
2562
2563 char fn[PATH_MAX];
2564 get_cdir(cid, fn, sizeof(fn));
2565 int fd = ::open(fn, O_RDONLY);
2566 if (fd < 0) {
2567 dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2568 return 1; // if collection does not exist, there is no guard, and we can replay.
2569 }
2570 int ret = _check_replay_guard(fd, spos);
2571 VOID_TEMP_FAILURE_RETRY(::close(fd));
2572 return ret;
2573 }
2574
2575 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2576 {
2577 if (!replaying || backend->can_checkpoint())
2578 return 1;
2579
2580 char buf[100];
2581 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2582 if (r < 0) {
2583 dout(20) << __FUNC__ << ": no xattr" << dendl;
2584 assert(!m_filestore_fail_eio || r != -EIO);
2585 return 1; // no xattr
2586 }
2587 bufferlist bl;
2588 bl.append(buf, r);
2589
2590 SequencerPosition opos;
2591 bufferlist::iterator p = bl.begin();
2592 ::decode(opos, p);
2593 bool in_progress = false;
2594 if (!p.end()) // older journals don't have this
2595 ::decode(in_progress, p);
2596 if (opos > spos) {
2597 dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2598 << ", now or in future, SKIPPING REPLAY" << dendl;
2599 return -1;
2600 } else if (opos == spos) {
2601 if (in_progress) {
2602 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2603 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2604 return 0;
2605 } else {
2606 dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2607 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2608 return -1;
2609 }
2610 } else {
2611 dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2612 << ", in past, will replay" << dendl;
2613 return 1;
2614 }
2615 }
2616
2617 void FileStore::_do_transaction(
2618 Transaction& t, uint64_t op_seq, int trans_num,
2619 ThreadPool::TPHandle *handle)
2620 {
2621 dout(10) << __FUNC__ << ": on " << &t << dendl;
2622
2623 #ifdef WITH_LTTNG
2624 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2625 #endif
2626
2627 Transaction::iterator i = t.begin();
2628
2629 SequencerPosition spos(op_seq, trans_num, 0);
2630 while (i.have_op()) {
2631 if (handle)
2632 handle->reset_tp_timeout();
2633
2634 Transaction::Op *op = i.decode_op();
2635 int r = 0;
2636
2637 _inject_failure();
2638
2639 switch (op->op) {
2640 case Transaction::OP_NOP:
2641 break;
2642 case Transaction::OP_TOUCH:
2643 {
2644 const coll_t &_cid = i.get_cid(op->cid);
2645 const ghobject_t &oid = i.get_oid(op->oid);
2646 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2647 _cid : _cid.get_temp();
2648 tracepoint(objectstore, touch_enter, osr_name);
2649 if (_check_replay_guard(cid, oid, spos) > 0)
2650 r = _touch(cid, oid);
2651 tracepoint(objectstore, touch_exit, r);
2652 }
2653 break;
2654
2655 case Transaction::OP_WRITE:
2656 {
2657 const coll_t &_cid = i.get_cid(op->cid);
2658 const ghobject_t &oid = i.get_oid(op->oid);
2659 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2660 _cid : _cid.get_temp();
2661 uint64_t off = op->off;
2662 uint64_t len = op->len;
2663 uint32_t fadvise_flags = i.get_fadvise_flags();
2664 bufferlist bl;
2665 i.decode_bl(bl);
2666 tracepoint(objectstore, write_enter, osr_name, off, len);
2667 if (_check_replay_guard(cid, oid, spos) > 0)
2668 r = _write(cid, oid, off, len, bl, fadvise_flags);
2669 tracepoint(objectstore, write_exit, r);
2670 }
2671 break;
2672
2673 case Transaction::OP_ZERO:
2674 {
2675 const coll_t &_cid = i.get_cid(op->cid);
2676 const ghobject_t &oid = i.get_oid(op->oid);
2677 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2678 _cid : _cid.get_temp();
2679 uint64_t off = op->off;
2680 uint64_t len = op->len;
2681 tracepoint(objectstore, zero_enter, osr_name, off, len);
2682 if (_check_replay_guard(cid, oid, spos) > 0)
2683 r = _zero(cid, oid, off, len);
2684 tracepoint(objectstore, zero_exit, r);
2685 }
2686 break;
2687
2688 case Transaction::OP_TRIMCACHE:
2689 {
2690 // deprecated, no-op
2691 }
2692 break;
2693
2694 case Transaction::OP_TRUNCATE:
2695 {
2696 const coll_t &_cid = i.get_cid(op->cid);
2697 const ghobject_t &oid = i.get_oid(op->oid);
2698 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2699 _cid : _cid.get_temp();
2700 uint64_t off = op->off;
2701 tracepoint(objectstore, truncate_enter, osr_name, off);
2702 if (_check_replay_guard(cid, oid, spos) > 0)
2703 r = _truncate(cid, oid, off);
2704 tracepoint(objectstore, truncate_exit, r);
2705 }
2706 break;
2707
2708 case Transaction::OP_REMOVE:
2709 {
2710 const coll_t &_cid = i.get_cid(op->cid);
2711 const ghobject_t &oid = i.get_oid(op->oid);
2712 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2713 _cid : _cid.get_temp();
2714 tracepoint(objectstore, remove_enter, osr_name);
2715 if (_check_replay_guard(cid, oid, spos) > 0)
2716 r = _remove(cid, oid, spos);
2717 tracepoint(objectstore, remove_exit, r);
2718 }
2719 break;
2720
2721 case Transaction::OP_SETATTR:
2722 {
2723 const coll_t &_cid = i.get_cid(op->cid);
2724 const ghobject_t &oid = i.get_oid(op->oid);
2725 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2726 _cid : _cid.get_temp();
2727 string name = i.decode_string();
2728 bufferlist bl;
2729 i.decode_bl(bl);
2730 tracepoint(objectstore, setattr_enter, osr_name);
2731 if (_check_replay_guard(cid, oid, spos) > 0) {
2732 map<string, bufferptr> to_set;
2733 to_set[name] = bufferptr(bl.c_str(), bl.length());
2734 r = _setattrs(cid, oid, to_set, spos);
2735 if (r == -ENOSPC)
2736 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2737 << " name " << name << " size " << bl.length() << dendl;
2738 }
2739 tracepoint(objectstore, setattr_exit, r);
2740 }
2741 break;
2742
2743 case Transaction::OP_SETATTRS:
2744 {
2745 const coll_t &_cid = i.get_cid(op->cid);
2746 const ghobject_t &oid = i.get_oid(op->oid);
2747 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2748 _cid : _cid.get_temp();
2749 map<string, bufferptr> aset;
2750 i.decode_attrset(aset);
2751 tracepoint(objectstore, setattrs_enter, osr_name);
2752 if (_check_replay_guard(cid, oid, spos) > 0)
2753 r = _setattrs(cid, oid, aset, spos);
2754 tracepoint(objectstore, setattrs_exit, r);
2755 if (r == -ENOSPC)
2756 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2757 }
2758 break;
2759
2760 case Transaction::OP_RMATTR:
2761 {
2762 const coll_t &_cid = i.get_cid(op->cid);
2763 const ghobject_t &oid = i.get_oid(op->oid);
2764 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2765 _cid : _cid.get_temp();
2766 string name = i.decode_string();
2767 tracepoint(objectstore, rmattr_enter, osr_name);
2768 if (_check_replay_guard(cid, oid, spos) > 0)
2769 r = _rmattr(cid, oid, name.c_str(), spos);
2770 tracepoint(objectstore, rmattr_exit, r);
2771 }
2772 break;
2773
2774 case Transaction::OP_RMATTRS:
2775 {
2776 const coll_t &_cid = i.get_cid(op->cid);
2777 const ghobject_t &oid = i.get_oid(op->oid);
2778 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2779 _cid : _cid.get_temp();
2780 tracepoint(objectstore, rmattrs_enter, osr_name);
2781 if (_check_replay_guard(cid, oid, spos) > 0)
2782 r = _rmattrs(cid, oid, spos);
2783 tracepoint(objectstore, rmattrs_exit, r);
2784 }
2785 break;
2786
2787 case Transaction::OP_CLONE:
2788 {
2789 const coll_t &_cid = i.get_cid(op->cid);
2790 const ghobject_t &oid = i.get_oid(op->oid);
2791 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2792 _cid : _cid.get_temp();
2793 const ghobject_t &noid = i.get_oid(op->dest_oid);
2794 tracepoint(objectstore, clone_enter, osr_name);
2795 r = _clone(cid, oid, noid, spos);
2796 tracepoint(objectstore, clone_exit, r);
2797 }
2798 break;
2799
2800 case Transaction::OP_CLONERANGE:
2801 {
2802 const coll_t &_cid = i.get_cid(op->cid);
2803 const ghobject_t &oid = i.get_oid(op->oid);
2804 const ghobject_t &noid = i.get_oid(op->dest_oid);
2805 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2806 _cid : _cid.get_temp();
2807 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2808 _cid : _cid.get_temp();
2809 uint64_t off = op->off;
2810 uint64_t len = op->len;
2811 tracepoint(objectstore, clone_range_enter, osr_name, len);
2812 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2813 tracepoint(objectstore, clone_range_exit, r);
2814 }
2815 break;
2816
2817 case Transaction::OP_CLONERANGE2:
2818 {
2819 const coll_t &_cid = i.get_cid(op->cid);
2820 const ghobject_t &oid = i.get_oid(op->oid);
2821 const ghobject_t &noid = i.get_oid(op->dest_oid);
2822 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2823 _cid : _cid.get_temp();
2824 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2825 _cid : _cid.get_temp();
2826 uint64_t srcoff = op->off;
2827 uint64_t len = op->len;
2828 uint64_t dstoff = op->dest_off;
2829 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2830 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2831 tracepoint(objectstore, clone_range2_exit, r);
2832 }
2833 break;
2834
2835 case Transaction::OP_MKCOLL:
2836 {
2837 const coll_t &cid = i.get_cid(op->cid);
2838 tracepoint(objectstore, mkcoll_enter, osr_name);
2839 if (_check_replay_guard(cid, spos) > 0)
2840 r = _create_collection(cid, op->split_bits, spos);
2841 tracepoint(objectstore, mkcoll_exit, r);
2842 }
2843 break;
2844
2845 case Transaction::OP_COLL_SET_BITS:
2846 {
2847 const coll_t &cid = i.get_cid(op->cid);
2848 int bits = op->split_bits;
2849 r = _collection_set_bits(cid, bits);
2850 }
2851 break;
2852
2853 case Transaction::OP_COLL_HINT:
2854 {
2855 const coll_t &cid = i.get_cid(op->cid);
2856 uint32_t type = op->hint_type;
2857 bufferlist hint;
2858 i.decode_bl(hint);
2859 bufferlist::iterator hiter = hint.begin();
2860 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2861 uint32_t pg_num;
2862 uint64_t num_objs;
2863 ::decode(pg_num, hiter);
2864 ::decode(num_objs, hiter);
2865 if (_check_replay_guard(cid, spos) > 0) {
2866 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2867 }
2868 } else {
2869 // Ignore the hint
2870 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2871 }
2872 }
2873 break;
2874
2875 case Transaction::OP_RMCOLL:
2876 {
2877 const coll_t &cid = i.get_cid(op->cid);
2878 tracepoint(objectstore, rmcoll_enter, osr_name);
2879 if (_check_replay_guard(cid, spos) > 0)
2880 r = _destroy_collection(cid);
2881 tracepoint(objectstore, rmcoll_exit, r);
2882 }
2883 break;
2884
2885 case Transaction::OP_COLL_ADD:
2886 {
2887 const coll_t &ocid = i.get_cid(op->cid);
2888 const coll_t &ncid = i.get_cid(op->dest_cid);
2889 const ghobject_t &oid = i.get_oid(op->oid);
2890
2891 assert(oid.hobj.pool >= -1);
2892
2893 // always followed by OP_COLL_REMOVE
2894 Transaction::Op *op2 = i.decode_op();
2895 const coll_t &ocid2 = i.get_cid(op2->cid);
2896 const ghobject_t &oid2 = i.get_oid(op2->oid);
2897 assert(op2->op == Transaction::OP_COLL_REMOVE);
2898 assert(ocid2 == ocid);
2899 assert(oid2 == oid);
2900
2901 tracepoint(objectstore, coll_add_enter);
2902 r = _collection_add(ncid, ocid, oid, spos);
2903 tracepoint(objectstore, coll_add_exit, r);
2904 spos.op++;
2905 if (r < 0)
2906 break;
2907 tracepoint(objectstore, coll_remove_enter, osr_name);
2908 if (_check_replay_guard(ocid, oid, spos) > 0)
2909 r = _remove(ocid, oid, spos);
2910 tracepoint(objectstore, coll_remove_exit, r);
2911 }
2912 break;
2913
2914 case Transaction::OP_COLL_MOVE:
2915 {
2916 // WARNING: this is deprecated and buggy; only here to replay old journals.
2917 const coll_t &ocid = i.get_cid(op->cid);
2918 const coll_t &ncid = i.get_cid(op->dest_cid);
2919 const ghobject_t &oid = i.get_oid(op->oid);
2920 tracepoint(objectstore, coll_move_enter);
2921 r = _collection_add(ocid, ncid, oid, spos);
2922 if (r == 0 &&
2923 (_check_replay_guard(ocid, oid, spos) > 0))
2924 r = _remove(ocid, oid, spos);
2925 tracepoint(objectstore, coll_move_exit, r);
2926 }
2927 break;
2928
2929 case Transaction::OP_COLL_MOVE_RENAME:
2930 {
2931 const coll_t &_oldcid = i.get_cid(op->cid);
2932 const ghobject_t &oldoid = i.get_oid(op->oid);
2933 const coll_t &_newcid = i.get_cid(op->dest_cid);
2934 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2935 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2936 _oldcid : _oldcid.get_temp();
2937 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2938 _oldcid : _newcid.get_temp();
2939 tracepoint(objectstore, coll_move_rename_enter);
2940 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2941 tracepoint(objectstore, coll_move_rename_exit, r);
2942 }
2943 break;
2944
2945 case Transaction::OP_TRY_RENAME:
2946 {
2947 const coll_t &_cid = i.get_cid(op->cid);
2948 const ghobject_t &oldoid = i.get_oid(op->oid);
2949 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2950 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2951 _cid : _cid.get_temp();
2952 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2953 _cid : _cid.get_temp();
2954 tracepoint(objectstore, coll_try_rename_enter);
2955 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2956 tracepoint(objectstore, coll_try_rename_exit, r);
2957 }
2958 break;
2959
2960 case Transaction::OP_COLL_SETATTR:
2961 case Transaction::OP_COLL_RMATTR:
2962 assert(0 == "collection attr methods no longer implemented");
2963 break;
2964
2965 case Transaction::OP_STARTSYNC:
2966 tracepoint(objectstore, startsync_enter, osr_name);
2967 _start_sync();
2968 tracepoint(objectstore, startsync_exit);
2969 break;
2970
2971 case Transaction::OP_COLL_RENAME:
2972 {
2973 r = -EOPNOTSUPP;
2974 }
2975 break;
2976
2977 case Transaction::OP_OMAP_CLEAR:
2978 {
2979 const coll_t &_cid = i.get_cid(op->cid);
2980 const ghobject_t &oid = i.get_oid(op->oid);
2981 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2982 _cid : _cid.get_temp();
2983 tracepoint(objectstore, omap_clear_enter, osr_name);
2984 r = _omap_clear(cid, oid, spos);
2985 tracepoint(objectstore, omap_clear_exit, r);
2986 }
2987 break;
2988 case Transaction::OP_OMAP_SETKEYS:
2989 {
2990 const coll_t &_cid = i.get_cid(op->cid);
2991 const ghobject_t &oid = i.get_oid(op->oid);
2992 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2993 _cid : _cid.get_temp();
2994 map<string, bufferlist> aset;
2995 i.decode_attrset(aset);
2996 tracepoint(objectstore, omap_setkeys_enter, osr_name);
2997 r = _omap_setkeys(cid, oid, aset, spos);
2998 tracepoint(objectstore, omap_setkeys_exit, r);
2999 }
3000 break;
3001 case Transaction::OP_OMAP_RMKEYS:
3002 {
3003 const coll_t &_cid = i.get_cid(op->cid);
3004 const ghobject_t &oid = i.get_oid(op->oid);
3005 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3006 _cid : _cid.get_temp();
3007 set<string> keys;
3008 i.decode_keyset(keys);
3009 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
3010 r = _omap_rmkeys(cid, oid, keys, spos);
3011 tracepoint(objectstore, omap_rmkeys_exit, r);
3012 }
3013 break;
3014 case Transaction::OP_OMAP_RMKEYRANGE:
3015 {
3016 const coll_t &_cid = i.get_cid(op->cid);
3017 const ghobject_t &oid = i.get_oid(op->oid);
3018 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3019 _cid : _cid.get_temp();
3020 string first, last;
3021 first = i.decode_string();
3022 last = i.decode_string();
3023 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3024 r = _omap_rmkeyrange(cid, oid, first, last, spos);
3025 tracepoint(objectstore, omap_rmkeyrange_exit, r);
3026 }
3027 break;
3028 case Transaction::OP_OMAP_SETHEADER:
3029 {
3030 const coll_t &_cid = i.get_cid(op->cid);
3031 const ghobject_t &oid = i.get_oid(op->oid);
3032 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3033 _cid : _cid.get_temp();
3034 bufferlist bl;
3035 i.decode_bl(bl);
3036 tracepoint(objectstore, omap_setheader_enter, osr_name);
3037 r = _omap_setheader(cid, oid, bl, spos);
3038 tracepoint(objectstore, omap_setheader_exit, r);
3039 }
3040 break;
3041 case Transaction::OP_SPLIT_COLLECTION:
3042 {
3043 assert(0 == "not legacy journal; upgrade to firefly first");
3044 }
3045 break;
3046 case Transaction::OP_SPLIT_COLLECTION2:
3047 {
3048 coll_t cid = i.get_cid(op->cid);
3049 uint32_t bits = op->split_bits;
3050 uint32_t rem = op->split_rem;
3051 coll_t dest = i.get_cid(op->dest_cid);
3052 tracepoint(objectstore, split_coll2_enter, osr_name);
3053 r = _split_collection(cid, bits, rem, dest, spos);
3054 tracepoint(objectstore, split_coll2_exit, r);
3055 }
3056 break;
3057
3058 case Transaction::OP_SETALLOCHINT:
3059 {
3060 const coll_t &_cid = i.get_cid(op->cid);
3061 const ghobject_t &oid = i.get_oid(op->oid);
3062 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3063 _cid : _cid.get_temp();
3064 uint64_t expected_object_size = op->expected_object_size;
3065 uint64_t expected_write_size = op->expected_write_size;
3066 tracepoint(objectstore, setallochint_enter, osr_name);
3067 if (_check_replay_guard(cid, oid, spos) > 0)
3068 r = _set_alloc_hint(cid, oid, expected_object_size,
3069 expected_write_size);
3070 tracepoint(objectstore, setallochint_exit, r);
3071 }
3072 break;
3073
3074 default:
3075 derr << "bad op " << op->op << dendl;
3076 ceph_abort();
3077 }
3078
3079 if (r < 0) {
3080 bool ok = false;
3081
3082 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3083 op->op == Transaction::OP_CLONE ||
3084 op->op == Transaction::OP_CLONERANGE2 ||
3085 op->op == Transaction::OP_COLL_ADD ||
3086 op->op == Transaction::OP_SETATTR ||
3087 op->op == Transaction::OP_SETATTRS ||
3088 op->op == Transaction::OP_RMATTR ||
3089 op->op == Transaction::OP_OMAP_SETKEYS ||
3090 op->op == Transaction::OP_OMAP_RMKEYS ||
3091 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3092 op->op == Transaction::OP_OMAP_SETHEADER))
3093 // -ENOENT is normally okay
3094 // ...including on a replayed OP_RMCOLL with checkpoint mode
3095 ok = true;
3096 if (r == -ENODATA)
3097 ok = true;
3098
3099 if (op->op == Transaction::OP_SETALLOCHINT)
3100 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3101 // cases means invalid hint size (e.g. too big, not a multiple
3102 // of block size, etc) or, at least on xfs, an attempt to set
3103 // or change it when the file is not empty. However,
3104 // OP_SETALLOCHINT is advisory, so ignore all errors.
3105 ok = true;
3106
3107 if (replaying && !backend->can_checkpoint()) {
3108 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3109 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3110 ok = true;
3111 }
3112 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3113 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3114 ok = true;
3115 }
3116 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3117 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3118 ok = true;
3119 }
3120 if (r == -ERANGE) {
3121 dout(10) << "tolerating ERANGE on replay" << dendl;
3122 ok = true;
3123 }
3124 if (r == -ENOENT) {
3125 dout(10) << "tolerating ENOENT on replay" << dendl;
3126 ok = true;
3127 }
3128 }
3129
3130 if (!ok) {
3131 const char *msg = "unexpected error code";
3132
3133 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3134 op->op == Transaction::OP_CLONE ||
3135 op->op == Transaction::OP_CLONERANGE2)) {
3136 msg = "ENOENT on clone suggests osd bug";
3137 } else if (r == -ENOSPC) {
3138 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3139 // by partially applying transactions.
3140 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3141 } else if (r == -ENOTEMPTY) {
3142 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3143 } else if (r == -EPERM) {
3144 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3145 }
3146
3147 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3148 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3149 dout(0) << msg << dendl;
3150 dout(0) << " transaction dump:\n";
3151 JSONFormatter f(true);
3152 f.open_object_section("transaction");
3153 t.dump(&f);
3154 f.close_section();
3155 f.flush(*_dout);
3156 *_dout << dendl;
3157
3158 if (r == -EMFILE) {
3159 dump_open_fds(cct);
3160 }
3161
3162 assert(0 == "unexpected error");
3163 }
3164 }
3165
3166 spos.op++;
3167 }
3168
3169 _inject_failure();
3170 }
3171
3172 /*********************************************/
3173
3174
3175
3176 // --------------------
3177 // objects
3178
3179 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3180 {
3181 tracepoint(objectstore, exists_enter, _cid.c_str());
3182 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3183 struct stat st;
3184 bool retval = stat(cid, oid, &st) == 0;
3185 tracepoint(objectstore, exists_exit, retval);
3186 return retval;
3187 }
3188
3189 int FileStore::stat(
3190 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3191 {
3192 tracepoint(objectstore, stat_enter, _cid.c_str());
3193 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3194 int r = lfn_stat(cid, oid, st);
3195 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3196 if (r < 0) {
3197 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3198 << " = " << r << dendl;
3199 } else {
3200 dout(10) << __FUNC__ << ": " << cid << "/" << oid
3201 << " = " << r
3202 << " (size " << st->st_size << ")" << dendl;
3203 }
3204 if (cct->_conf->filestore_debug_inject_read_err &&
3205 debug_mdata_eio(oid)) {
3206 return -EIO;
3207 } else {
3208 tracepoint(objectstore, stat_exit, r);
3209 return r;
3210 }
3211 }
3212
3213 int FileStore::set_collection_opts(
3214 const coll_t& cid,
3215 const pool_opts_t& opts)
3216 {
3217 return -EOPNOTSUPP;
3218 }
3219
3220 int FileStore::read(
3221 const coll_t& _cid,
3222 const ghobject_t& oid,
3223 uint64_t offset,
3224 size_t len,
3225 bufferlist& bl,
3226 uint32_t op_flags)
3227 {
3228 int got;
3229 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3230 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3231
3232 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3233
3234 FDRef fd;
3235 int r = lfn_open(cid, oid, false, &fd);
3236 if (r < 0) {
3237 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3238 << cpp_strerror(r) << dendl;
3239 return r;
3240 }
3241
3242 if (offset == 0 && len == 0) {
3243 struct stat st;
3244 memset(&st, 0, sizeof(struct stat));
3245 int r = ::fstat(**fd, &st);
3246 assert(r == 0);
3247 len = st.st_size;
3248 }
3249
3250 #ifdef HAVE_POSIX_FADVISE
3251 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3252 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3253 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3254 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3255 #endif
3256
3257 bufferptr bptr(len); // prealloc space for entire read
3258 got = safe_pread(**fd, bptr.c_str(), len, offset);
3259 if (got < 0) {
3260 dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3261 lfn_close(fd);
3262 return got;
3263 }
3264 bptr.set_length(got); // properly size the buffer
3265 bl.clear();
3266 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3267
3268 #ifdef HAVE_POSIX_FADVISE
3269 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3270 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3271 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3272 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3273 #endif
3274
3275 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3276 ostringstream ss;
3277 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3278 if (errors != 0) {
3279 dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3280 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3281 assert(0 == "bad crc on read");
3282 }
3283 }
3284
3285 lfn_close(fd);
3286
3287 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3288 << got << "/" << len << dendl;
3289 if (cct->_conf->filestore_debug_inject_read_err &&
3290 debug_data_eio(oid)) {
3291 return -EIO;
3292 } else if (cct->_conf->filestore_debug_random_read_err &&
3293 (rand() % (int)(cct->_conf->filestore_debug_random_read_err * 100.0)) == 0) {
3294 dout(0) << __func__ << ": inject random EIO" << dendl;
3295 return -EIO;
3296 } else {
3297 tracepoint(objectstore, read_exit, got);
3298 return got;
3299 }
3300 }
3301
// _do_fiemap - collect the mapped extents of [offset, offset+len) of fd
// into *m (logical offset -> length), using backend->do_fiemap().
//
// Extents that are logically contiguous are merged into one map entry, the
// first extent is clipped to start at `offset`, and extents are clipped to
// end at offset+len.  The backend is queried repeatedly (via `goto more`)
// until an extent flagged FIEMAP_EXTENT_LAST is seen or the remaining
// length reaches zero.
//
// Returns the (non-negative) result of the last backend->do_fiemap() call,
// or its negative error.
3302 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3303 map<uint64_t, uint64_t> *m)
3304 {
3305 uint64_t i;
3306 struct fiemap_extent *extent = NULL;
3307 struct fiemap *fiemap = NULL;
3308 int r = 0;
3309
3310 more:
// NOTE(review): the buffer returned through &fiemap is released with free()
// below, so the backend presumably malloc()s it on success -- confirm.
3311 r = backend->do_fiemap(fd, offset, len, &fiemap);
3312 if (r < 0)
3313 return r;
3314
// nothing mapped in the remaining range: done
3315 if (fiemap->fm_mapped_extents == 0) {
3316 free(fiemap);
3317 return r;
3318 }
3319
3320 extent = &fiemap->fm_extents[0];
3321
3322 /* start where we were asked to start */
3323 if (extent->fe_logical < offset) {
3324 extent->fe_length -= offset - extent->fe_logical;
3325 extent->fe_logical = offset;
3326 }
3327
3328 i = 0;
3329
// `last` tracks the final extent recorded in *m for this batch; the loop
// below runs at least once because fm_mapped_extents != 0 here.
3330 struct fiemap_extent *last = nullptr;
3331 while (i < fiemap->fm_mapped_extents) {
3332 struct fiemap_extent *next = extent + 1;
3333
3334 dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
3335 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3336
3337 /* try to merge extents */
// merging folds the current extent into `next` (in place, in the fiemap
// buffer) and then advances to it, so `extent` always holds the merged run
3338 while ((i < fiemap->fm_mapped_extents - 1) &&
3339 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3340 next->fe_length += extent->fe_length;
3341 next->fe_logical = extent->fe_logical;
3342 extent = next;
3343 next = extent + 1;
3344 i++;
3345 }
3346
// clip the (merged) extent so it does not run past the requested range
3347 if (extent->fe_logical + extent->fe_length > offset + len)
3348 extent->fe_length = offset + len - extent->fe_logical;
3349 (*m)[extent->fe_logical] = extent->fe_length;
3350 i++;
3351 last = extent++;
3352 }
// advance the request window past everything covered so far
3353 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3354 offset = last->fe_logical + last->fe_length;
3355 len -= xoffset;
// the flag must be read before the buffer holding `last` is freed
3356 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3357 free(fiemap);
3358 if (!is_last) {
3359 goto more;
3360 }
3361
3362 return r;
3363 }
3364
3365 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3366 map<uint64_t, uint64_t> *m)
3367 {
3368 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3369 off_t hole_pos, data_pos;
3370 int r = 0;
3371
3372 // If lseek fails with errno setting to be ENXIO, this means the current
3373 // file offset is beyond the end of the file.
3374 off_t start = offset;
3375 while(start < (off_t)(offset + len)) {
3376 data_pos = lseek(fd, start, SEEK_DATA);
3377 if (data_pos < 0) {
3378 if (errno == ENXIO)
3379 break;
3380 else {
3381 r = -errno;
3382 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3383 return r;
3384 }
3385 } else if (data_pos > (off_t)(offset + len)) {
3386 break;
3387 }
3388
3389 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3390 if (hole_pos < 0) {
3391 if (errno == ENXIO) {
3392 break;
3393 } else {
3394 r = -errno;
3395 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3396 return r;
3397 }
3398 }
3399
3400 if (hole_pos >= (off_t)(offset + len)) {
3401 (*m)[data_pos] = offset + len - data_pos;
3402 break;
3403 }
3404 (*m)[data_pos] = hole_pos - data_pos;
3405 start = hole_pos;
3406 }
3407
3408 return r;
3409 #else
3410 (*m)[offset] = len;
3411 return 0;
3412 #endif
3413 }
3414
3415 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3416 uint64_t offset, size_t len,
3417 bufferlist& bl)
3418 {
3419 map<uint64_t, uint64_t> exomap;
3420 int r = fiemap(_cid, oid, offset, len, exomap);
3421 if (r >= 0) {
3422 ::encode(exomap, bl);
3423 }
3424 return r;
3425 }
3426
3427 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3428 uint64_t offset, size_t len,
3429 map<uint64_t, uint64_t>& destmap)
3430 {
3431 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3432 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3433 destmap.clear();
3434
3435 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3436 len <= (size_t)m_filestore_fiemap_threshold) {
3437 destmap[offset] = len;
3438 return 0;
3439 }
3440
3441 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3442
3443 FDRef fd;
3444
3445 int r = lfn_open(cid, oid, false, &fd);
3446 if (r < 0) {
3447 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3448 goto done;
3449 }
3450
3451 if (backend->has_seek_data_hole()) {
3452 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3453 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3454 } else if (backend->has_fiemap()) {
3455 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3456 r = _do_fiemap(**fd, offset, len, &destmap);
3457 }
3458
3459 lfn_close(fd);
3460
3461 done:
3462
3463 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3464 assert(!m_filestore_fail_eio || r != -EIO);
3465 tracepoint(objectstore, fiemap_exit, r);
3466 return r;
3467 }
3468
3469 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3470 const SequencerPosition &spos)
3471 {
3472 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3473 int r = lfn_unlink(cid, oid, spos);
3474 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3475 return r;
3476 }
3477
3478 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3479 {
3480 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3481 int r = lfn_truncate(cid, oid, size);
3482 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3483 return r;
3484 }
3485
3486
3487 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3488 {
3489 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3490
3491 FDRef fd;
3492 int r = lfn_open(cid, oid, true, &fd);
3493 if (r < 0) {
3494 return r;
3495 } else {
3496 lfn_close(fd);
3497 }
3498 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3499 return r;
3500 }
3501
3502 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3503 uint64_t offset, size_t len,
3504 const bufferlist& bl, uint32_t fadvise_flags)
3505 {
3506 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3507 int r;
3508
3509 FDRef fd;
3510 r = lfn_open(cid, oid, true, &fd);
3511 if (r < 0) {
3512 dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3513 << oid << ": "
3514 << cpp_strerror(r) << dendl;
3515 goto out;
3516 }
3517
3518 // write
3519 r = bl.write_fd(**fd, offset);
3520 if (r < 0) {
3521 derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3522 << " error: " << cpp_strerror(r) << dendl;
3523 lfn_close(fd);
3524 goto out;
3525 }
3526 r = bl.length();
3527
3528 if (r >= 0 && m_filestore_sloppy_crc) {
3529 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3530 assert(rc >= 0);
3531 }
3532
3533 if (replaying || m_disable_wbthrottle) {
3534 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3535 #ifdef HAVE_POSIX_FADVISE
3536 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3537 #endif
3538 }
3539 } else {
3540 wbthrottle.queue_wb(fd, oid, offset, len,
3541 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3542 }
3543
3544 lfn_close(fd);
3545
3546 out:
3547 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3548 return r;
3549 }
3550
3551 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3552 {
3553 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3554 int ret = 0;
3555
3556 if (cct->_conf->filestore_punch_hole) {
3557 #ifdef CEPH_HAVE_FALLOCATE
3558 # if !defined(DARWIN) && !defined(__FreeBSD__)
3559 # ifdef FALLOC_FL_KEEP_SIZE
3560 // first try to punch a hole.
3561 FDRef fd;
3562 ret = lfn_open(cid, oid, false, &fd);
3563 if (ret < 0) {
3564 goto out;
3565 }
3566
3567 struct stat st;
3568 ret = ::fstat(**fd, &st);
3569 if (ret < 0) {
3570 ret = -errno;
3571 lfn_close(fd);
3572 goto out;
3573 }
3574
3575 // first try fallocate
3576 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3577 offset, len);
3578 if (ret < 0) {
3579 ret = -errno;
3580 } else {
3581 // ensure we extend file size, if needed
3582 if (len > 0 && offset + len > (uint64_t)st.st_size) {
3583 ret = ::ftruncate(**fd, offset + len);
3584 if (ret < 0) {
3585 ret = -errno;
3586 lfn_close(fd);
3587 goto out;
3588 }
3589 }
3590 }
3591 lfn_close(fd);
3592
3593 if (ret >= 0 && m_filestore_sloppy_crc) {
3594 int rc = backend->_crc_update_zero(**fd, offset, len);
3595 assert(rc >= 0);
3596 }
3597
3598 if (ret == 0)
3599 goto out; // yay!
3600 if (ret != -EOPNOTSUPP)
3601 goto out; // some other error
3602 # endif
3603 # endif
3604 #endif
3605 }
3606
3607 // lame, kernel is old and doesn't support it.
3608 // write zeros.. yuck!
3609 dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3610 {
3611 bufferlist bl;
3612 bl.append_zero(len);
3613 ret = _write(cid, oid, offset, len, bl);
3614 }
3615
3616 #ifdef CEPH_HAVE_FALLOCATE
3617 # if !defined(DARWIN) && !defined(__FreeBSD__)
3618 # ifdef FALLOC_FL_KEEP_SIZE
3619 out:
3620 # endif
3621 # endif
3622 #endif
3623 dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3624 return ret;
3625 }
3626
// _clone - make newoid a full copy of oldoid within collection cid:
// file data, object map (via object_map->clone) and xattrs.
//
// Returns 0 without doing anything when _check_replay_guard() returns a
// negative value for the destination.  Otherwise returns the result of the
// last step executed (negative errno on the first failing step).  Asserts
// on -EIO when m_filestore_fail_eio is set.
3627 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3628 const SequencerPosition& spos)
3629 {
3630 dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3631
// NOTE(review): a negative guard result presumably means this op was
// already applied before the replay point -- confirm against
// _check_replay_guard().
3632 if (_check_replay_guard(cid, newoid, spos) < 0)
3633 return 0;
3634
3635 int r;
3636 FDRef o, n;
// The Index and its write lock live only for this scope: source open,
// destination open, data copy and object-map clone happen under the lock.
3637 {
3638 Index index;
3639 r = lfn_open(cid, oldoid, false, &o, &index);
3640 if (r < 0) {
3641 goto out2;
3642 }
3643 assert(NULL != (index.index));
3644 RWLock::WLocker l((index.index)->access_lock);
3645
3646 r = lfn_open(cid, newoid, true, &n, &index);
3647 if (r < 0) {
3648 goto out;
3649 }
// discard any existing destination contents before copying
3650 r = ::ftruncate(**n, 0);
3651 if (r < 0) {
3652 r = -errno;
3653 goto out3;
3654 }
3655 struct stat st;
3656 r = ::fstat(**o, &st);
3657 if (r < 0) {
3658 r = -errno;
3659 goto out3;
3660 }
3661
// copy the whole source file: [0, st_size) -> offset 0
3662 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3663 if (r < 0) {
3664 goto out3;
3665 }
3666
3667 dout(20) << "objectmap clone" << dendl;
3668 r = object_map->clone(oldoid, newoid, &spos);
// -ENOENT from the object map clone is tolerated here
3669 if (r < 0 && r != -ENOENT)
3670 goto out3;
3671 }
3672
// Copy xattrs, and carry over the spill-out marker: "no spill out" only if
// the source explicitly says so, otherwise mark the clone as spilled out.
3673 {
3674 char buf[2];
3675 map<string, bufferptr> aset;
3676 r = _fgetattrs(**o, aset);
3677 if (r < 0)
3678 goto out3;
3679
3680 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3681 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3682 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3683 sizeof(XATTR_NO_SPILL_OUT));
3684 } else {
3685 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3686 sizeof(XATTR_SPILL_OUT));
3687 }
3688 if (r < 0)
3689 goto out3;
3690
3691 r = _fsetattrs(**n, aset);
3692 if (r < 0)
3693 goto out3;
3694 }
3695
3696 // clone is non-idempotent; record our work.
3697 _set_replay_guard(**n, spos, &newoid);
3698
// cleanup ladder: out3 closes both handles, out only the source,
// out2 none (source open failed)
3699 out3:
3700 lfn_close(n);
3701 out:
3702 lfn_close(o);
3703 out2:
3704 dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3705 assert(!m_filestore_fail_eio || r != -EIO);
3706 return r;
3707 }
3708
3709 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3710 {
3711 dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3712 return backend->clone_range(from, to, srcoff, len, dstoff);
3713 }
3714
3715 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3716 {
3717 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3718 int r = 0;
3719 map<uint64_t, uint64_t> exomap;
3720 // fiemap doesn't allow zero length
3721 if (len == 0)
3722 return 0;
3723
3724 if (backend->has_seek_data_hole()) {
3725 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3726 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3727 } else if (backend->has_fiemap()) {
3728 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3729 r = _do_fiemap(from, srcoff, len, &exomap);
3730 }
3731
3732
3733 int64_t written = 0;
3734 if (r < 0)
3735 goto out;
3736
3737 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3738 uint64_t it_off = miter->first - srcoff + dstoff;
3739 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3740 if (r < 0) {
3741 derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3742 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3743 break;
3744 }
3745 written += miter->second;
3746 }
3747
3748 if (r >= 0) {
3749 if (m_filestore_sloppy_crc) {
3750 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3751 assert(rc >= 0);
3752 }
3753 struct stat st;
3754 r = ::fstat(to, &st);
3755 if (r < 0) {
3756 r = -errno;
3757 derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3758 goto out;
3759 }
3760 if (st.st_size < (int)(dstoff + len)) {
3761 r = ::ftruncate(to, dstoff + len);
3762 if (r < 0) {
3763 r = -errno;
3764 derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3765 goto out;
3766 }
3767 }
3768 r = written;
3769 }
3770
3771 out:
3772 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3773 return r;
3774 }
3775
3776 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3777 {
3778 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3779 int r = 0;
3780 loff_t pos = srcoff;
3781 loff_t end = srcoff + len;
3782 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3783
3784 #ifdef CEPH_HAVE_SPLICE
3785 if (backend->has_splice()) {
3786 int pipefd[2];
3787 if (pipe(pipefd) < 0) {
3788 r = -errno;
3789 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3790 return r;
3791 }
3792
3793 loff_t dstpos = dstoff;
3794 while (pos < end) {
3795 int l = MIN(end-pos, buflen);
3796 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3797 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3798 if (r < 0) {
3799 derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3800 << ", " << cpp_strerror(r) << dendl;
3801 break;
3802 }
3803 if (r == 0) {
3804 // hrm, bad source range, wtf.
3805 r = -ERANGE;
3806 derr << __FUNC__ << ": got short read result at " << pos
3807 << " of fd " << from << " len " << len << dendl;
3808 break;
3809 }
3810
3811 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3812 dout(10) << " safe_splice write to " << to << " len " << r
3813 << " got " << r << dendl;
3814 if (r < 0) {
3815 derr << __FUNC__ << ": write error at " << pos << "~"
3816 << r << ", " << cpp_strerror(r) << dendl;
3817 break;
3818 }
3819 }
3820 close(pipefd[0]);
3821 close(pipefd[1]);
3822 } else
3823 #endif
3824 {
3825 int64_t actual;
3826
3827 actual = ::lseek64(from, srcoff, SEEK_SET);
3828 if (actual != (int64_t)srcoff) {
3829 if (actual < 0)
3830 r = -errno;
3831 else
3832 r = -EINVAL;
3833 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3834 return r;
3835 }
3836 actual = ::lseek64(to, dstoff, SEEK_SET);
3837 if (actual != (int64_t)dstoff) {
3838 if (actual < 0)
3839 r = -errno;
3840 else
3841 r = -EINVAL;
3842 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3843 return r;
3844 }
3845
3846 char buf[buflen];
3847 while (pos < end) {
3848 int l = MIN(end-pos, buflen);
3849 r = ::read(from, buf, l);
3850 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3851 if (r < 0) {
3852 if (errno == EINTR) {
3853 continue;
3854 } else {
3855 r = -errno;
3856 derr << __FUNC__ << ": read error at " << pos << "~" << len
3857 << ", " << cpp_strerror(r) << dendl;
3858 break;
3859 }
3860 }
3861 if (r == 0) {
3862 // hrm, bad source range, wtf.
3863 r = -ERANGE;
3864 derr << __FUNC__ << ": got short read result at " << pos
3865 << " of fd " << from << " len " << len << dendl;
3866 break;
3867 }
3868 int op = 0;
3869 while (op < r) {
3870 int r2 = safe_write(to, buf+op, r-op);
3871 dout(25) << " write to " << to << " len " << (r-op)
3872 << " got " << r2 << dendl;
3873 if (r2 < 0) {
3874 r = r2;
3875 derr << __FUNC__ << ": write error at " << pos << "~"
3876 << r-op << ", " << cpp_strerror(r) << dendl;
3877
3878 break;
3879 }
3880 op += (r-op);
3881 }
3882 if (r < 0)
3883 break;
3884 pos += r;
3885 }
3886 }
3887
3888 if (r < 0 && replaying) {
3889 assert(r == -ERANGE);
3890 derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
3891 r = pos - from;;
3892 }
3893 assert(replaying || pos == end);
3894 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3895 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3896 assert(rc >= 0);
3897 }
3898 dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3899 return r;
3900 }
3901
3902 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3903 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3904 const SequencerPosition& spos)
3905 {
3906 dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3907
3908 if (_check_replay_guard(newcid, newoid, spos) < 0)
3909 return 0;
3910
3911 int r;
3912 FDRef o, n;
3913 r = lfn_open(oldcid, oldoid, false, &o);
3914 if (r < 0) {
3915 goto out2;
3916 }
3917 r = lfn_open(newcid, newoid, true, &n);
3918 if (r < 0) {
3919 goto out;
3920 }
3921 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3922 if (r < 0) {
3923 goto out3;
3924 }
3925
3926 // clone is non-idempotent; record our work.
3927 _set_replay_guard(**n, spos, &newoid);
3928
3929 out3:
3930 lfn_close(n);
3931 out:
3932 lfn_close(o);
3933 out2:
3934 dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3935 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3936 return r;
3937 }
3938
3939 class SyncEntryTimeout : public Context {
3940 public:
3941 CephContext* cct;
3942 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3943 : cct(cct), m_commit_timeo(commit_timeo)
3944 {
3945 }
3946
3947 void finish(int r) override {
3948 BackTrace *bt = new BackTrace(1);
3949 generic_dout(-1) << "FileStore: sync_entry timed out after "
3950 << m_commit_timeo << " seconds.\n";
3951 bt->print(*_dout);
3952 *_dout << dendl;
3953 delete bt;
3954 ceph_abort();
3955 }
3956 private:
3957 int m_commit_timeo;
3958 };
3959
// sync_entry - commit loop.  Runs until `stop` is set; each iteration
// waits for the sync interval (or a forced sync), then commits everything
// applied so far and completes the contexts queued in sync_waiters.
//
// `lock` is held while waiting and while touching sync_waiters/force_sync,
// and is dropped for the duration of the commit itself.
3960 void FileStore::sync_entry()
3961 {
3962 lock.Lock();
3963 while (!stop) {
3964 utime_t max_interval;
3965 max_interval.set_from_double(m_filestore_max_sync_interval);
3966 utime_t min_interval;
3967 min_interval.set_from_double(m_filestore_min_sync_interval);
3968
// sleep for up to max_interval unless a sync was already forced
3969 utime_t startwait = ceph_clock_now();
3970 if (!force_sync) {
3971 dout(20) << __FUNC__ << ": waiting for max_interval " << max_interval << dendl;
3972 sync_cond.WaitInterval(lock, max_interval);
3973 } else {
3974 dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
3975 }
3976
// a forced sync skips the min-interval wait; a stop request ends the loop
3977 if (force_sync) {
3978 dout(20) << __FUNC__ << ": force_sync set" << dendl;
3979 force_sync = false;
3980 } else if (stop) {
3981 dout(20) << __FUNC__ << ": stop set" << dendl;
3982 break;
3983 } else {
3984 // wait for at least the min interval
3985 utime_t woke = ceph_clock_now();
3986 woke -= startwait;
3987 dout(20) << __FUNC__ << ": woke after " << woke << dendl;
3988 if (woke < min_interval) {
3989 utime_t t = min_interval;
3990 t -= woke;
3991 dout(20) << __FUNC__ << ": waiting for another " << t
3992 << " to reach min interval " << min_interval << dendl;
3993 sync_cond.WaitInterval(lock, t);
3994 }
3995 }
3996
// take ownership of the current waiters, then commit without holding lock
3997 list<Context*> fin;
3998 again:
3999 fin.swap(sync_waiters);
4000 lock.Unlock();
4001
// op_tp stays paused until commit_started() has been called below
4002 op_tp.pause();
4003 if (apply_manager.commit_start()) {
4004 utime_t start = ceph_clock_now();
4005 uint64_t cp = apply_manager.get_committing_seq();
4006
// arm a timer event whose finish() aborts the process if this commit has
// not been cancelled within m_filestore_commit_timeout; if the timer
// refuses the event the pointer is dropped so it is not cancelled later
4007 sync_entry_timeo_lock.Lock();
4008 SyncEntryTimeout *sync_entry_timeo =
4009 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
4010 if (!timer.add_event_after(m_filestore_commit_timeout,
4011 sync_entry_timeo)) {
4012 sync_entry_timeo = nullptr;
4013 }
4014 sync_entry_timeo_lock.Unlock();
4015
4016 logger->set(l_filestore_committing, 1);
4017
4018 dout(15) << __FUNC__ << ": committing " << cp << dendl;
4019 stringstream errstream;
4020 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4021 derr << errstream.str() << dendl;
4022 ceph_abort();
4023 }
4024
// Checkpoint-capable backend: record op_seq, create a checkpoint named
// after the committing seq, resume ops, then wait for the checkpoint.
4025 if (backend->can_checkpoint()) {
4026 int err = write_op_seq(op_fd, cp);
4027 if (err < 0) {
4028 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4029 assert(0 == "error during write_op_seq");
4030 }
4031
4032 char s[NAME_MAX];
4033 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4034 uint64_t cid = 0;
4035 err = backend->create_checkpoint(s, &cid);
4036 if (err < 0) {
// NOTE(review): this inner `err` shadows the outer one and reports errno
// rather than the value create_checkpoint() returned; the assert below
// fires unless errno happens to be 0.
4037 int err = errno;
4038 derr << "snap create '" << s << "' got error " << err << dendl;
4039 assert(err == 0);
4040 }
4041
4042 snaps.push_back(cp);
4043 apply_manager.commit_started();
4044 op_tp.unpause();
4045
4046 if (cid > 0) {
4047 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4048 err = backend->sync_checkpoint(cid);
4049 if (err < 0) {
4050 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4051 assert(0 == "wait_sync got error");
4052 }
4053 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4054 }
4055 } else {
// No checkpoints: resume ops first, then sync the object map and the
// filesystem, and only afterwards write and fsync op_seq.
4056 apply_manager.commit_started();
4057 op_tp.unpause();
4058
4059 int err = object_map->sync();
4060 if (err < 0) {
4061 derr << "object_map sync got " << cpp_strerror(err) << dendl;
4062 assert(0 == "object_map sync returned error");
4063 }
4064
4065 err = backend->syncfs();
4066 if (err < 0) {
4067 derr << "syncfs got " << cpp_strerror(err) << dendl;
4068 assert(0 == "syncfs returned error");
4069 }
4070
4071 err = write_op_seq(op_fd, cp);
4072 if (err < 0) {
4073 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4074 assert(0 == "error during write_op_seq");
4075 }
4076 err = ::fsync(op_fd);
4077 if (err < 0) {
4078 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4079 assert(0 == "error during fsync of op_seq");
4080 }
4081 }
4082
// lat = time spent committing; dur = time since this iteration began
// waiting; dur - lat is therefore the time spent before the commit started
4083 utime_t done = ceph_clock_now();
4084 utime_t lat = done - start;
4085 utime_t dur = done - startwait;
4086 dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
4087 utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4088 if (max_pause_lat < dur - lat) {
4089 logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4090 }
4091
4092 logger->inc(l_filestore_commitcycle);
4093 logger->tinc(l_filestore_commitcycle_latency, lat);
4094 logger->tinc(l_filestore_commitcycle_interval, dur);
4095
4096 apply_manager.commit_finish();
4097 if (!m_disable_wbthrottle) {
4098 wbthrottle.clear();
4099 }
4100
4101 logger->set(l_filestore_committing, 0);
4102
4103 // remove old snaps?
// keeps the two most recent entries of `snaps`; failure to destroy an old
// checkpoint is logged but not fatal
4104 if (backend->can_checkpoint()) {
4105 char s[NAME_MAX];
4106 while (snaps.size() > 2) {
4107 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4108 snaps.pop_front();
4109 dout(10) << "removing snap '" << s << "'" << dendl;
4110 int r = backend->destroy_checkpoint(s);
4111 if (r) {
4112 int err = errno;
4113 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4114 }
4115 }
4116 }
4117
4118 dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
4119
// commit finished in time: disarm the abort timer
4120 if (sync_entry_timeo) {
4121 Mutex::Locker lock(sync_entry_timeo_lock);
4122 timer.cancel_event(sync_entry_timeo);
4123 }
4124 } else {
// nothing to commit
4125 op_tp.unpause();
4126 }
4127
// back under lock: complete the waiters collected before this commit, and
// commit again right away if more arrived or the journal asks for it
4128 lock.Lock();
4129 finish_contexts(cct, fin, 0);
4130 fin.clear();
4131 if (!sync_waiters.empty()) {
4132 dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
4133 goto again;
4134 }
4135 if (!stop && journal && journal->should_commit_now()) {
4136 dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
4137 goto again;
4138 }
4139 }
// the stop flag is reset on the way out
4140 stop = false;
4141 lock.Unlock();
4142 }
4143
4144 void FileStore::_start_sync()
4145 {
4146 if (!journal) { // don't do a big sync if the journal is on
4147 dout(10) << __FUNC__ << dendl;
4148 sync_cond.Signal();
4149 } else {
4150 dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
4151 }
4152 }
4153
4154 void FileStore::do_force_sync()
4155 {
4156 dout(10) << __FUNC__ << dendl;
4157 Mutex::Locker l(lock);
4158 force_sync = true;
4159 sync_cond.Signal();
4160 }
4161
4162 void FileStore::start_sync(Context *onsafe)
4163 {
4164 Mutex::Locker l(lock);
4165 sync_waiters.push_back(onsafe);
4166 sync_cond.Signal();
4167 force_sync = true;
4168 dout(10) << __FUNC__ << dendl;
4169 }
4170
4171 void FileStore::sync()
4172 {
4173 Mutex l("FileStore::sync");
4174 Cond c;
4175 bool done;
4176 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4177
4178 start_sync(fin);
4179
4180 l.Lock();
4181 while (!done) {
4182 dout(10) << "sync waiting" << dendl;
4183 c.Wait(l);
4184 }
4185 l.Unlock();
4186 dout(10) << "sync done" << dendl;
4187 }
4188
4189 void FileStore::_flush_op_queue()
4190 {
4191 dout(10) << __FUNC__ << ": draining op tp" << dendl;
4192 op_wq.drain();
4193 dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4194 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4195 (*it)->wait_for_empty();
4196 }
4197 }
4198
4199 /*
4200 * flush - make every queued write readable
4201 */
4202 void FileStore::flush()
4203 {
4204 dout(10) << __FUNC__ << dendl;
4205
4206 if (cct->_conf->filestore_blackhole) {
4207 // wait forever
4208 Mutex lock("FileStore::flush::lock");
4209 Cond cond;
4210 lock.Lock();
4211 while (true)
4212 cond.Wait(lock);
4213 ceph_abort();
4214 }
4215
4216 if (m_filestore_journal_writeahead) {
4217 if (journal)
4218 journal->flush();
4219 dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4220 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4221 (*it)->wait_for_empty();
4222 }
4223 }
4224
4225 _flush_op_queue();
4226 dout(10) << __FUNC__ << ": complete" << dendl;
4227 }
4228
4229 /*
4230 * sync_and_flush - make every queued write readable AND committed to disk
4231 */
4232 void FileStore::sync_and_flush()
4233 {
4234 dout(10) << __FUNC__ << dendl;
4235
4236 if (m_filestore_journal_writeahead) {
4237 if (journal)
4238 journal->flush();
4239 _flush_op_queue();
4240 } else {
4241 // includes m_filestore_journal_parallel
4242 _flush_op_queue();
4243 sync();
4244 }
4245 dout(10) << __FUNC__ << ": done" << dendl;
4246 }
4247
4248 int FileStore::flush_journal()
4249 {
4250 dout(10) << __FUNC__ << dendl;
4251 sync_and_flush();
4252 sync();
4253 return 0;
4254 }
4255
4256 int FileStore::snapshot(const string& name)
4257 {
4258 dout(10) << __FUNC__ << ": " << name << dendl;
4259 sync_and_flush();
4260
4261 if (!backend->can_checkpoint()) {
4262 dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4263 return -EOPNOTSUPP;
4264 }
4265
4266 char s[NAME_MAX];
4267 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4268
4269 int r = backend->create_checkpoint(s, NULL);
4270 if (r) {
4271 derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4272 }
4273
4274 return r;
4275 }
4276
4277 // -------------------------------
4278 // attributes
4279
4280 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4281 {
4282 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4283 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4284 if (l >= 0) {
4285 bp = buffer::create(l);
4286 memcpy(bp.c_str(), val, l);
4287 } else if (l == -ERANGE) {
4288 l = chain_fgetxattr(fd, name, 0, 0);
4289 if (l > 0) {
4290 bp = buffer::create(l);
4291 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4292 }
4293 }
4294 assert(!m_filestore_fail_eio || l != -EIO);
4295 return l;
4296 }
4297
4298 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4299 {
4300 // get attr list
4301 char names1[100];
4302 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4303 char *names2 = 0;
4304 char *name = 0;
4305 if (len == -ERANGE) {
4306 len = chain_flistxattr(fd, 0, 0);
4307 if (len < 0) {
4308 assert(!m_filestore_fail_eio || len != -EIO);
4309 return len;
4310 }
4311 dout(10) << " -ERANGE, len is " << len << dendl;
4312 names2 = new char[len+1];
4313 len = chain_flistxattr(fd, names2, len);
4314 dout(10) << " -ERANGE, got " << len << dendl;
4315 if (len < 0) {
4316 assert(!m_filestore_fail_eio || len != -EIO);
4317 delete[] names2;
4318 return len;
4319 }
4320 name = names2;
4321 } else if (len < 0) {
4322 assert(!m_filestore_fail_eio || len != -EIO);
4323 return len;
4324 } else {
4325 name = names1;
4326 }
4327 name[len] = 0;
4328
4329 char *end = name + len;
4330 while (name < end) {
4331 char *attrname = name;
4332 if (parse_attrname(&name)) {
4333 if (*name) {
4334 dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4335 int r = _fgetattr(fd, attrname, aset[name]);
4336 if (r < 0) {
4337 delete[] names2;
4338 return r;
4339 }
4340 }
4341 }
4342 name += strlen(name) + 1;
4343 }
4344
4345 delete[] names2;
4346 return 0;
4347 }
4348
4349 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4350 {
4351 for (map<string, bufferptr>::iterator p = aset.begin();
4352 p != aset.end();
4353 ++p) {
4354 char n[CHAIN_XATTR_MAX_NAME_LEN];
4355 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4356 const char *val;
4357 if (p->second.length())
4358 val = p->second.c_str();
4359 else
4360 val = "";
4361 // ??? Why do we skip setting all the other attrs if one fails?
4362 int r = chain_fsetxattr(fd, n, val, p->second.length());
4363 if (r < 0) {
4364 derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4365 return r;
4366 }
4367 }
4368 return 0;
4369 }
4370
4371 // debug EIO injection
4372 void FileStore::inject_data_error(const ghobject_t &oid) {
4373 Mutex::Locker l(read_error_lock);
4374 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4375 data_error_set.insert(oid);
4376 }
4377 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4378 Mutex::Locker l(read_error_lock);
4379 dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4380 mdata_error_set.insert(oid);
4381 }
4382
4383 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4384 Mutex::Locker l(read_error_lock);
4385 dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4386 data_error_set.erase(oid);
4387 mdata_error_set.erase(oid);
4388 }
4389 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4390 Mutex::Locker l(read_error_lock);
4391 if (data_error_set.count(oid)) {
4392 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4393 return true;
4394 } else {
4395 return false;
4396 }
4397 }
4398 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4399 Mutex::Locker l(read_error_lock);
4400 if (mdata_error_set.count(oid)) {
4401 dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4402 return true;
4403 } else {
4404 return false;
4405 }
4406 }
4407
4408
4409 // objects
4410
4411 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4412 {
4413 tracepoint(objectstore, getattr_enter, _cid.c_str());
4414 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4415 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4416 FDRef fd;
4417 int r = lfn_open(cid, oid, false, &fd);
4418 if (r < 0) {
4419 goto out;
4420 }
4421 char n[CHAIN_XATTR_MAX_NAME_LEN];
4422 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4423 r = _fgetattr(**fd, n, bp);
4424 lfn_close(fd);
4425 if (r == -ENODATA) {
4426 map<string, bufferlist> got;
4427 set<string> to_get;
4428 to_get.insert(string(name));
4429 Index index;
4430 r = get_index(cid, &index);
4431 if (r < 0) {
4432 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4433 goto out;
4434 }
4435 r = object_map->get_xattrs(oid, to_get, &got);
4436 if (r < 0 && r != -ENOENT) {
4437 dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4438 goto out;
4439 }
4440 if (got.empty()) {
4441 dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4442 return -ENODATA;
4443 }
4444 bp = bufferptr(got.begin()->second.c_str(),
4445 got.begin()->second.length());
4446 r = bp.length();
4447 }
4448 out:
4449 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4450 assert(!m_filestore_fail_eio || r != -EIO);
4451 if (cct->_conf->filestore_debug_inject_read_err &&
4452 debug_mdata_eio(oid)) {
4453 return -EIO;
4454 } else {
4455 tracepoint(objectstore, getattr_exit, r);
4456 return r < 0 ? r : 0;
4457 }
4458 }
4459
4460 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4461 {
4462 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4463 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4464 set<string> omap_attrs;
4465 map<string, bufferlist> omap_aset;
4466 Index index;
4467 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4468 FDRef fd;
4469 bool spill_out = true;
4470 char buf[2];
4471
4472 int r = lfn_open(cid, oid, false, &fd);
4473 if (r < 0) {
4474 goto out;
4475 }
4476
4477 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4478 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4479 spill_out = false;
4480
4481 r = _fgetattrs(**fd, aset);
4482 lfn_close(fd);
4483 fd = FDRef(); // defensive
4484 if (r < 0) {
4485 goto out;
4486 }
4487
4488 if (!spill_out) {
4489 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4490 goto out;
4491 }
4492
4493 r = get_index(cid, &index);
4494 if (r < 0) {
4495 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4496 goto out;
4497 }
4498 {
4499 r = object_map->get_all_xattrs(oid, &omap_attrs);
4500 if (r < 0 && r != -ENOENT) {
4501 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4502 goto out;
4503 }
4504
4505 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4506 if (r < 0 && r != -ENOENT) {
4507 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4508 goto out;
4509 }
4510 if (r == -ENOENT)
4511 r = 0;
4512 }
4513 assert(omap_attrs.size() == omap_aset.size());
4514 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4515 i != omap_aset.end();
4516 ++i) {
4517 string key(i->first);
4518 aset.insert(make_pair(key,
4519 bufferptr(i->second.c_str(), i->second.length())));
4520 }
4521 out:
4522 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4523 assert(!m_filestore_fail_eio || r != -EIO);
4524
4525 if (cct->_conf->filestore_debug_inject_read_err &&
4526 debug_mdata_eio(oid)) {
4527 return -EIO;
4528 } else {
4529 tracepoint(objectstore, getattrs_exit, r);
4530 return r;
4531 }
4532 }
4533
4534 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4535 const SequencerPosition &spos)
4536 {
4537 map<string, bufferlist> omap_set;
4538 set<string> omap_remove;
4539 map<string, bufferptr> inline_set;
4540 map<string, bufferptr> inline_to_set;
4541 FDRef fd;
4542 int spill_out = -1;
4543 bool incomplete_inline = false;
4544
4545 int r = lfn_open(cid, oid, false, &fd);
4546 if (r < 0) {
4547 goto out;
4548 }
4549
4550 char buf[2];
4551 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4552 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4553 spill_out = 0;
4554 else
4555 spill_out = 1;
4556
4557 r = _fgetattrs(**fd, inline_set);
4558 incomplete_inline = (r == -E2BIG);
4559 assert(!m_filestore_fail_eio || r != -EIO);
4560 dout(15) << __FUNC__ << ": " << cid << "/" << oid
4561 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4562 << dendl;
4563
4564 for (map<string,bufferptr>::iterator p = aset.begin();
4565 p != aset.end();
4566 ++p) {
4567 char n[CHAIN_XATTR_MAX_NAME_LEN];
4568 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4569
4570 if (incomplete_inline) {
4571 chain_fremovexattr(**fd, n); // ignore any error
4572 omap_set[p->first].push_back(p->second);
4573 continue;
4574 }
4575
4576 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4577 if (inline_set.count(p->first)) {
4578 inline_set.erase(p->first);
4579 r = chain_fremovexattr(**fd, n);
4580 if (r < 0)
4581 goto out_close;
4582 }
4583 omap_set[p->first].push_back(p->second);
4584 continue;
4585 }
4586
4587 if (!inline_set.count(p->first) &&
4588 inline_set.size() >= m_filestore_max_inline_xattrs) {
4589 omap_set[p->first].push_back(p->second);
4590 continue;
4591 }
4592 omap_remove.insert(p->first);
4593 inline_set.insert(*p);
4594
4595 inline_to_set.insert(*p);
4596 }
4597
4598 if (spill_out != 1 && !omap_set.empty()) {
4599 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4600 sizeof(XATTR_SPILL_OUT));
4601 }
4602
4603 r = _fsetattrs(**fd, inline_to_set);
4604 if (r < 0)
4605 goto out_close;
4606
4607 if (spill_out && !omap_remove.empty()) {
4608 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4609 if (r < 0 && r != -ENOENT) {
4610 dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4611 assert(!m_filestore_fail_eio || r != -EIO);
4612 goto out_close;
4613 } else {
4614 r = 0; // don't confuse the debug output
4615 }
4616 }
4617
4618 if (!omap_set.empty()) {
4619 r = object_map->set_xattrs(oid, omap_set, &spos);
4620 if (r < 0) {
4621 dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4622 assert(!m_filestore_fail_eio || r != -EIO);
4623 goto out_close;
4624 }
4625 }
4626 out_close:
4627 lfn_close(fd);
4628 out:
4629 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4630 return r;
4631 }
4632
4633
4634 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4635 const SequencerPosition &spos)
4636 {
4637 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4638 FDRef fd;
4639 bool spill_out = true;
4640
4641 int r = lfn_open(cid, oid, false, &fd);
4642 if (r < 0) {
4643 goto out;
4644 }
4645
4646 char buf[2];
4647 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4648 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4649 spill_out = false;
4650 }
4651
4652 char n[CHAIN_XATTR_MAX_NAME_LEN];
4653 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4654 r = chain_fremovexattr(**fd, n);
4655 if (r == -ENODATA && spill_out) {
4656 Index index;
4657 r = get_index(cid, &index);
4658 if (r < 0) {
4659 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4660 goto out_close;
4661 }
4662 set<string> to_remove;
4663 to_remove.insert(string(name));
4664 r = object_map->remove_xattrs(oid, to_remove, &spos);
4665 if (r < 0 && r != -ENOENT) {
4666 dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4667 assert(!m_filestore_fail_eio || r != -EIO);
4668 goto out_close;
4669 }
4670 }
4671 out_close:
4672 lfn_close(fd);
4673 out:
4674 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4675 return r;
4676 }
4677
4678 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4679 const SequencerPosition &spos)
4680 {
4681 dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4682
4683 map<string,bufferptr> aset;
4684 FDRef fd;
4685 set<string> omap_attrs;
4686 Index index;
4687 bool spill_out = true;
4688
4689 int r = lfn_open(cid, oid, false, &fd);
4690 if (r < 0) {
4691 goto out;
4692 }
4693
4694 char buf[2];
4695 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4696 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4697 spill_out = false;
4698 }
4699
4700 r = _fgetattrs(**fd, aset);
4701 if (r >= 0) {
4702 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4703 char n[CHAIN_XATTR_MAX_NAME_LEN];
4704 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4705 r = chain_fremovexattr(**fd, n);
4706 if (r < 0) {
4707 dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4708 goto out_close;
4709 }
4710 }
4711 }
4712
4713 if (!spill_out) {
4714 dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4715 goto out_close;
4716 }
4717
4718 r = get_index(cid, &index);
4719 if (r < 0) {
4720 dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4721 goto out_close;
4722 }
4723 {
4724 r = object_map->get_all_xattrs(oid, &omap_attrs);
4725 if (r < 0 && r != -ENOENT) {
4726 dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4727 assert(!m_filestore_fail_eio || r != -EIO);
4728 goto out_close;
4729 }
4730 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4731 if (r < 0 && r != -ENOENT) {
4732 dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4733 goto out_close;
4734 }
4735 if (r == -ENOENT)
4736 r = 0;
4737 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4738 sizeof(XATTR_NO_SPILL_OUT));
4739 }
4740
4741 out_close:
4742 lfn_close(fd);
4743 out:
4744 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4745 return r;
4746 }
4747
4748
4749
4750
4751 int FileStore::_collection_remove_recursive(const coll_t &cid,
4752 const SequencerPosition &spos)
4753 {
4754 struct stat st;
4755 int r = collection_stat(cid, &st);
4756 if (r < 0) {
4757 if (r == -ENOENT)
4758 return 0;
4759 return r;
4760 }
4761
4762 vector<ghobject_t> objects;
4763 ghobject_t max;
4764 while (!max.is_max()) {
4765 r = collection_list(cid, max, ghobject_t::get_max(),
4766 300, &objects, &max);
4767 if (r < 0)
4768 return r;
4769 for (vector<ghobject_t>::iterator i = objects.begin();
4770 i != objects.end();
4771 ++i) {
4772 assert(_check_replay_guard(cid, *i, spos));
4773 r = _remove(cid, *i, spos);
4774 if (r < 0)
4775 return r;
4776 }
4777 objects.clear();
4778 }
4779 return _destroy_collection(cid);
4780 }
4781
4782 // --------------------------
4783 // collections
4784
4785 int FileStore::list_collections(vector<coll_t>& ls)
4786 {
4787 return list_collections(ls, false);
4788 }
4789
4790 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4791 {
4792 tracepoint(objectstore, list_collections_enter);
4793 dout(10) << __FUNC__ << dendl;
4794
4795 char fn[PATH_MAX];
4796 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4797
4798 int r = 0;
4799 DIR *dir = ::opendir(fn);
4800 if (!dir) {
4801 r = -errno;
4802 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4803 assert(!m_filestore_fail_eio || r != -EIO);
4804 return r;
4805 }
4806
4807 struct dirent *de = nullptr;
4808 while ((de = ::readdir(dir))) {
4809 if (de->d_type == DT_UNKNOWN) {
4810 // d_type not supported (non-ext[234], btrfs), must stat
4811 struct stat sb;
4812 char filename[PATH_MAX];
4813 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4814
4815 r = ::stat(filename, &sb);
4816 if (r < 0) {
4817 r = -errno;
4818 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4819 assert(!m_filestore_fail_eio || r != -EIO);
4820 break;
4821 }
4822 if (!S_ISDIR(sb.st_mode)) {
4823 continue;
4824 }
4825 } else if (de->d_type != DT_DIR) {
4826 continue;
4827 }
4828 if (strcmp(de->d_name, "omap") == 0) {
4829 continue;
4830 }
4831 if (de->d_name[0] == '.' &&
4832 (de->d_name[1] == '\0' ||
4833 (de->d_name[1] == '.' &&
4834 de->d_name[2] == '\0')))
4835 continue;
4836 coll_t cid;
4837 if (!cid.parse(de->d_name)) {
4838 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4839 continue;
4840 }
4841 if (!cid.is_temp() || include_temp)
4842 ls.push_back(cid);
4843 }
4844
4845 if (r > 0) {
4846 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4847 r = -r;
4848 }
4849
4850 ::closedir(dir);
4851 assert(!m_filestore_fail_eio || r != -EIO);
4852 tracepoint(objectstore, list_collections_exit, r);
4853 return r;
4854 }
4855
4856 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4857 {
4858 tracepoint(objectstore, collection_stat_enter, c.c_str());
4859 char fn[PATH_MAX];
4860 get_cdir(c, fn, sizeof(fn));
4861 dout(15) << __FUNC__ << ": " << fn << dendl;
4862 int r = ::stat(fn, st);
4863 if (r < 0)
4864 r = -errno;
4865 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
4866 assert(!m_filestore_fail_eio || r != -EIO);
4867 tracepoint(objectstore, collection_stat_exit, r);
4868 return r;
4869 }
4870
4871 bool FileStore::collection_exists(const coll_t& c)
4872 {
4873 tracepoint(objectstore, collection_exists_enter, c.c_str());
4874 struct stat st;
4875 bool ret = collection_stat(c, &st) == 0;
4876 tracepoint(objectstore, collection_exists_exit, ret);
4877 return ret;
4878 }
4879
4880 int FileStore::collection_empty(const coll_t& c, bool *empty)
4881 {
4882 tracepoint(objectstore, collection_empty_enter, c.c_str());
4883 dout(15) << __FUNC__ << ": " << c << dendl;
4884 Index index;
4885 int r = get_index(c, &index);
4886 if (r < 0) {
4887 derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
4888 << dendl;
4889 return r;
4890 }
4891
4892 assert(NULL != index.index);
4893 RWLock::RLocker l((index.index)->access_lock);
4894
4895 vector<ghobject_t> ls;
4896 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4897 1, &ls, NULL);
4898 if (r < 0) {
4899 derr << __FUNC__ << ": collection_list_partial returned: "
4900 << cpp_strerror(r) << dendl;
4901 assert(!m_filestore_fail_eio || r != -EIO);
4902 return r;
4903 }
4904 *empty = ls.empty();
4905 tracepoint(objectstore, collection_empty_exit, *empty);
4906 return 0;
4907 }
4908
4909 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4910 {
4911 char fn[PATH_MAX];
4912 get_cdir(c, fn, sizeof(fn));
4913 dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
4914 char n[PATH_MAX];
4915 int r;
4916 int32_t v = bits;
4917 int fd = ::open(fn, O_RDONLY);
4918 if (fd < 0) {
4919 r = -errno;
4920 goto out;
4921 }
4922 get_attrname("bits", n, PATH_MAX);
4923 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4924 VOID_TEMP_FAILURE_RETRY(::close(fd));
4925 out:
4926 dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
4927 return r;
4928 }
4929
4930 int FileStore::collection_bits(const coll_t& c)
4931 {
4932 char fn[PATH_MAX];
4933 get_cdir(c, fn, sizeof(fn));
4934 dout(15) << __FUNC__ << ": " << fn << dendl;
4935 int r;
4936 char n[PATH_MAX];
4937 int32_t bits;
4938 int fd = ::open(fn, O_RDONLY);
4939 if (fd < 0) {
4940 bits = r = -errno;
4941 goto out;
4942 }
4943 get_attrname("bits", n, PATH_MAX);
4944 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4945 VOID_TEMP_FAILURE_RETRY(::close(fd));
4946 if (r < 0) {
4947 bits = r;
4948 goto out;
4949 }
4950 out:
4951 dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
4952 return bits;
4953 }
4954
4955 int FileStore::collection_list(const coll_t& c,
4956 const ghobject_t& orig_start,
4957 const ghobject_t& end,
4958 int max,
4959 vector<ghobject_t> *ls, ghobject_t *next)
4960 {
4961 ghobject_t start = orig_start;
4962 if (start.is_max())
4963 return 0;
4964
4965 ghobject_t temp_next;
4966 if (!next)
4967 next = &temp_next;
4968 // figure out the pool id. we need this in order to generate a
4969 // meaningful 'next' value.
4970 int64_t pool = -1;
4971 shard_id_t shard;
4972 {
4973 spg_t pgid;
4974 if (c.is_temp(&pgid)) {
4975 pool = -2 - pgid.pool();
4976 shard = pgid.shard;
4977 } else if (c.is_pg(&pgid)) {
4978 pool = pgid.pool();
4979 shard = pgid.shard;
4980 } else if (c.is_meta()) {
4981 pool = -1;
4982 shard = shard_id_t::NO_SHARD;
4983 } else {
4984 // hrm, the caller is test code! we should get kill it off. for now,
4985 // tolerate it.
4986 pool = 0;
4987 shard = shard_id_t::NO_SHARD;
4988 }
4989 dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
4990 << " pgid " << pgid << dendl;
4991 }
4992 ghobject_t sep;
4993 sep.hobj.pool = -1;
4994 sep.set_shard(shard);
4995 if (!c.is_temp() && !c.is_meta()) {
4996 if (start < sep) {
4997 dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
4998 coll_t temp = c.get_temp();
4999 int r = collection_list(temp, start, end, max, ls, next);
5000 if (r < 0)
5001 return r;
5002 if (*next != ghobject_t::get_max())
5003 return r;
5004 start = sep;
5005 dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
5006 << start << dendl;
5007 } else {
5008 dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
5009 }
5010 }
5011
5012 Index index;
5013 int r = get_index(c, &index);
5014 if (r < 0)
5015 return r;
5016
5017 assert(NULL != index.index);
5018 RWLock::RLocker l((index.index)->access_lock);
5019
5020 r = index->collection_list_partial(start, end, max, ls, next);
5021
5022 if (r < 0) {
5023 assert(!m_filestore_fail_eio || r != -EIO);
5024 return r;
5025 }
5026 dout(20) << "objects: " << *ls << dendl;
5027
5028 // HashIndex doesn't know the pool when constructing a 'next' value
5029 if (next && !next->is_max()) {
5030 next->hobj.pool = pool;
5031 next->set_shard(shard);
5032 dout(20) << " next " << *next << dendl;
5033 }
5034
5035 return 0;
5036 }
5037
5038 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
5039 bufferlist *header,
5040 map<string, bufferlist> *out)
5041 {
5042 tracepoint(objectstore, omap_get_enter, _c.c_str());
5043 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5044 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5045 Index index;
5046 int r = get_index(c, &index);
5047 if (r < 0)
5048 return r;
5049 {
5050 assert(NULL != index.index);
5051 RWLock::RLocker l((index.index)->access_lock);
5052 r = lfn_find(hoid, index);
5053 if (r < 0)
5054 return r;
5055 }
5056 r = object_map->get(hoid, header, out);
5057 if (r < 0 && r != -ENOENT) {
5058 assert(!m_filestore_fail_eio || r != -EIO);
5059 return r;
5060 }
5061 tracepoint(objectstore, omap_get_exit, 0);
5062 return 0;
5063 }
5064
5065 int FileStore::omap_get_header(
5066 const coll_t& _c,
5067 const ghobject_t &hoid,
5068 bufferlist *bl,
5069 bool allow_eio)
5070 {
5071 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5072 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5073 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5074 Index index;
5075 int r = get_index(c, &index);
5076 if (r < 0)
5077 return r;
5078 {
5079 assert(NULL != index.index);
5080 RWLock::RLocker l((index.index)->access_lock);
5081 r = lfn_find(hoid, index);
5082 if (r < 0)
5083 return r;
5084 }
5085 r = object_map->get_header(hoid, bl);
5086 if (r < 0 && r != -ENOENT) {
5087 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5088 return r;
5089 }
5090 tracepoint(objectstore, omap_get_header_exit, 0);
5091 return 0;
5092 }
5093
5094 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5095 {
5096 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5097 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5098 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5099 Index index;
5100 int r = get_index(c, &index);
5101 if (r < 0)
5102 return r;
5103 {
5104 assert(NULL != index.index);
5105 RWLock::RLocker l((index.index)->access_lock);
5106 r = lfn_find(hoid, index);
5107 if (r < 0)
5108 return r;
5109 }
5110 r = object_map->get_keys(hoid, keys);
5111 if (r < 0 && r != -ENOENT) {
5112 assert(!m_filestore_fail_eio || r != -EIO);
5113 return r;
5114 }
5115 tracepoint(objectstore, omap_get_keys_exit, 0);
5116 return 0;
5117 }
5118
5119 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5120 const set<string> &keys,
5121 map<string, bufferlist> *out)
5122 {
5123 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5124 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5125 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5126 Index index;
5127 const char *where = "()";
5128 int r = get_index(c, &index);
5129 if (r < 0) {
5130 where = " (get_index)";
5131 goto out;
5132 }
5133 {
5134 assert(NULL != index.index);
5135 RWLock::RLocker l((index.index)->access_lock);
5136 r = lfn_find(hoid, index);
5137 if (r < 0) {
5138 where = " (lfn_find)";
5139 goto out;
5140 }
5141 }
5142 r = object_map->get_values(hoid, keys, out);
5143 if (r < 0 && r != -ENOENT) {
5144 assert(!m_filestore_fail_eio || r != -EIO);
5145 where = " (get_values)";
5146 goto out;
5147 }
5148 r = 0;
5149 out:
5150 tracepoint(objectstore, omap_get_values_exit, r);
5151 dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5152 << where << dendl;
5153 return r;
5154 }
5155
5156 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5157 const set<string> &keys,
5158 set<string> *out)
5159 {
5160 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5161 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5162 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5163
5164 Index index;
5165 int r = get_index(c, &index);
5166 if (r < 0)
5167 return r;
5168 {
5169 assert(NULL != index.index);
5170 RWLock::RLocker l((index.index)->access_lock);
5171 r = lfn_find(hoid, index);
5172 if (r < 0)
5173 return r;
5174 }
5175 r = object_map->check_keys(hoid, keys, out);
5176 if (r < 0 && r != -ENOENT) {
5177 assert(!m_filestore_fail_eio || r != -EIO);
5178 return r;
5179 }
5180 tracepoint(objectstore, omap_check_keys_exit, 0);
5181 return 0;
5182 }
5183
5184 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5185 const ghobject_t &hoid)
5186 {
5187 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5188 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5189 dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5190 Index index;
5191 int r = get_index(c, &index);
5192 if (r < 0) {
5193 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5194 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5195 return ObjectMap::ObjectMapIterator();
5196 }
5197 {
5198 assert(NULL != index.index);
5199 RWLock::RLocker l((index.index)->access_lock);
5200 r = lfn_find(hoid, index);
5201 if (r < 0) {
5202 dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5203 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5204 return ObjectMap::ObjectMapIterator();
5205 }
5206 }
5207 return object_map->get_iterator(hoid);
5208 }
5209
5210 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5211 uint64_t expected_num_objs,
5212 const SequencerPosition &spos)
5213 {
5214 dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5215 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5216
5217 bool empty;
5218 int ret = collection_empty(c, &empty);
5219 if (ret < 0)
5220 return ret;
5221 if (!empty && !replaying) {
5222 dout(0) << "Failed to give an expected number of objects hint to collection : "
5223 << c << ", only empty collection can take such type of hint. " << dendl;
5224 return 0;
5225 }
5226
5227 Index index;
5228 ret = get_index(c, &index);
5229 if (ret < 0)
5230 return ret;
5231 // Pre-hash the collection
5232 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5233 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5234 if (ret < 0)
5235 return ret;
5236 _set_replay_guard(c, spos);
5237
5238 return 0;
5239 }
5240
5241 int FileStore::_create_collection(
5242 const coll_t& c,
5243 int bits,
5244 const SequencerPosition &spos)
5245 {
5246 char fn[PATH_MAX];
5247 get_cdir(c, fn, sizeof(fn));
5248 dout(15) << __FUNC__ << ": " << fn << dendl;
5249 int r = ::mkdir(fn, 0755);
5250 if (r < 0)
5251 r = -errno;
5252 if (r == -EEXIST && replaying)
5253 r = 0;
5254 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5255
5256 if (r < 0)
5257 return r;
5258 r = init_index(c);
5259 if (r < 0)
5260 return r;
5261 r = _collection_set_bits(c, bits);
5262 if (r < 0)
5263 return r;
5264 // create parallel temp collection, too
5265 if (!c.is_meta() && !c.is_temp()) {
5266 coll_t temp = c.get_temp();
5267 r = _create_collection(temp, 0, spos);
5268 if (r < 0)
5269 return r;
5270 }
5271
5272 _set_replay_guard(c, spos);
5273 return 0;
5274 }
5275
5276 int FileStore::_destroy_collection(const coll_t& c)
5277 {
5278 int r = 0;
5279 char fn[PATH_MAX];
5280 get_cdir(c, fn, sizeof(fn));
5281 dout(15) << __FUNC__ << ": " << fn << dendl;
5282 {
5283 Index from;
5284 r = get_index(c, &from);
5285 if (r < 0)
5286 goto out;
5287 assert(NULL != from.index);
5288 RWLock::WLocker l((from.index)->access_lock);
5289
5290 r = from->prep_delete();
5291 if (r < 0)
5292 goto out;
5293 }
5294 r = ::rmdir(fn);
5295 if (r < 0) {
5296 r = -errno;
5297 goto out;
5298 }
5299
5300 out:
5301 // destroy parallel temp collection, too
5302 if (!c.is_meta() && !c.is_temp()) {
5303 coll_t temp = c.get_temp();
5304 int r2 = _destroy_collection(temp);
5305 if (r2 < 0) {
5306 r = r2;
5307 goto out_final;
5308 }
5309 }
5310
5311 out_final:
5312 dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5313 return r;
5314 }
5315
5316
5317 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5318 const SequencerPosition& spos)
5319 {
5320 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5321
5322 int dstcmp = _check_replay_guard(c, o, spos);
5323 if (dstcmp < 0)
5324 return 0;
5325
5326 // check the src name too; it might have a newer guard, and we don't
5327 // want to clobber it
5328 int srccmp = _check_replay_guard(oldcid, o, spos);
5329 if (srccmp < 0)
5330 return 0;
5331
5332 // open guard on object so we don't any previous operations on the
5333 // new name that will modify the source inode.
5334 FDRef fd;
5335 int r = lfn_open(oldcid, o, 0, &fd);
5336 if (r < 0) {
5337 // the source collection/object does not exist. If we are replaying, we
5338 // should be safe, so just return 0 and move on.
5339 assert(replaying);
5340 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5341 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5342 return 0;
5343 }
5344 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5345 _set_replay_guard(**fd, spos, &o, true);
5346 }
5347
5348 r = lfn_link(oldcid, c, o, o);
5349 if (replaying && !backend->can_checkpoint() &&
5350 r == -EEXIST) // crashed between link() and set_replay_guard()
5351 r = 0;
5352
5353 _inject_failure();
5354
5355 // close guard on object so we don't do this again
5356 if (r == 0) {
5357 _close_replay_guard(**fd, spos);
5358 }
5359 lfn_close(fd);
5360
5361 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5362 return r;
5363 }
5364
5365 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5366 coll_t c, const ghobject_t& o,
5367 const SequencerPosition& spos,
5368 bool allow_enoent)
5369 {
5370 dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5371 int r = 0;
5372 int dstcmp, srccmp;
5373
5374 if (replaying) {
5375 /* If the destination collection doesn't exist during replay,
5376 * we need to delete the src object and continue on
5377 */
5378 if (!collection_exists(c))
5379 goto out_rm_src;
5380 }
5381
5382 dstcmp = _check_replay_guard(c, o, spos);
5383 if (dstcmp < 0)
5384 goto out_rm_src;
5385
5386 // check the src name too; it might have a newer guard, and we don't
5387 // want to clobber it
5388 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5389 if (srccmp < 0)
5390 return 0;
5391
5392 {
5393 // open guard on object so we don't any previous operations on the
5394 // new name that will modify the source inode.
5395 FDRef fd;
5396 r = lfn_open(oldcid, oldoid, 0, &fd);
5397 if (r < 0) {
5398 // the source collection/object does not exist. If we are replaying, we
5399 // should be safe, so just return 0 and move on.
5400 if (replaying) {
5401 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5402 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5403 } else if (allow_enoent) {
5404 dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5405 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5406 << dendl;
5407 } else {
5408 assert(0 == "ERROR: source must exist");
5409 }
5410
5411 if (!replaying) {
5412 return 0;
5413 }
5414 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5415 return 0;
5416 }
5417
5418 r = 0; // don't know if object_map was cloned
5419 } else {
5420 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5421 _set_replay_guard(**fd, spos, &o, true);
5422 }
5423
5424 r = lfn_link(oldcid, c, oldoid, o);
5425 if (replaying && !backend->can_checkpoint() &&
5426 r == -EEXIST) // crashed between link() and set_replay_guard()
5427 r = 0;
5428
5429 lfn_close(fd);
5430 fd = FDRef();
5431
5432 _inject_failure();
5433 }
5434
5435 if (r == 0) {
5436 // the name changed; link the omap content
5437 r = object_map->rename(oldoid, o, &spos);
5438 if (r == -ENOENT)
5439 r = 0;
5440 }
5441
5442 _inject_failure();
5443
5444 if (r == 0)
5445 r = lfn_unlink(oldcid, oldoid, spos, true);
5446
5447 if (r == 0)
5448 r = lfn_open(c, o, 0, &fd);
5449
5450 // close guard on object so we don't do this again
5451 if (r == 0) {
5452 _close_replay_guard(**fd, spos, &o);
5453 lfn_close(fd);
5454 }
5455 }
5456
5457 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5458 << " = " << r << dendl;
5459 return r;
5460
5461 out_rm_src:
5462 // remove source
5463 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5464 r = lfn_unlink(oldcid, oldoid, spos, true);
5465 }
5466
5467 dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5468 << " = " << r << dendl;
5469 return r;
5470 }
5471
5472 void FileStore::_inject_failure()
5473 {
5474 if (m_filestore_kill_at) {
5475 int final = --m_filestore_kill_at;
5476 dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5477 if (final == 0) {
5478 derr << __FUNC__ << ": KILLING" << dendl;
5479 cct->_log->flush();
5480 _exit(1);
5481 }
5482 }
5483 }
5484
5485 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5486 const SequencerPosition &spos) {
5487 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5488 Index index;
5489 int r = get_index(cid, &index);
5490 if (r < 0)
5491 return r;
5492 {
5493 assert(NULL != index.index);
5494 RWLock::RLocker l((index.index)->access_lock);
5495 r = lfn_find(hoid, index);
5496 if (r < 0)
5497 return r;
5498 }
5499 r = object_map->clear_keys_header(hoid, &spos);
5500 if (r < 0 && r != -ENOENT)
5501 return r;
5502 return 0;
5503 }
5504
5505 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5506 const map<string, bufferlist> &aset,
5507 const SequencerPosition &spos) {
5508 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5509 Index index;
5510 int r;
5511 //treat pgmeta as a logical object, skip to check exist
5512 if (hoid.is_pgmeta())
5513 goto skip;
5514
5515 r = get_index(cid, &index);
5516 if (r < 0) {
5517 dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5518 return r;
5519 }
5520 {
5521 assert(NULL != index.index);
5522 RWLock::RLocker l((index.index)->access_lock);
5523 r = lfn_find(hoid, index);
5524 if (r < 0) {
5525 dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5526 return r;
5527 }
5528 }
5529 skip:
5530 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5531 for (auto& p : aset) {
5532 dout(20) << __FUNC__ << ": set " << p.first << dendl;
5533 }
5534 }
5535 r = object_map->set_keys(hoid, aset, &spos);
5536 dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5537 return r;
5538 }
5539
5540 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5541 const set<string> &keys,
5542 const SequencerPosition &spos) {
5543 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5544 Index index;
5545 int r;
5546 //treat pgmeta as a logical object, skip to check exist
5547 if (hoid.is_pgmeta())
5548 goto skip;
5549
5550 r = get_index(cid, &index);
5551 if (r < 0)
5552 return r;
5553 {
5554 assert(NULL != index.index);
5555 RWLock::RLocker l((index.index)->access_lock);
5556 r = lfn_find(hoid, index);
5557 if (r < 0)
5558 return r;
5559 }
5560 skip:
5561 r = object_map->rm_keys(hoid, keys, &spos);
5562 if (r < 0 && r != -ENOENT)
5563 return r;
5564 return 0;
5565 }
5566
5567 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5568 const string& first, const string& last,
5569 const SequencerPosition &spos) {
5570 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5571 set<string> keys;
5572 {
5573 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5574 if (!iter)
5575 return -ENOENT;
5576 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5577 iter->next()) {
5578 keys.insert(iter->key());
5579 }
5580 }
5581 return _omap_rmkeys(cid, hoid, keys, spos);
5582 }
5583
5584 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5585 const bufferlist &bl,
5586 const SequencerPosition &spos)
5587 {
5588 dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5589 Index index;
5590 int r = get_index(cid, &index);
5591 if (r < 0)
5592 return r;
5593 {
5594 assert(NULL != index.index);
5595 RWLock::RLocker l((index.index)->access_lock);
5596 r = lfn_find(hoid, index);
5597 if (r < 0)
5598 return r;
5599 }
5600 return object_map->set_header(hoid, bl, &spos);
5601 }
5602
5603 int FileStore::_split_collection(const coll_t& cid,
5604 uint32_t bits,
5605 uint32_t rem,
5606 coll_t dest,
5607 const SequencerPosition &spos)
5608 {
5609 int r;
5610 {
5611 dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5612 if (!collection_exists(cid)) {
5613 dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5614 assert(replaying);
5615 return 0;
5616 }
5617 if (!collection_exists(dest)) {
5618 dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5619 assert(replaying);
5620 return 0;
5621 }
5622
5623 int dstcmp = _check_replay_guard(dest, spos);
5624 if (dstcmp < 0)
5625 return 0;
5626
5627 int srccmp = _check_replay_guard(cid, spos);
5628 if (srccmp < 0)
5629 return 0;
5630
5631 _set_global_replay_guard(cid, spos);
5632 _set_replay_guard(cid, spos, true);
5633 _set_replay_guard(dest, spos, true);
5634
5635 Index from;
5636 r = get_index(cid, &from);
5637
5638 Index to;
5639 if (!r)
5640 r = get_index(dest, &to);
5641
5642 if (!r) {
5643 assert(NULL != from.index);
5644 RWLock::WLocker l1((from.index)->access_lock);
5645
5646 assert(NULL != to.index);
5647 RWLock::WLocker l2((to.index)->access_lock);
5648
5649 r = from->split(rem, bits, to.index);
5650 }
5651
5652 _close_replay_guard(cid, spos);
5653 _close_replay_guard(dest, spos);
5654 }
5655 _collection_set_bits(cid, bits);
5656 if (!r && cct->_conf->filestore_debug_verify_split) {
5657 vector<ghobject_t> objects;
5658 ghobject_t next;
5659 while (1) {
5660 collection_list(
5661 cid,
5662 next, ghobject_t::get_max(),
5663 get_ideal_list_max(),
5664 &objects,
5665 &next);
5666 if (objects.empty())
5667 break;
5668 for (vector<ghobject_t>::iterator i = objects.begin();
5669 i != objects.end();
5670 ++i) {
5671 dout(20) << __FUNC__ << ": " << *i << " still in source "
5672 << cid << dendl;
5673 assert(!i->match(bits, rem));
5674 }
5675 objects.clear();
5676 }
5677 next = ghobject_t();
5678 while (1) {
5679 collection_list(
5680 dest,
5681 next, ghobject_t::get_max(),
5682 get_ideal_list_max(),
5683 &objects,
5684 &next);
5685 if (objects.empty())
5686 break;
5687 for (vector<ghobject_t>::iterator i = objects.begin();
5688 i != objects.end();
5689 ++i) {
5690 dout(20) << __FUNC__ << ": " << *i << " now in dest "
5691 << *i << dendl;
5692 assert(i->match(bits, rem));
5693 }
5694 objects.clear();
5695 }
5696 }
5697 return r;
5698 }
5699
5700 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5701 uint64_t expected_object_size,
5702 uint64_t expected_write_size)
5703 {
5704 dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5705
5706 FDRef fd;
5707 int ret = 0;
5708
5709 if (expected_object_size == 0 || expected_write_size == 0)
5710 goto out;
5711
5712 ret = lfn_open(cid, oid, false, &fd);
5713 if (ret < 0)
5714 goto out;
5715
5716 {
5717 // TODO: a more elaborate hint calculation
5718 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5719
5720 ret = backend->set_alloc_hint(**fd, hint);
5721 dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
5722 }
5723
5724 lfn_close(fd);
5725 out:
5726 dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5727 assert(!m_filestore_fail_eio || ret != -EIO);
5728 return ret;
5729 }
5730
5731 const char** FileStore::get_tracked_conf_keys() const
5732 {
5733 static const char* KEYS[] = {
5734 "filestore_max_inline_xattr_size",
5735 "filestore_max_inline_xattr_size_xfs",
5736 "filestore_max_inline_xattr_size_btrfs",
5737 "filestore_max_inline_xattr_size_other",
5738 "filestore_max_inline_xattrs",
5739 "filestore_max_inline_xattrs_xfs",
5740 "filestore_max_inline_xattrs_btrfs",
5741 "filestore_max_inline_xattrs_other",
5742 "filestore_max_xattr_value_size",
5743 "filestore_max_xattr_value_size_xfs",
5744 "filestore_max_xattr_value_size_btrfs",
5745 "filestore_max_xattr_value_size_other",
5746 "filestore_min_sync_interval",
5747 "filestore_max_sync_interval",
5748 "filestore_queue_max_ops",
5749 "filestore_queue_max_bytes",
5750 "filestore_expected_throughput_bytes",
5751 "filestore_expected_throughput_ops",
5752 "filestore_queue_low_threshhold",
5753 "filestore_queue_high_threshhold",
5754 "filestore_queue_high_delay_multiple",
5755 "filestore_queue_max_delay_multiple",
5756 "filestore_commit_timeout",
5757 "filestore_dump_file",
5758 "filestore_kill_at",
5759 "filestore_fail_eio",
5760 "filestore_fadvise",
5761 "filestore_sloppy_crc",
5762 "filestore_sloppy_crc_block_size",
5763 "filestore_max_alloc_hint_size",
5764 NULL
5765 };
5766 return KEYS;
5767 }
5768
5769 void FileStore::handle_conf_change(const struct md_config_t *conf,
5770 const std::set <std::string> &changed)
5771 {
5772 if (changed.count("filestore_max_inline_xattr_size") ||
5773 changed.count("filestore_max_inline_xattr_size_xfs") ||
5774 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5775 changed.count("filestore_max_inline_xattr_size_other") ||
5776 changed.count("filestore_max_inline_xattrs") ||
5777 changed.count("filestore_max_inline_xattrs_xfs") ||
5778 changed.count("filestore_max_inline_xattrs_btrfs") ||
5779 changed.count("filestore_max_inline_xattrs_other") ||
5780 changed.count("filestore_max_xattr_value_size") ||
5781 changed.count("filestore_max_xattr_value_size_xfs") ||
5782 changed.count("filestore_max_xattr_value_size_btrfs") ||
5783 changed.count("filestore_max_xattr_value_size_other")) {
5784 if (backend) {
5785 Mutex::Locker l(lock);
5786 set_xattr_limits_via_conf();
5787 }
5788 }
5789
5790 if (changed.count("filestore_queue_max_bytes") ||
5791 changed.count("filestore_queue_max_ops") ||
5792 changed.count("filestore_expected_throughput_bytes") ||
5793 changed.count("filestore_expected_throughput_ops") ||
5794 changed.count("filestore_queue_low_threshhold") ||
5795 changed.count("filestore_queue_high_threshhold") ||
5796 changed.count("filestore_queue_high_delay_multiple") ||
5797 changed.count("filestore_queue_max_delay_multiple")) {
5798 Mutex::Locker l(lock);
5799 set_throttle_params();
5800 }
5801
5802 if (changed.count("filestore_min_sync_interval") ||
5803 changed.count("filestore_max_sync_interval") ||
5804 changed.count("filestore_kill_at") ||
5805 changed.count("filestore_fail_eio") ||
5806 changed.count("filestore_sloppy_crc") ||
5807 changed.count("filestore_sloppy_crc_block_size") ||
5808 changed.count("filestore_max_alloc_hint_size") ||
5809 changed.count("filestore_fadvise")) {
5810 Mutex::Locker l(lock);
5811 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5812 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5813 m_filestore_kill_at = conf->filestore_kill_at;
5814 m_filestore_fail_eio = conf->filestore_fail_eio;
5815 m_filestore_fadvise = conf->filestore_fadvise;
5816 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5817 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5818 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5819 }
5820 if (changed.count("filestore_commit_timeout")) {
5821 Mutex::Locker l(sync_entry_timeo_lock);
5822 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5823 }
5824 if (changed.count("filestore_dump_file")) {
5825 if (conf->filestore_dump_file.length() &&
5826 conf->filestore_dump_file != "-") {
5827 dump_start(conf->filestore_dump_file);
5828 } else {
5829 dump_stop();
5830 }
5831 }
5832 }
5833
5834 int FileStore::set_throttle_params()
5835 {
5836 stringstream ss;
5837 bool valid = throttle_bytes.set_params(
5838 cct->_conf->filestore_queue_low_threshhold,
5839 cct->_conf->filestore_queue_high_threshhold,
5840 cct->_conf->filestore_expected_throughput_bytes,
5841 cct->_conf->filestore_queue_high_delay_multiple,
5842 cct->_conf->filestore_queue_max_delay_multiple,
5843 cct->_conf->filestore_queue_max_bytes,
5844 &ss);
5845
5846 valid &= throttle_ops.set_params(
5847 cct->_conf->filestore_queue_low_threshhold,
5848 cct->_conf->filestore_queue_high_threshhold,
5849 cct->_conf->filestore_expected_throughput_ops,
5850 cct->_conf->filestore_queue_high_delay_multiple,
5851 cct->_conf->filestore_queue_max_delay_multiple,
5852 cct->_conf->filestore_queue_max_ops,
5853 &ss);
5854
5855 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5856 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5857
5858 if (!valid) {
5859 derr << "tried to set invalid params: "
5860 << ss.str()
5861 << dendl;
5862 }
5863 return valid ? 0 : -EINVAL;
5864 }
5865
5866 void FileStore::dump_start(const std::string& file)
5867 {
5868 dout(10) << __FUNC__ << ": " << file << dendl;
5869 if (m_filestore_do_dump) {
5870 dump_stop();
5871 }
5872 m_filestore_dump_fmt.reset();
5873 m_filestore_dump_fmt.open_array_section("dump");
5874 m_filestore_dump.open(file.c_str());
5875 m_filestore_do_dump = true;
5876 }
5877
5878 void FileStore::dump_stop()
5879 {
5880 dout(10) << __FUNC__ << dendl;
5881 m_filestore_do_dump = false;
5882 if (m_filestore_dump.is_open()) {
5883 m_filestore_dump_fmt.close_section();
5884 m_filestore_dump_fmt.flush(m_filestore_dump);
5885 m_filestore_dump.flush();
5886 m_filestore_dump.close();
5887 }
5888 }
5889
5890 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5891 {
5892 m_filestore_dump_fmt.open_array_section("transactions");
5893 unsigned trans_num = 0;
5894 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5895 m_filestore_dump_fmt.open_object_section("transaction");
5896 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5897 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5898 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5899 (*i).dump(&m_filestore_dump_fmt);
5900 m_filestore_dump_fmt.close_section();
5901 }
5902 m_filestore_dump_fmt.close_section();
5903 m_filestore_dump_fmt.flush(m_filestore_dump);
5904 m_filestore_dump.flush();
5905 }
5906
5907 void FileStore::set_xattr_limits_via_conf()
5908 {
5909 uint32_t fs_xattr_size;
5910 uint32_t fs_xattrs;
5911 uint32_t fs_xattr_max_value_size;
5912
5913 switch (m_fs_type) {
5914 #if defined(__linux__)
5915 case XFS_SUPER_MAGIC:
5916 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5917 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5918 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5919 break;
5920 case BTRFS_SUPER_MAGIC:
5921 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5922 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5923 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5924 break;
5925 #endif
5926 default:
5927 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5928 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5929 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5930 break;
5931 }
5932
5933 // Use override value if set
5934 if (cct->_conf->filestore_max_inline_xattr_size)
5935 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5936 else
5937 m_filestore_max_inline_xattr_size = fs_xattr_size;
5938
5939 // Use override value if set
5940 if (cct->_conf->filestore_max_inline_xattrs)
5941 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5942 else
5943 m_filestore_max_inline_xattrs = fs_xattrs;
5944
5945 // Use override value if set
5946 if (cct->_conf->filestore_max_xattr_value_size)
5947 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5948 else
5949 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5950
5951 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5952 derr << "WARNING: max attr value size ("
5953 << m_filestore_max_xattr_value_size
5954 << ") is smaller than osd_max_object_name_len ("
5955 << cct->_conf->osd_max_object_name_len
5956 << "). Your backend filesystem appears to not support attrs large "
5957 << "enough to handle the configured max rados name size. You may get "
5958 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5959 << "behavior"
5960 << dendl;
5961 }
5962 }
5963
5964 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5965 {
5966 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5967 return res;
5968 }
5969
5970 int FileStore::apply_layout_settings(const coll_t &cid)
5971 {
5972 dout(20) << __FUNC__ << ": " << cid << dendl;
5973 Index index;
5974 int r = get_index(cid, &index);
5975 if (r < 0) {
5976 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5977 << dendl;
5978 return r;
5979 }
5980
5981 return index->apply_layout_settings();
5982 }
5983
5984
5985 // -- FSSuperblock --
5986
5987 void FSSuperblock::encode(bufferlist &bl) const
5988 {
5989 ENCODE_START(2, 1, bl);
5990 compat_features.encode(bl);
5991 ::encode(omap_backend, bl);
5992 ENCODE_FINISH(bl);
5993 }
5994
5995 void FSSuperblock::decode(bufferlist::iterator &bl)
5996 {
5997 DECODE_START(2, bl);
5998 compat_features.decode(bl);
5999 if (struct_v >= 2)
6000 ::decode(omap_backend, bl);
6001 else
6002 omap_backend = "leveldb";
6003 DECODE_FINISH(bl);
6004 }
6005
6006 void FSSuperblock::dump(Formatter *f) const
6007 {
6008 f->open_object_section("compat");
6009 compat_features.dump(f);
6010 f->dump_string("omap_backend", omap_backend);
6011 f->close_section();
6012 }
6013
6014 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6015 {
6016 FSSuperblock z;
6017 o.push_back(new FSSuperblock(z));
6018 CompatSet::FeatureSet feature_compat;
6019 CompatSet::FeatureSet feature_ro_compat;
6020 CompatSet::FeatureSet feature_incompat;
6021 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6022 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6023 feature_incompat);
6024 o.push_back(new FSSuperblock(z));
6025 z.omap_backend = "rocksdb";
6026 o.push_back(new FSSuperblock(z));
6027 }