]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/filestore/FileStore.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
// XATTR_SPILL_OUT_NAME is an xattr that records whether an object's xattrs
// spill over into DBObjectMap. If XATTR_SPILL_OUT_NAME exists in the file's
// xattrs and its value is XATTR_NO_SPILL_OUT ("0"), there are no xattrs in DBObjectMap.
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110
111 //Initial features in new superblock.
112 static CompatSet get_fs_initial_compat_set() {
113 CompatSet::FeatureSet ceph_osd_feature_compat;
114 CompatSet::FeatureSet ceph_osd_feature_ro_compat;
115 CompatSet::FeatureSet ceph_osd_feature_incompat;
116 return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
117 ceph_osd_feature_incompat);
118 }
119
120 //Features are added here that this FileStore supports.
121 static CompatSet get_fs_supported_compat_set() {
122 CompatSet compat = get_fs_initial_compat_set();
123 //Any features here can be set in code, but not in initial superblock
124 compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
125 return compat;
126 }
127
128 int FileStore::validate_hobject_key(const hobject_t &obj) const
129 {
130 unsigned len = LFNIndex::get_max_escaped_name_len(obj);
131 return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
132 }
133
134 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
135 uuid_d *fsid)
136 {
137 // make sure we don't try to use aio or direct_io (and get annoying
138 // error messages from failing to do so); performance implications
139 // should be irrelevant for this use
140 FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
141 return j.peek_fsid(*fsid);
142 }
143
144 void FileStore::FSPerfTracker::update_from_perfcounters(
145 PerfCounters &logger)
146 {
147 os_commit_latency.consume_next(
148 logger.get_tavg_ms(
149 l_filestore_journal_latency));
150 os_apply_latency.consume_next(
151 logger.get_tavg_ms(
152 l_filestore_apply_latency));
153 }
154
155
156 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
157 {
158 return out << *s.parent;
159 }
160
161 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
162 {
163 const string &cid_str(cid.to_str());
164 return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
165 }
166
167 int FileStore::get_index(const coll_t& cid, Index *index)
168 {
169 int r = index_manager.get_index(cid, basedir, index);
170 assert(!m_filestore_fail_eio || r != -EIO);
171 return r;
172 }
173
174 int FileStore::init_index(const coll_t& cid)
175 {
176 char path[PATH_MAX];
177 get_cdir(cid, path, sizeof(path));
178 int r = index_manager.init_index(cid, path, target_version);
179 assert(!m_filestore_fail_eio || r != -EIO);
180 return r;
181 }
182
183 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
184 {
185 IndexedPath path2;
186 if (!path)
187 path = &path2;
188 int r, exist;
189 assert(NULL != index.index);
190 r = (index.index)->lookup(oid, path, &exist);
191 if (r < 0) {
192 assert(!m_filestore_fail_eio || r != -EIO);
193 return r;
194 }
195 if (!exist)
196 return -ENOENT;
197 return 0;
198 }
199
200 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
201 {
202 FDRef fd;
203 int r = lfn_open(cid, oid, false, &fd);
204 if (r < 0)
205 return r;
206 r = ::ftruncate(**fd, length);
207 if (r < 0)
208 r = -errno;
209 if (r >= 0 && m_filestore_sloppy_crc) {
210 int rc = backend->_crc_update_truncate(**fd, length);
211 assert(rc >= 0);
212 }
213 lfn_close(fd);
214 assert(!m_filestore_fail_eio || r != -EIO);
215 return r;
216 }
217
218 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
219 {
220 IndexedPath path;
221 Index index;
222 int r = get_index(cid, &index);
223 if (r < 0)
224 return r;
225
226 assert(NULL != index.index);
227 RWLock::RLocker l((index.index)->access_lock);
228
229 r = lfn_find(oid, index, &path);
230 if (r < 0)
231 return r;
232 r = ::stat(path->path(), buf);
233 if (r < 0)
234 r = -errno;
235 return r;
236 }
237
238 int FileStore::lfn_open(const coll_t& cid,
239 const ghobject_t& oid,
240 bool create,
241 FDRef *outfd,
242 Index *index)
243 {
244 assert(outfd);
245 int r = 0;
246 bool need_lock = true;
247 int flags = O_RDWR;
248
249 if (create)
250 flags |= O_CREAT;
251 if (cct->_conf->filestore_odsync_write) {
252 flags |= O_DSYNC;
253 }
254
255 Index index2;
256 if (!index) {
257 index = &index2;
258 }
259 if (!((*index).index)) {
260 r = get_index(cid, index);
261 if (r < 0) {
262 dout(10) << __func__ << " could not get index r = " << r << dendl;
263 return r;
264 }
265 } else {
266 need_lock = false;
267 }
268
269 int fd, exist;
270 assert(NULL != (*index).index);
271 if (need_lock) {
272 ((*index).index)->access_lock.get_write();
273 }
274 if (!replaying) {
275 *outfd = fdcache.lookup(oid);
276 if (*outfd) {
277 if (need_lock) {
278 ((*index).index)->access_lock.put_write();
279 }
280 return 0;
281 }
282 }
283
284
285 IndexedPath path2;
286 IndexedPath *path = &path2;
287
288 r = (*index)->lookup(oid, path, &exist);
289 if (r < 0) {
290 derr << "could not find " << oid << " in index: "
291 << cpp_strerror(-r) << dendl;
292 goto fail;
293 }
294
295 r = ::open((*path)->path(), flags, 0644);
296 if (r < 0) {
297 r = -errno;
298 dout(10) << "error opening file " << (*path)->path() << " with flags="
299 << flags << ": " << cpp_strerror(-r) << dendl;
300 goto fail;
301 }
302 fd = r;
303 if (create && (!exist)) {
304 r = (*index)->created(oid, (*path)->path());
305 if (r < 0) {
306 VOID_TEMP_FAILURE_RETRY(::close(fd));
307 derr << "error creating " << oid << " (" << (*path)->path()
308 << ") in index: " << cpp_strerror(-r) << dendl;
309 goto fail;
310 }
311 r = chain_fsetxattr<true, true>(
312 fd, XATTR_SPILL_OUT_NAME,
313 XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
314 if (r < 0) {
315 VOID_TEMP_FAILURE_RETRY(::close(fd));
316 derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
317 << "):" << cpp_strerror(-r) << dendl;
318 goto fail;
319 }
320 }
321
322 if (!replaying) {
323 bool existed;
324 *outfd = fdcache.add(oid, fd, &existed);
325 if (existed) {
326 TEMP_FAILURE_RETRY(::close(fd));
327 }
328 } else {
329 *outfd = std::make_shared<FDCache::FD>(fd);
330 }
331
332 if (need_lock) {
333 ((*index).index)->access_lock.put_write();
334 }
335
336 return 0;
337
338 fail:
339
340 if (need_lock) {
341 ((*index).index)->access_lock.put_write();
342 }
343
344 assert(!m_filestore_fail_eio || r != -EIO);
345 return r;
346 }
347
348 void FileStore::lfn_close(FDRef fd)
349 {
350 }
351
352 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
353 {
354 Index index_new, index_old;
355 IndexedPath path_new, path_old;
356 int exist;
357 int r;
358 bool index_same = false;
359 if (c < newcid) {
360 r = get_index(newcid, &index_new);
361 if (r < 0)
362 return r;
363 r = get_index(c, &index_old);
364 if (r < 0)
365 return r;
366 } else if (c == newcid) {
367 r = get_index(c, &index_old);
368 if (r < 0)
369 return r;
370 index_new = index_old;
371 index_same = true;
372 } else {
373 r = get_index(c, &index_old);
374 if (r < 0)
375 return r;
376 r = get_index(newcid, &index_new);
377 if (r < 0)
378 return r;
379 }
380
381 assert(NULL != index_old.index);
382 assert(NULL != index_new.index);
383
384 if (!index_same) {
385
386 RWLock::RLocker l1((index_old.index)->access_lock);
387
388 r = index_old->lookup(o, &path_old, &exist);
389 if (r < 0) {
390 assert(!m_filestore_fail_eio || r != -EIO);
391 return r;
392 }
393 if (!exist)
394 return -ENOENT;
395
396 RWLock::WLocker l2((index_new.index)->access_lock);
397
398 r = index_new->lookup(newoid, &path_new, &exist);
399 if (r < 0) {
400 assert(!m_filestore_fail_eio || r != -EIO);
401 return r;
402 }
403 if (exist)
404 return -EEXIST;
405
406 dout(25) << "lfn_link path_old: " << path_old << dendl;
407 dout(25) << "lfn_link path_new: " << path_new << dendl;
408 r = ::link(path_old->path(), path_new->path());
409 if (r < 0)
410 return -errno;
411
412 r = index_new->created(newoid, path_new->path());
413 if (r < 0) {
414 assert(!m_filestore_fail_eio || r != -EIO);
415 return r;
416 }
417 } else {
418 RWLock::WLocker l1((index_old.index)->access_lock);
419
420 r = index_old->lookup(o, &path_old, &exist);
421 if (r < 0) {
422 assert(!m_filestore_fail_eio || r != -EIO);
423 return r;
424 }
425 if (!exist)
426 return -ENOENT;
427
428 r = index_new->lookup(newoid, &path_new, &exist);
429 if (r < 0) {
430 assert(!m_filestore_fail_eio || r != -EIO);
431 return r;
432 }
433 if (exist)
434 return -EEXIST;
435
436 dout(25) << "lfn_link path_old: " << path_old << dendl;
437 dout(25) << "lfn_link path_new: " << path_new << dendl;
438 r = ::link(path_old->path(), path_new->path());
439 if (r < 0)
440 return -errno;
441
442 // make sure old fd for unlinked/overwritten file is gone
443 fdcache.clear(newoid);
444
445 r = index_new->created(newoid, path_new->path());
446 if (r < 0) {
447 assert(!m_filestore_fail_eio || r != -EIO);
448 return r;
449 }
450 }
451 return 0;
452 }
453
454 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
455 const SequencerPosition &spos,
456 bool force_clear_omap)
457 {
458 Index index;
459 int r = get_index(cid, &index);
460 if (r < 0) {
461 dout(25) << __func__ << " get_index failed " << cpp_strerror(r) << dendl;
462 return r;
463 }
464
465 assert(NULL != index.index);
466 RWLock::WLocker l((index.index)->access_lock);
467
468 {
469 IndexedPath path;
470 int hardlink;
471 r = index->lookup(o, &path, &hardlink);
472 if (r < 0) {
473 assert(!m_filestore_fail_eio || r != -EIO);
474 return r;
475 }
476
477 if (!force_clear_omap) {
478 if (hardlink == 0 || hardlink == 1) {
479 force_clear_omap = true;
480 }
481 }
482 if (force_clear_omap) {
483 dout(20) << __func__ << ": clearing omap on " << o
484 << " in cid " << cid << dendl;
485 r = object_map->clear(o, &spos);
486 if (r < 0 && r != -ENOENT) {
487 dout(25) << __func__ << " omap clear failed " << cpp_strerror(r) << dendl;
488 assert(!m_filestore_fail_eio || r != -EIO);
489 return r;
490 }
491 if (cct->_conf->filestore_debug_inject_read_err) {
492 debug_obj_on_delete(o);
493 }
494 if (!m_disable_wbthrottle) {
495 wbthrottle.clear_object(o); // should be only non-cache ref
496 }
497 fdcache.clear(o);
498 } else {
499 /* Ensure that replay of this op doesn't result in the object_map
500 * going away.
501 */
502 if (!backend->can_checkpoint())
503 object_map->sync(&o, &spos);
504 }
505 if (hardlink == 0) {
506 if (!m_disable_wbthrottle) {
507 wbthrottle.clear_object(o); // should be only non-cache ref
508 }
509 return 0;
510 }
511 }
512 r = index->unlink(o);
513 if (r < 0) {
514 dout(25) << __func__ << " index unlink failed " << cpp_strerror(r) << dendl;
515 return r;
516 }
517 return 0;
518 }
519
520 FileStore::FileStore(CephContext* cct, const std::string &base,
521 const std::string &jdev, osflagbits_t flags,
522 const char *name, bool do_update) :
523 JournalingObjectStore(cct, base),
524 internal_name(name),
525 basedir(base), journalpath(jdev),
526 generic_flags(flags),
527 blk_size(0),
528 fsid_fd(-1), op_fd(-1),
529 basedir_fd(-1), current_fd(-1),
530 backend(NULL),
531 index_manager(cct, do_update),
532 lock("FileStore::lock"),
533 force_sync(false),
534 sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
535 timer(cct, sync_entry_timeo_lock),
536 stop(false), sync_thread(this),
537 fdcache(cct),
538 wbthrottle(cct),
539 next_osr_id(0),
540 m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
541 !cct->_conf->filestore_wbthrottle_enable),
542 throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
543 throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
544 m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
545 m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
546 op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
547 op_wq(this, cct->_conf->filestore_op_thread_timeout,
548 cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
549 logger(NULL),
550 trace_endpoint("0.0.0.0", 0, "FileStore"),
551 read_error_lock("FileStore::read_error_lock"),
552 m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
553 m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
554 m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
555 m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
556 m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
557 m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
558 m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
559 m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
560 m_filestore_fadvise(cct->_conf->filestore_fadvise),
561 do_update(do_update),
562 m_journal_dio(cct->_conf->journal_dio),
563 m_journal_aio(cct->_conf->journal_aio),
564 m_journal_force_aio(cct->_conf->journal_force_aio),
565 m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
566 m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
567 m_filestore_do_dump(false),
568 m_filestore_dump_fmt(true),
569 m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
570 m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
571 m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
572 m_fs_type(0),
573 m_filestore_max_inline_xattr_size(0),
574 m_filestore_max_inline_xattrs(0),
575 m_filestore_max_xattr_value_size(0)
576 {
577 m_filestore_kill_at.set(cct->_conf->filestore_kill_at);
578 for (int i = 0; i < m_ondisk_finisher_num; ++i) {
579 ostringstream oss;
580 oss << "filestore-ondisk-" << i;
581 Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
582 ondisk_finishers.push_back(f);
583 }
584 for (int i = 0; i < m_apply_finisher_num; ++i) {
585 ostringstream oss;
586 oss << "filestore-apply-" << i;
587 Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
588 apply_finishers.push_back(f);
589 }
590
591 ostringstream oss;
592 oss << basedir << "/current";
593 current_fn = oss.str();
594
595 ostringstream sss;
596 sss << basedir << "/current/commit_op_seq";
597 current_op_seq_fn = sss.str();
598
599 ostringstream omss;
600 if (cct->_conf->filestore_omap_backend_path != "") {
601 omap_dir = cct->_conf->filestore_omap_backend_path;
602 } else {
603 omss << basedir << "/current/omap";
604 omap_dir = omss.str();
605 }
606
607 // initialize logger
608 PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
609
610 plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
611 plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
612 plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
613 plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
614 plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
615 plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
616 plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
617 plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
618 plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
619 plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
620 plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
621 plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
622 plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
623 plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
624 plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
625
626 plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
627 plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
628 plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
629 plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
630 plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
631
632 logger = plb.create_perf_counters();
633
634 cct->get_perfcounters_collection()->add(logger);
635 cct->_conf->add_observer(this);
636
637 superblock.compat_features = get_fs_initial_compat_set();
638 }
639
640 FileStore::~FileStore()
641 {
642 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
643 delete *it;
644 *it = NULL;
645 }
646 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
647 delete *it;
648 *it = NULL;
649 }
650 cct->_conf->remove_observer(this);
651 cct->get_perfcounters_collection()->remove(logger);
652
653 if (journal)
654 journal->logger = NULL;
655 delete logger;
656
657 if (m_filestore_do_dump) {
658 dump_stop();
659 }
660 }
661
/**
 * Write the on-disk xattr name for @p name ("user.ceph.<name>") into @p buf.
 *
 * @param buf  output buffer
 * @param len  size of @p buf; the result is truncated by snprintf() if it
 *             does not fit
 */
static void get_attrname(const char *name, char *buf, int len)
{
  static const char attr_fmt[] = "user.ceph.%s";
  snprintf(buf, len, attr_fmt, name);
}
666
/**
 * Strip the "user.ceph." prefix from an xattr name.
 *
 * @param name  in/out: advanced past the prefix when it is present,
 *              left untouched otherwise
 * @return true if the prefix was present
 */
bool parse_attrname(char **name)
{
  static const char prefix[] = "user.ceph.";
  const size_t prefix_len = sizeof(prefix) - 1;  // 10, without the NUL

  if (strncmp(*name, prefix, prefix_len) != 0)
    return false;

  *name += prefix_len;
  return true;
}
675
676 void FileStore::collect_metadata(map<string,string> *pm)
677 {
678 char partition_path[PATH_MAX];
679 char dev_node[PATH_MAX];
680 int rc = 0;
681
682 (*pm)["filestore_backend"] = backend->get_name();
683 ostringstream ss;
684 ss << "0x" << std::hex << m_fs_type << std::dec;
685 (*pm)["filestore_f_type"] = ss.str();
686
687 if (cct->_conf->filestore_collect_device_partition_information) {
688 rc = get_device_by_uuid(get_fsid(), "PARTUUID", partition_path,
689 dev_node);
690 } else {
691 rc = -EINVAL;
692 }
693
694 switch (rc) {
695 case -EOPNOTSUPP:
696 case -EINVAL:
697 (*pm)["backend_filestore_partition_path"] = "unknown";
698 (*pm)["backend_filestore_dev_node"] = "unknown";
699 break;
700 case -ENODEV:
701 (*pm)["backend_filestore_partition_path"] = string(partition_path);
702 (*pm)["backend_filestore_dev_node"] = "unknown";
703 break;
704 default:
705 (*pm)["backend_filestore_partition_path"] = string(partition_path);
706 (*pm)["backend_filestore_dev_node"] = string(dev_node);
707 }
708 }
709
710 int FileStore::statfs(struct store_statfs_t *buf0)
711 {
712 struct statfs buf;
713 buf0->reset();
714 if (::statfs(basedir.c_str(), &buf) < 0) {
715 int r = -errno;
716 assert(!m_filestore_fail_eio || r != -EIO);
717 assert(r != -ENOENT);
718 return r;
719 }
720 buf0->total = buf.f_blocks * buf.f_bsize;
721 buf0->available = buf.f_bavail * buf.f_bsize;
722 // Adjust for writes pending in the journal
723 if (journal) {
724 uint64_t estimate = journal->get_journal_size_estimate();
725 if (buf0->available > estimate)
726 buf0->available -= estimate;
727 else
728 buf0->available = 0;
729 }
730 return 0;
731 }
732
733
734 void FileStore::new_journal()
735 {
736 if (journalpath.length()) {
737 dout(10) << "open_journal at " << journalpath << dendl;
738 journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
739 journalpath.c_str(),
740 m_journal_dio, m_journal_aio,
741 m_journal_force_aio);
742 if (journal)
743 journal->logger = logger;
744 }
745 return;
746 }
747
748 int FileStore::dump_journal(ostream& out)
749 {
750 int r;
751
752 if (!journalpath.length())
753 return -EINVAL;
754
755 FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
756 r = journal->dump(out);
757 delete journal;
758 return r;
759 }
760
761 FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
762 {
763 switch (f_type) {
764 #if defined(__linux__)
765 case BTRFS_SUPER_MAGIC:
766 return new BtrfsFileStoreBackend(fs);
767 # ifdef HAVE_LIBXFS
768 case XFS_SUPER_MAGIC:
769 return new XfsFileStoreBackend(fs);
770 # endif
771 #endif
772 #ifdef HAVE_LIBZFS
773 case ZFS_SUPER_MAGIC:
774 return new ZFSFileStoreBackend(fs);
775 #endif
776 default:
777 return new GenericFileStoreBackend(fs);
778 }
779 }
780
781 void FileStore::create_backend(long f_type)
782 {
783 m_fs_type = f_type;
784
785 assert(backend == NULL);
786 backend = FileStoreBackend::create(f_type, this);
787
788 dout(0) << "backend " << backend->get_name()
789 << " (magic 0x" << std::hex << f_type << std::dec << ")"
790 << dendl;
791
792 switch (f_type) {
793 #if defined(__linux__)
794 case BTRFS_SUPER_MAGIC:
795 if (!m_disable_wbthrottle){
796 wbthrottle.set_fs(WBThrottle::BTRFS);
797 }
798 break;
799
800 case XFS_SUPER_MAGIC:
801 // wbthrottle is constructed with fs(WBThrottle::XFS)
802 break;
803 #endif
804 }
805
806 set_xattr_limits_via_conf();
807 }
808
809 int FileStore::mkfs()
810 {
811 int ret = 0;
812 char fsid_fn[PATH_MAX];
813 char fsid_str[40];
814 uuid_d old_fsid;
815 uuid_d old_omap_fsid;
816
817 dout(1) << "mkfs in " << basedir << dendl;
818 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
819 if (basedir_fd < 0) {
820 ret = -errno;
821 derr << "mkfs failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
822 return ret;
823 }
824
825 // open+lock fsid
826 snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
827 fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
828 if (fsid_fd < 0) {
829 ret = -errno;
830 derr << "mkfs: failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
831 goto close_basedir_fd;
832 }
833
834 if (lock_fsid() < 0) {
835 ret = -EBUSY;
836 goto close_fsid_fd;
837 }
838
839 if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
840 if (fsid.is_zero()) {
841 fsid.generate_random();
842 dout(1) << "mkfs generated fsid " << fsid << dendl;
843 } else {
844 dout(1) << "mkfs using provided fsid " << fsid << dendl;
845 }
846
847 fsid.print(fsid_str);
848 strcat(fsid_str, "\n");
849 ret = ::ftruncate(fsid_fd, 0);
850 if (ret < 0) {
851 ret = -errno;
852 derr << "mkfs: failed to truncate fsid: "
853 << cpp_strerror(ret) << dendl;
854 goto close_fsid_fd;
855 }
856 ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
857 if (ret < 0) {
858 derr << "mkfs: failed to write fsid: "
859 << cpp_strerror(ret) << dendl;
860 goto close_fsid_fd;
861 }
862 if (::fsync(fsid_fd) < 0) {
863 ret = -errno;
864 derr << "mkfs: close failed: can't write fsid: "
865 << cpp_strerror(ret) << dendl;
866 goto close_fsid_fd;
867 }
868 dout(10) << "mkfs fsid is " << fsid << dendl;
869 } else {
870 if (!fsid.is_zero() && fsid != old_fsid) {
871 derr << "mkfs on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
872 ret = -EINVAL;
873 goto close_fsid_fd;
874 }
875 fsid = old_fsid;
876 dout(1) << "mkfs fsid is already set to " << fsid << dendl;
877 }
878
879 // version stamp
880 ret = write_version_stamp();
881 if (ret < 0) {
882 derr << "mkfs: write_version_stamp() failed: "
883 << cpp_strerror(ret) << dendl;
884 goto close_fsid_fd;
885 }
886
887 // superblock
888 superblock.omap_backend = cct->_conf->filestore_omap_backend;
889 ret = write_superblock();
890 if (ret < 0) {
891 derr << "mkfs: write_superblock() failed: "
892 << cpp_strerror(ret) << dendl;
893 goto close_fsid_fd;
894 }
895
896 struct statfs basefs;
897 ret = ::fstatfs(basedir_fd, &basefs);
898 if (ret < 0) {
899 ret = -errno;
900 derr << "mkfs cannot fstatfs basedir "
901 << cpp_strerror(ret) << dendl;
902 goto close_fsid_fd;
903 }
904
905 create_backend(basefs.f_type);
906
907 ret = backend->create_current();
908 if (ret < 0) {
909 derr << "mkfs: failed to create current/ " << cpp_strerror(ret) << dendl;
910 goto close_fsid_fd;
911 }
912
913 // write initial op_seq
914 {
915 uint64_t initial_seq = 0;
916 int fd = read_op_seq(&initial_seq);
917 if (fd < 0) {
918 ret = fd;
919 derr << "mkfs: failed to create " << current_op_seq_fn << ": "
920 << cpp_strerror(ret) << dendl;
921 goto close_fsid_fd;
922 }
923 if (initial_seq == 0) {
924 ret = write_op_seq(fd, 1);
925 if (ret < 0) {
926 VOID_TEMP_FAILURE_RETRY(::close(fd));
927 derr << "mkfs: failed to write to " << current_op_seq_fn << ": "
928 << cpp_strerror(ret) << dendl;
929 goto close_fsid_fd;
930 }
931
932 if (backend->can_checkpoint()) {
933 // create snap_1 too
934 current_fd = ::open(current_fn.c_str(), O_RDONLY);
935 assert(current_fd >= 0);
936 char s[NAME_MAX];
937 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
938 ret = backend->create_checkpoint(s, NULL);
939 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
940 if (ret < 0 && ret != -EEXIST) {
941 VOID_TEMP_FAILURE_RETRY(::close(fd));
942 derr << "mkfs: failed to create snap_1: " << cpp_strerror(ret) << dendl;
943 goto close_fsid_fd;
944 }
945 }
946 }
947 VOID_TEMP_FAILURE_RETRY(::close(fd));
948 }
949 ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
950 if (ret < 0) {
951 derr << "mkfs failed to create " << cct->_conf->filestore_omap_backend << dendl;
952 goto close_fsid_fd;
953 }
954 // create fsid under omap
955 // open+lock fsid
956 int omap_fsid_fd;
957 char omap_fsid_fn[PATH_MAX];
958 snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
959 omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
960 if (omap_fsid_fd < 0) {
961 ret = -errno;
962 derr << "mkfs: failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
963 goto close_fsid_fd;
964 }
965
966 if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
967 assert(!fsid.is_zero());
968 fsid.print(fsid_str);
969 strcat(fsid_str, "\n");
970 ret = ::ftruncate(omap_fsid_fd, 0);
971 if (ret < 0) {
972 ret = -errno;
973 derr << "mkfs: failed to truncate fsid: "
974 << cpp_strerror(ret) << dendl;
975 goto close_omap_fsid_fd;
976 }
977 ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
978 if (ret < 0) {
979 derr << "mkfs: failed to write fsid: "
980 << cpp_strerror(ret) << dendl;
981 goto close_omap_fsid_fd;
982 }
983 dout(10) << "mkfs: write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
984 if (::fsync(omap_fsid_fd) < 0) {
985 ret = -errno;
986 derr << "mkfs: close failed: can't write fsid: "
987 << cpp_strerror(ret) << dendl;
988 goto close_omap_fsid_fd;
989 }
990 dout(10) << "mkfs omap fsid is " << fsid << dendl;
991 } else {
992 if (fsid != old_omap_fsid) {
993 derr << "FileStore::mkfs: " << omap_fsid_fn
994 << " has existed omap fsid " << old_omap_fsid
995 << " != expected osd fsid " << fsid
996 << dendl;
997 ret = -EINVAL;
998 goto close_omap_fsid_fd;
999 }
1000 dout(1) << "FileStore::mkfs: omap fsid is already set to " << fsid << dendl;
1001 }
1002
1003 dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1004
1005 // journal?
1006 ret = mkjournal();
1007 if (ret)
1008 goto close_omap_fsid_fd;
1009
1010 ret = write_meta("type", "filestore");
1011 if (ret)
1012 goto close_omap_fsid_fd;
1013
1014 dout(1) << "mkfs done in " << basedir << dendl;
1015 ret = 0;
1016
1017 close_omap_fsid_fd:
1018 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1019 close_fsid_fd:
1020 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1021 fsid_fd = -1;
1022 close_basedir_fd:
1023 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1024 delete backend;
1025 backend = NULL;
1026 return ret;
1027 }
1028
1029 int FileStore::mkjournal()
1030 {
1031 // read fsid
1032 int ret;
1033 char fn[PATH_MAX];
1034 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1035 int fd = ::open(fn, O_RDONLY, 0644);
1036 if (fd < 0) {
1037 int err = errno;
1038 derr << "FileStore::mkjournal: open error: " << cpp_strerror(err) << dendl;
1039 return -err;
1040 }
1041 ret = read_fsid(fd, &fsid);
1042 if (ret < 0) {
1043 derr << "FileStore::mkjournal: read error: " << cpp_strerror(ret) << dendl;
1044 VOID_TEMP_FAILURE_RETRY(::close(fd));
1045 return ret;
1046 }
1047 VOID_TEMP_FAILURE_RETRY(::close(fd));
1048
1049 ret = 0;
1050
1051 new_journal();
1052 if (journal) {
1053 ret = journal->check();
1054 if (ret < 0) {
1055 ret = journal->create();
1056 if (ret)
1057 derr << "mkjournal error creating journal on " << journalpath
1058 << ": " << cpp_strerror(ret) << dendl;
1059 else
1060 dout(0) << "mkjournal created journal on " << journalpath << dendl;
1061 }
1062 delete journal;
1063 journal = 0;
1064 }
1065 return ret;
1066 }
1067
1068 int FileStore::read_fsid(int fd, uuid_d *uuid)
1069 {
1070 char fsid_str[40];
1071 memset(fsid_str, 0, sizeof(fsid_str));
1072 int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1073 if (ret < 0)
1074 return ret;
1075 if (ret == 8) {
1076 // old 64-bit fsid... mirror it.
1077 *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1078 *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1079 return 0;
1080 }
1081
1082 if (ret > 36)
1083 fsid_str[36] = 0;
1084 else
1085 fsid_str[ret] = 0;
1086 if (!uuid->parse(fsid_str))
1087 return -EINVAL;
1088 return 0;
1089 }
1090
1091 int FileStore::lock_fsid()
1092 {
1093 struct flock l;
1094 memset(&l, 0, sizeof(l));
1095 l.l_type = F_WRLCK;
1096 l.l_whence = SEEK_SET;
1097 l.l_start = 0;
1098 l.l_len = 0;
1099 int r = ::fcntl(fsid_fd, F_SETLK, &l);
1100 if (r < 0) {
1101 int err = errno;
1102 dout(0) << "lock_fsid failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1103 << cpp_strerror(err) << dendl;
1104 return -err;
1105 }
1106 return 0;
1107 }
1108
1109 bool FileStore::test_mount_in_use()
1110 {
1111 dout(5) << "test_mount basedir " << basedir << " journal " << journalpath << dendl;
1112 char fn[PATH_MAX];
1113 snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1114
1115 // verify fs isn't in use
1116
1117 fsid_fd = ::open(fn, O_RDWR, 0644);
1118 if (fsid_fd < 0)
1119 return 0; // no fsid, ok.
1120 bool inuse = lock_fsid() < 0;
1121 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1122 fsid_fd = -1;
1123 return inuse;
1124 }
1125
1126 int FileStore::_detect_fs()
1127 {
1128 struct statfs st;
1129 int r = ::fstatfs(basedir_fd, &st);
1130 if (r < 0)
1131 return -errno;
1132
1133 blk_size = st.f_bsize;
1134
1135 create_backend(st.f_type);
1136
1137 r = backend->detect_features();
1138 if (r < 0) {
1139 derr << "_detect_fs: detect_features error: " << cpp_strerror(r) << dendl;
1140 return r;
1141 }
1142
1143 // test xattrs
1144 char fn[PATH_MAX];
1145 int x = rand();
1146 int y = x+1;
1147 snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1148 int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1149 if (tmpfd < 0) {
1150 int ret = -errno;
1151 derr << "_detect_fs unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1152 return ret;
1153 }
1154
1155 int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1156 if (ret >= 0)
1157 ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1158 if ((ret < 0) || (x != y)) {
1159 derr << "Extended attributes don't appear to work. ";
1160 if (ret)
1161 *_dout << "Got error " + cpp_strerror(ret) + ". ";
1162 *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1163 << "file system with the 'user_xattr' option." << dendl;
1164 ::unlink(fn);
1165 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1166 return -ENOTSUP;
1167 }
1168
1169 char buf[1000];
1170 memset(buf, 0, sizeof(buf)); // shut up valgrind
1171 chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1172 chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1173 chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1174 chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1175 ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1176 if (ret == -ENOSPC) {
1177 dout(0) << "limited size xattrs" << dendl;
1178 }
1179 chain_fremovexattr(tmpfd, "user.test");
1180 chain_fremovexattr(tmpfd, "user.test2");
1181 chain_fremovexattr(tmpfd, "user.test3");
1182 chain_fremovexattr(tmpfd, "user.test4");
1183 chain_fremovexattr(tmpfd, "user.test5");
1184
1185 ::unlink(fn);
1186 VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1187
1188 return 0;
1189 }
1190
1191 int FileStore::_sanity_check_fs()
1192 {
1193 // sanity check(s)
1194
1195 if (((int)m_filestore_journal_writeahead +
1196 (int)m_filestore_journal_parallel +
1197 (int)m_filestore_journal_trailing) > 1) {
1198 dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1199 cerr << TEXT_RED
1200 << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1201 << " is enabled in ceph.conf. You must choose a single journal mode."
1202 << TEXT_NORMAL << std::endl;
1203 return -EINVAL;
1204 }
1205
1206 if (!backend->can_checkpoint()) {
1207 if (!journal || !m_filestore_journal_writeahead) {
1208 dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1209 cerr << TEXT_RED
1210 << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1211 << " For non-btrfs volumes, a writeahead journal is required to\n"
1212 << " maintain on-disk consistency in the event of a crash. Your conf\n"
1213 << " should include something like:\n"
1214 << " osd journal = /path/to/journal_device_or_file\n"
1215 << " filestore journal writeahead = true\n"
1216 << TEXT_NORMAL;
1217 }
1218 }
1219
1220 if (!journal) {
1221 dout(0) << "mount WARNING: no journal" << dendl;
1222 cerr << TEXT_YELLOW
1223 << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1224 << " If you will not be using an osd journal, write latency may be\n"
1225 << " relatively high. It can be reduced somewhat by lowering\n"
1226 << " filestore_max_sync_interval, but lower values mean lower write\n"
1227 << " throughput, especially with spinning disks.\n"
1228 << TEXT_NORMAL;
1229 }
1230
1231 return 0;
1232 }
1233
1234 int FileStore::write_superblock()
1235 {
1236 bufferlist bl;
1237 ::encode(superblock, bl);
1238 return safe_write_file(basedir.c_str(), "superblock",
1239 bl.c_str(), bl.length());
1240 }
1241
1242 int FileStore::read_superblock()
1243 {
1244 bufferptr bp(PATH_MAX);
1245 int ret = safe_read_file(basedir.c_str(), "superblock",
1246 bp.c_str(), bp.length());
1247 if (ret < 0) {
1248 if (ret == -ENOENT) {
1249 // If the file doesn't exist write initial CompatSet
1250 return write_superblock();
1251 }
1252 return ret;
1253 }
1254
1255 bufferlist bl;
1256 bl.push_back(std::move(bp));
1257 bufferlist::iterator i = bl.begin();
1258 ::decode(superblock, i);
1259 return 0;
1260 }
1261
1262 int FileStore::update_version_stamp()
1263 {
1264 return write_version_stamp();
1265 }
1266
1267 int FileStore::version_stamp_is_valid(uint32_t *version)
1268 {
1269 bufferptr bp(PATH_MAX);
1270 int ret = safe_read_file(basedir.c_str(), "store_version",
1271 bp.c_str(), bp.length());
1272 if (ret < 0) {
1273 return ret;
1274 }
1275 bufferlist bl;
1276 bl.push_back(std::move(bp));
1277 bufferlist::iterator i = bl.begin();
1278 ::decode(*version, i);
1279 dout(10) << __func__ << " was " << *version << " vs target "
1280 << target_version << dendl;
1281 if (*version == target_version)
1282 return 1;
1283 else
1284 return 0;
1285 }
1286
1287 int FileStore::write_version_stamp()
1288 {
1289 dout(1) << __func__ << " " << target_version << dendl;
1290 bufferlist bl;
1291 ::encode(target_version, bl);
1292
1293 return safe_write_file(basedir.c_str(), "store_version",
1294 bl.c_str(), bl.length());
1295 }
1296
1297 int FileStore::upgrade()
1298 {
1299 dout(1) << "upgrade" << dendl;
1300 uint32_t version;
1301 int r = version_stamp_is_valid(&version);
1302
1303 if (r == -ENOENT) {
1304 derr << "The store_version file doesn't exist." << dendl;
1305 return -EINVAL;
1306 }
1307 if (r < 0)
1308 return r;
1309 if (r == 1)
1310 return 0;
1311
1312 if (version < 3) {
1313 derr << "ObjectStore is old at version " << version << ". Please upgrade to firefly v0.80.x, convert your store, and then upgrade." << dendl;
1314 return -EINVAL;
1315 }
1316
1317 // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1318 // open up DBObjectMap with the do_upgrade flag, which we already did.
1319 update_version_stamp();
1320 return 0;
1321 }
1322
1323 int FileStore::read_op_seq(uint64_t *seq)
1324 {
1325 int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1326 if (op_fd < 0) {
1327 int r = -errno;
1328 assert(!m_filestore_fail_eio || r != -EIO);
1329 return r;
1330 }
1331 char s[40];
1332 memset(s, 0, sizeof(s));
1333 int ret = safe_read(op_fd, s, sizeof(s) - 1);
1334 if (ret < 0) {
1335 derr << "error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1336 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1337 assert(!m_filestore_fail_eio || ret != -EIO);
1338 return ret;
1339 }
1340 *seq = atoll(s);
1341 return op_fd;
1342 }
1343
1344 int FileStore::write_op_seq(int fd, uint64_t seq)
1345 {
1346 char s[30];
1347 snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1348 int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1349 if (ret < 0) {
1350 ret = -errno;
1351 assert(!m_filestore_fail_eio || ret != -EIO);
1352 }
1353 return ret;
1354 }
1355
1356 int FileStore::mount()
1357 {
1358 int ret;
1359 char buf[PATH_MAX];
1360 uint64_t initial_op_seq;
1361 uuid_d omap_fsid;
1362 set<string> cluster_snaps;
1363 CompatSet supported_compat_set = get_fs_supported_compat_set();
1364
1365 dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1366
1367 ret = set_throttle_params();
1368 if (ret != 0)
1369 goto done;
1370
1371 // make sure global base dir exists
1372 if (::access(basedir.c_str(), R_OK | W_OK)) {
1373 ret = -errno;
1374 derr << "FileStore::mount: unable to access basedir '" << basedir << "': "
1375 << cpp_strerror(ret) << dendl;
1376 goto done;
1377 }
1378
1379 // get fsid
1380 snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1381 fsid_fd = ::open(buf, O_RDWR, 0644);
1382 if (fsid_fd < 0) {
1383 ret = -errno;
1384 derr << "FileStore::mount: error opening '" << buf << "': "
1385 << cpp_strerror(ret) << dendl;
1386 goto done;
1387 }
1388
1389 ret = read_fsid(fsid_fd, &fsid);
1390 if (ret < 0) {
1391 derr << "FileStore::mount: error reading fsid_fd: " << cpp_strerror(ret)
1392 << dendl;
1393 goto close_fsid_fd;
1394 }
1395
1396 if (lock_fsid() < 0) {
1397 derr << "FileStore::mount: lock_fsid failed" << dendl;
1398 ret = -EBUSY;
1399 goto close_fsid_fd;
1400 }
1401
1402 dout(10) << "mount fsid is " << fsid << dendl;
1403
1404
1405 uint32_t version_stamp;
1406 ret = version_stamp_is_valid(&version_stamp);
1407 if (ret < 0) {
1408 derr << "FileStore::mount: error in version_stamp_is_valid: "
1409 << cpp_strerror(ret) << dendl;
1410 goto close_fsid_fd;
1411 } else if (ret == 0) {
1412 if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1413 derr << "FileStore::mount: stale version stamp detected: "
1414 << version_stamp
1415 << ". Proceeding, do_update "
1416 << "is set, performing disk format upgrade."
1417 << dendl;
1418 do_update = true;
1419 } else {
1420 ret = -EINVAL;
1421 derr << "FileStore::mount: stale version stamp " << version_stamp
1422 << ". Please run the FileStore update script before starting the "
1423 << "OSD, or set filestore_update_to to " << target_version
1424 << " (currently " << cct->_conf->filestore_update_to << ")"
1425 << dendl;
1426 goto close_fsid_fd;
1427 }
1428 }
1429
1430 ret = read_superblock();
1431 if (ret < 0) {
1432 goto close_fsid_fd;
1433 }
1434
1435 // Check if this FileStore supports all the necessary features to mount
1436 if (supported_compat_set.compare(superblock.compat_features) == -1) {
1437 derr << "FileStore::mount: Incompatible features set "
1438 << superblock.compat_features << dendl;
1439 ret = -EINVAL;
1440 goto close_fsid_fd;
1441 }
1442
1443 // open some dir handles
1444 basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1445 if (basedir_fd < 0) {
1446 ret = -errno;
1447 derr << "FileStore::mount: failed to open " << basedir << ": "
1448 << cpp_strerror(ret) << dendl;
1449 basedir_fd = -1;
1450 goto close_fsid_fd;
1451 }
1452
1453 // test for btrfs, xattrs, etc.
1454 ret = _detect_fs();
1455 if (ret < 0) {
1456 derr << "FileStore::mount: error in _detect_fs: "
1457 << cpp_strerror(ret) << dendl;
1458 goto close_basedir_fd;
1459 }
1460
1461 {
1462 list<string> ls;
1463 ret = backend->list_checkpoints(ls);
1464 if (ret < 0) {
1465 derr << "FileStore::mount: error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1466 goto close_basedir_fd;
1467 }
1468
1469 long long unsigned c, prev = 0;
1470 char clustersnap[NAME_MAX];
1471 for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1472 if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1473 assert(c > prev);
1474 prev = c;
1475 snaps.push_back(c);
1476 } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1477 cluster_snaps.insert(*it);
1478 }
1479 }
1480
1481 if (m_osd_rollback_to_cluster_snap.length() &&
1482 cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1483 derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1484 ret = -ENOENT;
1485 goto close_basedir_fd;
1486 }
1487
1488 char nosnapfn[200];
1489 snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1490
1491 if (backend->can_checkpoint()) {
1492 if (snaps.empty()) {
1493 dout(0) << "mount WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1494 } else {
1495 char s[NAME_MAX];
1496 uint64_t curr_seq = 0;
1497
1498 if (m_osd_rollback_to_cluster_snap.length()) {
1499 derr << TEXT_RED
1500 << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1501 << TEXT_NORMAL
1502 << dendl;
1503 assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1504 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1505 } else {
1506 {
1507 int fd = read_op_seq(&curr_seq);
1508 if (fd >= 0) {
1509 VOID_TEMP_FAILURE_RETRY(::close(fd));
1510 }
1511 }
1512 if (curr_seq)
1513 dout(10) << " current/ seq was " << curr_seq << dendl;
1514 else
1515 dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1516
1517 uint64_t cp = snaps.back();
1518 dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1519
1520 // if current/ is marked as non-snapshotted, refuse to roll
1521 // back (without clear direction) to avoid throwing out new
1522 // data.
1523 struct stat st;
1524 if (::stat(nosnapfn, &st) == 0) {
1525 if (!m_osd_use_stale_snap) {
1526 derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1527 derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1528 derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1529 ret = -ENOTSUP;
1530 goto close_basedir_fd;
1531 }
1532 derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1533 << ", newest snap is " << cp << dendl;
1534 cerr << TEXT_YELLOW
1535 << " ** WARNING: forcing the use of stale snapshot data **"
1536 << TEXT_NORMAL << std::endl;
1537 }
1538
1539 dout(10) << "mount rolling back to consistent snap " << cp << dendl;
1540 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1541 }
1542
1543 // drop current?
1544 ret = backend->rollback_to(s);
1545 if (ret) {
1546 derr << "FileStore::mount: error rolling back to " << s << ": "
1547 << cpp_strerror(ret) << dendl;
1548 goto close_basedir_fd;
1549 }
1550 }
1551 }
1552 initial_op_seq = 0;
1553
1554 current_fd = ::open(current_fn.c_str(), O_RDONLY);
1555 if (current_fd < 0) {
1556 ret = -errno;
1557 derr << "FileStore::mount: error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1558 goto close_basedir_fd;
1559 }
1560
1561 assert(current_fd >= 0);
1562
1563 op_fd = read_op_seq(&initial_op_seq);
1564 if (op_fd < 0) {
1565 ret = op_fd;
1566 derr << "FileStore::mount: read_op_seq failed" << dendl;
1567 goto close_current_fd;
1568 }
1569
1570 dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1571 if (initial_op_seq == 0) {
1572 derr << "mount initial op seq is 0; something is wrong" << dendl;
1573 ret = -EINVAL;
1574 goto close_current_fd;
1575 }
1576
1577 if (!backend->can_checkpoint()) {
1578 // mark current/ as non-snapshotted so that we don't rollback away
1579 // from it.
1580 int r = ::creat(nosnapfn, 0644);
1581 if (r < 0) {
1582 ret = -errno;
1583 derr << "FileStore::mount: failed to create current/nosnap" << dendl;
1584 goto close_current_fd;
1585 }
1586 VOID_TEMP_FAILURE_RETRY(::close(r));
1587 } else {
1588 // clear nosnap marker, if present.
1589 ::unlink(nosnapfn);
1590 }
1591
1592 // check fsid with omap
1593 // get omap fsid
1594 int omap_fsid_fd;
1595 char omap_fsid_buf[PATH_MAX];
1596 struct ::stat omap_fsid_stat;
1597 snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1598 // if osd_uuid not exists, assume as this omap matchs corresponding osd
1599 if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1600 dout(10) << "Filestore::mount osd_uuid not found under omap, "
1601 << "assume as matched."
1602 << dendl;
1603 }else{
1604 // if osd_uuid exists, compares osd_uuid with fsid
1605 omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1606 if (omap_fsid_fd < 0) {
1607 ret = -errno;
1608 derr << "FileStore::mount: error opening '" << omap_fsid_buf << "': "
1609 << cpp_strerror(ret)
1610 << dendl;
1611 goto close_current_fd;
1612 }
1613 ret = read_fsid(omap_fsid_fd, &omap_fsid);
1614 VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1615 omap_fsid_fd = -1; // defensive
1616 if (ret < 0) {
1617 derr << "FileStore::mount: error reading omap_fsid_fd"
1618 << ", omap_fsid = " << omap_fsid
1619 << cpp_strerror(ret)
1620 << dendl;
1621 goto close_current_fd;
1622 }
1623 if (fsid != omap_fsid) {
1624 derr << "FileStore::mount: " << omap_fsid_buf
1625 << " has existed omap fsid " << omap_fsid
1626 << " != expected osd fsid " << fsid
1627 << dendl;
1628 ret = -EINVAL;
1629 goto close_current_fd;
1630 }
1631 }
1632
1633 dout(0) << "start omap initiation" << dendl;
1634 if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1635 KeyValueDB * omap_store = KeyValueDB::create(cct,
1636 superblock.omap_backend,
1637 omap_dir);
1638 if (omap_store == NULL)
1639 {
1640 derr << "Error creating " << superblock.omap_backend << dendl;
1641 ret = -1;
1642 goto close_current_fd;
1643 }
1644
1645 if (superblock.omap_backend == "rocksdb")
1646 ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1647 else
1648 ret = omap_store->init();
1649
1650 if (ret < 0) {
1651 derr << "Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1652 goto close_current_fd;
1653 }
1654
1655 stringstream err;
1656 if (omap_store->create_and_open(err)) {
1657 delete omap_store;
1658 derr << "Error initializing " << superblock.omap_backend
1659 << " : " << err.str() << dendl;
1660 ret = -1;
1661 goto close_current_fd;
1662 }
1663
1664 DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1665 ret = dbomap->init(do_update);
1666 if (ret < 0) {
1667 delete dbomap;
1668 derr << "Error initializing DBObjectMap: " << ret << dendl;
1669 goto close_current_fd;
1670 }
1671 stringstream err2;
1672
1673 if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1674 derr << err2.str() << dendl;
1675 delete dbomap;
1676 ret = -EINVAL;
1677 goto close_current_fd;
1678 }
1679 object_map.reset(dbomap);
1680 }
1681
1682 // journal
1683 new_journal();
1684
1685 // select journal mode?
1686 if (journal) {
1687 if (!m_filestore_journal_writeahead &&
1688 !m_filestore_journal_parallel &&
1689 !m_filestore_journal_trailing) {
1690 if (!backend->can_checkpoint()) {
1691 m_filestore_journal_writeahead = true;
1692 dout(0) << "mount: enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1693 } else {
1694 m_filestore_journal_parallel = true;
1695 dout(0) << "mount: enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1696 }
1697 } else {
1698 if (m_filestore_journal_writeahead)
1699 dout(0) << "mount: WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1700 if (m_filestore_journal_parallel)
1701 dout(0) << "mount: PARALLEL journal mode explicitly enabled in conf" << dendl;
1702 if (m_filestore_journal_trailing)
1703 dout(0) << "mount: TRAILING journal mode explicitly enabled in conf" << dendl;
1704 }
1705 if (m_filestore_journal_writeahead)
1706 journal->set_wait_on_full(true);
1707 } else {
1708 dout(0) << "mount: no journal" << dendl;
1709 }
1710
1711 ret = _sanity_check_fs();
1712 if (ret) {
1713 derr << "FileStore::mount: _sanity_check_fs failed with error "
1714 << ret << dendl;
1715 goto close_current_fd;
1716 }
1717
1718 // Cleanup possibly invalid collections
1719 {
1720 vector<coll_t> collections;
1721 ret = list_collections(collections, true);
1722 if (ret < 0) {
1723 derr << "Error " << ret << " while listing collections" << dendl;
1724 goto close_current_fd;
1725 }
1726 for (vector<coll_t>::iterator i = collections.begin();
1727 i != collections.end();
1728 ++i) {
1729 Index index;
1730 ret = get_index(*i, &index);
1731 if (ret < 0) {
1732 derr << "Unable to mount index " << *i
1733 << " with error: " << ret << dendl;
1734 goto close_current_fd;
1735 }
1736 assert(NULL != index.index);
1737 RWLock::WLocker l((index.index)->access_lock);
1738
1739 index->cleanup();
1740 }
1741 }
1742 if (!m_disable_wbthrottle) {
1743 wbthrottle.start();
1744 } else {
1745 dout(0) << "mount INFO: WbThrottle is disabled" << dendl;
1746 if (cct->_conf->filestore_odsync_write) {
1747 dout(0) << "mount INFO: O_DSYNC write is enabled" << dendl;
1748 }
1749 }
1750 sync_thread.create("filestore_sync");
1751
1752 if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1753 ret = journal_replay(initial_op_seq);
1754 if (ret < 0) {
1755 derr << "mount failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1756 if (ret == -ENOTTY) {
1757 derr << "maybe journal is not pointing to a block device and its size "
1758 << "wasn't configured?" << dendl;
1759 }
1760
1761 goto stop_sync;
1762 }
1763 }
1764
1765 {
1766 stringstream err2;
1767 if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1768 derr << err2.str() << dendl;
1769 ret = -EINVAL;
1770 goto stop_sync;
1771 }
1772 }
1773
1774 init_temp_collections();
1775
1776 journal_start();
1777
1778 op_tp.start();
1779 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1780 (*it)->start();
1781 }
1782 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1783 (*it)->start();
1784 }
1785
1786 timer.init();
1787
1788 // upgrade?
1789 if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1790 int err = upgrade();
1791 if (err < 0) {
1792 derr << "error converting store" << dendl;
1793 umount();
1794 return err;
1795 }
1796 }
1797
1798 // all okay.
1799 return 0;
1800
1801 stop_sync:
1802 // stop sync thread
1803 lock.Lock();
1804 stop = true;
1805 sync_cond.Signal();
1806 lock.Unlock();
1807 sync_thread.join();
1808 if (!m_disable_wbthrottle) {
1809 wbthrottle.stop();
1810 }
1811 close_current_fd:
1812 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1813 current_fd = -1;
1814 close_basedir_fd:
1815 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1816 basedir_fd = -1;
1817 close_fsid_fd:
1818 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1819 fsid_fd = -1;
1820 done:
1821 assert(!m_filestore_fail_eio || ret != -EIO);
1822 delete backend;
1823 backend = NULL;
1824 object_map.reset();
1825 return ret;
1826 }
1827
1828 void FileStore::init_temp_collections()
1829 {
1830 dout(10) << __func__ << dendl;
1831 vector<coll_t> ls;
1832 int r = list_collections(ls, true);
1833 assert(r >= 0);
1834
1835 dout(20) << " ls " << ls << dendl;
1836
1837 SequencerPosition spos;
1838
1839 set<coll_t> temps;
1840 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1841 if (p->is_temp())
1842 temps.insert(*p);
1843 dout(20) << " temps " << temps << dendl;
1844
1845 for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1846 if (p->is_temp())
1847 continue;
1848 if (p->is_meta())
1849 continue;
1850 coll_t temp = p->get_temp();
1851 if (temps.count(temp)) {
1852 temps.erase(temp);
1853 } else {
1854 dout(10) << __func__ << " creating " << temp << dendl;
1855 r = _create_collection(temp, 0, spos);
1856 assert(r == 0);
1857 }
1858 }
1859
1860 for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1861 dout(10) << __func__ << " removing stray " << *p << dendl;
1862 r = _collection_remove_recursive(*p, spos);
1863 assert(r == 0);
1864 }
1865 }
1866
1867 int FileStore::umount()
1868 {
1869 dout(5) << "umount " << basedir << dendl;
1870
1871 flush();
1872 sync();
1873 do_force_sync();
1874
1875 lock.Lock();
1876 stop = true;
1877 sync_cond.Signal();
1878 lock.Unlock();
1879 sync_thread.join();
1880 if (!m_disable_wbthrottle){
1881 wbthrottle.stop();
1882 }
1883 op_tp.stop();
1884
1885 journal_stop();
1886 if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1887 journal_write_close();
1888
1889 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1890 (*it)->stop();
1891 }
1892 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1893 (*it)->stop();
1894 }
1895
1896 if (fsid_fd >= 0) {
1897 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1898 fsid_fd = -1;
1899 }
1900 if (op_fd >= 0) {
1901 VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1902 op_fd = -1;
1903 }
1904 if (current_fd >= 0) {
1905 VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1906 current_fd = -1;
1907 }
1908 if (basedir_fd >= 0) {
1909 VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1910 basedir_fd = -1;
1911 }
1912
1913 force_sync = false;
1914
1915 delete backend;
1916 backend = NULL;
1917
1918 object_map.reset();
1919
1920 {
1921 Mutex::Locker l(sync_entry_timeo_lock);
1922 timer.shutdown();
1923 }
1924
1925 // nothing
1926 return 0;
1927 }
1928
1929
1930
1931
1932 /// -----------------------------
1933
1934 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
1935 Context *onreadable,
1936 Context *onreadable_sync,
1937 TrackedOpRef osd_op)
1938 {
1939 uint64_t bytes = 0, ops = 0;
1940 for (vector<Transaction>::iterator p = tls.begin();
1941 p != tls.end();
1942 ++p) {
1943 bytes += (*p).get_num_bytes();
1944 ops += (*p).get_num_ops();
1945 }
1946
1947 Op *o = new Op;
1948 o->start = ceph_clock_now();
1949 o->tls = std::move(tls);
1950 o->onreadable = onreadable;
1951 o->onreadable_sync = onreadable_sync;
1952 o->ops = ops;
1953 o->bytes = bytes;
1954 o->osd_op = osd_op;
1955 return o;
1956 }
1957
1958
1959
1960 void FileStore::queue_op(OpSequencer *osr, Op *o)
1961 {
1962 // queue op on sequencer, then queue sequencer for the threadpool,
1963 // so that regardless of which order the threads pick up the
1964 // sequencer, the op order will be preserved.
1965
1966 osr->queue(o);
1967 o->trace.event("queued");
1968
1969 logger->inc(l_filestore_ops);
1970 logger->inc(l_filestore_bytes, o->bytes);
1971
1972 dout(5) << "queue_op " << o << " seq " << o->op
1973 << " " << *osr
1974 << " " << o->bytes << " bytes"
1975 << " (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
1976 << dendl;
1977 op_wq.queue(osr);
1978 }
1979
1980 void FileStore::op_queue_reserve_throttle(Op *o)
1981 {
1982 throttle_ops.get();
1983 throttle_bytes.get(o->bytes);
1984
1985 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
1986 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
1987 }
1988
1989 void FileStore::op_queue_release_throttle(Op *o)
1990 {
1991 throttle_ops.put();
1992 throttle_bytes.put(o->bytes);
1993 logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
1994 logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
1995 }
1996
1997 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
1998 {
1999 if (!m_disable_wbthrottle) {
2000 wbthrottle.throttle();
2001 }
2002 // inject a stall?
2003 if (cct->_conf->filestore_inject_stall) {
2004 int orig = cct->_conf->filestore_inject_stall;
2005 dout(5) << "_do_op filestore_inject_stall " << orig << ", sleeping" << dendl;
2006 sleep(orig);
2007 cct->_conf->set_val("filestore_inject_stall", "0");
2008 dout(5) << "_do_op done stalling" << dendl;
2009 }
2010
2011 osr->apply_lock.Lock();
2012 Op *o = osr->peek_queue();
2013 o->trace.event("op_apply_start");
2014 apply_manager.op_apply_start(o->op);
2015 dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2016 o->trace.event("_do_transactions start");
2017 int r = _do_transactions(o->tls, o->op, &handle);
2018 o->trace.event("op_apply_finish");
2019 apply_manager.op_apply_finish(o->op);
2020 dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
2021 << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2022
2023 o->tls.clear();
2024
2025 }
2026
2027 void FileStore::_finish_op(OpSequencer *osr)
2028 {
2029 list<Context*> to_queue;
2030 Op *o = osr->dequeue(&to_queue);
2031
2032 utime_t lat = ceph_clock_now();
2033 lat -= o->start;
2034
2035 dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2036 osr->apply_lock.Unlock(); // locked in _do_op
2037 o->trace.event("_finish_op");
2038
2039 // called with tp lock held
2040 op_queue_release_throttle(o);
2041
2042 logger->tinc(l_filestore_apply_latency, lat);
2043
2044 if (o->onreadable_sync) {
2045 o->onreadable_sync->complete(0);
2046 }
2047 if (o->onreadable) {
2048 apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2049 }
2050 if (!to_queue.empty()) {
2051 apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2052 }
2053 delete o;
2054 }
2055
2056
2057 struct C_JournaledAhead : public Context {
2058 FileStore *fs;
2059 FileStore::OpSequencer *osr;
2060 FileStore::Op *o;
2061 Context *ondisk;
2062
2063 C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2064 fs(f), osr(os), o(o), ondisk(ondisk) { }
2065 void finish(int r) override {
2066 fs->_journaled_ahead(osr, o, ondisk);
2067 }
2068 };
2069
2070 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2071 TrackedOpRef osd_op,
2072 ThreadPool::TPHandle *handle)
2073 {
2074 Context *onreadable;
2075 Context *ondisk;
2076 Context *onreadable_sync;
2077 ObjectStore::Transaction::collect_contexts(
2078 tls, &onreadable, &ondisk, &onreadable_sync);
2079
2080 if (cct->_conf->objectstore_blackhole) {
2081 dout(0) << __func__ << " objectstore_blackhole = TRUE, dropping transaction"
2082 << dendl;
2083 delete ondisk;
2084 delete onreadable;
2085 delete onreadable_sync;
2086 return 0;
2087 }
2088
2089 utime_t start = ceph_clock_now();
2090 // set up the sequencer
2091 OpSequencer *osr;
2092 assert(posr);
2093 if (posr->p) {
2094 osr = static_cast<OpSequencer *>(posr->p.get());
2095 dout(5) << "queue_transactions existing " << osr << " " << *osr << dendl;
2096 } else {
2097 osr = new OpSequencer(cct, next_osr_id.inc());
2098 osr->set_cct(cct);
2099 osr->parent = posr;
2100 posr->p = osr;
2101 dout(5) << "queue_transactions new " << osr << " " << *osr << dendl;
2102 }
2103
2104 // used to include osr information in tracepoints during transaction apply
2105 for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2106 (*i).set_osr(osr);
2107 }
2108
2109 ZTracer::Trace trace;
2110 if (osd_op && osd_op->pg_trace) {
2111 osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2112 trace = osd_op->store_trace;
2113 }
2114
2115 if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2116 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2117
2118 //prepare and encode transactions data out of lock
2119 bufferlist tbl;
2120 int orig_len = journal->prepare_entry(o->tls, &tbl);
2121
2122 if (handle)
2123 handle->suspend_tp_timeout();
2124
2125 op_queue_reserve_throttle(o);
2126 journal->reserve_throttle_and_backoff(tbl.length());
2127
2128 if (handle)
2129 handle->reset_tp_timeout();
2130
2131 uint64_t op_num = submit_manager.op_submit_start();
2132 o->op = op_num;
2133 trace.keyval("opnum", op_num);
2134
2135 if (m_filestore_do_dump)
2136 dump_transactions(o->tls, o->op, osr);
2137
2138 if (m_filestore_journal_parallel) {
2139 dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
2140
2141 trace.keyval("journal mode", "parallel");
2142 trace.event("journal started");
2143 _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2144
2145 // queue inside submit_manager op submission lock
2146 queue_op(osr, o);
2147 trace.event("op queued");
2148 } else if (m_filestore_journal_writeahead) {
2149 dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
2150
2151 osr->queue_journal(o->op);
2152
2153 trace.keyval("journal mode", "writeahead");
2154 trace.event("journal started");
2155 _op_journal_transactions(tbl, orig_len, o->op,
2156 new C_JournaledAhead(this, osr, o, ondisk),
2157 osd_op);
2158 } else {
2159 ceph_abort();
2160 }
2161 submit_manager.op_submit_finish(op_num);
2162 utime_t end = ceph_clock_now();
2163 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2164 return 0;
2165 }
2166
2167 if (!journal) {
2168 Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2169 dout(5) << __func__ << " (no journal) " << o << " " << tls << dendl;
2170
2171 if (handle)
2172 handle->suspend_tp_timeout();
2173
2174 op_queue_reserve_throttle(o);
2175
2176 if (handle)
2177 handle->reset_tp_timeout();
2178
2179 uint64_t op_num = submit_manager.op_submit_start();
2180 o->op = op_num;
2181
2182 if (m_filestore_do_dump)
2183 dump_transactions(o->tls, o->op, osr);
2184
2185 queue_op(osr, o);
2186 trace.keyval("opnum", op_num);
2187 trace.keyval("journal mode", "none");
2188 trace.event("op queued");
2189
2190 if (ondisk)
2191 apply_manager.add_waiter(op_num, ondisk);
2192 submit_manager.op_submit_finish(op_num);
2193 utime_t end = ceph_clock_now();
2194 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2195 return 0;
2196 }
2197
2198 assert(journal);
2199 //prepare and encode transactions data out of lock
2200 bufferlist tbl;
2201 int orig_len = -1;
2202 if (journal->is_writeable()) {
2203 orig_len = journal->prepare_entry(tls, &tbl);
2204 }
2205 uint64_t op = submit_manager.op_submit_start();
2206 dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
2207
2208 if (m_filestore_do_dump)
2209 dump_transactions(tls, op, osr);
2210
2211 trace.event("op_apply_start");
2212 trace.keyval("opnum", op);
2213 trace.keyval("journal mode", "trailing");
2214 apply_manager.op_apply_start(op);
2215 trace.event("do_transactions");
2216 int r = do_transactions(tls, op);
2217
2218 if (r >= 0) {
2219 trace.event("journal started");
2220 _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2221 } else {
2222 delete ondisk;
2223 }
2224
2225 // start on_readable finisher after we queue journal item, as on_readable callback
2226 // is allowed to delete the Transaction
2227 if (onreadable_sync) {
2228 onreadable_sync->complete(r);
2229 }
2230 apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2231
2232 submit_manager.op_submit_finish(op);
2233 trace.event("op_apply_finish");
2234 apply_manager.op_apply_finish(op);
2235
2236 utime_t end = ceph_clock_now();
2237 logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2238 return r;
2239 }
2240
2241 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2242 {
2243 dout(5) << "_journaled_ahead " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2244
2245 o->trace.event("writeahead journal finished");
2246
2247 // this should queue in order because the journal does it's completions in order.
2248 queue_op(osr, o);
2249
2250 list<Context*> to_queue;
2251 osr->dequeue_journal(&to_queue);
2252
2253 // do ondisk completions async, to prevent any onreadable_sync completions
2254 // getting blocked behind an ondisk completion.
2255 if (ondisk) {
2256 dout(10) << " queueing ondisk " << ondisk << dendl;
2257 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2258 }
2259 if (!to_queue.empty()) {
2260 ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2261 }
2262 }
2263
2264 int FileStore::_do_transactions(
2265 vector<Transaction> &tls,
2266 uint64_t op_seq,
2267 ThreadPool::TPHandle *handle)
2268 {
2269 int trans_num = 0;
2270
2271 for (vector<Transaction>::iterator p = tls.begin();
2272 p != tls.end();
2273 ++p, trans_num++) {
2274 _do_transaction(*p, op_seq, trans_num, handle);
2275 if (handle)
2276 handle->reset_tp_timeout();
2277 }
2278
2279 return 0;
2280 }
2281
2282 void FileStore::_set_global_replay_guard(const coll_t& cid,
2283 const SequencerPosition &spos)
2284 {
2285 if (backend->can_checkpoint())
2286 return;
2287
2288 // sync all previous operations on this sequencer
2289 int ret = object_map->sync();
2290 if (ret < 0) {
2291 derr << __func__ << " : omap sync error " << cpp_strerror(ret) << dendl;
2292 assert(0 == "_set_global_replay_guard failed");
2293 }
2294 ret = sync_filesystem(basedir_fd);
2295 if (ret < 0) {
2296 derr << __func__ << " : sync_filesystem error " << cpp_strerror(ret) << dendl;
2297 assert(0 == "_set_global_replay_guard failed");
2298 }
2299
2300 char fn[PATH_MAX];
2301 get_cdir(cid, fn, sizeof(fn));
2302 int fd = ::open(fn, O_RDONLY);
2303 if (fd < 0) {
2304 int err = errno;
2305 derr << __func__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2306 assert(0 == "_set_global_replay_guard failed");
2307 }
2308
2309 _inject_failure();
2310
2311 // then record that we did it
2312 bufferlist v;
2313 ::encode(spos, v);
2314 int r = chain_fsetxattr<true, true>(
2315 fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2316 if (r < 0) {
2317 derr << __func__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2318 << " got " << cpp_strerror(r) << dendl;
2319 assert(0 == "fsetxattr failed");
2320 }
2321
2322 // and make sure our xattr is durable.
2323 ::fsync(fd);
2324
2325 _inject_failure();
2326
2327 VOID_TEMP_FAILURE_RETRY(::close(fd));
2328 dout(10) << __func__ << ": " << spos << " done" << dendl;
2329 }
2330
2331 int FileStore::_check_global_replay_guard(const coll_t& cid,
2332 const SequencerPosition& spos)
2333 {
2334 char fn[PATH_MAX];
2335 get_cdir(cid, fn, sizeof(fn));
2336 int fd = ::open(fn, O_RDONLY);
2337 if (fd < 0) {
2338 dout(10) << __func__ << ": " << cid << " dne" << dendl;
2339 return 1; // if collection does not exist, there is no guard, and we can replay.
2340 }
2341
2342 char buf[100];
2343 int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2344 if (r < 0) {
2345 dout(20) << __func__ << " no xattr" << dendl;
2346 assert(!m_filestore_fail_eio || r != -EIO);
2347 VOID_TEMP_FAILURE_RETRY(::close(fd));
2348 return 1; // no xattr
2349 }
2350 bufferlist bl;
2351 bl.append(buf, r);
2352
2353 SequencerPosition opos;
2354 bufferlist::iterator p = bl.begin();
2355 ::decode(opos, p);
2356
2357 VOID_TEMP_FAILURE_RETRY(::close(fd));
2358 return spos >= opos ? 1 : -1;
2359 }
2360
2361
2362 void FileStore::_set_replay_guard(const coll_t& cid,
2363 const SequencerPosition &spos,
2364 bool in_progress=false)
2365 {
2366 char fn[PATH_MAX];
2367 get_cdir(cid, fn, sizeof(fn));
2368 int fd = ::open(fn, O_RDONLY);
2369 if (fd < 0) {
2370 int err = errno;
2371 derr << "_set_replay_guard " << cid << " error " << cpp_strerror(err) << dendl;
2372 assert(0 == "_set_replay_guard failed");
2373 }
2374 _set_replay_guard(fd, spos, 0, in_progress);
2375 VOID_TEMP_FAILURE_RETRY(::close(fd));
2376 }
2377
2378
2379 void FileStore::_set_replay_guard(int fd,
2380 const SequencerPosition& spos,
2381 const ghobject_t *hoid,
2382 bool in_progress)
2383 {
2384 if (backend->can_checkpoint())
2385 return;
2386
2387 dout(10) << "_set_replay_guard " << spos << (in_progress ? " START" : "") << dendl;
2388
2389 _inject_failure();
2390
2391 // first make sure the previous operation commits
2392 ::fsync(fd);
2393
2394 if (!in_progress) {
2395 // sync object_map too. even if this object has a header or keys,
2396 // it have had them in the past and then removed them, so always
2397 // sync.
2398 object_map->sync(hoid, &spos);
2399 }
2400
2401 _inject_failure();
2402
2403 // then record that we did it
2404 bufferlist v(40);
2405 ::encode(spos, v);
2406 ::encode(in_progress, v);
2407 int r = chain_fsetxattr<true, true>(
2408 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2409 if (r < 0) {
2410 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2411 assert(0 == "fsetxattr failed");
2412 }
2413
2414 // and make sure our xattr is durable.
2415 ::fsync(fd);
2416
2417 _inject_failure();
2418
2419 dout(10) << "_set_replay_guard " << spos << " done" << dendl;
2420 }
2421
2422 void FileStore::_close_replay_guard(const coll_t& cid,
2423 const SequencerPosition &spos)
2424 {
2425 char fn[PATH_MAX];
2426 get_cdir(cid, fn, sizeof(fn));
2427 int fd = ::open(fn, O_RDONLY);
2428 if (fd < 0) {
2429 int err = errno;
2430 derr << "_close_replay_guard " << cid << " error " << cpp_strerror(err) << dendl;
2431 assert(0 == "_close_replay_guard failed");
2432 }
2433 _close_replay_guard(fd, spos);
2434 VOID_TEMP_FAILURE_RETRY(::close(fd));
2435 }
2436
2437 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2438 const ghobject_t *hoid)
2439 {
2440 if (backend->can_checkpoint())
2441 return;
2442
2443 dout(10) << "_close_replay_guard " << spos << dendl;
2444
2445 _inject_failure();
2446
2447 // sync object_map too. even if this object has a header or keys,
2448 // it have had them in the past and then removed them, so always
2449 // sync.
2450 object_map->sync(hoid, &spos);
2451
2452 // then record that we are done with this operation
2453 bufferlist v(40);
2454 ::encode(spos, v);
2455 bool in_progress = false;
2456 ::encode(in_progress, v);
2457 int r = chain_fsetxattr<true, true>(
2458 fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2459 if (r < 0) {
2460 derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2461 assert(0 == "fsetxattr failed");
2462 }
2463
2464 // and make sure our xattr is durable.
2465 ::fsync(fd);
2466
2467 _inject_failure();
2468
2469 dout(10) << "_close_replay_guard " << spos << " done" << dendl;
2470 }
2471
2472 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2473 const SequencerPosition& spos)
2474 {
2475 if (!replaying || backend->can_checkpoint())
2476 return 1;
2477
2478 int r = _check_global_replay_guard(cid, spos);
2479 if (r < 0)
2480 return r;
2481
2482 FDRef fd;
2483 r = lfn_open(cid, oid, false, &fd);
2484 if (r < 0) {
2485 dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl;
2486 return 1; // if file does not exist, there is no guard, and we can replay.
2487 }
2488 int ret = _check_replay_guard(**fd, spos);
2489 lfn_close(fd);
2490 return ret;
2491 }
2492
2493 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2494 {
2495 if (!replaying || backend->can_checkpoint())
2496 return 1;
2497
2498 char fn[PATH_MAX];
2499 get_cdir(cid, fn, sizeof(fn));
2500 int fd = ::open(fn, O_RDONLY);
2501 if (fd < 0) {
2502 dout(10) << "_check_replay_guard " << cid << " dne" << dendl;
2503 return 1; // if collection does not exist, there is no guard, and we can replay.
2504 }
2505 int ret = _check_replay_guard(fd, spos);
2506 VOID_TEMP_FAILURE_RETRY(::close(fd));
2507 return ret;
2508 }
2509
2510 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2511 {
2512 if (!replaying || backend->can_checkpoint())
2513 return 1;
2514
2515 char buf[100];
2516 int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2517 if (r < 0) {
2518 dout(20) << "_check_replay_guard no xattr" << dendl;
2519 assert(!m_filestore_fail_eio || r != -EIO);
2520 return 1; // no xattr
2521 }
2522 bufferlist bl;
2523 bl.append(buf, r);
2524
2525 SequencerPosition opos;
2526 bufferlist::iterator p = bl.begin();
2527 ::decode(opos, p);
2528 bool in_progress = false;
2529 if (!p.end()) // older journals don't have this
2530 ::decode(in_progress, p);
2531 if (opos > spos) {
2532 dout(10) << "_check_replay_guard object has " << opos << " > current pos " << spos
2533 << ", now or in future, SKIPPING REPLAY" << dendl;
2534 return -1;
2535 } else if (opos == spos) {
2536 if (in_progress) {
2537 dout(10) << "_check_replay_guard object has " << opos << " == current pos " << spos
2538 << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2539 return 0;
2540 } else {
2541 dout(10) << "_check_replay_guard object has " << opos << " == current pos " << spos
2542 << ", in_progress=false, SKIPPING REPLAY" << dendl;
2543 return -1;
2544 }
2545 } else {
2546 dout(10) << "_check_replay_guard object has " << opos << " < current pos " << spos
2547 << ", in past, will replay" << dendl;
2548 return 1;
2549 }
2550 }
2551
2552 void FileStore::_do_transaction(
2553 Transaction& t, uint64_t op_seq, int trans_num,
2554 ThreadPool::TPHandle *handle)
2555 {
2556 dout(10) << "_do_transaction on " << &t << dendl;
2557
2558 #ifdef WITH_LTTNG
2559 const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2560 #endif
2561
2562 Transaction::iterator i = t.begin();
2563
2564 SequencerPosition spos(op_seq, trans_num, 0);
2565 while (i.have_op()) {
2566 if (handle)
2567 handle->reset_tp_timeout();
2568
2569 Transaction::Op *op = i.decode_op();
2570 int r = 0;
2571
2572 _inject_failure();
2573
2574 switch (op->op) {
2575 case Transaction::OP_NOP:
2576 break;
2577 case Transaction::OP_TOUCH:
2578 {
2579 const coll_t &_cid = i.get_cid(op->cid);
2580 const ghobject_t &oid = i.get_oid(op->oid);
2581 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2582 _cid : _cid.get_temp();
2583 tracepoint(objectstore, touch_enter, osr_name);
2584 if (_check_replay_guard(cid, oid, spos) > 0)
2585 r = _touch(cid, oid);
2586 tracepoint(objectstore, touch_exit, r);
2587 }
2588 break;
2589
2590 case Transaction::OP_WRITE:
2591 {
2592 const coll_t &_cid = i.get_cid(op->cid);
2593 const ghobject_t &oid = i.get_oid(op->oid);
2594 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2595 _cid : _cid.get_temp();
2596 uint64_t off = op->off;
2597 uint64_t len = op->len;
2598 uint32_t fadvise_flags = i.get_fadvise_flags();
2599 bufferlist bl;
2600 i.decode_bl(bl);
2601 tracepoint(objectstore, write_enter, osr_name, off, len);
2602 if (_check_replay_guard(cid, oid, spos) > 0)
2603 r = _write(cid, oid, off, len, bl, fadvise_flags);
2604 tracepoint(objectstore, write_exit, r);
2605 }
2606 break;
2607
2608 case Transaction::OP_ZERO:
2609 {
2610 const coll_t &_cid = i.get_cid(op->cid);
2611 const ghobject_t &oid = i.get_oid(op->oid);
2612 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2613 _cid : _cid.get_temp();
2614 uint64_t off = op->off;
2615 uint64_t len = op->len;
2616 tracepoint(objectstore, zero_enter, osr_name, off, len);
2617 if (_check_replay_guard(cid, oid, spos) > 0)
2618 r = _zero(cid, oid, off, len);
2619 tracepoint(objectstore, zero_exit, r);
2620 }
2621 break;
2622
2623 case Transaction::OP_TRIMCACHE:
2624 {
2625 // deprecated, no-op
2626 }
2627 break;
2628
2629 case Transaction::OP_TRUNCATE:
2630 {
2631 const coll_t &_cid = i.get_cid(op->cid);
2632 const ghobject_t &oid = i.get_oid(op->oid);
2633 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2634 _cid : _cid.get_temp();
2635 uint64_t off = op->off;
2636 tracepoint(objectstore, truncate_enter, osr_name, off);
2637 if (_check_replay_guard(cid, oid, spos) > 0)
2638 r = _truncate(cid, oid, off);
2639 tracepoint(objectstore, truncate_exit, r);
2640 }
2641 break;
2642
2643 case Transaction::OP_REMOVE:
2644 {
2645 const coll_t &_cid = i.get_cid(op->cid);
2646 const ghobject_t &oid = i.get_oid(op->oid);
2647 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2648 _cid : _cid.get_temp();
2649 tracepoint(objectstore, remove_enter, osr_name);
2650 if (_check_replay_guard(cid, oid, spos) > 0)
2651 r = _remove(cid, oid, spos);
2652 tracepoint(objectstore, remove_exit, r);
2653 }
2654 break;
2655
2656 case Transaction::OP_SETATTR:
2657 {
2658 const coll_t &_cid = i.get_cid(op->cid);
2659 const ghobject_t &oid = i.get_oid(op->oid);
2660 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2661 _cid : _cid.get_temp();
2662 string name = i.decode_string();
2663 bufferlist bl;
2664 i.decode_bl(bl);
2665 tracepoint(objectstore, setattr_enter, osr_name);
2666 if (_check_replay_guard(cid, oid, spos) > 0) {
2667 map<string, bufferptr> to_set;
2668 to_set[name] = bufferptr(bl.c_str(), bl.length());
2669 r = _setattrs(cid, oid, to_set, spos);
2670 if (r == -ENOSPC)
2671 dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2672 << " name " << name << " size " << bl.length() << dendl;
2673 }
2674 tracepoint(objectstore, setattr_exit, r);
2675 }
2676 break;
2677
2678 case Transaction::OP_SETATTRS:
2679 {
2680 const coll_t &_cid = i.get_cid(op->cid);
2681 const ghobject_t &oid = i.get_oid(op->oid);
2682 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2683 _cid : _cid.get_temp();
2684 map<string, bufferptr> aset;
2685 i.decode_attrset(aset);
2686 tracepoint(objectstore, setattrs_enter, osr_name);
2687 if (_check_replay_guard(cid, oid, spos) > 0)
2688 r = _setattrs(cid, oid, aset, spos);
2689 tracepoint(objectstore, setattrs_exit, r);
2690 if (r == -ENOSPC)
2691 dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2692 }
2693 break;
2694
2695 case Transaction::OP_RMATTR:
2696 {
2697 const coll_t &_cid = i.get_cid(op->cid);
2698 const ghobject_t &oid = i.get_oid(op->oid);
2699 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2700 _cid : _cid.get_temp();
2701 string name = i.decode_string();
2702 tracepoint(objectstore, rmattr_enter, osr_name);
2703 if (_check_replay_guard(cid, oid, spos) > 0)
2704 r = _rmattr(cid, oid, name.c_str(), spos);
2705 tracepoint(objectstore, rmattr_exit, r);
2706 }
2707 break;
2708
2709 case Transaction::OP_RMATTRS:
2710 {
2711 const coll_t &_cid = i.get_cid(op->cid);
2712 const ghobject_t &oid = i.get_oid(op->oid);
2713 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2714 _cid : _cid.get_temp();
2715 tracepoint(objectstore, rmattrs_enter, osr_name);
2716 if (_check_replay_guard(cid, oid, spos) > 0)
2717 r = _rmattrs(cid, oid, spos);
2718 tracepoint(objectstore, rmattrs_exit, r);
2719 }
2720 break;
2721
2722 case Transaction::OP_CLONE:
2723 {
2724 const coll_t &_cid = i.get_cid(op->cid);
2725 const ghobject_t &oid = i.get_oid(op->oid);
2726 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2727 _cid : _cid.get_temp();
2728 const ghobject_t &noid = i.get_oid(op->dest_oid);
2729 tracepoint(objectstore, clone_enter, osr_name);
2730 r = _clone(cid, oid, noid, spos);
2731 tracepoint(objectstore, clone_exit, r);
2732 }
2733 break;
2734
2735 case Transaction::OP_CLONERANGE:
2736 {
2737 const coll_t &_cid = i.get_cid(op->cid);
2738 const ghobject_t &oid = i.get_oid(op->oid);
2739 const ghobject_t &noid = i.get_oid(op->dest_oid);
2740 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2741 _cid : _cid.get_temp();
2742 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2743 _cid : _cid.get_temp();
2744 uint64_t off = op->off;
2745 uint64_t len = op->len;
2746 tracepoint(objectstore, clone_range_enter, osr_name, len);
2747 r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2748 tracepoint(objectstore, clone_range_exit, r);
2749 }
2750 break;
2751
2752 case Transaction::OP_CLONERANGE2:
2753 {
2754 const coll_t &_cid = i.get_cid(op->cid);
2755 const ghobject_t &oid = i.get_oid(op->oid);
2756 const ghobject_t &noid = i.get_oid(op->dest_oid);
2757 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2758 _cid : _cid.get_temp();
2759 const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2760 _cid : _cid.get_temp();
2761 uint64_t srcoff = op->off;
2762 uint64_t len = op->len;
2763 uint64_t dstoff = op->dest_off;
2764 tracepoint(objectstore, clone_range2_enter, osr_name, len);
2765 r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2766 tracepoint(objectstore, clone_range2_exit, r);
2767 }
2768 break;
2769
2770 case Transaction::OP_MKCOLL:
2771 {
2772 const coll_t &cid = i.get_cid(op->cid);
2773 tracepoint(objectstore, mkcoll_enter, osr_name);
2774 if (_check_replay_guard(cid, spos) > 0)
2775 r = _create_collection(cid, op->split_bits, spos);
2776 tracepoint(objectstore, mkcoll_exit, r);
2777 }
2778 break;
2779
2780 case Transaction::OP_COLL_SET_BITS:
2781 {
2782 const coll_t &cid = i.get_cid(op->cid);
2783 int bits = op->split_bits;
2784 r = _collection_set_bits(cid, bits);
2785 }
2786 break;
2787
2788 case Transaction::OP_COLL_HINT:
2789 {
2790 const coll_t &cid = i.get_cid(op->cid);
2791 uint32_t type = op->hint_type;
2792 bufferlist hint;
2793 i.decode_bl(hint);
2794 bufferlist::iterator hiter = hint.begin();
2795 if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2796 uint32_t pg_num;
2797 uint64_t num_objs;
2798 ::decode(pg_num, hiter);
2799 ::decode(num_objs, hiter);
2800 if (_check_replay_guard(cid, spos) > 0) {
2801 r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2802 }
2803 } else {
2804 // Ignore the hint
2805 dout(10) << "Unrecognized collection hint type: " << type << dendl;
2806 }
2807 }
2808 break;
2809
2810 case Transaction::OP_RMCOLL:
2811 {
2812 const coll_t &cid = i.get_cid(op->cid);
2813 tracepoint(objectstore, rmcoll_enter, osr_name);
2814 if (_check_replay_guard(cid, spos) > 0)
2815 r = _destroy_collection(cid);
2816 tracepoint(objectstore, rmcoll_exit, r);
2817 }
2818 break;
2819
2820 case Transaction::OP_COLL_ADD:
2821 {
2822 const coll_t &ocid = i.get_cid(op->cid);
2823 const coll_t &ncid = i.get_cid(op->dest_cid);
2824 const ghobject_t &oid = i.get_oid(op->oid);
2825
2826 assert(oid.hobj.pool >= -1);
2827
2828 // always followed by OP_COLL_REMOVE
2829 Transaction::Op *op2 = i.decode_op();
2830 const coll_t &ocid2 = i.get_cid(op2->cid);
2831 const ghobject_t &oid2 = i.get_oid(op2->oid);
2832 assert(op2->op == Transaction::OP_COLL_REMOVE);
2833 assert(ocid2 == ocid);
2834 assert(oid2 == oid);
2835
2836 tracepoint(objectstore, coll_add_enter);
2837 r = _collection_add(ncid, ocid, oid, spos);
2838 tracepoint(objectstore, coll_add_exit, r);
2839 spos.op++;
2840 if (r < 0)
2841 break;
2842 tracepoint(objectstore, coll_remove_enter, osr_name);
2843 if (_check_replay_guard(ocid, oid, spos) > 0)
2844 r = _remove(ocid, oid, spos);
2845 tracepoint(objectstore, coll_remove_exit, r);
2846 }
2847 break;
2848
2849 case Transaction::OP_COLL_MOVE:
2850 {
2851 // WARNING: this is deprecated and buggy; only here to replay old journals.
2852 const coll_t &ocid = i.get_cid(op->cid);
2853 const coll_t &ncid = i.get_cid(op->dest_cid);
2854 const ghobject_t &oid = i.get_oid(op->oid);
2855 tracepoint(objectstore, coll_move_enter);
2856 r = _collection_add(ocid, ncid, oid, spos);
2857 if (r == 0 &&
2858 (_check_replay_guard(ocid, oid, spos) > 0))
2859 r = _remove(ocid, oid, spos);
2860 tracepoint(objectstore, coll_move_exit, r);
2861 }
2862 break;
2863
2864 case Transaction::OP_COLL_MOVE_RENAME:
2865 {
2866 const coll_t &_oldcid = i.get_cid(op->cid);
2867 const ghobject_t &oldoid = i.get_oid(op->oid);
2868 const coll_t &_newcid = i.get_cid(op->dest_cid);
2869 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2870 const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2871 _oldcid : _oldcid.get_temp();
2872 const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2873 _oldcid : _newcid.get_temp();
2874 tracepoint(objectstore, coll_move_rename_enter);
2875 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2876 tracepoint(objectstore, coll_move_rename_exit, r);
2877 }
2878 break;
2879
2880 case Transaction::OP_TRY_RENAME:
2881 {
2882 const coll_t &_cid = i.get_cid(op->cid);
2883 const ghobject_t &oldoid = i.get_oid(op->oid);
2884 const ghobject_t &newoid = i.get_oid(op->dest_oid);
2885 const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2886 _cid : _cid.get_temp();
2887 const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2888 _cid : _cid.get_temp();
2889 tracepoint(objectstore, coll_try_rename_enter);
2890 r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2891 tracepoint(objectstore, coll_try_rename_exit, r);
2892 }
2893 break;
2894
2895 case Transaction::OP_COLL_SETATTR:
2896 case Transaction::OP_COLL_RMATTR:
2897 assert(0 == "collection attr methods no longer implemented");
2898 break;
2899
2900 case Transaction::OP_STARTSYNC:
2901 tracepoint(objectstore, startsync_enter, osr_name);
2902 _start_sync();
2903 tracepoint(objectstore, startsync_exit);
2904 break;
2905
2906 case Transaction::OP_COLL_RENAME:
2907 {
2908 r = -EOPNOTSUPP;
2909 }
2910 break;
2911
2912 case Transaction::OP_OMAP_CLEAR:
2913 {
2914 const coll_t &_cid = i.get_cid(op->cid);
2915 const ghobject_t &oid = i.get_oid(op->oid);
2916 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2917 _cid : _cid.get_temp();
2918 tracepoint(objectstore, omap_clear_enter, osr_name);
2919 r = _omap_clear(cid, oid, spos);
2920 tracepoint(objectstore, omap_clear_exit, r);
2921 }
2922 break;
2923 case Transaction::OP_OMAP_SETKEYS:
2924 {
2925 const coll_t &_cid = i.get_cid(op->cid);
2926 const ghobject_t &oid = i.get_oid(op->oid);
2927 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2928 _cid : _cid.get_temp();
2929 map<string, bufferlist> aset;
2930 i.decode_attrset(aset);
2931 tracepoint(objectstore, omap_setkeys_enter, osr_name);
2932 r = _omap_setkeys(cid, oid, aset, spos);
2933 tracepoint(objectstore, omap_setkeys_exit, r);
2934 }
2935 break;
2936 case Transaction::OP_OMAP_RMKEYS:
2937 {
2938 const coll_t &_cid = i.get_cid(op->cid);
2939 const ghobject_t &oid = i.get_oid(op->oid);
2940 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2941 _cid : _cid.get_temp();
2942 set<string> keys;
2943 i.decode_keyset(keys);
2944 tracepoint(objectstore, omap_rmkeys_enter, osr_name);
2945 r = _omap_rmkeys(cid, oid, keys, spos);
2946 tracepoint(objectstore, omap_rmkeys_exit, r);
2947 }
2948 break;
2949 case Transaction::OP_OMAP_RMKEYRANGE:
2950 {
2951 const coll_t &_cid = i.get_cid(op->cid);
2952 const ghobject_t &oid = i.get_oid(op->oid);
2953 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2954 _cid : _cid.get_temp();
2955 string first, last;
2956 first = i.decode_string();
2957 last = i.decode_string();
2958 tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
2959 r = _omap_rmkeyrange(cid, oid, first, last, spos);
2960 tracepoint(objectstore, omap_rmkeyrange_exit, r);
2961 }
2962 break;
2963 case Transaction::OP_OMAP_SETHEADER:
2964 {
2965 const coll_t &_cid = i.get_cid(op->cid);
2966 const ghobject_t &oid = i.get_oid(op->oid);
2967 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2968 _cid : _cid.get_temp();
2969 bufferlist bl;
2970 i.decode_bl(bl);
2971 tracepoint(objectstore, omap_setheader_enter, osr_name);
2972 r = _omap_setheader(cid, oid, bl, spos);
2973 tracepoint(objectstore, omap_setheader_exit, r);
2974 }
2975 break;
2976 case Transaction::OP_SPLIT_COLLECTION:
2977 {
2978 assert(0 == "not legacy journal; upgrade to firefly first");
2979 }
2980 break;
2981 case Transaction::OP_SPLIT_COLLECTION2:
2982 {
2983 coll_t cid = i.get_cid(op->cid);
2984 uint32_t bits = op->split_bits;
2985 uint32_t rem = op->split_rem;
2986 coll_t dest = i.get_cid(op->dest_cid);
2987 tracepoint(objectstore, split_coll2_enter, osr_name);
2988 r = _split_collection(cid, bits, rem, dest, spos);
2989 tracepoint(objectstore, split_coll2_exit, r);
2990 }
2991 break;
2992
2993 case Transaction::OP_SETALLOCHINT:
2994 {
2995 const coll_t &_cid = i.get_cid(op->cid);
2996 const ghobject_t &oid = i.get_oid(op->oid);
2997 const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2998 _cid : _cid.get_temp();
2999 uint64_t expected_object_size = op->expected_object_size;
3000 uint64_t expected_write_size = op->expected_write_size;
3001 tracepoint(objectstore, setallochint_enter, osr_name);
3002 if (_check_replay_guard(cid, oid, spos) > 0)
3003 r = _set_alloc_hint(cid, oid, expected_object_size,
3004 expected_write_size);
3005 tracepoint(objectstore, setallochint_exit, r);
3006 }
3007 break;
3008
3009 default:
3010 derr << "bad op " << op->op << dendl;
3011 ceph_abort();
3012 }
3013
3014 if (r < 0) {
3015 bool ok = false;
3016
3017 if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3018 op->op == Transaction::OP_CLONE ||
3019 op->op == Transaction::OP_CLONERANGE2 ||
3020 op->op == Transaction::OP_COLL_ADD ||
3021 op->op == Transaction::OP_SETATTR ||
3022 op->op == Transaction::OP_SETATTRS ||
3023 op->op == Transaction::OP_RMATTR ||
3024 op->op == Transaction::OP_OMAP_SETKEYS ||
3025 op->op == Transaction::OP_OMAP_RMKEYS ||
3026 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3027 op->op == Transaction::OP_OMAP_SETHEADER))
3028 // -ENOENT is normally okay
3029 // ...including on a replayed OP_RMCOLL with checkpoint mode
3030 ok = true;
3031 if (r == -ENODATA)
3032 ok = true;
3033
3034 if (op->op == Transaction::OP_SETALLOCHINT)
3035 // Either EOPNOTSUPP or EINVAL most probably. EINVAL in most
3036 // cases means invalid hint size (e.g. too big, not a multiple
3037 // of block size, etc) or, at least on xfs, an attempt to set
3038 // or change it when the file is not empty. However,
3039 // OP_SETALLOCHINT is advisory, so ignore all errors.
3040 ok = true;
3041
3042 if (replaying && !backend->can_checkpoint()) {
3043 if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3044 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3045 ok = true;
3046 }
3047 if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3048 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3049 ok = true;
3050 }
3051 if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3052 dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3053 ok = true;
3054 }
3055 if (r == -ERANGE) {
3056 dout(10) << "tolerating ERANGE on replay" << dendl;
3057 ok = true;
3058 }
3059 if (r == -ENOENT) {
3060 dout(10) << "tolerating ENOENT on replay" << dendl;
3061 ok = true;
3062 }
3063 }
3064
3065 if (!ok) {
3066 const char *msg = "unexpected error code";
3067
3068 if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3069 op->op == Transaction::OP_CLONE ||
3070 op->op == Transaction::OP_CLONERANGE2)) {
3071 msg = "ENOENT on clone suggests osd bug";
3072 } else if (r == -ENOSPC) {
3073 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3074 // by partially applying transactions.
3075 msg = "ENOSPC from disk filesystem, misconfigured cluster";
3076 } else if (r == -ENOTEMPTY) {
3077 msg = "ENOTEMPTY suggests garbage data in osd data dir";
3078 } else if (r == -EPERM) {
3079 msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3080 }
3081
3082 derr << " error " << cpp_strerror(r) << " not handled on operation " << op
3083 << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3084 dout(0) << msg << dendl;
3085 dout(0) << " transaction dump:\n";
3086 JSONFormatter f(true);
3087 f.open_object_section("transaction");
3088 t.dump(&f);
3089 f.close_section();
3090 f.flush(*_dout);
3091 *_dout << dendl;
3092
3093 if (r == -EMFILE) {
3094 dump_open_fds(cct);
3095 }
3096
3097 assert(0 == "unexpected error");
3098 }
3099 }
3100
3101 spos.op++;
3102 }
3103
3104 _inject_failure();
3105 }
3106
3107 /*********************************************/
3108
3109
3110
3111 // --------------------
3112 // objects
3113
3114 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3115 {
3116 tracepoint(objectstore, exists_enter, _cid.c_str());
3117 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3118 struct stat st;
3119 bool retval = stat(cid, oid, &st) == 0;
3120 tracepoint(objectstore, exists_exit, retval);
3121 return retval;
3122 }
3123
3124 int FileStore::stat(
3125 const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3126 {
3127 tracepoint(objectstore, stat_enter, _cid.c_str());
3128 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3129 int r = lfn_stat(cid, oid, st);
3130 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3131 if (r < 0) {
3132 dout(10) << "stat " << cid << "/" << oid
3133 << " = " << r << dendl;
3134 } else {
3135 dout(10) << "stat " << cid << "/" << oid
3136 << " = " << r
3137 << " (size " << st->st_size << ")" << dendl;
3138 }
3139 if (cct->_conf->filestore_debug_inject_read_err &&
3140 debug_mdata_eio(oid)) {
3141 return -EIO;
3142 } else {
3143 tracepoint(objectstore, stat_exit, r);
3144 return r;
3145 }
3146 }
3147
3148 int FileStore::set_collection_opts(
3149 const coll_t& cid,
3150 const pool_opts_t& opts)
3151 {
3152 return -EOPNOTSUPP;
3153 }
3154
3155 int FileStore::read(
3156 const coll_t& _cid,
3157 const ghobject_t& oid,
3158 uint64_t offset,
3159 size_t len,
3160 bufferlist& bl,
3161 uint32_t op_flags,
3162 bool allow_eio)
3163 {
3164 int got;
3165 tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3166 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3167
3168 dout(15) << "read " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3169
3170 FDRef fd;
3171 int r = lfn_open(cid, oid, false, &fd);
3172 if (r < 0) {
3173 dout(10) << "FileStore::read(" << cid << "/" << oid << ") open error: "
3174 << cpp_strerror(r) << dendl;
3175 return r;
3176 }
3177
3178 if (offset == 0 && len == 0) {
3179 struct stat st;
3180 memset(&st, 0, sizeof(struct stat));
3181 int r = ::fstat(**fd, &st);
3182 assert(r == 0);
3183 len = st.st_size;
3184 }
3185
3186 #ifdef HAVE_POSIX_FADVISE
3187 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3188 posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3189 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3190 posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3191 #endif
3192
3193 bufferptr bptr(len); // prealloc space for entire read
3194 got = safe_pread(**fd, bptr.c_str(), len, offset);
3195 if (got < 0) {
3196 dout(10) << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3197 lfn_close(fd);
3198 if (!(allow_eio || !m_filestore_fail_eio || got != -EIO)) {
3199 derr << "FileStore::read(" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3200 assert(0 == "eio on pread");
3201 }
3202 return got;
3203 }
3204 bptr.set_length(got); // properly size the buffer
3205 bl.clear();
3206 bl.push_back(std::move(bptr)); // put it in the target bufferlist
3207
3208 #ifdef HAVE_POSIX_FADVISE
3209 if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3210 posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3211 if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3212 posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3213 #endif
3214
3215 if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3216 ostringstream ss;
3217 int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3218 if (errors != 0) {
3219 dout(0) << "FileStore::read " << cid << "/" << oid << " " << offset << "~"
3220 << got << " ... BAD CRC:\n" << ss.str() << dendl;
3221 assert(0 == "bad crc on read");
3222 }
3223 }
3224
3225 lfn_close(fd);
3226
3227 dout(10) << "FileStore::read " << cid << "/" << oid << " " << offset << "~"
3228 << got << "/" << len << dendl;
3229 if (cct->_conf->filestore_debug_inject_read_err &&
3230 debug_data_eio(oid)) {
3231 return -EIO;
3232 } else {
3233 tracepoint(objectstore, read_exit, got);
3234 return got;
3235 }
3236 }
3237
3238 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3239 map<uint64_t, uint64_t> *m)
3240 {
3241 uint64_t i;
3242 struct fiemap_extent *extent = NULL;
3243 struct fiemap *fiemap = NULL;
3244 int r = 0;
3245
3246 more:
3247 r = backend->do_fiemap(fd, offset, len, &fiemap);
3248 if (r < 0)
3249 return r;
3250
3251 if (fiemap->fm_mapped_extents == 0) {
3252 free(fiemap);
3253 return r;
3254 }
3255
3256 extent = &fiemap->fm_extents[0];
3257
3258 /* start where we were asked to start */
3259 if (extent->fe_logical < offset) {
3260 extent->fe_length -= offset - extent->fe_logical;
3261 extent->fe_logical = offset;
3262 }
3263
3264 i = 0;
3265
3266 struct fiemap_extent *last = nullptr;
3267 while (i < fiemap->fm_mapped_extents) {
3268 struct fiemap_extent *next = extent + 1;
3269
3270 dout(10) << "FileStore::fiemap() fm_mapped_extents=" << fiemap->fm_mapped_extents
3271 << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3272
3273 /* try to merge extents */
3274 while ((i < fiemap->fm_mapped_extents - 1) &&
3275 (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3276 next->fe_length += extent->fe_length;
3277 next->fe_logical = extent->fe_logical;
3278 extent = next;
3279 next = extent + 1;
3280 i++;
3281 }
3282
3283 if (extent->fe_logical + extent->fe_length > offset + len)
3284 extent->fe_length = offset + len - extent->fe_logical;
3285 (*m)[extent->fe_logical] = extent->fe_length;
3286 i++;
3287 last = extent++;
3288 }
3289 uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3290 offset = last->fe_logical + last->fe_length;
3291 len -= xoffset;
3292 const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3293 free(fiemap);
3294 if (!is_last) {
3295 goto more;
3296 }
3297
3298 return r;
3299 }
3300
3301 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3302 map<uint64_t, uint64_t> *m)
3303 {
3304 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3305 off_t hole_pos, data_pos;
3306 int r = 0;
3307
3308 // If lseek fails with errno setting to be ENXIO, this means the current
3309 // file offset is beyond the end of the file.
3310 off_t start = offset;
3311 while(start < (off_t)(offset + len)) {
3312 data_pos = lseek(fd, start, SEEK_DATA);
3313 if (data_pos < 0) {
3314 if (errno == ENXIO)
3315 break;
3316 else {
3317 r = -errno;
3318 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3319 return r;
3320 }
3321 } else if (data_pos > (off_t)(offset + len)) {
3322 break;
3323 }
3324
3325 hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3326 if (hole_pos < 0) {
3327 if (errno == ENXIO) {
3328 break;
3329 } else {
3330 r = -errno;
3331 dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3332 return r;
3333 }
3334 }
3335
3336 if (hole_pos >= (off_t)(offset + len)) {
3337 (*m)[data_pos] = offset + len - data_pos;
3338 break;
3339 }
3340 (*m)[data_pos] = hole_pos - data_pos;
3341 start = hole_pos;
3342 }
3343
3344 return r;
3345 #else
3346 (*m)[offset] = len;
3347 return 0;
3348 #endif
3349 }
3350
3351 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3352 uint64_t offset, size_t len,
3353 bufferlist& bl)
3354 {
3355 map<uint64_t, uint64_t> exomap;
3356 int r = fiemap(_cid, oid, offset, len, exomap);
3357 if (r >= 0) {
3358 ::encode(exomap, bl);
3359 }
3360 return r;
3361 }
3362
3363 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3364 uint64_t offset, size_t len,
3365 map<uint64_t, uint64_t>& destmap)
3366 {
3367 tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3368 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3369 destmap.clear();
3370
3371 if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3372 len <= (size_t)m_filestore_fiemap_threshold) {
3373 destmap[offset] = len;
3374 return 0;
3375 }
3376
3377 dout(15) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3378
3379 FDRef fd;
3380
3381 int r = lfn_open(cid, oid, false, &fd);
3382 if (r < 0) {
3383 dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3384 goto done;
3385 }
3386
3387 if (backend->has_seek_data_hole()) {
3388 dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3389 r = _do_seek_hole_data(**fd, offset, len, &destmap);
3390 } else if (backend->has_fiemap()) {
3391 dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3392 r = _do_fiemap(**fd, offset, len, &destmap);
3393 }
3394
3395 lfn_close(fd);
3396
3397 done:
3398
3399 dout(10) << "fiemap " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3400 assert(!m_filestore_fail_eio || r != -EIO);
3401 tracepoint(objectstore, fiemap_exit, r);
3402 return r;
3403 }
3404
3405 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3406 const SequencerPosition &spos)
3407 {
3408 dout(15) << "remove " << cid << "/" << oid << dendl;
3409 int r = lfn_unlink(cid, oid, spos);
3410 dout(10) << "remove " << cid << "/" << oid << " = " << r << dendl;
3411 return r;
3412 }
3413
3414 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3415 {
3416 dout(15) << "truncate " << cid << "/" << oid << " size " << size << dendl;
3417 int r = lfn_truncate(cid, oid, size);
3418 dout(10) << "truncate " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3419 return r;
3420 }
3421
3422
3423 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3424 {
3425 dout(15) << "touch " << cid << "/" << oid << dendl;
3426
3427 FDRef fd;
3428 int r = lfn_open(cid, oid, true, &fd);
3429 if (r < 0) {
3430 return r;
3431 } else {
3432 lfn_close(fd);
3433 }
3434 dout(10) << "touch " << cid << "/" << oid << " = " << r << dendl;
3435 return r;
3436 }
3437
3438 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3439 uint64_t offset, size_t len,
3440 const bufferlist& bl, uint32_t fadvise_flags)
3441 {
3442 dout(15) << "write " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3443 int r;
3444
3445 FDRef fd;
3446 r = lfn_open(cid, oid, true, &fd);
3447 if (r < 0) {
3448 dout(0) << "write couldn't open " << cid << "/"
3449 << oid << ": "
3450 << cpp_strerror(r) << dendl;
3451 goto out;
3452 }
3453
3454 // write
3455 r = bl.write_fd(**fd, offset);
3456 if (r < 0) {
3457 derr << __func__ << " write_fd on " << cid << "/" << oid
3458 << " error: " << cpp_strerror(r) << dendl;
3459 lfn_close(fd);
3460 goto out;
3461 }
3462 r = bl.length();
3463
3464 if (r >= 0 && m_filestore_sloppy_crc) {
3465 int rc = backend->_crc_update_write(**fd, offset, len, bl);
3466 assert(rc >= 0);
3467 }
3468
3469 if (replaying || m_disable_wbthrottle) {
3470 if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3471 #ifdef HAVE_POSIX_FADVISE
3472 posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3473 #endif
3474 }
3475 } else {
3476 wbthrottle.queue_wb(fd, oid, offset, len,
3477 fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3478 }
3479
3480 lfn_close(fd);
3481
3482 out:
3483 dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3484 return r;
3485 }
3486
3487 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3488 {
3489 dout(15) << "zero " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3490 int ret = 0;
3491
3492 if (cct->_conf->filestore_punch_hole) {
3493 #ifdef CEPH_HAVE_FALLOCATE
3494 # if !defined(DARWIN) && !defined(__FreeBSD__)
3495 # ifdef FALLOC_FL_KEEP_SIZE
3496 // first try to punch a hole.
3497 FDRef fd;
3498 ret = lfn_open(cid, oid, false, &fd);
3499 if (ret < 0) {
3500 goto out;
3501 }
3502
3503 struct stat st;
3504 ret = ::fstat(**fd, &st);
3505 if (ret < 0) {
3506 ret = -errno;
3507 lfn_close(fd);
3508 goto out;
3509 }
3510
3511 // first try fallocate
3512 ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3513 offset, len);
3514 if (ret < 0) {
3515 ret = -errno;
3516 } else {
3517 // ensure we extent file size, if needed
3518 if (offset + len > (uint64_t)st.st_size) {
3519 ret = ::ftruncate(**fd, offset + len);
3520 if (ret < 0) {
3521 ret = -errno;
3522 lfn_close(fd);
3523 goto out;
3524 }
3525 }
3526 }
3527 lfn_close(fd);
3528
3529 if (ret >= 0 && m_filestore_sloppy_crc) {
3530 int rc = backend->_crc_update_zero(**fd, offset, len);
3531 assert(rc >= 0);
3532 }
3533
3534 if (ret == 0)
3535 goto out; // yay!
3536 if (ret != -EOPNOTSUPP)
3537 goto out; // some other error
3538 # endif
3539 # endif
3540 #endif
3541 }
3542
3543 // lame, kernel is old and doesn't support it.
3544 // write zeros.. yuck!
3545 dout(20) << "zero falling back to writing zeros" << dendl;
3546 {
3547 bufferlist bl;
3548 bl.append_zero(len);
3549 ret = _write(cid, oid, offset, len, bl);
3550 }
3551
3552 #ifdef CEPH_HAVE_FALLOCATE
3553 # if !defined(DARWIN) && !defined(__FreeBSD__)
3554 # ifdef FALLOC_FL_KEEP_SIZE
3555 out:
3556 # endif
3557 # endif
3558 #endif
3559 dout(20) << "zero " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3560 return ret;
3561 }
3562
3563 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3564 const SequencerPosition& spos)
3565 {
3566 dout(15) << "clone " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3567
3568 if (_check_replay_guard(cid, newoid, spos) < 0)
3569 return 0;
3570
3571 int r;
3572 FDRef o, n;
3573 {
3574 Index index;
3575 r = lfn_open(cid, oldoid, false, &o, &index);
3576 if (r < 0) {
3577 goto out2;
3578 }
3579 assert(NULL != (index.index));
3580 RWLock::WLocker l((index.index)->access_lock);
3581
3582 r = lfn_open(cid, newoid, true, &n, &index);
3583 if (r < 0) {
3584 goto out;
3585 }
3586 r = ::ftruncate(**n, 0);
3587 if (r < 0) {
3588 r = -errno;
3589 goto out3;
3590 }
3591 struct stat st;
3592 r = ::fstat(**o, &st);
3593 if (r < 0) {
3594 r = -errno;
3595 goto out3;
3596 }
3597
3598 r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3599 if (r < 0) {
3600 goto out3;
3601 }
3602
3603 dout(20) << "objectmap clone" << dendl;
3604 r = object_map->clone(oldoid, newoid, &spos);
3605 if (r < 0 && r != -ENOENT)
3606 goto out3;
3607 }
3608
3609 {
3610 char buf[2];
3611 map<string, bufferptr> aset;
3612 r = _fgetattrs(**o, aset);
3613 if (r < 0)
3614 goto out3;
3615
3616 r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3617 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3618 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3619 sizeof(XATTR_NO_SPILL_OUT));
3620 } else {
3621 r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3622 sizeof(XATTR_SPILL_OUT));
3623 }
3624 if (r < 0)
3625 goto out3;
3626
3627 r = _fsetattrs(**n, aset);
3628 if (r < 0)
3629 goto out3;
3630 }
3631
3632 // clone is non-idempotent; record our work.
3633 _set_replay_guard(**n, spos, &newoid);
3634
3635 out3:
3636 lfn_close(n);
3637 out:
3638 lfn_close(o);
3639 out2:
3640 dout(10) << "clone " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3641 assert(!m_filestore_fail_eio || r != -EIO);
3642 return r;
3643 }
3644
3645 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3646 {
3647 dout(20) << "_do_clone_range copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3648 return backend->clone_range(from, to, srcoff, len, dstoff);
3649 }
3650
3651 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3652 {
3653 dout(20) << __func__ << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3654 int r = 0;
3655 map<uint64_t, uint64_t> exomap;
3656 // fiemap doesn't allow zero length
3657 if (len == 0)
3658 return 0;
3659
3660 if (backend->has_seek_data_hole()) {
3661 dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3662 r = _do_seek_hole_data(from, srcoff, len, &exomap);
3663 } else if (backend->has_fiemap()) {
3664 dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3665 r = _do_fiemap(from, srcoff, len, &exomap);
3666 }
3667
3668
3669 int64_t written = 0;
3670 if (r < 0)
3671 goto out;
3672
3673 for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3674 uint64_t it_off = miter->first - srcoff + dstoff;
3675 r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3676 if (r < 0) {
3677 derr << "FileStore::_do_copy_range: copy error at " << miter->first << "~" << miter->second
3678 << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3679 break;
3680 }
3681 written += miter->second;
3682 }
3683
3684 if (r >= 0) {
3685 if (m_filestore_sloppy_crc) {
3686 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3687 assert(rc >= 0);
3688 }
3689 struct stat st;
3690 r = ::fstat(to, &st);
3691 if (r < 0) {
3692 r = -errno;
3693 derr << __func__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3694 goto out;
3695 }
3696 if (st.st_size < (int)(dstoff + len)) {
3697 r = ::ftruncate(to, dstoff + len);
3698 if (r < 0) {
3699 r = -errno;
3700 derr << __func__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3701 goto out;
3702 }
3703 }
3704 r = written;
3705 }
3706
3707 out:
3708 dout(20) << __func__ << " " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3709 return r;
3710 }
3711
3712 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3713 {
3714 dout(20) << "_do_copy_range " << srcoff << "~" << len << " to " << dstoff << dendl;
3715 int r = 0;
3716 loff_t pos = srcoff;
3717 loff_t end = srcoff + len;
3718 int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3719
3720 #ifdef CEPH_HAVE_SPLICE
3721 if (backend->has_splice()) {
3722 int pipefd[2];
3723 if (pipe(pipefd) < 0) {
3724 r = -errno;
3725 derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3726 return r;
3727 }
3728
3729 loff_t dstpos = dstoff;
3730 while (pos < end) {
3731 int l = MIN(end-pos, buflen);
3732 r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3733 dout(10) << " safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3734 if (r < 0) {
3735 derr << "FileStore::_do_copy_range: safe_splice read error at " << pos << "~" << len
3736 << ", " << cpp_strerror(r) << dendl;
3737 break;
3738 }
3739 if (r == 0) {
3740 // hrm, bad source range, wtf.
3741 r = -ERANGE;
3742 derr << "FileStore::_do_copy_range got short read result at " << pos
3743 << " of fd " << from << " len " << len << dendl;
3744 break;
3745 }
3746
3747 r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3748 dout(10) << " safe_splice write to " << to << " len " << r
3749 << " got " << r << dendl;
3750 if (r < 0) {
3751 derr << "FileStore::_do_copy_range: write error at " << pos << "~"
3752 << r << ", " << cpp_strerror(r) << dendl;
3753 break;
3754 }
3755 }
3756 close(pipefd[0]);
3757 close(pipefd[1]);
3758 } else
3759 #endif
3760 {
3761 int64_t actual;
3762
3763 actual = ::lseek64(from, srcoff, SEEK_SET);
3764 if (actual != (int64_t)srcoff) {
3765 if (actual < 0)
3766 r = -errno;
3767 else
3768 r = -EINVAL;
3769 derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3770 return r;
3771 }
3772 actual = ::lseek64(to, dstoff, SEEK_SET);
3773 if (actual != (int64_t)dstoff) {
3774 if (actual < 0)
3775 r = -errno;
3776 else
3777 r = -EINVAL;
3778 derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3779 return r;
3780 }
3781
3782 char buf[buflen];
3783 while (pos < end) {
3784 int l = MIN(end-pos, buflen);
3785 r = ::read(from, buf, l);
3786 dout(25) << " read from " << pos << "~" << l << " got " << r << dendl;
3787 if (r < 0) {
3788 if (errno == EINTR) {
3789 continue;
3790 } else {
3791 r = -errno;
3792 derr << "FileStore::_do_copy_range: read error at " << pos << "~" << len
3793 << ", " << cpp_strerror(r) << dendl;
3794 break;
3795 }
3796 }
3797 if (r == 0) {
3798 // hrm, bad source range, wtf.
3799 r = -ERANGE;
3800 derr << "FileStore::_do_copy_range got short read result at " << pos
3801 << " of fd " << from << " len " << len << dendl;
3802 break;
3803 }
3804 int op = 0;
3805 while (op < r) {
3806 int r2 = safe_write(to, buf+op, r-op);
3807 dout(25) << " write to " << to << " len " << (r-op)
3808 << " got " << r2 << dendl;
3809 if (r2 < 0) {
3810 r = r2;
3811 derr << "FileStore::_do_copy_range: write error at " << pos << "~"
3812 << r-op << ", " << cpp_strerror(r) << dendl;
3813
3814 break;
3815 }
3816 op += (r-op);
3817 }
3818 if (r < 0)
3819 break;
3820 pos += r;
3821 }
3822 }
3823
3824 if (r < 0 && replaying) {
3825 assert(r == -ERANGE);
3826 derr << "Filestore: short source tolerated because we are replaying" << dendl;
3827 r = pos - from;;
3828 }
3829 assert(replaying || pos == end);
3830 if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3831 int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3832 assert(rc >= 0);
3833 }
3834 dout(20) << "_do_copy_range " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3835 return r;
3836 }
3837
3838 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3839 uint64_t srcoff, uint64_t len, uint64_t dstoff,
3840 const SequencerPosition& spos)
3841 {
3842 dout(15) << "clone_range " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3843
3844 if (_check_replay_guard(newcid, newoid, spos) < 0)
3845 return 0;
3846
3847 int r;
3848 FDRef o, n;
3849 r = lfn_open(oldcid, oldoid, false, &o);
3850 if (r < 0) {
3851 goto out2;
3852 }
3853 r = lfn_open(newcid, newoid, true, &n);
3854 if (r < 0) {
3855 goto out;
3856 }
3857 r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3858 if (r < 0) {
3859 goto out3;
3860 }
3861
3862 // clone is non-idempotent; record our work.
3863 _set_replay_guard(**n, spos, &newoid);
3864
3865 out3:
3866 lfn_close(n);
3867 out:
3868 lfn_close(o);
3869 out2:
3870 dout(10) << "clone_range " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3871 << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3872 return r;
3873 }
3874
3875 class SyncEntryTimeout : public Context {
3876 public:
3877 CephContext* cct;
3878 explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3879 : cct(cct), m_commit_timeo(commit_timeo)
3880 {
3881 }
3882
3883 void finish(int r) override {
3884 BackTrace *bt = new BackTrace(1);
3885 generic_dout(-1) << "FileStore: sync_entry timed out after "
3886 << m_commit_timeo << " seconds.\n";
3887 bt->print(*_dout);
3888 *_dout << dendl;
3889 delete bt;
3890 ceph_abort();
3891 }
3892 private:
3893 int m_commit_timeo;
3894 };
3895
3896 void FileStore::sync_entry()
3897 {
3898 lock.Lock();
3899 while (!stop) {
3900 utime_t max_interval;
3901 max_interval.set_from_double(m_filestore_max_sync_interval);
3902 utime_t min_interval;
3903 min_interval.set_from_double(m_filestore_min_sync_interval);
3904
3905 utime_t startwait = ceph_clock_now();
3906 if (!force_sync) {
3907 dout(20) << "sync_entry waiting for max_interval " << max_interval << dendl;
3908 sync_cond.WaitInterval(lock, max_interval);
3909 } else {
3910 dout(20) << "sync_entry not waiting, force_sync set" << dendl;
3911 }
3912
3913 if (force_sync) {
3914 dout(20) << "sync_entry force_sync set" << dendl;
3915 force_sync = false;
3916 } else if (stop) {
3917 dout(20) << __func__ << " stop set" << dendl;
3918 break;
3919 } else {
3920 // wait for at least the min interval
3921 utime_t woke = ceph_clock_now();
3922 woke -= startwait;
3923 dout(20) << "sync_entry woke after " << woke << dendl;
3924 if (woke < min_interval) {
3925 utime_t t = min_interval;
3926 t -= woke;
3927 dout(20) << "sync_entry waiting for another " << t
3928 << " to reach min interval " << min_interval << dendl;
3929 sync_cond.WaitInterval(lock, t);
3930 }
3931 }
3932
3933 list<Context*> fin;
3934 again:
3935 fin.swap(sync_waiters);
3936 lock.Unlock();
3937
3938 op_tp.pause();
3939 if (apply_manager.commit_start()) {
3940 utime_t start = ceph_clock_now();
3941 uint64_t cp = apply_manager.get_committing_seq();
3942
3943 sync_entry_timeo_lock.Lock();
3944 SyncEntryTimeout *sync_entry_timeo =
3945 new SyncEntryTimeout(cct, m_filestore_commit_timeout);
3946 timer.add_event_after(m_filestore_commit_timeout, sync_entry_timeo);
3947 sync_entry_timeo_lock.Unlock();
3948
3949 logger->set(l_filestore_committing, 1);
3950
3951 dout(15) << "sync_entry committing " << cp << dendl;
3952 stringstream errstream;
3953 if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
3954 derr << errstream.str() << dendl;
3955 ceph_abort();
3956 }
3957
3958 if (backend->can_checkpoint()) {
3959 int err = write_op_seq(op_fd, cp);
3960 if (err < 0) {
3961 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
3962 assert(0 == "error during write_op_seq");
3963 }
3964
3965 char s[NAME_MAX];
3966 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
3967 uint64_t cid = 0;
3968 err = backend->create_checkpoint(s, &cid);
3969 if (err < 0) {
3970 int err = errno;
3971 derr << "snap create '" << s << "' got error " << err << dendl;
3972 assert(err == 0);
3973 }
3974
3975 snaps.push_back(cp);
3976 apply_manager.commit_started();
3977 op_tp.unpause();
3978
3979 if (cid > 0) {
3980 dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
3981 err = backend->sync_checkpoint(cid);
3982 if (err < 0) {
3983 derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
3984 assert(0 == "wait_sync got error");
3985 }
3986 dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
3987 }
3988 } else
3989 {
3990 apply_manager.commit_started();
3991 op_tp.unpause();
3992
3993 int err = object_map->sync();
3994 if (err < 0) {
3995 derr << "object_map sync got " << cpp_strerror(err) << dendl;
3996 assert(0 == "object_map sync returned error");
3997 }
3998
3999 err = backend->syncfs();
4000 if (err < 0) {
4001 derr << "syncfs got " << cpp_strerror(err) << dendl;
4002 assert(0 == "syncfs returned error");
4003 }
4004
4005 err = write_op_seq(op_fd, cp);
4006 if (err < 0) {
4007 derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4008 assert(0 == "error during write_op_seq");
4009 }
4010 err = ::fsync(op_fd);
4011 if (err < 0) {
4012 derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4013 assert(0 == "error during fsync of op_seq");
4014 }
4015 }
4016
4017 utime_t done = ceph_clock_now();
4018 utime_t lat = done - start;
4019 utime_t dur = done - startwait;
4020 dout(10) << "sync_entry commit took " << lat << ", interval was " << dur << dendl;
4021
4022 logger->inc(l_filestore_commitcycle);
4023 logger->tinc(l_filestore_commitcycle_latency, lat);
4024 logger->tinc(l_filestore_commitcycle_interval, dur);
4025
4026 apply_manager.commit_finish();
4027 if (!m_disable_wbthrottle) {
4028 wbthrottle.clear();
4029 }
4030
4031 logger->set(l_filestore_committing, 0);
4032
4033 // remove old snaps?
4034 if (backend->can_checkpoint()) {
4035 char s[NAME_MAX];
4036 while (snaps.size() > 2) {
4037 snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4038 snaps.pop_front();
4039 dout(10) << "removing snap '" << s << "'" << dendl;
4040 int r = backend->destroy_checkpoint(s);
4041 if (r) {
4042 int err = errno;
4043 derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4044 }
4045 }
4046 }
4047
4048 dout(15) << "sync_entry committed to op_seq " << cp << dendl;
4049
4050 sync_entry_timeo_lock.Lock();
4051 timer.cancel_event(sync_entry_timeo);
4052 sync_entry_timeo_lock.Unlock();
4053 } else {
4054 op_tp.unpause();
4055 }
4056
4057 lock.Lock();
4058 finish_contexts(cct, fin, 0);
4059 fin.clear();
4060 if (!sync_waiters.empty()) {
4061 dout(10) << "sync_entry more waiters, committing again" << dendl;
4062 goto again;
4063 }
4064 if (!stop && journal && journal->should_commit_now()) {
4065 dout(10) << "sync_entry journal says we should commit again (probably is/was full)" << dendl;
4066 goto again;
4067 }
4068 }
4069 stop = false;
4070 lock.Unlock();
4071 }
4072
4073 void FileStore::_start_sync()
4074 {
4075 if (!journal) { // don't do a big sync if the journal is on
4076 dout(10) << "start_sync" << dendl;
4077 sync_cond.Signal();
4078 } else {
4079 dout(10) << "start_sync - NOOP (journal is on)" << dendl;
4080 }
4081 }
4082
4083 void FileStore::do_force_sync()
4084 {
4085 dout(10) << __func__ << dendl;
4086 Mutex::Locker l(lock);
4087 force_sync = true;
4088 sync_cond.Signal();
4089 }
4090
4091 void FileStore::start_sync(Context *onsafe)
4092 {
4093 Mutex::Locker l(lock);
4094 sync_waiters.push_back(onsafe);
4095 sync_cond.Signal();
4096 force_sync = true;
4097 dout(10) << "start_sync" << dendl;
4098 }
4099
4100 void FileStore::sync()
4101 {
4102 Mutex l("FileStore::sync");
4103 Cond c;
4104 bool done;
4105 C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4106
4107 start_sync(fin);
4108
4109 l.Lock();
4110 while (!done) {
4111 dout(10) << "sync waiting" << dendl;
4112 c.Wait(l);
4113 }
4114 l.Unlock();
4115 dout(10) << "sync done" << dendl;
4116 }
4117
4118 void FileStore::_flush_op_queue()
4119 {
4120 dout(10) << "_flush_op_queue draining op tp" << dendl;
4121 op_wq.drain();
4122 dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
4123 for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4124 (*it)->wait_for_empty();
4125 }
4126 }
4127
4128 /*
4129 * flush - make every queued write readable
4130 */
4131 void FileStore::flush()
4132 {
4133 dout(10) << "flush" << dendl;
4134
4135 if (cct->_conf->filestore_blackhole) {
4136 // wait forever
4137 Mutex lock("FileStore::flush::lock");
4138 Cond cond;
4139 lock.Lock();
4140 while (true)
4141 cond.Wait(lock);
4142 ceph_abort();
4143 }
4144
4145 if (m_filestore_journal_writeahead) {
4146 if (journal)
4147 journal->flush();
4148 dout(10) << "flush draining ondisk finisher" << dendl;
4149 for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4150 (*it)->wait_for_empty();
4151 }
4152 }
4153
4154 _flush_op_queue();
4155 dout(10) << "flush complete" << dendl;
4156 }
4157
4158 /*
4159 * sync_and_flush - make every queued write readable AND committed to disk
4160 */
4161 void FileStore::sync_and_flush()
4162 {
4163 dout(10) << "sync_and_flush" << dendl;
4164
4165 if (m_filestore_journal_writeahead) {
4166 if (journal)
4167 journal->flush();
4168 _flush_op_queue();
4169 } else {
4170 // includes m_filestore_journal_parallel
4171 _flush_op_queue();
4172 sync();
4173 }
4174 dout(10) << "sync_and_flush done" << dendl;
4175 }
4176
4177 int FileStore::flush_journal()
4178 {
4179 dout(10) << __func__ << dendl;
4180 sync_and_flush();
4181 sync();
4182 return 0;
4183 }
4184
4185 int FileStore::snapshot(const string& name)
4186 {
4187 dout(10) << "snapshot " << name << dendl;
4188 sync_and_flush();
4189
4190 if (!backend->can_checkpoint()) {
4191 dout(0) << "snapshot " << name << " failed, not supported" << dendl;
4192 return -EOPNOTSUPP;
4193 }
4194
4195 char s[NAME_MAX];
4196 snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4197
4198 int r = backend->create_checkpoint(s, NULL);
4199 if (r) {
4200 derr << "snapshot " << name << " failed: " << cpp_strerror(r) << dendl;
4201 }
4202
4203 return r;
4204 }
4205
4206 // -------------------------------
4207 // attributes
4208
4209 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4210 {
4211 char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4212 int l = chain_fgetxattr(fd, name, val, sizeof(val));
4213 if (l >= 0) {
4214 bp = buffer::create(l);
4215 memcpy(bp.c_str(), val, l);
4216 } else if (l == -ERANGE) {
4217 l = chain_fgetxattr(fd, name, 0, 0);
4218 if (l > 0) {
4219 bp = buffer::create(l);
4220 l = chain_fgetxattr(fd, name, bp.c_str(), l);
4221 }
4222 }
4223 assert(!m_filestore_fail_eio || l != -EIO);
4224 return l;
4225 }
4226
4227 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4228 {
4229 // get attr list
4230 char names1[100];
4231 int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4232 char *names2 = 0;
4233 char *name = 0;
4234 if (len == -ERANGE) {
4235 len = chain_flistxattr(fd, 0, 0);
4236 if (len < 0) {
4237 assert(!m_filestore_fail_eio || len != -EIO);
4238 return len;
4239 }
4240 dout(10) << " -ERANGE, len is " << len << dendl;
4241 names2 = new char[len+1];
4242 len = chain_flistxattr(fd, names2, len);
4243 dout(10) << " -ERANGE, got " << len << dendl;
4244 if (len < 0) {
4245 assert(!m_filestore_fail_eio || len != -EIO);
4246 delete[] names2;
4247 return len;
4248 }
4249 name = names2;
4250 } else if (len < 0) {
4251 assert(!m_filestore_fail_eio || len != -EIO);
4252 return len;
4253 } else {
4254 name = names1;
4255 }
4256 name[len] = 0;
4257
4258 char *end = name + len;
4259 while (name < end) {
4260 char *attrname = name;
4261 if (parse_attrname(&name)) {
4262 if (*name) {
4263 dout(20) << "fgetattrs " << fd << " getting '" << name << "'" << dendl;
4264 int r = _fgetattr(fd, attrname, aset[name]);
4265 if (r < 0) {
4266 delete[] names2;
4267 return r;
4268 }
4269 }
4270 }
4271 name += strlen(name) + 1;
4272 }
4273
4274 delete[] names2;
4275 return 0;
4276 }
4277
4278 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4279 {
4280 for (map<string, bufferptr>::iterator p = aset.begin();
4281 p != aset.end();
4282 ++p) {
4283 char n[CHAIN_XATTR_MAX_NAME_LEN];
4284 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4285 const char *val;
4286 if (p->second.length())
4287 val = p->second.c_str();
4288 else
4289 val = "";
4290 // ??? Why do we skip setting all the other attrs if one fails?
4291 int r = chain_fsetxattr(fd, n, val, p->second.length());
4292 if (r < 0) {
4293 derr << "FileStore::_setattrs: chain_setxattr returned " << r << dendl;
4294 return r;
4295 }
4296 }
4297 return 0;
4298 }
4299
4300 // debug EIO injection
4301 void FileStore::inject_data_error(const ghobject_t &oid) {
4302 Mutex::Locker l(read_error_lock);
4303 dout(10) << __func__ << ": init error on " << oid << dendl;
4304 data_error_set.insert(oid);
4305 }
4306 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4307 Mutex::Locker l(read_error_lock);
4308 dout(10) << __func__ << ": init error on " << oid << dendl;
4309 mdata_error_set.insert(oid);
4310 }
4311 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4312 Mutex::Locker l(read_error_lock);
4313 dout(10) << __func__ << ": clear error on " << oid << dendl;
4314 data_error_set.erase(oid);
4315 mdata_error_set.erase(oid);
4316 }
4317 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4318 Mutex::Locker l(read_error_lock);
4319 if (data_error_set.count(oid)) {
4320 dout(10) << __func__ << ": inject error on " << oid << dendl;
4321 return true;
4322 } else {
4323 return false;
4324 }
4325 }
4326 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4327 Mutex::Locker l(read_error_lock);
4328 if (mdata_error_set.count(oid)) {
4329 dout(10) << __func__ << ": inject error on " << oid << dendl;
4330 return true;
4331 } else {
4332 return false;
4333 }
4334 }
4335
4336
4337 // objects
4338
4339 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4340 {
4341 tracepoint(objectstore, getattr_enter, _cid.c_str());
4342 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4343 dout(15) << "getattr " << cid << "/" << oid << " '" << name << "'" << dendl;
4344 FDRef fd;
4345 int r = lfn_open(cid, oid, false, &fd);
4346 if (r < 0) {
4347 goto out;
4348 }
4349 char n[CHAIN_XATTR_MAX_NAME_LEN];
4350 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4351 r = _fgetattr(**fd, n, bp);
4352 lfn_close(fd);
4353 if (r == -ENODATA) {
4354 map<string, bufferlist> got;
4355 set<string> to_get;
4356 to_get.insert(string(name));
4357 Index index;
4358 r = get_index(cid, &index);
4359 if (r < 0) {
4360 dout(10) << __func__ << " could not get index r = " << r << dendl;
4361 goto out;
4362 }
4363 r = object_map->get_xattrs(oid, to_get, &got);
4364 if (r < 0 && r != -ENOENT) {
4365 dout(10) << __func__ << " get_xattrs err r =" << r << dendl;
4366 goto out;
4367 }
4368 if (got.empty()) {
4369 dout(10) << __func__ << " got.size() is 0" << dendl;
4370 return -ENODATA;
4371 }
4372 bp = bufferptr(got.begin()->second.c_str(),
4373 got.begin()->second.length());
4374 r = bp.length();
4375 }
4376 out:
4377 dout(10) << "getattr " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4378 assert(!m_filestore_fail_eio || r != -EIO);
4379 if (cct->_conf->filestore_debug_inject_read_err &&
4380 debug_mdata_eio(oid)) {
4381 return -EIO;
4382 } else {
4383 tracepoint(objectstore, getattr_exit, r);
4384 return r < 0 ? r : 0;
4385 }
4386 }
4387
4388 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4389 {
4390 tracepoint(objectstore, getattrs_enter, _cid.c_str());
4391 const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4392 set<string> omap_attrs;
4393 map<string, bufferlist> omap_aset;
4394 Index index;
4395 dout(15) << "getattrs " << cid << "/" << oid << dendl;
4396 FDRef fd;
4397 bool spill_out = true;
4398 char buf[2];
4399
4400 int r = lfn_open(cid, oid, false, &fd);
4401 if (r < 0) {
4402 goto out;
4403 }
4404
4405 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4406 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4407 spill_out = false;
4408
4409 r = _fgetattrs(**fd, aset);
4410 lfn_close(fd);
4411 fd = FDRef(); // defensive
4412 if (r < 0) {
4413 goto out;
4414 }
4415
4416 if (!spill_out) {
4417 dout(10) << __func__ << " no xattr exists in object_map r = " << r << dendl;
4418 goto out;
4419 }
4420
4421 r = get_index(cid, &index);
4422 if (r < 0) {
4423 dout(10) << __func__ << " could not get index r = " << r << dendl;
4424 goto out;
4425 }
4426 {
4427 r = object_map->get_all_xattrs(oid, &omap_attrs);
4428 if (r < 0 && r != -ENOENT) {
4429 dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
4430 goto out;
4431 }
4432
4433 r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4434 if (r < 0 && r != -ENOENT) {
4435 dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
4436 goto out;
4437 }
4438 if (r == -ENOENT)
4439 r = 0;
4440 }
4441 assert(omap_attrs.size() == omap_aset.size());
4442 for (map<string, bufferlist>::iterator i = omap_aset.begin();
4443 i != omap_aset.end();
4444 ++i) {
4445 string key(i->first);
4446 aset.insert(make_pair(key,
4447 bufferptr(i->second.c_str(), i->second.length())));
4448 }
4449 out:
4450 dout(10) << "getattrs " << cid << "/" << oid << " = " << r << dendl;
4451 assert(!m_filestore_fail_eio || r != -EIO);
4452
4453 if (cct->_conf->filestore_debug_inject_read_err &&
4454 debug_mdata_eio(oid)) {
4455 return -EIO;
4456 } else {
4457 tracepoint(objectstore, getattrs_exit, r);
4458 return r;
4459 }
4460 }
4461
// _setattrs - store the attributes in 'aset' on object cid/oid.
//
// Each attribute ends up in exactly one of two places: as a chained xattr
// on the object file ("inline"), or in object_map via set_xattrs().  The
// loop below sorts every entry of 'aset' into omap_set or inline_to_set;
// the writes happen afterwards.
//
// Returns 0 on success or a negative error from lfn_open(),
// chain_fremovexattr(), _fsetattrs() or object_map.
4462 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4463 const SequencerPosition &spos)
4464 {
4465 map<string, bufferlist> omap_set;
4466 set<string> omap_remove;
4467 map<string, bufferptr> inline_set;
4468 map<string, bufferptr> inline_to_set;
4469 FDRef fd;
4470 int spill_out = -1;
4471 bool incomplete_inline = false;
4472
4473 int r = lfn_open(cid, oid, false, &fd);
4474 if (r < 0) {
4475 goto out;
4476 }
4477
// spill_out becomes 0 only when the file's spill-out xattr is readable and
// equals XATTR_NO_SPILL_OUT; in every other case it is 1.
4478 char buf[2];
4479 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4480 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4481 spill_out = 0;
4482 else
4483 spill_out = 1;
4484
// Load the current inline attrs.  Only -E2BIG is acted on here: it marks
// the inline view as incomplete and forces everything in 'aset' to omap.
4485 r = _fgetattrs(**fd, inline_set);
4486 incomplete_inline = (r == -E2BIG);
4487 assert(!m_filestore_fail_eio || r != -EIO);
4488 dout(15) << "setattrs " << cid << "/" << oid
4489 << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4490 << dendl;
4491
4492 for (map<string,bufferptr>::iterator p = aset.begin();
4493 p != aset.end();
4494 ++p) {
4495 char n[CHAIN_XATTR_MAX_NAME_LEN];
4496 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4497
// case 1: inline view incomplete -> drop any inline copy, store in omap
4498 if (incomplete_inline) {
4499 chain_fremovexattr(**fd, n); // ignore any error
4500 omap_set[p->first].push_back(p->second);
4501 continue;
4502 }
4503
// case 2: value larger than the inline size limit -> omap; an existing
// inline copy is removed first (failure to remove aborts the call)
4504 if (p->second.length() > m_filestore_max_inline_xattr_size) {
4505 if (inline_set.count(p->first)) {
4506 inline_set.erase(p->first);
4507 r = chain_fremovexattr(**fd, n);
4508 if (r < 0)
4509 goto out_close;
4510 }
4511 omap_set[p->first].push_back(p->second);
4512 continue;
4513 }
4514
// case 3: a new inline attr would exceed the inline count limit -> omap
4515 if (!inline_set.count(p->first) &&
4516 inline_set.size() >= m_filestore_max_inline_xattrs) {
4517 omap_set[p->first].push_back(p->second);
4518 continue;
4519 }
// case 4: store inline, and schedule removal of the same key from omap
4520 omap_remove.insert(p->first);
4521 inline_set.insert(*p);
4522
4523 inline_to_set.insert(*p);
4524 }
4525
// Something is about to go to omap and the file is not yet flagged as
// spilled out: set the flag (return value intentionally not checked here).
4526 if (spill_out != 1 && !omap_set.empty()) {
4527 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4528 sizeof(XATTR_SPILL_OUT));
4529 }
4530
4531 r = _fsetattrs(**fd, inline_to_set);
4532 if (r < 0)
4533 goto out_close;
4534
// Keys now stored inline are removed from omap, but only when the file was
// flagged as spilled out; -ENOENT from the removal is not an error.
4535 if (spill_out && !omap_remove.empty()) {
4536 r = object_map->remove_xattrs(oid, omap_remove, &spos);
4537 if (r < 0 && r != -ENOENT) {
4538 dout(10) << __func__ << " could not remove_xattrs r = " << r << dendl;
4539 assert(!m_filestore_fail_eio || r != -EIO);
4540 goto out_close;
4541 } else {
4542 r = 0; // don't confuse the debug output
4543 }
4544 }
4545
4546 if (!omap_set.empty()) {
4547 r = object_map->set_xattrs(oid, omap_set, &spos);
4548 if (r < 0) {
4549 dout(10) << __func__ << " could not set_xattrs r = " << r << dendl;
4550 assert(!m_filestore_fail_eio || r != -EIO);
4551 goto out_close;
4552 }
4553 }
4554 out_close:
4555 lfn_close(fd);
4556 out:
4557 dout(10) << "setattrs " << cid << "/" << oid << " = " << r << dendl;
4558 return r;
4559 }
4560
4561
4562 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4563 const SequencerPosition &spos)
4564 {
4565 dout(15) << "rmattr " << cid << "/" << oid << " '" << name << "'" << dendl;
4566 FDRef fd;
4567 bool spill_out = true;
4568
4569 int r = lfn_open(cid, oid, false, &fd);
4570 if (r < 0) {
4571 goto out;
4572 }
4573
4574 char buf[2];
4575 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4576 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4577 spill_out = false;
4578 }
4579
4580 char n[CHAIN_XATTR_MAX_NAME_LEN];
4581 get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4582 r = chain_fremovexattr(**fd, n);
4583 if (r == -ENODATA && spill_out) {
4584 Index index;
4585 r = get_index(cid, &index);
4586 if (r < 0) {
4587 dout(10) << __func__ << " could not get index r = " << r << dendl;
4588 goto out_close;
4589 }
4590 set<string> to_remove;
4591 to_remove.insert(string(name));
4592 r = object_map->remove_xattrs(oid, to_remove, &spos);
4593 if (r < 0 && r != -ENOENT) {
4594 dout(10) << __func__ << " could not remove_xattrs index r = " << r << dendl;
4595 assert(!m_filestore_fail_eio || r != -EIO);
4596 goto out_close;
4597 }
4598 }
4599 out_close:
4600 lfn_close(fd);
4601 out:
4602 dout(10) << "rmattr " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4603 return r;
4604 }
4605
4606 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4607 const SequencerPosition &spos)
4608 {
4609 dout(15) << "rmattrs " << cid << "/" << oid << dendl;
4610
4611 map<string,bufferptr> aset;
4612 FDRef fd;
4613 set<string> omap_attrs;
4614 Index index;
4615 bool spill_out = true;
4616
4617 int r = lfn_open(cid, oid, false, &fd);
4618 if (r < 0) {
4619 goto out;
4620 }
4621
4622 char buf[2];
4623 r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4624 if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4625 spill_out = false;
4626 }
4627
4628 r = _fgetattrs(**fd, aset);
4629 if (r >= 0) {
4630 for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4631 char n[CHAIN_XATTR_MAX_NAME_LEN];
4632 get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4633 r = chain_fremovexattr(**fd, n);
4634 if (r < 0) {
4635 dout(10) << __func__ << " could not remove xattr r = " << r << dendl;
4636 goto out_close;
4637 }
4638 }
4639 }
4640
4641 if (!spill_out) {
4642 dout(10) << __func__ << " no xattr exists in object_map r = " << r << dendl;
4643 goto out_close;
4644 }
4645
4646 r = get_index(cid, &index);
4647 if (r < 0) {
4648 dout(10) << __func__ << " could not get index r = " << r << dendl;
4649 goto out_close;
4650 }
4651 {
4652 r = object_map->get_all_xattrs(oid, &omap_attrs);
4653 if (r < 0 && r != -ENOENT) {
4654 dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
4655 assert(!m_filestore_fail_eio || r != -EIO);
4656 goto out_close;
4657 }
4658 r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4659 if (r < 0 && r != -ENOENT) {
4660 dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
4661 goto out_close;
4662 }
4663 if (r == -ENOENT)
4664 r = 0;
4665 chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4666 sizeof(XATTR_NO_SPILL_OUT));
4667 }
4668
4669 out_close:
4670 lfn_close(fd);
4671 out:
4672 dout(10) << "rmattrs " << cid << "/" << oid << " = " << r << dendl;
4673 return r;
4674 }
4675
4676
4677
4678
4679 int FileStore::_collection_remove_recursive(const coll_t &cid,
4680 const SequencerPosition &spos)
4681 {
4682 struct stat st;
4683 int r = collection_stat(cid, &st);
4684 if (r < 0) {
4685 if (r == -ENOENT)
4686 return 0;
4687 return r;
4688 }
4689
4690 vector<ghobject_t> objects;
4691 ghobject_t max;
4692 while (!max.is_max()) {
4693 r = collection_list(cid, max, ghobject_t::get_max(),
4694 300, &objects, &max);
4695 if (r < 0)
4696 return r;
4697 for (vector<ghobject_t>::iterator i = objects.begin();
4698 i != objects.end();
4699 ++i) {
4700 assert(_check_replay_guard(cid, *i, spos));
4701 r = _remove(cid, *i, spos);
4702 if (r < 0)
4703 return r;
4704 }
4705 objects.clear();
4706 }
4707 return _destroy_collection(cid);
4708 }
4709
4710 // --------------------------
4711 // collections
4712
4713 int FileStore::list_collections(vector<coll_t>& ls)
4714 {
4715 return list_collections(ls, false);
4716 }
4717
4718 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4719 {
4720 tracepoint(objectstore, list_collections_enter);
4721 dout(10) << "list_collections" << dendl;
4722
4723 char fn[PATH_MAX];
4724 snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4725
4726 int r = 0;
4727 DIR *dir = ::opendir(fn);
4728 if (!dir) {
4729 r = -errno;
4730 derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4731 assert(!m_filestore_fail_eio || r != -EIO);
4732 return r;
4733 }
4734
4735 struct dirent *de = nullptr;
4736 while ((de = ::readdir(dir))) {
4737 if (de->d_type == DT_UNKNOWN) {
4738 // d_type not supported (non-ext[234], btrfs), must stat
4739 struct stat sb;
4740 char filename[PATH_MAX];
4741 snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4742
4743 r = ::stat(filename, &sb);
4744 if (r < 0) {
4745 r = -errno;
4746 derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4747 assert(!m_filestore_fail_eio || r != -EIO);
4748 break;
4749 }
4750 if (!S_ISDIR(sb.st_mode)) {
4751 continue;
4752 }
4753 } else if (de->d_type != DT_DIR) {
4754 continue;
4755 }
4756 if (strcmp(de->d_name, "omap") == 0) {
4757 continue;
4758 }
4759 if (de->d_name[0] == '.' &&
4760 (de->d_name[1] == '\0' ||
4761 (de->d_name[1] == '.' &&
4762 de->d_name[2] == '\0')))
4763 continue;
4764 coll_t cid;
4765 if (!cid.parse(de->d_name)) {
4766 derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4767 continue;
4768 }
4769 if (!cid.is_temp() || include_temp)
4770 ls.push_back(cid);
4771 }
4772
4773 if (r > 0) {
4774 derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4775 r = -r;
4776 }
4777
4778 ::closedir(dir);
4779 assert(!m_filestore_fail_eio || r != -EIO);
4780 tracepoint(objectstore, list_collections_exit, r);
4781 return r;
4782 }
4783
4784 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4785 {
4786 tracepoint(objectstore, collection_stat_enter, c.c_str());
4787 char fn[PATH_MAX];
4788 get_cdir(c, fn, sizeof(fn));
4789 dout(15) << "collection_stat " << fn << dendl;
4790 int r = ::stat(fn, st);
4791 if (r < 0)
4792 r = -errno;
4793 dout(10) << "collection_stat " << fn << " = " << r << dendl;
4794 assert(!m_filestore_fail_eio || r != -EIO);
4795 tracepoint(objectstore, collection_stat_exit, r);
4796 return r;
4797 }
4798
4799 bool FileStore::collection_exists(const coll_t& c)
4800 {
4801 tracepoint(objectstore, collection_exists_enter, c.c_str());
4802 struct stat st;
4803 bool ret = collection_stat(c, &st) == 0;
4804 tracepoint(objectstore, collection_exists_exit, ret);
4805 return ret;
4806 }
4807
4808 int FileStore::collection_empty(const coll_t& c, bool *empty)
4809 {
4810 tracepoint(objectstore, collection_empty_enter, c.c_str());
4811 dout(15) << "collection_empty " << c << dendl;
4812 Index index;
4813 int r = get_index(c, &index);
4814 if (r < 0) {
4815 derr << __func__ << " get_index returned: " << cpp_strerror(r)
4816 << dendl;
4817 return r;
4818 }
4819
4820 assert(NULL != index.index);
4821 RWLock::RLocker l((index.index)->access_lock);
4822
4823 vector<ghobject_t> ls;
4824 r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4825 1, &ls, NULL);
4826 if (r < 0) {
4827 derr << __func__ << " collection_list_partial returned: "
4828 << cpp_strerror(r) << dendl;
4829 assert(!m_filestore_fail_eio || r != -EIO);
4830 return r;
4831 }
4832 *empty = ls.empty();
4833 tracepoint(objectstore, collection_empty_exit, *empty);
4834 return 0;
4835 }
4836
4837 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4838 {
4839 char fn[PATH_MAX];
4840 get_cdir(c, fn, sizeof(fn));
4841 dout(10) << "collection_set_bits " << fn << " " << bits << dendl;
4842 char n[PATH_MAX];
4843 int r;
4844 int32_t v = bits;
4845 int fd = ::open(fn, O_RDONLY);
4846 if (fd < 0) {
4847 r = -errno;
4848 goto out;
4849 }
4850 get_attrname("bits", n, PATH_MAX);
4851 r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4852 VOID_TEMP_FAILURE_RETRY(::close(fd));
4853 out:
4854 dout(10) << "collection_setattr " << fn << " " << bits << " = " << r << dendl;
4855 return r;
4856 }
4857
4858 int FileStore::collection_bits(const coll_t& c)
4859 {
4860 char fn[PATH_MAX];
4861 get_cdir(c, fn, sizeof(fn));
4862 dout(15) << "collection_bits " << fn << dendl;
4863 int r;
4864 char n[PATH_MAX];
4865 int32_t bits;
4866 int fd = ::open(fn, O_RDONLY);
4867 if (fd < 0) {
4868 bits = r = -errno;
4869 goto out;
4870 }
4871 get_attrname("bits", n, PATH_MAX);
4872 r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4873 VOID_TEMP_FAILURE_RETRY(::close(fd));
4874 if (r < 0) {
4875 bits = r;
4876 goto out;
4877 }
4878 out:
4879 dout(10) << "collection_bits " << fn << " = " << bits << dendl;
4880 return bits;
4881 }
4882
// collection_list - list up to 'max' objects of collection 'c' in the range
// [orig_start, end) into '*ls', and report the continuation point in
// '*next' (optional; a local is used when the caller passes NULL).
//
// For a collection that is neither temp nor meta, and a start position
// ordered before the separator object built below, the paired temp
// collection (c.get_temp()) is listed first; the call only continues into
// 'c' itself once the temp listing reports ghobject_t::get_max() as next.
//
// Returns 0 on success (including when orig_start is already max) or a
// negative error from get_index() / collection_list_partial().
4883 int FileStore::collection_list(const coll_t& c,
4884 const ghobject_t& orig_start,
4885 const ghobject_t& end,
4886 int max,
4887 vector<ghobject_t> *ls, ghobject_t *next)
4888 {
4889 ghobject_t start = orig_start;
4890 if (start.is_max())
4891 return 0;
4892
4893 ghobject_t temp_next;
4894 if (!next)
4895 next = &temp_next;
4896 // figure out the pool id. we need this in order to generate a
4897 // meaningful 'next' value.
4898 int64_t pool = -1;
4899 shard_id_t shard;
4900 {
4901 spg_t pgid;
4902 if (c.is_temp(&pgid)) {
// temp collections use the pool id mapping -2 - <pg pool>
4903 pool = -2 - pgid.pool();
4904 shard = pgid.shard;
4905 } else if (c.is_pg(&pgid)) {
4906 pool = pgid.pool();
4907 shard = pgid.shard;
4908 } else if (c.is_meta()) {
4909 pool = -1;
4910 shard = shard_id_t::NO_SHARD;
4911 } else {
4912 // hrm, the caller is test code! we should kill it off. for now,
4913 // tolerate it.
4914 pool = 0;
4915 shard = shard_id_t::NO_SHARD;
4916 }
4917 dout(20) << __func__ << " pool is " << pool << " shard is " << shard
4918 << " pgid " << pgid << dendl;
4919 }
// 'sep' (pool -1, this collection's shard) is the boundary used to decide
// whether the temp collection still has to be visited for this start.
4920 ghobject_t sep;
4921 sep.hobj.pool = -1;
4922 sep.set_shard(shard);
4923 if (!c.is_temp() && !c.is_meta()) {
4924 if (start < sep) {
4925 dout(10) << __func__ << " first checking temp pool" << dendl;
4926 coll_t temp = c.get_temp();
4927 int r = collection_list(temp, start, end, max, ls, next);
4928 if (r < 0)
4929 return r;
// temp listing not exhausted yet: hand its result back as-is
4930 if (*next != ghobject_t::get_max())
4931 return r;
4932 start = sep;
4933 dout(10) << __func__ << " fall through to non-temp collection, start "
4934 << start << dendl;
4935 } else {
4936 dout(10) << __func__ << " start " << start << " >= sep " << sep << dendl;
4937 }
4938 }
4939
4940 Index index;
4941 int r = get_index(c, &index);
4942 if (r < 0)
4943 return r;
4944
4945 assert(NULL != index.index);
4946 RWLock::RLocker l((index.index)->access_lock);
4947
4948 r = index->collection_list_partial(start, end, max, ls, next);
4949
4950 if (r < 0) {
4951 assert(!m_filestore_fail_eio || r != -EIO);
4952 return r;
4953 }
4954 dout(20) << "objects: " << *ls << dendl;
4955
4956 // HashIndex doesn't know the pool when constructing a 'next' value
4957 if (next && !next->is_max()) {
4958 next->hobj.pool = pool;
4959 next->set_shard(shard);
4960 dout(20) << " next " << *next << dendl;
4961 }
4962
4963 return 0;
4964 }
4965
4966 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
4967 bufferlist *header,
4968 map<string, bufferlist> *out)
4969 {
4970 tracepoint(objectstore, omap_get_enter, _c.c_str());
4971 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
4972 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
4973 Index index;
4974 int r = get_index(c, &index);
4975 if (r < 0)
4976 return r;
4977 {
4978 assert(NULL != index.index);
4979 RWLock::RLocker l((index.index)->access_lock);
4980 r = lfn_find(hoid, index);
4981 if (r < 0)
4982 return r;
4983 }
4984 r = object_map->get(hoid, header, out);
4985 if (r < 0 && r != -ENOENT) {
4986 assert(!m_filestore_fail_eio || r != -EIO);
4987 return r;
4988 }
4989 tracepoint(objectstore, omap_get_exit, 0);
4990 return 0;
4991 }
4992
4993 int FileStore::omap_get_header(
4994 const coll_t& _c,
4995 const ghobject_t &hoid,
4996 bufferlist *bl,
4997 bool allow_eio)
4998 {
4999 tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5000 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5001 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
5002 Index index;
5003 int r = get_index(c, &index);
5004 if (r < 0)
5005 return r;
5006 {
5007 assert(NULL != index.index);
5008 RWLock::RLocker l((index.index)->access_lock);
5009 r = lfn_find(hoid, index);
5010 if (r < 0)
5011 return r;
5012 }
5013 r = object_map->get_header(hoid, bl);
5014 if (r < 0 && r != -ENOENT) {
5015 assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5016 return r;
5017 }
5018 tracepoint(objectstore, omap_get_header_exit, 0);
5019 return 0;
5020 }
5021
5022 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5023 {
5024 tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5025 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5026 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
5027 Index index;
5028 int r = get_index(c, &index);
5029 if (r < 0)
5030 return r;
5031 {
5032 assert(NULL != index.index);
5033 RWLock::RLocker l((index.index)->access_lock);
5034 r = lfn_find(hoid, index);
5035 if (r < 0)
5036 return r;
5037 }
5038 r = object_map->get_keys(hoid, keys);
5039 if (r < 0 && r != -ENOENT) {
5040 assert(!m_filestore_fail_eio || r != -EIO);
5041 return r;
5042 }
5043 tracepoint(objectstore, omap_get_keys_exit, 0);
5044 return 0;
5045 }
5046
5047 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5048 const set<string> &keys,
5049 map<string, bufferlist> *out)
5050 {
5051 tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5052 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5053 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
5054 Index index;
5055 const char *where = "()";
5056 int r = get_index(c, &index);
5057 if (r < 0) {
5058 where = " (get_index)";
5059 goto out;
5060 }
5061 {
5062 assert(NULL != index.index);
5063 RWLock::RLocker l((index.index)->access_lock);
5064 r = lfn_find(hoid, index);
5065 if (r < 0) {
5066 where = " (lfn_find)";
5067 goto out;
5068 }
5069 }
5070 r = object_map->get_values(hoid, keys, out);
5071 if (r < 0 && r != -ENOENT) {
5072 assert(!m_filestore_fail_eio || r != -EIO);
5073 where = " (get_values)";
5074 goto out;
5075 }
5076 r = 0;
5077 out:
5078 tracepoint(objectstore, omap_get_values_exit, r);
5079 dout(15) << __func__ << " " << c << "/" << hoid << " = " << r
5080 << where << dendl;
5081 return r;
5082 }
5083
5084 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5085 const set<string> &keys,
5086 set<string> *out)
5087 {
5088 tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5089 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5090 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
5091
5092 Index index;
5093 int r = get_index(c, &index);
5094 if (r < 0)
5095 return r;
5096 {
5097 assert(NULL != index.index);
5098 RWLock::RLocker l((index.index)->access_lock);
5099 r = lfn_find(hoid, index);
5100 if (r < 0)
5101 return r;
5102 }
5103 r = object_map->check_keys(hoid, keys, out);
5104 if (r < 0 && r != -ENOENT) {
5105 assert(!m_filestore_fail_eio || r != -EIO);
5106 return r;
5107 }
5108 tracepoint(objectstore, omap_check_keys_exit, 0);
5109 return 0;
5110 }
5111
5112 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5113 const ghobject_t &hoid)
5114 {
5115 tracepoint(objectstore, get_omap_iterator, _c.c_str());
5116 const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5117 dout(15) << __func__ << " " << c << "/" << hoid << dendl;
5118 Index index;
5119 int r = get_index(c, &index);
5120 if (r < 0) {
5121 dout(10) << __func__ << " " << c << "/" << hoid << " = 0 "
5122 << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5123 return ObjectMap::ObjectMapIterator();
5124 }
5125 {
5126 assert(NULL != index.index);
5127 RWLock::RLocker l((index.index)->access_lock);
5128 r = lfn_find(hoid, index);
5129 if (r < 0) {
5130 dout(10) << __func__ << " " << c << "/" << hoid << " = 0 "
5131 << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5132 return ObjectMap::ObjectMapIterator();
5133 }
5134 }
5135 return object_map->get_iterator(hoid);
5136 }
5137
5138 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5139 uint64_t expected_num_objs,
5140 const SequencerPosition &spos)
5141 {
5142 dout(15) << __func__ << " collection: " << c << " pg number: "
5143 << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5144
5145 bool empty;
5146 int ret = collection_empty(c, &empty);
5147 if (ret < 0)
5148 return ret;
5149 if (!empty && !replaying) {
5150 dout(0) << "Failed to give an expected number of objects hint to collection : "
5151 << c << ", only empty collection can take such type of hint. " << dendl;
5152 return 0;
5153 }
5154
5155 Index index;
5156 ret = get_index(c, &index);
5157 if (ret < 0)
5158 return ret;
5159 // Pre-hash the collection
5160 ret = index->pre_hash_collection(pg_num, expected_num_objs);
5161 dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5162 if (ret < 0)
5163 return ret;
5164 _set_replay_guard(c, spos);
5165
5166 return 0;
5167 }
5168
5169 int FileStore::_create_collection(
5170 const coll_t& c,
5171 int bits,
5172 const SequencerPosition &spos)
5173 {
5174 char fn[PATH_MAX];
5175 get_cdir(c, fn, sizeof(fn));
5176 dout(15) << "create_collection " << fn << dendl;
5177 int r = ::mkdir(fn, 0755);
5178 if (r < 0)
5179 r = -errno;
5180 if (r == -EEXIST && replaying)
5181 r = 0;
5182 dout(10) << "create_collection " << fn << " = " << r << dendl;
5183
5184 if (r < 0)
5185 return r;
5186 r = init_index(c);
5187 if (r < 0)
5188 return r;
5189 r = _collection_set_bits(c, bits);
5190 if (r < 0)
5191 return r;
5192 // create parallel temp collection, too
5193 if (!c.is_meta() && !c.is_temp()) {
5194 coll_t temp = c.get_temp();
5195 r = _create_collection(temp, 0, spos);
5196 if (r < 0)
5197 return r;
5198 }
5199
5200 _set_replay_guard(c, spos);
5201 return 0;
5202 }
5203
5204 int FileStore::_destroy_collection(const coll_t& c)
5205 {
5206 int r = 0;
5207 char fn[PATH_MAX];
5208 get_cdir(c, fn, sizeof(fn));
5209 dout(15) << "_destroy_collection " << fn << dendl;
5210 {
5211 Index from;
5212 r = get_index(c, &from);
5213 if (r < 0)
5214 goto out;
5215 assert(NULL != from.index);
5216 RWLock::WLocker l((from.index)->access_lock);
5217
5218 r = from->prep_delete();
5219 if (r < 0)
5220 goto out;
5221 }
5222 r = ::rmdir(fn);
5223 if (r < 0) {
5224 r = -errno;
5225 goto out;
5226 }
5227
5228 out:
5229 // destroy parallel temp collection, too
5230 if (!c.is_meta() && !c.is_temp()) {
5231 coll_t temp = c.get_temp();
5232 int r2 = _destroy_collection(temp);
5233 if (r2 < 0) {
5234 r = r2;
5235 goto out_final;
5236 }
5237 }
5238
5239 out_final:
5240 dout(10) << "_destroy_collection " << fn << " = " << r << dendl;
5241 return r;
5242 }
5243
5244
// _collection_add - link object 'o' from collection 'oldcid' into
// collection 'c' under the same name, bracketed by replay guards on the
// source file descriptor.
//
// Returns 0 without doing anything when either replay-guard check is
// negative, or when the source cannot be opened (asserted to happen only
// while replaying).  Otherwise returns the lfn_link() result, with -EEXIST
// mapped to 0 while replaying on a backend that cannot checkpoint.
5245 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5246 const SequencerPosition& spos)
5247 {
5248 dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5249
5250 int dstcmp = _check_replay_guard(c, o, spos);
5251 if (dstcmp < 0)
5252 return 0;
5253
5254 // check the src name too; it might have a newer guard, and we don't
5255 // want to clobber it
5256 int srccmp = _check_replay_guard(oldcid, o, spos);
5257 if (srccmp < 0)
5258 return 0;
5259
5260 // open guard on object so we don't replay any previous operations on the
5261 // new name that will modify the source inode.
5262 FDRef fd;
5263 int r = lfn_open(oldcid, o, 0, &fd);
5264 if (r < 0) {
5265 // the source collection/object does not exist. If we are replaying, we
5266 // should be safe, so just return 0 and move on.
5267 assert(replaying);
5268 dout(10) << "collection_add " << c << "/" << o << " from "
5269 << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5270 return 0;
5271 }
5272 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5273 _set_replay_guard(**fd, spos, &o, true);
5274 }
5275
5276 r = lfn_link(oldcid, c, o, o);
5277 if (replaying && !backend->can_checkpoint() &&
5278 r == -EEXIST) // crashed between link() and set_replay_guard()
5279 r = 0;
5280
5281 _inject_failure();
5282
5283 // close guard on object so we don't do this again
5284 if (r == 0) {
5285 _close_replay_guard(**fd, spos);
5286 }
5287 lfn_close(fd);
5288
5289 dout(10) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5290 return r;
5291 }
5292
// _collection_move_rename - move object oldcid/oldoid to c/o.
//
// Steps on the normal path, in order: open the source and set an
// in-progress replay guard on it, link it under the new name, rename the
// omap content (object_map->rename), unlink the old name, then re-open the
// object under its new name and close the replay guard.  Each step after
// the link only runs while r == 0.
//
// Exits that skip the move:
//  - replaying and the destination collection does not exist, or the
//    destination guard check is negative: only the source is removed
//    (label out_rm_src), and only if its own guard check is positive;
//  - the source guard check is negative: returns 0;
//  - the source cannot be opened: tolerated while replaying or when
//    'allow_enoent' is set, otherwise asserts.
//
// Returns 0 or a negative error from the failing step.
5293 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5294 coll_t c, const ghobject_t& o,
5295 const SequencerPosition& spos,
5296 bool allow_enoent)
5297 {
5298 dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5299 int r = 0;
5300 int dstcmp, srccmp;
5301
5302 if (replaying) {
5303 /* If the destination collection doesn't exist during replay,
5304 * we need to delete the src object and continue on
5305 */
5306 if (!collection_exists(c))
5307 goto out_rm_src;
5308 }
5309
5310 dstcmp = _check_replay_guard(c, o, spos);
5311 if (dstcmp < 0)
5312 goto out_rm_src;
5313
5314 // check the src name too; it might have a newer guard, and we don't
5315 // want to clobber it
5316 srccmp = _check_replay_guard(oldcid, oldoid, spos);
5317 if (srccmp < 0)
5318 return 0;
5319
5320 {
5321 // open guard on object so we don't replay any previous operations on the
5322 // new name that will modify the source inode.
5323 FDRef fd;
5324 r = lfn_open(oldcid, oldoid, 0, &fd);
5325 if (r < 0) {
5326 // the source collection/object does not exist. If we are replaying, we
5327 // should be safe, so just return 0 and move on.
5328 if (replaying) {
5329 dout(10) << __func__ << " " << c << "/" << o << " from "
5330 << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5331 } else if (allow_enoent) {
5332 dout(10) << __func__ << " " << c << "/" << o << " from "
5333 << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5334 << dendl;
5335 } else {
5336 assert(0 == "ERROR: source must exist");
5337 }
5338
5339 if (!replaying) {
5340 return 0;
5341 }
5342 if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5343 return 0;
5344 }
5345
// replaying with a missing source: fall through with r == 0 so the omap
// rename and the later steps below are still attempted
5346 r = 0; // don't know if object_map was cloned
5347 } else {
5348 if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5349 _set_replay_guard(**fd, spos, &o, true);
5350 }
5351
5352 r = lfn_link(oldcid, c, oldoid, o);
5353 if (replaying && !backend->can_checkpoint() &&
5354 r == -EEXIST) // crashed between link() and set_replay_guard()
5355 r = 0;
5356
5357 lfn_close(fd);
5358 fd = FDRef();
5359
5360 _inject_failure();
5361 }
5362
5363 if (r == 0) {
5364 // the name changed; link the omap content
5365 r = object_map->rename(oldoid, o, &spos);
5366 if (r == -ENOENT)
5367 r = 0;
5368 }
5369
5370 _inject_failure();
5371
5372 if (r == 0)
5373 r = lfn_unlink(oldcid, oldoid, spos, true);
5374
// re-open under the new name; the guard is closed on that descriptor
5375 if (r == 0)
5376 r = lfn_open(c, o, 0, &fd);
5377
5378 // close guard on object so we don't do this again
5379 if (r == 0) {
5380 _close_replay_guard(**fd, spos, &o);
5381 lfn_close(fd);
5382 }
5383 }
5384
5385 dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid
5386 << " = " << r << dendl;
5387 return r;
5388
5389 out_rm_src:
5390 // remove source
5391 if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5392 r = lfn_unlink(oldcid, oldoid, spos, true);
5393 }
5394
5395 dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid
5396 << " = " << r << dendl;
5397 return r;
5398 }
5399
5400 void FileStore::_inject_failure()
5401 {
5402 if (m_filestore_kill_at.read()) {
5403 int final = m_filestore_kill_at.dec();
5404 dout(5) << "_inject_failure " << (final+1) << " -> " << final << dendl;
5405 if (final == 0) {
5406 derr << "_inject_failure KILLING" << dendl;
5407 cct->_log->flush();
5408 _exit(1);
5409 }
5410 }
5411 }
5412
5413 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5414 const SequencerPosition &spos) {
5415 dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
5416 Index index;
5417 int r = get_index(cid, &index);
5418 if (r < 0)
5419 return r;
5420 {
5421 assert(NULL != index.index);
5422 RWLock::RLocker l((index.index)->access_lock);
5423 r = lfn_find(hoid, index);
5424 if (r < 0)
5425 return r;
5426 }
5427 r = object_map->clear_keys_header(hoid, &spos);
5428 if (r < 0 && r != -ENOENT)
5429 return r;
5430 return 0;
5431 }
5432
5433 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5434 const map<string, bufferlist> &aset,
5435 const SequencerPosition &spos) {
5436 dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
5437 Index index;
5438 int r;
5439 //treat pgmeta as a logical object, skip to check exist
5440 if (hoid.is_pgmeta())
5441 goto skip;
5442
5443 r = get_index(cid, &index);
5444 if (r < 0) {
5445 dout(20) << __func__ << " get_index got " << cpp_strerror(r) << dendl;
5446 return r;
5447 }
5448 {
5449 assert(NULL != index.index);
5450 RWLock::RLocker l((index.index)->access_lock);
5451 r = lfn_find(hoid, index);
5452 if (r < 0) {
5453 dout(20) << __func__ << " lfn_find got " << cpp_strerror(r) << dendl;
5454 return r;
5455 }
5456 }
5457 skip:
5458 if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5459 for (auto& p : aset) {
5460 dout(20) << __func__ << " set " << p.first << dendl;
5461 }
5462 }
5463 r = object_map->set_keys(hoid, aset, &spos);
5464 dout(20) << __func__ << " " << cid << "/" << hoid << " = " << r << dendl;
5465 return r;
5466 }
5467
5468 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5469 const set<string> &keys,
5470 const SequencerPosition &spos) {
5471 dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
5472 Index index;
5473 int r;
5474 //treat pgmeta as a logical object, skip to check exist
5475 if (hoid.is_pgmeta())
5476 goto skip;
5477
5478 r = get_index(cid, &index);
5479 if (r < 0)
5480 return r;
5481 {
5482 assert(NULL != index.index);
5483 RWLock::RLocker l((index.index)->access_lock);
5484 r = lfn_find(hoid, index);
5485 if (r < 0)
5486 return r;
5487 }
5488 skip:
5489 r = object_map->rm_keys(hoid, keys, &spos);
5490 if (r < 0 && r != -ENOENT)
5491 return r;
5492 return 0;
5493 }
5494
5495 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5496 const string& first, const string& last,
5497 const SequencerPosition &spos) {
5498 dout(15) << __func__ << " " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5499 set<string> keys;
5500 {
5501 ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5502 if (!iter)
5503 return -ENOENT;
5504 for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5505 iter->next()) {
5506 keys.insert(iter->key());
5507 }
5508 }
5509 return _omap_rmkeys(cid, hoid, keys, spos);
5510 }
5511
5512 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5513 const bufferlist &bl,
5514 const SequencerPosition &spos)
5515 {
5516 dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
5517 Index index;
5518 int r = get_index(cid, &index);
5519 if (r < 0)
5520 return r;
5521 {
5522 assert(NULL != index.index);
5523 RWLock::RLocker l((index.index)->access_lock);
5524 r = lfn_find(hoid, index);
5525 if (r < 0)
5526 return r;
5527 }
5528 return object_map->set_header(hoid, bl, &spos);
5529 }
5530
5531 int FileStore::_split_collection(const coll_t& cid,
5532 uint32_t bits,
5533 uint32_t rem,
5534 coll_t dest,
5535 const SequencerPosition &spos)
5536 {
5537 int r;
5538 {
5539 dout(15) << __func__ << " " << cid << " bits: " << bits << dendl;
5540 if (!collection_exists(cid)) {
5541 dout(2) << __func__ << ": " << cid << " DNE" << dendl;
5542 assert(replaying);
5543 return 0;
5544 }
5545 if (!collection_exists(dest)) {
5546 dout(2) << __func__ << ": " << dest << " DNE" << dendl;
5547 assert(replaying);
5548 return 0;
5549 }
5550
5551 int dstcmp = _check_replay_guard(dest, spos);
5552 if (dstcmp < 0)
5553 return 0;
5554
5555 int srccmp = _check_replay_guard(cid, spos);
5556 if (srccmp < 0)
5557 return 0;
5558
5559 _set_global_replay_guard(cid, spos);
5560 _set_replay_guard(cid, spos, true);
5561 _set_replay_guard(dest, spos, true);
5562
5563 Index from;
5564 r = get_index(cid, &from);
5565
5566 Index to;
5567 if (!r)
5568 r = get_index(dest, &to);
5569
5570 if (!r) {
5571 assert(NULL != from.index);
5572 RWLock::WLocker l1((from.index)->access_lock);
5573
5574 assert(NULL != to.index);
5575 RWLock::WLocker l2((to.index)->access_lock);
5576
5577 r = from->split(rem, bits, to.index);
5578 }
5579
5580 _close_replay_guard(cid, spos);
5581 _close_replay_guard(dest, spos);
5582 }
5583 _collection_set_bits(cid, bits);
5584 if (!r && cct->_conf->filestore_debug_verify_split) {
5585 vector<ghobject_t> objects;
5586 ghobject_t next;
5587 while (1) {
5588 collection_list(
5589 cid,
5590 next, ghobject_t::get_max(),
5591 get_ideal_list_max(),
5592 &objects,
5593 &next);
5594 if (objects.empty())
5595 break;
5596 for (vector<ghobject_t>::iterator i = objects.begin();
5597 i != objects.end();
5598 ++i) {
5599 dout(20) << __func__ << ": " << *i << " still in source "
5600 << cid << dendl;
5601 assert(!i->match(bits, rem));
5602 }
5603 objects.clear();
5604 }
5605 next = ghobject_t();
5606 while (1) {
5607 collection_list(
5608 dest,
5609 next, ghobject_t::get_max(),
5610 get_ideal_list_max(),
5611 &objects,
5612 &next);
5613 if (objects.empty())
5614 break;
5615 for (vector<ghobject_t>::iterator i = objects.begin();
5616 i != objects.end();
5617 ++i) {
5618 dout(20) << __func__ << ": " << *i << " now in dest "
5619 << *i << dendl;
5620 assert(i->match(bits, rem));
5621 }
5622 objects.clear();
5623 }
5624 }
5625 return r;
5626 }
5627
5628 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5629 uint64_t expected_object_size,
5630 uint64_t expected_write_size)
5631 {
5632 dout(15) << "set_alloc_hint " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5633
5634 FDRef fd;
5635 int ret = 0;
5636
5637 if (expected_object_size == 0 || expected_write_size == 0)
5638 goto out;
5639
5640 ret = lfn_open(cid, oid, false, &fd);
5641 if (ret < 0)
5642 goto out;
5643
5644 {
5645 // TODO: a more elaborate hint calculation
5646 uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5647
5648 ret = backend->set_alloc_hint(**fd, hint);
5649 dout(20) << "set_alloc_hint hint " << hint << " ret " << ret << dendl;
5650 }
5651
5652 lfn_close(fd);
5653 out:
5654 dout(10) << "set_alloc_hint " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5655 assert(!m_filestore_fail_eio || ret != -EIO);
5656 return ret;
5657 }
5658
5659 const char** FileStore::get_tracked_conf_keys() const
5660 {
5661 static const char* KEYS[] = {
5662 "filestore_max_inline_xattr_size",
5663 "filestore_max_inline_xattr_size_xfs",
5664 "filestore_max_inline_xattr_size_btrfs",
5665 "filestore_max_inline_xattr_size_other",
5666 "filestore_max_inline_xattrs",
5667 "filestore_max_inline_xattrs_xfs",
5668 "filestore_max_inline_xattrs_btrfs",
5669 "filestore_max_inline_xattrs_other",
5670 "filestore_max_xattr_value_size",
5671 "filestore_max_xattr_value_size_xfs",
5672 "filestore_max_xattr_value_size_btrfs",
5673 "filestore_max_xattr_value_size_other",
5674 "filestore_min_sync_interval",
5675 "filestore_max_sync_interval",
5676 "filestore_queue_max_ops",
5677 "filestore_queue_max_bytes",
5678 "filestore_expected_throughput_bytes",
5679 "filestore_expected_throughput_ops",
5680 "filestore_queue_low_threshhold",
5681 "filestore_queue_high_threshhold",
5682 "filestore_queue_high_delay_multiple",
5683 "filestore_queue_max_delay_multiple",
5684 "filestore_commit_timeout",
5685 "filestore_dump_file",
5686 "filestore_kill_at",
5687 "filestore_fail_eio",
5688 "filestore_fadvise",
5689 "filestore_sloppy_crc",
5690 "filestore_sloppy_crc_block_size",
5691 "filestore_max_alloc_hint_size",
5692 NULL
5693 };
5694 return KEYS;
5695 }
5696
5697 void FileStore::handle_conf_change(const struct md_config_t *conf,
5698 const std::set <std::string> &changed)
5699 {
5700 if (changed.count("filestore_max_inline_xattr_size") ||
5701 changed.count("filestore_max_inline_xattr_size_xfs") ||
5702 changed.count("filestore_max_inline_xattr_size_btrfs") ||
5703 changed.count("filestore_max_inline_xattr_size_other") ||
5704 changed.count("filestore_max_inline_xattrs") ||
5705 changed.count("filestore_max_inline_xattrs_xfs") ||
5706 changed.count("filestore_max_inline_xattrs_btrfs") ||
5707 changed.count("filestore_max_inline_xattrs_other") ||
5708 changed.count("filestore_max_xattr_value_size") ||
5709 changed.count("filestore_max_xattr_value_size_xfs") ||
5710 changed.count("filestore_max_xattr_value_size_btrfs") ||
5711 changed.count("filestore_max_xattr_value_size_other")) {
5712 if (backend) {
5713 Mutex::Locker l(lock);
5714 set_xattr_limits_via_conf();
5715 }
5716 }
5717
5718 if (changed.count("filestore_queue_max_bytes") ||
5719 changed.count("filestore_queue_max_ops") ||
5720 changed.count("filestore_expected_throughput_bytes") ||
5721 changed.count("filestore_expected_throughput_ops") ||
5722 changed.count("filestore_queue_low_threshhold") ||
5723 changed.count("filestore_queue_high_threshhold") ||
5724 changed.count("filestore_queue_high_delay_multiple") ||
5725 changed.count("filestore_queue_max_delay_multiple")) {
5726 Mutex::Locker l(lock);
5727 set_throttle_params();
5728 }
5729
5730 if (changed.count("filestore_min_sync_interval") ||
5731 changed.count("filestore_max_sync_interval") ||
5732 changed.count("filestore_kill_at") ||
5733 changed.count("filestore_fail_eio") ||
5734 changed.count("filestore_sloppy_crc") ||
5735 changed.count("filestore_sloppy_crc_block_size") ||
5736 changed.count("filestore_max_alloc_hint_size") ||
5737 changed.count("filestore_fadvise")) {
5738 Mutex::Locker l(lock);
5739 m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5740 m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5741 m_filestore_kill_at.set(conf->filestore_kill_at);
5742 m_filestore_fail_eio = conf->filestore_fail_eio;
5743 m_filestore_fadvise = conf->filestore_fadvise;
5744 m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5745 m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5746 m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5747 }
5748 if (changed.count("filestore_commit_timeout")) {
5749 Mutex::Locker l(sync_entry_timeo_lock);
5750 m_filestore_commit_timeout = conf->filestore_commit_timeout;
5751 }
5752 if (changed.count("filestore_dump_file")) {
5753 if (conf->filestore_dump_file.length() &&
5754 conf->filestore_dump_file != "-") {
5755 dump_start(conf->filestore_dump_file);
5756 } else {
5757 dump_stop();
5758 }
5759 }
5760 }
5761
5762 int FileStore::set_throttle_params()
5763 {
5764 stringstream ss;
5765 bool valid = throttle_bytes.set_params(
5766 cct->_conf->filestore_queue_low_threshhold,
5767 cct->_conf->filestore_queue_high_threshhold,
5768 cct->_conf->filestore_expected_throughput_bytes,
5769 cct->_conf->filestore_queue_high_delay_multiple,
5770 cct->_conf->filestore_queue_max_delay_multiple,
5771 cct->_conf->filestore_queue_max_bytes,
5772 &ss);
5773
5774 valid &= throttle_ops.set_params(
5775 cct->_conf->filestore_queue_low_threshhold,
5776 cct->_conf->filestore_queue_high_threshhold,
5777 cct->_conf->filestore_expected_throughput_ops,
5778 cct->_conf->filestore_queue_high_delay_multiple,
5779 cct->_conf->filestore_queue_max_delay_multiple,
5780 cct->_conf->filestore_queue_max_ops,
5781 &ss);
5782
5783 logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5784 logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5785
5786 if (!valid) {
5787 derr << "tried to set invalid params: "
5788 << ss.str()
5789 << dendl;
5790 }
5791 return valid ? 0 : -EINVAL;
5792 }
5793
5794 void FileStore::dump_start(const std::string& file)
5795 {
5796 dout(10) << "dump_start " << file << dendl;
5797 if (m_filestore_do_dump) {
5798 dump_stop();
5799 }
5800 m_filestore_dump_fmt.reset();
5801 m_filestore_dump_fmt.open_array_section("dump");
5802 m_filestore_dump.open(file.c_str());
5803 m_filestore_do_dump = true;
5804 }
5805
5806 void FileStore::dump_stop()
5807 {
5808 dout(10) << "dump_stop" << dendl;
5809 m_filestore_do_dump = false;
5810 if (m_filestore_dump.is_open()) {
5811 m_filestore_dump_fmt.close_section();
5812 m_filestore_dump_fmt.flush(m_filestore_dump);
5813 m_filestore_dump.flush();
5814 m_filestore_dump.close();
5815 }
5816 }
5817
5818 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5819 {
5820 m_filestore_dump_fmt.open_array_section("transactions");
5821 unsigned trans_num = 0;
5822 for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5823 m_filestore_dump_fmt.open_object_section("transaction");
5824 m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5825 m_filestore_dump_fmt.dump_unsigned("seq", seq);
5826 m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5827 (*i).dump(&m_filestore_dump_fmt);
5828 m_filestore_dump_fmt.close_section();
5829 }
5830 m_filestore_dump_fmt.close_section();
5831 m_filestore_dump_fmt.flush(m_filestore_dump);
5832 m_filestore_dump.flush();
5833 }
5834
5835 void FileStore::set_xattr_limits_via_conf()
5836 {
5837 uint32_t fs_xattr_size;
5838 uint32_t fs_xattrs;
5839 uint32_t fs_xattr_max_value_size;
5840
5841 switch (m_fs_type) {
5842 #if defined(__linux__)
5843 case XFS_SUPER_MAGIC:
5844 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5845 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5846 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5847 break;
5848 case BTRFS_SUPER_MAGIC:
5849 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5850 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5851 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5852 break;
5853 #endif
5854 default:
5855 fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5856 fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5857 fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5858 break;
5859 }
5860
5861 // Use override value if set
5862 if (cct->_conf->filestore_max_inline_xattr_size)
5863 m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5864 else
5865 m_filestore_max_inline_xattr_size = fs_xattr_size;
5866
5867 // Use override value if set
5868 if (cct->_conf->filestore_max_inline_xattrs)
5869 m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5870 else
5871 m_filestore_max_inline_xattrs = fs_xattrs;
5872
5873 // Use override value if set
5874 if (cct->_conf->filestore_max_xattr_value_size)
5875 m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5876 else
5877 m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5878
5879 if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5880 derr << "WARNING: max attr value size ("
5881 << m_filestore_max_xattr_value_size
5882 << ") is smaller than osd_max_object_name_len ("
5883 << cct->_conf->osd_max_object_name_len
5884 << "). Your backend filesystem appears to not support attrs large "
5885 << "enough to handle the configured max rados name size. You may get "
5886 << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5887 << "behavior"
5888 << dendl;
5889 }
5890 }
5891
5892 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5893 {
5894 uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5895 return res;
5896 }
5897
5898 int FileStore::apply_layout_settings(const coll_t &cid)
5899 {
5900 dout(20) << __func__ << " " << cid << dendl;
5901 Index index;
5902 int r = get_index(cid, &index);
5903 if (r < 0) {
5904 dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5905 << dendl;
5906 return r;
5907 }
5908
5909 return index->apply_layout_settings();
5910 }
5911
5912
5913 // -- FSSuperblock --
5914
5915 void FSSuperblock::encode(bufferlist &bl) const
5916 {
5917 ENCODE_START(2, 1, bl);
5918 compat_features.encode(bl);
5919 ::encode(omap_backend, bl);
5920 ENCODE_FINISH(bl);
5921 }
5922
5923 void FSSuperblock::decode(bufferlist::iterator &bl)
5924 {
5925 DECODE_START(2, bl);
5926 compat_features.decode(bl);
5927 if (struct_v >= 2)
5928 ::decode(omap_backend, bl);
5929 else
5930 omap_backend = "leveldb";
5931 DECODE_FINISH(bl);
5932 }
5933
5934 void FSSuperblock::dump(Formatter *f) const
5935 {
5936 f->open_object_section("compat");
5937 compat_features.dump(f);
5938 f->dump_string("omap_backend", omap_backend);
5939 f->close_section();
5940 }
5941
5942 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
5943 {
5944 FSSuperblock z;
5945 o.push_back(new FSSuperblock(z));
5946 CompatSet::FeatureSet feature_compat;
5947 CompatSet::FeatureSet feature_ro_compat;
5948 CompatSet::FeatureSet feature_incompat;
5949 feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
5950 z.compat_features = CompatSet(feature_compat, feature_ro_compat,
5951 feature_incompat);
5952 o.push_back(new FSSuperblock(z));
5953 z.omap_backend = "rocksdb";
5954 o.push_back(new FSSuperblock(z));
5955 }